XML DOM 獲取節點值
1067
2025-04-04
FileMessageSet.append的代碼比較簡單,直接寫到起FileChannel中
/**
* Append these messages to the message set
*/
def append(messages: ByteBufferMessageSet) {
val written = messages.writeTo(channel, 0, messages.sizeInBytes)
_size.getAndAdd(written)
}
OffsetIndex.append代碼如下:
/**
* Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
*/
def append(offset: Long, position: Int) {
inLock(lock) {
require(!isFull, "Attempt to append to a full index (size = " + size + ").")
if (size.get == 0 || offset > lastOffset) {
debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
this.mmap.putInt((offset - baseOffset).toInt)
this.mmap.putInt(position)
this.size.incrementAndGet()
this.lastOffset = offset
require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset, entries, lastOffset, file.getAbsolutePath))
}
}
}
代碼里面就比較明顯了,按照章節2中OffsetIndex的寫入描述,先寫4字節的offset-baseOffset,然后再寫入日志在log文件中的位置
5.??? Log.read日志流程
流程圖如下:
這個函數通過指定開始讀的startOffset和最大讀長度等參數,返回FetchDataInfo信息。原理很簡單,就是從保存的segments map中找到baseOffset與startOffset最接近的segment,開始查找和讀取數據。
/**
* Read messages from the log
*
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
* @return The fetch data information including fetch starting offset metadata and messages read
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// check if the offset is valid and in range
//檢查一下startoffset是否就是nextOffset,如果是表明日志還不存在則返回空消息集。
val next = nextOffsetMetadata.messageOffset
if(startOffset == next)
return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
//返回一個Map.Entry(baseOffset,LogSegment),其baseOffset是最大的小于等于startOffset的。
var entry = segments.floorEntry(startOffset)
// attempt to read beyond the log end offset is an error
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
// do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
//調用LogSegment.read獲得fetchInfo
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
//如果為空,則找其連續的下一個Logsegment去讀取
if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
}
}
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
}
5.1?? LogSegment.read
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
*???????? or null if the startOffset is larger than the largest offset in this log
*/
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
//獲得改logsegment的size
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
//把邏輯offset轉為實際物理地址,并返回一個OffsetPosition,其offset是大于等于startOffset的。后面會對這個函數詳細介紹
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if(startPosition == null)
return null
//生成一個LogOffsetMetadata對象,包含邏輯offset和物理地址等信息
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
// if the size is zero, still return a log segment but with zero size
if(maxSize == 0)
return FetchDataInfo(offsetMetadata, MessageSet.Empty)
//因為這個接口可以指定最大讀取到的maxOffset,下面會計算實際最大能讀取的長度
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
case None =>
// no max offset, just use the max size they gave unmolested
maxSize
case Some(offset) => {
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
//再一次把max offset轉為物理地址,然后取maxSize與maxOffset到startOffset之間的小值作為讀取長度
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
//返回fetchDataInfo對象
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
代碼中一個主要函數是translateOffset把邏輯地址轉為實際物理地址:
/**
* Find the physical file position for the first message with offset >= the requested offset.
*
* The lowerBound argument is an optimization that can be used if we already know a valid starting position
* in the file higher than the greatest-lower-bound from the index.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
*
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
//在index buffer中查找最大的小于等于offset的OffsetPosition關系
val mapping = index.lookup(offset)
//根據OffsetPosition得到的物理地址,在log日志查找更接近的OffsetPosition(大于等于目標offset的位置)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
Index.lookup代碼:
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
//把byteBuffer復制一份,復制的副本和mmap之間是共享內容的,新緩沖區的position,limit,mark和capacity都初始化為原始緩沖區的索引值,然而,它們的這些值是相互獨立的
val idx = mmap.duplicate
//用二分法在index buffer中找到最大的小于等于targetOffset的位置。
val slot = indexSlotFor(idx, targetOffset)
//找到索引在那個位置后,返回OffsetPosistion類
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
}
}
其中relativeOffset與physical函數,就是根據Bytebuffer中的位置slot,返回相對邏輯offset與物理地址:
/* return the nth offset relative to the base offset */
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
/* return the nth physical position */
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
再看FileMessageSet.searchFor: 主要做的事情是從startingPosition開始,按順序找到第一個offset大于等于目標offset的消息位置,并返回OffsetPosition類。
/**
* Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
//分配一個LogOverHead大小的buffer,用于讀取消息頭,LogOverHead大小為12 byte(MessageSize 4, OffsetLength 8)
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
val size = sizeInBytes()
while(position + MessageSet.LogOverhead < size) {
buffer.rewind()
channel.read(buffer, position)
//如果消息頭都讀不出來,拋出異常
if(buffer.hasRemaining)
throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
.format(targetOffset, startingPosition, file.getAbsolutePath))
buffer.rewind()
val offset = buffer.getLong()
//返回大于等于targetOffset的OffsetPosition
if(offset >= targetOffset)
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
if(messageSize < Message.MessageOverhead)
throw new IllegalStateException("Invalid message size: " + messageSize)
//position移位到下一個消息,具體消息布局請看上面章節。
position += MessageSet.LogOverhead + messageSize
}
null
}
至此,Log模塊的基本讀寫函數已經分析完畢
開發者
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。