Kafka Source Code Analysis, Part 3: Log Module Read/Write Source Code Analysis (III)
That is, a MessageSet is a collection of MessageAndOffset instances.
case class MessageAndOffset(message: Message, offset: Long)
MessageAndOffset is a case class with two members: a Message and its offset.
As the name suggests, ByteBufferMessageSet is a MessageSet backed by a ByteBuffer. Its constructor calls the create function, which allocates a ByteBuffer:
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for(message <- messages)
  writeMessage(buffer, message, offsetCounter.getAndIncrement)
buffer.rewind()
buffer
The writeMessage used above looks like this:
private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
  buffer.putLong(offset)
  buffer.putInt(message.size)
  buffer.put(message.buffer)
  message.buffer.rewind()
}
From this function we can see that the offset and message.size are written to the buffer first, followed by the message itself. The storage format of an uncompressed message is therefore:
The ByteBufferMessageSet entry format:
MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32
The Message format:
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes
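To make this layout concrete, below is a minimal standalone sketch (plain java.nio code, not taken from Kafka; the byte array merely stands in for a serialized Message) that writes a single [Offset MessageSize Message] entry into a ByteBuffer and reads it back:

import java.nio.ByteBuffer

object MessageSetLayoutDemo extends App {
  // Stand-in for a serialized Message (Crc, MagicByte, Attributes, Key, Value).
  val payload: Array[Byte] = "hello kafka".getBytes("UTF-8")
  val offset: Long = 42L

  // Write one entry: 8-byte offset, 4-byte size, then the message bytes.
  val buffer = ByteBuffer.allocate(8 + 4 + payload.length)
  buffer.putLong(offset)
  buffer.putInt(payload.length)
  buffer.put(payload)
  buffer.rewind()

  // Read it back in the same order.
  val readOffset = buffer.getLong()                // 42
  val readSize   = buffer.getInt()                 // 11
  val readBytes  = new Array[Byte](readSize)
  buffer.get(readBytes)
  println(s"offset=$readOffset size=$readSize value=${new String(readBytes, "UTF-8")}")
}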
4. The Log.append flow
The flow chart is as follows:
This function writes a message set into the actual log file and returns a LogAppendInfo:
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
  val appendInfo = analyzeAndValidateMessageSet(messages)

  // if we have any valid messages, append them to the log
  if(appendInfo.shallowCount == 0)
    return appendInfo

  // Drop invalid bytes, detected via CRC checks.
  // trim any invalid bytes or partial messages before appending it to the on-disk log
  var validMessages = trimInvalidBytes(messages, appendInfo)

  try {
    // they are valid, insert them in the log
    lock synchronized {
      // nextOffsetMetadata is a LogOffsetMetadata; updateLogEndOffset refreshes its messageOffset
      // field on every append, so it always holds the log's last offset. The next append reads
      // this field to know which offset to start from.
      appendInfo.firstOffset = nextOffsetMetadata.messageOffset

      if(assignOffsets) {
        // assign offsets to the message set
        val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
        try {
          // Fill in the leading offset field of every message in validMessages
          // (a ByteBufferMessageSet), so each message written to the log carries
          // its offset in its header.
          validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        // offset is incremented once per message inside assignOffsets, so lastOffset is offset - 1
        appendInfo.lastOffset = offset.get - 1
      } else {
        // we are taking the offsets we are given
        if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + messages)
      }

      // assignOffsets may have re-compressed the messages, so message sizes must be checked again.
      // re-validate message sizes since after re-compression some may exceed the limit
      for(messageAndOffset <- validMessages.shallowIterator) {
        if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
          // we record the original message set size instead of trimmed size
          // to be consistent with pre-compression bytesRejectedRate recording
          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
          BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
          throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
            .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
        }
      }

      // check whether the message set size exceeds config.segmentSize
      if(validMessages.sizeInBytes > config.segmentSize) {
        throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validMessages.sizeInBytes, config.segmentSize))
      }

      // If appending these messages would overflow the active segment, roll to a new log file first.
      // maybe roll the log if this segment is full
      val segment = maybeRoll(validMessages.sizeInBytes)

      // Append to the active segment; if enough bytes have accumulated since the last index
      // entry, OffsetIndex.append is called to add one.
      // now append to the log
      segment.append(appendInfo.firstOffset, validMessages)

      // Update nextOffsetMetadata to the latest offset.
      // increment the log end offset
      updateLogEndOffset(appendInfo.lastOffset + 1)

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))

      if(unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  } catch {
    case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  }
}
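A small aside on the offset arithmetic above: because the AtomicLong counter is bumped once per message inside assignOffsets, the last assigned offset is always the counter value minus one. The following toy sketch (an assumed simplification, not the real assignOffsets) shows the same bookkeeping on a list of dummy messages:

import java.util.concurrent.atomic.AtomicLong

object OffsetAssignmentDemo extends App {
  val messages = Seq("m0", "m1", "m2")       // three dummy messages
  val firstOffset = 100L
  val offset = new AtomicLong(firstOffset)

  // One getAndIncrement per message, just like assignOffsets.
  val assigned = messages.map(m => (offset.getAndIncrement, m))

  val lastOffset = offset.get - 1            // 102 = firstOffset + messages.size - 1
  println(assigned)                          // List((100,m0), (101,m1), (102,m2))
  println(s"next offset = ${offset.get}, lastOffset = $lastOffset")
}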
4.1 maybeRoll
Let's take a look at maybeRoll:
private def maybeRoll(messagesSize: Int): LogSegment = {
  val segment = activeSegment
  if (segment.size > config.segmentSize - messagesSize ||
      segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
      segment.index.isFull) {
    debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
      .format(name,
              segment.size,
              config.segmentSize,
              segment.index.entries,
              segment.index.maxEntries,
              time.milliseconds - segment.created,
              config.segmentMs - segment.rollJitterMs))
    roll()
  } else {
    segment
  }
}
If appending the current messages would exceed the active segment's file size limit, if the segment was created too long ago, or if its index is full, the log rolls to a new file; otherwise the active segment is returned directly.
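The three roll conditions can also be read as a single predicate. The sketch below restates them with plain parameters (the parameter names are illustrative, not Kafka's); the sample call shows a 900 MB segment rejecting a 200 MB batch under the 1 GB default segment size and therefore rolling:

object RollDecisionDemo extends App {
  // Mirrors the condition in maybeRoll: roll when the batch would not fit,
  // when a non-empty segment is older than the configured age, or when the index is full.
  def shouldRoll(segmentSize: Int, incomingBytes: Int, maxSegmentBytes: Int,
                 segmentAgeMs: Long, maxSegmentAgeMs: Long, indexFull: Boolean): Boolean =
    segmentSize > maxSegmentBytes - incomingBytes ||
      (segmentSize > 0 && segmentAgeMs > maxSegmentAgeMs) ||
      indexFull

  println(shouldRoll(segmentSize = 900 * 1024 * 1024, incomingBytes = 200 * 1024 * 1024,
    maxSegmentBytes = 1024 * 1024 * 1024, segmentAgeMs = 60000L,
    maxSegmentAgeMs = 604800000L, indexFull = false))   // true: the batch does not fit
}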
The roll code, with comments, is as follows:
/**
 * Roll the log over to a new active segment starting with the current logEndOffset.
 * This will trim the index to the exact size of the number of entries it currently contains.
 * @return The newly rolled segment
 */
def roll(): LogSegment = {
  val start = time.nanoseconds
  lock synchronized {
    val newOffset = logEndOffset
    val logFile = logFilename(dir, newOffset)
    val indexFile = indexFilename(dir, newOffset)

    // If the files already exist, delete them first.
    for(file <- List(logFile, indexFile); if file.exists) {
      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
      file.delete()
    }

    segments.lastEntry() match {
      case null =>
      case entry => entry.getValue.index.trimToValidSize()
    }

    // Create a new LogSegment.
    val segment = new LogSegment(dir,
                                 startOffset = newOffset,
                                 indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
                                 time = time)

    // Add it to the segments map.
    val prev = addSegment(segment)
    if(prev != null)
      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))

    // schedule an asynchronous flush of the old segment
    scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

    info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
    segment
  }
}
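Note that logFilename and indexFilename name the new files after the segment's base offset. A hedged sketch of that convention (the real helpers zero-pad the offset to 20 digits, which is what the padding below assumes):

object SegmentNamingDemo extends App {
  // Segment files are named after their base offset, zero-padded to 20 digits.
  def filenamePrefixFromOffset(offset: Long): String = "%020d".format(offset)

  println(filenamePrefixFromOffset(0L) + ".log")        // 00000000000000000000.log
  println(filenamePrefixFromOffset(368769L) + ".index") // 00000000000000368769.index
}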
Now let's see what the asynchronous flush actually does:
/**
 * Flush log segments for all offsets up to offset-1
 * @param offset The offset to flush up to (non-inclusive); the new recovery point
 */
def flush(offset: Long) : Unit = {
  if (offset <= this.recoveryPoint)
    return
  debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
    time.milliseconds + " unflushed = " + unflushedMessages)
  for(segment <- logSegments(this.recoveryPoint, offset))
    segment.flush()
  lock synchronized {
    if(offset > this.recoveryPoint) {
      this.recoveryPoint = offset
      lastflushedTime.set(time.milliseconds)
    }
  }
}
This function flushes the log segments between recoveryPoint and offset; Segment.flush in turn calls the flush functions of FileMessageSet and OffsetIndex to sync the data and index files to disk. Once the flush completes, recoveryPoint is advanced to offset.
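As a rough illustration of that recovery-point bookkeeping (a toy in-memory model, not the real Log class), flushing up to an offset syncs the segments whose base offsets fall in [recoveryPoint, offset) and then advances the recovery point:

object RecoveryPointDemo extends App {
  // Toy segments identified only by their base offsets.
  case class Segment(baseOffset: Long) { def flush(): Unit = println(s"fsync segment $baseOffset") }

  val segments = List(Segment(0L), Segment(100L), Segment(200L), Segment(300L))
  var recoveryPoint = 100L

  // Flush segments covering [recoveryPoint, offset), then advance the recovery point.
  def flush(offset: Long): Unit =
    if (offset > recoveryPoint) {
      segments.filter(s => s.baseOffset >= recoveryPoint && s.baseOffset < offset).foreach(_.flush())
      recoveryPoint = offset
    }

  flush(250L)                                  // fsyncs segments 100 and 200
  println(s"recoveryPoint = $recoveryPoint")   // 250
}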
4.2 Segment.append
This is the append provided by LogSegment. It appends a set of messages to the log segment starting at the given offset. If more than 4 KB (the default value of the log.index.interval.bytes property) have been written since the last index entry, an additional entry is written to the index file. The method is not thread-safe, so callers must protect it with lock-based synchronization.
/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param offset The first offset in the message set.
 * @param messages The messages to append.
 */
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed)
    // If more than indexIntervalBytes have been written to the log since the last index
    // entry, record the current offset and file position in the index file.
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // Call FileMessageSet.append to write the messages into the channel.
    // append the messages
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}
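Because an index entry is only added once indexIntervalBytes have accumulated, the offset index is sparse: a read locates the greatest indexed offset that is at most the target and then scans the log forward from the recorded file position. A minimal sketch of that lookup idea (a sorted map standing in for the real OffsetIndex):

import scala.collection.immutable.TreeMap

object SparseIndexDemo extends App {
  // offset -> byte position in the segment file, one entry roughly every indexIntervalBytes.
  val index = TreeMap(0L -> 0L, 52L -> 4096L, 107L -> 8192L, 160L -> 12288L)

  // Largest indexed offset <= target; scanning the log starts at its file position.
  def lookup(target: Long): (Long, Long) = {
    val candidates = index.filter { case (off, _) => off <= target }
    if (candidates.isEmpty) (0L, 0L) else candidates.last
  }

  println(lookup(120L))   // (107,8192): start scanning the segment at byte 8192
}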