千個Hive UDF遷移到Spark--Facebook實踐經驗

      網友投稿 1325 2025-04-01

      多年來,Facebook已將Hive用作主要的查詢引擎,當Facebook將作業從Hive遷移到Spark SQL時,遇到了各種各樣的挑戰和困難,其中Hive UDF的遷移就是遇到了很多明顯的問題。本文將著重介紹Facebook在將上千個Hive UDF遷移到Spark SQL時,遇到的兼容性、功能、性能方面的幾個問題,以及相應的解決辦法。

      一、什么是Hive UDF

      Hive 支持的函數分為內置函數,和用戶自定義函數(UDF)。當內置函數不足以滿足需求時,可以通過自定義函數邏輯注冊成UDF在SQL中使用即可。

      UDF分為三類:

      1.?普通UDF

      輸入一行數據中的一列或者多列,輸出單個值。

      例如:

      SELECT?is_substring(col1,?col2)?AS?substring?FROM?normal_udf

      輸入

      輸出

      上述例子中,判斷col2列是否是col1的子子字符串,每一行對應一個輸出結果。

      2. 自定義聚合函數

      輸入表中的一行或多行數據,輸出單個結果。例如:max、min屬于內置聚合函數。

      例如:

      SELECT?CONTACT_SET(id)?AS?all_ids?FROM?dim_three_row

      輸入

      輸出

      上述例子中,COLLECT_SET方法即將id列多個值進行聚合, 用逗號分隔后輸出一個值。

      3.自定義表生成函數

      輸入表中一行數據,輸出多行多列數據,即類似于生成一個表

      例如:

      SELECT?a.id,?b.col1,?b.col2?FROM?a?LATERAL?VIEW?split_str(a.name)?b?AS?name,?age

      輸入

      輸出

      上述例子,split_str方法對name列進行切分,并結合literal view一起使用,將UDTF的執行結果轉儲為view b,再與a表進行合并輸出最后結果。

      二、Spark中如何執行HiveUDF

      由于Hive中部分數據類型Spark并不支持。 在Spark中通過封裝類,創建Hive的GenericUDF,SimpleGenericUDAF 和GenericUDTF這三種基本類型,例如下圖第6行代碼。然后調用Hive中的接口執行相應的方法體,再輸出最終結果時,再進行解封裝將結果轉換為Spark中的數據類型(第10行)。

      三、Hive UDF遷移到Spark SQL的困難

      Facebook計算引擎主要時Hive,其中包含有一千多的UDF,UDF的執行時間占整個業務CPU時間的70%。所以UDF的遷移是整個遷移工作中很重要的一部分。經過最初的幾線測試發現,其中只有58%的UDF能夠被很好的兼容和支持,對于失敗的測試用例所遇到的問題可以分為如下幾類:

      1.? Spark不支持部分Hive接口

      眾所周知,目前Spark不支持Hive的部分接口。如下:

      以上這些方法都存在兼容性問題,雖然通過業務調整可以避免一些不兼容的接口,但是getRequiredJars,getRequiredFiles這兩個方法的使用特別的廣泛,它們是用于自動加載需要的文件和jar包的,比如在executor中讀取文件時,則需要保證該文件已經在executor的工作目錄中,是否則就會出錯。需要解決該兼容性問題。

      解決方法:

      在Spark driver端進行UDF初始化時,識別出需要用到的文件和jar包,通過SparkContext.addJar 和 addFile方法注冊這些資源并分布到各個executor節點。

      在executor中進行二次確認,先查看需要用到的資源是否已經在工作目錄中,若已經包含則不需要處理;否則,嘗試創建一個軟鏈接到該文件的絕對路徑中。這里沒有直接復制文件到executor節點,是為了防止資源文件或jar包過大,帶來太多額外的工作負擔。

      2.?線程安全

      上千個Hive UDF遷移到Spark--Facebook實踐經驗

      Hive中每個任務都在單獨的JVM進程中運行,因此絕大多數Hive UDF的編寫都沒有考慮并發性問題,但是在Spark中每個executor運行在一個單獨的JVM進程中,一個executor中可以同時運行多個任務。因此Hive UDF直接運行在Spark中可能會存在線程不安全的問題。

      如下例子中,定義了一個static的全局變量mapping,當在Spark中運行時,可能會有多個實例同時執行該UDF,當兩個實例都運行到17~19行時,實例1判斷mapping為空,執行初始化,此時實例2也判斷發現mapping為空,也進行初始化。實例1執行略快,當實例1初始化完成向mapping中寫數據后,實例2初始化完成,覆蓋了實例1中的數據,從而導致數據丟失。

      解決方法1:

      通過Synchronize方法加鎖,對mapping的初始化進行加鎖,從而避免多個進程同時初始化。

      這種方法有兩個弊端:1、頻繁的加鎖解鎖導致性能下降,因為每一行數據進來都需要進行加鎖操作,降低執行效率;2、代碼改動量大,上千個UDF都進行這樣的修改,代碼工作量大。

      解決方法2:

      將mapping對象從static改為普通對象,每個實例使用自己內部的mapping對象。

      缺點:實例之間數據和狀態無法共享,消耗更多的內存,因為每個實例都需要保存一份該數據;

      優點:代碼改動小,易于操作。

      3.?序列化和反序列化

      在Spark中,UDF的對象在driver端進行初始化、序列化,然后分發給executor節點,在executor中再進行反序列化讀取對象。這樣的序列化和反序列化中有一些問題,一些類型可以用Kryo進行序列化但是卻不能正確的反序列化,例如:guava的ImmutableSet類型。

      解決方法:

      1、? 對于公共的常用類型,可以自定義序列化方法,或者引入已有的序列化和反序列化方法;

      2、? 對于無法正確序列化和反序列化的對象,加上transient修飾詞。即:該對象再driver端只做初始化,不進行序列化,在executor中需要使用該對象是,重新進行初始化。

      4.? 性能問題

      由于Spark和Hive都不支持對方的一些數據類型,在Spark中執行Hive UDF時,先將Spark的數據類型轉換為Hive的inspectors或Java類型,在Hive中進行計算,執行完成后再將結果從Hive類型轉換為Spark類型,如下圖紅框中的部分代碼。

      這種封裝和解封的過程,比Spark原生的UDF需要多花2倍的CPU,對于復雜的數據類型,如:map, array, structure等則需要花費更多的時間。Facebook的UDF在Spark計算中占用15%的CPU時間,因此,對于耗時最長的那些Hive UDF,可以將其轉換為Spark原生的UDF。

      四、Partial Aggregate

      Hive中max函數即為聚合函數,其執行流程如下圖, Mapper端將所有數據進行shuffle,在Reducer端將相同key的數據讀取到同一個reducer中進行max的計算。

      SELECT?id,?max(value)?FROM?t1?GROUP?BY?id

      這種計算方法有兩個問題:(1)每一條數據都需要通過網絡shuffle傳播,帶來的網絡壓力很大;(2)當存在數據傾斜時,個別的reducer處理時長將明顯比其他reducer長,出現大部分reducer空閑,等待某一個reducer執行的情況,從而導致整個作業的執行時間很長。

      同樣的sql和UDF,Partial Aggregation計算過程如下,每一個mapper先將自己本地的數據執行一遍部分聚合,再將聚合后的結果進行shuffle,reducer再對所有的部分聚合結果進行全局的聚合。

      優點:不必要所有的數據都通過網絡進行shuffle傳輸,減輕網絡壓力;部分計算壓力移到mapper端,減輕reducer壓力,并且在一定程度上可以減輕數據傾斜帶來的弊端。

      Spark已經支持了Partial Aggregation,通過一定的適配Hive UDF使其可以支持Partial Aggregation后,整體的工作性能提升,CPU提升20%,shuffle的數據量減少17%。但是卻存在部分UDF性能變差,最差的變慢了300%。

      經過分析發現有兩種情況會導致性能變慢:

      (1) 查詢規模:列擴展的情況下,可能會導致shuffle的數據量變多。如下例子,查詢的列包含了min, max,count,avg,經過partial aggregation mapper端輸出列從原來的一列,變為5列,但是mapper端合并的數據量卻很少,從而導致整體的shuffle數據量增多。

      (2) 數據分布:在一些分布情況下,mapper端數據不能進行合并或者合并的極少,導致數據量沒有變少,但是卻增加了額外的CPU時間。

      解決方法:

      我們可以看出,影響Partial aggregation性能的有這三個因素: UDAF的性能、列擴張、行縮減。Facebook提出一種基于代價的Optimizer,利用輸入列數、輸出的列數、UDAF的計算代價等等屬性,計算出計算代價,從而決定是否使用Partial aggregation。

      經過該方法,整體的性能有所提升,但是并不能解決所有的UDAF的執行性能變慢的問題,因為“行縮減”這個因素并沒有一個好的衡量標準。因為,數據集每天的分布不一定相同、即使是相同的數據集在不同聚合函數下分布情況是不一樣的,所以“行縮減”的程度較難量化。

      五、未來工作

      根據以上分析和介紹,其中性能的提升還有待進一步優化。將考慮一種基于歷史作業的開關,決定是否使用Partial aggregation。基于歷史作業的eventLog,可以得到很多的統計信息,包括各個節點的執行時間、輸出的數據行數等。基于這些信息可以做出更好的預測是否使用Partial aggregation。

      近年來,越來越多的論文中提到,利用歷史作業的執行信息進行作業優化,比如:提取高價值SubExpression等,進行物化提升具有相同子結構作業的執行時間,相信未來eventLog的利用將越來越多樣,越成熟。

      引用

      https://databricks.com/session/supporting-over-a-thousand-custom-hive-user-defined-functions

      軟件開發 云計算

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:為什么你需要OA系統,解放你的工作效率
      下一篇:家具定制管理系統
      相關文章
      亚洲综合另类小说色区| 亚洲av色福利天堂| 噜噜综合亚洲AV中文无码| 亚洲AV无码成H人在线观看| 亚洲成a人片在线观看无码专区| 亚洲日本乱码一区二区在线二产线 | 久久久久亚洲AV无码麻豆| 久久久久久亚洲精品无码| 亚洲乱亚洲乱妇24p| 亚洲综合一区二区精品导航| 亚洲av日韩av无码| 亚洲国产人成在线观看69网站 | 精品久久久久久久久亚洲偷窥女厕| 亚洲天堂男人影院| 久久亚洲AV无码精品色午夜麻| 久久九九亚洲精品| 亚洲av日韩av不卡在线观看 | 国产成人不卡亚洲精品91| 亚洲剧情在线观看| 亚洲av无码精品网站| 亚洲an天堂an在线观看| 久久久无码精品亚洲日韩按摩| 亚洲美女在线观看播放| 亚洲大成色www永久网站| 亚洲欧洲日产国产综合网| 激情内射亚洲一区二区三区| 亚洲熟女少妇一区二区| 亚洲国产精品嫩草影院在线观看 | 色婷婷亚洲十月十月色天| 亚洲国产精品久久网午夜 | JLZZJLZZ亚洲乱熟无码| 国产精品国产亚洲区艳妇糸列短篇| 亚洲AV成人精品日韩一区| 亚洲大成色www永久网址| 亚洲国产欧美国产综合一区| 亚洲成人午夜电影| 亚洲综合小说另类图片动图| 亚洲高清不卡视频| 国产亚洲精品影视在线| 欧洲亚洲综合一区二区三区| 久久久久亚洲av成人无码电影|