Spark SQL編程

      網友投稿 881 2025-04-02

      Spark SQL編程


      SparkSQL的依賴

      不帶Hive支持

      org.apache.spark

      spark-sql_2.10

      1.6.2

      帶Hive支持(推薦使用)

      org.apache.spark

      spark-hive_2.10

      1.6.2

      SparkSQL的入口:SQLContex

      SQLContext是SparkSQL的入口

      val sc: SparkContext

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      import sqlContext._ //導?入各種sql操作的?口與各種隱式轉換

      SparkSQL的入口: HiveContext

      HiveContext是SQLContext的子類,提供了對Hive的支持

      complete HiveQL parser,

      access to Hive UDFs

      the ability to read data from Hive tables

      編譯時要包含Hive支持

      mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -Phive-thriftserver -DskipTests clean package

      不需要提前安裝Hive(連接已有Hive會在后面講解)

      HiveContext可以使用任何在SQLContext上可用的data source

      SQLContext vs HiveContext

      SQLContext現在只支持SQL語法解析器(SQL-92語法)

      val sc: SparkContext

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      import sqlContext._ //導?入各種sql操作的?口與各種隱式轉換

      SQLContext vs HiveContext

      HiveContext現在支持SQL語法解析器和HiveSQL語法解析器,默認為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行HiveSQL不支持的語法。

      使用HiveContext可以使用Hive的UDF,讀寫Hive表數據等Hive操作。SQLContext不可以對Hive進行操作

      Spark SQL未來的版本會不斷豐富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最終可能兩者會統一成一個Context

      HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,可以在部署基本的Spark的時候就不需要Hive的依賴包,需要使用HiveContext時再把Hive的各種依賴包加進來。

      Spark SQL的作用與使用方式:

      Spark程序中使用SparkSQL

      輕松讀取數據并使用SQL 查詢,同時還能把這一過程和普通的Python/Java/Scala 程序代碼結合在一起

      CLI---Spark SQL shell

      JDBC/ODBC

      各種支持jdbc的軟件、商業智能(BI)工具、平臺

      Spark SQL支持的API

      SQL

      DataFrame(推薦方式,也能執行SQL)

      Dataset(還在發展)

      SQL

      支持basic SQL syntax/HiveQL

      程序中使用SQL會返回DataFrame

      command-line和JDBC/ODBC中均可以使用

      SparkSQL數據源:從各種數據源創建DataFrame

      因為 spark sql,dataframe,datasets 都是共用 spark sql 這個庫的,三者共享同樣的代碼優化,生成以及執行流程,所以 sql,dataframe,datasets 的入口都是 sqlContext。

      可用于創建 spark dataframe 的數據源有很多:

      SparkSQL數據源:RDD

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      Spark SQL編程

      // this is used to implicitly convert an RDD to a DataFrame.

      import sqlContext.implicits._

      // Define the schema using a case class.

      case class Person(name: String, age: Int)

      // Create an RDD of Person objects and register it as a table.

      val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

      //

      val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

      sqlContext.createDataFrame(people)

      SparkSQL數據源:Hive

      當從Hive 中讀取數據時,Spark SQL 支持任何Hive 支持的存儲格式(SerDe),包括文件、RCFiles、ORC、Parquet、Avro,以及Protocol Buffer(當然Spark SQL也可以直接讀取這些文件)。

      要連接已部署好的Hive,需要拷貝hive-site.xml、core-site.xml、hdfs-site.xml到Spark 的./conf/ 目錄下即可

      如果不想連接到已有的hive,可以什么都不做直接使用HiveContext:

      Spark SQL 會在當前的工作目錄中創建出自己的Hive 元數據倉庫,叫作metastore_db

      如果你嘗試使用HiveQL 中的CREATE TABLE(并非CREATE EXTERNAL TABLE)語句來創建表,這些表會被放在你默認的文件系統中的/user/hive/warehouse 目錄中(如果你的classpath 中有配好的hdfs-site.xml,默認的文件系統就是HDFS,否則就是本地文件系統)。

      SparkSQL數據源:Hive讀寫

      // sc is an existing SparkContext.

      val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

      sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

      sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

      // Queries are expressed in HiveQL

      sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

      SparkSQL數據源:訪問不同版本的metastore

      從Spark1.4開始,Spark SQL可以通過修改配置去查詢不同版本的?Hive metastores(不用重新編譯)

      SparkSQL數據源:Parquet

      Parquet(http://parquet.apache.org/)是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。

      Parquet 格式經常在Hadoop 生態圈中被使用,它也支持Spark SQL 的全部數據類型。Spark SQL 提供了直接讀取和存儲Parquet 格式文件的方法。

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // this is used to implicitly convert an RDD to a DataFrame.

      import sqlContext.implicits._

      // Define the schema using a case class.

      case class Person(name: String, age: Int)

      // Create an RDD of Person objects and register it as a table.

      val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

      people.write.parquet("xxxx")

      val parquetFile = sqlContext.read.parquet("people.parquet")

      //Parquet files can also be registered as tables and then used in SQL statements.

      parquetFile.registerTempTable("parquetFile")

      val agers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

      agers.map(t => "Name: " + t(0)).collect().foreach(println)

      SparkSQL數據源:Parquet-- Partition Discovery

      在Hive中通常會用分區表來優化性能,比如:

      SQLContext.read.parquet或者SQLContext.read.load只需要指定path/to/table,SparkSQL會自動從路徑中提取分區信息,返回的DataFrame 的schema 將是:

      當然你可以使用Hive讀取方式:

      hiveContext.sql("FROM src SELECT key, value").

      SparkSQL數據源:Json

      SparkSQL支持從Json文件或者Json格式的RDD讀取數據

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // 可以是目錄或者文件夾

      val path = "examples/src/main/resources/people.json"

      val people = sqlContext.read.json(path)

      // The inferred schema can be visualized using the printSchema() method.

      people.printSchema()

      // Register this DataFrame as a table.

      people.registerTempTable("people")

      // SQL statements can be run by using the sql methods provided by sqlContext.

      val agers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

      // Alternatively, a DataFrame can be created for a JSON dataset represented by

      // an RDD[String] storing one JSON object per string.

      val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

      val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

      SparkSQL數據源:JDBC

      val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:postgresql:dbserver","dbtable" -> "schema.tablename")).load()

      支持的參數:

      spark Hive SQL

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Word2003實用技巧單欄輕松變多欄(word2003分欄怎么兩欄獨立)
      下一篇:Word 2016樣式怎么應用于文本
      相關文章
      国产亚洲欧洲Aⅴ综合一区 | 国产精品亚洲专一区二区三区| 亚洲第一永久在线观看| 国产亚洲精品一品区99热| 国产成人精品亚洲精品| 亚洲日韩国产精品乱| 亚洲精品国产自在久久| 亚洲人成色77777在线观看大| 在线观看亚洲网站| 国产亚洲精品2021自在线| 国产一区二区三区亚洲综合 | 久久久久亚洲精品成人网小说| 亚洲大尺度无码专区尤物| 亚洲av午夜成人片精品网站| 久久久久亚洲AV成人无码网站 | 国产精品无码亚洲精品2021| 精品国产亚洲AV麻豆| 九月婷婷亚洲综合在线| 亚洲а∨天堂久久精品| 亚洲熟伦熟女新五十路熟妇 | 亚洲性久久久影院| 亚洲色无码一区二区三区| 亚洲第一AAAAA片| 亚洲视频精品在线观看| 亚洲国产精品yw在线观看| 国产成人精品日本亚洲18图| 亚洲一区二区三区成人网站| 久久水蜜桃亚洲AV无码精品| 亚洲第一区在线观看| 久久乐国产精品亚洲综合| 亚洲精品国产成人片| 亚洲另类激情综合偷自拍| 亚洲特级aaaaaa毛片| 日本亚洲免费无线码| 亚洲精品9999久久久久无码| 亚洲AV无码乱码在线观看牲色 | 亚洲午夜电影在线观看| 亚洲一区二区三区写真| 亚洲电影日韩精品| 日韩亚洲一区二区三区| 亚洲美女在线观看播放|