SparkAPI Java版】JavaPairRDD——aggregateByKey(二)

      網(wǎng)友投稿 781 2025-04-01

      Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U. Parameters: zeroValue - (undocumented) seqFunc - (undocumented) combFunc - (undocumented) Returns: (undocumented)

      aggregateByKey函數(shù)對(duì)PairRDD中相同Key的值進(jìn)行聚合操作,在聚合過程中同樣使用了一個(gè)中立的初始值。和aggregate函數(shù)類似,aggregateByKey返回值的類型不需要和RDD中value的類型一致。因?yàn)閍ggregateByKey是對(duì)相同Key中的值進(jìn)行聚合操作,所以aggregateByKey函數(shù)最終返回的類型還是Pair RDD,對(duì)應(yīng)的結(jié)果是Key和聚合好的值;而aggregate函數(shù)直接是返回非RDD的結(jié)果,這點(diǎn)需要注意。在實(shí)現(xiàn)過程中,定義了三個(gè)aggregateByKey函數(shù)原型,但最終調(diào)用的aggregateByKey函數(shù)都一致。

      // Scala def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)     (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)     (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U)     (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] // java public JavaPairRDD aggregateByKey(U zeroValue, Partitioner partitioner, Function2 seqFunc, Function2 combFunc) public JavaPairRDD aggregateByKey(U zeroValue, int numPartitions, Function2 seqFunc, Function2 combFunc) public JavaPairRDD aggregateByKey(U zeroValue, Function2 seqFunc, Function2 combFunc)

      【SparkAPI JAVA版】JavaPairRDD——aggregateByKey(二)

      第一個(gè)aggregateByKey函數(shù)我們可以自定義Partitioner。除了這個(gè)參數(shù)之外,其函數(shù)聲明和aggregate很類似;其他的aggregateByKey函數(shù)實(shí)現(xiàn)最終都是調(diào)用這個(gè)。

      第二個(gè)aggregateByKey函數(shù)可以設(shè)置分區(qū)的個(gè)數(shù)(numPartitions),最終用的是HashPartitioner。

      最后一個(gè)aggregateByKey實(shí)現(xiàn)先會(huì)判斷當(dāng)前RDD是否定義了分區(qū)函數(shù),如果定義了則用當(dāng)前RDD的分區(qū);如果當(dāng)前RDD并未定義分區(qū) ,則使用HashPartitioner。

      public class AggregateByKey { public static void main(String[] args) { System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.1"); SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD javaPairRDD = sc.parallelizePairs(Lists.>newArrayList( new Tuple2("cat",3), new Tuple2("dog",33), new Tuple2("cat",16), new Tuple2("tiger",66)), 2); // 打印樣例數(shù)據(jù) javaPairRDD.foreach(new VoidFunction>() { public void call(Tuple2 stringIntegerTuple2) throws Exception { System.out.println("樣例數(shù)據(jù)>>>>>>>" + stringIntegerTuple2); } }); JavaPairRDD javaPairRDD1 = javaPairRDD.aggregateByKey(14, new Function2() { public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("seqOp>>>>> 參數(shù)One:"+v1+"--參數(shù)Two:"+v2); return Math.max(v1,v2); } }, new Function2() { public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("combOp>>>>> 參數(shù)One:"+v1+"--參數(shù)Two:"+v2); return v1+v2; } }); // 打印結(jié)果數(shù)據(jù) javaPairRDD1.foreach(new VoidFunction>() { public void call(Tuple2 stringIntegerTuple2) throws Exception { System.out.println("結(jié)果數(shù)據(jù)>>>>>>>" + stringIntegerTuple2); } }); } }

      // 打印樣例數(shù)據(jù) 這里的分區(qū)是兩個(gè) 其中分區(qū)內(nèi)都有一個(gè)相同key值 19/03/03 22:16:07 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 樣例數(shù)據(jù)>>>>>>>(cat,3) 樣例數(shù)據(jù)>>>>>>>(dog,33) 19/03/03 22:16:07 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 樣例數(shù)據(jù)>>>>>>>(cat,16) 樣例數(shù)據(jù)>>>>>>>(tiger,66) 19/03/03 22:16:07 INFO Executor: Running task 0.0 in stage 1.0 (TID 2) // 第一個(gè)分區(qū)比較大小 14 3 => 14(cat) , 14 33 => 33(dog) seqOp>>>>> 參數(shù)One:14--參數(shù)Two:3 seqOp>>>>> 參數(shù)One:14--參數(shù)Two:33 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms 19/03/03 22:16:07 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 76 ms on localhost (executor driver) (1/2) // 第二個(gè)分區(qū)比較 14 16 => 16(cat) ,14 66 => 66(tiger) seqOp>>>>> 參數(shù)One:14--參數(shù)Two:16 seqOp>>>>> 參數(shù)One:14--參數(shù)Two:66 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms // 這個(gè)就是combOp階段 在不同分區(qū)內(nèi) 相同key的值做聚合操作 也就是(cat)14 + (cat)16 = 30 combOp>>>>> 參數(shù)One:14--參數(shù)Two:16 // 最后結(jié)果 結(jié)果數(shù)據(jù)>>>>>>>(dog,33) 結(jié)果數(shù)據(jù)>>>>>>>(cat,30) 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 結(jié)果數(shù)據(jù)>>>>>>>(tiger,66)

      一定要記住: combOp 是聚合的不同分區(qū)相同key的值

      從上述過程中,我們就能明白流程是什么了。

      seqOp

      開始我們的數(shù)據(jù)是:

      分片1:(cat,3) (dog,33)

      分片2:(cat,16) (tiger,66)

      // 這里只有兩個(gè)分片 所以寫兩個(gè)過程 第一個(gè)分片開始seqOp過程: 14(zeroValue) 和 3(cat) 比較 = 14(結(jié)果1), 14(zeroValue) 和 33(dog) 比較 = 14(結(jié)果2) 第二個(gè)分片開始元素聚合過程: 14(zeroValue) 和 16(cat) 比較 = 14(結(jié)果3), 14(zeroValue) 和 66(tiger) 比較 = 14(結(jié)果4)

      combOp(不同分區(qū)相同key值)

      開始分片combOp過程:cat在不同分區(qū)有相同key值 結(jié)果1 + 結(jié)果3 = 30(結(jié)果5) 最終得到的結(jié)果2 ,結(jié)果4,結(jié)果5 結(jié)果數(shù)據(jù)>>>>>>>(dog,33) 結(jié)果數(shù)據(jù)>>>>>>>(cat,30) 結(jié)果數(shù)據(jù)>>>>>>>(tiger,66)

      如果有什么不明白的評(píng)論留言即可。

      EI企業(yè)智能 Java spark 可信智能計(jì)算服務(wù) TICS 智能數(shù)據(jù)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:庫(kù)存管理系統(tǒng)的來(lái)龍去脈
      下一篇:“比較和合并工作簿”灰色選項(xiàng)不可用?90%的人不知道這個(gè)魔法技能的用法!
      相關(guān)文章
      亚洲中文字幕无码一去台湾| 久久亚洲中文字幕精品一区| 亚洲18在线天美| 国产av无码专区亚洲av桃花庵| 亚洲五月午夜免费在线视频| 国产精品亚洲天堂| 亚洲a∨无码精品色午夜| 亚洲大码熟女在线观看| 亚洲欧美aⅴ在线资源| 亚洲AV无码久久久久网站蜜桃| 亚洲伊人久久大香线焦| 亚洲乱码在线视频| 亚洲国产精品综合久久久| 亚洲国产日产无码精品| 亚洲Av无码一区二区二三区| 四虎亚洲精品高清在线观看| 亚洲综合av一区二区三区不卡| 亚洲欧好州第一的日产suv| 亚洲AV香蕉一区区二区三区| 亚洲av第一网站久章草| 亚洲AV成人精品日韩一区18p| 亚洲国产V高清在线观看| 久久精品国产精品亚洲人人 | 亚洲国产成人乱码精品女人久久久不卡| 亚洲AV噜噜一区二区三区| 亚洲成片观看四虎永久| 亚洲一区二区三区自拍公司| 亚洲AV无码乱码国产麻豆穿越| 亚洲午夜视频在线观看| 亚洲日本在线播放| 中文字幕亚洲精品无码| mm1313亚洲国产精品无码试看| 亚洲日本在线观看视频| 亚洲精品乱码久久久久久 | 亚洲AV成人影视在线观看| 亚洲精品无码专区| 亚洲国产成人精品91久久久| 亚洲中文字幕无码久久精品1| 久久精品国产亚洲| 亚洲国产精品乱码在线观看97| 亚洲综合激情五月色一区|