Java的面向?qū)ο缶幊?/a>">Java的面向?qū)ο缶幊?/a>
781
2025-04-01
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U. Parameters: zeroValue - (undocumented) seqFunc - (undocumented) combFunc - (undocumented) Returns: (undocumented)
aggregateByKey函數(shù)對(duì)PairRDD中相同Key的值進(jìn)行聚合操作,在聚合過程中同樣使用了一個(gè)中立的初始值。和aggregate函數(shù)類似,aggregateByKey返回值的類型不需要和RDD中value的類型一致。因?yàn)閍ggregateByKey是對(duì)相同Key中的值進(jìn)行聚合操作,所以aggregateByKey函數(shù)最終返回的類型還是Pair RDD,對(duì)應(yīng)的結(jié)果是Key和聚合好的值;而aggregate函數(shù)直接是返回非RDD的結(jié)果,這點(diǎn)需要注意。在實(shí)現(xiàn)過程中,定義了三個(gè)aggregateByKey函數(shù)原型,但最終調(diào)用的aggregateByKey函數(shù)都一致。
// Scala def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] // java public JavaPairRDD
第一個(gè)aggregateByKey函數(shù)我們可以自定義Partitioner。除了這個(gè)參數(shù)之外,其函數(shù)聲明和aggregate很類似;其他的aggregateByKey函數(shù)實(shí)現(xiàn)最終都是調(diào)用這個(gè)。
第二個(gè)aggregateByKey函數(shù)可以設(shè)置分區(qū)的個(gè)數(shù)(numPartitions),最終用的是HashPartitioner。
最后一個(gè)aggregateByKey實(shí)現(xiàn)先會(huì)判斷當(dāng)前RDD是否定義了分區(qū)函數(shù),如果定義了則用當(dāng)前RDD的分區(qū);如果當(dāng)前RDD并未定義分區(qū) ,則使用HashPartitioner。
public class AggregateByKey { public static void main(String[] args) { System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.1"); SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD
// 打印樣例數(shù)據(jù) 這里的分區(qū)是兩個(gè) 其中分區(qū)內(nèi)都有一個(gè)相同key值 19/03/03 22:16:07 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 樣例數(shù)據(jù)>>>>>>>(cat,3) 樣例數(shù)據(jù)>>>>>>>(dog,33) 19/03/03 22:16:07 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 樣例數(shù)據(jù)>>>>>>>(cat,16) 樣例數(shù)據(jù)>>>>>>>(tiger,66) 19/03/03 22:16:07 INFO Executor: Running task 0.0 in stage 1.0 (TID 2) // 第一個(gè)分區(qū)比較大小 14 3 => 14(cat) , 14 33 => 33(dog) seqOp>>>>> 參數(shù)One:14--參數(shù)Two:3 seqOp>>>>> 參數(shù)One:14--參數(shù)Two:33 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms 19/03/03 22:16:07 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 76 ms on localhost (executor driver) (1/2) // 第二個(gè)分區(qū)比較 14 16 => 16(cat) ,14 66 => 66(tiger) seqOp>>>>> 參數(shù)One:14--參數(shù)Two:16 seqOp>>>>> 參數(shù)One:14--參數(shù)Two:66 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms // 這個(gè)就是combOp階段 在不同分區(qū)內(nèi) 相同key的值做聚合操作 也就是(cat)14 + (cat)16 = 30 combOp>>>>> 參數(shù)One:14--參數(shù)Two:16 // 最后結(jié)果 結(jié)果數(shù)據(jù)>>>>>>>(dog,33) 結(jié)果數(shù)據(jù)>>>>>>>(cat,30) 19/03/03 22:16:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 結(jié)果數(shù)據(jù)>>>>>>>(tiger,66)
一定要記住: combOp 是聚合的不同分區(qū)相同key的值
從上述過程中,我們就能明白流程是什么了。
seqOp
開始我們的數(shù)據(jù)是:
分片1:(cat,3) (dog,33)
分片2:(cat,16) (tiger,66)
// 這里只有兩個(gè)分片 所以寫兩個(gè)過程 第一個(gè)分片開始seqOp過程: 14(zeroValue) 和 3(cat) 比較 = 14(結(jié)果1), 14(zeroValue) 和 33(dog) 比較 = 14(結(jié)果2) 第二個(gè)分片開始元素聚合過程: 14(zeroValue) 和 16(cat) 比較 = 14(結(jié)果3), 14(zeroValue) 和 66(tiger) 比較 = 14(結(jié)果4)
combOp(不同分區(qū)相同key值)
開始分片combOp過程:cat在不同分區(qū)有相同key值 結(jié)果1 + 結(jié)果3 = 30(結(jié)果5) 最終得到的結(jié)果2 ,結(jié)果4,結(jié)果5 結(jié)果數(shù)據(jù)>>>>>>>(dog,33) 結(jié)果數(shù)據(jù)>>>>>>>(cat,30) 結(jié)果數(shù)據(jù)>>>>>>>(tiger,66)
如果有什么不明白的評(píng)論留言即可。
EI企業(yè)智能 Java spark 可信智能計(jì)算服務(wù) TICS 智能數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。