Spark的shuffle介紹
Spark的shuffle介紹
shuffle簡介:在 DAG 階段以shuffle為界,劃分 stage,
上游 stage做 map task,每個maptask將計算結果數據分成多份,每一份對應到下游stage 的每個partition中,并將其臨時寫到磁盤,該過程叫做shuffle write;
下游stage 做reduce task,每個reduce task通過網絡拉取上游 stage中所有map task的指定分區結果數據,該過程叫做shuffle read,最后完成reduce的業務邏輯。
Shuffle版本也隨著spark不斷進步和優化:
從2.0開始,把 Sort Based Shuffle和 Tungsten-Sort全部統一到 Sort Based Shuffle中,Hash Based Shuffle退出歷史舞臺。
目前spark2.1,直接把SortBased Shuffle的writer分為三種:BypassMergeSortShuffleWriter,SortShuffleWriter和 UnsafeShuffle Writer。
BypassMergeSortShuffle Writer: Hash Shuffle 中的HashShuffle Writer實現基本一致,唯一的區別在于,map端的多個輸出文件會被匯總為一個文件。所有分區的數據會合并為同一個文件,會生成一個索引文件,是為了索引到每個分區的起始地址,可以隨機 access某個partition 的所有數據。
SortShuffleWriter:會對分區內進行排序或者全局排序。
處理步驟:使用 PartitionedAppendOnlyMap或者 PartitionedPairBuffer在內存中進行排序,排序的K是(partitionld, hash (key))這樣一個元組。如果超過內存limit,spill 到一個文件中,這個文件中元素也是有序的,首先是按照 partitionld 時排廳,如宋 partona相同,再根據hash (key)進行比較排序。如果需要輸出全局有序的文件的時候,就需要對之前所有的輸出文件和當前內存中的數據結構中的數據進行merge sort,實現全局排序。
最終讀取的時候,從整個全局merge后的讀取迭代器中讀取的數據,就是按照partitionld 從小到大排序的數據,誤取過在中使用丹僅州刀區力權,并且記錄每個分區文件的起始寫入位置,把這些位置數據寫入索引文件中。
UnsafeShuffleWriter:優化部分是在shuffle write進行序列化寫入過程中,直接對二進制進行排序,減少了內存消耗,最終只是 partition 級別的排序。
但是這種需要一定條件:對單條記錄、shuffle數量有限制,而且不能帶有聚合函數。排序實現:利用一個LongArray存儲分區 ID、pageNumber、offset in page,并對這個數組排序。每次插入一條 record 到 page 中,就把 partionld + pageNumber + offset in page,作可以迭代器 PackedRecordPointer
為一個元素插入到 LongArray中。要想反向獲得 record,
定義的數據結構就是[24 bit partition number][13 bit memory page number][27 bit offset inpage]然后到根據該指針可以拿到真實的record。
spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。