解讀Spark Datasource

      網友投稿 1145 2025-03-31

      在日常的工作中,我們會接觸到各種各樣的數據庫,他們存儲的數據也是各式各樣。當我們使用Spark去處理數據的時候,我們常常會遇到數據來源于不同數據源的情況。如果重新加載和保存數據的話,會非常的麻煩,還浪費空間,而且很多時候還需要考慮數據格式轉換的問題。

      為了解決這樣的問題,Spark提供了一個框架,叫Datasource。我目前就在學習這個框架的內容,并嘗試著用Spark Datesource這個框架實現一個與自建數據源連接的功能。

      下面就是我通過學習記錄的內容。

      1.概念介紹

      1.1 什么是Spark Datasource

      Spark Datasource是連接外部數據源和Spark引擎的框架,是一個連接器。可以利用這個連接器來對外部的數據源進行一個讀寫的操作。華為云的DLI支持原生Spark的Datasource能力,并在其基礎上進行了擴展。

      DLI利用Spark Datasource構建的功能為跨源連接。當前跨源連接分為以下兩種:

      經典型跨源連接

      DLI 經典型跨源連接可用于訪問CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS數據源。

      增強型跨源連接

      DLI 增強型跨源連接底層采用對等連接,直接打通DLI集群與目的數據源的vpc網絡,通過點對點的方式實現數據互通,能夠提供比經典型跨源更加靈活的使用場景與更加強勁的性能。增強型跨源支持所有DLI服務已實現的跨源業務,包括CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS,DCS,DDS等數據源。并且通過Spark作業方式能夠實現與自建數據源之間的訪問。

      1.2 Spark Datasource如何使用?

      當你使用華為云DLI的跨源連接服務時,首先需要按照官網上面的知識將跨源連接創建好,將隊列綁定好。然后就可以創建作業,然后用綁定好的這個隊列來運行你的作業。本文主要講述的是Spark Datasource,所以具體使用講的是spark作業相關,以scala語言為例。

      利用spark作業讀取外部數據源的模式總共有兩種:

      DataFrame模式

      利用DataFrame的API讀取外部數據源的方式如下:

      val jdbcDF = sparkSession .read //表示是讀取數據(write就是寫) .format("jdbc") //驅動類,這里連接的是jdbc數據源 .option("url", "jdbc:postgresql:dbserver") //option表示的是填入的參數。 .load()

      SparkSql模式

      利用Spark SQL的方式讀取外部數據源的方式如下:

      sparkSession.sql(" CREATE TABLE IF NOT EXISTS dli_to_rds //創建的spark sql表名 USING JDBC OPTIONS //驅動類,表明連接的是jdbc數據源 ( 'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306', 'driver'='com.mysql.jdbc.Driver' //上述是數據源的一些具體參數。 )") sparkSession.sql("select * from dli_to_rds")

      因為sql的簡單易用以及易上手性,所以我們推薦大家使用Spark SQL模式。

      2.運行原理

      2.1 讀流程

      def read: DataFrameReader = new DataFrameReader(self)

      read對應的是DataFrameReader,這是Spark Datasource讀取外部數據源的入口。當我們使用sparkSession.read的時候,就會構造出一個DataFrameReader,這是后面參數設置的基礎,其中內置了讀取多種數據源的方法,還包括參數配置的接口。

      def format(source: String): DataFrameReader = {this.source = source}

      format需要填入的是數據源的名稱,表明連接的是什么連接源。Spark原生支持的數據源有csv、orc、parquet、jdbc等,通過在format這里填寫來表示連接的數據源具體是什么。除此之外,format這里也是匹配自建數據源的一個重要窗口。當我們需要連接自己的數據源時,需要設置源的名字是什么,也就是source是什么,設置好以后通過format這里的填寫,Spark Datasource框架會利用Spi機制連接創建我們所需要的連接。

      def option(key: String, value: String): DataFrameReader = { this.extraOptions = this.extraOptions + (key -> value)}

      option的作用就是具體的參數的填寫。以連接一個jdbc的數據源為例,需要填入的參數有:

      url 這是jdbc數據源的host地址。

      dbtable 這是具體要讀取的表的表名。

      user 這是數據源的用戶名。

      password 這是數據源的密碼。

      driver 這是數據源的驅動器,jdbc的驅動器就是com.mysql.jdbc.Driver。

      .load()

      最后通過這個load的方法真正建立Spark與數據源之間的連接,并且得到一個DataFrame。

      2.2 寫流程

      def write: DataFrameWriter[T] = {new DataFrameWriter[T](this)}

      寫的流程是與讀流程類似的,區別的是write返回的是一個DataFrameWrite類型對象,這是Spark Datasource寫數據的入口。

      解讀Spark Datasource

      2.3 連接是如何創建的?

      這里主要以讀取華為云JDBC數據源流程為例來講解。

      如果不關注Spark Datasource內部是如何運行的話,那么對用使用者來說,load完就是成功建立起連接并讀取到外部數據源中的數據了。但是,如果關注Spark Datasource內部是如何運作的話,那么你就會知道,load對于這個流程來說,僅僅是個開始。

      def load(paths: String*): DataFrame = { DataSource.lookupDataSourceV2(source, ....).map {......}.getOrElse(loadV1Source(paths: _*)) }

      與load相關的函數主要有兩個:

      lookupDataSourceV2

      這個函數的主要作用為將source name匹配為RelationProvider的名字。如果source name為jdbc,那么匹配到的名字就是“org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider”。

      loadV1Source

      這個函數的主要作用為利用上面函數得到的RelationProvider建立與外部數據源之間的連接,并讀取option中設置的數據,最終返回一個包含了我們希望讀取到的數據的DataFrame。

      private def loadV1Source(paths: String*) = { ... // Code path for data source v1. sparkSession.baseRelationToDataFrame((...).resolveRelation()) }

      loadV1Source中調用的函數主要為二:

      resolveRelation

      這個函數就是通過上面匹配后的source name來返回一個JdbcRelationProvider。在這個函數的內部,通過匹配來判斷獲取的RelationProvider類型,如果是外部的自建數據源,則會利用java的Spi機制來獲取外部的RelationProvider。每個RelationProvider對象中都有著createRelation的功能,在這個函數內部,通過調用這些createRelation方法建立起了與外部數據源的連接,并利用之前的option獲取了相關的數據信息。

      baseRelationToDataFrame

      這個函數則是將RelationProvider轉換為DataFrame。

      到這里,一個與外部數據源建立連接并獲取相關數據的過程就完成了。

      3.DLI是如何使用Spark Datasource的

      DLI支持原生Spark的DataSource能力,并在其基礎上進行了相應的擴展,能夠利用spark作業去訪問其他華為云的數據源并導入、查詢和分析處理其中的數據。

      目前支持DLI跨源訪問的服務有:

      云搜索服務CSS

      分布式緩存服務DCS

      文檔數據庫服務DDS

      文檔數據庫服務DDS

      云數據庫RDS

      MapReduce服務MRS

      云數據庫RDS

      上述服務中,CSS集群存在著安全和非安全兩種情況、MRS集群存在著是否開啟Kerberos認證的情況,DLI均支持。只需要按照相關的指南配置連接文件和參數,即可利用DLI對CSS、MRS集群進行數據的操作。

      數據湖探索 DLI

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:excel中取最小值的函數是什么(excel取最小值)
      下一篇:輕松查找和匯總行或列中的第一個或所有匹配值
      相關文章
      国产成人精品日本亚洲18图| 亚洲一区二区三区高清视频| 亚洲AV永久无码精品一福利| 亚洲综合亚洲国产尤物| 亚洲av无码片在线播放| 亚洲午夜精品久久久久久浪潮 | 亚洲AV无码码潮喷在线观看| 亚洲午夜久久久久妓女影院| 亚洲中文字幕无码中文字在线| av在线亚洲欧洲日产一区二区| 亚洲国产精品成人久久蜜臀| 亚洲AV日韩AV无码污污网站| 亚洲日韩国产一区二区三区在线| 亚洲人成网国产最新在线| 中文字幕在线日亚洲9| 在线亚洲高清揄拍自拍一品区| 国产成人亚洲综合一区| 亚洲一日韩欧美中文字幕在线| 亚洲码和欧洲码一码二码三码 | 亚洲国产精品成人久久| 久久亚洲国产欧洲精品一| 国产成A人亚洲精V品无码 | 好看的电影网站亚洲一区| 国产亚洲精品xxx| 亚洲AV电影院在线观看| 亚洲丝袜美腿视频| 亚洲另类春色国产精品| 亚洲国产成a人v在线观看| 国产婷婷综合丁香亚洲欧洲| 亚洲一卡2卡3卡4卡5卡6卡| 亚洲狠狠色丁香婷婷综合| 成人精品国产亚洲欧洲| 国产成人亚洲精品无码AV大片| 亚洲av无码专区在线观看素人| 亚洲第一区在线观看| 狠狠综合久久综合88亚洲| 亚洲成AV人片在线观看无码| 91亚洲国产成人精品下载| 亚洲国产熟亚洲女视频| 久久水蜜桃亚洲AV无码精品| 国产成人毛片亚洲精品|