數據湖應用解析Spark on Elasticsearch一致性問題

      網友投稿 994 2022-05-30

      1.?? 概述

      Spark與Elasticsearch(es)的結合,是近年來大數據解決方案很火熱的一個話題。一個是出色的分布式計算引擎,另一個是出色的搜索引擎。近年來,越來越多的成熟方案落地到行業產品中,包括我們耳熟能詳的Spark+ES+HBase日志分析平臺。

      目前,華為云數據湖探索(DLI)服務已全面支持Spark/Flink跨源訪問Elasticsearch。而之前在實現過程中也遇到過很多場景化問題,本文將挑選其中比較經典的分布式一致性問題進行探討。

      2.?? 分布式一致性問題

      l? 問題描述

      數據容錯是大數據計算引擎面臨的主要問題之一。目前,主流的開源大數據比如Apache Spark和Apache Flink已經完全實現了Exactly Once語義,保證了內部數據處理的正確性。但是在將計算結果寫入到外部數據源時,因為外部數據源架構與訪問方式的多樣性,始終沒能找到一個統一的解決方案來保證一致性(我們稱為Sink算子一致性問題)。再加上es本身沒有事務處理的能力,因此如何保證寫入es數據一致性成為了熱點話題。

      我們舉一個簡單的例子來說明一下,圖1在SparkRDD中(這里假設是一個task),每一條藍色的線代表100萬條數據,那么10條藍色的線表示了有1000萬條數據準備寫入到CSS(華為云搜索服務,內部為es)的某個index中。在寫入過程中,系統發生了故障,導致只有一半(500萬條)數據成功寫入。

      task是Spark執行任務的最小單元,如果task失敗了,當前task需要整個重新執行。所以,當我們重新執行寫入操作(圖2),并最終重試成功之后(這次用紅色來表示相同的1000萬條數據),上一次失敗留下的500萬條數據依然存在(藍色的線),變成臟數據。臟數據對數據計算的正確性帶來了很嚴重的影響。因此,我們需要探索一種方法,能夠實現Spark寫入es數據的可靠性與正確性。

      圖1 Spark task失敗時向es寫入了部分數據

      圖2 task重試成功后上一次寫入的部分數據成為臟數據

      l? 解決方案

      1)????? 寫覆蓋

      從上圖中,我們可以很直觀的看出來,每次task插入數據前,先將es的index中的數據都清空就可以了。那么,每次寫入操作可以看成是以下3個步驟的組合:

      l? 步驟一 判斷當前index中是否有數據

      l? 步驟二 清空當前index中的數據

      l? 步驟三 向index中寫入數據

      換一種角度,我們可以理解為,不管之前是否執行了數據寫入,也不管之前數據寫入了多少次,我們只想要保證當前這一次寫入能夠獨立且正確地完成,這種思想我們稱為冪等。

      冪等式寫入是大數據sink算子解決一致性問題的一種常見思路,另一種說法叫做最終一致性,其中最簡單的做法就是“insert overwrite”。當Spark數據寫入es失敗并嘗試重新執行的時候,利用覆蓋式寫入,可以將index中的殘留數據覆蓋掉。

      圖 使用overwrite模式,task重試時覆蓋上一次數據

      在DLI中,可以在DataFrame接口里將mode設置成“overwrite”來實現覆蓋寫es:

      val?dfWriter?=?sparkSession.createDataFrame(rdd,?schema)

      //

      // 寫入數據至es

      //

      dfWriter.write

      .format("es")

      .option("es.resource",?resource)

      .option("es.nodes",?nodes)

      .mode(SaveMode.Overwrite)

      .save()

      也可以直接使用sql語句:

      //?插入數據至es

      sparkSession.sql("insert?overwrite?table?es_table?values(1,?'John'),(2,?'Bob')")

      2)????? 最終一致性

      利用上述“overwrite”的方式解決容錯問題有一個很大的缺陷。如果es已經存在了正確的數據,這次只是需要追加寫入。那么overwrite會把之前index的正確的數據都覆蓋掉。

      比如說,有多個task并發執行寫入數據的操作,其中一個task執行失敗而其他task執行成功,重新執行失敗的task進行“overwrite”會將其他已經成功寫入的數據覆蓋掉。再比如說,Streaming場景中,每一批次數據寫入都變成覆蓋,這是不合理的方式。

      圖 Spark追加數據寫入es

      圖 用overwrite寫入會將原先正確的數據覆蓋掉

      其實,我們想做的事情,只是清理臟數據而不是所有index中的數據。因此,核心問題變成了如何識別臟數據?借鑒其他數據庫解決方案,我們似乎可以找到方法。在MySQL中,有一個insert ignore into的語法,如果遇到主鍵沖突,能夠單單對這一行數據進行忽略操作,而如果沒有沖突,則進行普通的插入操作。這樣就可以將覆蓋數據的力度細化到了行級別。

      es中有類似的功能么?假如es中每一條數據都有主鍵,主鍵沖突時可以進行覆蓋(忽略和覆蓋其實都能解決這個問題),那么在task失敗重試時,就可以僅針對臟數據進行覆蓋。

      我們先來看一下Elasticsearch中的概念與關系型數據庫之間的一種對照關系:

      Elasticsearch

      關系型數據庫

      Index

      Database

      Type

      Table

      Document

      Row

      Field

      Column

      我們知道,MySQL中的主鍵是對于一行數據(Row)的唯一標識。從表中可以看到,Row對應的就是es中的Document。那么,Document有沒有唯一的標識呢?

      答案是肯定的,每一個Document都有一個id,即doc_id。doc_id是可配置的,index、type、doc_id三者指定了唯一的一條數據(Document)。并且,在插入es時,index、type、doc_id相同,原先的document數據將會被覆蓋掉。因此,doc_id可以等效于“MySQL主鍵沖突忽略插入”功能,即“doc_id沖突覆蓋插入”功能。

      因此,DLI的SQL語法中提供了配置項“es.mapping.id”,可以指定一個字段作為Document id,例如:

      create?table?es_table(id?int,?name?string)?using?es?options(

      'es.nodes'?'localhost:9200',

      'es.resource'?'/mytest/anytype',

      'es.mapping.id'?'id')")

      這里指定了字段“id”作為es的doc_id,當插入數據時,字段“id”的值將成為插入Document的id。值得注意的是,“id”的值要唯一,否則相同的“id”將會使數據被覆蓋。

      這時,如果遇到作業或者task失敗的情況,直接重新執行即可。當最終作業執行成功時,es中將不會出現殘留的臟數據,即實現了最終一致性。

      圖 在插入數據時將主鍵設為doc_id,利用冪等插入來實現最終一致性

      3.?? 總結

      本文可以一句話總結為“利用doc_id實現寫入es的最終一致性”。而這種問題,實際上不需要如此大費周章的探索,因為在es的原生API中,插入數據是需要指定doc_id,這應該是一個基本常識:(詳細API說明可以參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)

      圖 es使用bulk接口進行數據寫入

      權當消遣,聊以慰藉。

      得益于Base理論,最終一致性成為分布式計算中重要的解決方案之一。盡管該解決方案還有一定的限制(比如本文的解決方案中數據必須使用主鍵),而業界還有很多分布式一致性的解決方案(比如2PC、3PC)。但個人認為,衡量工作量與最終效果,最終一致性是一種很有效且很簡約的解決方案。

      4.?? 擴展閱讀:Elasticsearch Datasource

      l? 簡介

      Datasource是Apache Spark提供的訪問外部數據源的統一接口。Spark提供了SPI機制對Datasource進行了插件式管理,可以通過Spark的Datasource模塊自定義訪問Elasticsearch的邏輯。

      華為云DLI(數據湖探索)服務已完全實現了es datasource功能,用戶只要通過簡單的SQL語句或者Spark DataFrame API就能實現Spark訪問es。

      l? 功能描述

      通過Spark訪問es,可以在DLI官方文檔中找到詳細資料:https://support.huaweicloud.com/usermanual-dli/dli_01_0410.html。(Elasticsearch是由華為云CSS云搜索服務提供)。

      可以使用Spark DataFrame API方式來進行數據的讀寫:

      //

      // 初始化設置

      //

      // 設置es的/index/type(es 6.x版本不支持同一個index中存在多個type,7.x版本不支持設置type)

      val resource = "/mytest/anytype";

      // 設置es的連接地址(格式為”node1:port,node2:port...”,因為es的replica機制,即使訪問es集群,只需要配置一個地址即可.)

      val nodes = "localhost:9200"

      // 構造數據

      val?schema?=?StructType(Seq(StructField("id",?IntegerType,?false),?StructField("name",?StringType,?false)))

      val?rdd?=?sparkSession.sparkContext.parallelize(Seq(Row(1,?"John"),Row(2,"Bob")))

      val?dfWriter?=?sparkSession.createDataFrame(rdd,?schema)

      //

      // 寫入數據至es

      //

      dfWriter.write

      .format("es")

      .option("es.resource",?resource)

      .option("es.nodes",?nodes)

      .mode(SaveMode.Append)

      .save()

      //

      //?從es讀取數據

      數據湖應用解析:Spark on Elasticsearch一致性問題

      //

      val?dfReader?=?sparkSession.read.format("es").option("es.resource",resource).option("es.nodes",?nodes).load()

      dfReader.show()

      也可以使用Spark SQL來訪問:

      //?創建一張關聯es?/index/type的Spark臨時表,該表并不存放實際數據

      val?sparkSession?=?SparkSession.builder().getOrCreate()

      sparkSession.sql("create?table?es_table(id?int,?name?string)?using?es?options(

      'es.nodes'?'localhost:9200',

      'es.resource'?'/mytest/anytype')")

      //?插入數據至es

      sparkSession.sql("insert?into?es_table?values(1,?'John'),(2,?'Bob')")

      //?從es中讀取數據

      val?dataFrame?=?sparkSession.sql("select?*?from?es_table")

      dataFrame.show()

      spark Elasticsearch 數據湖探索 DLI

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:【Android 安裝包優化】Android 中使用 7zr 可執行程序 解壓縮文件
      下一篇:KubeCon 2019 中國 - 參會主題分享
      相關文章
      亚洲日韩中文字幕一区| 另类小说亚洲色图| 亚洲国产成人精品青青草原| 亚洲国产综合无码一区| 中文字幕亚洲不卡在线亚瑟| 亚洲国产精品一区二区九九 | 亚洲日韩中文无码久久| 亚洲综合小说另类图片动图| 久久精品亚洲精品国产色婷| 亚洲黄色网址在线观看| 精品亚洲成a人片在线观看少妇 | 亚洲va国产va天堂va久久| 亚洲日韩精品无码专区网站| 亚洲人成网亚洲欧洲无码| 亚洲一级毛片免费观看| 亚洲AV无码专区国产乱码电影| 亚洲色偷偷综合亚洲AVYP| 亚洲日韩精品无码AV海量| 亚洲成色999久久网站| 国产AV无码专区亚洲A∨毛片| 亚洲中文字幕无码亚洲成A人片| 老色鬼久久亚洲AV综合| 亚洲无线一二三四区手机| 亚洲а∨精品天堂在线| 久久无码av亚洲精品色午夜 | 亚洲一区在线视频| 亚洲www在线观看| 亚洲精品无码久久久久牙蜜区| 亚洲综合精品第一页| 亚洲成AV人影片在线观看| 国产亚洲精品欧洲在线观看| 日韩精品电影一区亚洲| 亚洲精品无码激情AV| 亚洲人成影院在线无码按摩店| 久久青青草原亚洲AV无码麻豆| 亚洲高清不卡视频| 激情亚洲一区国产精品| 亚洲精品无码不卡在线播放| 亚洲第一成人影院| 亚洲一区AV无码少妇电影☆| 亚洲AV无码一区东京热久久|