Spark Core快速入門系列(12) | 變量與累加器問題

      網(wǎng)友投稿 634 2022-05-29

      大家好,我是不溫卜火,是一名計(jì)算機(jī)學(xué)院大數(shù)據(jù)專業(yè)大二的學(xué)生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯(lián)網(wǎng)行業(yè)的小白,博主寫博客一方面是為了記錄自己的學(xué)習(xí)過程,另一方面是總結(jié)自己所犯的錯(cuò)誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會(huì)有一些錯(cuò)誤出現(xiàn),有紕漏之處懇請(qǐng)各位大佬不吝賜教!暫時(shí)只有csdn這一個(gè)平臺(tái),博客主頁:https://buwenbuhuo.blog.csdn.net/

      此篇為大家?guī)淼氖亲兞颗c累加器問題

      目錄

      一. 共享變量

      二. 累加器

      2.1 內(nèi)置累加器

      2.2 自定義累加器

      三. 廣播變量

      一. 共享變量

      1.代碼

      package Demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不溫卜火 ** * @create 2020-08-01 12:18 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object AccDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) val p1 = Person(10) // 將來會(huì)把對(duì)象序列化之后傳遞到每個(gè)節(jié)點(diǎn)上 val rdd1 = sc.parallelize(Array(p1)) val rdd2: RDD[Person] = rdd1.map(p => {p.age = 100; p}) rdd2.count() // 仍然是 10 println(p1.age) } } case class Person(var age:Int)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      2.結(jié)果:

      正常情況下, 傳遞給 Spark 算子(比如: map, reduce 等)的函數(shù)都是在遠(yuǎn)程的集群節(jié)點(diǎn)上執(zhí)行, 函數(shù)中用到的所有變量都是獨(dú)立的拷貝.

      這些變量被拷貝到集群上的每個(gè)節(jié)點(diǎn)上, 都這些變量的更改不會(huì)傳遞回驅(qū)動(dòng)程序.

      支持跨 task 之間共享變量通常是低效的, 但是 Spark 對(duì)共享變量也提供了兩種支持:

      累加器

      Spark Core快速入門系列(12) | 變量與累加器問題

      廣播變量

      二. 累加器

      累加器用來對(duì)信息進(jìn)行聚合,通常在向 Spark 傳遞函數(shù)時(shí),比如使用 map() 函數(shù)或者用 filter() 傳條件時(shí),可以使用驅(qū)動(dòng)器程序中定義的變量,但是集群中運(yùn)行的每個(gè)任務(wù)都會(huì)得到這些變量的一份新的副本,所以更新這些副本的值不會(huì)影響驅(qū)動(dòng)器中的對(duì)應(yīng)變量。

      如果我們想實(shí)現(xiàn)所有分片處理時(shí)更新共享變量的功能,那么累加器可以實(shí)現(xiàn)我們想要的效果。

      累加器是一種變量, 僅僅支持“add”, 支持并發(fā). 累加器用于去實(shí)現(xiàn)計(jì)數(shù)器或者求和. Spark 內(nèi)部已經(jīng)支持?jǐn)?shù)字類型的累加器, 開發(fā)者可以添加其他類型的支持.

      2.1 內(nèi)置累加器

      需求:計(jì)算文件中空行的數(shù)量

      1. 代碼

      package Demo import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不溫卜火 ** * @create 2020-08-01 12:22 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object AccDemo2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.textFile("D:/words.txt") // 得到一個(gè) Long 類型的累加器. 將從 0 開始累加 val emptyLineCount: LongAccumulator = sc.longAccumulator rdd.foreach(s => if (s.trim.length == 0) emptyLineCount.add(1)) println(emptyLineCount.value) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      2. 運(yùn)行結(jié)果

      3. 說明

      在驅(qū)動(dòng)程序中通過sc.longAccumulator得到Long類型的累加器, 還有Double類型的

      可以通過value來訪問累加器的值.(與sum等價(jià)). avg得到平均值

      只能通過add來添加值.

      累加器的更新操作最好放在action中, Spark 可以保證每個(gè) task 只執(zhí)行一次. 如果放在 transformations 操作中則不能保證只更新一次.有可能會(huì)被重復(fù)執(zhí)行.

      2.2 自定義累加器

      通過繼承類AccumulatorV2來自定義累加器.

      下面這個(gè)累加器可以用于在程序運(yùn)行過程中收集一些文本類信息,最終以List[String]的形式返回。

      1. 累加器

      package Demo import java.util import java.util.{ArrayList, Collections} import org.apache.spark.util.AccumulatorV2 /** ** @author 不溫卜火 ** * @create 2020-08-01 12:56 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object MyAccDemo { def main(args: Array[String]): Unit = { } } class MyAcc extends AccumulatorV2[String, java.util.List[String]] { private val _list: java.util.List[String] = Collections.synchronizedList(new ArrayList[String]()) override def isZero: Boolean = _list.isEmpty override def copy(): AccumulatorV2[String, util.List[String]] = { val newAcc = new MyAcc _list.synchronized { newAcc._list.addAll(_list) } newAcc } override def reset(): Unit = _list.clear() override def add(v: String): Unit = _list.add(v) override def merge(other: AccumulatorV2[String, util.List[String]]): Unit =other match { case o: MyAcc => _list.addAll(o.value) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } override def value: util.List[String] = java.util.Collections.unmodifiableList(new util.ArrayList[String](_list)) }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      2. 測(cè)試

      package Demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** * *@author 不溫卜火 ** * @create 2020-08-01 12:57 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object MyAccDemo1 { def main(args: Array[String]): Unit = { val pattern = """^\d+$""" val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) // 統(tǒng)計(jì)出來非純數(shù)字, 并計(jì)算純數(shù)字元素的和 val rdd1 = sc.parallelize(Array("abc", "a30b", "aaabb2", "60", "20")) val acc = new MyAcc sc.register(acc) val rdd2: RDD[Int] = rdd1.filter(x => { val flag: Boolean = x.matches(pattern) if (!flag) acc.add(x) flag }).map(_.toInt) println(rdd2.reduce(_ + _)) println(acc.value) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      注意:

      在使用自定義累加器的不要忘記注冊(cè)sc.register(acc)

      3. 運(yùn)行結(jié)果

      三. 廣播變量

      廣播變量在每個(gè)節(jié)點(diǎn)上保存一個(gè)只讀的變量的緩存, 而不用給每個(gè) task 來傳送一個(gè) copy.

      例如, 給每個(gè)節(jié)點(diǎn)一個(gè)比較大的輸入數(shù)據(jù)集是一個(gè)比較高效的方法. Spark 也會(huì)用該對(duì)象的廣播邏輯去分發(fā)廣播變量來降低通訊的成本.

      廣播變量通過調(diào)用SparkContext.broadcast(v)來創(chuàng)建. 廣播變量是對(duì)v的包裝, 通過調(diào)用廣播變量的 value方法可以訪問.

      scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)

      1

      2

      3

      4

      5

      6

      說明

      通過對(duì)一個(gè)類型T的對(duì)象調(diào)用SparkContext.broadcast創(chuàng)建出一個(gè)Broadcast[T]對(duì)象。任何可序列化的類型都可以這么實(shí)現(xiàn)。

      通過value屬性訪問該對(duì)象的值(在Java中為value()方法)。

      變量只會(huì)被發(fā)到各個(gè)節(jié)點(diǎn)一次,應(yīng)作為只讀值處理(修改這個(gè)值不會(huì)影響到別的節(jié)點(diǎn))。

      本次的分享就到這里了,

      好書不厭讀百回,熟讀課思子自知。而我想要成為全場(chǎng)最靚的仔,就必須堅(jiān)持通過學(xué)習(xí)來獲取更多知識(shí),用知識(shí)改變命運(yùn),用博客見證成長(zhǎng),用行動(dòng)證明我在努力。

      如果我的博客對(duì)你有幫助、如果你喜歡我的博客內(nèi)容,請(qǐng)“” “評(píng)論”“”一鍵三連哦!聽說的人運(yùn)氣不會(huì)太差,每一天都會(huì)元?dú)鉂M滿呦!如果實(shí)在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。

      碼字不易,大家的支持就是我堅(jiān)持下去的動(dòng)力。后不要忘了關(guān)注我哦!

      spark 硬件開發(fā)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:基礎(chǔ)備忘:C++類靜態(tài)成員與類靜態(tài)成員函數(shù)詳解
      下一篇:Source Insight崩潰的2種解決方法
      相關(guān)文章
      亚洲一区在线视频| 亚洲国产一区在线| 亚洲一区二区三区国产精品无码 | 亚洲乱码中文字幕在线| 91亚洲精品麻豆| 色播亚洲视频在线观看| 亚洲av无码乱码国产精品| 国产亚洲A∨片在线观看| 亚洲热线99精品视频| 丁香五月亚洲综合深深爱| 国产亚洲精品拍拍拍拍拍| 亚洲伊人久久综合中文成人网| 亚洲乱码中文字幕综合234 | 国产亚洲综合一区二区三区| www.亚洲色图.com| 亚洲A∨精品一区二区三区| 国产精品亚洲专区一区| 亚洲А∨精品天堂在线| 亚洲欧洲日产国码高潮αv| 亚洲欧洲精品成人久久奇米网| 亚洲免费在线观看| 久久久久国产亚洲AV麻豆| 亚洲综合另类小说色区| 亚洲日韩中文字幕在线播放| 亚洲Av无码精品色午夜| 亚洲资源在线观看| 亚洲字幕在线观看| 亚洲高清一区二区三区| 亚洲av日韩av永久无码电影| 伊在人亚洲香蕉精品区麻豆| 亚洲精品无码日韩国产不卡?V| 国产亚洲精品资在线| 亚洲第一AV网站| 亚洲黄色在线视频| tom影院亚洲国产一区二区| 亚洲欧洲免费无码| 成人亚洲综合天堂| 日韩一卡2卡3卡4卡新区亚洲| 亚洲va久久久噜噜噜久久 | 久久夜色精品国产嚕嚕亚洲av| 亚洲一本综合久久|