Hive RuntimeFilter
1????? 介紹
select *
from store join store_sales on (store.id ? = store_sales.store_id)
where store.s_store_name = 'My Store'
在這個語句中,事實表store_sales與維度表store通過store_id進行jion,維度表store通過s_store_name進行過濾。Store表經過過濾后得到的記錄條數可能非常少,store_id的值也非常少。所以可以將store_id的最大最小值,及bloomFilter作為store_sales表的過濾條件,過濾條件甚至可以下推至存儲層,減少數據的讀取IO。
Hive Runtime Filter的優化是基于動態分區剪裁優化,推薦先閱讀《Hive 動態分區剪裁原理》之后再閱讀本文。
2????? Runtime filter使用
針對Hive3.1.0版本,有如下的參數與Runtime Filter有關。需要注意的是,只有開啟動態分區剪裁才能開啟runtime filter。
參數名
默認值
描述
hive.tez.dynamic.partition.pruning
true
動態分區剪裁
hive.tez.dynamic.semijoin.reduction
true
是否開啟runtime ? filter
hive.tez.min.bloom.filter.entries
1000000L
Bloom過濾器最小條數
hive.tez.max.bloom.filter.entries
100000000L
Bloom過濾器最大條數
hive.tez.bloom.filter.factor
1.0
hive.tez.bigtable.minsize.semijoin.reduction
100000000L
大表的最小值
hive.tez.dynamic.semijoin.reduction.threshold
0.5
計算的cost大于0.5才會執行runtime filter
3????? 原理
3.1????? 物理優化
Runtime filter的邏輯優化與動態分區剪裁的邏輯優化相同,都是先合成動態filter。這里不再贅述。Runtime filter的物理優化發生在DynamicPartitionPruningOptimization類中,當動態過濾條件的列是分區列時,執行動態分區剪裁優化,如果只是普通的列,就會執行runtime filter優化。
如上圖所示,當判斷列為非分區字段時,執行runtime filter優化。針對每一個dynamic value filter,hive執行以下操作:
1.???????? 取出當前filter的predicate表達式中的所有列,遍歷所有列,如果是非分區列執行以下runtimefilter優化,否則執行動態分區剪裁優化。
2.???????? 給動態值表達式指向的RS的父節點增加runtime filter分支。分支的內容包括:SelectOperator、GroupByOperator、ReduceSinkOperator。其中GroupByOperator會通過聚合函數計算列的最大值、最小值、和bloom filter。Reduce Operator會將最大值、最小值和bloom filter序列化發送給TableScanOperator。圖中用虛線表示runtime filter分支與TableScanOperator的關系,實際上在Operator樹中,這兩者并沒有父子關系。在tez生成task時才會將兩者連接起來。
3.???????? 替換TableScanOperator的predicate為min,max和bloomfilter的表達式。min,max和bloomfilter的值在編譯階段都是不可知的,hive使用ExprNodeDynamicValueDesc對象表示動態的值,在運行時可通過該對象得到真實的值。
4.???????? FilterOperator的predicate設置為true,后續優化將會把predicate的filter移除。
3.2????? 運行時
Runtime filter在運行時包含兩個部分:
1.???????? Runtime filter分支會生成一個reducer,該reducer運行GroupByOperator,通過定義的min、max、bloomfilter三個聚合函數生成三個值。隨后的ReducerSinkOperator會將這三個值序列化。
2.???????? Mapper在運行TableScanOperator之前會執行初始化操作,初始化操作中會等待runtime filter的輸入,并且將runtime filter的輸入反序列化為min,max和bloomfilter對象。在構建reader時,會將min,max下推到存儲層。而bloom filter作為TableScanOperator的過濾條件。
這里有兩個問題需要解決:
l? Mapper如何知道其需要等待哪個runtime filter的輸入?
在tez生成task階段會在Mapper task中注入runtime filter的信息,mapper初始化時拿到這個信息就可以知道需要等待的runtime filter了。
l? TableScanOperator的predicate使用了ExprNodeDynamicValueDesc對象來表示動態的值,那么在運行時如何知道真實的值?
每一個runtime filter分支都由唯一key標識,runtime filter的三個輸出由key_min, key_max, key_bloomfilter標識。Mapper在初始化時,會將獲取的runtime filter三個值存儲到cache中,而ExprNodeDynamicValueDesc對象中也保存了一個key,在計算ExprNodeDynamicValueDesc對象的值時,起始是通過key從cache中讀取相應的值。
4????? 缺陷
目前hive runtime filter和dynamic partition pruning都只是兩張表直接通過join生成的,如果多張表通過多個jion間接關聯,是否也可以生成runtime filter和動態分區剪裁呢?這可能是hive優化的一個方向。
5????? 參考文檔
1.???? https://issues.apache.org/jira/browse/HIVE-15269
大數據 EI企業智能
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。