【云圖說】第235期 DDS讀寫兩步走 帶您領略只讀節點的風采
764
2025-04-09
算子調優一:mapPartitions
普通的 map 算子對 RDD 中的每一個元素進行操作,而 mapPartitions 算子對 RDD 中每一個分區進行操作。如果是普通的 map 算子,假設一個 partition 有 1 萬條數據, 那么 map 算子中的 function 要執行 1 萬次, 也就是對每個元素進行操作。
圖 2-3 map 算子
image.png
圖 2-4 mapPartitions 算子
image.png
mapPartitions 算子也存在一些缺點:對于普通的 map 操作,一次處理一條數據, 如果在處理了 2000 條數據后內存不足, 那么可以將已經處理完的 2000 條數據從內存中垃圾回收掉; 但是如果使用 mapPartitions 算子,但數據量非常大時, function 一次處理一個分區的數據,如果一旦內存不足,此時無法回收內存,就可能會 OOM, 即內存溢出。
因 此 , mapPartitions 算 子 適 用 于 數 據 量 不 是 特 別 大 的 時 候 , 此 時 使 用mapPartitions 算子對性能的提升效果還是不錯的。(當數據量很大的時候,一旦使用 mapPartitions 算子, 就會直接 OOM)
在項目中,應該首先估算一下 RDD 的數據量、每個 partition 的數據量,以及分配給每個 Executor 的內存資源,如果資源允許,可以考慮使用 mapPartitions 算子代替 map。
在生產環境中, 通常使用 foreachPartition 算子來完成數據庫的寫入,通過
foreachPartition 算子的特性, 可以優化寫數據庫的性能。
如果使用 foreach 算子完成數據庫的操作,由于 foreach 算子是遍歷 RDD 的每條數據,因此,每條數據都會建立一個數據庫連接,這是對資源的極大浪費,因此,對于寫數據庫操作,我們應當使用 foreachPartition 算子。
與 mapPartitions 算子非常相似,foreachPartition 是將 RDD 的每個分區作為遍歷對象,一次處理一個分區的數據,也就說,如果涉及數據庫的相關操作,一個分區的數據只需要創建一次數據庫連接,如圖 2-5 所示:
image.png
對于我們寫的 function 函數, 一次處理一整個分區的數據;
對于一個分區內的數據, 創建唯一的數據庫連接;
只需要向數據庫發送一次 SQL 語句和多組參數;
在 生 產 環 境 中 , 全 部 都 會 使 用 foreachPartition 算 子 完 成 數 據 庫 操 作 。
foreachPartition 算子存在一個問題,與 mapPartitions 算子類似,如果一個分區的數據量特別大,可能會造成 OOM, 即內存溢出。
在 Spark 任務中我們經常會使用 filter 算子完成 RDD 中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,但是一旦進過 filter 過濾后,每個分區的數據量有可能會存在較大差異, 如圖 2-6 所示:
image.png
針對第一個問題,既然分區的數據量變小了,我們希望可以對分區數據進行
重新分配,比如將原來 4 個分區的數據轉化到 2 個分區中, 這樣只需要用后面的兩
個 task 進行處理即可,避免了資源的浪費。
針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區數據重新分配,讓每個 partition 中的數據量差不多,這就避免了數據傾斜問題。
那么具體應該如何實現上面的解決思路? 我們需要 coalesce 算子。
repartition 與 coalesce 都可以用來進行重分區,其中 repartition 只是 coalesce 接口中 shuffle 為 true 的簡易實現,coalesce 默認情況下不進行 shuffle,但是可以通過參數進行設置。
假設我們希望將原本的分區個數 A 通過重新分區變為 B,那么有以下幾種情況:
A > B(多數分區合并為少數分區)
① A 與 B 相差值不大
此時使用 coalesce 即可,無需 shuffle 過程。
② A 與 B 相差值很大
此時可以使用 coalesce 并且不啟用 shuffle 過程,但是會導致合并過程性能低下, 所以推薦設置 coalesce 的第二個參數為 true,即啟動 shuffle 過程。
A < B(少數分區分解為多數分區)
此時使用 repartition 即可,如果使用 coalesce 需要將 shuffle 設置為 true,否則
coalesce 無效。
我們可以在 filter 操作之后,使用 coalesce 算子針對每個 partition 的數據量各不相同的情況,壓縮 partition 的數量,而且讓每個 partition 的數據量盡量均勻緊湊, 以便于后面的 task 進行計算操作, 在某種程度上能夠在一定程度上提升性能。
注意:local 模式是進程內模擬集群運行,已經對并行度和分區數量有了一定的內部優化,因此不用去設置并行度和分區數量。
在第一節的常規性能調優中我們講解了并行度的調節策略,但是,并行度的設置對于 Spark SQL 是不生效的, 用戶設置的并行度只對于 Spark SQL 以外的所有Spark 的stage 生效。
Spark SQL 的并行度不允許用戶自己指定,Spark SQL 自己會默認根據 hive 表對應的 HDFS 文件的 split 個數自動設置 Spark SQL 所在的那個 stage 的并行度, 用戶自己通 spark.default.parallelism 參數指定的并行度, 只會在沒 Spark SQL 的 stage 中生效。
由于 Spark SQL 所在 stage 的并行度無法手動設置, 如果數據量較大,并且此stage 中后續的 transformation 操作有著復雜的業務邏輯,而 Spark SQL 自動設置的task 數量很少, 這就意味著每個 task 要處理為數不少的數據量,然后還要執行非常復雜的處理邏輯,這就可能表現為第一個有 Spark SQL 的 stage 速度很慢,而后續的沒有 Spark SQL 的 stage 運行速度非常快。
為了解決 Spark SQL 無法設置并行度和 task 數量的問題, 我們可以使用
repartition 算子。
image.png
圖 2-7 repartition算子使用前后對比圖
Spark SQL 這一步的并行度和 task 數量肯定是沒有辦法去改變了,但是, 對于Spark SQL 查詢出來的 RDD, 立即使用 repartition 算子, 去重新進行分區, 這樣可以重新分區為多個 partition,從 repartition 之后的 RDD 操作,由于不再涉及SparkSQL,因此 stage 的并行度就會等于你手動設置的值,這樣就避免了 Spark SQL 所在的 stage 只能用少量的 task 去處理大量數據并執行復雜的算法邏輯。使用 repartition 算子的前后對比如圖 2-7 所示。
reduceByKey 相較于普通的 shuffle 操作一個顯著的特點就是會進行map 端的本地聚合,map 端會先對本地的數據進行 combine 操作,然后將數據寫入給下個 stage 的每個 task 創建的文件中,也就是在 map 端,對每一個 key 對應的 value,執行reduceByKey 算子函數。reduceByKey 算子的執行過程如圖 2-8 所示:
image.png
本地聚合后,在 map 端的數據量變少, 減少了磁盤 IO,也減少了對磁盤空間的占用;
本地聚合后,下一個 stage 拉取的數據量變少,減少了網絡傳輸的數據量;
本地聚合后,在 reduce 端進行數據緩存的內存占用減少;
本地聚合后,在 reduce 端進行聚合的數據量減少。
基于 reduceByKey 的本地聚合特征, 我們應該考慮使用 reduceByKey 代替其他的 shuffle 算子,例如 groupByKey。reduceByKey 與 groupByKey 的運行原理如圖 2-9 和圖 2-10 所示:
image.png
圖 2-9 groupByKey 原理
image.png
根據上圖可知, groupByKey 不會進行 map 端的聚合, 而是將所有 map 端的數據 shuffle 到 reduce 端, 然后在 reduce 端進行數據的聚合操作。由于 reduceByKey 有 map 端 聚 合 的 特 性 , 使 得 網 絡 傳 輸 的 數 據 量 減 小 , 因 此 效 率 要 明 顯 高 于groupByKey。
我們建議,如果能通過reduceByKey就用reduceByKey,因為reducebykey會在map端,先進行本地的combine,可以大大減少要傳輸到reduce端的數據量,減小網絡傳輸的開銷。
ReduceByKey這個操作首先會在HashShufferWriter的write()方法,先判斷一下,如果是 isMapCombined,那么就在本地聚合,聚合完以后在寫入磁盤。
而groupbykey的性能,相對來說很低,因為他是不會進行本地聚合的,而是原封不動,把shufferMapTask的輸出,拉取到ResultTask的內存中,所以這樣的話,就會導致,所有的數據,都要進行網絡傳輸,從而導致網絡傳輸的性能開銷特別大!
spark SQL
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。