談?wù)?/a>flink內(nèi)存管理

      網(wǎng)友投稿 942 2025-04-03

      Flink是jvm之上的大數(shù)據(jù)處理引擎,jvm存在java對象存儲(chǔ)密度低、full gc時(shí)消耗性能,gc存在stw的問題,同時(shí)omm時(shí)會(huì)影響穩(wěn)定性。同時(shí)針對頻繁序列化和反序列化問題Flink使用堆內(nèi)堆外內(nèi)存可以直接在一些場景下操作二進(jìn)制數(shù)據(jù),減少序列化反序列化的消耗。同時(shí)基于大數(shù)據(jù)流式處理的特點(diǎn),flink定制了自己的一套序列化框架。flink也會(huì)基于cpu L1 L2 L3高速緩存的機(jī)制以及局部性原理,設(shè)計(jì)使用緩存友好的數(shù)據(jù)結(jié)構(gòu)。flink內(nèi)存管理和spark的tungsten的內(nèi)存管理的出發(fā)點(diǎn)很相似。


      內(nèi)存模型

      Flink可以使用堆內(nèi)和堆外內(nèi)存,內(nèi)存模型如圖所示:

      flink使用內(nèi)存劃分為堆內(nèi)內(nèi)存和堆外內(nèi)存。按照用途可以劃分為task所用內(nèi)存,network memory、managed memory、以及framework所用內(nèi)存,其中task network managed所用內(nèi)存計(jì)入slot內(nèi)存。framework為taskmanager公用。

      堆內(nèi)內(nèi)存包含用戶代碼所用內(nèi)存、heapstatebackend、框架執(zhí)行所用內(nèi)存。

      堆外內(nèi)存是未經(jīng)jvm虛擬化的內(nèi)存,直接映射到操作系統(tǒng)的內(nèi)存地址,堆外內(nèi)存包含框架執(zhí)行所用內(nèi)存,jvm堆外內(nèi)存、Direct、native等。

      Direct memory內(nèi)存可用于網(wǎng)絡(luò)傳輸緩沖。network memory屬于direct memory的范疇,flink可以借助于此進(jìn)行zero copy,從而減少內(nèi)核態(tài)到用戶態(tài)copy次數(shù),從而進(jìn)行更高效的io操作。

      jvm metaspace存放jvm加載的類的元數(shù)據(jù),加載的類越多,需要的空間越大,overhead用于jvm的其他開銷,如native memory、code cache、thread stack等。

      Managed Memory主要用于RocksDBStateBackend和批處理算子,也屬于native memory的范疇,其中rocksdbstatebackend對應(yīng)rocksdb,rocksdb基于lsm數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn),每個(gè)state對應(yīng)一個(gè)列族,占有獨(dú)立的writebuffer,rocksdb占用native內(nèi)存大小為 blockCahe + writebufferNum * writeBuffer + index ,同時(shí)堆外內(nèi)存是進(jìn)程之間共享的,jvm虛擬化大量heap內(nèi)存耗時(shí)較久,使用堆外內(nèi)存的話可以有效的避免該環(huán)節(jié)。但堆外內(nèi)存也有一定的弊端,即監(jiān)控調(diào)試使用相對復(fù)雜,對于生命周期較短的segment使用堆內(nèi)內(nèi)存開銷更低,flink在一些情況下,直接操作二進(jìn)制數(shù)據(jù),避免一些反序列化帶來的開銷。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制,則會(huì)將部分?jǐn)?shù)據(jù)存儲(chǔ)到硬盤上。

      談?wù)刦link內(nèi)存管理

      內(nèi)存管理

      類似于OS中的page機(jī)制,flink模擬了操作系統(tǒng)的機(jī)制,通過page來管理內(nèi)存,flink對應(yīng)page的數(shù)據(jù)結(jié)構(gòu)為dataview和MemorySegment,memorysegment是flink內(nèi)存分配的最小單位,默認(rèn)32kb,其可以在堆上也可以在堆外,flink通過MemorySegment的數(shù)據(jù)結(jié)構(gòu)來訪問堆內(nèi)堆外內(nèi)存,借助于flink序列化機(jī)制(序列化機(jī)制會(huì)在下一小節(jié)講解),memorysegment提供了對二進(jìn)制數(shù)據(jù)的讀取和寫入的方法,flink使用datainputview和dataoutputview進(jìn)行memorysegment的二進(jìn)制的讀取和寫入,flink可以通過HeapMemorySegment 管理堆內(nèi)內(nèi)存,通過HybridMemorySegment來管理堆內(nèi)和堆外內(nèi)存,MemorySegment管理jvm堆內(nèi)存時(shí),其定義一個(gè)字節(jié)數(shù)組的引用指向內(nèi)存端,基于該內(nèi)部字節(jié)數(shù)組的引用進(jìn)行操作的HeapMemorySegment。

      public abstract class MemorySegment { /** * The heap byte array object relative to which we access the memory. * 如果為堆內(nèi)存,則指向訪問的內(nèi)存的引用,否則若內(nèi)存為非堆內(nèi)存,則為null *

      Is non-null if the memory is on the heap, and is null, if the memory is * off the heap. If we have this buffer, we must never void this reference, or the memory * segment will point to undefined addresses outside the heap and may in out-of-order execution * cases cause segmentation faults. */ protected final byte[] heapMemory; /** * The address to the data, relative to the heap memory byte array. If the heap memory byte * array is null, this becomes an absolute memory address outside the heap. * 字節(jié)數(shù)組對應(yīng)的相對地址 */ protected long address; }

      HeapMemorySegment用來分配堆上內(nèi)存。

      public final class HeapMemorySegment extends MemorySegment { /** * An extra reference to the heap memory, so we can let byte array checks fail by the built-in * checks automatically without extra checks. * 字節(jié)數(shù)組的引用指向該內(nèi)存段 */ private byte[] memory; public void free() { super.free(); this.memory = null; } public final void get(DataOutput out, int offset, int length) throws IOException { out.write(this.memory, offset, length); } }

      HybridMemorySegment即支持onheap和offheap內(nèi)存,flink通過jvm的unsafe操作,如果對象o不為null,為onheap的場景,并且后面的地址或者位置是相對位置,那么會(huì)直接對當(dāng)前對象(比如數(shù)組)的相對位置進(jìn)行操作。如果對象o為null,操作的內(nèi)存塊不是JVM堆內(nèi)存,為off-heap的場景,并且后面的地址是某個(gè)內(nèi)存塊的絕對地址,那么這些方法的調(diào)用也相當(dāng)于對該內(nèi)存塊進(jìn)行操作。

      public final class HybridMemorySegment extends MemorySegment { @Override public ByteBuffer wrap(int offset, int length) { if (address <= addressLimit) { if (heapMemory != null) { return ByteBuffer.wrap(heapMemory, offset, length); } else { try { ByteBuffer wrapper = offHeapBuffer.duplicate(); wrapper.limit(offset + length); wrapper.position(offset); return wrapper; } catch (IllegalArgumentException e) { throw new IndexOutOfBoundsException(); } } } else { throw new IllegalStateException("segment has been freed"); } } }

      flink通過MemorySegmentFactory來創(chuàng)建memorySegment,memorySegment是flink內(nèi)存分配的最小單位。對于跨memorysegment的數(shù)據(jù)方位,flink抽象出一個(gè)訪問視圖,數(shù)據(jù)讀取datainputView,數(shù)據(jù)寫入dataoutputview。

      /** * This interface defines a view over some memory that can be used to sequentially read the contents of the memory. * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}. */ @Public public interface DataInputView extends DataInput { private MemorySegment[] memorySegments; // view持有的MemorySegment的引用, 該組memorysegment可以視為一個(gè)內(nèi)存頁, flink可以順序讀取memorysegmet中的數(shù)據(jù) /** * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}. * It returns the number of read bytes or -1 if there is no more data left. * @param b byte array to store the data to * @param off offset into byte array * @param len byte length to read * @return the number of actually read bytes of -1 if there is no more data left */ int read(byte[] b, int off, int len) throws IOException; }

      dataoutputview是數(shù)據(jù)寫入的視圖,outputview持有多個(gè)memorysegment的引用,flink可以順序的寫入segment。

      /** * This interface defines a view over some memory that can be used to sequentially write contents to the memory. * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}. */ @Public public interface DataOutputView extends DataOutput { private final List memory; // memorysegment的引用 /** * Copies {@code numBytes} bytes from the source to this view. * @param source The source to copy the bytes from. * @param numBytes The number of bytes to copy. void write(DataInputView source, int numBytes) throws IOException; }

      上一小節(jié)中講到的managedmemory內(nèi)存部分,flink使用memorymanager來管理該內(nèi)存,managedmemory只使用堆外內(nèi)存,主要用于批處理中的sorting、hashing、以及caching(社區(qū)消息,未來流處理也會(huì)使用到該部分),在流計(jì)算中作為rocksdbstatebackend的部分內(nèi)存。memeorymanager通過memorypool來管理memorysegment。

      /** * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks. * Any allocated memory has to be released to be reused later. *

      The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}). * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately * releases the underlying memory. */ public class MemoryManager { /** * Allocates a set of memory segments from this memory manager. *

      The total allocated memory will not exceed its size limit, announced in the constructor. * @param owner The owner to associate with the memory segment, for the fallback release. * @param target The list into which to put the allocated memory pages. * @param numberOfPages The number of pages to allocate. * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount * of memory pages any more. */ public void allocatePages( Object owner, Collection target, int numberOfPages) throws MemoryAllocationException { } private static void freeSegment(MemorySegment segment, @Nullable Collection segments) { segment.free(); if (segments != null) { segments.remove(segment); } } /** * Frees this memory segment. *

      After this operation has been called, no further operations are possible on the memory * segment and will fail. The actual memory (heap or off-heap) will only be released after this * memory segment object has become garbage collected. */ public void free() { // this ensures we can place no more data and trigger // the checks for the freed segment address = addressLimit + 1; } }

      對于上一小節(jié)中提到的NetWorkMemory的內(nèi)存,flink使用networkbuffer做了一層buffer封裝。buffer的底層也是memorysegment,flink通過bufferpool來管理buffer,每個(gè)taskmanager都有一個(gè)netwokbufferpool,該tm上的各個(gè)task共享該networkbufferpool,同時(shí)task對應(yīng)的localbufferpool所需的內(nèi)存需要從networkbufferpool申請而來,它們都是flink申請的堆外內(nèi)存。

      上游算子向resultpartition寫入數(shù)據(jù)時(shí),申請buffer資源,使用bufferbuilder將數(shù)據(jù)寫入memorysegment,下游算子從resultsubpartition消費(fèi)數(shù)據(jù)時(shí),利用bufferconsumer從memorysegment中讀取數(shù)據(jù),bufferbuilder與bufferconsumer一一對應(yīng)。同時(shí)這一流程也和flink的反壓機(jī)制相關(guān)。如圖

      /** * A buffer pool used to manage a number of {@link Buffer} instances from the * {@link NetworkBufferPool}. *

      Buffer requests are mediated to the network buffer pool to ensure dead-lock * free operation of the network stack by limiting the number of buffers per * local buffer pool. It also implements the default mechanism for buffer * recycling, which ensures that every buffer is ultimately returned to the * network buffer pool. *

      The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to * match its new size. */ class LocalBufferPool implements BufferPool { @Nullable private MemorySegment requestMemorySegment(int targetChannel) throws IOException { MemorySegment segment = null; synchronized (availableMemorySegments) { returnExcessMemorySegments(); if (availableMemorySegments.isEmpty()) { segment = requestMemorySegmentFromGlobal(); } // segment may have been released by buffer pool owner if (segment == null) { segment = availableMemorySegments.poll(); } if (segment == null) { availabilityHelper.resetUnavailable(); } if (segment != null && targetChannel != UNKNOWN_CHANNEL) { if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) { unavailableSubpartitionsCount++; availabilityHelper.resetUnavailable(); } } } return segment; } } /** * A result partition for data produced by a single task. * *

      This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially, * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one * or more {@link ResultSubpartition} instances, which further partition the data depending on the * number of consuming tasks and the data {@link DistributionPattern}. *

      Tasks, which consume a result partition have to request one of its subpartitions. The request * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}) The life-cycle of each result partition has three (possibly overlapping) phases: Produce Consume Release Buffer management State management */ public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { @Override public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { checkInProduceState(); return bufferPool.requestBufferBuilderBlocking(targetChannel); } } }

      自定義序列化框架

      flink對自身支持的基本數(shù)據(jù)類型,實(shí)現(xiàn)了定制的序列化機(jī)制,flink數(shù)據(jù)集對象相對固定,可以只保存一份schema信息,從而節(jié)省存儲(chǔ)空間,數(shù)據(jù)序列化就是java對象和二進(jìn)制數(shù)據(jù)之間的數(shù)據(jù)轉(zhuǎn)換,flink使用TypeInformation的createSerializer接口負(fù)責(zé)創(chuàng)建每種類型的序列化器,進(jìn)行數(shù)據(jù)的序列化反序列化,類型信息在構(gòu)建streamtransformation時(shí)通過typeextractor根據(jù)方法簽名類信息等提取類型信息并存儲(chǔ)在streamconfig中。

      /** * Creates a serializer for the type. The serializer may use the ExecutionConfig * for parameterization. * 創(chuàng)建出對應(yīng)類型的序列化器 * @param config The config used to parameterize the serializer. * @return A serializer for this type. */ @PublicEvolving public abstract TypeSerializer createSerializer(ExecutionConfig config); /** * A utility for reflection analysis on classes, to determine the return type of implementations of transformation * functions. */ @Public public class TypeExtractor { /** * Creates a {@link TypeInformation} from the given parameters. * If the given {@code instance} implements {@link ResultTypeQueryable}, its information * is used to determine the type information. Otherwise, the type information is derived * based on the given class information. * @param instance instance to determine type information for * @param baseClass base class of {@code instance} * @param clazz class of {@code instance} * @param returnParamPos index of the return type in the type arguments of {@code clazz} * @param output type * @return type information */ @SuppressWarnings("unchecked") @PublicEvolving public static TypeInformation createTypeInfo(Object instance, Class baseClass, Class clazz, int returnParamPos) { if (instance instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) instance).getProducedType(); } else { return createTypeInfo(baseClass, clazz, returnParamPos, null, null); } } }

      對于嵌套的數(shù)據(jù)類型,flink從最內(nèi)層的字段開始序列化,內(nèi)層序列化的結(jié)果將組成外層序列化結(jié)果,反序列時(shí),從內(nèi)存中順序讀取二進(jìn)制數(shù)據(jù),根據(jù)偏移量反序列化為java對象。flink自帶序列化機(jī)制存儲(chǔ)密度很高,序列化對應(yīng)的類型值即可。flink中的table模塊在memorysegment的基礎(chǔ)上使用了BinaryRow的數(shù)據(jù)結(jié)構(gòu),可以更好地減少反序列化開銷,需要反序列化是可以只序列化相應(yīng)的字段,而無需序列化整個(gè)對象。

      同時(shí)你也可以注冊子類型和自定義序列化器,對于flink無法序列化的類型,會(huì)交給kryo進(jìn)行處理,如果kryo也無法處理,將強(qiáng)制使用avro來序列化,kryo序列化性能相對flink自帶序列化機(jī)制較低,開發(fā)時(shí)可以使用env.getConfig().disableGenericTypes()來禁用kryo,盡量使用flink框架自帶的序列化器對應(yīng)的數(shù)據(jù)類型。

      緩存友好的數(shù)據(jù)結(jié)構(gòu)

      cpu中L1、L2、L3的緩存讀取速度比從內(nèi)存中讀取數(shù)據(jù)快很多,高速緩存的訪問速度是主存的訪問速度的很多倍。另外一個(gè)重要的程序特性是局部性原理,程序常常使用它們最近使用的數(shù)據(jù)和指令,其中兩種局部性類型,時(shí)間局部性指最近訪問的內(nèi)容很可能短期內(nèi)被再次訪問,空間局部性是指地址相互臨近的項(xiàng)目很可能短時(shí)間內(nèi)被再次訪問。

      結(jié)合這兩個(gè)特性設(shè)計(jì)緩存友好的數(shù)據(jù)結(jié)構(gòu)可以有效的提升緩存命中率和本地化特性,該特性主要用于排序操作中,常規(guī)情況下一個(gè)指針指向一個(gè)對象,排序時(shí)需要根據(jù)指針pointer獲取到實(shí)際數(shù)據(jù),然后再進(jìn)行比較,這個(gè)環(huán)節(jié)涉及到內(nèi)存的隨機(jī)訪問,緩存本地化會(huì)很低,使用序列化的定長key + pointer,這樣key就會(huì)連續(xù)存儲(chǔ)到內(nèi)存中,避免的內(nèi)存的隨機(jī)訪問,還可以提升cpu緩存命中率。對兩條記錄進(jìn)行排序時(shí)首先比較key,如果大小不同直接返回結(jié)果,只需交換指針即可,不用交換實(shí)際數(shù)據(jù),如果相同,則比較指針實(shí)際指向的數(shù)據(jù)。

      后記

      flink社區(qū)已走向流批一體的發(fā)展,后繼將更多的關(guān)注與流批一體的引擎實(shí)現(xiàn)及結(jié)合存儲(chǔ)層面的實(shí)現(xiàn)。flink服務(wù)請使用華為云 EI DLI-FLINK serverless服務(wù)。

      參考

      [1]: https://ci.apache.org/projects/flink/flink-docs-stable/

      [2]: https://github.com/apache/flink

      [3]: https://ververica.cn/

      數(shù)據(jù)湖探索 DLI 分布式

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:怎樣取消最后一頁的空白頁面?(如何把最后一頁的空白頁刪掉)
      下一篇:怎么刪除多余空白的文檔(如何刪除文檔中的空白)
      相關(guān)文章
      亚洲美女大bbbbbbbbb| 亚洲精品97久久中文字幕无码| 亚洲中文字幕无码一区二区三区| 亚洲av无码片vr一区二区三区| 亚洲va乱码一区二区三区| 亚洲综合网美国十次| 中文字幕亚洲精品| 亚洲黄色网址在线观看| 亚洲精品国产第1页| 亚洲福利视频网站| 亚洲一区二区三区免费视频| 亚洲人成片在线观看| 国产精品高清视亚洲精品| 亚洲理论片中文字幕电影| 亚洲一级毛片免费看| 97se亚洲国产综合自在线| 国产AV旡码专区亚洲AV苍井空| 最新亚洲卡一卡二卡三新区| 亚洲入口无毒网址你懂的| 亚洲熟妇无码一区二区三区| 亚洲人成色99999在线观看| 亚洲AV永久无码天堂影院| 激情小说亚洲色图| 亚洲国产a级视频| 色久悠悠婷婷综合在线亚洲| 亚洲乱码无码永久不卡在线 | 亚洲最大福利视频| 亚洲一卡2卡3卡4卡5卡6卡| 亚洲欧美成人综合久久久| www亚洲精品久久久乳| 亚洲国产激情一区二区三区| 日韩亚洲变态另类中文| 亚洲国产精品无码专区在线观看| 亚洲av日韩av不卡在线观看| 亚洲成人网在线观看| 亚洲熟妇av午夜无码不卡| 在线亚洲v日韩v| 中文字幕亚洲专区| 久久夜色精品国产嚕嚕亚洲av| 亚洲三级电影网址| 亚洲中文字幕无码一去台湾|