Spark之【RDD編程進階】——累加器與廣播變量的使用
上一篇博客博主已經為大家介紹了Spark中數據讀取與保存,這一篇博客則帶來了Spark中的編程進階。其中就涉及到了累加器與廣播變量的使用。

文章目錄
RDD編程進階
1.累加器
1.1系統累加器
2.廣播變量(調優策略)
RDD編程進階
1.累加器
累加器用來對信息進行聚合,通常在向 Spark傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。如果我們想實現所有分片處理時更新共享變量的功能,那么累加器可以實現我們想要的效果。
針對一個輸入的日志文件,如果我們想計算文件中所有空行的數量,我們可以編寫以下程序:
scala> val notice = sc.textFile("./NOTICE") notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
通過在驅動器中調用SparkContext.
accumulator
(initialValue)方法,創建出存有初始值的累加器。返回值為 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型。Spark閉包里的執行器代碼可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 驅動器程序可以調用累加器的value屬性(在Java中使用value()或setValue())來訪問累加器的值。
注意:
工作節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。
對于要在行動操作中使用的累加器,Spark只會把每個任務對各累加器的修改應用一次。因此,如果想要一個無論在失敗還是重復計算時都絕對可靠的累加器,我們必須把它放在 foreach() 這樣的行動操作中。轉化操作中累加器可能會發生不止一次更新。
2.廣播變量(調優策略)
廣播變量用來高效分發較大的對象。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特征向量,廣播變量用起來都很順手。 在多個并行操作中使用同一個變量,但是 Spark會為每個任務分別發送。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35) scala> broadcastVar.value res33: Array[Int] = Array(1, 2, 3)
1
2
3
4
5
使用廣播變量的過程如下:
(1) 通過對一個類型 T 的對象調用 SparkContext.broadcast 創建出一個 Broadcast[T] 對象。
任何可序列化
的類型都可以這么實現。
(2) 通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)。
(3) 變量只會被發到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)。
本次的分享就到這里,對大數據技術感興趣的朋友可以關注一下喲~
spark 硬件開發
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。