Spark Core快速入門系列(9) | RDD緩存和設置檢查點
大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/
此篇為大家帶來的是RDD緩存和設置檢查點
目錄
一. RDD緩存
二. 設置檢查點(checkpoint)
一. RDD緩存
RDD通過persist方法或cache方法可以將前面的計算結果緩存,默認情況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中。
但是并不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,并供后面重用。
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
在存儲級別的末尾加上“_2”來把持久化數據存為兩份
緩存有可能丟失,或者存儲存儲于內存的數據由于內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基于RDD的一系列轉換,丟失的數據會被重算,由于RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,并不需要重算全部Partition。
// 1.創建一個RDD scala> val rdd = sc.makeRDD(Array("buwenbuhuo")) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
二. 設置檢查點(checkpoint)
Spark 中對于數據的保存除了持久化操作之外,還提供了一種檢查點的機制,檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過 Lineage 做容錯的輔助
Lineage 過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的 RDD 開始重做 Lineage,就會減少開銷。
檢查點通過將數據寫入到 HDFS 文件系統實現了 RDD 的檢查點功能。
為當前 RDD 設置檢查點。該函數將會創建一個二進制的文件,并存儲到 checkpoint 目錄中,該目錄是用 SparkContext.setCheckpointDir()設置的。在 checkpoint 的過程中,該RDD 的所有依賴于父 RDD中 的信息將全部被移除。
對 RDD 進行 checkpoint 操作并不會馬上被執行,必須執行 Action 操作才能觸發, 在觸發的時候需要對這個 RDD 重新計算.
1. 代碼
package Day04 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不溫卜火 ** * @create 2020-07-26 15:35 ** * MyCSDN :https://buwenbuhuo.blog.csdn.net/ */ object CheckPointDemo { def main(args: Array[String]): Unit = { // 要在SparkContext初始化之前設置, 都在無效 System.setProperty("HADOOP_USER_NAME", "buwenbuhuo") val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) // 設置 checkpoint的目錄. 如果spark運行在集群上, 則必須是 hdfs 目錄 sc.setCheckpointDir("./ck1") val rdd1 = sc.parallelize(Array("abc")) val rdd2: RDD[String] = rdd1.map(_ + " : " + System.currentTimeMillis()) /* 標記 RDD2的 checkpoint. RDD2會被保存到文件中(文件位于前面設置的目錄中), 并且會切斷到父RDD的引用, 也就是切斷了它向上的血緣關系 該函數必須在job被執行之前調用. 強烈建議把這個RDD序列化到內存中, 否則, 把他保存到文件的時候需要重新計算. */ rdd2.checkpoint() rdd2.collect().foreach(println) rdd2.collect().foreach(println) rdd2.collect().foreach(println) } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2. 結果
3. 持久化和checkpoint的區別
持久化只是將數據保存在 BlockManager 中,而 RDD 的 Lineage 是不變的。但是checkpoint 執行完后,RDD 已經沒有之前所謂的依賴 RDD 了,而只有一個強行為其設置的checkpointRDD,RDD 的 Lineage 改變了。
持久化的數據丟失可能性更大,磁盤、內存都可能會存在數據丟失的情況。但是 checkpoint 的數據通常是存儲在如 HDFS 等容錯、高可用的文件系統,數據丟失可能性較小。
注意: 默認情況下,如果某個 RDD 沒有持久化,但是設置了checkpoint,會存在問題. 本來這個 job 都執行結束了,但是由于中間 RDD 沒有持久化,checkpoint job 想要將 RDD 的數據寫入外部文件系統的話,需要全部重新計算一次,再將計算出來的 RDD 數據 checkpoint到外部文件系統。 所以,建議對 checkpoint()的 RDD 使用持久化, 這樣 RDD 只需要計算一次就可以了.
本次的分享就到這里了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。
如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。
碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!
spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。