數值求和如何屏蔽掉時間
831
2025-03-31
前言:
Apache Spark 是專為大數據處理而設計的快速的計算引擎,Spark擁有Hadoop MapReduce所具有的優點;但不同于MapReduce的是—spark的輸出結果可以保存在內存中,不用再進行HDFS的讀寫,因此Spark被廣泛用于機器學習跟需要迭代計算類的算法。但是面對大量需要處理的數據,要讓Spark穩定快速的運行,這就需要對Spark進行全方位的調優,從而在工作中擁有更高的處理效率。本篇文章主要對Spark如何進行全方位的調優進行闡述
主要從下面幾點對Spark進行調優:
RDD是一個編程模型,是一種容錯的,并行的數據結構,可以讓用戶顯示的將數據儲存在磁盤與內存中,并且可以控制數據的分區。RDD一個很重要的特性就是可以相互依賴,如果RDD的每個分區只可以被一個子RDD分區使用,則稱之為窄依賴,可以被多個RDD分區使用則稱之為寬依賴。我們在進行一個Spark作業的時候,一般會讀取一個數據源作為一個初始的RDD,之后以此RDD為開始得到后面需要的RDD,形成一個RDD關系鏈。
在進行RDD創建的時候要避免RDD的重復創建,也就是不要對一份數據進行創建多個相同的RDD。重復創建RDD會對Spark帶來更大的性能開銷,如下:
//錯誤的創建RDD的方式 val rdd1 = sc.textFile("hdfs://localhost:9000/test.txt") rdd1.map() val rdd2 = sc.textFile("hdfs://localhost:9000/test.txt") rdd2.reduce() //正確的RDD創建方式形成一條RDD鏈 val rdd1 = sc.textFile("hdfs://localhost:9000/test.txt") val rdd2=rdd1.map() rdd2.reduce()
在執行作業的時候報錯可以說是每一個技術人員都會遇到的事情,Spark提供的作業日志就可以很好的幫助我們對出現的問題進行定位。Spark日志通常是排錯的唯一根據。一般的報錯我們可以從Spark的Driver日志中進行定位。也可以通過yarn-client的模式執行,將日志輸出到客戶端,方便我們進行查看,如下:
./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ --executor-memory 20G \ --num-executors 50 \ /app/spark-2.2/examples/jars/xx.jar //這里指定自己路徑的jar包
根據運行的ID號可以查看到日志
使用這種方式進行報錯日志的定位往往是最有效的解決問題的辦法。
Shuffle表示數據從Map Task輸出到Reduce Task輸入的這段過程。shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。同時,Shuffle也是Spark進行作業的時候很關鍵的一個環節,也是對Spark進行性能調優的一個重點,下面是Spark進行詞頻統計作業時候的Map Reduce的過程
Spark中與Shuffle性能有關的參數:
spark.shuffle.file.buffer spark.reducer.maxSizeInFlight spark.shuffle.compress
1.第一個配置是Map端輸出結果的緩沖區大小。
2.第二個配置是Map端輸出結果文件的大小。
3.第三個配置是Map端是否開啟壓縮
第一個配置當然是越高越好,緩沖區越大,數據寫入的性能可想而知也是會越高的,所以如果機器條件優越的情況下,這個可以盡可能的調大,來提高Shuffle性能,在Reduce的過程中Reduce的Task所在的位置會按照spark.reducer.maxSizeInFlight的配置大小去拉取文件,之后用內存緩沖區來接收,所以提高spark.reducer.maxSizeInFlight的參數大小也是可以提高Shuffle的效率的。第三個配置一般都是默認開啟的,默認對Map端的輸出進行壓縮操作。
在Spark作業進行的時候,提高Spark作業的并行程度是提高運行效率的最有效的辦法。那么我們應該要明確spark中的并行度是指什么?spark中的并行度指的就是各個stage里面task的數量。
spark.default.parallelism textfile()
可以根據地2個參數來設置該作業的并行度。Spark任務的RDD一開始的分區數量時與HDFS上的數據塊數量保持一致的,通過coalesce 與 repartition 算子可以進行重分區,但是這個操作并不可以改變Rdeduce的分區數,改變的只是Map端的分區數量,想要對Reduce端的分區數量進行修改,就可以對spark.default.parallelism配置進行修改。通過在官網的描述中,設置的并行度為這個application 中cpu-core數量的2到3倍為最優。
Spark作業中內存的主要用途就是計算跟儲存。Spark在執行程序的時候,集群就會啟動Driver和Executor兩種JVM進程,Driver為主控進程,Executor負責執行具體的計算任務。 spark的進程是JVM進程,所以Executor的內存管理是建立在 JVM 的內存管理之上,所以還涉及到了堆的概念,內存受到 JVM 統一管理這一點就導致spark釋放內存的時候收到限制,所以Spark引入了堆外內存。
只要是在Executor內運行的任務一律共享 JVM 堆內存,按照用途主要可以分為三大類:Storage負責緩存數據和廣播變量數據,Execution負責執行Shuffle過程中占用的內存,剩下空間則是儲存Spark內部的對象實例。Spark雖然不可以精準的對堆內存進行控制,但是通過決定是否要在儲存的內存里面緩存新的RDD,是否為新的任務分配執行內存,也可以提高內存的利用率,相關的參數配置如下:
spark.memory.fraction spark.memory.storageFraction
更改參數配置spark.memory.fraction可調整storage+executor總共占內存的百分比,更改配置spark.memory.storageFraction可調整storage占二者內存和的百分比,這兩個參數一般使用默認值就可以滿足我們絕大部分的作業的要求了。
再說說Spark的堆外內存,為了提高Spark內存的使用以及提高Shuffle時的效率,Spark引入了堆外(Off-heap)內存。在默認的情況下堆外內存是不會啟用的,可以通過如下參數進行開啟:
spark.memory.offHeap.enabled
Spark Executor可以通過參數spark.yarn.executor.memoryoverhead 進行配置,最小為 384MB,默認為 Executor 內存的 10%。配置堆外內存大小的參數為spark.memory.offHeap.size,堆外內存與堆內存的劃分方式其實是相同的,用戶需要知道每個部分的大小如何調節,才能針對場景進行調優,這個對于普通用戶來說其實不是特別的友好。
Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存數據集。RDD通過persist方法或cache方法可以將前面的計算結果進行緩存,但是要注意的是并不會馬上進行緩存,而是觸發后面的action動作的時候,RDD才會被緩存在計算節點的內存中。如果某一份數據要被重復使用的時候,就可以使用cache算子進行緩存,可以達到很不錯的效果
Spark 的filter算子主要用于數據過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成。在執行這個算子的時候數據一般會被拆分成多個分區,這些分區也會影響到后面的計算,所以在執行這個算子的時候用 coalesce 算子進行一次合并,也可以對作業的執行速度達到提升。
數據傾斜是數據處理作業中一個很常見的問題。 正常情況下,數據通常都會出現數據傾斜的問題,只不過嚴重程度不一。數據傾斜的癥狀是大量數據集中到一個或者幾個任務里,導致這幾個任務計算大量數據,拖慢整個作業的執行速度,這里給大家詳細分析一下數據傾斜是怎么出現的。
我們都知道在執行shuffle操作的時候,程序是按照key進行統計聚合,來進行values的數據的輸出、拉取和聚合的。同一個key的values,會被程序分配到一個Reduce task進行處理。但是這個時候如果你處理的是大量的數據,問題可能就要出現了。大量數據必定有多個Key,多個Key對應的values,舉個例子假如一共100萬數據。可能程序執行后,某個聚合的key對應了98萬數據,這些數據全部被分配到一個task上去面去執行。另外兩個task,可能各分配到了1萬數據。也可能是幾百個key被對應到了剩余的兩萬條數據,這個時候數據傾斜的問題就出現了,而且對于Spark作業的整體性能來說是及其不樂觀的。場景如下:
對應數據發生傾斜的情況,可以采用如下幾種解決辦法:
1.對源數據進行聚合
Spark中一些用于聚合操作的算子,比如groupByKey、reduceByKey,這些算子都是要去拿到每個key對應的values進行計算的。在一些大數據量的計算中,我們可以找到數據的一些維度進行一步聚合,比如說是時間維度的年月日,城市的地區等等,聚合了第一個維度之火再進行下一步的聚合
2.對臟數據進行首先過濾
對應源數據處理中,必定是會存在很多臟數據,這個也是導致數據傾斜的重要原因之一,這個時候我們需要第一步將臟數據進行過濾
3.使用廣播變量
在作業進行連接操作的時候,我們可以將小表通過廣播變量進行廣播,這樣可以避免Shuffle過程,讓數據相對比較均勻的分布在Map任務。
4.提高作業的并行度
這個方式在前面我們也說到過如何進行參數配置,但是要注意的是,這個配置只是提高瀏覽作業的運行速度,但是并不能從根本上解決數據傾斜的問題。
5.使用隨機Key進行雙重聚合
groupByKey、reduceByKey比較適合使用這種方式。join操作通常不會這樣來做。
到這里,相信大家對與Spark如何進行調優也有了全新的認識!
spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。