Spark性能優(yōu)化 (2) | 算子調(diào)優(yōu)

      網(wǎng)友投稿 1019 2022-05-29

      大家好,我是不溫卜火,是一名計算機(jī)學(xué)院大數(shù)據(jù)專業(yè)大二的學(xué)生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯(lián)網(wǎng)行業(yè)的小白,博主寫博客一方面是為了記錄自己的學(xué)習(xí)過程,另一方面是總結(jié)自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現(xiàn),有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/

      本片博文為大家?guī)淼氖撬阕诱{(diào)優(yōu)。

      Spark性能優(yōu)化 (2) | 算子調(diào)優(yōu)

      目錄

      一. mapPartitions

      二. foreachPartition 優(yōu)化數(shù)據(jù)庫操作

      三. filter 與 coalesce 的配合使用

      四. repartition解決 SparkSQL 低并行度問題

      五. reduceByKey 預(yù)聚合

      一. mapPartitions

      普通的 map 算子對 RDD 中的每一個元素進(jìn)行操作,而 mapPartitions 算子對 RDD 中每一個分區(qū)進(jìn)行操作。

      如果是普通的map算子,假設(shè)一個 partition 有 1 萬條數(shù)據(jù),那么 map 算子中的 function 要執(zhí)行1萬次,也就是對每個元素進(jìn)行操作。

      如果是 mapPartition 算子,由于一個 task 處理一個 RDD 的partition,那么一個task只會執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù),效率比較高。

      比如,當(dāng)要把 RDD 中的所有數(shù)據(jù)通過 JDBC 寫入數(shù)據(jù),如果使用 map 算子,那么需要對 RDD 中的每一個元素都創(chuàng)建一個數(shù)據(jù)庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區(qū)的數(shù)據(jù),只需要建立一個數(shù)據(jù)庫連接。

      mapPartitions算子也存在一些缺點:對于普通的map操作,一次處理一條數(shù)據(jù),如果在處理了2000條數(shù)據(jù)后內(nèi)存不足,那么可以將已經(jīng)處理完的2000條數(shù)據(jù)從內(nèi)存中垃圾回收掉;但是如果使用mapPartitions算子,但數(shù)據(jù)量非常大時,function一次處理一個分區(qū)的數(shù)據(jù),如果一旦內(nèi)存不足,此時無法回收內(nèi)存,就可能會OOM,即內(nèi)存溢出。

      因此,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當(dāng)數(shù)據(jù)量很大的時候,一旦使用mapPartitions算子,就會直接OOM) 在項目中,應(yīng)該首先估算一下RDD的數(shù)據(jù)量、每個partition的數(shù)據(jù)量,以及分配給每個Executor的內(nèi)存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。

      二. foreachPartition 優(yōu)化數(shù)據(jù)庫操作

      在生產(chǎn)環(huán)境中,通常使用foreachPartition算子來完成數(shù)據(jù)庫的寫入,通過foreachPartition算子的特性,可以優(yōu)化寫數(shù)據(jù)庫的性能。

      如果使用foreach算子完成數(shù)據(jù)庫的操作,由于foreach算子是遍歷RDD的每條數(shù)據(jù),因此,每條數(shù)據(jù)都會建立一個數(shù)據(jù)庫連接,這是對資源的極大浪費(fèi),因此,對于寫數(shù)據(jù)庫操作,我們應(yīng)當(dāng)使用foreachPartition算子。 與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區(qū)作為遍歷對象,一次處理一個分區(qū)的數(shù)據(jù),也就是說,如果涉及數(shù)據(jù)庫的相關(guān)操作,一個分區(qū)的數(shù)據(jù)只需要創(chuàng)建一次數(shù)據(jù)庫連接:

      使用了foreachPartition算子后,可以獲得以下的性能提升:

      對于我們寫的function函數(shù),一次處理一整個分區(qū)的數(shù)據(jù);

      對于一個分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫連接;

      只需要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù);

      在生產(chǎn)環(huán)境中,全部都會使用foreachPartition算子完成數(shù)據(jù)庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區(qū)的數(shù)據(jù)量特別大,可能會造成OOM,即內(nèi)存溢出。

      三. filter 與 coalesce 的配合使用

      在Spark任務(wù)中我們經(jīng)常會使用filter算子完成RDD中數(shù)據(jù)的過濾,在任務(wù)初始階段,從各個分區(qū)中加載到的數(shù)據(jù)量是相近的,但是一旦進(jìn)過filter過濾后,每個分區(qū)的數(shù)據(jù)量有可能會存在較大差異

      根據(jù)上圖我們可以發(fā)現(xiàn)兩個問題:

      每個partition的數(shù)據(jù)量變小了,如果還按照之前與partition相等的task個數(shù)去處理當(dāng)前數(shù)據(jù),有點浪費(fèi)task的計算資源;

      每個partition的數(shù)據(jù)量不一樣,會導(dǎo)致后面的每個task處理每個partition數(shù)據(jù)的時候,每個task要處理的數(shù)據(jù)量不同,這很有可能導(dǎo)致數(shù)據(jù)傾斜問題。

      在上圖中, 第二個分區(qū)的數(shù)據(jù)過濾后只剩100條,而第三個分區(qū)的數(shù)據(jù)過濾后剩下800條,在相同的處理邏輯下,第二個分區(qū)對應(yīng)的task處理的數(shù)據(jù)量與第三個分區(qū)對應(yīng)的task處理的數(shù)據(jù)量差距達(dá)到了8倍,這也會導(dǎo)致運(yùn)行速度可能存在數(shù)倍的差距,這也就是數(shù)據(jù)傾斜問題。

      針對上述的兩個問題,我們分別進(jìn)行分析:

      針對第一個問題,既然分區(qū)的數(shù)據(jù)量變小了,我們希望可以對分區(qū)數(shù)據(jù)進(jìn)行重新分配,比如將原來4個分區(qū)的數(shù)據(jù)轉(zhuǎn)化到2個分區(qū)中,這樣只需要用后面的兩個task進(jìn)行處理即可,避免了資源的浪費(fèi)。

      針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區(qū)數(shù)據(jù)重新分配,讓每個partition中的數(shù)據(jù)量差不多,這就避免了數(shù)據(jù)傾斜問題。

      那么具體應(yīng)該如何實現(xiàn)上面的解決思路?我們需要coalesce算子。

      repartition與coalesce都可以用來進(jìn)行重分區(qū),其中repartition只是coalesce接口中shuffle為true的簡易實現(xiàn),coalesce默認(rèn)情況下不進(jìn)行shuffle,但是可以通過參數(shù)進(jìn)行設(shè)置。

      假設(shè)我們希望將原本的分區(qū)個數(shù)A通過重新分區(qū)變?yōu)锽,那么有以下幾種情況: 1. A > B(多數(shù)分區(qū)合并為少數(shù)分區(qū))

      A與B相差值不大

      此時使用coalesce即可,無需shuffle過程。

      A與B相差值很大

      此時可以使用 coalesce 并且不啟用 shuffle 過程,但是會導(dǎo)致合并過程性能低下,所以推薦設(shè)置 coalesce 的第二個參數(shù)為 true,即啟動 shuffle 過程。

      A < B(少數(shù)分區(qū)分解為多數(shù)分區(qū))

      此時使用repartition即可,如果使用coalesce需要將shuffle設(shè)置為true,否則coalesce無效。

      總結(jié): 我們可以在filter操作之后,使用coalesce算子針對每個partition的數(shù)據(jù)量各不相同的情況,壓縮partition的數(shù)量,而且讓每個partition的數(shù)據(jù)量盡量均勻緊湊,以便于后面的task進(jìn)行計算操作,在某種程度上能夠在一定程度上提升性能。

      注意:local模式是進(jìn)程內(nèi)模擬集群運(yùn)行,已經(jīng)對并行度和分區(qū)數(shù)量有了一定的內(nèi)部優(yōu)化,因此不用去設(shè)置并行度和分區(qū)數(shù)量。

      四. repartition解決 SparkSQL 低并行度問題

      在第一節(jié)的常規(guī)性能調(diào)優(yōu)中我們講解了并行度的調(diào)節(jié)策略,但是,并行度的設(shè)置對于Spark SQL是不生效的,用戶設(shè)置的并行度只對于Spark SQL以外的所有Spark的stage生效。

      Spark SQL的并行度不允許用戶自己指定,Spark SQL自己會默認(rèn)根據(jù) hive 表對應(yīng)的 HDFS 文件的 split 個數(shù)自動設(shè)置 Spark SQL 所在的那個 stage 的并行度,用戶自己通spark.default.parallelism參數(shù)指定的并行度,只會在沒Spark SQL的stage中生效。

      由于Spark SQL所在stage的并行度無法手動設(shè)置,如果數(shù)據(jù)量較大,并且此stage中后續(xù)的transformation操作有著復(fù)雜的業(yè)務(wù)邏輯,而Spark SQL自動設(shè)置的task數(shù)量很少,這就意味著每個task要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行非常復(fù)雜的處理邏輯,這就可能表現(xiàn)為第一個有 Spark SQL 的 stage 速度很慢,而后續(xù)的沒有 Spark SQL 的 stage 運(yùn)行速度非常快。

      為了解決Spark SQL無法設(shè)置并行度和 task 數(shù)量的問題,我們可以使用repartition算子。

      Spark SQL這一步的并行度和task數(shù)量肯定是沒有辦法去改變了,但是,對于Spark SQL查詢出來的RDD,立即使用repartition算子,去重新進(jìn)行分區(qū),這樣可以重新分區(qū)為多個partition,從repartition之后的RDD操作,由于不再涉及 Spark SQL,因此 stage 的并行度就會等于你手動設(shè)置的值,這樣就避免了 Spark SQL 所在的 stage 只能用少量的 task 去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯。

      五. reduceByKey 預(yù)聚合

      reduceByKey相較于普通的shuffle操作一個顯著的特點就是會進(jìn)行map端的本地聚合,map端會先對本地的數(shù)據(jù)進(jìn)行combine操作,然后將數(shù)據(jù)寫入給下個stage的每個task創(chuàng)建的文件中,也就是在map端,對每一個key對應(yīng)的value,執(zhí)行reduceByKey算子函數(shù)。

      使用reduceByKey對性能的提升如下: 1. 本地聚合后,在map端的數(shù)據(jù)量變少,減少了磁盤IO,也減少了對磁盤空間的占用; 2. 本地聚合后,下一個stage拉取的數(shù)據(jù)量變少,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量; 3. 本地聚合后,在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用減少; 4. 本地聚合后,在reduce端進(jìn)行聚合的數(shù)據(jù)量減少。

      基于reduceByKey的本地聚合特征,我們應(yīng)該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKey。

      reduceByKey與groupByKey的運(yùn)行原理如圖:

      根據(jù)上圖可知,groupByKey不會進(jìn)行map端的聚合,而是將所有map端的數(shù)據(jù)shuffle到reduce端,然后在reduce端進(jìn)行數(shù)據(jù)的聚合操作。由于reduceByKey有map端聚合的特性,使得網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量減小,因此效率要明顯高于groupByKey。

      本次的分享就到這里了,

      好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學(xué)習(xí)來獲取更多知識,用知識改變命運(yùn),用博客見證成長,用行動證明我在努力。

      如果我的博客對你有幫助、如果你喜歡我的博客內(nèi)容,請“” “評論”“”一鍵三連哦!聽說的人運(yùn)氣不會太差,每一天都會元?dú)鉂M滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。

      碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關(guān)注我哦!

      spark 數(shù)據(jù)庫

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:互聯(lián)網(wǎng)公司面試必問的Redis題目
      下一篇:一文搞懂Java日志級別,重復(fù)記錄、丟日志問題
      相關(guān)文章
      亚洲一区无码中文字幕乱码| 亚洲国产精品张柏芝在线观看| 亚洲综合激情五月丁香六月| 国产成人精品日本亚洲网址| 亚洲一区电影在线观看| 亚洲国产成人综合| 亚洲免费在线视频播放| 亚洲国产日韩在线| 久久精品国产亚洲AV忘忧草18| 亚洲中文无码av永久| 亚洲伊人久久大香线蕉在观 | 亚洲日本韩国在线| 国产a v无码专区亚洲av| 亚洲人成网站观看在线播放| 亚洲一区二区精品视频| 久久亚洲欧洲国产综合| 亚洲色成人中文字幕网站| 久久精品国产亚洲网站| 亚洲AV无码国产在丝袜线观看| 亚洲av无码乱码国产精品| 亚洲AV成人无码久久精品老人 | 亚洲AV无码一区二区三区性色 | 理论亚洲区美一区二区三区| 亚洲国产中文字幕在线观看| 久久亚洲色一区二区三区| 亚洲精品国产美女久久久| 亚洲av无码av制服另类专区| 久久久国产精品亚洲一区| 亚洲欧洲日产专区| 亚洲偷自拍另类图片二区| 色噜噜噜噜亚洲第一| 亚洲国模精品一区| 亚洲精品乱码久久久久久久久久久久 | 激情五月亚洲色图| 亚洲国产精华液2020| 亚洲高清国产拍精品青青草原 | 色婷五月综激情亚洲综合| 亚洲偷自拍另类图片二区| 亚洲成年看片在线观看| 亚洲欧洲自拍拍偷午夜色无码| 亚洲国产精品久久66|