2021年大數(shù)據(jù)Spark(二十五):SparkSQL的RDD、DF、DS相關(guān)操作

      網(wǎng)友投稿 1251 2025-04-05

      目錄

      RDD、DF、DS相關(guān)操作

      SparkSQL初體驗(yàn)

      SparkSession 應(yīng)用入口

      獲取DataFrame/DataSet

      使用樣例類

      指定類型+列名

      自定義Schema

      RDD、DF、DS相互轉(zhuǎn)換

      RDD、DF、DS相關(guān)操作

      SparkSQL初體驗(yàn)

      Spark 2.0開(kāi)始,SparkSQL應(yīng)用程序入口為SparkSession,加載不同數(shù)據(jù)源的數(shù)據(jù),封裝到DataFrame/Dataset集合數(shù)據(jù)結(jié)構(gòu)中,使得編程更加簡(jiǎn)單,程序運(yùn)行更加快速高效。

      SparkSession 應(yīng)用入口

      SparkSession:這是一個(gè)新入口,取代了原本的SQLContext與HiveContext。對(duì)于DataFrame API的用戶來(lái)說(shuō),Spark常見(jiàn)的混亂源頭來(lái)自于使用哪個(gè)“context”。現(xiàn)在使用SparkSession,它作為單個(gè)入口可以兼容兩者,注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。

      文檔:

      http://spark.apache.org/docs/2.4.5/sql-getting-started.html#starting-point-sparksession

      1)、添加MAVEN依賴

      org.apache.spark

      spark-sql_2.11

      2.4.5

      2)、SparkSession對(duì)象實(shí)例通過(guò)建造者模式構(gòu)建,代碼如下:

      其中①表示導(dǎo)入SparkSession所在的包,②表示建造者模式構(gòu)建對(duì)象和設(shè)置屬性,③表示導(dǎo)入SparkSession類中implicits對(duì)象object中隱式轉(zhuǎn)換函數(shù)。

      3)、范例演示:構(gòu)建SparkSession實(shí)例,加載文本數(shù)據(jù),統(tǒng)計(jì)條目數(shù)。

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

      /**

      * Author itcast

      * Desc 演示SparkSQL

      */

      object SparkSQLDemo00_hello {

      def main(args: Array[String]): Unit = {

      //1.準(zhǔn)備SparkSQL開(kāi)發(fā)環(huán)境

      println(this.getClass.getSimpleName)

      println(this.getClass.getSimpleName.stripSuffix("$"))

      val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").getOrCreate()

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      val df1: DataFrame = spark.read.text("data/input/text")

      val df2: DataFrame = spark.read.json("data/input/json")

      val df3: DataFrame = spark.read.csv("data/input/csv")

      val df4: DataFrame = spark.read.parquet("data/input/parquet")

      df1.printSchema()

      df1.show(false)

      df2.printSchema()

      df2.show(false)

      df3.printSchema()

      df3.show(false)

      df4.printSchema()

      df4.show(false)

      df1.coalesce(1).write.mode(SaveMode.Overwrite).text("data/output/text")

      df2.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")

      df3.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")

      df4.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")

      //關(guān)閉資源

      sc.stop()

      spark.stop()

      }

      }

      使用SparkSession加載數(shù)據(jù)源數(shù)據(jù),將其封裝到DataFrame或Dataset中,直接使用show函數(shù)就可以顯示樣本數(shù)據(jù)(默認(rèn)顯示前20條)。

      Spark2.0使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來(lái)實(shí)現(xiàn)其對(duì)數(shù)據(jù)加載、轉(zhuǎn)換、處理等功能。SparkSession實(shí)現(xiàn)了SQLContext及HiveContext所有功能。 SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,并且支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表,然后使用SQL語(yǔ)句來(lái)操作數(shù)據(jù)。SparkSession亦提供了HiveQL以及其他依賴于Hive的功能的支持。

      獲取DataFrame/DataSet

      實(shí)際項(xiàng)目開(kāi)發(fā)中,往往需要將RDD數(shù)據(jù)集轉(zhuǎn)換為DataFrame,本質(zhì)上就是給RDD加上Schema信息,官方提供兩種方式:類型推斷和自定義Schema。

      官方文檔:

      http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

      使用樣例類

      當(dāng)RDD中數(shù)據(jù)類型CaseClass樣例類時(shí),通過(guò)反射Reflecttion獲取屬性名稱和類型,構(gòu)建Schema,應(yīng)用到RDD數(shù)據(jù)集,將其轉(zhuǎn)換為DataFrame。

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.rdd.RDD

      import org.apache.spark.sql.{DataFrame, SparkSession}

      /**

      * Author itcast

      * Desc 演示基于RDD創(chuàng)建DataFrame--使用樣例類

      */

      object CreateDataFrameDemo1 {

      case class Person(id:Int,name:String,age:Int)

      def main(args: Array[String]): Unit = {

      //1.準(zhǔn)備環(huán)境-SparkSession

      val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      //2.加載數(shù)據(jù)

      val lines: RDD[String] = sc.textFile("data/input/person.txt")

      //3.切割

      //val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的

      val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))

      //4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)

      val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))

      //5.將RDD轉(zhuǎn)為DataFrame(DF)

      //注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!

      import spark.implicits._

      val personDF: DataFrame = personRDD.toDF

      //6.查看約束

      personDF.printSchema()

      //7.查看分布式表中的數(shù)據(jù)集

      personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長(zhǎng)的時(shí)候不會(huì)用...代替

      }

      }

      此種方式要求RDD數(shù)據(jù)類型必須為CaseClass,轉(zhuǎn)換的DataFrame中字段名稱就是CaseClass中屬性名稱。

      指定類型+列名

      除了上述兩種方式將RDD轉(zhuǎn)換為DataFrame以外,SparkSQL中提供一個(gè)函數(shù):toDF,通過(guò)指定列名稱,將數(shù)據(jù)類型為元組的RDD或Seq轉(zhuǎn)換為DataFrame,實(shí)際開(kāi)發(fā)中也常常使用。

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.rdd.RDD

      import org.apache.spark.sql.{DataFrame, SparkSession}

      /**

      * Author itcast

      * Desc 演示基于RDD創(chuàng)建DataFrame--使用類型加列名

      */

      object CreateDataFrameDemo2 {

      def main(args: Array[String]): Unit = {

      //1.準(zhǔn)備環(huán)境-SparkSession

      val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      //2.加載數(shù)據(jù)

      val lines: RDD[String] = sc.textFile("data/input/person.txt")

      //3.切割

      //val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的

      val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))

      //4.將每一行(每一個(gè)Array)轉(zhuǎn)為三元組(相當(dāng)于有了類型!)

      val personWithColumnsTypeRDD: RDD[(Int, String, Int)] = linesArrayRDD.map(arr=>(arr(0).toInt,arr(1),arr(2).toInt))

      //5.將RDD轉(zhuǎn)為DataFrame(DF)并指定列名

      //注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!

      import spark.implicits._

      val personDF: DataFrame = personWithColumnsTypeRDD.toDF("id","name","age")

      //6.查看約束

      personDF.printSchema()

      //7.查看分布式表中的數(shù)據(jù)集

      personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長(zhǎng)的時(shí)候不會(huì)用...代替

      }

      }

      自定義Schema

      依據(jù)RDD中數(shù)據(jù)自定義Schema,類型為StructType,每個(gè)字段的約束使用StructField定義,具體步驟如下:

      第一步、RDD中數(shù)據(jù)類型為Row:RDD[Row];

      第二步、針對(duì)Row中數(shù)據(jù)定義Schema:StructType;

      第三步、使用SparkSession中方法將定義的Schema應(yīng)用到RDD[Row]上;

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.rdd.RDD

      import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

      import org.apache.spark.sql.{DataFrame, Row, SparkSession}

      /**

      * Author itcast

      * Desc 演示基于RDD創(chuàng)建DataFrame--使用StructType

      */

      object CreateDataFrameDemo3 {

      def main(args: Array[String]): Unit = {

      2021年大數(shù)據(jù)Spark(二十五):SparkSQL的RDD、DF、DS相關(guān)操作

      //1.準(zhǔn)備環(huán)境-SparkSession

      val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      //2.加載數(shù)據(jù)

      val lines: RDD[String] = sc.textFile("data/input/person.txt")

      //3.切割

      //val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的

      val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))

      //4.將每一行(每一個(gè)Array)轉(zhuǎn)為Row

      val rowRDD: RDD[Row] = linesArrayRDD.map(arr=>Row(arr(0).toInt,arr(1),arr(2).toInt))

      //5.將RDD轉(zhuǎn)為DataFrame(DF)并指定列名

      //注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!

      import spark.implicits._

      /*val schema: StructType = StructType(

      StructField("id", IntegerType, false) ::

      StructField("name", StringType, false) ::

      StructField("age", IntegerType, false) :: Nil)*/

      val schema: StructType = StructType(List(

      StructField("id", IntegerType, false),

      StructField("name", StringType, false),

      StructField("age", IntegerType, false)

      ))

      val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)

      //6.查看約束

      personDF.printSchema()

      //7.查看分布式表中的數(shù)據(jù)集

      personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長(zhǎng)的時(shí)候不會(huì)用...代替

      }

      }

      此種方式可以更加體會(huì)到DataFrame = RDD[Row] + Schema組成,在實(shí)際項(xiàng)目開(kāi)發(fā)中靈活的選擇方式將RDD轉(zhuǎn)換為DataFrame。

      RDD、DF、DS相互轉(zhuǎn)換

      實(shí)際項(xiàng)目開(kāi)發(fā)中,常常需要對(duì)RDD、DataFrame及Dataset之間相互轉(zhuǎn)換,其中要點(diǎn)就是Schema約束結(jié)構(gòu)信息。

      1)、RDD轉(zhuǎn)換DataFrame或者Dataset

      轉(zhuǎn)換DataFrame時(shí),定義Schema信息,兩種方式

      轉(zhuǎn)換為Dataset時(shí),不僅需要Schema信息,還需要RDD數(shù)據(jù)類型為CaseClass類型

      2)、Dataset或DataFrame轉(zhuǎn)換RDD

      由于Dataset或DataFrame底層就是RDD,所以直接調(diào)用rdd函數(shù)即可轉(zhuǎn)換

      dataframe.rdd 或者dataset.rdd

      3)、DataFrame與Dataset之間轉(zhuǎn)換

      由于DataFrame為Dataset特例,所以Dataset直接調(diào)用toDF函數(shù)轉(zhuǎn)換為DataFrame

      當(dāng)將DataFrame轉(zhuǎn)換為Dataset時(shí),使用函數(shù)as[Type],指定CaseClass類型即可。

      RDD、DataFrame和DataSet之間的轉(zhuǎn)換如下,假設(shè)有個(gè)樣例類:case?class?Emp(name:?String),相互轉(zhuǎn)換

      RDD轉(zhuǎn)換到DataFrame:rdd.toDF(“name”)

      RDD轉(zhuǎn)換到Dataset:rdd.map(x => Emp(x)).toDS

      DataFrame轉(zhuǎn)換到Dataset:df.as[Emp]

      DataFrame轉(zhuǎn)換到RDD:df.rdd

      Dataset轉(zhuǎn)換到DataFrame:ds.toDF

      Dataset轉(zhuǎn)換到RDD:ds.rdd

      注意:

      RDD與DataFrame或者DataSet進(jìn)行操作,都需要引入隱式轉(zhuǎn)換import spark.implicits._,其中的spark是SparkSession對(duì)象的名稱!

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.rdd.RDD

      import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

      /**

      * Author itcast

      * Desc 演示基于RDD/DataFrame/DataSet三者之間的相互轉(zhuǎn)換

      */

      object TransformationDemo {

      case class Person(id:Int,name:String,age:Int)

      def main(args: Array[String]): Unit = {

      //1.準(zhǔn)備環(huán)境-SparkSession

      val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      //2.加載數(shù)據(jù)

      val lines: RDD[String] = sc.textFile("data/input/person.txt")

      //3.切割

      //val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的

      val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))

      //4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)

      val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))

      //5.將RDD轉(zhuǎn)為DataFrame(DF)

      //注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!

      import spark.implicits._

      //轉(zhuǎn)換1:rdd-->df

      val personDF: DataFrame = personRDD.toDF //注意:DataFrame沒(méi)有泛型

      //轉(zhuǎn)換2:rdd-->ds

      val personDS: Dataset[Person] = personRDD.toDS() //注意:Dataset具有泛型

      //轉(zhuǎn)換3:df-->rdd

      val rdd: RDD[Row] = personDF.rdd //注意:DataFrame沒(méi)有泛型,也就是不知道里面是Person,所以轉(zhuǎn)為rdd之后統(tǒng)一的使用Row表示里面是很多行

      //轉(zhuǎn)換4:ds-->rdd

      val rdd1: RDD[Person] = personDS.rdd //注意:Dataset具有泛型,所以轉(zhuǎn)為rdd之后還有原來(lái)泛型!

      //轉(zhuǎn)換5:ds-->df

      val dataFrame: DataFrame = personDS.toDF()

      //轉(zhuǎn)換5:df-->ds

      val personDS2: Dataset[Person] = personDF.as[Person]

      //目前DataFrame和DataSet使用類似,如:也有show/createOrReplaceTempView/select

      personDS.show()

      personDS.createOrReplaceTempView("t_person")

      personDS.select("name").show()

      }

      }

      spark 大數(shù)據(jù)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:如何刪除空白檔(怎樣把空白文檔刪掉)
      下一篇:如何查找本版編輯文字(之前編輯的文檔怎么查找)
      相關(guān)文章
      亚洲人成高清在线播放| 亚洲av无码无在线观看红杏| 亚洲 自拍 另类小说综合图区| 伊人久久综在合线亚洲2019| 亚洲AV无码一区东京热久久 | 亚洲乱码中文字幕手机在线| 久久精品国产亚洲av品善| 亚洲精品伊人久久久久| 最新亚洲精品国偷自产在线| 亚洲毛片无码专区亚洲乱| 亚洲丝袜美腿视频| 亚洲黑人嫩小videos| 亚洲中文字幕久久精品无码喷水 | 亚洲国产成人久久三区| 亚洲国产美女福利直播秀一区二区| 337p日本欧洲亚洲大胆艺术| 亚洲明星合成图综合区在线| 亚洲精品线在线观看| 亚洲综合另类小说色区| 亚洲精品国产精品乱码在线观看| 国产亚洲福利一区二区免费看 | 亚洲AV无码国产精品麻豆天美| 亚洲精品国偷自产在线| 亚洲国产一区二区三区青草影视| 亚洲精品视频免费在线观看| 亚洲国产精品成人久久| 国产亚洲av片在线观看18女人| 国产亚洲精品自在线观看| 国产亚洲精品精华液| 亚洲成AV人在线播放无码| 日韩亚洲Av人人夜夜澡人人爽| 久久亚洲精品无码AV红樱桃| 亚洲美女视频网址| 亚洲一日韩欧美中文字幕在线| 老司机亚洲精品影院在线观看| 亚洲AⅤ视频一区二区三区| 国产亚洲成人在线播放va| 亚洲av无码一区二区乱子伦as| 亚洲天堂男人天堂| 麻豆狠色伊人亚洲综合网站| 亚洲av无码偷拍在线观看|