Spark內核詳解 (7) | Spark 內存管理
大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/
本片博文為大家帶來的是Spark 內存管理。
目錄
一. 堆內和堆外內存規劃
二. 內存空間分配
三. 存儲內存管理
3.1 RDD 的持久化機制
3.2 RDD 的緩存過程
3.3 淘汰和落盤
四. 執行內存管理
4.1 多任務內存分配
4.2 Shuffle 的內存占用
Spark 與 Hadoop 的重要區別之一就在于對內存的使用.
Hadoop 只將內存作為計算資源, Spark 除將內存作為計算資源外, 還將內存的一部分納入到存儲體系中. Spark 使用 MemoryManage 對存儲體系和計算使用的內存進行管理.
一. 堆內和堆外內存規劃
Spark 將內存從邏輯上區分為堆內內存和堆外內存, 稱為內存模型(MemoryMode).
枚舉類型MemoryMode中定義了堆內存和堆外內存:
@Private public enum MemoryMode { ON_HEAP, // 堆內內存 OFF_HEAP // 堆外內存 }
1
2
3
4
5
6
說明:
這里的堆內存不能與 JVM 中的 Java 堆直接畫等號, 它只是 JVM 堆內存的一部分. 由 JVM 統一管理
堆外內存則是 Spark 使用 sun.misc.Unsafe的 API 直接在工作節點的系統內存中開辟的空間.
內存池
無論前面的哪種內存, 都需要一個內存池對內存進行資源管理, 抽象類MemoryPool定義了內存池的規范:
private[memory] abstract class MemoryPool(lock: Object) { @GuardedBy("lock") private[this] var _poolSize: Long = 0 /** * Returns the current size of the pool, in bytes. */ final def poolSize: Long = lock.synchronized { _poolSize } /** * Returns the amount of free memory in the pool, in bytes. */ final def memoryFree: Long = lock.synchronized { _poolSize - memoryUsed } /** * Expands the pool by `delta` bytes. */ final def incrementPoolSize(delta: Long): Unit = lock.synchronized { require(delta >= 0) _poolSize += delta } /** * Shrinks the pool by `delta` bytes. */ final def decrementPoolSize(delta: Long): Unit = lock.synchronized { require(delta >= 0) require(delta <= _poolSize) require(_poolSize - delta >= memoryUsed) _poolSize -= delta } /** * Returns the amount of used memory in this pool (in bytes). */ def memoryUsed: Long }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
有兩個實現類:
2. 堆內內存
堆內內存的大小由 Spark 應用程序啟動時的-executor-memory 或 spark.executor.memory 參數配置.
Executor 內運行的并發任務共享 JVM 堆內內存, 這些任務在緩存 RDD 數據和廣播數據時占用的內存被規劃為存儲內存
而這些任務在執行 Shuffle 時占用的內存被規劃為執行內存.
剩余的部分不做特殊規劃, 那些 Spark 內部的對象實例, 或者用戶定義的 Spark 應用程序中的對象實例, 均占用剩余的空間.
不同的管理模式下, 這三部分占用的空間大小各不相同.
Spark 對堆內內存的管理是一種邏輯上的”規劃式”的管理,因為對象實例占用內存的申請和釋放都由 JVM 完成,Spark 只能在申請后和釋放前記錄這些內存,我們來看其具體流程:
申請內存流程如下:
Spark 在代碼中 new 一個對象實例;
JVM 從堆內內存分配空間,創建對象并返回對象引用;
Spark 保存該對象的引用,記錄該對象占用的內存。
釋放內存流程如下
Spark記錄該對象釋放的內存,刪除該對象的引用;
等待JVM的垃圾回收機制釋放該對象占用的堆內內存。
存在的問題
我們知道,JVM 的對象可以以序列化的方式存儲,序列化的過程是將對象轉換為二進制字節流,本質上可以理解為將非連續空間的鏈式存儲轉化為連續空間或塊存儲,在訪問時則需要進行序列化的逆過程——反序列化,將字節流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。
對于 Spark 中序列化的對象,由于是字節流的形式,其占用的內存大小可直接計算,而對于非序列化的對象,其占用的內存是通過周期性地采樣近似估算而得,即并不是每次新增的數據項都會計算一次占用的內存大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際內存有可能遠遠超出預期。
此外,在被 Spark 標記為釋放的對象實例,很有可能在實際上并沒有被 JVM 回收,導致實際可用的內存小于 Spark 記錄的可用內存。所以 Spark 并不能準確記錄實際可用的堆內內存,從而也就無法完全避免內存溢出(OOM, Out of Memory)的異常。
雖然不能精準控制堆內內存的申請和釋放,但 Spark 通過對存儲內存和執行內存各自獨立的規劃管理,可以決定是否要在存儲內存里緩存新的 RDD,以及是否為新的任務分配執行內存,在一定程度上可以提升內存的利用率,減少異常的出現。
3. 堆外內存
為了進一步優化內存的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,存儲經過序列化的二進制數據。
堆外內存意味著把內存對象分配在 Java 虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
利用 JDK Unsafe API,Spark 可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。
堆外內存可以被精確地申請和釋放(堆外內存之所以能夠被精確的申請和釋放,是由于內存的申請和釋放不再通過JVM機制,而是直接向操作系統申請,JVM對于內存的清理是無法準確指定時間點的,因此無法實現精確的釋放),而且序列化的數據占用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。
在默認情況下堆外內存并不啟用,可通過配置 spark.memory.offHeap.enabled 參數啟用,并由 spark.memory.offHeap.size 參數設定堆外空間的大小。
除了沒有 other 空間,堆外內存與堆內內存的劃分方式相同,所有運行中的并發任務共享存儲內存和執行內存。
二. 內存空間分配
靜態內存管理(Static Memory Manager)
在 Spark1.6之前采用的靜態內存管理機制下,存儲內存、執行內存和其他內存的大小在 Spark 應用程序運行期間均為固定的,但用戶可以在應用程序啟動前進行配置.
堆內內存管理
Storage 內存(Storage Memory): 主要用于存儲 Spark 的 cache 數據,例如 RDD的緩存、Broadcast 變量,Unroll 數據等。
Execution 內存(Execution Memory):主要用于存放 Shuffle、Join、Sort、Aggregation等計算過程中的臨時數據。
other(有時候也叫用戶內存):主要用于存儲 RDD 轉換操作所需要的數據,例如 RDD 依賴等信息。 預留內存(Reserved
Memory):系統預留內存,會用來存儲Spark內部對象。
預留內存(Reserved Memory): 防止 OOM
可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction 可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction
1
2
3
其中 systemMaxMemory 取決于當前 JVM 堆內內存的大小,最后可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。
上述計算公式中的兩個 safetyFraction 參數,其意義在于在邏輯上預留出 1-safetyFraction 這么一塊保險區域,降低因實際內存超出當前預設范圍而導致 OOM 的風險(上文提到,對于非序列化對象的內存采樣估算會產生誤差)。
值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 并沒有區別對待,和”其它內存”一樣交給了 JVM 去管理。
Storage內存和Execution內存都有預留空間,目的是防止OOM,因為Spark堆內內存大小的記錄是不準確的,需要留出保險區域。
堆外內存管理
堆外的空間分配較為簡單,只有存儲內存和執行內存。
可用的執行內存和存儲內存占用的空間大小直接由參數 spark.memory.storageFraction 決定,由于堆外內存占用的空間可以被精確計算,所以無需再設定保險區域。
靜態內存管理機制實現起來較為簡單,但如果用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或做相應的配置,很容易造成“一半海水,一半火焰”的局面,即存儲內存和執行內存中的一方剩余大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的內容以存儲新的內容。
由于新的內存管理機制的出現,這種方式目前已經很少有開發者使用,出于兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。
2. 統一內存管理(Unified Memory Manager)
Spark 1.6 之后引入的統一內存管理機制,與靜態內存管理的區別在于存儲內存和執行內存共享同一塊空間,可以動態占用對方的空閑區域.
統一堆內內存管理
統一堆外內存管理
統一內存管理最重要的優化在于動態占用機制, 其規則如下:
設定基本的存儲內存和執行內存區域spark.storage.storageFraction, 該設定確定了雙方各自擁有的空間的范圍
雙方的空間都不足時, 則存儲到硬盤. 若己方空間不足而對方空余時, 可借用對方的空間.
執行內存的空間被對方占用后, 可讓對方講占用的部分轉存到硬盤, 然后“歸還”借用的空間
存儲內存的空間被對方占用后, 無法讓對方“歸還”, 因為需要考慮 Shuffle 過程中的諸多因素, 實現起來比較復雜.
憑借統一內存管理機制, Spark 在一定程度上提高了堆內內存和堆外內存的利用率, 降低了開發者維護 Spark 內存的難度, 但并不意味著開發者可以高枕無憂.
如果存儲內存的空間太大或者說緩存的數據過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的 RDD 數據通常都是長期駐留內存的。所以要想充分發揮 Spark 的性能,需要開發者進一步了解存儲內存和執行內存各自的管理方式和實現原理。
三. 存儲內存管理
3.1 RDD 的持久化機制
RDD 作為 Spark 最基本的數據抽象, 是分區記錄(partition)的只讀集合, 只能基于在穩定物理存儲中的數據集上創建, 或者在其他已有的 RDD 上執行轉換(Transformation)操作產生一個新的 RDD.
轉換后的 RDD 與原始的 RDD 之間產生的依賴關系, 構成了血統(Lineage). 憑借血統, Spark 可以保證每一個 RDD 都可以被重新恢復.
但 RDD 的所有轉換都是惰性的, 即只有當行動(Action)發生時, Spark 才會創建任務讀取 RDD, 然后才會真正的執行轉換操作.
Task 在啟動之初讀取一個分區的時, 會先判斷這個分區是否已經被持久化, 如果沒有則需要檢查 Checkpoint 或按照血統重新計算.
如果要在一個 RDD 上執行多次行動, 可以在第一次行動中使用 persis 或 cache 方法, 在內存或磁盤中持久化或緩存這個 RDD, 從而在后面的Action 時提示計算速度.
事實上, cache 方法是使用默認的 MEMORY_ONLY的存儲級別將 RDD 持久化到內存, 所以緩存是一種特殊的持久化.
堆內內存和堆外內存的設計, 便可以對緩存 RDD 時使用的內存做統一的規劃和管理
RDD 的持久化由 Spark 的 Storage 模塊負責, 實現了 RDD 與物理存儲的緊耦合.
Storage 模塊負責管理 Spark 在計算過程中產生的數據, 將那些在內存或磁盤, 在本地或遠程存取數據的功能封裝了起來.
在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構: 即 Driver 端的 BlockManager 為 Master, Executor 端的 BlockManager 為 Slave. Storage 模塊在邏輯上以 Block 為基本存儲單位, RDD 的每個 Partition 經過處理后唯一對應一個 Block. Master 負責整個 Spark 應用程序的 Block 元數據信息的管理和維護, 而 Slave 需要將 Block 的更新狀態上報到 Master, 同時接收 Master 的命令, 例如新增或刪除一個 RDD
在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的存儲級別 ,而存儲級別是以下 5 個變量的組合:
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1)
1
2
3
4
5
6
7
通過對數據結構的分析,可以看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:
存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗余備份。OFF_HEAP 則是只在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其他位置。
存儲形式:Block 緩存到存儲內存后,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
副本數量:大于 1 時需要遠程冗余備份到其他節點。如 DISK_ONLY_2 需要遠程備份 1 個副本。
3.2 RDD 的緩存過程
RDD 在緩存到存儲內存之前, Partition 中的數據一般以迭代器(Iterator)的數據結構來訪問, 這是 Scala 語言中遍歷數據集合的方法. 通過 Iterator 可以獲取分區中每一條序列化或者非序列化的數據項(Record), 這些 Record的對象實例在邏輯上占用了 JVM 堆內內存的 other 部分的空間, 同一 Partition 的不同 Record 的空間并不連續.
RDD 在緩存到存儲內存之后, Partition 被轉換成 Block, Record 在堆內內存或堆外內存中占用一塊連續的空間.
將 Partition 由不連續的存儲空間轉換為連續存儲空間的過程, Spark 稱之為展開(Unroll)
Block 有序列化和非序列化兩種存儲格式, 具體以哪種方式存取決于該 RDD 的存儲級別.
非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義, 用一個數組存儲所有的對象實例, 序列化的 Block 則以 SerializedMemoryEntry的數據結構定義, 用字節緩沖區(ByteBuffer)來存儲二進制數據.
每個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的 Block 對象的實例, 對這個LinkedHashMap新增和刪除, 間接記錄了內存的申請和釋放.
因為不能保證存儲空間可以一次容納Iterator中的所有數據, 當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時占位, 空間不足則 Unroll 失敗, 空間足夠時可以繼續進行.
對于序列化的 Partition, 其所需的 Unroll 空間可以直接累加計算, 一次申請. 而對于非序列化的 Partition 則要在遍歷 Record 的過程中依次申請, 即讀取一條 Record, 采用估算器所需的 Unroll 空間并進行申請, 空間不足時可以中斷, 釋放已占用的 Unroll 空間.
如果最終 Unroll 成功, 當前 Partition 所占用的 Unroll 空間被轉換為正常的緩存 RDD 的存儲空間.
說明:
? 在靜態內存管理時,Spark 在存儲內存中專門劃分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態占用機制進行處理。
3.3 淘汰和落盤
由于同一個 Executor 的所有的計算任務共享有限的存儲內存空間, 當有新的 Block 需要緩存但是剩余空間不足無法動態占用時, 就要對 LinkedHashMap中的舊 Block 進行淘汰(Eviction), 而被淘汰的 Block 如果其存儲級別中同時包含存儲到磁盤的要求, 則要對其進行落盤(Drop), 否則就是直接刪除該 Block
存儲內存的淘汰規則為:
被的淘汰的舊 Block 要與新 Block 的 MemoryNode 相同, 即同屬于堆內內存或堆外內存
新舊Block 不能同屬于同一個 RDD, 避免循環淘汰
舊 Block 所屬 RDD 不能處于被讀狀態, 避免引發一致性問題
遍歷 LinkedHashMap 中的 Block, 按照最近最少使用(LRU)的順序淘汰, 直到滿足新 Block 所需的空間.
落盤的流程則比較簡單, 如果其存儲級別符號_useDisk為true的條件, 再根據其_deserialized判斷是否是非序列化的形式, 若是則對其進行序列化, 最后將數據存儲到磁盤, 然后在 Storage 模塊中更新其信息
四. 執行內存管理
4.1 多任務內存分配
Executor 內運行的任務同樣共享執行內存, Spark 用一個 HashMap 結構保存了“任務->內存耗費”的映射.
每個任務可占用的執行內存大小的范圍為1/2N ~ 1/N, 其中 N 為當前 Executor 內正在運行的任務的個數.
每個任務在啟動之時, 要向 MemoryManage 申請最少 1/2N的執行內存, 如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行內存, 該任務才可以被喚醒.
4.2 Shuffle 的內存占用
執行內存主要用來存儲任務在執行 Shuffle 時占用的內存,Shuffle 是按照一定規則對 RDD 數據重新分區的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:
Shuffle Write
若在 map 端選擇普通的排序方式,會采用 ExternalSorter 進行外排,在內存中存儲數據時主要占用堆內執行空間。
若在 map 端選擇 Tungsten 的排序方式,則采用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時可以占用堆外或堆內執行空間,取決于用戶是否開啟了堆外內存以及堆外執行內存是否足夠。
Shuffle Read
在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時占用堆內執行空間。
如果需要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,占用堆內執行空間。
在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程中所有數據并不能都保存到該哈希表中,當這個哈希表占用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其全部內容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸并(Merge)。
Shuffle Write 階段中用到的 Tungsten(鎢絲) 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃(鎢絲計劃),解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否采用 Tungsten 排序。
Tungsten 采用的頁式內存管理機制建立在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心數據具體存儲在堆內還是堆外。每個內存頁用一個 MemoryBlock 來定義,并用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。
堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用,offset是 long 型數組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;
堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 為 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,并用頁表(pageTable)管理每個 Task 申請到的內存頁。
Tungsten 頁式管理下的所有內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:
頁號:占 13 位,唯一標識一個內存頁,Spark 在申請內存頁之前要先申請空閑頁號。 頁內偏移量:占 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。 有了統一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只需要對指針進行排序,并且無需反序列化,整個過程非常高效,對于內存訪問效率和 CPU 使用效率帶來了明顯的提升[10]。
Spark 的存儲內存和執行內存有著截然不同的管理方式:對于存儲內存來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉化而成;而對于執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的數據,在 Tungsten 排序中甚至抽象成為頁式內存管理,開辟了全新的 JVM 內存管理機制。
本次的分享就到這里了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。
如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。
碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!
JVM spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。