大數據“復活”記
881
2025-04-02
Spark SQL編程
SparkSQL的依賴
不帶Hive支持
帶Hive支持(推薦使用)
SparkSQL的入口:SQLContex
SQLContext是SparkSQL的入口
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._ //導?入各種sql操作的?口與各種隱式轉換
SparkSQL的入口: HiveContext
HiveContext是SQLContext的子類,提供了對Hive的支持
complete HiveQL parser,
access to Hive UDFs
the ability to read data from Hive tables
編譯時要包含Hive支持
mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -Phive-thriftserver -DskipTests clean package
不需要提前安裝Hive(連接已有Hive會在后面講解)
HiveContext可以使用任何在SQLContext上可用的data source
SQLContext vs HiveContext
SQLContext現在只支持SQL語法解析器(SQL-92語法)
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._ //導?入各種sql操作的?口與各種隱式轉換
SQLContext vs HiveContext
HiveContext現在支持SQL語法解析器和HiveSQL語法解析器,默認為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行HiveSQL不支持的語法。
使用HiveContext可以使用Hive的UDF,讀寫Hive表數據等Hive操作。SQLContext不可以對Hive進行操作
Spark SQL未來的版本會不斷豐富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最終可能兩者會統一成一個Context
HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,可以在部署基本的Spark的時候就不需要Hive的依賴包,需要使用HiveContext時再把Hive的各種依賴包加進來。
Spark SQL的作用與使用方式:
Spark程序中使用SparkSQL
輕松讀取數據并使用SQL 查詢,同時還能把這一過程和普通的Python/Java/Scala 程序代碼結合在一起
CLI---Spark SQL shell
JDBC/ODBC
各種支持jdbc的軟件、商業智能(BI)工具、平臺
Spark SQL支持的API
SQL
DataFrame(推薦方式,也能執行SQL)
Dataset(還在發展)
SQL
支持basic SQL syntax/HiveQL
程序中使用SQL會返回DataFrame
command-line和JDBC/ODBC中均可以使用
SparkSQL數據源:從各種數據源創建DataFrame
因為 spark sql,dataframe,datasets 都是共用 spark sql 這個庫的,三者共享同樣的代碼優化,生成以及執行流程,所以 sql,dataframe,datasets 的入口都是 sqlContext。
可用于創建 spark dataframe 的數據源有很多:
SparkSQL數據源:RDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
//
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
sqlContext.createDataFrame(people)
SparkSQL數據源:Hive
當從Hive 中讀取數據時,Spark SQL 支持任何Hive 支持的存儲格式(SerDe),包括文件、RCFiles、ORC、Parquet、Avro,以及Protocol Buffer(當然Spark SQL也可以直接讀取這些文件)。
要連接已部署好的Hive,需要拷貝hive-site.xml、core-site.xml、hdfs-site.xml到Spark 的./conf/ 目錄下即可
如果不想連接到已有的hive,可以什么都不做直接使用HiveContext:
Spark SQL 會在當前的工作目錄中創建出自己的Hive 元數據倉庫,叫作metastore_db
如果你嘗試使用HiveQL 中的CREATE TABLE(并非CREATE EXTERNAL TABLE)語句來創建表,這些表會被放在你默認的文件系統中的/user/hive/warehouse 目錄中(如果你的classpath 中有配好的hdfs-site.xml,默認的文件系統就是HDFS,否則就是本地文件系統)。
SparkSQL數據源:Hive讀寫
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
SparkSQL數據源:訪問不同版本的metastore
從Spark1.4開始,Spark SQL可以通過修改配置去查詢不同版本的?Hive metastores(不用重新編譯)
SparkSQL數據源:Parquet
Parquet(http://parquet.apache.org/)是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。
Parquet 格式經常在Hadoop 生態圈中被使用,它也支持Spark SQL 的全部數據類型。Spark SQL 提供了直接讀取和存儲Parquet 格式文件的方法。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.write.parquet("xxxx")
val parquetFile = sqlContext.read.parquet("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val agers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
agers.map(t => "Name: " + t(0)).collect().foreach(println)
SparkSQL數據源:Parquet-- Partition Discovery
在Hive中通常會用分區表來優化性能,比如:
SQLContext.read.parquet或者SQLContext.read.load只需要指定path/to/table,SparkSQL會自動從路徑中提取分區信息,返回的DataFrame 的schema 將是:
當然你可以使用Hive讀取方式:
hiveContext.sql("FROM src SELECT key, value").
SparkSQL數據源:Json
SparkSQL支持從Json文件或者Json格式的RDD讀取數據
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 可以是目錄或者文件夾
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// Register this DataFrame as a table.
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val agers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
SparkSQL數據源:JDBC
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:postgresql:dbserver","dbtable" -> "schema.tablename")).load()
支持的參數:
spark Hive SQL
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。