Spark性能優化 (1) | 常規性能調優
大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/
本片博文為大家帶來的是常規性能調優。
目錄
一. 最優資源配置
二. RDD 優化
三. 并行度調節
四. 常規性能調優四:廣播大變量
五. Kryo 序列化
六. 調節本地化等待時間
一. 最優資源配置
Spark 性能調優的第一步,就是為任務分配更多的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。
資源的分配在使用腳本提交Spark任務時進行指定,標準的Spark任務提交腳本如代碼清單:
opt/modules/spark/bin/spark-submit \ --class com.buwenbuhuo.spark.Analysis \ --num-executors 80 \ --driver-memory 6g \ --executor-memory 6g \ --executor-cores 3 \ opt/modules/spark/jar/spark.jar \
1
2
3
4
5
6
7
8
調節原則:盡量將任務分配的資源調節到可以使用的資源的最大限度。
對于具體資源的分配,我們分別討論 Spark 的兩種 Cluste 運行模式:
第一種是SparkStandalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有15臺機器,每臺機器為8G內存,2個CPU core,那么就指定15個Executor,每個Executor分配8G內存,2個CPU core。
第二種是Spark Yarn模式,由于Yarn使用資源隊列進行資源的分配和調度,在表寫 submit腳本的時候,就根據Spark作業要提交到的資源隊列,進行資源的分配,比如資源隊列有400G內存,100個CPU
core,那么指定50個Executor,每個Executor分配8G內存,2個CPU core。
資源調節后的性能提升
生產環境Spark submit腳本配置
/usr/local/spark/bin/spark-submit \ --class com.buwenbuhuo.spark.WordCount \ --num-executors 80 \ --driver-memory 6g \ --executor-memory 6g \ --executor-cores 3 \ --master yarn \ --deploy-mode cluster \ --queue root.default \ --conf spark.executor.memoryOverhead=2048 \ --conf spark.core.connection.ack.wait.timeout=300 \ /usr/local/spark/spark.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
–num-executors:50~100
–driver-memory:1G~5G
–executor-memory:6G~10G
–executor-cores:3
–master: yarn
–deploy-mode: cluster
如果想看官網的具體配置,博主再此給出官方網址,可自行查看:
http://spark.apache.org/docs/latest/configuration.html
二. RDD 優化
1. RDD 復用
在對RDD進行算子時,要避免相同的算子和計算邏輯之下對 RDD 進行重復的計算:
對上圖中的RDD計算架構進行修改:
2. RDD 持久化
在Spark中,當多次對同一個 RDD 執行算子操作時,每一次都會對這個 RDD 的祖先 RDD 重新計算一次,這種情況是必須要避免的,對同一個RDD的重復計算是對資源的極大浪費,因此,必須對多次使用的RDD進行持久化,通過持久化將公共RDD的數據緩存到內存/磁盤中,之后對于公共RDD的計算都會從內存/磁盤中直接獲取RDD數據。 對于RDD的持久化,有兩點需要說明:
RDD的持久化是可以進行序列化的,當內存無法將RDD的數據完整的進行存放的時候,可以考慮使用序列化的方式減小數據體積,將數據完整存儲在內存中。
如果對于數據的可靠性要求很高,并且內存充足,可以使用副本機制,對RDD數據進行持久化。當持久化啟用了復本機制時,對于持久化的每個數據單元都存儲一個副本,放在其他節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不需要重新計算,還可以使用另外一個副本。
3. RDD 盡可能早的 filter 操作
獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數據,進而減少對內存的占用,從而提升Spark作業的運行效率。
三. 并行度調節
Spark作業中的并行度指各個stage 的 task 的數量。
如果并行度設置不合理而導致并行度過低,會導致資源的極大浪費,例如,20個 Executor,每個 Executor 分配 3 個CPU core,而Spark作業有 40 個task,這樣每個Executor分配到的task個數是2個,這就使得每個Executor有一個CPU core空閑,導致資源的浪費。
理想的并行度設置,應該是讓并行度與資源相匹配,簡單來說就是在資源允許的前提下,并行度要設置的盡可能大,達到可以充分利用集群資源。合理的設置并行度,可以提升整個 Spark 作業的性能和運行速度。
Spark官方推薦,task數量應該設置為Spark作業總CPU core數量的2~3倍。之所以沒有推薦task數量與CPU core總數相等,是因為task的執行時間不同,有的task執行速度快而有的task執行速度慢,如果task數量與CPU core總數相等,那么執行快的task執行完成后,會出現CPU core空閑的情況。如果task數量設置為CPU core總數的2~3倍,那么一個task執行完畢后,CPU core會立刻執行下一個task,降低了資源的浪費,同時提升了Spark作業運行的效率。
Spark作業并行度的設置如代碼:
val conf = new SparkConf() .set("spark.default.parallelism", "500")
1
2
3
四. 常規性能調優四:廣播大變量
默認情況下,task 中的算子中如果使用了外部的變量,每個 task 都會獲取一份變量的復本,這就造成了內存的極大消耗。
一方面,如果后續對 RDD 進行持久化,可能就無法將 RDD 數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;
另一方面,task在創建對象的時候,也許會發現堆內存無法存放新創建的對象,這就會導致頻繁的GC,GC會導致工作線程停止,進而導致Spark暫停工作一段時間,嚴重影響Spark性能。
假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被所有task共用,此時會在500個task中產生500個副本,耗費集群10G的內存,如果使用了廣播變量, 那么每個Executor保存一個副本,一共消耗400M內存,內存消耗減少了5倍。
廣播變量在每個Executor保存一個副本,此Executor的所有task共用此廣播變量,這讓變量產生的副本數量大大減少。
在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中嘗試獲取變量,如果本地沒有,BlockManager就會從Driver或者其他節點的BlockManager上遠程拉取變量的復本,并由本地的BlockManager進行管理;之后此Executor的所有task都會直接從本地的BlockManager中獲取變量。
五. Kryo 序列化
默認情況下,Spark 使用 Java 的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現Serializable接口即可,但是,Java 序列化機制的效率不高,序列化速度慢并且序列化后的數據所占用的空間依然較大。
Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫,是因為它不支持所有對象的序列化,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。
public class MyKryoRegistrator implements KryoRegistrator{ @Override public void registerClasses(Kryo kryo){ kryo.register(StartupReportLogs.class); } } //創建SparkConf對象 val conf = new SparkConf().setMaster(…).setAppName(…) //使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行屏蔽掉 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //在Kryo序列化庫中注冊自定義的類集合,如果要使用Java序列化庫,需要把該行屏蔽掉 conf.set("spark.kryo.registrator", "buwenbuhuo.com.MyKryoRegistrator");
1
2
3
4
5
6
7
8
9
10
11
12
13
六. 調節本地化等待時間
Spark 作業運行過程中,Driver 會對每一個 stage 的 task 進行分配。根據 Spark 的 task 分配算法,Spark希望task能夠運行在它要計算的數據所在的節點(數據本地化思想),這樣就可以避免數據的網絡傳輸。
通常來說,task可能不會被分配到它處理的數據所在的節點,因為這些節點可用的資源可能已經用盡,此時,Spark會等待一段時間,默認3s,如果等待指定時間后仍然無法在指定節點運行,那么會自動降級,嘗試將task分配到比較差的本地化級別所對應的節點上,比如將task分配到離它要計算的數據比較近的一個節點,然后進行計算,如果當前級別仍然不行,那么繼續降級。
當task要處理的數據不在task所在節點上時,會發生數據的傳輸。task會通過所在節點的BlockManager獲取數據,BlockManager發現數據不在本地時,會通過網絡傳輸組件從數據所在節點的BlockManager處獲取數據。
網絡傳輸數據的情況是我們不愿意看到的,大量的網絡傳輸會嚴重影響性能,因此,我們希望通過調節本地化等待時長,如果在等待時長這段時間內,目標節點處理完成了一部分task,那么當前的task將有機會得到執行,這樣就能夠改善Spark作業的整體性能。
Spark本地化等級
在Spark項目開發階段,可以使用client模式對程序進行測試,此時,可以在本地看到比較全的日志信息,日志信息中有明確的task數據本地化的級別,如果大部分都是PROCESS_LOCAL,那么就無需進行調節,但是如果發現很多的級別都是NODE_LOCAL、ANY,那么需要對本地化的等待時長進行調節,通過延長本地化等待時長,看看task的本地化級別有沒有提升,并觀察Spark作業的運行時間有沒有縮短。 注意,過猶不及,不要將本地化等待時長延長地過長,導致因為大量的等待時長,使得Spark作業的運行時間反而增加了。
val conf = new SparkConf() .set("spark.locality.wait", "6")
1
2
3
本次的分享就到這里了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。
如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。
碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!
spark 應用性能調優
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。