Impala - Runtime Filter的原理及實現
上一篇介紹了impala中Bloom Filter的實現原理,及其應用(即其在RuntimeFilter中的應用),有興趣的童鞋可以點這回顧下。那么RuntimeFilter在impala當中又是如何應用的呢?它在何時產生,又在何時應用以過濾數據,本文接著來學習介紹。
什么是Runtime Filter
一條SQL往往包括如下幾個部分:select/join/where/group by/order by,這幾個常用的算子分別對應著SQL執行計劃中的project、join、filter、aggregation和sort。Runtime Filter主要關注join和filter,先看這個例子:
select?A.name,?B.class?from?PEOPLE?A?join?CLASS?B?on?A.classid?=?B.id?where?A.age?>?10?and?B.grade?=?4;
A表中保存著100000條學生記錄,通過'age > 10'可以過濾掉其中的40000條;B表中保存1000個班級信息,通過'grade = 4' 可以過濾掉900條。join之后產生30000條記錄。從這個SQL的表名和字段名中不難分析出我們想得到什么結果。最簡單的執行引擎會首先將PEOPLE表和CLASS表從存儲引擎中讀出來,然后將它們按照classid和id進行比較然后join,之后根據join的結果中age和grade列進行過濾,最后讀取name列和classname列返回給用戶。 這種執行計劃最簡單直接,完全按照SQL語義順序執行,顯而易見性能是最差的。
聰明點的執行引擎會把filter和project下推到數據掃描節點(Impala中此處實現HdfsScanNode),首先從HDFS讀取數據,如果存儲引擎有提供傳入謂詞的API(例如Kudu)那就再好不過了,如果沒有則需要根據度上來的數據進行過濾,選擇需要的列交給joinNode。上面例子中要從PEOPLE表選擇classid、name和age列,從CLASS表中選擇id、classname和grade列,join節點(Impala中主要實現PartitionedHashJoinNode)根據classid和id進行inner join,然后選擇name和classname列輸出。這類映射和謂詞下推是SQL優化的基本手段了,效果如下:
那么Impala是采用了這種實現嗎?并不是,Impala使用Runtime Filter技術在上述基礎上進一步優化,它并不是使用SQL中寫明的條件,而是在運行時生成的、動態的過濾條件。在IMpala中使用Runtime Filter的一個前提是:通常假設join的兩個表一個是大表而另一個是小表,例如通常進行join的是一張事實表和一張維表。顯而易見對小表的掃描(HdfsScanNode)速度要遠遠快于大表,這樣的話就可以先對小表執行掃描操作,將輸出的記錄交由JoinNode,而大表則會主動等待一段時間(默認等待1000ms),JoinNode會根據小表輸出的記錄計算出一個過濾條件(PartitionedHashJoinNode,借助PhjBuilder來完成),這個條件就是本文的主角-Runtime Filter。接著JoinNode會將這個RF發送給執行大表掃描的HdfsScanNode,后者基于這個RF進行再次過濾,將過濾的記錄輸出給JoinNode,效果如下:
Runtime Filter在Impala中的作用
本質上它還是表掃描過程的一個Filter,與謂詞下推不同,后者在實現讀取更少的數據的同時意味著更少的記錄輸出,而RF仍然需要從存儲引擎中先讀取數據再進行過濾,無法將Filter應用到存儲引擎,因此它只能減少輸出的記錄數。
Runtime Filter在Impala中的實現
上面已經說明了什么是Runtime Filter,那么它具體是什么樣的一種Filter呢,沒錯就是上一篇提到的Bloom Filter(點這回顧)。JoinNode會依賴小表掃描后輸出的記錄生成一個BF,BF中包含了小表中等值on條件中所有的id列值,大表進行scan時依據此BF進行數據過濾。
Impala中的RF主要有兩類:Local和Global,這主要基于不同join算法實現。常用的join算法有很多,Impala只實現了其中的兩種:hash join(Impala中PartitionedHashJoinNode)和?nested loop join(Impala中NestedLoopJoinNode),后者針對一些比較特殊的場景,例如復雜數據結構的查詢和非等值join,在此不做更多討論;而hash join又分為shuffle join和broadcast join兩種,前者主要針對大表之間的join,后者針對大表和小表之間的join。不論采用哪種join,最終都是基于BF實現,join節點首先根據小表(通常是右表)的輸入構建BF中的hashtable,這個過程稱為build階段。然后對于大表的每一條記錄進行匹配過濾以生成一條或多條記錄,這個過程稱為probe階段。
shuffle join是通過將兩個輸入表分別進行shuffle,保證整個join被分割成多個完全不重疊的任務并行執行,join計算之后輸出到父節點匯總
broadcast join是通過將小表廣播,大表的數據掃描不需要網絡傳輸而直接輸出到join節點,join節點處理的是整個小表和部分大表的數據。
回到RF的種類,Local表示生成的RF不需要傳輸到遠端的HdfsScanNode就可以直接應用(即本地的HdfsScanNode)。典型的情況時broadcast join的時候,JoinNode和左表的ScanNode是在一個Fragment中實現的(在一個線程中),由于每一個節點上運行的join都會獲取到所有的右表數據,因此都能夠build出完整的基于右表數據的RF信息,然后直接將這個信息交給左表的ScanNode,不需要經過任何的網絡傳輸。Global是指全局的,例如在執行shuffle join的時候,每一個分區都只讀取部分數據交給JoinNode聚合,而每一個JoinNode都只處理全局數據的一部分,因此也只能生成部分RF,它需要將這個局部的RF交給Coordinator節點進行合并,然后再由Coordinator推送到每一個大表ScanNode上,完成RF的分發。
一個RF(更準確的說是RF中BF,因為RF信息在JoinNode和ScanNode生成的時候就已經確定,但其中的BF是空的,需要等待JoinNode基于右表數據生成后發送給ScanNode)從誕生到應用的流程如下:
執行順序:
1、同時下發兩個表的Scan操作,左邊是大表右邊是小表(相對而言),但是左表會等待一段時間,因此右表的Scan會先執行。
2、右表的掃描的結果根據join鍵哈希傳遞到不同的Join節點,由Join節點執行BF的構建和RF的構建。
3、Join節點完成RF的構建后將RF交給Coordinator節點(如果是Broadcast Join則會直接交給左表的Scan節點)。
4、Coordinator節點將不同的RF進行merge,也就是把BF進行merge,merge之后它將這個RF分發給每一個左表Scan。
5、左表會等待一段時間(默認1000ms)再開啟數據掃描以盡可能的等待RF的到達,但是無論RF是否到達,左邊在等待超時后都會開始掃描。遲到的RF會在到達那一刻之后被應用。(這里多說一句,Impala的實現是對整個RF列表進行遍歷,依次檢查是否到達
6、左表使用RF完成掃描之后將數據交給Join節點,以完成整個Join過程。
過程5的等待,是在ScanNode初始化后進行,更準確的是在上游ExecNode首次調用GetNext()獲取數據時進行,以確保當前ScanNode的創建及初始化能夠立即完成不受RF的影響。具體細節本文暫不介紹。
過程5的默認等待時間,可通過啟動參數runtime_filter_wait_time_ms修改為其他值,比如在左右表數據量都較大的情況下,需要更多的時間以完成RF的生成,可以設置為2000或更大。也可以在impala-shell中通過set命令設置,這僅對當前會話生效。更多的配置參數這里暫不多說,可以查閱官網。
Runtime Filter在Impala中的實現細節
更多實現細節此處不做更多說明,僅拋出幾個關鍵類,有興趣的同學可以自行閱讀源碼
RuntimeFilter
總結
目前RF被應用到Parquet、Orc、TEXT,Avro,RCFile和SequenceFile等多種文件格式,對查詢性能的提升是非常可觀。但其實RF并不總是有效的,在某種極端情況下比如join列的數據差集非常小,join兩邊的表并不能過濾掉很多數據,這時候進行大量的無謂的RF過濾反而會浪費資源(從BF的角度來說,每查必中)。好在Impala考慮到了這一點,會根據誤檢率這個指標更新RF為always true,從而減少更多的hash運算。如果生成RF的開銷超過了其帶來的性能提升,甚至可以完全關閉該功能,具體方法是設置RUNTIME_FILTER_MODE為off。
注:本文的圖片來源于網絡,因其通俗易懂因此借用。
————————————————
參考文檔
https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html
https://blog.csdn.net/yu616568/java/article/details/77073166
存儲 SQL 數據庫
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。