《Spark Streaming實時流式大數據處理實戰》 ——3.7 共 享 變 量
3.7? 共 享 變 量
通過前面的介紹,我們知道Spark是多機器集群部署的,分為Driver、Master和Worker。Master負責資源調度,Worker是不同的運算節點,由Master統一調度,而Driver是我們提交Spark程序的節點,并且所有的reduce類型的操作都會匯總到Driver節點進行整合。
節點之間會給每個節點傳遞一個map、reduce等操作函數的獨立副本,這些變量也會被復制到每臺機器上,而節點之間的運算是相互獨立的。當我們利用RDD操作(如map、reduce)在遠程節點執行一個功能函數時,其會在該節點開辟一塊單獨的變量空間供函數使用。
這些變量會被復制到每一臺機器上,并且當變量發生改變時,并不會同步傳播回Driver程序。如果進行通用支持,任務間的讀寫共享變量需要大量的同步操作,這會導致低效。所以,Spark提供了兩種受限類型的共享變量用于兩種常見的使用模式:廣播變量和累加器。
3.7.1? 累加器(Accumulator)
顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效地應用于并行操作中。累加器能夠用來實現對數據的統計和求和操作。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型,在2.0.0之前的版本中,通過繼承AccumulatorParam來實現,而2.0.0之后的版本需要繼承AccumulatorV2來實現自定義類型的累加器。
如果創建了一個具體名稱的累加器,它可以在Spark的UI中顯示。這對于理解運行階段(running stages)的過程有很重要的作用,如圖3.8所示。
圖3.8? 累加器展示圖
Spark內置了數值型累加器,一個數值累加器可以由SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()函數來創建,分別累加Long型或Double型數據。
之后節點上的任務可以利用add方法進行累加操作,但是它們并不能讀取累加器的值。只有Driver程序能夠通過value方法讀取累加器的值,其具體使用方式如下:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0,
name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
上面所述的代碼是使用了累加器的內建支持類型Long,當然也可以通過集成AccumulatorV2的方式來創建支持我們自定義類型的累加器。
AccumulatorV2是一個包含一些方法的抽象類,其中一些方法必須被覆寫:reset方法使得累加器能夠被重置為0,add方法即添加另一個值到累加器中,merge方法能夠將另一個同類型累加器整合到當前累加器中。
另外,其他必須覆寫的方法可以參考API文檔。這里我們參考官網的一個例子。假設有一個MyVector類型,表示數學中的向量,可以用以下方式來聲明MyVector累加器:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
// 創建全0向量
private val myVector: MyVector = MyVector.createZeroVector
// 重置操作
def reset(): Unit = {
myVector.reset()
}
// 向量相加
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// 創建向量類型的累加器
val myVectorAcc = new VectorAccumulatorV2
// 將累加器注冊到Spark上下文中
sc.register(myVectorAcc, "MyVectorAcc1")
值得一提的是,對于自定義類型的累加器,我們可以設置不同于相加元素的輸出元素。
累加器只有在Action操作中才會被更新,Spark保證每個任務對于累加器的更新只會執行一次,如重新啟動任務并不會更新累加器的值。在Transformation操作中,如果Task、Job或Stages被重新執行(根據計算圖重新計算結果),那么累加器的更新有可能被執行多次。
我們知道,Transformation會建立計算圖,只有Action操作才會觸發真正的計算,累加器也同樣遵循這個懶惰(lazy)原則,即如果只在Transformation操作中調用累加器,其結果并不會改變,示例如下:
// 創建long型累加器
val accum = sc.longAccumulator
// 在map操作內累加器進行累加
data.map { x => accum.add(x); x }
// 由于沒有任何Action操作,所以map操作并沒有被執行,accum值還是0
?特別注意:上文中關于累加器的使用只適合于Spark 2.0.0之后的版本,在此之前的版本中,累加器的聲明方式如下:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
這點在使用不同版本的Spark時要特別注意,因為在Spark 2.0.0之后的版本API接口有了很大變化。
Spark spark 大數據 大數據
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。