Spark Core快速入門系列(12) | 變量與累加器問題
大家好,我是不溫卜火,是一名計(jì)算機(jī)學(xué)院大數(shù)據(jù)專業(yè)大二的學(xué)生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯(lián)網(wǎng)行業(yè)的小白,博主寫博客一方面是為了記錄自己的學(xué)習(xí)過程,另一方面是總結(jié)自己所犯的錯(cuò)誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會(huì)有一些錯(cuò)誤出現(xiàn),有紕漏之處懇請(qǐng)各位大佬不吝賜教!暫時(shí)只有csdn這一個(gè)平臺(tái),博客主頁:https://buwenbuhuo.blog.csdn.net/
此篇為大家?guī)淼氖亲兞颗c累加器問題
目錄
一. 共享變量
二. 累加器
2.1 內(nèi)置累加器
2.2 自定義累加器
三. 廣播變量
一. 共享變量
1.代碼
package Demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不溫卜火 ** * @create 2020-08-01 12:18 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object AccDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) val p1 = Person(10) // 將來會(huì)把對(duì)象序列化之后傳遞到每個(gè)節(jié)點(diǎn)上 val rdd1 = sc.parallelize(Array(p1)) val rdd2: RDD[Person] = rdd1.map(p => {p.age = 100; p}) rdd2.count() // 仍然是 10 println(p1.age) } } case class Person(var age:Int)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2.結(jié)果:
正常情況下, 傳遞給 Spark 算子(比如: map, reduce 等)的函數(shù)都是在遠(yuǎn)程的集群節(jié)點(diǎn)上執(zhí)行, 函數(shù)中用到的所有變量都是獨(dú)立的拷貝.
這些變量被拷貝到集群上的每個(gè)節(jié)點(diǎn)上, 都這些變量的更改不會(huì)傳遞回驅(qū)動(dòng)程序.
支持跨 task 之間共享變量通常是低效的, 但是 Spark 對(duì)共享變量也提供了兩種支持:
累加器
廣播變量
二. 累加器
累加器用來對(duì)信息進(jìn)行聚合,通常在向 Spark 傳遞函數(shù)時(shí),比如使用 map() 函數(shù)或者用 filter() 傳條件時(shí),可以使用驅(qū)動(dòng)器程序中定義的變量,但是集群中運(yùn)行的每個(gè)任務(wù)都會(huì)得到這些變量的一份新的副本,所以更新這些副本的值不會(huì)影響驅(qū)動(dòng)器中的對(duì)應(yīng)變量。
如果我們想實(shí)現(xiàn)所有分片處理時(shí)更新共享變量的功能,那么累加器可以實(shí)現(xiàn)我們想要的效果。
累加器是一種變量, 僅僅支持“add”, 支持并發(fā). 累加器用于去實(shí)現(xiàn)計(jì)數(shù)器或者求和. Spark 內(nèi)部已經(jīng)支持?jǐn)?shù)字類型的累加器, 開發(fā)者可以添加其他類型的支持.
2.1 內(nèi)置累加器
需求:計(jì)算文件中空行的數(shù)量
1. 代碼
package Demo import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不溫卜火 ** * @create 2020-08-01 12:22 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object AccDemo2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.textFile("D:/words.txt") // 得到一個(gè) Long 類型的累加器. 將從 0 開始累加 val emptyLineCount: LongAccumulator = sc.longAccumulator rdd.foreach(s => if (s.trim.length == 0) emptyLineCount.add(1)) println(emptyLineCount.value) } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2. 運(yùn)行結(jié)果
3. 說明
在驅(qū)動(dòng)程序中通過sc.longAccumulator得到Long類型的累加器, 還有Double類型的
可以通過value來訪問累加器的值.(與sum等價(jià)). avg得到平均值
只能通過add來添加值.
累加器的更新操作最好放在action中, Spark 可以保證每個(gè) task 只執(zhí)行一次. 如果放在 transformations 操作中則不能保證只更新一次.有可能會(huì)被重復(fù)執(zhí)行.
2.2 自定義累加器
通過繼承類AccumulatorV2來自定義累加器.
下面這個(gè)累加器可以用于在程序運(yùn)行過程中收集一些文本類信息,最終以List[String]的形式返回。
1. 累加器
package Demo import java.util import java.util.{ArrayList, Collections} import org.apache.spark.util.AccumulatorV2 /** ** @author 不溫卜火 ** * @create 2020-08-01 12:56 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object MyAccDemo { def main(args: Array[String]): Unit = { } } class MyAcc extends AccumulatorV2[String, java.util.List[String]] { private val _list: java.util.List[String] = Collections.synchronizedList(new ArrayList[String]()) override def isZero: Boolean = _list.isEmpty override def copy(): AccumulatorV2[String, util.List[String]] = { val newAcc = new MyAcc _list.synchronized { newAcc._list.addAll(_list) } newAcc } override def reset(): Unit = _list.clear() override def add(v: String): Unit = _list.add(v) override def merge(other: AccumulatorV2[String, util.List[String]]): Unit =other match { case o: MyAcc => _list.addAll(o.value) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } override def value: util.List[String] = java.util.Collections.unmodifiableList(new util.ArrayList[String](_list)) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2. 測(cè)試
package Demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** * *@author 不溫卜火 ** * @create 2020-08-01 12:57 ** * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object MyAccDemo1 { def main(args: Array[String]): Unit = { val pattern = """^\d+$""" val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) // 統(tǒng)計(jì)出來非純數(shù)字, 并計(jì)算純數(shù)字元素的和 val rdd1 = sc.parallelize(Array("abc", "a30b", "aaabb2", "60", "20")) val acc = new MyAcc sc.register(acc) val rdd2: RDD[Int] = rdd1.filter(x => { val flag: Boolean = x.matches(pattern) if (!flag) acc.add(x) flag }).map(_.toInt) println(rdd2.reduce(_ + _)) println(acc.value) } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
注意:
在使用自定義累加器的不要忘記注冊(cè)sc.register(acc)
3. 運(yùn)行結(jié)果
三. 廣播變量
廣播變量在每個(gè)節(jié)點(diǎn)上保存一個(gè)只讀的變量的緩存, 而不用給每個(gè) task 來傳送一個(gè) copy.
例如, 給每個(gè)節(jié)點(diǎn)一個(gè)比較大的輸入數(shù)據(jù)集是一個(gè)比較高效的方法. Spark 也會(huì)用該對(duì)象的廣播邏輯去分發(fā)廣播變量來降低通訊的成本.
廣播變量通過調(diào)用SparkContext.broadcast(v)來創(chuàng)建. 廣播變量是對(duì)v的包裝, 通過調(diào)用廣播變量的 value方法可以訪問.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
1
2
3
4
5
6
說明
通過對(duì)一個(gè)類型T的對(duì)象調(diào)用SparkContext.broadcast創(chuàng)建出一個(gè)Broadcast[T]對(duì)象。任何可序列化的類型都可以這么實(shí)現(xiàn)。
通過value屬性訪問該對(duì)象的值(在Java中為value()方法)。
變量只會(huì)被發(fā)到各個(gè)節(jié)點(diǎn)一次,應(yīng)作為只讀值處理(修改這個(gè)值不會(huì)影響到別的節(jié)點(diǎn))。
本次的分享就到這里了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場(chǎng)最靚的仔,就必須堅(jiān)持通過學(xué)習(xí)來獲取更多知識(shí),用知識(shí)改變命運(yùn),用博客見證成長(zhǎng),用行動(dòng)證明我在努力。
如果我的博客對(duì)你有幫助、如果你喜歡我的博客內(nèi)容,請(qǐng)“” “評(píng)論”“”一鍵三連哦!聽說的人運(yùn)氣不會(huì)太差,每一天都會(huì)元?dú)鉂M滿呦!如果實(shí)在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。
碼字不易,大家的支持就是我堅(jiān)持下去的動(dòng)力。后不要忘了關(guān)注我哦!
spark 硬件開發(fā)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。