Spark內核詳解 (6) | Spark Shuffle 解析

      網友投稿 748 2025-04-01

      大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/


      本片博文為大家帶來的是Spark Shuffle 解析。

      目錄

      Shuffle 的核心要點

      一. Shuffle 流程源碼分析

      二. HashShuffle 解析

      三. SortShuffle 解析

      在所有的 MapReduce 框架中, Shuffle 是連接 map 任務和 reduce 任務的橋梁. map 任務的中間輸出要作為 reduce 任務的輸入, 就必須經過 Shuffle, 所以 Shuffle 的性能的優劣直接決定了整個計算引擎的性能和吞吐量.

      相比于 Hadoop 的 MapReduce, 我們將看到 Spark 提供了多種結算結果處理的方式及對 Shuffle 過程進行的多種優化.

      Shuffle 是所有 MapReduce 計算框架必須面臨的執行階段, Shuffle 用于打通 map 任務的輸出與reduce 任務的輸入.

      map 任務的中間輸出結果按照指定的分區策略(例如, 按照 key 的哈希值)分配給處理某一個分區的 reduce 任務.

      通用的 MapReduce 框架:

      Shuffle 的核心要點

      在劃分 Stage 時,最后一個 Stage 稱為finalStage(變量名),它本質上是一個ResultStage類型的對象,前面的所有 Stage 被稱為 ShuffleMapStage。

      ShuffleMapStage 的結束伴隨著 shuffle 文件的寫磁盤。

      ResultStage 基本上對應代碼中的 action 算子,即將一個函數應用在 RDD的各個partition的數據集上,意味著一個job的運行結束。

      一. Shuffle 流程源碼分析

      我們從CoarseGrainedExecutorBackend開始分析

      啟動任務

      override def receive: PartialFunction[Any, Unit] = { case LaunchTask(data) => if (executor == null) { } else { val taskDesc = ser.deserialize[TaskDescription](data.value) // 啟動任務 executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      Executor.launchTask 方法

      def launchTask( context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit = { // Runnable 接口的對象. val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) runningTasks.put(taskId, tr) // 在線程池中執行 task threadPool.execute(tr) }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      tr.run方法

      override def run(): Unit = { // 更新 task 的狀態 execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) try { // 把任務相關的數據反序列化出來 val (taskFiles, taskJars, taskProps, taskBytes) = Task.deserializeWithDependencies(serializedTask) val value = try { // 開始運行 Task val res = task.run( taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) res } finally { } } catch { } finally { } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      Task.run 方法

      final def run( taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem): T = { context = new TaskContextImpl( stageId, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, localProperties, metricsSystem, metrics) try { // 運行任務 runTask(context) } catch { } finally { } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      Task.runTask 方法

      Task.runTask是一個抽象方法.

      Task 有兩個實現類, 分別執行不同階段的Task

      ShuffleMapTask源碼分析

      ShuffleMapTask.runTask 方法

      override def runTask(context: TaskContext): MapStatus = { var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager // 獲取 ShuffleWriter writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // 寫出 RDD 中的數據. rdd.iterator 是讀(計算)數據的操作. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) } catch { } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      具體如何把數據寫入到磁盤, 是由ShuffleWriter.write方法來完成.

      ShuffleWriter是一個抽象類, 有 3 個實現:

      根據在manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)中的dep.shuffleHandle由manager來決定選使用哪種ShuffleWriter.

      ShuffleManager

      ShuffleManage 是一個Trait, 從2.0.0開始就只有一個實現類了: SortShuffleManager

      registerShuffle 方法: 匹配出來使用哪種ShuffleHandle

      override def registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { new BypassMergeSortShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else { new BaseShuffleHandle(shuffleId, numMaps, dependency) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      getWriter 方法

      /** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V]( handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = { // 根據不同的 Handle, 創建不同的 ShuffleWriter handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K@unchecked, V@unchecked] => new UnsafeShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K@unchecked, V@unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], bypassMergeSortHandle, mapId, context, env.conf) case other: BaseShuffleHandle[K@unchecked, V@unchecked, _] => new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      Spark內核詳解 (6) | Spark Shuffle 解析

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      二. HashShuffle 解析

      Spark-1.6 之前默認的shuffle方式是hash. 在 spark-1.6版本之后使用Sort-Base Shuffle,因為HashShuffle存在的不足所以就替換了HashShuffle. Spark2.0之后, 從源碼中完全移除了HashShuffle.

      未優化的HashShuffle

      為了方便分析假設前提:每個 Executor 只有 1 個CPU core,也就是說,無論這個 Executor 上分配多少個 task 線程,同一時間都只能執行一個 task 線程。

      如下圖中有 3個 Reducer,從 Task 開始那邊各自把自己進行 Hash 計算(分區器:hash/numreduce取模),分類出3個不同的類別,每個 Task 都分成3種類別的數據,想把不同的數據匯聚然后計算出最終的結果,所以Reducer 會在每個 Task 中把屬于自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 Task 輸出3份本地文件,這里有4個 Mapper Tasks,所以總共輸出了4個 Tasks x 3個分類文件 = 12個本地小文件。

      缺點:

      map 任務的中間結果首先存入內存(緩存), 然后才寫入磁盤. 這對于內存的開銷很大, 當一個節點上 map 任務的輸出結果集很大時, 很容易導致內存緊張, 發生 OOM

      生成很多的小文件. 假設有 M 個 MapTask, 有 N 個 ReduceTask, 則會創建 M * n 個小文件, 磁盤 I/O 將成為性能瓶頸.

      優化的HashShuffle

      優化的 HashShuffle 過程就是啟用合并機制,合并機制就是復用buffer,開啟合并機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。

      這里還是有 4 個Tasks,數據類別還是分成 3 種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer里,然后把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這里有4個Mapper Tasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。

      三. SortShuffle 解析

      1. 普通 SortShuffle

      在該模式下,數據會先寫入一個數據結構,reduceByKey 寫入 Map,一邊通過 Map 局部聚合,一遍寫入內存。Join 算子寫入 ArrayList 直接寫入內存中。然后需要判斷是否達到閾值,如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。

      在溢寫磁盤前,先根據 key 進行排序,排序過后的數據,會分批寫入到磁盤文件中。默認批次為 10000 條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩沖區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個 Task 過程會產生多個臨時文件。

      最后在每個 Task 中,將所有的臨時文件合并,這就是merge過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個Task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個Task的數據在文件中的索引,start offset和end offset。

      2.普通 SortShuffle 源碼解析

      write 方法

      override def write(records: Iterator[Product2[K, V]]): Unit = { // 排序器 sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } // 將 Map 任務的輸出記錄插入到緩存中 sorter.insertAll(records) // 數據 shuffle 數據文件 val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) try { // 將 map 端緩存的數據寫入到磁盤中, 并生成 Block 文件對應的索引文件. val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) // 記錄各個分區數據的長度 val partitionLengths = sorter.writePartitionedFile(blockId, tmp) // 生成 Block 文件對應的索引文件. 此索引文件用于記錄各個分區在 Block文件中的偏移量, 以便于 // Reduce 任務拉取時使用 shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      3 bypassSortShuffle

      bypass運行機制的觸發條件如下(必須同時滿足):

      shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值,默認為200。

      不是聚合類的shuffle算子(沒有預聚合)(比如groupByKey)。

      此時 task 會為每個 reduce 端的 task 都創建一個臨時磁盤文件,并將數據按 key 進行 hash 然后根據key 的 hash 值,將 key 寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。

      該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的性能會更好。 而該機制與普通SortShuffleManager運行機制的不同在于:不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

      4. bypass SortShuffle 源碼解析

      有時候, map 端不需要在持久化數據之前進行排序等操作, 那么 ShuffleWriter的實現類之一BypassMergeSortShuffleWriter 就可以派上用場了.

      觸發 BypassMergeSort

      private[spark] object SortShuffleWriter { def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { // We cannot bypass sorting if we need to do map-side aggregation. // 如果 map 端有聚合, 則不能繞過排序 if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") false } else { val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) // 分區數不能超過200 默認值 dep.partitioner.numPartitions <= bypassMergeThreshold } } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      本次的分享就到這里了,

      好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。

      如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。

      碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!

      spark 任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:還在焦頭爛額裸寫Scrapy?這個神器讓你90秒內配好一個爬蟲
      下一篇:紫光國微DRAM設計業務具備世界主流設計水平,但產品銷量還是不大
      相關文章
      在线亚洲午夜片AV大片| 国产国拍亚洲精品福利| 国产精品亚洲аv无码播放| 亚洲一区二区视频在线观看| 亚洲国产精品无码久久久久久曰 | 亚洲精品无码AV人在线播放 | 国产亚洲精aa成人网站| 日本系列1页亚洲系列| 狼人大香伊蕉国产WWW亚洲| 亚洲日韩一区二区一无码| 亚洲色大成网站www永久网站| 亚洲精品无码少妇30P| 亚洲精品乱码久久久久蜜桃| 亚洲精品乱码久久久久蜜桃| 亚洲成a人无码亚洲成av无码| 亚洲av成人中文无码专区| 国产精品自拍亚洲| 亚洲国产精品成人一区| 亚洲一区无码精品色| 浮力影院亚洲国产第一页| 亚洲精品少妇30p| 夜夜亚洲天天久久| 亚洲熟妇色自偷自拍另类| 亚洲伊人久久大香线蕉| 亚洲乱码在线视频| 亚洲欧洲无码一区二区三区| 老司机亚洲精品影院在线观看| 亚洲乱码中文字幕手机在线| 中文字幕中韩乱码亚洲大片| 亚洲夜夜欢A∨一区二区三区| 亚洲毛片免费视频| 亚洲国产日韩在线人成下载| 亚洲国产乱码最新视频 | 亚洲国产精品SSS在线观看AV| 亚洲视频在线观看| 亚洲国产超清无码专区| 亚洲熟妇少妇任你躁在线观看| 国产精品亚洲专区无码WEB| 亚洲一区视频在线播放| 亚洲精选在线观看| 亚洲三级视频在线观看 |