Spark shuffle介紹:概述
在基于mapreduce思想的計算模型里,Shuffle是map和reduce的紐帶。計算框架對大數據分而治之,對處理數據進行分塊并行處理,當需要對分塊數據做聚合處理時,多個分塊的數據在map階段轉為k-v結構,然后按key分區,在reduce階段對各自分區的數據進行計算歸并。map和reduce中間對數據做分區并規整的過程,就是shuffle的過程。
在spark中,對shuffle也從RDD的角度進行了定義。spark core的作業就是rdd的一系列轉換,從aRDD轉為bRDD,RDD之間有寬依賴和窄依賴,對于存在寬依賴的2個RDD之間,就會存在shuffle。
窄依賴:父RDD的每個分區數據都只被RDD的一個分區使用
寬依賴:父RDD的每個分區數據都被子RDD的多個分區使用
在hadoop、spark這類批處理的計算框架中,考慮到容錯,shuffle過程的數據都是序列化到了磁盤。map做shuffle write,reduce做shuffle read。這個過程涉及了CPU對數據序列化及各種內存拷貝、內存對數據做分區排序、磁盤對shuffle數據存儲讀取、以及網絡遠程fetch,是計算過程中的資源消耗大戶,也是最大的瓶頸。
下面以wordcount為例子,概要描述下作業shuffle過程,目標為統計輸入文件里每個單詞的數量。
比如有2個文本文件,文件1內容為: abc def 文件2內容為:abc ghi。
期望結果為:abc 2;def 1;ghi 1(abc有2個,def有1個,ghi有1個)
整個數據流如下:
1、作業運行時首先啟2個task,分別讀取文件1和文件2,每個task將自己讀到文件內容split為單詞
2、每個task將讀到的單詞,轉為key-value結構,即word-數量的結構
3、每個task將key-value數據按key做分區,假設目標分區為3個,3個單詞abc、def、ghi分別分到3個分區,并各自shuffle write為對應的文件
4、啟動3個reduce任務,每個reduce任務讀取對應分區的數據
5、每個reduce任務對相同的單詞合并,數量合計
6、每個reduce任務的結果合并到driver輸出結果
為了并行處理數據,需要啟動多個task進行文件的讀取和split。但count是對全部的單詞做count,每個task里只包含了某1個文件的單詞,這種就需要shuffle,把相同的單詞先分到同一個分區里,再分別對每個分區做count,即可得到最終結果。這個簡單的例子幫到你理解為什么需要shuffle了么。
spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。