【SparkAPI JAVA版】JavaPairRDD——aggregate(一)

      網友投稿 750 2025-04-03

      /** * Aggregate the elements of each partition, and then the results for all the partitions, using * given combine functions and a neutral "zero value". This function can return a different result * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. * * @param zeroValue the initial value for the accumulated result of each partition for the * `seqOp` operator, and also the initial value for the combine results from * different partitions for the `combOp` operator - this will typically be the * neutral element (e.g. `Nil` for list concatenation or `0` for summation) * @param seqOp an operator used to accumulate results within a partition * @param combOp an associative operator used to combine results from different partitions */


      aggregate先對每個分區的元素做聚集,然后對所有分區的結果做聚集,聚集過程中,使用的是給定的聚集函數以及初始值”zero value”。這個函數能返回一個與原始RDD不同的類型U,因此,需要一個合并RDD類型T到結果類型U的函數,還需要一個合并類型U的函數。這兩個函數都可以修改和返回他們的第一個參數,而不是重新新建一個U類型的參數以避免重新分配內存。

      參數zeroValue:seqOp運算符的每個分區的累積結果的初始值以及combOp運算符的不同分區的組合結果的初始值 - 這通常將是初始元素(例如“Nil”表的列表 連接或“0”表示求和)

      參數seqOp: 每個分區累積結果的聚集函數。

      // Scala def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U // java public static U aggregate(U zeroValue, Function2 seqOp, Function2 combOp)

      public class Aggregate { public static void main(String[] args) { System.setProperty("hadoop.home.dir","F:\hadoop-2.7.1"); SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD javaPairRDD = sc.parallelizePairs(Lists.>newArrayList(new Tuple2("cat",34), new Tuple2("cat",34),new Tuple2("dog",34),new Tuple2("tiger",34)),2); // 打印樣例數據 javaPairRDD.foreach(new VoidFunction>() { public void call(Tuple2 stringIntegerTuple2) throws Exception { System.out.println("樣例數據>>>>>>>" + stringIntegerTuple2); } }); Integer integer = javaPairRDD.aggregate(0, new Function2, Integer>() { public Integer call(Integer v1, Tuple2 v2) throws Exception { System.out.println("seqOp>>>>> 參數One:"+v1+"--參數Two:"+v2); return v1+v2._2(); } }, new Function2() { public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("combOp>>>>> 參數One:"+v1+"--參數Two:"+v2); return v1+v2; } }); System.out.println("result:"+integer); } }

      // 打印樣例數據 因為是2個分片 所以會隔開打印 如果只有一個分片 中間是沒有日志的 樣例數據>>>>>>>(cat,34) 樣例數據>>>>>>>(cat,34) 19/02/27 23:06:21 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 665 bytes result sent to driver 19/02/27 23:06:21 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4907 bytes) 19/02/27 23:06:21 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 樣例數據>>>>>>>(dog,34) 樣例數據>>>>>>>(tiger,34) // 進行seqOp 或者也可以說是分片元素聚合 0這個初始值 會被添加進去 seqOp>>>>> 參數One:0--參數Two:(cat,34) seqOp>>>>> 參數One:34--參數Two:(cat,34) 19/02/27 23:06:21 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 659 bytes result sent to driver 19/02/27 23:06:21 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, PROCESS_LOCAL, 4907 bytes) 19/02/27 23:06:21 INFO Executor: Running task 1.0 in stage 1.0 (TID 3) seqOp>>>>> 參數One:0--參數Two:(dog,34) seqOp>>>>> 參數One:34--參數Two:(tiger,34) // 進行combOp 也可以說是分片結果聚合 combOp>>>>> 參數One:0--參數Two:68 combOp>>>>> 參數One:68--參數Two:68 // 最終結果 result:136

      【SparkAPI JAVA版】JavaPairRDD——aggregate(一)

      從上述過程中,我們就能明白流程是什么了。

      seqOp(分片元素聚合)

      開始我們的數據是:

      分片1:(cat,34) (cat,34)

      分片2:(dog,34) (tiger,34)

      // 這里只有兩個分片 所以寫兩個過程 第一個分片開始元素聚合過程: zeroValue + 第一個元素值 = 結果1, 結果1+第二個元素值 = 結果2, 。。。。 結果n-1 + 結果n = 分片結果1。 第二個分片開始元素聚合過程: zeroValue + 第一個元素值 = 結果1, 結果1+第二個元素值 = 結果2, 。。。。 結果n-1 + 結果n = 分片結果2。

      combOp(分片結果聚合)

      開始分片結果聚合過程: zeroValue + 分片結果1 = 最終結果1, 最終結果1+ 分片結果2 = 最終結果2 最終得到的最終結果2 就是返回的結果 136

      如果有什么不明白的評論留言即可。

      EI企業智能 spark 可信智能計算服務 TICS 智能數據

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:生產計劃管理系統(生產計劃排程四個原則)
      下一篇:業務流程圖制作模板(業務流程圖制作模板怎么寫)
      相關文章
      亚洲AV中文无码乱人伦下载| 亚洲JLZZJLZZ少妇| 一级毛片直播亚洲| 亚洲熟女乱色一区二区三区| 亚洲最大的视频网站| 亚洲福利视频导航| 亚洲国产精品一区二区第一页| 亚洲亚洲人成综合网络| 中文字幕久久亚洲一区| 亚洲综合熟女久久久30p| 亚洲色一色噜一噜噜噜| 国产日产亚洲系列最新| 亚洲综合日韩久久成人AV| 亚洲日韩精品一区二区三区无码 | 亚洲国产精品无码久久98 | 亚洲AV男人的天堂在线观看| 中日韩亚洲人成无码网站| 亚洲熟伦熟女专区hd高清| 亚洲精品精华液一区二区| 国产精品无码亚洲一区二区三区| 亚洲AV无码国产剧情| 在线观看亚洲专区| 亚洲日本中文字幕天堂网| 红杏亚洲影院一区二区三区 | 波多野结衣亚洲一级| 亚洲日韩一区二区三区| 久久精品国产亚洲av瑜伽| 亚洲JIZZJIZZ中国少妇中文| 久久久久亚洲AV综合波多野结衣| 亚洲人成色77777| 久久亚洲精品成人av无码网站| 亚洲成人黄色在线| 亚洲中文字幕AV每天更新| 久久亚洲精品无码gv| 亚洲日本一区二区一本一道| 亚洲va无码va在线va天堂| 337p欧洲亚洲大胆艺术| 亚洲国产成a人v在线观看 | 亚洲日本中文字幕区| 亚洲一卡二卡三卡| 亚洲AV色欲色欲WWW|