Spark Core快速入門系列(3) | <Transformation>轉換算子

      網友投稿 979 2022-05-29

      大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/

      此篇為大家帶來的是Spark Core Transformation轉換算子

      目錄

      一. Value類型

      1 map(func)

      2 mapPartitions(func)

      3 mapPartitionsWithIndex(func)

      4 map()和mapPartitions()的區別

      5 flatMap(func)

      6 glom()

      7 groupBy(func)

      8 filter(func)

      9 sample(withReplacement, fraction, seed)

      10 distinct([numTasks]))

      11 coalesce(numPartitions)

      12 repartition(numPartitions)

      13 coalasce和repartition的區別

      14 sortBy(func,[ascending], [numTasks])

      15 pipe(command, [envVars])

      二. 雙 Value 類型交互

      1. union(otherDataset)

      2. subtract (otherDataset)

      3. intersection(otherDataset)

      4. cartesian(otherDataset)

      5. zip(otherDataset)

      三. Key-Value 類型

      1. partitionBy

      2. groupByKey

      3. reduceByKey(func, [numTasks])

      4. reduceByKey和groupByKey的區別

      5. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

      6. foldByKey

      7. combineByKey[C]

      8. sortByKey([ascending], [numTasks])

      9. mapValues

      9. mapValues

      10. join(otherDataset, [numTasks])

      11. cogroup(otherDataset, [numTasks])

      在 RDD 上支持 2 種操作:

      1.transformation

      從一個已知的 RDD 中創建出來一個新的 RDD 例如: map就是一個transformation.

      2.action

      在數據集上計算結束之后, 給驅動程序返回一個值. 例如: reduce就是一個action.

      本篇博文可以學到 RDD 的轉換操作, Action操作以后會詳細講解.

      在 Spark 中幾乎所有的transformation操作都是懶執行的(lazy), 也就是說transformation操作并不會立即計算他們的結果, 而是記住了這個操作.

      只有當通過一個action來獲取結果返回給驅動程序的時候這些轉換操作才開始計算.這種設計可以使 Spark 運行起來更加的高效.默認情況下, 你每次在一個 RDD 上運行一個action的時候, 前面的每個transformed RDD 都會被重新計算.但是我們可以通過persist (or cache)方法來持久化一個 RDD 在內存中, 也可以持久化到磁盤上, 來加快訪問速度. 后面有專門的章節學習這種持久化技術.

      根據 RDD 中數據類型的不同, 整體分為 2 種 RDD:

      1.Value類型

      2.Key-Value類型(其實就是存一個二維的元組)

      一. Value類型

      1 map(func)

      1.作用:

      返回一個新的 RDD, 該 RDD 是由原 RDD 的每個元素經過函數轉換后的值而組成. 就是對 RDD 中的數據做轉換.

      2. 案例:創建一個包含1-10的的 RDD,然后將每個元素*2形成新的 RDD

      scala > val rdd1 = sc.parallelize(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 // 得到一個新的 RDD, 但是這個 RDD 中的元素并不是立即計算出來的 scala> val rdd2 = rdd1.map(_ * 2) rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :26 // 開始計算 rdd2 中的元素, 并把計算后的結果傳遞給驅動程序 scala> rdd2.collect res0: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      2 mapPartitions(func)

      1. 作用

      類似于map(func), 但是是獨立在每個分區上運行.所以:Iterator => Iterator

      假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區。

      2. 案例:創建一個RDD,使每個元素*2組成新的RDD

      // 1. 創建一個RDD scala> val source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :24 // 2. 使每個元素*2組成新的RDD scala> source.mapPartitions(it => it.map(_ * 2)) res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at :27 // 3. 打印新的RDD scala> res7.collect res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      3 mapPartitionsWithIndex(func)

      1. 作用

      和mapPartitions(func)類似. 但是會給func多提供一個Int值來表示分區的索引. 所以func的類型是:(Int, Iterator) => Iterator

      2. 案例:創建一個RDD,使每個元素跟所在分區形成一個元組組成一個新的RDD

      // 1.創建一個RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24 // 2.使每個元素跟所在分區形成一個元組組成一個新的RDD scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at :26 // 3.打印新的RDD scala> indexRdd.collect res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      3. 分區數的確定, 和對數組中的元素如何進行分區

      // 1.確定分區數: override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) // 2.對元素進行分區 // length: RDD 中數據的長度 numSlices: 分區數 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i =>val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } seq match { case r: Range => case nr: NumericRange[_] => case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq }.toSeq }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      4 map()和mapPartitions()的區別

      map():每次處理一條數據。

      mapPartitions():每次處理一個分區的數據,這個分區的數據處理完后,原 RDD 中該分區的數據才能釋放,可能導致 OOM。

      開發指導:當內存空間較大的時候建議使用mapPartitions(),以提高處理效率。

      5 flatMap(func)

      1. 作用

      類似于map,但是每一個輸入元素可以被映射為 0 或多個輸出元素(所以func應該返回一個序列,而不是單一元素 T => TraversableOnce[U])

      2. 案例1:創建一個元素為1-5的RDD,運用flatMap創建一個新的RDD,新的RDD為原RDD的每個元素的2倍(2,4,6,8,10)

      // 1.創建 scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at :24 // 2.打印 scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) // 3.根據原RDD創建新RDD(1->1,2->1,2……5->1,2,3,4,5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at :26 // 4.打印新RDD scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      3. 案例2:創建一個元素為 1-5 的RDD,運用 flatMap創建一個新的 RDD,新的 RDD 為原 RDD 每個元素的 平方和三次方 來組成 1,1,4,8,9,27…

      scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :24 scala> rdd1.flatMap(x => Array(x * x, x * x * x)) res13: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at flatMap at :27 scala> res13.collect res14: Array[Int] = Array(1, 1, 4, 8, 9, 27, 16, 64, 25, 125)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      6 glom()

      1. 作用

      將每一個分區的元素合并成一個數組,形成新的 RDD 類型是RDD[Array[T]]

      2. 案例:創建一個 4 個分區的 RDD,并將每個分區的數據放到一個數組

      // 1.創建 scala> var rdd1 = sc.parallelize(Array(10,20,30,40,50,60), 4) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 // 2.將每個分區的數據放到一個數組并收集到Driver端打印 scala> rdd1.glom.collect res2: Array[Array[Int]] = Array(Array(10), Array(20, 30), Array(40), Array(50, 60))

      1

      2

      3

      4

      5

      6

      7

      8

      7 groupBy(func)

      1. 作用

      按照func的返回值進行分組.

      func返回值作為 key, 對應的值放入一個迭代器中. 返回的 RDD: RDD[(K, Iterable[T])

      每組內元素的順序不能保證, 并且甚至每次調用得到的順序也有可能不同.

      2. 案例1:創建一個RDD,按照元素模以2的值進行分組。

      // 1.創建 scala> val rdd = sc.parallelize(1 to 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at :24 // 2.按照元素模以2的值進行分組 scala> val group = rdd.groupBy(_%2) group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at :26 // 3.打印結果 scala> group.collect res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      3. 案例2:創建一個 RDD,按照元素的奇偶性進行分組

      scala> val rdd1 = sc.makeRDD(Array(1, 3, 4, 20, 4, 5, 8)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24 scala> rdd1.groupBy(x => if(x % 2 == 1) "odd" else "even") res4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[4] at groupBy at :27 scala> res4.collect res5: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(4, 20, 4, 8)), (odd,CompactBuffer(1, 3, 5)))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      8 filter(func)

      1. 作用

      過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成。

      2. 案例:創建一個RDD,按照元素模以2的值進行分組。

      // 1.創建 scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi")) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at :24 // 2.打印 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) // 3.過濾出含” xiao”子串的形成一個新的RDD scala> val filter = sourceFilter.filter(_.contains("xiao")) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at :26 // 4.打印新RDD scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      9 sample(withReplacement, fraction, seed)

      1. 作用

      1.以指定的隨機種子隨機抽樣出比例為fraction的數據,(抽取到的數量是: size * fraction). 需要注意的是得到的結果并不能保證準確的比例.

      2.withReplacement表示是抽出的數據是否放回,true為有放回的抽樣,false為無放回的抽樣. 放回表示數據有可能會被重復抽取到, false 則不可能重復抽取到. 如果是false, 則fraction必須是:[0,1], 是 true 則大于等于0就可以了.

      3.seed用于指定隨機數生成器種子。 一般用默認的, 或者傳入當前的時間戳

      2. 案例:創建一個RDD(1-10),從中選擇放回和不放回抽樣

      // 1.創建RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at :24 // 2.打印 scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 3.放回抽樣 scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at :26 // 4.打印放回抽樣結果 scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9) // 5.不放回抽樣 scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at :26 // 6.打印不放回抽樣結果 scala> sample2.collect() res17: Array[Int] = Array(1, 9)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      10 distinct([numTasks]))

      1. 作用

      對 RDD 中元素執行去重操作. 參數表示任務的數量.默認值和分區數保持一致.

      2. 案例:創建一個RDD,使用distinct()對其去重。

      // 1.創建一個RDD scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at :24 // 2.對RDD進行去重(不指定并行度) scala> val unionRDD = distinctRdd.distinct() unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at :26 // 3.打印去重后生成的新RDD scala> unionRDD.collect() res20: Array[Int] = Array(1, 9, 5, 6, 2) // 4.對RDD(指定并行度為2) scala> val unionRDD = distinctRdd.distinct(2) unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at :26 // 5.打印去重后生成的新RDD scala> unionRDD.collect() res21: Array[Int] = Array(6, 2, 1, 9, 5)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      11 coalesce(numPartitions)

      1. 作用

      縮減分區數,用于大數據集過濾后,提高小數據集的執行效率。

      2. 案例:創建一個4個分區的RDD,對其縮減分區

      // 1.創建一個RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at :24 // 2.查看RDD的分區數 scala> rdd.partitions.size res20: Int = 4 // 3.對RDD重新分區 scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at :26 // 4.查看新RDD的分區數 scala> coalesceRDD.partitions.size res21: Int = 3

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      注意:

      第二個參數表示是否shuffle, 如果不傳或者傳入的為false, 則表示不進行shuffer, 此時分區數減少有效, 增加分區數無效.

      12 repartition(numPartitions)

      1. 作用

      根據新的分區數, 重新 shuffle 所有的數據, 這個操作總會通過網絡.新的分區數相比以前可以多, 也可以少

      2. 案例:創建一個4個分區的RDD,對其重新分區

      // 1.創建一個RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at :24 // 2.查看RDD的分區數 scala> rdd.partitions.size res22: Int = 4 // 3.對RDD重新分區 scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at :26 // 4.查看新RDD的分區數 scala> rerdd.partitions.size res23: Int = 2

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      13 coalasce和repartition的區別

      coalesce重新分區,可以選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。

      repartition實際上是調用的coalesce,進行shuffle。源碼如下:

      def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }

      1

      2

      3

      如果是減少分區, 盡量避免 shuffle

      14 sortBy(func,[ascending], [numTasks])

      1. 作用

      使用func先對數據進行處理,按照處理后結果排序,默認為正序。

      2. 案例1:創建一個RDD,按照不同的規則進行排序

      // 1.創建一個RDD scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at :24 // 2.按照自身大小排序 scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4) // 3.按照與3余數的大小排序 scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      3. 案例2:

      scala> val rdd1 = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at :24 scala> rdd1.sortBy(x => x).collect res17: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30) scala> rdd1.sortBy(x => x, true).collect res18: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30) // 不用正序 scala> rdd1.sortBy(x => x, false).collect res19: Array[Int] = Array(30, 20, 16, 10, 9, 6, 4, 4, 3, 1)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15 pipe(command, [envVars])

      1. 作用

      管道,針對每個分區,把 RDD 中的每個數據通過管道傳遞給shell命令或腳本,返回輸出的RDD。一個分區執行一次這個命令. 如果只有一個分區, 則執行一次命令.

      注意:

      腳本要放在 worker 節點可以訪問到的位置

      2. 案例:編寫一個腳本,使用管道將腳本作用于RDD上。

      // 1: 創建一個腳本文件pipe.sh 文件內容如下: echo "hello" while read line;do echo ">>>"$line done // 2: 創建只有 1 個分區的RDD scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 scala> rdd1.pipe("./pipe.sh").collect res1: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40) // 3: 創建有 2 個分區的 RDD scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :24 scala> rdd1.pipe("./pipe.sh").collect res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      總結: 每個分區執行一次腳本, 但是每個元素算是標準輸入中的一行

      二. 雙 Value 類型交互

      這里的“雙 Value 類型交互”是指的兩個 RDD[V] 進行交互.

      1. union(otherDataset)

      1. 作用

      求并集. 對源 RDD 和參數 RDD 求并集后返回一個新的 RDD

      注意:

      union和++是等價的

      2. 案例:編寫一個腳本,使用管道將腳本作用于RDD上。

      // 1.創建第一個RDD scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at :24 // 2.創建第二個RDD scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at :24 // 3.計算兩個RDD的并集 scala> val rdd3 = rdd1.union(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at :28 // 4.打印并集結果 scala> rdd3.collect() res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      2. subtract (otherDataset)

      1. 作用

      計算差集. 從原 RDD 中減去 原 RDD 和 otherDataset 中的共同的部分.

      2. 案例:創建兩個RDD,求第一個RDD與第二個RDD的差集

      // 1.創建第一個RDD scala> val rdd = sc.parallelize(3 to 8) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at :24 // 2.創建第二個RDD scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at :24 // 3.計算第一個RDD與第二個RDD的差集并打印 scala> rdd.subtract(rdd1).collect() res27: Array[Int] = Array(8, 6, 7)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      3. intersection(otherDataset)

      1. 作用

      對源RDD和參數RDD求交集后返回一個新的RDD

      2. 案例:創建兩個RDD,求兩個RDD的交集

      // 1.創建第一個RDD scala> val rdd1 = sc.parallelize(1 to 7) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at :24 // 2.創建第二個RDD scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at :24 // 3.計算兩個RDD的交集 scala> val rdd3 = rdd1.intersection(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at :28 // 4.打印計算結果 scala> rdd3.collect() res19: Array[Int] = Array(5, 6, 7)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      4. cartesian(otherDataset)

      1. 作用

      計算 2 個 RDD 的笛卡爾積. 盡量避免使用

      2. 案例:創建兩個RDD,計算兩個RDD的笛卡爾積

      // 1.創建第一個RDD scala> val rdd1 = sc.parallelize(1 to 3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at :24 // 2.創建第二個RDD scala> val rdd2 = sc.parallelize(2 to 5) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at :24 // 3.計算兩個RDD的笛卡爾積并打印 scala> rdd1.cartesian(rdd2).collect() res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      5. zip(otherDataset)

      1. 作用

      拉鏈操作. 需要注意的是, 在 Spark 中, 兩個 RDD 的元素的數量和分區數都必須相同, 否則會拋出異常.(在 scala 中, 兩個集合的長度可以不同)

      類似算子: zipWithIndex, zipPartitions

      2. 案例:創建兩個RDD,并將兩個RDD組合到一起形成一個(k,v)RDD

      // 1.創建第一個RDD scala> val rdd1 = sc.parallelize(Array(1,2,3),3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24 // 2.創建第二個RDD(與1分區數相同) scala> val rdd2 = sc.parallelize(Array("a","b","c"),3) rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at :24 // 3.第一個RDD組合第二個RDD并打印 scala> rdd1.zip(rdd2).collect res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c)) // 4.第二個RDD組合第一個RDD并打印 scala> rdd2.zip(rdd1).collect res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3)) // 5.創建第三個RDD(與1,2分區數不同) scala> val rdd3 = sc.parallelize(Array("a","b","c"),2) rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24 // 6.第一個RDD組合第三個RDD并打印 scala> rdd1.zip(rdd3).collect java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2) at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.collect(RDD.scala:935) ... 48 elided

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      三. Key-Value 類型

      大多數的 Spark 操作可以用在任意類型的 RDD 上, 但是有一些比較特殊的操作只能用在key-value類型的 RDD 上.這些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分組(group), 聚集(aggregate)等.

      在 Spark 中, 這些操作在包含對偶類型(Tuple2)的 RDD 上自動可用(通過隱式轉換).

      object RDD { implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = { new PairRDDFunctions(rdd) }

      1

      2

      3

      4

      5

      鍵值對的操作是定義在PairRDDFunctions類上, 這個類是對RDD[(K, V)]的裝飾.

      1. partitionBy

      1. 作用

      對pairRDD進行分區操作,如果原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 否則會生成ShuffleRDD,即會產生shuffle過程。

      2. partitionBy 算子源碼

      def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope { if (self.partitioner == Some(partitioner)) { self } else { new ShuffledRDD[K, V, V](self, partitioner) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      3. 案例1:創建一個4個分區的RDD,對其重新分區

      // 1.創建一個RDD scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at :24 // 2.查看RDD的分區數 scala> rdd.partitions.size res24: Int = 4 // 3.對RDD重新分區 scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at :26 // 4.查看新RDD的分區數 scala> rdd2.partitions.size res25: Int = 2

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      4. 案例2:

      scala> val rdd1 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "d"))) rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at :24 scala> rdd1.partitions.length res1: Int = 2 scala> rdd1.partitionBy(new org.apache.spark.HashPartitioner(3)).partitions.length res3: Int = 3

      1

      2

      3

      4

      5

      6

      7

      8

      9

      2. groupByKey

      1. 作用

      groupByKey也是對每個key進行操作,但只生成一個sequence。

      2. 案例:創建一個pairRDD,將相同key對應值聚合到一個sequence中,并計算相同key對應值的相加結果。

      // 1.創建一個pairRDD scala> val words = Array("one", "two", "two", "three", "three", "three") words: Array[String] = Array(one, two, two, three, three, three) scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at :26 // 2.將相同key對應值聚合到一個sequence中 scala> val group = wordPairsRDD.groupByKey() group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at :28 // 3.打印結果 scala> group.collect() res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1))) // 4.計算相同key對應值的相加結果 scala> group.map(t => (t._1, t._2.sum)) res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at :31 // 5.打印結果 scala> res2.collect() res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      注意:

      基于當前的實現, groupByKey必須在內存中持有所有的鍵值對. 如果一個key有太多的value, 則會導致內存溢出(OutOfMemoryError)

      所以這操作非常耗資源, 如果分組的目的是為了在每個key上執行聚合操作(比如: sum 和 average), 則應該使用PairRDDFunctions.aggregateByKey 或者PairRDDFunctions.reduceByKey, 因為他們有更好的性能(會先在分區進行預聚合)

      3. reduceByKey(func, [numTasks])

      1. 作用

      在一個(K,V)的 RDD 上調用,返回一個(K,V)的 RDD,使用指定的reduce函數,將相同key的value聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置。

      2. 案例:創建一個pairRDD,計算相同key對應值的相加結果

      // 1.創建一個pairRDD scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at :24 // 2.計算相同key對應值的相加結果 scala> val reduce = rdd.reduceByKey((x,y) => x+y) reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at :26 // 3.打印結果 scala> reduce.collect() res29: Array[(String, Int)] = Array((female,6), (male,7))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      4. reduceByKey和groupByKey的區別

      reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,返回結果是RDD[k,v]。

      groupByKey:按照key進行分組,直接進行shuffle。

      開發指導:reduceByKey比groupByKey性能更好,建議使用。但是需要注意是否會影響業務邏輯。

      5. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

      1. 函數聲明:

      def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) }

      1

      2

      3

      4

      5

      使用給定的 combine 函數和一個初始化的zero value, 對每個key的value進行聚合.

      這個函數返回的類型U不同于源 RDD 中的V類型. U的類型是由初始化的zero value來定的. 所以, 我們需要兩個操作: - 一個操作(seqOp)去把 1 個v變成 1 個U - 另外一個操作(combOp)來合并 2 個U

      第一個操作用于在一個分區進行合并, 第二個操作用在兩個分區間進行合并.

      為了避免內存分配, 這兩個操作函數都允許返回第一個參數, 而不用創建一個新的U

      2. 參數描述:

      zeroValue:給每一個分區中的每一個key一個初始值;

      seqOp:函數用于在每一個分區中用初始值逐步迭代value;

      combOp:函數用于合并每個分區中的結果。

      3. 案例:創建一個pairRDD,取出每個分區相同key對應值的最大值,然后相加

      4. 案例分析:

      5. 案例具體過程:計算每個 key 的平均值

      // 1.創建一個pairRDD scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at :24 // 2.取出每個分區相同key對應值的最大值,分區間然后相加 scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_) agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at :26 // 3.打印結果 scala> agg.collect() res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      6. foldByKey

      參數:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

      1. 作用

      aggregateByKey的簡化操作,seqop和combop相同

      2. 案例:創建一個pairRDD,計算相同key對應值的相加結果

      // 1.創建一個pairRDD scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at :24 // 2.計算相同key對應值的相加結果 scala> val agg = rdd.foldByKey(0)(_+_) agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at :26 // 3.打印結果 scala> agg.collect() res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      7. combineByKey[C]

      參數:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

      1. 作用

      對相同K,把V合并成一個集合。

      2. 參數描述:

      (1)createCombiner: combineByKey()會遍歷分區中的所有元素,因此每個元素的鍵要么還沒有遇到過,要么就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫作createCombiner()的函數來創建那個鍵對應的累加器的初始值

      (2)mergeValue:如果這是一個在處理當前分區之前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合并

      (3)mergeCombiners: 由于每個分區都是獨立處理的,因此對于同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應同一個鍵的累加器, 就需要使用用戶提供的mergeCombiners() 方法將各個分區的結果進行合并。

      3. 案例:創建一個pairRDD,根據key計算每種key的均值。(先計算每個key出現的次數以及可以對應值的總和,再相除得到結果)

      4. 案例分析:

      5. 案例具體過程:對比幾個按照 key 聚集的函數的區別和聯系

      // 1.創建一個pairRDD scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at :26 // 2.將相同key對應的值相加,同時記錄該key出現的次數,放入一個二元組 scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[5] at combineByKey at :28 // 3.打印合并后的結果 scala> combine.collect res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3))) // 4.計算平均值 scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)} result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at :30 // 5.打印結果 scala> result.collect() res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      8. sortByKey([ascending], [numTasks])

      1. 作用

      在一個(K,V)的 RDD 上調用, K必須實現 Ordered[K] 接口(或者有一個隱式值: Ordering[K]), 返回一個按照key進行排序的(K,V)的 RDD

      2. 案例:創建一個pairRDD,按照key的正序和倒序進行排序

      // 1.創建一個pairRDD scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at :24 // 2.按照key的正序 scala> rdd.sortByKey(true).collect() res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc)) // 3.按照key的倒序 scala> rdd.sortByKey(false).collect() res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      9. mapValues

      1. 作用

      針對于(K,V)形式的類型只對V進行操作

      2. 案例:創建一個pairRDD,并將value添加字符串"|||"

      // 1.創建一個pairRDD scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at :24 // 2.對value添加字符串"|||" scala> rdd3.mapValues(_+"|||").collect() res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      9. mapValues

      1. 作用

      針對于(K,V)形式的類型只對V進行操作

      2. 案例:創建一個pairRDD,并將value添加字符串"|||"

      Spark Core快速入門系列(3) | <Transformation>轉換算子

      // 1.創建一個pairRDD scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at :24 // 2.對value添加字符串"|||" scala> rdd3.mapValues(_+"|||").collect() res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

      1

      2

      3

      4

      5

      6

      7

      8

      10. join(otherDataset, [numTasks])

      1. 作用

      內連接:在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

      2. 案例:創建兩個pairRDD,并將key相同的數據聚合到一個元組。

      // 1.創建第一個pairRDD scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at :24 // 2.創建第二個pairRDD scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at :24 // 3.join操作并打印結果 scala> rdd.join(rdd1).collect() res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      注意:

      1 如果某一個 RDD 有重復的 Key, 則會分別與另外一個 RDD 的相同的 Key進行組合.

      2 也支持外連接:leftOuterJoin, rightOuterJoin, and fullOuterJoin.

      11. cogroup(otherDataset, [numTasks])

      1. 作用

      在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD

      2. 案例:創建兩個pairRDD,并將key相同的數據聚合到一個迭代器。

      // 1.創建第一個pairRDD scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at :24 // 2.創建第二個pairRDD scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at :24 // 3.cogroup兩個RDD并打印結果 scala> rdd.cogroup(rdd1).collect() res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      本次的分享就到這里了,

      好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。

      如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。

      碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!

      spark 數據結構

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:【經驗科普】實戰分析C工程代碼可能遇到的編譯問題及其解決思路
      下一篇:軟件測試基礎知識
      相關文章
      中文字幕精品三区无码亚洲| 91亚洲精品视频| 亚洲国产高清在线精品一区| 亚洲中文字幕无码av永久| 亚洲日本香蕉视频| 亚洲欧洲日本精品| 亚洲黄网在线观看| 亚洲另类视频在线观看| 亚洲成年人电影网站| 亚洲色图视频在线观看| 337p欧洲亚洲大胆艺术| 亚洲一区二区在线视频| 亚洲日韩区在线电影| 久久精品国产亚洲av高清漫画 | 亚洲午夜免费视频| 亚洲视频.com| 亚洲精品视频在线观看免费| 亚洲第一成年人网站| 亚洲欧洲校园自拍都市| 久久精品国产亚洲AV忘忧草18 | 亚洲熟妇无码另类久久久| 亚洲人成在线播放网站| 亚洲成AV人在线播放无码 | 国产精品日本亚洲777| 亚洲精品人成无码中文毛片| 在线观看亚洲精品福利片| 韩国亚洲伊人久久综合影院| 亚洲第一视频在线观看免费| 亚洲一级特黄大片无码毛片| 亚洲人成网77777亚洲色| 久久香蕉国产线看观看亚洲片| 久久久久亚洲Av无码专| 亚洲国产日韩在线人成下载| 午夜在线a亚洲v天堂网2019| 亚洲国产理论片在线播放| 亚洲综合激情五月色一区| 日本系列1页亚洲系列| 国产亚洲精品线观看动态图| 亚洲成A∨人片在线观看不卡| 久久久久亚洲AV无码观看| 亚洲日本乱码卡2卡3卡新区|