Spark Streaming實時流式大數據處理實戰》 ——3.7 共 享 變 量

      網友投稿 679 2022-05-29

      3.7? 共 享 變 量

      通過前面的介紹,我們知道Spark是多機器集群部署的,分為Driver、Master和Worker。Master負責資源調度,Worker是不同的運算節點,由Master統一調度,而Driver是我們提交Spark程序的節點,并且所有的reduce類型的操作都會匯總到Driver節點進行整合。

      節點之間會給每個節點傳遞一個map、reduce等操作函數的獨立副本,這些變量也會被復制到每臺機器上,而節點之間的運算是相互獨立的。當我們利用RDD操作(如map、reduce)在遠程節點執行一個功能函數時,其會在該節點開辟一塊單獨的變量空間供函數使用。

      這些變量會被復制到每一臺機器上,并且當變量發生改變時,并不會同步傳播回Driver程序。如果進行通用支持,任務間的讀寫共享變量需要大量的同步操作,這會導致低效。所以,Spark提供了兩種受限類型的共享變量用于兩種常見的使用模式:廣播變量和累加器。

      3.7.1? 累加器(Accumulator)

      顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效地應用于并行操作中。累加器能夠用來實現對數據的統計和求和操作。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型,在2.0.0之前的版本中,通過繼承AccumulatorParam來實現,而2.0.0之后的版本需要繼承AccumulatorV2來實現自定義類型的累加器。

      如果創建了一個具體名稱的累加器,它可以在Spark的UI中顯示。這對于理解運行階段(running stages)的過程有很重要的作用,如圖3.8所示。

      圖3.8? 累加器展示圖

      Spark內置了數值型累加器,一個數值累加器可以由SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()函數來創建,分別累加Long型或Double型數據。

      之后節點上的任務可以利用add方法進行累加操作,但是它們并不能讀取累加器的值。只有Driver程序能夠通過value方法讀取累加器的值,其具體使用方式如下:

      scala> val accum = sc.longAccumulator("My Accumulator")

      accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0,

      name: Some(My Accumulator), value: 0)

      scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

      ...

      10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

      scala> accum.value

      res2: Long = 10

      上面所述的代碼是使用了累加器的內建支持類型Long,當然也可以通過集成AccumulatorV2的方式來創建支持我們自定義類型的累加器。

      AccumulatorV2是一個包含一些方法的抽象類,其中一些方法必須被覆寫:reset方法使得累加器能夠被重置為0,add方法即添加另一個值到累加器中,merge方法能夠將另一個同類型累加器整合到當前累加器中。

      另外,其他必須覆寫的方法可以參考API文檔。這里我們參考官網的一個例子。假設有一個MyVector類型,表示數學中的向量,可以用以下方式來聲明MyVector累加器:

      class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

      // 創建全0向量

      private val myVector: MyVector = MyVector.createZeroVector

      // 重置操作

      def reset(): Unit = {

      myVector.reset()

      }

      // 向量相加

      def add(v: MyVector): Unit = {

      myVector.add(v)

      }

      ...

      }

      // 創建向量類型的累加器

      val myVectorAcc = new VectorAccumulatorV2

      // 將累加器注冊到Spark上下文中

      sc.register(myVectorAcc, "MyVectorAcc1")

      值得一提的是,對于自定義類型的累加器,我們可以設置不同于相加元素的輸出元素。

      累加器只有在Action操作中才會被更新,Spark保證每個任務對于累加器的更新只會執行一次,如重新啟動任務并不會更新累加器的值。在Transformation操作中,如果Task、Job或Stages被重新執行(根據計算圖重新計算結果),那么累加器的更新有可能被執行多次。

      我們知道,Transformation會建立計算圖,只有Action操作才會觸發真正的計算,累加器也同樣遵循這個懶惰(lazy)原則,即如果只在Transformation操作中調用累加器,其結果并不會改變,示例如下:

      《Spark Streaming實時流式大數據處理實戰》 ——3.7 共 享 變 量

      // 創建long型累加器

      val accum = sc.longAccumulator

      // 在map操作內累加器進行累加

      data.map { x => accum.add(x); x }

      // 由于沒有任何Action操作,所以map操作并沒有被執行,accum值還是0

      ?特別注意:上文中關于累加器的使用只適合于Spark 2.0.0之后的版本,在此之前的版本中,累加器的聲明方式如下:

      scala> val accum = sc.accumulator(0, "My Accumulator")

      accum: spark.Accumulator[Int] = 0

      scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

      ...

      10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

      scala> accum.value

      res2: Int = 10

      這點在使用不同版本的Spark時要特別注意,因為在Spark 2.0.0之后的版本API接口有了很大變化。

      Spark spark 大數據 大數據

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Cloudfoundry不完全文檔
      下一篇:K8s 原理架構介紹(一)
      相關文章
      亚洲毛片网址在线观看中文字幕 | 久久久久久a亚洲欧洲aⅴ| 久久精品国产亚洲AV久| 久久亚洲AV成人出白浆无码国产| 日韩亚洲精品福利| 亚洲乱码中文论理电影| 亚洲a级在线观看| 亚洲va乱码一区二区三区| 亚洲免费中文字幕| 亚洲卡一卡二卡乱码新区| 亚洲欧洲日产专区| 亚洲H在线播放在线观看H| 亚洲三级中文字幕| 亚洲最大无码中文字幕| 亚洲人成影院在线高清| 久久丫精品国产亚洲av不卡| 亚洲电影中文字幕| 久久综合亚洲色一区二区三区| 亚洲精品韩国美女在线| 亚洲精品人成电影网| 国产亚洲国产bv网站在线| 亚洲人成色77777在线观看| 亚洲狠狠婷婷综合久久蜜芽| 亚洲aⅴ无码专区在线观看春色| 偷自拍亚洲视频在线观看| 亚洲国产成人五月综合网 | yy6080亚洲一级理论| 亚洲男人av香蕉爽爽爽爽| 国产亚洲精品无码拍拍拍色欲 | 亚洲一区电影在线观看| 亚洲熟妇AV一区二区三区浪潮 | 国产精品国产亚洲区艳妇糸列短篇| 亚洲 综合 国产 欧洲 丝袜| heyzo亚洲精品日韩| 亚洲欧洲日产国码高潮αv| 亚洲韩国精品无码一区二区三区| 亚洲高清视频在线观看| 亚洲一卡2卡3卡4卡国产网站| 亚洲成AV人片在WWW| 精品韩国亚洲av无码不卡区| 久久夜色精品国产亚洲av|