Spark Core快速入門系列(3) | <Transformation>轉換算子
大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/
此篇為大家帶來的是Spark Core Transformation轉換算子
目錄
一. Value類型
1 map(func)
2 mapPartitions(func)
3 mapPartitionsWithIndex(func)
4 map()和mapPartitions()的區別
5 flatMap(func)
6 glom()
7 groupBy(func)
8 filter(func)
9 sample(withReplacement, fraction, seed)
10 distinct([numTasks]))
11 coalesce(numPartitions)
12 repartition(numPartitions)
13 coalasce和repartition的區別
14 sortBy(func,[ascending], [numTasks])
15 pipe(command, [envVars])
二. 雙 Value 類型交互
1. union(otherDataset)
2. subtract (otherDataset)
3. intersection(otherDataset)
4. cartesian(otherDataset)
5. zip(otherDataset)
三. Key-Value 類型
1. partitionBy
2. groupByKey
3. reduceByKey(func, [numTasks])
4. reduceByKey和groupByKey的區別
5. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
6. foldByKey
7. combineByKey[C]
8. sortByKey([ascending], [numTasks])
9. mapValues
9. mapValues
10. join(otherDataset, [numTasks])
11. cogroup(otherDataset, [numTasks])
在 RDD 上支持 2 種操作:
1.transformation
從一個已知的 RDD 中創建出來一個新的 RDD 例如: map就是一個transformation.
2.action
在數據集上計算結束之后, 給驅動程序返回一個值. 例如: reduce就是一個action.
本篇博文可以學到 RDD 的轉換操作, Action操作以后會詳細講解.
在 Spark 中幾乎所有的transformation操作都是懶執行的(lazy), 也就是說transformation操作并不會立即計算他們的結果, 而是記住了這個操作.
只有當通過一個action來獲取結果返回給驅動程序的時候這些轉換操作才開始計算.這種設計可以使 Spark 運行起來更加的高效.默認情況下, 你每次在一個 RDD 上運行一個action的時候, 前面的每個transformed RDD 都會被重新計算.但是我們可以通過persist (or cache)方法來持久化一個 RDD 在內存中, 也可以持久化到磁盤上, 來加快訪問速度. 后面有專門的章節學習這種持久化技術.
根據 RDD 中數據類型的不同, 整體分為 2 種 RDD:
1.Value類型
2.Key-Value類型(其實就是存一個二維的元組)
一. Value類型
1 map(func)
1.作用:
返回一個新的 RDD, 該 RDD 是由原 RDD 的每個元素經過函數轉換后的值而組成. 就是對 RDD 中的數據做轉換.
2. 案例:創建一個包含1-10的的 RDD,然后將每個元素*2形成新的 RDD
scala > val rdd1 = sc.parallelize(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
1
2
3
4
5
6
7
8
9
10
2 mapPartitions(func)
1. 作用
類似于map(func), 但是是獨立在每個分區上運行.所以:Iterator
假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區。
2. 案例:創建一個RDD,使每個元素*2組成新的RDD
// 1. 創建一個RDD scala> val source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
3 mapPartitionsWithIndex(func)
1. 作用
和mapPartitions(func)類似. 但是會給func多提供一個Int值來表示分區的索引. 所以func的類型是:(Int, Iterator
2. 案例:創建一個RDD,使每個元素跟所在分區形成一個元組組成一個新的RDD
// 1.創建一個RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
3. 分區數的確定, 和對數組中的元素如何進行分區
// 1.確定分區數: override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) // 2.對元素進行分區 // length: RDD 中數據的長度 numSlices: 分區數 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i =>val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } seq match { case r: Range => case nr: NumericRange[_] => case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq }.toSeq }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
4 map()和mapPartitions()的區別
map():每次處理一條數據。
mapPartitions():每次處理一個分區的數據,這個分區的數據處理完后,原 RDD 中該分區的數據才能釋放,可能導致 OOM。
開發指導:當內存空間較大的時候建議使用mapPartitions(),以提高處理效率。
5 flatMap(func)
1. 作用
類似于map,但是每一個輸入元素可以被映射為 0 或多個輸出元素(所以func應該返回一個序列,而不是單一元素 T => TraversableOnce[U])
2. 案例1:創建一個元素為1-5的RDD,運用flatMap創建一個新的RDD,新的RDD為原RDD的每個元素的2倍(2,4,6,8,10)
// 1.創建 scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
3. 案例2:創建一個元素為 1-5 的RDD,運用 flatMap創建一個新的 RDD,新的 RDD 為原 RDD 每個元素的 平方和三次方 來組成 1,1,4,8,9,27…
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at
1
2
3
4
5
6
7
8
9
10
6 glom()
1. 作用
將每一個分區的元素合并成一個數組,形成新的 RDD 類型是RDD[Array[T]]
2. 案例:創建一個 4 個分區的 RDD,并將每個分區的數據放到一個數組
// 1.創建 scala> var rdd1 = sc.parallelize(Array(10,20,30,40,50,60), 4) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
1
2
3
4
5
6
7
8
7 groupBy(func)
1. 作用
按照func的返回值進行分組.
func返回值作為 key, 對應的值放入一個迭代器中. 返回的 RDD: RDD[(K, Iterable[T])
每組內元素的順序不能保證, 并且甚至每次調用得到的順序也有可能不同.
2. 案例1:創建一個RDD,按照元素模以2的值進行分組。
// 1.創建 scala> val rdd = sc.parallelize(1 to 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at
1
2
3
4
5
6
7
8
9
10
3. 案例2:創建一個 RDD,按照元素的奇偶性進行分組
scala> val rdd1 = sc.makeRDD(Array(1, 3, 4, 20, 4, 5, 8)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at
1
2
3
4
5
6
7
8
9
10
8 filter(func)
1. 作用
過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成。
2. 案例:創建一個RDD,按照元素模以2的值進行分組。
// 1.創建 scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi")) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
9 sample(withReplacement, fraction, seed)
1. 作用
1.以指定的隨機種子隨機抽樣出比例為fraction的數據,(抽取到的數量是: size * fraction). 需要注意的是得到的結果并不能保證準確的比例.
2.withReplacement表示是抽出的數據是否放回,true為有放回的抽樣,false為無放回的抽樣. 放回表示數據有可能會被重復抽取到, false 則不可能重復抽取到. 如果是false, 則fraction必須是:[0,1], 是 true 則大于等于0就可以了.
3.seed用于指定隨機數生成器種子。 一般用默認的, 或者傳入當前的時間戳
2. 案例:創建一個RDD(1-10),從中選擇放回和不放回抽樣
// 1.創建RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
10 distinct([numTasks]))
1. 作用
對 RDD 中元素執行去重操作. 參數表示任務的數量.默認值和分區數保持一致.
2. 案例:創建一個RDD,使用distinct()對其去重。
// 1.創建一個RDD scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
11 coalesce(numPartitions)
1. 作用
縮減分區數,用于大數據集過濾后,提高小數據集的執行效率。
2. 案例:創建一個4個分區的RDD,對其縮減分區
// 1.創建一個RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
注意:
第二個參數表示是否shuffle, 如果不傳或者傳入的為false, 則表示不進行shuffer, 此時分區數減少有效, 增加分區數無效.
12 repartition(numPartitions)
1. 作用
根據新的分區數, 重新 shuffle 所有的數據, 這個操作總會通過網絡.新的分區數相比以前可以多, 也可以少
2. 案例:創建一個4個分區的RDD,對其重新分區
// 1.創建一個RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
13 coalasce和repartition的區別
coalesce重新分區,可以選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。
repartition實際上是調用的coalesce,進行shuffle。源碼如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
1
2
3
如果是減少分區, 盡量避免 shuffle
14 sortBy(func,[ascending], [numTasks])
1. 作用
使用func先對數據進行處理,按照處理后結果排序,默認為正序。
2. 案例1:創建一個RDD,按照不同的規則進行排序
// 1.創建一個RDD scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
3. 案例2:
scala> val rdd1 = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 pipe(command, [envVars])
1. 作用
管道,針對每個分區,把 RDD 中的每個數據通過管道傳遞給shell命令或腳本,返回輸出的RDD。一個分區執行一次這個命令. 如果只有一個分區, 則執行一次命令.
注意:
腳本要放在 worker 節點可以訪問到的位置
2. 案例:編寫一個腳本,使用管道將腳本作用于RDD上。
// 1: 創建一個腳本文件pipe.sh 文件內容如下: echo "hello" while read line;do echo ">>>"$line done // 2: 創建只有 1 個分區的RDD scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
總結: 每個分區執行一次腳本, 但是每個元素算是標準輸入中的一行
二. 雙 Value 類型交互
這里的“雙 Value 類型交互”是指的兩個 RDD[V] 進行交互.
1. union(otherDataset)
1. 作用
求并集. 對源 RDD 和參數 RDD 求并集后返回一個新的 RDD
注意:
union和++是等價的
2. 案例:編寫一個腳本,使用管道將腳本作用于RDD上。
// 1.創建第一個RDD scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2. subtract (otherDataset)
1. 作用
計算差集. 從原 RDD 中減去 原 RDD 和 otherDataset 中的共同的部分.
2. 案例:創建兩個RDD,求第一個RDD與第二個RDD的差集
// 1.創建第一個RDD scala> val rdd = sc.parallelize(3 to 8) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at
1
2
3
4
5
6
7
8
9
10
3. intersection(otherDataset)
1. 作用
對源RDD和參數RDD求交集后返回一個新的RDD
2. 案例:創建兩個RDD,求兩個RDD的交集
// 1.創建第一個RDD scala> val rdd1 = sc.parallelize(1 to 7) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
4. cartesian(otherDataset)
1. 作用
計算 2 個 RDD 的笛卡爾積. 盡量避免使用
2. 案例:創建兩個RDD,計算兩個RDD的笛卡爾積
// 1.創建第一個RDD scala> val rdd1 = sc.parallelize(1 to 3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
5. zip(otherDataset)
1. 作用
拉鏈操作. 需要注意的是, 在 Spark 中, 兩個 RDD 的元素的數量和分區數都必須相同, 否則會拋出異常.(在 scala 中, 兩個集合的長度可以不同)
類似算子: zipWithIndex, zipPartitions
2. 案例:創建兩個RDD,并將兩個RDD組合到一起形成一個(k,v)RDD
// 1.創建第一個RDD scala> val rdd1 = sc.parallelize(Array(1,2,3),3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
三. Key-Value 類型
大多數的 Spark 操作可以用在任意類型的 RDD 上, 但是有一些比較特殊的操作只能用在key-value類型的 RDD 上.這些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分組(group), 聚集(aggregate)等.
在 Spark 中, 這些操作在包含對偶類型(Tuple2)的 RDD 上自動可用(通過隱式轉換).
object RDD { implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = { new PairRDDFunctions(rdd) }
1
2
3
4
5
鍵值對的操作是定義在PairRDDFunctions類上, 這個類是對RDD[(K, V)]的裝飾.
1. partitionBy
1. 作用
對pairRDD進行分區操作,如果原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 否則會生成ShuffleRDD,即會產生shuffle過程。
2. partitionBy 算子源碼
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope { if (self.partitioner == Some(partitioner)) { self } else { new ShuffledRDD[K, V, V](self, partitioner) } }
1
2
3
4
5
6
7
8
9
3. 案例1:創建一個4個分區的RDD,對其重新分區
// 1.創建一個RDD scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
4. 案例2:
scala> val rdd1 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "d"))) rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at
1
2
3
4
5
6
7
8
9
2. groupByKey
1. 作用
groupByKey也是對每個key進行操作,但只生成一個sequence。
2. 案例:創建一個pairRDD,將相同key對應值聚合到一個sequence中,并計算相同key對應值的相加結果。
// 1.創建一個pairRDD scala> val words = Array("one", "two", "two", "three", "three", "three") words: Array[String] = Array(one, two, two, three, three, three) scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
注意:
基于當前的實現, groupByKey必須在內存中持有所有的鍵值對. 如果一個key有太多的value, 則會導致內存溢出(OutOfMemoryError)
所以這操作非常耗資源, 如果分組的目的是為了在每個key上執行聚合操作(比如: sum 和 average), 則應該使用PairRDDFunctions.aggregateByKey 或者PairRDDFunctions.reduceByKey, 因為他們有更好的性能(會先在分區進行預聚合)
3. reduceByKey(func, [numTasks])
1. 作用
在一個(K,V)的 RDD 上調用,返回一個(K,V)的 RDD,使用指定的reduce函數,將相同key的value聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置。
2. 案例:創建一個pairRDD,計算相同key對應值的相加結果
// 1.創建一個pairRDD scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
4. reduceByKey和groupByKey的區別
reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,返回結果是RDD[k,v]。
groupByKey:按照key進行分組,直接進行shuffle。
開發指導:reduceByKey比groupByKey性能更好,建議使用。但是需要注意是否會影響業務邏輯。
5. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
1. 函數聲明:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) }
1
2
3
4
5
使用給定的 combine 函數和一個初始化的zero value, 對每個key的value進行聚合.
這個函數返回的類型U不同于源 RDD 中的V類型. U的類型是由初始化的zero value來定的. 所以, 我們需要兩個操作: - 一個操作(seqOp)去把 1 個v變成 1 個U - 另外一個操作(combOp)來合并 2 個U
第一個操作用于在一個分區進行合并, 第二個操作用在兩個分區間進行合并.
為了避免內存分配, 這兩個操作函數都允許返回第一個參數, 而不用創建一個新的U
2. 參數描述:
zeroValue:給每一個分區中的每一個key一個初始值;
seqOp:函數用于在每一個分區中用初始值逐步迭代value;
combOp:函數用于合并每個分區中的結果。
3. 案例:創建一個pairRDD,取出每個分區相同key對應值的最大值,然后相加
4. 案例分析:
5. 案例具體過程:計算每個 key 的平均值
// 1.創建一個pairRDD scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
6. foldByKey
參數:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
1. 作用
aggregateByKey的簡化操作,seqop和combop相同
2. 案例:創建一個pairRDD,計算相同key對應值的相加結果
// 1.創建一個pairRDD scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
7. combineByKey[C]
參數:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
1. 作用
對相同K,把V合并成一個集合。
2. 參數描述:
(1)createCombiner: combineByKey()會遍歷分區中的所有元素,因此每個元素的鍵要么還沒有遇到過,要么就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫作createCombiner()的函數來創建那個鍵對應的累加器的初始值
(2)mergeValue:如果這是一個在處理當前分區之前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合并
(3)mergeCombiners: 由于每個分區都是獨立處理的,因此對于同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應同一個鍵的累加器, 就需要使用用戶提供的mergeCombiners() 方法將各個分區的結果進行合并。
3. 案例:創建一個pairRDD,根據key計算每種key的均值。(先計算每個key出現的次數以及可以對應值的總和,再相除得到結果)
4. 案例分析:
5. 案例具體過程:對比幾個按照 key 聚集的函數的區別和聯系
// 1.創建一個pairRDD scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
8. sortByKey([ascending], [numTasks])
1. 作用
在一個(K,V)的 RDD 上調用, K必須實現 Ordered[K] 接口(或者有一個隱式值: Ordering[K]), 返回一個按照key進行排序的(K,V)的 RDD
2. 案例:創建一個pairRDD,按照key的正序和倒序進行排序
// 1.創建一個pairRDD scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
9. mapValues
1. 作用
針對于(K,V)形式的類型只對V進行操作
2. 案例:創建一個pairRDD,并將value添加字符串"|||"
// 1.創建一個pairRDD scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at
1
2
3
4
5
6
7
8
9
9. mapValues
1. 作用
針對于(K,V)形式的類型只對V進行操作
2. 案例:創建一個pairRDD,并將value添加字符串"|||"
// 1.創建一個pairRDD scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at
1
2
3
4
5
6
7
8
10. join(otherDataset, [numTasks])
1. 作用
內連接:在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
2. 案例:創建兩個pairRDD,并將key相同的數據聚合到一個元組。
// 1.創建第一個pairRDD scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
注意:
1 如果某一個 RDD 有重復的 Key, 則會分別與另外一個 RDD 的相同的 Key進行組合.
2 也支持外連接:leftOuterJoin, rightOuterJoin, and fullOuterJoin.
11. cogroup(otherDataset, [numTasks])
1. 作用
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable
2. 案例:創建兩個pairRDD,并將key相同的數據聚合到一個迭代器。
// 1.創建第一個pairRDD scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at
1
2
3
4
5
6
7
8
9
10
11
12
本次的分享就到這里了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。
如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。
碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!
spark 數據結構
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。