RDD原理與基本操作 | Spark,從入門到精通

      網友投稿 781 2025-04-01

      歡迎閱讀美圖數據技術團隊的「Spark,從入門到精通」系列文章,本系列文章將由淺入深為大家介紹 Spark,從框架入門到底層架構的實現,相信總有一種姿勢適合你,歡迎大家持續關注:)


      什么是 RDD?

      傳統的 MapReduce 雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是在迭代計算式的時候,要進行大量的磁盤 IO 操作,而 RDD 正是解決這一缺點的抽象方法。RDD(Resilient Distributed Datasets)即彈性分布式數據集,從名字說起:

      當計算過程中內存不足時可刷寫到磁盤等外存上,可與外存做靈活的數據交換;

      RDD 使用了一種“血統”的容錯機制,在結構更新和丟失后可隨時根據血統進行數據模型的重建;

      就是可以分布在多臺機器上進行并行計算;

      一組只讀的、可分區的分布式數據集合,集合內包含了多個分區。分區依照特定規則將具有相同屬性的數據記錄放在一起,每個分區相當于一個數據集片段。

      RDD 內部結構

      上圖是 RDD 的內部結構圖,它是一個只讀、有屬性的數據集。它的屬性用來描述當前數據集的狀態,數據集由數據的分區(partition)組成,并由(block)映射成真實數據。RDD 的主要屬性可以分為 3 類:與其他 RDD 的關系(parents、dependencies);數據(partitioner、checkpoint、storage level、iterator 等);RDD 自身屬性(sparkcontext、sparkconf),接下來我們根據屬性分類來深入介紹各個組件。

      從自身屬性說起,SparkContext 是 Spark job 的入口,由 Driver 創建在 client 端,包括集群連接、RDD ID、累加器、廣播變量等信息。SparkConf 是參數配置信息,包括:

      Spark api,控制大部分的應用程序參數;

      環境變量,配置IP地址、端口等信息;

      日志配置,通過 log4j.properties 配置。

      RDD 內部的數據集合在邏輯上和物理上被劃分成多個小子集合,這樣的每一個子集合我們將其稱為分區(Partitions),分區的個數會決定并行計算的粒度,而每一個分區數值的計算都是在一個單獨的任務中進行的,因此并行任務的個數也是由 RDD分區的個數決定的。但事實上 RDD 只是數據集的抽象,分區內部并不會存儲具體的數據。Partition 類內包含一個 index 成員,表示該分區在 RDD 內的編號,通過 RDD 編號+分區編號可以確定該分區對應的唯一塊編號,再利用底層數據存儲層提供的接口就能從存儲介質(如:HDFS、Memory)中提取出分區對應的數據。

      RDD 的分區方式主要包含兩種:Hash Partitioner 和 Range Partitioner,這兩種分區類型都是針對 Key-Value 類型的數據,如是非 Key-Value 類型則分區函數為 None。Hash 是以 Key 作為分區條件的散列分布,分區數據不連續,極端情況也可能散列到少數幾個分區上導致數據不均等;Range 按 Key 的排序平衡分布,分區內數據連續,大小也相對均等。

      Preferred Location?是一個列表,用于存儲每個 Partition 的優先位置。對于每個 HDFS 文件來說,這個列表保存的是每個 Partition 所在的塊的位置,也就是該文件的「劃分點」。

      Storage Level?是 RDD 持久化的存儲級別,RDD 持久化可以調用兩種方法:cache 和 persist:persist 方法可以自由的設置存儲級別,默認是持久化到內存;cache 方法是將 RDD 持久化到內存,cache 的內部實際上是調用了persist 方法,由于沒有開放存儲級別的參數設置,所以是直接持久化到內存。

      上圖所示是 Storage Level 各級別分布,那么如何選擇一種最合適的持久化策略呢?默認情況下,性能最高的當然是 MEMORY_ONLY,但前提是你的內存必須足夠大到可以綽綽有余地存放下整個 RDD 的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續算子操作,都是基于純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要復制一份數據副本,并遠程傳送到其他節點上。但是這里必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果 RDD 中數據比較多時(比如幾十億),直接用這種持久化級別,會導致 JVM 的 OOM 內存溢出異常。

      如果使用 MEMORY_ONLY 級別時發生了內存溢出,那么建議嘗試使用 MEMORY_ONLY_SER 級別。該級別會將 RDD 數據序列化后再保存在內存中,此時每個 partition 僅僅是一個字節數組而已,大大減少了對象數量,并降低了內存占用。這種級別比 MEMORY_ONLY 多出來的性能開銷主要就是序列化與反序列化的開銷,但是后續算子可以基于純內存進行操作,因此性能總體還是比較高的。但可能發生 OOM 內存溢出的異常。

      如果純內存的級別都無法使用,那么建議使用 MEMORY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。因為既然到了這一步,就說明 RDD 的數據量很大,內存無法完全放下,序列化后的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。

      通常不建議使用 DISK_ONLY 和后綴為_2 的級別:因為完全基于磁盤文件進行數據的讀寫,會導致性能急劇降低。后綴為_2的級別,必須將所有數據都復制一份副本,并發送到其他節點上,數據復制以及網絡傳輸會導致較大的性能開銷。

      **Checkpoint **是 Spark 提供的一種緩存機制,當需要計算依賴鏈非常長又想避免重新計算之前的 RDD 時,可以對 RDD 做 Checkpoint 處理,檢查 RDD 是否被物化或計算,并將結果持久化到磁盤或 HDFS 內。Checkpoint 會把當前 RDD 保存到一個目錄,要觸發 action 操作的時候它才會執行。在 Checkpoint 應該先做持久化(persist 或者 cache)操作,否則就要重新計算一遍。若某個 RDD 成功執行 checkpoint,它前面的所有依賴鏈會被銷毀。

      與 Spark 提供的另一種緩存機制 cache 相比:cache 緩存數據由 executor 管理,若 executor 消失,它的數據將被清除,RDD 需要重新計算;而 checkpoint 將數據保存到磁盤或 HDFS 內,job 可以從 checkpoint 點繼續計算。Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當于 cache 到磁盤上,這樣可以使 RDD 第一次被計算得到時就存儲到磁盤上,它們之間的區別在于:persist 雖然可以將 RDD 的 partition 持久化到磁盤,但一旦作業執行結束,被 cache 到磁盤上的 RDD 會被清空;而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,如果不被手動 remove 掉,是一直存在的。

      **Compute **函數實現方式就是向上遞歸「獲取父 RDD 分區數據進行計算」,直到遇到檢查點 RDD 獲取有緩存的 RDD。

      **Iterator **用來查找當前 RDD Partition 與父 RDD 中 Partition 的血緣關系,并通過 Storage Level 確定迭代位置,直到確定真實數據的位置。它的實現流程如下:

      若標記了有緩存,則取緩存,取不到則進行 computeOrReadCheckpoint(計算或讀檢查點)。完了再存入緩存,以備后續使用。

      若未標記有緩存,則直接進行 computeOrReadCheckpoint。

      computeOrReadCheckpoint 這個過程也做兩個判斷:有做過 checkpoint 和沒有做過 checkpoint,做過 checkpoint 則可以讀取到檢查點數據返回,沒做過則調該 RDD 的實現類的 compute 函數計算。

      一個作業從開始到結束的計算過程中產生了多個 RDD,RDD 之間是彼此相互依賴的,我們把這種父子依賴的關系稱之為「血統」。

      RDD 只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操作,然后創建某個 RDD 的變換序列(血統 lineage)存儲下來。

      *變換序列指每個 RDD 都包含了它是如何由其他 RDD 變換過來的以及如何重建某一塊數據的信息。

      因此 RDD 的容錯機制又稱「血統」容錯。 要實現這種「血統」容錯機制,最大的難題就是如何表達父 RDD 和子 RDD 之間的依賴關系。

      上圖所示父 RDD 的每個分區最多只能被子 RDD 的一個分區使用,稱為窄依賴(narrow dependency);若父 RDD 的每個分區可以被子 RDD 的多個分區使用,稱為寬依賴(wide dependency)。簡單來講,窄依賴就是父子RDD分區間「一對一」的關系,而寬依賴就是「一對多」關系。從失敗恢復來看,窄依賴的失敗恢復起來更高效,因為它只需找到父 RDD 的一個對應分區即可,而且可以在不同節點上并行計算做恢復;寬依賴牽涉到父 RDD 的多個分區,需要得到所有依賴的父 RDD 分區的 shuffle 結果,恢復起來相對復雜些。

      根據 RDD 之間的寬窄依賴關系引申出 Stage 的概念,Stage 是由一組 RDD 組成的執行計劃。如果 RDD 的衍生關系都是窄依賴,則可放在同一個 Stage 中運行,若 RDD 的依賴關系為寬依賴,則要劃分到不同的 Stage。這樣 Spark 在執行作業時,會按照 Stage 的劃分, 生成一個最優、完整的執行計劃。

      RDD 的創建方式與分區機制

      RDD 的創建方式

      RDD 的創建方式有四種:

      1.使用程序中的集合創建 RDD,RDD 的數據源是程序中的集合,通過 parallelize 或者 makeRDD 將集合轉化為 RDD;

      *例

      val?num?=?Array(1,2,3,4,5)val?rdd?=?sc.parallelize(num)

      RDD原理與基本操作 | Spark,從入門到精通

      2.使用本地文件或 HDFS 創建 RDD,RDD 的數據源是本地文件系統或 HDFS 的數據,使用 textFile 方法創建RDD。 *例

      val?rdd?=?sc.textFile(“hdfs://master:9000/rec/data”)

      3.使用數據流創建 RDD,使用 Spark Streaming 的相關類,接收實時的輸入數據流創建 RDD(數據流來源可以是 kafka、flume 等)。

      *例

      val?ssc?=?new?StreamingContext(conf,?Seconds(1))val?lines?=?ssc.socketTextStream(“localhost”,?9999)val?words?=?lines.flatMap(_.split(“?”))

      4.使用其他方式創建 RDD,從其他數據庫上創建 RDD,例如 Hbase、MySQL 等。

      *例

      val?sqlContext?=?new?SQLContext(sc)val?url?=?"jdbc:mysql://ip:port/xxxx"val?prop?=?new?Properties()val?df?=?sqlContext.read.jdbc(url,?“play_time”,?prop)

      RDD 的分區機制

      RDD 的分區機制有兩個關鍵點:一個是關鍵參數,即 Spark 的默認并發數 spark.default.parallelism;另一個是關鍵原則,RDD 分區盡可能使得分區的個數等于集群核心數目。

      當配置文件 spark-default.conf 中顯式配置了 spark.default.parallelism,那么 spark.default.parallelism=配置的值,否則按照如下規則進行取值:

      1.本地模式(不會啟動 executor,由 SparkSubmit 進程生成指定數量的線程數來并發)

      spark-shell spark.default.parallelism = 1 spark-shell --master local[N] spark.default.parallelism = N (使用 N 個核) spark-shell --master local spark.default.parallelism = 1

      2.偽集群模式(x 為本機上啟動的 executor 數,y 為每個 executor 使用的 core 數,z 為每個 executor 使用的內存)

      spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

      3.Yarn、standalone 等模式

      spark.default.parallelism = max(所有 executor 使用的 core 總數,2)

      4.Mesos

      spark.default.parallelism = 8

      spark.context 會生成兩個參數,由 spark.default.parallelism 推導出這兩個參數的值:

      sc.defaultParallelism?????=?spark.default.parallelism sc.defaultMinPartitions??=?min(spark.default.parallelism,?2)

      當 sc.defaultParallelism 和 sc.defaultMinPartitions 確認后,就可以推算 RDD 的分區數了。

      以 parallelize 方法為例

      val?rdd?=?sc.parallelize(1?to?10)

      如果使用 parallelize 方法時沒指定分區數, RDD 的分區數 = sc.defaultParallelism

      以 textFile 方法為例

      val?rdd?=?sc.textFile(“path/file”)

      分區機制分兩種情況:

      1.從本地文件生成的 RDD,如果沒有指定分區數,則默認分區數規則為

      rdd 的分區數 = max(本地 file 的分片數, sc.defaultMinPartitions)

      2.從 HDFS 生成的 RDD,如果沒有指定分區數,則默認分區數規則為:

      rdd 的分區數 = max(hdfs 文件的 block 數目, sc.defaultMinPartitions)

      RDD 的常用操作

      RDD 支持兩種類型的操作:轉換(Transformation)和動作(Action),轉換操作是從已經存在的數據集中創建一個新的數據集,而動作操作是在數據集上進行計算后返回結果到 Driver,既觸發 SparkContext 提交 Job 作業。轉換操作都具有 Lazy 特性,即 Spark 不會立刻進行實際的計算,只會記錄執行的軌跡,只有觸發行動操作的時候,它才會根據 DAG 圖真正執行。

      轉換與動作具體包含的操作種類如下圖所示:

      1.轉換操作2.動作操作

      最后我們通過一段代碼來看看它具體的操作:

      這段代碼是用來計算某個視頻被男性或女性用戶的播放次數,其中 rdd_attr 用來記錄用戶性別,rdd_src 是用戶對某個視頻進行播放的記錄,這兩個 RDD 會進行一個 join 操作,比如這是某個男性用戶對某個視頻進行了播放,進行 map 操作之后得到視頻 id 和性別作為 key,根據這個 key 做 reduceByKey 的操作,最終得到一個視頻被男性/女性用戶總共播放了多少次的 RDD,然后使用 combineByKey 合并同一個視頻 id 的多個結果,最后保存到 HDFS 上。

      本文轉載自異步社區。

      原文鏈接

      https://www.epubit.com/articleDetails?id=N38bafe22-7d66-414d-908d-de6d8dbcf1de

      Spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:如何在Excel中的隱藏列和可見列之間切換?
      下一篇:銷售訂單管理的業務流程
      相關文章
      亚洲一级毛片免费看| 色拍自拍亚洲综合图区| 中文字幕在线日亚洲9| 亚洲成aⅴ人在线观看| 亚洲第一成年人网站| 亚洲嫩模在线观看| 亚洲人成网址在线观看| 亚洲制服中文字幕第一区| 亚洲一区二区三区高清| 亚洲精选在线观看| 亚洲人成网站在线播放影院在线| 久久精品九九亚洲精品| 亚洲成a人片在线观看中文!!!| 亚洲伊人久久大香线蕉啊| 国产亚洲sss在线播放| 亚洲综合一区无码精品| 亚洲AV无码专区亚洲AV桃| 久久久久亚洲精品无码网址色欲 | 亚洲精品福利视频| 亚洲网址在线观看你懂的| 亚洲码在线中文在线观看| 色在线亚洲视频www| 亚洲精品无码中文久久字幕| 国产精品亚洲lv粉色| 亚洲国产成人久久一区久久| 国产精品亚洲玖玖玖在线观看 | 色偷偷亚洲女人天堂观看欧| 亚洲成aⅴ人片久青草影院按摩 | 亚洲视频在线观看地址| 亚洲国产成人久久99精品| 亚洲最大天堂无码精品区| 亚洲AV无码一区二区三区牲色| 亚洲国产av无码精品| 亚洲国产一二三精品无码| 久久青青草原亚洲av无码app| 亚洲伊人久久精品| 日韩亚洲翔田千里在线| 国产精品亚洲αv天堂无码| 亚洲AV无码专区在线播放中文| 亚洲视频在线观看视频| 亚洲人片在线观看天堂无码|