微吼云上線多路互動直播服務(wù) 加速多場景互動直播落地
1099
2025-03-31
27. read:??先說說這個方法的返回對象:??FetchDataInfo——這是一個case類,包含了日志位移元數(shù)據(jù)信息以及一個消息集合。這個方法也很簡單,就是從日志中讀取消息,將起始位移和讀取到的消息集合封裝進(jìn)一個FetchDataInfo中。此方法接收3個參數(shù):??startOffset表示讀取操作執(zhí)行的開始位移點;?maxLength表示最多讀取的字節(jié)數(shù);?maxOffset表示讀取操作不能超過的位移點,即返回的消息集合中不能包含該位移。具體邏輯如下:?首先檢查下一條消息的位移與給定的起始位移,如果兩者相等直接返回,只不過是空的消息集合。否則的話,找到小于等于startOffset的最大位移所在的日志段。如果startOffset比當(dāng)前最大位移還大或者壓根就沒有找到剛才的日志段,那么說明要讀取的內(nèi)容已經(jīng)超出了日至當(dāng)前的結(jié)束offset,直接報錯退出。okay,如果到這里很運行正常的話,那么下面就開始循環(huán)讀取消息:?如果讀取的消息集合不為空直接返回,否則跳到下一個日志段繼續(xù)讀取直到下一個日志段為空退出循環(huán)。此時的情況是我們已經(jīng)跨過了最后一個日志段但給定的startOffset確實合法的值——這是有可能的,比如所有比startOffset大的消息都已經(jīng)被刪除了。如果是這樣的話,程序簡單地返回一個包含空消息集合的FetchDataInfo對象。(后面有更詳細(xì)的分析)
26. trimInvalidBytes:??消除消息集合尾部的無效字節(jié)。在學(xué)習(xí)這個方法之前,我們要了解Kafka在這個scala文件中定義一個case class類:?LogAppendInfo類——這個類保存了每個消息集合的各種信息:?包括這個集合的起始位移、結(jié)束位移、未壓縮消息(shallow message)數(shù),合法字節(jié)數(shù)、用到的壓縮算法以及表明該消息集合位移是否是單調(diào)增加的布爾值。現(xiàn)在再來看trimInvalidBytes方法,這個方法接收2個參數(shù):?一個要是做trim的消息集合,另一個是用LogAppendInfo對象標(biāo)識的消息集合的通用信息,結(jié)果自然是被trim過的消息集合——可能與原消息集合相同。具體做法是:?首先計算出這個消息集合的合法字節(jié)數(shù)(這要通過analyzeAndValidateMessageSet方法給出,后面會說這個方法)——如果合法字節(jié)數(shù)小于0,直接報退出。如果該字節(jié)數(shù)就是消息集合的字節(jié)數(shù),那么說明不用做trim直接返回傳入的messages即可。否則即說明有非法的字節(jié),那么就新建一個ByteBufferMessageSet消息集合,將limit設(shè)置為剛才計算的合法字節(jié)數(shù),然后返回。
27. analyzeAndvalidateMessageSet:??上面的方法提到了一個消息集合的合法字節(jié)與非法字節(jié),那么如何定義合法性呢?答案就有這個方法給出。從字面上來說,這個方法做的工作就是分析驗證消息集合,主要的工作有:?a. 驗證消息CRC碼;?b. 驗證消息長度合法性;?c. 計算消息集合的起始位移;?d. 計算結(jié)束位移;?e. 消息集合中的消息數(shù);??f. 計算合法字節(jié)數(shù)(就是合法消息字節(jié)數(shù)的累加和);??g. 驗證位移是否單調(diào)增加;??h. 驗證是否使用了壓縮,如果指定了多個,只以最后一條消息的壓縮算法為準(zhǔn)
28. loadSegments:??該方法就是加載磁盤上的日志文件。具體邏輯如下:?a.?如果給定的路徑不存在則創(chuàng)建出來;??b.?遍歷該目錄路徑下的所有文件刪除掉那些臨時文件(包括后綴名是.deleted和.cleaned);?c. 如果發(fā)現(xiàn)是以.swap結(jié)尾的文件,說明在上一次的swap過程中Kafka失敗了,需要執(zhí)行恢復(fù)操作。針對上面的情況,先去掉結(jié)尾的.swap然后判斷是.log還是.index結(jié)尾。如果是索引文件(.index結(jié)尾)則直接刪除,反正后面可以重建;?如果是日志數(shù)據(jù)文件(.log結(jié)尾),那么先刪除對應(yīng)的索引文件,然后將.swap去掉表示修復(fù)成功;?d.?第一遍遍歷之后再次進(jìn)行第二遍遍歷。對目錄下的每個文件,如果它是索引文件,則尋找對應(yīng)的.log文件,如果不存在拋出告警信息并直接該索引文件;?如果存在的話不做任何處理;?但如果該文件本身就是日志數(shù)據(jù)文件,則必然是000000...0000【offset】.log這樣的形式;??e.?提取基礎(chǔ)offset,并判斷是否存在對應(yīng)的索引文件,然后就創(chuàng)建新的日志段對象。f. 創(chuàng)建日志段之后判斷是否存在索引文件,如果沒有的話重建索引;??g. 最后將新創(chuàng)建的日志段加入到日志段map中,至此第二遍遍歷完成;??h.?此時判斷日志段map中是否存在任何日志段,如果沒有的話則創(chuàng)建一個offset為0的空日志段——因為每個日志都至少要有一個日志段。如果map中的確有日志段,先調(diào)用recoverLog方法(稍后會說)恢復(fù)日志段然后重設(shè)activetSegment的索引長度(否則容易引發(fā)日志段切分);j. 最后為每個日志段檢查對應(yīng)的索引文件(確保索引文件為空以及索引長度一定要是8的倍數(shù),因為索引項長度總是位移的整數(shù)倍)
29. recoverLog:??主要為日志段map中自恢復(fù)點起的每個日志段重建索引文件并且砍掉那些位于日志和索引尾部的無效字節(jié)。如果發(fā)現(xiàn)確實存在無效字節(jié),那么就把那些日志段全部刪除掉
30. append:??添加給定的消息集合到當(dāng)前激活的日志段中,如果滿足條件的話做切分。
2.3????????????? LogSegment
Segment是個邏輯概念,為了防止log文件過大, 將log分成許多的LogSegments
Segment又分為兩部分,MessageSet文件和Index文件,分別命名為[base_offset].log和[base_offset].index,base_offset就是該Segment的起始o(jì)ffset,比前一個segment里面的offset都要大。MessageSet文件是用FileMessageSet類實現(xiàn)的,Index文件是由類OffsetIndex實現(xiàn),這兩個類后面有介紹。同時index文件是可以根據(jù)MessageSet文件重新rebuild的
Segment提供對于MessageSet的讀寫接口
寫,需要間隔的更新index文件,應(yīng)該為了盡量減小index的size,所以只是當(dāng)寫入數(shù)據(jù)大于indexIntervalBytes時,才增加一條索引
讀,由于user傳入的是邏輯offest,需要先轉(zhuǎn)化為物理地址才能從文件中讀到數(shù)據(jù),如何轉(zhuǎn)化參考下面的介紹。
其對應(yīng)的代碼文件是LogSegment.scala,該scala文件有一個非線程安全的類:LogSegment,用于表示日志段。該類的構(gòu)造函數(shù)有6個參數(shù),分別是:
1. log:?FileMessageSet定義的消息集合
2. index:?OffsetIndex定義的位移索引,包含了位移到物理文件位置的映射
3. baseOffset:?日志文件的基礎(chǔ)位移,也就是這個日志段中最低的位移
4. indexIntervalBytes:?索引文件中索引項的間隔,即Kafka查找下一個物理位置時進(jìn)行線性查找的最大字節(jié)數(shù)。
5. rollJitterMs:?指定日志段切分時的jitter time,避免日志切分時出現(xiàn)驚群
6. time:?一個時間變量,主要提供時間方面的服務(wù)
下面對LogSegment的一些關(guān)鍵代碼進(jìn)行分析:
1. created變量:?創(chuàng)建一個日志段的時間信息是很有用的,所以需要有個變量保存這個信息
2. size方法:?既然是保存消息的日志段,也必然有個方法保存當(dāng)前日志段占用的字節(jié)數(shù),具體實現(xiàn)方法就是調(diào)用LogSegment包含的日志對象的size方法
3. bytesSinceLastIndexEntry變量:?這個變量主要的作用就是用于判斷在追加寫日志的同時是否需要增加一條索引項。由于log.index.interval.bytes默認(rèn)是4KB,因此每寫4KB就會在索引文件中增加一條索引記錄。增加索引項之后需要將該變量置為0重新計算
4. lastModified以及l(fā)astModified_方法:?Kafka在清理日志段的時候根據(jù)當(dāng)前時間與該方法返回值比較清理那些陳舊的日志段并且根據(jù)UAP原則提供了同名的setter方法用于更新日志段對象中日志文件和索引文件的最近修改時間。
5. delete方法:?邏輯很簡單的方法,就是刪除日志文件和索引文件
6. close方法:?關(guān)閉日志段的方法,具體就是關(guān)閉底層的日志文件和索引文件
7. changeFileSuffixes方法:?同時更改日志文件和索引文件的后綴名。例如在刪除日志段的時候把a.log和a.index更名為a.log.delete和a.index.delete
8. flush方法:?將buffer中的消息和索引項寫入磁盤
9. nextOffset方法:?計算這個日志段中下一條消息的位移。這個方法運行起來是有很高的代價的,因為它需要從索引文件中最后一項標(biāo)識的位移處開始讀出一個消息集合。特別注意的是如果索引文件為空的話,它就需要將整個日志段的數(shù)據(jù)都讀出來并返回一個FetchDataInfo對象。這個對象由一個位移元數(shù)據(jù)加上一個消息集合組成。如果這個FetchDataInfo為空,或者它包含的消息集合為空就只返回baseOffset——即這個日志段開始o(jì)ffset,否則返回offset+1 (主要是因為消息集合本身也就是一組MessageAndOffset對象)
10. truncateTo方法:?給定一個位移,將位于該位移之后的所有索引項和日志項全部清除,如果給定的位移大于日志段本身的最大位移就什么都不做。最后函數(shù)返回日志數(shù)據(jù)總共截斷的字節(jié)數(shù)。值得注意的是,如果把所有日志數(shù)據(jù)都截斷了,那么需要更新這個日志段的創(chuàng)建日期。同時還會將檢查是否增加索引項的指針清零。
11. append方法:?將一組消息追加寫入到以給定offset開始的日志段中。如果寫入超過了4KB(默認(rèn)的log.index.interval.bytes屬性值)則額外寫入一條新的索引項記錄到索引文件中。這個方法不是線程安全的,所以后面調(diào)用的時候需要有鎖同步機(jī)制的保護(hù)
12. translateOffset方法:?給定一個offset,找出該日志段中不小于該offset的第一條消息對應(yīng)的物理文件位置。這個方法還有一個參數(shù)可以用來調(diào)優(yōu),不必從查詢到的索引項中包含的位置開始,可以直接從給定的文件位置開始查找。當(dāng)然這樣做的前提是你必須已經(jīng)知道這是文件中的一個合法的開始位置并且比最靠近的索引項中包含的位置要大。
13. read方法: 給定一個offset,從不小于這個offset處的第一條開始讀消息,不能超過maxSize個字節(jié),也必須在maxOffset(如果提供了maxOffset)處結(jié)束——讀到的這些消息封裝到一個FetchDataInfo對象返回。FetchDataInfo由一個日志位移元數(shù)據(jù)LogOffsetMetadata對象和一個消息集合組成,所謂的LogOffsetMetadata就是由消息offset加上該日志段的基礎(chǔ)位移再加上日志段內(nèi)的相對物理位置組成。這個方法有一個關(guān)鍵的問題是,要讀取消息集合到底多少字節(jié)?如果給定的maxSize是0,那么就返回一個空的消息集合。如果maxSize大于0且沒有指定maxOffset,那么就表示我們能夠讀取最多maxSize字節(jié)的消息;而如果maxSize>0且指定了maxOffset,程序就需要計算一下maxOffset所表示的物理文件位置與起始位置的差距和maxSize誰大誰小——同時也只能選取小的作為最終的可讀取字節(jié)數(shù)
14. recover方法: 恢復(fù)一個日志段——即根據(jù)日志文件重建索引并砍掉那些無效的字節(jié),所謂的無效字節(jié)就是由參數(shù)限定的,任何在maxMessageSize之外的字節(jié)都是為無效狀態(tài)。該方法實現(xiàn)也很簡單,就是先將索引項全部截斷并將索引文件重置為原來的大小,然后遍歷該消息集合,超過indexIntervalBytes之后就追加一條索引記錄從而達(dá)到重建索引的目的
2.4????????????? FileMessageSet
FileMessageSet繼承MessageSet類,通過FileChannel可以讀寫文件,是Segment中實際存放ByteMessageSet消息的文件對象。 FileChannel是Java NIO提供的類,實際上寫日志時用的是RandomAccessFile來打開文件,允許來回讀寫文件,指定位置讀寫。FileMessageSet類有一個起始和結(jié)束的指針標(biāo)識消息集合的起始位置和結(jié)束位置——這樣就能實現(xiàn)從整個消息集合中切片的功能。該類有5個構(gòu)造函數(shù)參數(shù):
1. file:?日志文件
2. channel:?底層使用到的文件通道(file channel)
3. start/end:?消息集合在文件中的絕對起始位置/絕對結(jié)束位置
4. isSlice:?是否從整個消息集合中切分處一個切片
除了主構(gòu)造函數(shù)之外還提供了很多便利的輔助構(gòu)造函數(shù)。另外FileMessageSet類定義了一個_size變量,用于保存消息集合的字節(jié)數(shù)(同時考慮了是否支持切片)。如果不是一個切片,則將底層文件通道的指針移到最后一個字節(jié)。該類定義的方法如下:
1. read:?從日志文件中的指定位置讀取指定大小的buffer并封裝到一個FileMessageSet對象返回。
2. sizeInBytes:?該文件消息集合字節(jié)數(shù)
3. searchFor:?從給定位置處開始向后尋找不小于targetOffset的位移,并返回實際的物理文件位置。如果沒有找到的話,直接返回null
4. writeTo:?寫入這個消息集合到指定的channel,允許從指定的位置寫入指定大小的字節(jié)數(shù),并返回真實寫入的字節(jié)數(shù)
5. iterator:?獲取遍歷該消息集合的迭代器,只做一層迭代
6. append:?將保存在一個ByteBuffer中的一組消息追加到指定的該消息集合所在的channel尾部并增加總的消息集合字節(jié)數(shù)
7. flush:?提交所有已寫數(shù)據(jù)到物理磁盤
8. close:?先調(diào)用flush存磁盤,然后關(guān)閉channel
9. delete:?從文件系統(tǒng)中刪除消息集合
10. truncateTo:?將文件消息集合截斷成指定的字節(jié)大小
11. readInto:?將底層的文件從給定的位置開始讀取內(nèi)容到一個ByteBuffer中
12. renameTo:?更名消息集合底層的文件名
除了FileMessageSet類,該scala還定義了一個object:?LogFlushStats——里面只定義了一個定時器,用于統(tǒng)計寫入日志段到文件的時間
2.5????????????? OffsetIndex
Segment的index文件, 這是0.8后加上的,之前message直接使用物理offset標(biāo)識。新版本中還是改成了使用邏輯offset,讓物理地址對用戶透明, 這樣就需要一個index來匹配邏輯offset和物理地址。index考慮到效率,最好放在內(nèi)存中,但是考慮到size問題, 所以使用MappedByteBuffer(參考,Java RandomAccessFile用法?)
注釋里面說,?Index是sparse的,不保證每個message在index都有索引的entry。Index由entry組成,每個entry為8-byte,邏輯offset4-byte,物理地址4-byte。
邏輯offset是基于base offset的相對offset,否則無法保證只使用4-byte,例如baseoffset為50,那么offset 55的邏輯offset為5。
物理地址是該offset所對應(yīng)的message所在日志文件的位置。因為只有4個字節(jié),這么看日志文件每個最大只能為4G?
下面開始分析代碼,首先從構(gòu)造函數(shù)開始,該構(gòu)造函數(shù)接收三個變量:一個表示索引文件的文件變量;一個基礎(chǔ)offset和一個變量表示最大的索引文件字節(jié)數(shù)。該類還定義了一些類成員變量和很多方法,我們一個一個分析:
1. lock:?私有字段,使用ReentrantLock實現(xiàn),用于同步訪問MappedByteBuffer。ReentrantLock提供與synchronized相同的內(nèi)存和并發(fā)性語義,另外性能也更好。
2. roundToExactMultiple:?私有方法,就是計算小于第一個參數(shù)的第二個參數(shù)的最大整數(shù)倍,比如roundToExactMultiple(67, 8)返回64
3. mmap:?私有字段,負(fù)責(zé)初始化包含該索引的內(nèi)存映射對象。首先檢查給定的File對象,如果不存在的話預(yù)先創(chuàng)建出來并設(shè)定長度為maxIndexSize,并設(shè)定好開始的位置之后返回。
4. size:?私有字段,索引文件中當(dāng)前保存的索引項(每項都是8字節(jié))
5. maxEntries:?成員變量,索引文件能包含的最大索引項個數(shù)
6. relativeOffset:?返回根據(jù)base offset的第n個位移。假設(shè)n是5,每項是8個字節(jié),那么相對位移的值(使用4個字節(jié))必然是保存在buffer的第40個字節(jié)到第43個字節(jié)。
7. physical:?獲取第n個位移對應(yīng)的物理文件位置(依然是4個字節(jié))——還是假設(shè)n=5,那么返回的值就是從44字節(jié)~47字節(jié)處保存的值。
8. readLastEntry:?讀取索引文件中最后一項對應(yīng)的OffsetPosition
9. lastOffset:?返回索引文件中最后一個索引項的位移
10. maybeLock:?在一個鎖保護(hù)的情況下執(zhí)行給定的方法
11. indexSlotFor:?以二分查找的方式尋找比給定offset小的最大offset。當(dāng)然了,如果最小的位移都比給定的offset大或者索引文件干脆就是空的話直接返回-1
12. lookup:?計算比給定offset小的最大位移,找到后返回offset-對應(yīng)物理文件位置的映射對
13. entry:?返回索引文件中的第n個位移映射對
14. append:?從給定的offset-position處插入一索引項。既然叫append,該項給定的offset必須比現(xiàn)有的所有索引項都要大
15. isFull:?判斷該索引文件是否已滿
16. truncate:?刪除所有索引項
17. truncateToEntries:?刪除索引項到給定的數(shù)目
18. truncateTo:?刪除那些位移不小于給定offset的所有索引項
19. resize:?重設(shè)索引文件的大小——主要用于新的日志segment切分時候調(diào)用。需要注意的是代碼中區(qū)分了操作系統(tǒng),因為Windows平臺不允許調(diào)整內(nèi)存映射文件的長度
20. forceUnmap:?主要為Windows平臺上使用。因為在Windows平臺上修改文件長度時需要先釋放內(nèi)存映射對象
21. trimToValidSize:?調(diào)整為當(dāng)前索引文件的真實占用字節(jié)大小
22. flush:?調(diào)用MappedByteBuffer的force方法將對buffer的修改寫入底層的文件
23. delete:?刪除該索引文件
24. entries:?返回索引文件中的索引項數(shù)
25. sizeInBytes:?索引文件當(dāng)前使用的索引項字節(jié)總數(shù)
26. close:?調(diào)用trimToValidSize方法關(guān)閉索引
27. renameTo:?重命名索引文件名稱
28. sanityCheck:?對索引文件進(jìn)行完整性檢查,包括索引文件字節(jié)數(shù)是否為8的整數(shù)倍、當(dāng)前最大位移是否小于基礎(chǔ)位移等
2.6????????????? OffsetPosition
是一個case classe類,是邏輯offset與物理地址直接的映射關(guān)系類。
/**
* The mapping between a logical log offset and the physical position
* in some log file of the beginning of the message set entry with the
* given offset.
*/
case class OffsetPosition(val offset: Long, val position: Int)
3.??? 其他包相關(guān)組件
3.1????????????? LogOffsetMetadata
/*
* A log offset structure, including:
*? 1. the message offset
*? 2. the base message offset of the located segment
*? 3. the physical position on the located segment
*/
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
該scala文件時一組伴生對象,定義了日志位移的元數(shù)據(jù)信息。該類定義了kafka的位移元數(shù)據(jù)結(jié)構(gòu),它包括:
1. 消息位移
2. 位移所在日志段的基礎(chǔ)位移(起始位移)
3. 所在段的物理位置
該類定義了一些方法用于獲取這些信息以及使用這些信息執(zhí)行一些判斷操作:
1. messageOffsetOnly:判斷位移元數(shù)據(jù)信息是否只包括消息位移部分的數(shù)據(jù),而其他兩部分為空
2. offsetOnOlderSegment:與給定的位移元數(shù)據(jù)實例相比較判斷這個位移是否是在一個比較舊的日志段上。
3. offsetOnSameSegment:與上個方法類似,只是這次比較兩個位移元數(shù)據(jù)信息是否在同一個日志段上
4. precedes:比較這個位移是否在給定位移之前
5. offsetDiff:計算此位移與給定位移之間所含的消息數(shù)
6. positionDiff:計算此位移與給定位移之間所差的字節(jié)數(shù)——前提是兩個位移位于同一日志段且此位移在給定位移之前出現(xiàn)。實現(xiàn)方法就是元數(shù)據(jù)信息中的段內(nèi)相對物理位置相減。
再說說LogOffsetMetadata object。它定義了三個常量分別代表未知位移的元數(shù)據(jù)、未知的段起始位移和位置的段內(nèi)物理文件位置。最后該object還定義了一個OffsetOrdering嵌套類實現(xiàn)了scala的Ordering接口因而支持兩個位移元數(shù)據(jù)實例的比較。compare方法就是調(diào)用兩個元數(shù)據(jù)的offsetDiff方法獲取兩個元數(shù)據(jù)之間的消息差值。
3.2????????????? FetchDataInfo
就是一個簡單的case類,由一個LogOffsetMetadata和一個MessageSet組成
case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
3.3????????????? ByteBufferMessageSet
ByteBufferMessageSet之前在Producer分析中也有提到,繼承了MessageSet類,是實際寫入log文件的結(jié)構(gòu)。我們先來看看這幾個帶MessageSet后序的類的關(guān)系:
ByteBufferMessageSet直接繼承MessageSet,而MessageSet繼承scala.collection.Iterable:
MessageSet extends Iterable[MessageAndOffset]
即MessageSet是MessageAndOffset類的集合。
Kafka
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。