「Spark從精通到重新入門(一)」Spark 中不可不知的動態優化
簡介:?Apache Spark 自 2010 年面世,到現在已經發展為大數據批計算的首選引擎。而在 2020 年 6 月份發布的Spark 3.0 版本也是 Spark 有史以來最大的 Release,其中將近一半的 issue 都屬于 SparkSQL。這也迎合我們現在的主要場景(90% 是 SQL),同時也是優化痛點和主要功能點。我們 Erda 的 FDP 平臺(Fast Data Platform)也從 Spark 2.4 升級到 Spark 3.0 并做了一系列的相關優化,本文將主要結合 Spark 3.0 版本進行探討研究。

前言
Apache Spark 自 2010 年面世,到現在已經發展為大數據批計算的首選引擎。而在 2020 年 6 月份發布的Spark 3.0 版本也是 Spark 有史以來最大的 Release,其中將近一半的 issue 都屬于 SparkSQL。這也迎合我們現在的主要場景(90% 是 SQL),同時也是優化痛點和主要功能點。我們 Erda 的 FDP 平臺(Fast Data Platform)也從 Spark 2.4 升級到 Spark 3.0 并做了一系列的相關優化,本文將主要結合 Spark 3.0 版本進行探討研究。
為什么 Spark 3.0 能夠“神功大成”,在速度和性能方面有質的突破?本文就為大家介紹 Spark 3.0 中 SQL Engine 的“天榜第一”——自適應查詢框架 AQE(Adaptive Query Execution)。
AQE,你是誰?
簡單來說,自適應查詢就是在運行時不斷優化執行邏輯。
Spark 3.0 版本之前,Spark 執行 SQL 是先確定 shuffle 分區數或者選擇 Join 策略后,再按規劃執行,過程中不夠靈活;現在,在執行完部分的查詢后,Spark 利用收集到結果的統計信息再對查詢規劃重新進行優化。這個優化的過程不是一次性的,而是隨著查詢會不斷進行優化, 讓整個查詢優化變得更加靈活和自適應。這一改動讓我們告別之前無休止的被動優化。
AQE,你會啥?
了解了 AQE 是什么之后,我們再看看自適應查詢 AQE 的“三板斧”:
動態合并 Shuffle 分區
動態調整 Join 策略
動態優化數據傾斜
動態合并 shuffle 分區
如果你之前使用過 Spark,也許某些“調優寶典”會告訴你調整 shuffle 的 partitions 數量,默認是 200。但是在不同 shuffle 中,數據的大小和分布基本都是不同的,那么簡單地用一個配置,讓所有的 shuffle 來遵循,顯然不是最優的。
分區過小會導致每個 partition 處理的數據較大,可能需要將數據溢寫到磁盤,從而減慢查詢速度;分區過大又會帶來 GC 壓力和低效 ?I/O 等問題。因此,動態合并 shuffle 分區是非常必要的。AQE 可以在運行期間動態調整分區數來達到性能最優。
如下圖所示,如果沒有 AQE,shuffle 分區數為 5,對應執行的 Task 數為 5,但是其中有三個的數據量很少,任務分配不平衡,浪費了資源,降低了處理效率。
而 AQE 會合并三個小分區,最終只執行三個 Task,這樣就不會出現之前 Task 空轉的資源浪費情況。
動態調整 join 策略
SparkJoin 策略大致可以分三種,分別是 Broadcast Hash Join、Shuffle Hash Join 和 SortMerge Join。其中 Broadcast 通常是性能最好的,Spark 會在執行前選擇合適的 Join 策略。
例如下面兩個表的大小分別為 100 MB 和 30 MB,小表超過 10 MB (spark.sql.autoBroadcastJoinThreshold = 10 MB),所以在 Spark 2.4 中,執行前就選擇了 SortMerge Join 的策略,但是這個方案并沒有考慮 Table2 經過條件過濾之后的大小實際只有 8 MB。
AQE 可以基于運行期間的統計信息,將 SortMerge Join 轉換為 Broadcast Hash Join。
在上圖中,Table2 經過條件過濾后真正參與 Join 的數據只有 8 MB,因此 Broadcast Hash Join 策略更優,Spark 3.0 會及時選擇適合的 Join 策略來提高查詢性能。
動態優化數據傾斜
數據傾斜一直是我們數據處理中的常見問題。當將相同 key 的數據拉取到一個 Task 中處理時,如果某個 key 對應的數據量特別大的話,就會發生數據傾斜,如下圖一樣產生長尾任務導致整個 Stage 耗時增加甚至 OOM。之前的解決方法比如重寫 query 或者增加 key 消除數據分布不均,都非常浪費時間且后期難以維護。
AQE 可以檢查分區數據是否傾斜,如果分區數據過大,就將其分隔成更小的分區,通過分而治之來提升總體性能。
沒有 AQE 傾斜優化時,當某個 shuffle 分區的數據量明顯高于其他分區,會產生長尾 Task,因為整個 Stage 的結束時間是按它的最后一個 Task 完成時間計算,下一個 Stage 只能等待,這會明顯降低查詢性能。
開啟 AQE 后,會將 A0 分成三個子分區,并將對應的 B0 復制三份,優化后將有 6 個 Task 運行 Join,且每個 Task 耗時差不多,從而獲得總體更好的性能。通過對傾斜數據的自適應重分區,解決了傾斜分區導致的整個任務的性能瓶頸,提高了查詢處理效率。
自適應查詢 AQE 憑借著自己的“三板斧”,在 1TB TPC-DS 基準中,可以將 q77 的查詢速度提高 8 倍,q5 的查詢速度提高 2 倍,且對另外 26 個查詢的速度提高 1.1 倍以上,這是普通優化無法想象的傲人戰績!
真的嗎?我不信
口說無憑,自適應查詢 AQE 的優越性到底是如何實現,我們“碼”上看看。
AQE 參數說明
#AQE開關 spark.sql.adaptive.enabled=true #默認false,為true時開啟自適應查詢,在運行過程中基于統計信息重新優化查詢計劃 spark.sql.adaptive.forceApply=true #默認false,自適應查詢在沒有shuffle或子查詢時將不適用,設置為true將始終使用 spark.sql.adaptive.advisoryPartitionSizeInBytes=64M #默認64MB,開啟自適應執行后每個分區的大小。合并小分區和分割傾斜分區都會用到這個參數 #開啟合并shuffle分區 spark.sql.adaptive.coalescePartitions.enabled=true #當spark.sql.adaptive.enabled也開啟時,合并相鄰的shuffle分區,避免產生過多小task spark.sql.adaptive.coalescePartitions.initialPartitionNum=200 #合并之前shuffle分區數的初始值,默認值是spark.sql.shuffle.partitions,可設置高一些 spark.sql.adaptive.coalescePartitions.minPartitionNum=20 #合并后的最小shuffle分區數。默認值是Spark集群的默認并行性 spark.sql.adaptive.maxNumPostShufflePartitions=500 #reduce分區最大值,默認500,可根據資源調整 #開啟動態調整Join策略 spark.sql.adaptive.join.enabled=true #與spark.sql.adaptive.enabled都開啟的話,開啟AQE動態調整Join策略 #開啟優化數據傾斜 spark.sql.adaptive.skewJoin.enabled=true #與spark.sql.adaptive.enabled都開啟的話,開啟AQE動態處理Join時數據傾斜 spark.sql.adaptive.skewedPartitionMaxSplits=5 #處理一個傾斜Partition的task個數上限,默認值為5; spark.sql.adaptive.skewedPartitionRowCountThreshold=1000000 #傾斜Partition的行數下限,即行數低于該值的Partition不會被當作傾斜,默認值一千萬 spark.sql.adaptive.skewedPartitionSizeThreshold=64M #傾斜Partition的大小下限,即大小小于該值的Partition不會被當做傾斜,默認值64M spark.sql.adaptive.skewedPartitionFactor=5 #傾斜因子,默認為5。判斷是否為傾斜的 Partition。如果一個分區(DataSize>64M*5) || (DataNum>(1000w*5)),則視為傾斜分區。 spark.shuffle.statistics.verbose=true #默認false,打開后MapStatus會采集每個partition條數信息,用于傾斜處理
AQE 功能演示
Spark 3.0 默認未開啟 AQE 特性,樣例 sql 執行耗時 41 s。
存在 Task 空轉情況,shuffle 分區數始終為默認的 200。
開啟 AQE 相關配置項,再次執行樣例 sql。
樣例 sql 執行耗時 18 s,快了一倍以上。
并且每個 Stage 的分區數動態調整,而不是固定的 200。無 task 空轉情況,在 DAG 圖中也能觀察到特性開啟。
總結
Spark 3.0 在速度和性能方面得提升有目共睹,它的新特性遠不止自適應查詢一個,當然也不意味著所有的場景都能有明顯的性能提升,還需要我們結合業務和數據進行探索和使用。
注:文中部分圖片源自于網絡,侵刪。
更多技術干貨請關注【爾達 Erda】公眾號,與眾多開源愛好者共同成長~
spark SQL
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。