Spark Core快速入門系列(9) | RDD緩存和設置檢查點

      網友投稿 857 2022-05-29

      大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/

      此篇為大家帶來的是RDD緩存和設置檢查點

      目錄

      一. RDD緩存

      二. 設置檢查點(checkpoint)

      一. RDD緩存

      RDD通過persist方法或cache方法可以將前面的計算結果緩存,默認情況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中。

      但是并不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,并供后面重用。

      通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

      在存儲級別的末尾加上“_2”來把持久化數據存為兩份

      緩存有可能丟失,或者存儲存儲于內存的數據由于內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基于RDD的一系列轉換,丟失的數據會被重算,由于RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,并不需要重算全部Partition。

      // 1.創建一個RDD scala> val rdd = sc.makeRDD(Array("buwenbuhuo")) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at :25 // 2.將RDD轉換為攜帶當前時間戳不做緩存 scala> val nocache = rdd.map(_.toString+System.currentTimeMillis) nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at :27 // 3.多次打印結果 scala> nocache.collect res0: Array[String] = Array(buwenbuhuo1538978275359) scala> nocache.collect res1: Array[String] = Array(buwenbuhuo1538978282416) scala> nocache.collect res2: Array[String] = Array(buwenbuhuo1538978283199) // 4.將RDD轉換為攜帶當前時間戳并做緩存 scala> val cache = rdd.map(_.toString+System.currentTimeMillis).cache cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at :27 // 5.多次打印做了緩存的結果 scala> cache.collect res3: Array[String] = Array(buwenbuhuo1538978435705) scala> cache.collect res4: Array[String] = Array(buwenbuhuo1538978435705) scala> cache.collect res5: Array[String] = Array(buwenbuhuo1538978435705)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      Spark Core快速入門系列(9) | RDD緩存和設置檢查點

      30

      31

      32

      二. 設置檢查點(checkpoint)

      Spark 中對于數據的保存除了持久化操作之外,還提供了一種檢查點的機制,檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過 Lineage 做容錯的輔助

      Lineage 過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的 RDD 開始重做 Lineage,就會減少開銷。

      檢查點通過將數據寫入到 HDFS 文件系統實現了 RDD 的檢查點功能。

      為當前 RDD 設置檢查點。該函數將會創建一個二進制的文件,并存儲到 checkpoint 目錄中,該目錄是用 SparkContext.setCheckpointDir()設置的。在 checkpoint 的過程中,該RDD 的所有依賴于父 RDD中 的信息將全部被移除。

      對 RDD 進行 checkpoint 操作并不會馬上被執行,必須執行 Action 操作才能觸發, 在觸發的時候需要對這個 RDD 重新計算.

      1. 代碼

      package Day04 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不溫卜火 ** * @create 2020-07-26 15:35 ** * MyCSDN :https://buwenbuhuo.blog.csdn.net/ */ object CheckPointDemo { def main(args: Array[String]): Unit = { // 要在SparkContext初始化之前設置, 都在無效 System.setProperty("HADOOP_USER_NAME", "buwenbuhuo") val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) // 設置 checkpoint的目錄. 如果spark運行在集群上, 則必須是 hdfs 目錄 sc.setCheckpointDir("./ck1") val rdd1 = sc.parallelize(Array("abc")) val rdd2: RDD[String] = rdd1.map(_ + " : " + System.currentTimeMillis()) /* 標記 RDD2的 checkpoint. RDD2會被保存到文件中(文件位于前面設置的目錄中), 并且會切斷到父RDD的引用, 也就是切斷了它向上的血緣關系 該函數必須在job被執行之前調用. 強烈建議把這個RDD序列化到內存中, 否則, 把他保存到文件的時候需要重新計算. */ rdd2.checkpoint() rdd2.collect().foreach(println) rdd2.collect().foreach(println) rdd2.collect().foreach(println) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      2. 結果

      3. 持久化和checkpoint的區別

      持久化只是將數據保存在 BlockManager 中,而 RDD 的 Lineage 是不變的。但是checkpoint 執行完后,RDD 已經沒有之前所謂的依賴 RDD 了,而只有一個強行為其設置的checkpointRDD,RDD 的 Lineage 改變了。

      持久化的數據丟失可能性更大,磁盤、內存都可能會存在數據丟失的情況。但是 checkpoint 的數據通常是存儲在如 HDFS 等容錯、高可用的文件系統,數據丟失可能性較小。

      注意: 默認情況下,如果某個 RDD 沒有持久化,但是設置了checkpoint,會存在問題. 本來這個 job 都執行結束了,但是由于中間 RDD 沒有持久化,checkpoint job 想要將 RDD 的數據寫入外部文件系統的話,需要全部重新計算一次,再將計算出來的 RDD 數據 checkpoint到外部文件系統。 所以,建議對 checkpoint()的 RDD 使用持久化, 這樣 RDD 只需要計算一次就可以了.

      本次的分享就到這里了,

      好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。

      如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。

      碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!

      spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:UAVStack之文件數據歸集
      下一篇:【愚公系列】2021年12月 Redis數據庫-Redis的配置
      相關文章
      亚洲国产精品无码久久SM| 亚洲av无码专区在线电影天堂| 亚洲白嫩在线观看| 久久亚洲av无码精品浪潮| 亚洲日韩国产一区二区三区在线| 亚洲女人18毛片水真多| 亚洲无限乱码一二三四区| 亚洲一二成人精品区| 久久亚洲春色中文字幕久久久| 亚洲女同成av人片在线观看| 亚洲乱码国产一区三区| 亚洲色欲久久久综合网| 国精无码欧精品亚洲一区| 日韩va亚洲va欧洲va国产| 久久久亚洲精品国产| 亚洲视频免费播放| 亚洲免费在线观看视频| 亚洲伊人久久精品| 在线亚洲高清揄拍自拍一品区| 亚洲欧美日韩中文字幕一区二区三区| 亚洲欧美日韩中文字幕一区二区三区 | 亚洲AV无码成人网站在线观看| 亚洲国产精品18久久久久久| 综合一区自拍亚洲综合图区| 九九精品国产亚洲AV日韩| 亚洲成?v人片天堂网无码| 狠狠亚洲狠狠欧洲2019| 日本红怡院亚洲红怡院最新| 亚洲毛片在线观看| 亚洲国产精品综合久久网各| 亚洲AV无码专区在线亚| 亚洲AV无码国产一区二区三区| 亚洲精品久久久www| 亚洲成色在线综合网站| 91精品国产亚洲爽啪在线影院| 亚洲国产中文在线二区三区免| 中国china体内裑精亚洲日本| www.亚洲色图| 亚洲AV永久无码精品| 亚洲男女性高爱潮网站| 亚洲人成网站999久久久综合|