Spark避坑指南----UnsafeRow對象的持久化
Spark推出Tungsten計劃用于提升Spark的性能與資源使用,其中為了消除JVM對象模型和GC代價,提供了UnsafeRow對象類型。它由jvm提供的sun.misc.Unsafe實現(xiàn),內部存儲的是二進制,繼承自InternalRow,是SparkSQL中的中間算子的處理和輸出數(shù)據(jù)類型。

正是由于UnsafeRow的特殊性,我們發(fā)現(xiàn)在某些情況下可能會無法正確序列/持久化該類型,產生數(shù)據(jù)讀取不一致的情況,下面我們通過幾個例子說明:
例子1:默認的RDD.saveAsObjectFile無法正確處理UnsafeRow類型
首先我們準備一些用于測試的數(shù)據(jù)表:
我們希望獲取到查詢`select * from emp, dept where emp.emp_dept_id = dept.dept_id`的結果,該查詢對兩張表進行了Join操作,后續(xù)我們希望將其RDD的數(shù)據(jù)保存,再重新讀取保存的數(shù)據(jù)。
為了得到RDD的結果,我們構造一個LogicalRDD的Plan,該Plan會直接Scan RDD的數(shù)據(jù)。再通過Dataset.ofrows來得到Plan的結果
得到的正確結果如下:
同樣的,我們調用rdd.saveAsObjectFile將RDD[InternalRow]持久化,再利用spark.sparkContext.objectFile讀取。
得到的結果發(fā)生了問題,它在總行數(shù)不變的情況下,數(shù)據(jù)被多次復制了,數(shù)據(jù)讀取不一致:
我們查看saveAsObjectFile方法,發(fā)現(xiàn)序列化的方式是Java的默認序列化方式,該方法無法正確序列化UnsafeRow對象。
例子2:RDD的checkpoint方法
Spark提供了checkpoint的方法幫助開發(fā)者做中間結果的持久化,開發(fā)者可以利用checkpoint將計算查詢中復雜的中間結果進行緩存,減少重復計算。
其中,localCheckpoint是將結果存在executor的本地磁盤中,checkpoint是將結果存在hdfs中,checkpoint相比localCheckpoint能獲得容錯機制,但是性能會相對較差。
在本例中,我們仍采用之前的數(shù)據(jù)和查詢,首先驗證一下localCheckpoint():
得到的結果也是錯誤的:
由于checkpoint是惰性的,并且在實際的調用過程中會將原來的計算重新執(zhí)行一遍,所以一般推薦在checkpoint之前進行cache操作,這樣到了真正執(zhí)行時,checkpoint會直接讀取cache的數(shù)據(jù),而不用觸發(fā)二次計算:
結果和localCheckpoint一樣,是錯誤的數(shù)據(jù)。
通過以上的例子我們發(fā)現(xiàn)在對UnsafeRow的類型持久化時,java的序列化方法不能起到正確的作用。UnsafeRow支持的序列化方式為Externalizable和KryoSerializable,我們再對例子2進行驗證,需要做兩處修改:
1) 在創(chuàng)建sparkSession的時候設置“spark.serializer” 為 “org.apache.spark.serializer.KryoSerializer”
2)在persist的時候選擇帶SER的StorageLevel
得到的結果如下:
和實際的結果是一致的。
EI企業(yè)智能 智能數(shù)據(jù) 數(shù)據(jù)湖探索 DLI
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。