大數(shù)據(jù)服務(wù)上云的思考">大數(shù)據(jù)服務(wù)上云的思考
1251
2025-04-05
目錄
RDD、DF、DS相關(guān)操作
SparkSQL初體驗(yàn)
SparkSession 應(yīng)用入口
獲取DataFrame/DataSet
使用樣例類
指定類型+列名
自定義Schema
RDD、DF、DS相互轉(zhuǎn)換
RDD、DF、DS相關(guān)操作
SparkSQL初體驗(yàn)
Spark 2.0開(kāi)始,SparkSQL應(yīng)用程序入口為SparkSession,加載不同數(shù)據(jù)源的數(shù)據(jù),封裝到DataFrame/Dataset集合數(shù)據(jù)結(jié)構(gòu)中,使得編程更加簡(jiǎn)單,程序運(yùn)行更加快速高效。
SparkSession 應(yīng)用入口
SparkSession:這是一個(gè)新入口,取代了原本的SQLContext與HiveContext。對(duì)于DataFrame API的用戶來(lái)說(shuō),Spark常見(jiàn)的混亂源頭來(lái)自于使用哪個(gè)“context”。現(xiàn)在使用SparkSession,它作為單個(gè)入口可以兼容兩者,注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。
文檔:
http://spark.apache.org/docs/2.4.5/sql-getting-started.html#starting-point-sparksession
1)、添加MAVEN依賴
2)、SparkSession對(duì)象實(shí)例通過(guò)建造者模式構(gòu)建,代碼如下:
其中①表示導(dǎo)入SparkSession所在的包,②表示建造者模式構(gòu)建對(duì)象和設(shè)置屬性,③表示導(dǎo)入SparkSession類中implicits對(duì)象object中隱式轉(zhuǎn)換函數(shù)。
3)、范例演示:構(gòu)建SparkSession實(shí)例,加載文本數(shù)據(jù),統(tǒng)計(jì)條目數(shù)。
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* Author itcast
* Desc 演示SparkSQL
*/
object SparkSQLDemo00_hello {
def main(args: Array[String]): Unit = {
//1.準(zhǔn)備SparkSQL開(kāi)發(fā)環(huán)境
println(this.getClass.getSimpleName)
println(this.getClass.getSimpleName.stripSuffix("$"))
val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
val df1: DataFrame = spark.read.text("data/input/text")
val df2: DataFrame = spark.read.json("data/input/json")
val df3: DataFrame = spark.read.csv("data/input/csv")
val df4: DataFrame = spark.read.parquet("data/input/parquet")
df1.printSchema()
df1.show(false)
df2.printSchema()
df2.show(false)
df3.printSchema()
df3.show(false)
df4.printSchema()
df4.show(false)
df1.coalesce(1).write.mode(SaveMode.Overwrite).text("data/output/text")
df2.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")
df3.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")
df4.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")
//關(guān)閉資源
sc.stop()
spark.stop()
}
}
使用SparkSession加載數(shù)據(jù)源數(shù)據(jù),將其封裝到DataFrame或Dataset中,直接使用show函數(shù)就可以顯示樣本數(shù)據(jù)(默認(rèn)顯示前20條)。
Spark2.0使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來(lái)實(shí)現(xiàn)其對(duì)數(shù)據(jù)加載、轉(zhuǎn)換、處理等功能。SparkSession實(shí)現(xiàn)了SQLContext及HiveContext所有功能。 SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,并且支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表,然后使用SQL語(yǔ)句來(lái)操作數(shù)據(jù)。SparkSession亦提供了HiveQL以及其他依賴于Hive的功能的支持。
獲取DataFrame/DataSet
實(shí)際項(xiàng)目開(kāi)發(fā)中,往往需要將RDD數(shù)據(jù)集轉(zhuǎn)換為DataFrame,本質(zhì)上就是給RDD加上Schema信息,官方提供兩種方式:類型推斷和自定義Schema。
官方文檔:
http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds
使用樣例類
當(dāng)RDD中數(shù)據(jù)類型CaseClass樣例類時(shí),通過(guò)反射Reflecttion獲取屬性名稱和類型,構(gòu)建Schema,應(yīng)用到RDD數(shù)據(jù)集,將其轉(zhuǎn)換為DataFrame。
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author itcast
* Desc 演示基于RDD創(chuàng)建DataFrame--使用樣例類
*/
object CreateDataFrameDemo1 {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.準(zhǔn)備環(huán)境-SparkSession
val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.加載數(shù)據(jù)
val lines: RDD[String] = sc.textFile("data/input/person.txt")
//3.切割
//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的
val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))
//4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)
val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))
//5.將RDD轉(zhuǎn)為DataFrame(DF)
//注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!
import spark.implicits._
val personDF: DataFrame = personRDD.toDF
//6.查看約束
personDF.printSchema()
//7.查看分布式表中的數(shù)據(jù)集
personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長(zhǎng)的時(shí)候不會(huì)用...代替
}
}
此種方式要求RDD數(shù)據(jù)類型必須為CaseClass,轉(zhuǎn)換的DataFrame中字段名稱就是CaseClass中屬性名稱。
指定類型+列名
除了上述兩種方式將RDD轉(zhuǎn)換為DataFrame以外,SparkSQL中提供一個(gè)函數(shù):toDF,通過(guò)指定列名稱,將數(shù)據(jù)類型為元組的RDD或Seq轉(zhuǎn)換為DataFrame,實(shí)際開(kāi)發(fā)中也常常使用。
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author itcast
* Desc 演示基于RDD創(chuàng)建DataFrame--使用類型加列名
*/
object CreateDataFrameDemo2 {
def main(args: Array[String]): Unit = {
//1.準(zhǔn)備環(huán)境-SparkSession
val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.加載數(shù)據(jù)
val lines: RDD[String] = sc.textFile("data/input/person.txt")
//3.切割
//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的
val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))
//4.將每一行(每一個(gè)Array)轉(zhuǎn)為三元組(相當(dāng)于有了類型!)
val personWithColumnsTypeRDD: RDD[(Int, String, Int)] = linesArrayRDD.map(arr=>(arr(0).toInt,arr(1),arr(2).toInt))
//5.將RDD轉(zhuǎn)為DataFrame(DF)并指定列名
//注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!
import spark.implicits._
val personDF: DataFrame = personWithColumnsTypeRDD.toDF("id","name","age")
//6.查看約束
personDF.printSchema()
//7.查看分布式表中的數(shù)據(jù)集
personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長(zhǎng)的時(shí)候不會(huì)用...代替
}
}
自定義Schema
依據(jù)RDD中數(shù)據(jù)自定義Schema,類型為StructType,每個(gè)字段的約束使用StructField定義,具體步驟如下:
第一步、RDD中數(shù)據(jù)類型為Row:RDD[Row];
第二步、針對(duì)Row中數(shù)據(jù)定義Schema:StructType;
第三步、使用SparkSession中方法將定義的Schema應(yīng)用到RDD[Row]上;
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* Author itcast
* Desc 演示基于RDD創(chuàng)建DataFrame--使用StructType
*/
object CreateDataFrameDemo3 {
def main(args: Array[String]): Unit = {
//1.準(zhǔn)備環(huán)境-SparkSession
val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.加載數(shù)據(jù)
val lines: RDD[String] = sc.textFile("data/input/person.txt")
//3.切割
//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的
val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))
//4.將每一行(每一個(gè)Array)轉(zhuǎn)為Row
val rowRDD: RDD[Row] = linesArrayRDD.map(arr=>Row(arr(0).toInt,arr(1),arr(2).toInt))
//5.將RDD轉(zhuǎn)為DataFrame(DF)并指定列名
//注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!
import spark.implicits._
/*val schema: StructType = StructType(
StructField("id", IntegerType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)*/
val schema: StructType = StructType(List(
StructField("id", IntegerType, false),
StructField("name", StringType, false),
StructField("age", IntegerType, false)
))
val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
//6.查看約束
personDF.printSchema()
//7.查看分布式表中的數(shù)據(jù)集
personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長(zhǎng)的時(shí)候不會(huì)用...代替
}
}
此種方式可以更加體會(huì)到DataFrame = RDD[Row] + Schema組成,在實(shí)際項(xiàng)目開(kāi)發(fā)中靈活的選擇方式將RDD轉(zhuǎn)換為DataFrame。
RDD、DF、DS相互轉(zhuǎn)換
實(shí)際項(xiàng)目開(kāi)發(fā)中,常常需要對(duì)RDD、DataFrame及Dataset之間相互轉(zhuǎn)換,其中要點(diǎn)就是Schema約束結(jié)構(gòu)信息。
1)、RDD轉(zhuǎn)換DataFrame或者Dataset
轉(zhuǎn)換DataFrame時(shí),定義Schema信息,兩種方式
轉(zhuǎn)換為Dataset時(shí),不僅需要Schema信息,還需要RDD數(shù)據(jù)類型為CaseClass類型
2)、Dataset或DataFrame轉(zhuǎn)換RDD
由于Dataset或DataFrame底層就是RDD,所以直接調(diào)用rdd函數(shù)即可轉(zhuǎn)換
dataframe.rdd 或者dataset.rdd
3)、DataFrame與Dataset之間轉(zhuǎn)換
由于DataFrame為Dataset特例,所以Dataset直接調(diào)用toDF函數(shù)轉(zhuǎn)換為DataFrame
當(dāng)將DataFrame轉(zhuǎn)換為Dataset時(shí),使用函數(shù)as[Type],指定CaseClass類型即可。
RDD、DataFrame和DataSet之間的轉(zhuǎn)換如下,假設(shè)有個(gè)樣例類:case?class?Emp(name:?String),相互轉(zhuǎn)換
RDD轉(zhuǎn)換到DataFrame:rdd.toDF(“name”)
RDD轉(zhuǎn)換到Dataset:rdd.map(x => Emp(x)).toDS
DataFrame轉(zhuǎn)換到Dataset:df.as[Emp]
DataFrame轉(zhuǎn)換到RDD:df.rdd
Dataset轉(zhuǎn)換到DataFrame:ds.toDF
Dataset轉(zhuǎn)換到RDD:ds.rdd
注意:
RDD與DataFrame或者DataSet進(jìn)行操作,都需要引入隱式轉(zhuǎn)換import spark.implicits._,其中的spark是SparkSession對(duì)象的名稱!
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* Author itcast
* Desc 演示基于RDD/DataFrame/DataSet三者之間的相互轉(zhuǎn)換
*/
object TransformationDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.準(zhǔn)備環(huán)境-SparkSession
val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.加載數(shù)據(jù)
val lines: RDD[String] = sc.textFile("data/input/person.txt")
//3.切割
//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的
val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))
//4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)
val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))
//5.將RDD轉(zhuǎn)為DataFrame(DF)
//注意:RDD的API中沒(méi)有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!
import spark.implicits._
//轉(zhuǎn)換1:rdd-->df
val personDF: DataFrame = personRDD.toDF //注意:DataFrame沒(méi)有泛型
//轉(zhuǎn)換2:rdd-->ds
val personDS: Dataset[Person] = personRDD.toDS() //注意:Dataset具有泛型
//轉(zhuǎn)換3:df-->rdd
val rdd: RDD[Row] = personDF.rdd //注意:DataFrame沒(méi)有泛型,也就是不知道里面是Person,所以轉(zhuǎn)為rdd之后統(tǒng)一的使用Row表示里面是很多行
//轉(zhuǎn)換4:ds-->rdd
val rdd1: RDD[Person] = personDS.rdd //注意:Dataset具有泛型,所以轉(zhuǎn)為rdd之后還有原來(lái)泛型!
//轉(zhuǎn)換5:ds-->df
val dataFrame: DataFrame = personDS.toDF()
//轉(zhuǎn)換5:df-->ds
val personDS2: Dataset[Person] = personDF.as[Person]
//目前DataFrame和DataSet使用類似,如:也有show/createOrReplaceTempView/select
personDS.show()
personDS.createOrReplaceTempView("t_person")
personDS.select("name").show()
}
}
spark 大數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。