Spark3.0主要特性(1)—— Adaptive Query Execution
眾所周知,目前Spark的基于代價的優(yōu)化策略,能夠給SQL執(zhí)行計劃帶來很大的優(yōu)化,比如:調整Join順序,決定Join類型(BroadcastHashJoin 或者 SortMergeJoin)等等。 但是該優(yōu)化策略有一個明顯的問題是:對于代價的估計是基于表的一些統(tǒng)計信息的,若這些統(tǒng)計信息不存在或者過期,則會對SQL的優(yōu)化帶來負面的影響。因此,本文介紹的Adaptive Query Execution就是針對這種問題,不依賴于統(tǒng)計信息進行優(yōu)化。
AQE的一個難點就是在何時進行再次優(yōu)化規(guī)則。Spark程序執(zhí)行時,一般都是并行或者是管道式的,但是了解過Spark內核的人都知道,Spark作業(yè)有一個DAG Stage的劃分,Stage之間會進行shuffle操作,所以每一個stage要等待其上一個stage作業(yè)全部完成才能開始,這就為AQE的執(zhí)行提供了一個時機,因此此時已經能夠知道前一個Stage的中間結果的大小、列數(shù)等統(tǒng)計信息,可以為我們的AQE執(zhí)行提供所需的統(tǒng)計信息。
首先,對于第一層葉子節(jié)點的Stage(即不依賴于其他任何Stage的那些Stage)不需要執(zhí)行AQE;
每一個Stage執(zhí)行完成后,就標記該stage的狀態(tài)為完成,同時收集統(tǒng)計信息,并更新對應的邏輯計劃;
根據(jù)收集到的這些統(tǒng)計信息,重新執(zhí)行指定的一些優(yōu)化規(guī)則,再轉為物理計劃;
然后,基于這個新的優(yōu)化后的Plan, 從之前已經完成的Stage向后繼續(xù)執(zhí)行,并重復上述步驟,直到整個sql執(zhí)行完成
AQE有如下3個特征:
1、動態(tài)合并Shuffle分區(qū)
Spark執(zhí)行查詢過程中會有很多Shuffle操作,即Stage之間的數(shù)據(jù)傳遞,需要通過網絡對數(shù)據(jù)進行傳遞并合并計算等操作。影響Shuffle的性能有很多因素,其中分區(qū)的個數(shù)就是一個很重要的因素。 分區(qū)的個數(shù)目前是用默認的配置項200來決定的,該值的選擇對Shuffle影響很大:
若分區(qū)個數(shù)太少,則每個分區(qū)需要處理的數(shù)據(jù)量很大,每個task處理一個分區(qū)的數(shù)據(jù),可能會需要將數(shù)據(jù)溢寫到磁盤,從而降低執(zhí)行效率;
若分區(qū)個數(shù)太大,則每個分區(qū)處理很少的數(shù)據(jù),但是task個數(shù)很多,導致很多小的網絡數(shù)據(jù)獲取和傳播,同樣會因為IO瓶頸帶來性能下降。
在AQE中,首先設置一個較大的分區(qū)個數(shù),然后隨著Stage任務的執(zhí)行,在運行時根據(jù)metrics統(tǒng)計信息將小的數(shù)據(jù)量的分區(qū)進行合并,從而自動調整分區(qū)個數(shù)。以?SELECT max(i) FROM tbl GROUP BY j??為例,
原表很小,在group執(zhí)行之前,只有兩個分區(qū);
初始分區(qū)個數(shù)設置為5,則本地group之后會將數(shù)據(jù)劃分為5個分區(qū);
若沒有AQE,則shuffle之后將分為5個task分別執(zhí)行,其中有3個task的數(shù)據(jù)量很小,提交這樣的task執(zhí)行會浪費一定的資源;
但是開啟AQE之后,會自動將小分區(qū)合并,如下圖,合并之后剩余3個分區(qū),且每個分區(qū)的數(shù)據(jù)量相近。
2、動態(tài)切換join策略
Spark中用的最多的Join方式為BroadcastHashJoin 和 SortMergeJoin,所有的Join類型中BroadcastHashJoin性能最好,因為避免了數(shù)據(jù)的shuffle。 所以Spark目前通過估計join兩端表的大小與廣播閾值的關系,來判斷是否可以使用BroadcastHashJoin。 但是該值的估計常常是不準確的,比如:有一個過濾效率很高的filter,可能使得過濾后的數(shù)據(jù)可以廣播,但是估計值卻偏大; 或者是Join的一端是一個很復雜的操作時,估計的值就更加不準確,常常估計出一個很大的值導致使用SortMergeJoin,而實際執(zhí)行后會發(fā)現(xiàn)該復雜的查詢后的結果集很小且適合廣播。
AQE在執(zhí)行過程中,重新進行優(yōu)化,可以利用前一個Stage執(zhí)行結果的大小,直接的知道是否適合廣播。如下例子:
在該例子中,兩個表原始Join時,根據(jù)CBO估計的大小是SortMergeJoin,但是當stage2執(zhí)行完成后,調用了AQE重新執(zhí)行優(yōu)化規(guī)則發(fā)現(xiàn),實際結果小于廣播閾值(默認10M),因此可以使用BroadcastHashJoin, 則會修改Join類型,從而節(jié)約Join的時間。
這里要注意,前兩個Stage中shuffle寫的操作此時已經完成,這部分的時間無法避免;能優(yōu)化的是Join的Stage中Shuffle讀以及Join的執(zhí)行時間。
3、動態(tài)優(yōu)化join數(shù)據(jù)傾斜
當每個分區(qū)的數(shù)據(jù)分布不均勻時,容易出現(xiàn)數(shù)據(jù)傾斜的問題,有些場景下尤其是Join時,若出現(xiàn)數(shù)據(jù)傾斜,可能會導致個別的task任務特別繁重,其他所有的task都執(zhí)行完畢,executor處于空閑狀態(tài),等待這幾個數(shù)據(jù)傾斜的task執(zhí)行完成。AQE能夠自動檢測具有數(shù)據(jù)傾斜的分區(qū),并將這些數(shù)據(jù)量很大的分區(qū)進行切分。 如下例: A和B表做Join,其中A0分區(qū)特別大,在沒有AQE時如下圖:
開啟AQE之后,將A0分區(qū)劃分為兩個差不多大的分區(qū)(A0-0和A0-1),這兩個分區(qū)同時與B0分區(qū)進行join,因為A0本來就是需要與B0做Join的。劃分后,使用了5個數(shù)據(jù)量相近的task同時執(zhí)行該任務,可以獲得更好的性能。
參考:https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
數(shù)據(jù)湖探索 DLI
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。