2021年數據Spark(二十七):SparkSQL案例一花式查詢和案例二WordCount

      網友投稿 1308 2022-05-30

      目錄

      案例一:花式查詢

      案例二:WordCount

      基于DSL編程

      基于SQL編程

      具體演示代碼如下:

      案例一:花式查詢

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.rdd.RDD

      import org.apache.spark.sql.{DataFrame, SparkSession}

      /**

      * Author itcast

      * Desc 演示SparkSQL的各種花式查詢

      */

      object FlowerQueryDemo {

      case class Person(id:Int,name:String,age:Int)

      def main(args: Array[String]): Unit = {

      //1.準備環境-SparkSession

      val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      //2.加載數據

      val lines: RDD[String] = sc.textFile("data/input/person.txt")

      //3.切割

      //val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的

      val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))

      //4.將每一行(每一個Array)轉為樣例類(相當于添加了Schema)

      val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))

      //5.將RDD轉為DataFrame(DF)

      //注意:RDD的API中沒有toDF方法,需要導入隱式轉換!

      import spark.implicits._

      val personDF: DataFrame = personRDD.toDF

      //6.查看約束

      personDF.printSchema()

      //7.查看分布式表中的數據集

      personDF.show(6,false)//false表示不截斷列名,也就是列名很長的時候不會用...代替

      //演示SQL風格查詢

      //0.注冊表名

      //personDF.registerTempTable("t_person")//已經過時

      //personDF.createTempView("t_person")//創建表,如果已存在則報錯:TempTableAlreadyExistsException

      //personDF.createOrReplaceGlobalTempView("t_person")//創建全局表,可以夸session使用,查詢的時候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用

      personDF.createOrReplaceTempView("t_person")//創建一個臨時表,只有當前session可用!且表如果存在會替換!

      //1.查看name字段的數據

      spark.sql("select name from t_person").show

      //2.查看?name 和age字段數據

      spark.sql("select name,age from t_person").show

      //3.查詢所有的name和age,并將age+1

      spark.sql("select name,age,age+1 from t_person").show

      //4.過濾age大于等于25的

      spark.sql("select name,age from t_person where age >=25").show

      //5.統計年齡大于30的人數

      spark.sql("select count(age) from t_person where age >30").show

      //6.按年齡進行分組并統計相同年齡的人數

      spark.sql("select age,count(age) from t_person group by age").show

      //演示DSL風格查詢

      //1.查看name字段的數據

      import org.apache.spark.sql.functions._

      personDF.select(personDF.col("name")).show

      personDF.select(personDF("name")).show

      personDF.select(col("name")).show

      personDF.select("name").show

      //2.查看?name 和age字段數據

      personDF.select(personDF.col("name"),personDF.col("age")).show

      personDF.select("name","age").show

      //3.查詢所有的name和age,并將age+1

      //personDF.select("name","age","age+1").show//錯誤,沒有age+1這一列

      //personDF.select("name","age","age"+1).show//錯誤,沒有age1這一列

      personDF.select(col("name"),col("age"),col("age")+1).show

      personDF.select($"name",$"age",$"age"+1).show

      //$表示將"age"變為了列對象,先查詢再和+1進行計算

      personDF.select('name,'age,'age+1).show

      //'表示將age變為了列對象,先查詢再和+1進行計算

      //4.過濾age大于等于25的,使用filter方法/where方法過濾

      personDF.select("name","age").filter("age>=25").show

      personDF.select("name","age").where("age>=25").show

      //5.統計年齡大于30的人數

      personDF.where("age>30").count()

      //6.按年齡進行分組并統計相同年齡的人數

      personDF.groupBy("age").count().show

      }

      }

      案例二:WordCount

      前面使用RDD封裝數據,實現詞頻統計WordCount功能,從Spark 1.0開始,一直到Spark 2.0,建立在RDD之上的一種新的數據結構DataFrame/Dataset發展而來,更好的實現數據處理分析。DataFrame 數據結構相當于給RDD加上約束Schema,知道數據內部結構(字段名稱、字段類型),提供兩種方式分析處理數據:DataFrame API(DSL編程)和SQL(類似HiveQL編程),下面以WordCount程序為例編程實現,體驗DataFrame使用。

      基于DSL編程

      使用SparkSession加載文本數據,封裝到Dataset/DataFrame中,調用API函數處理分析數據(類似RDD中API函數,如flatMap、map、filter等),編程步驟:

      第一步、構建SparkSession實例對象,設置應用名稱和運行本地模式;

      第二步、讀取HDFS上文本文件數據;

      第三步、使用DSL(Dataset?API),類似RDD?API處理分析數據;

      第四步、控制臺打印結果數據和關閉SparkSession;

      基于SQL編程

      也可以實現類似HiveQL方式進行詞頻統計,直接對單詞分組group by,再進行count即可,步驟如下:

      第一步、構建SparkSession對象,加載文件數據,分割每行數據為單詞;

      第二步、將DataFrame/Dataset注冊為臨時視圖(Spark 1.x中為臨時表);

      第三步、編寫SQL語句,使用SparkSession執行獲取結果;

      第四步、控制臺打印結果數據和關閉SparkSession;

      具體演示代碼如下:

      package cn.itcast.sql

      import org.apache.spark.SparkContext

      import org.apache.spark.rdd.RDD

      import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

      /**

      * Author itcast

      * Desc 使用SparkSQL完成WordCount---SQL風格和DSL風格

      */

      object WordCount {

      def main(args: Array[String]): Unit = {

      //1.準備環境

      val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()

      2021年大數據Spark(二十七):SparkSQL案例一花式查詢和案例二WordCount

      val sc: SparkContext = spark.sparkContext

      sc.setLogLevel("WARN")

      import spark.implicits._

      //2.加載數據

      //val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用該方式,然后使用昨天的知識將rdd轉為df/ds

      val df: DataFrame = spark.read.text("data/input/words.txt")

      val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")

      //df.show()//查看分布式表數據

      //ds.show()//查看分布式表數據

      //3.做WordCount

      //切割

      //df.flatMap(_.split(" ")) //注意:直接這樣寫報錯!因為df沒有泛型,不知道_是String!

      //df.flatMap(row=>row.getAs[String]("value").split(" "))

      val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))

      //wordsDS.show()

      //使用SQL風格做WordCount

      wordsDS.createOrReplaceTempView("t_words")

      val sql:String =

      """

      |select value,count(*) as count

      |from t_words

      |group by value

      |order by count desc

      |""".stripMargin

      spark.sql(sql).show()

      //使用DSL風格做WordCount

      wordsDS

      .groupBy("value")

      .count()

      .orderBy($"count".desc)

      .show()

      /*

      +-----+-----+

      |value|count|

      +-----+-----+

      |hello| ???4|

      | ?her| ???3|

      | ?you| ???2|

      | ??me| ???1|

      +-----+-----+

      +-----+-----+

      |value|count|

      +-----+-----+

      |hello| ???4|

      | ?her| ???3|

      | ?you| ???2|

      | ??me| ???1|

      +-----+-----+

      */

      }

      }

      無論使用DSL還是SQL編程方式,底層轉換為RDD操作都是一樣,性能一致,查看WEB UI監控中Job運行對應的DAG圖如下:

      從上述的案例可以發現將數據封裝到Dataset/DataFrame中,進行處理分析,更加方便簡潔,這就是Spark框架中針對結構化數據處理模:Spark SQL模塊。

      官方文檔:

      http://spark.apache.org/sql/

      spark 大數據

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Python命名空間和作用域淺析
      下一篇:2021年大數據Spark(十):環境搭建集群模式 Spark on YARN
      相關文章
      www.亚洲日本| 亚洲国产成人精品无码一区二区| www.亚洲日本| 亚洲人xxx日本人18| 亚洲成年人电影网站| 91亚洲国产成人久久精品网站| 亚洲av无码成h人动漫无遮挡 | 亚洲欧美日韩中文字幕一区二区三区 | 亚洲熟女www一区二区三区| 亚洲性线免费观看视频成熟| 亚洲一卡二卡三卡四卡无卡麻豆 | 伊人久久综在合线亚洲91| 国产精品亚洲mnbav网站| 色久悠悠婷婷综合在线亚洲| 亚洲熟妇无码八AV在线播放| 亚洲日韩精品无码专区网址| 久久精品夜色国产亚洲av| 亚洲伦理一区二区| 亚洲精品视频免费看| 亚洲人成电影院在线观看| 国产婷婷综合丁香亚洲欧洲| 亚洲色大情网站www| 亚洲JIZZJIZZ妇女| 亚洲国产电影av在线网址| 亚洲精品视频免费| 中文国产成人精品久久亚洲精品AⅤ无码精品 | 亚洲成色www久久网站夜月| 久久久无码精品亚洲日韩蜜桃 | 亚洲精品韩国美女在线| 亚洲国产系列一区二区三区 | 亚洲国产精品自在线一区二区| 亚洲黄色免费在线观看| 亚洲国产一区在线观看| 亚洲色大情网站www| www.亚洲精品.com| 亚洲午夜福利717| 91情国产l精品国产亚洲区| 亚洲综合校园春色| 亚洲 小说区 图片区 都市| 国产亚洲精品看片在线观看 | 亚洲?v女人的天堂在线观看|