Spark之【RDD編程進階】——累加器與廣播變量的使用

      網友投稿 890 2025-03-31

      上一篇博客博主已經為大家介紹了Spark中數據讀取與保存,這一篇博客則帶來了Spark中的編程進階。其中就涉及到了累加器與廣播變量的使用。


      文章目錄

      RDD編程進階

      1.累加器

      1.1系統累加器

      2.廣播變量(調優策略)

      RDD編程進階

      1.累加器

      累加器用來對信息進行聚合,通常在向 Spark傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。如果我們想實現所有分片處理時更新共享變量的功能,那么累加器可以實現我們想要的效果。

      針對一個輸入的日志文件,如果我們想計算文件中所有空行的數量,我們可以編寫以下程序:

      scala> val notice = sc.textFile("./NOTICE") notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at :32 scala> val blanklines = sc.accumulator(0) warning: there were two deprecation warnings; re-run with -deprecation for details blanklines: org.apache.spark.Accumulator[Int] = 0 scala> val tmp = notice.flatMap(line => { | if (line == "") { | blanklines += 1 | } | line.split(" ") | }) tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at :36 scala> tmp.count() res31: Long = 3213 scala> blanklines.value res32: Int = 171

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      Spark之【RDD編程進階】——累加器與廣播變量的使用

      12

      13

      14

      15

      16

      17

      18

      19

      20

      通過在驅動器中調用SparkContext.

      accumulator

      (initialValue)方法,創建出存有初始值的累加器。返回值為 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型。Spark閉包里的執行器代碼可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 驅動器程序可以調用累加器的value屬性(在Java中使用value()或setValue())來訪問累加器的值。

      注意:

      工作節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。

      對于要在行動操作中使用的累加器,Spark只會把每個任務對各累加器的修改應用一次。因此,如果想要一個無論在失敗還是重復計算時都絕對可靠的累加器,我們必須把它放在 foreach() 這樣的行動操作中。轉化操作中累加器可能會發生不止一次更新。

      2.廣播變量(調優策略)

      廣播變量用來高效分發較大的對象。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特征向量,廣播變量用起來都很順手。 在多個并行操作中使用同一個變量,但是 Spark會為每個任務分別發送。

      scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35) scala> broadcastVar.value res33: Array[Int] = Array(1, 2, 3)

      1

      2

      3

      4

      5

      使用廣播變量的過程如下:

      (1) 通過對一個類型 T 的對象調用 SparkContext.broadcast 創建出一個 Broadcast[T] 對象。

      任何可序列化

      的類型都可以這么實現。

      (2) 通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)。

      (3) 變量只會被發到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)。

      本次的分享就到這里,對大數據技術感興趣的朋友可以關注一下喲~

      spark 硬件開發

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:真正隱藏excel表單(excel中的隱藏)
      下一篇:舊版ERP系統:是否該更換ERP?
      相關文章
      jizzjizz亚洲日本少妇| 亚洲日韩AV一区二区三区四区| 亚洲av永久无码| 亚洲高清一区二区三区| 亚洲神级电影国语版| 亚洲黄网在线观看| 亚洲人成电影福利在线播放 | 日韩亚洲翔田千里在线| 亚洲第一成年网站视频| 亚洲成在人线在线播放无码| 亚洲乱码日产精品一二三| 亚洲欧美国产欧美色欲| 亚洲国产精品网站在线播放| 亚洲爆乳无码精品AAA片蜜桃| 亚洲老熟女五十路老熟女bbw| 亚洲色中文字幕在线播放| 亚洲人成电影网站免费| 亚洲欧美不卡高清在线| 国产亚洲欧美在线观看| 国产亚洲精品美女久久久久| 亚洲国产人成精品| 综合久久久久久中文字幕亚洲国产国产综合一区首 | 亚洲免费中文字幕| 亚洲AV无码专区在线亚| 男人天堂2018亚洲男人天堂| 亚洲中文字幕乱码熟女在线| 亚洲午夜福利在线视频| 精品国产日韩亚洲一区在线| 亚洲福利在线播放| 亚洲乳大丰满中文字幕| 亚洲精品高清视频| 亚洲一区免费视频| 亚洲国产精品无码久久| 亚洲国产精品无码久久九九| 亚洲午夜福利在线观看| 亚洲午夜精品久久久久久人妖| 亚洲成人动漫在线观看| 在线观看亚洲AV每日更新无码| 午夜亚洲WWW湿好爽| 国产综合亚洲专区在线| 亚洲AV日韩精品久久久久久|