Spark RDD常用算子整理
一、轉換算子
1、Value類型
1.1、map
函數簽名:
def map[U: ClassTag](f: T => U): RDD[U]
函數說明:
將處理的數據逐條進行映射轉換,這里的轉換可以是類型的轉換,也可以是值的轉換。
使用樣例:
object Spark_Rdd_Operator_Transform01 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 使用map轉換算子 val mapRdd = rdd.map(_ * 2) //采集數據并打印 mapRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
1.2、mapPartitions
函數簽名:
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
函數說明:
將待處理的數據以分區為單位發送到計算節點進行處理,這里的處理是指可以進行任意的處理,哪怕是過濾數據。
使用樣例:
object Spark_Rdd_Operator_Transform02 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // 使用mapPartitions轉換算子 val mapRdd = rdd.mapPartitions(iter => { println(">>>>>>>>>>>>>") iter.map(_ * 2) }) //采集數據并打印 mapRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
map 和 mapPartitions 的區別
數據處理角度:Map算子是分區內一個數據一個數據的執行,類似于串行操作。而mapPartitions算子是以分區為單位進行批處理操作。 功能的角度:Map算子主要目的將數據源中的數據進行轉換和改變。但是不會減少或增多數據。MapPartitions算子需要傳遞一個迭代器,返回一個迭代器,沒有要求的元素的個數保持不變,所以可以增加或減少數據 性能的角度:Map算子因為類似于串行操作,所以性能比較低,而是mapPartitions算子類似于批處理,所以性能較高。但是mapPartitions算子會長時間占用內存,那么這樣會導致內存可能不夠用,出現內存溢出的錯誤。所以在內存有限的情況下,不推薦使用。使用map操作。
1.3、mapPartitionsWithIndex
函數簽名:
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
函數說明:
將待處理的數據以分區為單位發送到計算節點進行處理,這里的處理是指可以進行任意的處理,哪怕是過濾數據,在處理時同時可以獲取當前分區索引。
使用樣例:
object Spark_Rdd_Operator_Transform03 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // 使用mapPartitionsWithIndex轉換算子獲取指定分區的數據 val mapRdd = rdd.mapPartitionsWithIndex((index, iter) => { if (index == 1) { iter } else { Nil.iterator } }) //采集數據并打印 mapRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
1.4、flatMap
函數簽名:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
函數說明:
將處理的數據進行扁平化后再進行映射處理,所以算子也稱之為扁平映射
使用樣例:
object Spark_Rdd_Operator_Transform04 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5))) // 轉換算子,使用模式匹配 val flatRdd = rdd.flatMap { case list: List[_] => list case dat => List(dat) } //采集數據并打印 flatRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
1.5、glom
函數簽名:
def glom(): RDD[Array[T]]
函數說明:
將同一個分區的數據直接轉換為相同類型的內存數組進行處理,分區不變
使用樣例:
object Spark_Rdd_Operator_Transform05 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) val glomRdd = rdd.glom() // 分區內取最大值 val maxRdd = glomRdd.map(array => array.max) // 分區間最大值求和 println(maxRdd.collect().sum) // 關閉環境 sc.stop() } }
1.6、groupBy
函數簽名:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
函數說明:
將數據根據指定的規則進行分組, 分區默認不變,但是數據會被打亂重新組合,我們將這樣的操作稱之為shuffle。極限情況下,數據可能被分在同一個分區中。 一個組的數據在一個分區中,但是并不是說一個分區中只有一個組。
使用樣例:
object Spark_Rdd_Operator_Transform06 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 1) // 數據分組 val groupRdd = rdd.groupBy(_.charAt(0)) // 打印數據 groupRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
1.7、filter
函數簽名:
def filter(f: T => Boolean): RDD[T]
函數說明:
將數據根據指定的規則進行篩選過濾,符合規則的數據保留,不符合規則的數據丟棄。當數據進行篩選過濾后,分區不變,但是分區內的數據可能不均衡,生產環境下,可能會出現數據傾斜。
使用樣例:
object Spark_Rdd_Operator_Transform07 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4), 1) // 數據過濾(取偶數) val filterRdd = rdd.filter(_ % 2 == 0) // 打印數據 filterRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
1.8、sample
函數簽名:
def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
函數說明:
根據指定的規則從數據集中抽取數據
使用樣例:
object Spark_Rdd_Operator_Transform08 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4), 1) // 抽取數據不放回(伯努利算法) // 伯努利算法:又叫 0、 1 分布。例如扔硬幣,要么正面,要么反面。 // 具體實現:根據種子和隨機算法算出一個數和第二個參數設置幾率比較,小于第二個參數要,大于不要 // 第一個參數:抽取的數據是否放回, false:不放回 // 第二個參數:抽取的幾率,范圍在[0,1]之間,0:全不取; 1:全取; // 第三個參數:隨機數種子 val dataRdd1 = rdd.sample(false, 0.5) // 打印數據 dataRdd1.collect().foreach(println) println("******************") // 抽取數據放回(泊松算法) // 第一個參數:抽取的數據是否放回, true:放回; false:不放回 // 第二個參數:重復數據的幾率,范圍大于等于 0.表示每一個元素被期望抽取到的次數 // 第三個參數:隨機數種子 val dataRdd2 = rdd.sample(true, 2) // 打印數據 dataRdd2.collect().foreach(println) // 關閉環境 sc.stop() } }
1.9、distinct
函數簽名:
def distinct(): RDD[T] def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
函數說明:
將數據集中重復的數據去重
使用樣例:
object Spark_Rdd_Operator_Transform09 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2), 1) val dataRdd1 = rdd.distinct() // 打印數據 dataRdd1.collect().foreach(println) println("******************") val dataRdd2 = rdd.distinct(2) // 打印數據 dataRdd2.collect().foreach(println) // 關閉環境 sc.stop() } }
1.10、coalesce
函數簽名:
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T]
函數說明:
根據數據量縮減分區,用于大數據集過濾后,提高小數據集的執行效率當spark程序中,存在過多的小任務的時候,可以通過coalesce方法,收縮合并分區,減少分區的個數,減小任務調度成本。
使用樣例:
object Spark_Rdd_Operator_Transform10 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2), 6) val dataRdd = rdd.coalesce(2) dataRdd.saveAsTextFile("output") // 關閉環境 sc.stop() } }
1.11、repartition
函數簽名:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
函數說明:
該操作內部其實執行的是coalesce操作,參數shuffle的默認值為true。無論是將分區數多的RDD轉換為分區數少的RDD,還是將分區數少的RDD轉換為分區數多的RDD,repartition操作都可以完成,因為無論如何都會經shuffle過程。
使用樣例:
object Spark_Rdd_Operator_Transform11 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2), 2) val dataRdd = rdd.repartition(4) dataRdd.saveAsTextFile("output") // 關閉環境 sc.stop() } }
1.12、sortBy
函數簽名:
def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
函數說明:
該操作用于排序數據。在排序之前,可以將數據通過f函數進行處理,之后按照f函數處理的結果進行排序,默認為升序排列。排序后新產生的RDD的分區數與原RDD的分區數一 致。中間存在shuffle的過程。
使用樣例:
object Spark_Rdd_Operator_Transform12 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2), 2) val dataRdd = rdd.sortBy(num => num, false, 4) dataRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
2、雙 Value 類型
2.1、intersection
函數簽名:
def intersection(other: RDD[T]): RDD[T]
函數說明:
對源RDD和參數RDD求交集后返回一個新的RDD
使用樣例:
object Spark_Rdd_Operator_Transform13 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5, 6)) // 數據取交集 val rdd = rdd1.intersection(rdd2) // 打印數據 rdd.collect().foreach(println) // 關閉環境 sc.stop() } }
2.2、union
函數簽名:
def union(other: RDD[T]): RDD[T]
函數說明:
對源 RDD和參數RDD求并集后返回一個新的RDD
使用樣例:
object Spark_Rdd_Operator_Transform14 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5, 6)) // 數據取交集 val rdd = rdd1.union(rdd2) // 打印數據 rdd.collect().foreach(println) // 關閉環境 sc.stop() } }
2.3、subtract
函數簽名:
def subtract(other: RDD[T]): RDD[T]
函數說明:
以一個RDD元素為主,去除兩個RDD中重復元素,將其他元素保留下來。求差集
使用樣例:
object Spark_Rdd_Operator_Transform15 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5, 6)) // 數據取交集 val rdd = rdd1.subtract(rdd2) // 打印數據 rdd.collect().foreach(println) // 關閉環境 sc.stop() } }
2.4、zip
函數簽名:
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
函數說明:
將兩個RDD中的元素,以鍵值對的形式進行合并。其中,鍵值對中的Key為第1個RDD中的元素,Value為第2個RDD中的相同位置的元素。
使用樣例:
object Spark_Rdd_Operator_Transform16 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5, 6)) // 數據取交集 val rdd = rdd1.zip(rdd2) // 打印數據 rdd.collect().foreach(println) // 關閉環境 sc.stop() } }
3、Key - Value 類型
3.1、partitionBy
函數簽名:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
函數說明:
將數據按照指定Partitioner重新進行分區。Spark默認的分區器是HashPartitioner。
使用樣例:
object Spark_Rdd_Operator_Transform17 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3) // 數據分組 val rdd2 = rdd.partitionBy(new HashPartitioner(2)) // 打印數據 rdd2.saveAsTextFile("output") // 關閉環境 sc.stop() } }
3.2、reduceByKey
函數簽名:
def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
函數說明:
可以將數據按照相同的Key對Value進行聚合。
使用樣例:
object Spark_Rdd_Operator_Transform18 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) // 數據聚合 val dataRdd1 = rdd.reduceByKey(_ + _) dataRdd1.saveAsTextFile("output1") val dataRdd2 = rdd.reduceByKey(_ + _, 2) dataRdd2.saveAsTextFile("output2") // 關閉環境 sc.stop() } }
3.3、groupByKey
函數簽名:
def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
函數說明:
將數據源的數據根據key對value進行分組。
使用樣例:
object Spark_Rdd_Operator_Transform19 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) // 數據根據key分組 val dataRdd1 = rdd.groupByKey() val dataRdd2 = rdd.groupByKey(2) val dataRdd3 = rdd.groupByKey(new HashPartitioner(2)) dataRdd1.saveAsTextFile("output1") dataRdd2.saveAsTextFile("output2") dataRdd3.saveAsTextFile("output3") // 關閉環境 sc.stop() } }
reduceByKey 和 groupByKey 的區別:
從shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前對分區內相同key的數據進行預聚合(combine)功能,這樣會減少落盤的數據量,而groupByKey只是進行分組,不存在數據量減少的問題,reduceByKey性能比較高。 從功能的角度:reduceByKey其實包含分組和聚合的功能。GroupByKey只能分組,不能聚合,所以在分組聚合的場合下,推薦使用reduceByKey,如果僅僅是分組而不需要聚合。那么還是只能使用groupByKey
3.4、aggregateByKey
函數簽名:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
函數說明:
將數據根據不同的規則進行分區內計算和分區間計算
使用樣例1:
object Spark_Rdd_Operator_Transform20 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("c", 3))) // 數據根據key分組 val dataRdd = rdd.aggregateByKey(0)(_ + _, _ + _) dataRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
使用樣例2:
object Spark_Rdd_Operator_Transform20 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 取出每個分區內相同 key 的最大值然后分區間相加 // aggregateByKey 算子是函數柯里化,存在兩個參數列表 // 1. 第一個參數列表中的參數表示初始值 // 2. 第二個參數列表中含有兩個參數 // 2.1 第一個參數表示分區內的計算規則 // 2.2 第二個參數表示分區間的計算規則 val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("c", 3), ("b", 4), ("c", 5), ("c", 6)), 2) // 0:("a",1),("a",2),("c",3) => (a,10)(c,10) // => (a,10)(b,10)(c,20) // 1:("b",4),("c",5),("c",6) => (b,10)(c,10) val dataRdd = rdd.aggregateByKey(10)( (x, y) => math.max(x, y), (x, y) => x + y ) dataRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
3.5、foldByKey
函數簽名:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
函數說明:
當分區內計算規則和分區間計算規則相同時,aggregateByKey就可以簡化為foldByKey
使用樣例:
object Spark_Rdd_Operator_Transform21 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("c", 3))) // 計算結果 val dataRdd = rdd.foldByKey(0)(_ + _) dataRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
3.6、combineByKey
函數簽名:
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
函數說明:
最通用的對key-value型rdd進行聚集操作的聚集函數(aggregation function)。類似于aggregate(),combineByKey()允許用戶返回值的類型與輸入不一致。
使用樣例:
object Spark_Rdd_Operator_Transform22 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 1) // 求每個 key 的平均值 val combineRdd = rdd.combineByKey( (_, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) // 打印數據 combineRdd.collect().foreach(println) // 關閉環境 sc.stop() } }
reduceByKey、 foldByKey、 aggregateByKey、 combineByKey 的區別
reduceByKey: 相同key的第一個數據不進行任何計算,分區內和分區間計算規則相同 foldByKey: 相同key的第一個數據和初始值進行分區內計算,分區內和分區間計算規則相同 aggregateByKey:相同key的第一個數據和初始值進行分區內計算,分區內和分區間計算規則可以不相同 combineByKey:當計算時,發現數據結構不滿足要求時,可以讓第一個數據轉換結構。分區內和分區間計算規則不相同。
3.7、sortByKey
函數簽名:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
函數說明:
在一個(K,V)的 RDD 上調用, K 必須實現 Ordered 接口(特質),返回一個按照 key 進行排序的
使用樣例:
object Spark_Rdd_Operator_Transform23 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) // 按照key排序 val sortRdd1 = rdd.sortByKey(true) val sortRdd2 = rdd.sortByKey(false) // 打印數據 sortRdd1.collect().foreach(println) println("*******") sortRdd2.collect().foreach(println) // 關閉環境 sc.stop() } }
3.8、join
函數簽名:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
函數說明:
在類型為(K,V)和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素連接在一起的(K,(V,W))的 RDD
使用樣例:
object Spark_Rdd_Operator_Transform24 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"))) val rdd2 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd1.join(rdd2).collect().foreach(println) // 關閉環境 sc.stop() } }
3.9、leftOuterJoin
函數簽名:
def leftOuterJoin[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, Option[W]))]
函數說明:
類似于 SQL 語句的左外連接
使用樣例:
object Spark_Rdd_Operator_Transform25 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"), (4, "d"))) val rdd2 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd1.leftOuterJoin(rdd2).collect().foreach(println) // 關閉環境 sc.stop() } }
3.10、rightOuterJoin
函數簽名:
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
函數說明:
類似于 SQL 語句的右外連接
使用樣例:
object Spark_Rdd_Operator_Transform26 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"), (4, "d"))) val rdd2 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd1.rightOuterJoin(rdd2).collect().foreach(println) // 關閉環境 sc.stop() } }
3.11、fullOuterJoin
函數簽名:
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
函數說明:
類似于 SQL 語句的全外連接
使用樣例:
object Spark_Rdd_Operator_Transform27 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"), (4, "d"))) val rdd2 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd1.fullOuterJoin(rdd2).collect().foreach(println) // 關閉環境 sc.stop() } }
3.12、cogroup
函數簽名:
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
函數說明:
在類型為(K,V)和(K,W)的 RDD 上調用,返回一個(K,(Iterable
使用樣例:
object Spark_Rdd_Operator_Transform28 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"), (4, "d"))) val rdd2 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd1.cogroup(rdd2).collect().foreach(println) // 關閉環境 sc.stop() } }
二、行動算子
2.1、reduce
函數簽名:
def reduce(f: (T, T) => T): T
函數說明:
聚集 RDD 中的所有元素,先聚合分區內數據,再聚合分區間數據
使用樣例:
object Spark_Rdd_Operator_Action01 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 聚合函數 val result = rdd.reduce(_ + _) // 數據并打印 println(result) // 關閉環境 sc.stop() } }
2.2、collect
函數簽名:
def collect(): Array[T]
函數說明:
在驅動程序中,以數組 Array 的形式返回數據集的所有元素
使用樣例:
object Spark_Rdd_Operator_Action02 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 采集數據并打印 rdd.collect().foreach(println) // 關閉環境 sc.stop() } }
2.3、count
函數簽名:
def count(): Long
函數說明:
返回 RDD 中元素的個數
使用樣例:
object Spark_Rdd_Operator_Action03 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) val count = rdd.count() // 打印數據 println(count) // 關閉環境 sc.stop() } }
2.4、first
函數簽名:
def first(): T
函數說明:
返回 RDD 中的第一個元素
使用樣例:
object Spark_Rdd_Operator_Action04 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) val first = rdd.first() // 打印數據 println(first) // 關閉環境 sc.stop() } }
2.5、take
函數簽名:
def take(num: Int): Array[T]
函數說明:
返回一個由 RDD 的前 n 個元素組成的數組
使用樣例:
object Spark_Rdd_Operator_Action05 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) val data = rdd.take(2) // 打印數據 data.foreach(println) // 關閉環境 sc.stop() } }
2.6、takeOrdered
函數簽名:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
函數說明:
返回該 RDD 排序后的前 n 個元素組成的數組
使用樣例:
object Spark_Rdd_Operator_Action06 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(4, 3, 1, 2)) val data = rdd.takeOrdered(2) // 打印數據 data.foreach(println) // 關閉環境 sc.stop() } }
2.7、aggregate
函數簽名:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函數說明:
分區的數據通過初始值和分區內的數據進行聚合,然后再和初始值進行分區間的數據聚合
使用樣例:
object Spark_Rdd_Operator_Action07 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4), 4) val data = rdd.aggregate(10)(_ + _, _ + _) // 打印數據 println(data) // 關閉環境 sc.stop() } }
2.8、fold
函數簽名:
def fold(zeroValue: T)(op: (T, T) => T): T
函數說明:
折疊操作, aggregate的簡化版操作
使用樣例:
object Spark_Rdd_Operator_Action08 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4), 4) val data = rdd.fold(10)(_ + _) // 打印數據 println(data) // 關閉環境 sc.stop() } }
2.9、countByKey
函數簽名:
def countByKey(): Map[K, Long]
函數說明:
統計每種 key 的個數
使用樣例:
object Spark_Rdd_Operator_Action09 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c"))) val countRdd = rdd.countByKey() // 打印數據 countRdd.foreach(println) // 關閉環境 sc.stop() } }
2.10、save 相關算子
函數簽名:
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
函數說明:
將數據保存到不同格式的文件中
使用樣例:
object Spark_Rdd_Operator_Action10 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")), 2) // 保存成 Text 文件 rdd.saveAsTextFile("output1") // 序列化成對象保存到文件 rdd.saveAsObjectFile("output2") // 保存成 Sequencefile 文件 rdd.saveAsSequenceFile("output3") // 關閉環境 sc.stop() } }
2.11、foreach
函數簽名:
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
函數說明:
分布式遍歷 RDD 中的每一個元素,調用指定函數
使用樣例:
object Spark_Rdd_Operator_Action11 { def main(args: Array[String]): Unit = { // 創建Spark上下文環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 創建RDD val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 收集后打印 rdd.map(num => num).collect().foreach(println) println("****************") // 分布式打印 rdd.foreach(println) // 關閉環境 sc.stop() } }
spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。