亞寵展、全球寵物產業風向標——亞洲寵物展覽會深度解析
1308
2022-05-30
目錄
案例一:花式查詢
案例二:WordCount
基于DSL編程
基于SQL編程
具體演示代碼如下:
案例一:花式查詢
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author itcast
* Desc 演示SparkSQL的各種花式查詢
*/
object FlowerQueryDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.準備環境-SparkSession
val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.加載數據
val lines: RDD[String] = sc.textFile("data/input/person.txt")
//3.切割
//val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的
val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))
//4.將每一行(每一個Array)轉為樣例類(相當于添加了Schema)
val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))
//5.將RDD轉為DataFrame(DF)
//注意:RDD的API中沒有toDF方法,需要導入隱式轉換!
import spark.implicits._
val personDF: DataFrame = personRDD.toDF
//6.查看約束
personDF.printSchema()
//7.查看分布式表中的數據集
personDF.show(6,false)//false表示不截斷列名,也就是列名很長的時候不會用...代替
//演示SQL風格查詢
//0.注冊表名
//personDF.registerTempTable("t_person")//已經過時
//personDF.createTempView("t_person")//創建表,如果已存在則報錯:TempTableAlreadyExistsException
//personDF.createOrReplaceGlobalTempView("t_person")//創建全局表,可以夸session使用,查詢的時候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用
personDF.createOrReplaceTempView("t_person")//創建一個臨時表,只有當前session可用!且表如果存在會替換!
//1.查看name字段的數據
spark.sql("select name from t_person").show
//2.查看?name 和age字段數據
spark.sql("select name,age from t_person").show
//3.查詢所有的name和age,并將age+1
spark.sql("select name,age,age+1 from t_person").show
//4.過濾age大于等于25的
spark.sql("select name,age from t_person where age >=25").show
//5.統計年齡大于30的人數
spark.sql("select count(age) from t_person where age >30").show
//6.按年齡進行分組并統計相同年齡的人數
spark.sql("select age,count(age) from t_person group by age").show
//演示DSL風格查詢
//1.查看name字段的數據
import org.apache.spark.sql.functions._
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
//2.查看?name 和age字段數據
personDF.select(personDF.col("name"),personDF.col("age")).show
personDF.select("name","age").show
//3.查詢所有的name和age,并將age+1
//personDF.select("name","age","age+1").show//錯誤,沒有age+1這一列
//personDF.select("name","age","age"+1).show//錯誤,沒有age1這一列
personDF.select(col("name"),col("age"),col("age")+1).show
personDF.select($"name",$"age",$"age"+1).show
//$表示將"age"變為了列對象,先查詢再和+1進行計算
personDF.select('name,'age,'age+1).show
//'表示將age變為了列對象,先查詢再和+1進行計算
//4.過濾age大于等于25的,使用filter方法/where方法過濾
personDF.select("name","age").filter("age>=25").show
personDF.select("name","age").where("age>=25").show
//5.統計年齡大于30的人數
personDF.where("age>30").count()
//6.按年齡進行分組并統計相同年齡的人數
personDF.groupBy("age").count().show
}
}
案例二:WordCount
前面使用RDD封裝數據,實現詞頻統計WordCount功能,從Spark 1.0開始,一直到Spark 2.0,建立在RDD之上的一種新的數據結構DataFrame/Dataset發展而來,更好的實現數據處理分析。DataFrame 數據結構相當于給RDD加上約束Schema,知道數據內部結構(字段名稱、字段類型),提供兩種方式分析處理數據:DataFrame API(DSL編程)和SQL(類似HiveQL編程),下面以WordCount程序為例編程實現,體驗DataFrame使用。
基于DSL編程
使用SparkSession加載文本數據,封裝到Dataset/DataFrame中,調用API函數處理分析數據(類似RDD中API函數,如flatMap、map、filter等),編程步驟:
第一步、構建SparkSession實例對象,設置應用名稱和運行本地模式;
第二步、讀取HDFS上文本文件數據;
第三步、使用DSL(Dataset?API),類似RDD?API處理分析數據;
第四步、控制臺打印結果數據和關閉SparkSession;
基于SQL編程
也可以實現類似HiveQL方式進行詞頻統計,直接對單詞分組group by,再進行count即可,步驟如下:
第一步、構建SparkSession對象,加載文件數據,分割每行數據為單詞;
第二步、將DataFrame/Dataset注冊為臨時視圖(Spark 1.x中為臨時表);
第三步、編寫SQL語句,使用SparkSession執行獲取結果;
第四步、控制臺打印結果數據和關閉SparkSession;
具體演示代碼如下:
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Author itcast
* Desc 使用SparkSQL完成WordCount---SQL風格和DSL風格
*/
object WordCount {
def main(args: Array[String]): Unit = {
//1.準備環境
val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//2.加載數據
//val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用該方式,然后使用昨天的知識將rdd轉為df/ds
val df: DataFrame = spark.read.text("data/input/words.txt")
val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")
//df.show()//查看分布式表數據
//ds.show()//查看分布式表數據
//3.做WordCount
//切割
//df.flatMap(_.split(" ")) //注意:直接這樣寫報錯!因為df沒有泛型,不知道_是String!
//df.flatMap(row=>row.getAs[String]("value").split(" "))
val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))
//wordsDS.show()
//使用SQL風格做WordCount
wordsDS.createOrReplaceTempView("t_words")
val sql:String =
"""
|select value,count(*) as count
|from t_words
|group by value
|order by count desc
|""".stripMargin
spark.sql(sql).show()
//使用DSL風格做WordCount
wordsDS
.groupBy("value")
.count()
.orderBy($"count".desc)
.show()
/*
+-----+-----+
|value|count|
+-----+-----+
|hello| ???4|
| ?her| ???3|
| ?you| ???2|
| ??me| ???1|
+-----+-----+
+-----+-----+
|value|count|
+-----+-----+
|hello| ???4|
| ?her| ???3|
| ?you| ???2|
| ??me| ???1|
+-----+-----+
*/
}
}
無論使用DSL還是SQL編程方式,底層轉換為RDD操作都是一樣,性能一致,查看WEB UI監控中Job運行對應的DAG圖如下:
從上述的案例可以發現將數據封裝到Dataset/DataFrame中,進行處理分析,更加方便簡潔,這就是Spark框架中針對結構化數據處理模:Spark SQL模塊。
官方文檔:
http://spark.apache.org/sql/
spark 大數據
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。