Make Apache Spark better with CarbonData

      網友投稿 719 2022-05-30

      Spark 無疑是一個強大的處理引擎和一個用于更快處理的分布式集群計算框架。不幸的是,Spark在一些方面也存在不足。如果我們將 Apache Spark 與 Apache CarbonData 結合使用,它可以克服這些不足:

      1. 不支持 ACID transaction

      2. 沒有quality enforcement

      3. 小文件問題

      4. 低效的data skipping

      什么是ACID?

      Spark和ACID

      ATOMICITY

      ACID 中的 A 代表原子性。基本上,這意味著要么全部成功要么全部失敗。因此,當您使用 spark data frame writer API時,它應該寫入完整數據或不寫入任何數據。讓我們快速查看 Spark 文檔。根據 Spark 文檔:“It is important to realize that these save mode (overwrite) do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.”

      雖然整個情況看起來有點可怕,但實際上并沒有那么糟糕。 Spark dataframe API在內部執行作業級提交,這有助于實現一定程度的原子性,這與使用 Hadoop 的 FileOutputCommitter 的“append”模式一起工作。但是,默認實現會帶來性能開銷,尤其是在使用云存儲 [S3/OBS] 而不是 HDFS 時。

      我們現在可以運行以下代碼來證明 Spark overwrite 不是原子的,它可能導致數據損壞或數據丟失。代碼的第一部分模仿作業 1,它創建 100 條記錄并將其保存到 ACIDpath 目錄中。代碼的第二部分模仿作業 2,它試圖覆蓋現有數據,但在操作過程中引發異常。這兩項工作的結果是數據丟失。最后,我們丟失了第一個作業創建的數據。

      由于異常,作業級提交不會發生,因此不會保存新文件。由于 Spark 刪除了舊文件,我們丟失了現有數據。 Spark data frame writer API 不是原子的,但它的行為類似于追加操作的原子操作。

      CONSISTENCY

      分布式系統通常建立在可用性較低的機器之上。一致性是高可用性系統中的一個關鍵問題。如果所有節點同時看到并返回相同的數據,則系統是一致的。有幾種一致性模型,分布式系統中最常用的一種是強一致性、弱一致性和最終一致性。我們了解到,Spark writer API 的覆蓋模式會先刪除舊文件,然后再放置新文件。因此,在這兩種狀態之間,會有一段時間沒有數據可用。如果我們的工作失敗,那么我們將丟失數據。這意味著這兩個操作之間沒有平滑的事務。這是 Spark 覆蓋操作的典型原子性問題。而這個問題也破壞了數據的一致性。 Spark API 缺乏一致性。因此,Spark 寫模式不支持一致性。

      Isolation and Durability in Spark

      隔離意味著分離。與任何其他并發操作分離。假設我們正在寫入尚未提交的數據集,并且有另一個并發進程正在讀取/寫入同一數據集。根據隔離特性,在這種情況下,不應影響他人。典型的數據庫會提供不同的隔離級別,例如已提交讀和可序列化。雖然 Spark 有任務級提交和作業級提交,但由于寫操作缺乏原子性,Spark 無法提供適當的隔離。

      最后,Durability 是系統保存的已提交狀態/數據,這樣即使在出現故障和系統重啟的情況下,數據也能以正確的狀態使用。持久性由存儲層提供,在 Spark 應用程序的情況下,它是 HDFS 和 S3/OBS 的作用。然而,當 Spark 由于缺乏原子性而沒有提供適當的提交時,如果沒有適當的提交,我們就不能指望持久性。

      如果我們仔細觀察,所有這些 ACID 屬性都是相互關聯的。由于缺乏原子性,我們失去了一致性和隔離性,由于缺乏隔離性,我們失去了持久性。

      Lack of Schema Enforcement

      我們知道 Spark 在讀取時意味著 Schema。因此,當我們寫入任何數據時,如果有任何模式不匹配,它不會拋出異常。讓我們試著用一個例子來理解這一點。讓我們有一個包含以下記錄的輸入數組。下面的程序將讀取 csv 并轉換為 DF

      該程序從 CSV 文件中讀取,以鑲木地板格式寫回并顯示數據。輸出如下

      讓我們讀取另一個輸入 CSV 文件,其中“Cost”列具有十進制值而不是整數(如下所示),并對上述文件執行追加操作

      在這種情況下,我們的程序將讀取 CSV,毫無例外地寫入 Parquet 格式。當我們想要顯示/顯示數據幀時,我們的程序將拋出錯誤

      這是因為 Spark 在寫操作期間從不驗證模式。 “Cost”列的模式在第一次加載期間被推斷為整數,在第二次寫入期間,它會毫無問題地附加雙精度型數據。當我們讀取附加數據并調用操作時,由于模式不兼容,它會引發錯誤。

      How to overcome the above drawbacks of Spark

      如果我們使用 Apache Spark 將 CarbonData 作為存儲解決方案的附加層插入,則可以管理上述問題。

      What is CarbonData

      由于 Hadoop 分布式文件系統 (HDFS) 和對象存儲類似于文件系統,因此它們不是為提供事務支持而設計的。在分布式處理環境中實現事務是一個具有挑戰性的問題。例如,實施通常必須考慮鎖定對存儲系統的訪問,這是以整體吞吐量性能為代價的。 Apache CarbonData 等存儲解決方案通過將這些事務語義和規則推送到文件格式本身或元數據和文件格式組合中,有效地解決了數據湖的這些 ACID 要求。 CarbonData 在 Apache Spark 和存儲系統之間起到中介服務的作用。現在,遵守 ACID 的責任由 CarbonData 負責。底層存儲系統可以是 HDFS、華為 OBS、Amazon S3 或 Azure Blob Storage 之類的任何東西。 CarbonData 為 Spark 提供的幾個重要功能是:

      1. ACID transactions.

      2. Schema enforcement/Schema validation.

      3. Enables Updates, Deletes and Merge.

      4. Automatic data indexing.

      CarbonData in Apache Spark: ACID

      在上面的代碼片段中,代碼的第一部分模仿了 job-1,創建了 100 條記錄并將其保存到 ACIDpath 目錄中。代碼的第二部分模仿 job-2,它試圖覆蓋現有數據但在操作過程中拋出異常。

      這兩項工作的結果是數據丟失。最后,我們丟失了第一個作業創建的數據。現在讓我們更改如下所示的代碼以使用 CarbonData。

      執行第一個作業并計算行數。正如預期的那樣,您將獲得 100 行。

      如果您檢查數據目錄,您將看到一個snappy compressed CarbonData 文件。該數據文件以列式編碼格式保存 100 行。您還將看到一個包含 tablestatus 文件的元數據目錄。現在執行第二個作業。你對第二份工作有什么期望?如前所述,這項工作應該嘗試做以下事情。

      1. 刪除之前的文件。

      2. 創建一個新文件并開始寫入記錄。

      3. 在作業中間拋出運行時異常。

      Make Apache Spark better with CarbonData

      由于異常,作業級別提交不會發生,我們丟失了上述觀察到的現有數據在沒有使用 CarbonData 的情況下。

      但是現在如果你執行第二個作業,你仍然會得到一個異常。然后,計算行數。您得到的輸出為 100,并且不會丟失舊記錄。看起來 CarbonData 已經使 Overwrite 原子化了。我們看一下數據目錄,你會發現兩個 CarbonData 文件。

      一個文件由第一個作業創建,另一個文件由作業 2 創建。作業 2 沒有刪除舊文件,而是直接創建了一個新文件并開始向新文件寫入數據。這種方法使舊數據狀態保持不變。這就是為什么我們沒有丟失舊數據的原因,因為舊文件保持不變。新的不完整文件也在那里,但不讀取新的不完整文件中的數據。該邏輯隱藏在元數據目錄中,并使用 tablestatus 文件進行管理。第二個作業無法在 tablestatus 文件中創建成功的條目,因為它在中間失敗了。讀取 API 不會讀取 tablestatus 文件中的條目被標記為刪除的文件。

      這一次,讓我們無一例外地編寫代碼邏輯,用50條記錄覆蓋舊的100條記錄。

      Now the record count shows 50. As expected. So, you have overwritten the older data set of 100 rows with a new data set of 50 rows.

      CarbonData 將元數據管理引入 Apache Spark 并使 Spark 數據編寫器 API 具有原子性,從而解決了數據一致性問題。一旦一致性問題得到解決,CarbonData 將能夠提供更新和刪除功能。

      Spark With CarbonData: Schema Enforcement

      讓我們考慮一個簡單的用戶場景,其中數據分多批到達以進行轉換。這里為了簡單起見,讓我們假設只有 2 批數據,第二批數據攜帶一些與第一批數據不同類型的列數據。

      為了開始實驗,讓我們從表 1 中讀取數據,并使用和不使用 CarbonData 寫入數據。我們能夠使用“Overwrite”模式在有和沒有 CarbonData 的情況下寫入數據。

      現在讓我們讀取具有成本列類型的雙類型數據的第二個表,然后將數據幀寫入 Parquet 和 CarbonTables(注意:_c2 是整數類型,我們正在嘗試附加雙類型數據)。使用 parquet 附加模式不匹配的數據沒有問題,但是當程序嘗試將相同的數據附加到 CarbonData 表時,它會拋出錯誤:

      因此,基于上述實驗,我們可以看到 CarbonData 在寫入底層存儲之前驗證模式,這意味著 CarbonData 在寫入時使用模式驗證。如果類型不兼容,則 CarbonData 將取消交易。這將有助于在開始時跟蹤問題,而不是與好的數據混淆,然后嘗試找出根本原因。

      在我們的下一篇博客中,讓我們繼續討論更多關于:

      1. 使用 CarbonData 刪除和更新記錄。

      2. 使用 Merge 語句高效插入數據湖。

      英文鏈接:https://brijoobopanna.medium.com/making-apache-spark-better-with-carbondata-d37f98d235de

      FusionInsight 大數據 智能數據

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:一網打盡Flink算子大全(面試問的都在這里)
      下一篇:android注解使用詳解(圖文)
      相關文章
      亚洲福利电影在线观看| 亚洲最大在线观看| 亚洲熟女www一区二区三区| 亚洲AV综合色区无码另类小说| 亚洲av永久中文无码精品综合| 亚洲一卡一卡二新区无人区 | 久久亚洲最大成人网4438| 亚洲最大中文字幕| 亚洲妇女水蜜桃av网网站| 亚洲精品福利网泷泽萝拉| 色婷婷亚洲十月十月色天| 久久精品亚洲一区二区三区浴池| 亚洲综合国产精品| 亚洲精品国产福利在线观看| 亚洲第一成年人网站| 亚洲国产成人无码av在线播放 | 亚洲国产小视频精品久久久三级 | 亚洲日韩在线观看免费视频| 亚洲欧洲日产国码一级毛片 | 国产成人综合亚洲AV第一页 | 亚洲av无码一区二区三区天堂| 亚洲AV噜噜一区二区三区| 国产精品亚洲一区二区三区在线观看 | 亚洲精品无码少妇30P| 亚洲AV无码XXX麻豆艾秋| 亚洲AV之男人的天堂| 亚洲日韩涩涩成人午夜私人影院| 亚洲精品无码乱码成人| 亚洲精品免费在线观看| 亚洲午夜精品国产电影在线观看| 国产成人亚洲合集青青草原精品| 亚洲欧洲av综合色无码| 亚洲成年看片在线观看| 国产亚洲精久久久久久无码77777 国产亚洲精品成人AA片新蒲金 | 国产亚洲精品影视在线产品| 久久久久久a亚洲欧洲aⅴ| 精品日韩亚洲AV无码| 国产精品亚洲午夜一区二区三区| 亚洲色偷偷色噜噜狠狠99| 九九精品国产亚洲AV日韩| 中文字幕亚洲日韩无线码|