Kafka Source Code Analysis, Part 3: Log Module Read/Write Source Code Analysis (III)


That is, a MessageSet is a collection of MessageAndOffset entries.

      case class MessageAndOffset(message: Message, offset: Long)

MessageAndOffset is a case class with two members: a Message and its offset.

As the name suggests, ByteBufferMessageSet is a MessageSet backed by a ByteBuffer. Its constructor calls the create function, which allocates a ByteBuffer and writes every message into it:

val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for(message <- messages)
  writeMessage(buffer, message, offsetCounter.getAndIncrement)
buffer.rewind()
buffer

The writeMessage method called above is:

private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
  buffer.putLong(offset)
  buffer.putInt(message.size)
  buffer.put(message.buffer)
  message.buffer.rewind()
}

From this function we can see that each entry is written as the offset first, then message.size, and finally the message bytes themselves. The on-disk format of an uncompressed message is therefore as follows (a minimal parsing sketch follows the two format listings below):

On-disk format of a ByteBufferMessageSet entry:

MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32

Format of a Message:

Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
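To make the layout concrete, here is a minimal sketch (not Kafka code; the helper name readEntry is made up) of how one such entry could be read back from a ByteBuffer:

import java.nio.ByteBuffer

// Illustrative only: decode one [Offset MessageSize Message] entry from the buffer.
def readEntry(buffer: ByteBuffer): (Long, Int, Array[Byte]) = {
  val offset = buffer.getLong()         // Offset => int64
  val messageSize = buffer.getInt()     // MessageSize => int32
  val messageBytes = new Array[Byte](messageSize)
  buffer.get(messageBytes)              // Crc, MagicByte, Attributes, Key and Value live inside these bytes
  (offset, messageSize, messageBytes)
}

Each entry therefore costs 12 bytes of log overhead (the 8-byte offset plus the 4-byte size) on top of the message itself, which is exactly what MessageSet.messageSetSize sums over all messages when create allocates the buffer.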

4. Log.append: the log append flow

[Flow chart of the Log.append path omitted]

This function writes a message set into the actual log file and returns a LogAppendInfo (a short usage sketch follows the method body):

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
  val appendInfo = analyzeAndValidateMessageSet(messages)

  // if we have any valid messages, append them to the log
  if(appendInfo.shallowCount == 0)
    return appendInfo

  // trim any invalid bytes or partial messages before appending it to the on-disk log
  // (invalid bytes are detected via the CRC check)
  var validMessages = trimInvalidBytes(messages, appendInfo)

  try {
    // they are valid, insert them in the log
    lock synchronized {
      // nextOffsetMetadata is a LogOffsetMetadata; updateLogEndOffset refreshes its messageOffset
      // field on every append, so it always holds the current log end offset and tells us which
      // offset the next write should start from
      appendInfo.firstOffset = nextOffsetMetadata.messageOffset

      if(assignOffsets) {
        // assign offsets to the message set
        val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
        try {
          // assign the offset field at the head of every message in validMessages (a ByteBufferMessageSet),
          // so that each message written to the log carries its own offset
          validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        // offset is incremented for every message inside assignOffsets, so lastOffset is offset - 1
        appendInfo.lastOffset = offset.get - 1
      } else {
        // we are taking the offsets we are given
        if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + messages)
      }

      // re-validate message sizes, since assignOffsets may have re-compressed the set and some
      // messages may now exceed the limit
      for(messageAndOffset <- validMessages.shallowIterator) {
        if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
          // we record the original message set size instead of trimmed size
          // to be consistent with pre-compression bytesRejectedRate recording
          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
          BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
          throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
            .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
        }
      }

      // check whether the message set size exceeds config.segmentSize
      if(validMessages.sizeInBytes > config.segmentSize) {
        throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validMessages.sizeInBytes, config.segmentSize))
      }

      // maybe roll the log if this segment is full: if appending these messages would overflow
      // the active segment, create a new log file and append there instead
      val segment = maybeRoll(validMessages.sizeInBytes)

      // now append to the log: write the messages to the active segment; once enough bytes have
      // accumulated, OffsetIndex.append is called to add an index entry
      segment.append(appendInfo.firstOffset, validMessages)

      // increment the log end offset: update nextOffsetMetadata to the latest offset
      updateLogEndOffset(appendInfo.lastOffset + 1)

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))

      if(unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  } catch {
    case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  }
}
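For orientation, here is a minimal usage sketch, assuming a Log instance named log and the 0.8-era ByteBufferMessageSet varargs constructor (the payload and variable names are made up):

// Build an uncompressed message set and let the log assign offsets while appending.
val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes("UTF-8")))
val info = log.append(messageSet, assignOffsets = true)
println("appended offsets %d to %d".format(info.firstOffset, info.lastOffset))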

4.1 maybeRoll

Let us now look at maybeRoll:

private def maybeRoll(messagesSize: Int): LogSegment = {
  val segment = activeSegment
  if (segment.size > config.segmentSize - messagesSize ||
      segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
      segment.index.isFull) {
    debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
      .format(name,
              segment.size,
              config.segmentSize,
              segment.index.entries,
              segment.index.maxEntries,
              time.milliseconds - segment.created,
              config.segmentMs - segment.rollJitterMs))
    roll()
  } else {
    segment
  }
}

If appending the new messages would push the active segment past its configured size limit, if the segment has been open longer than the configured roll time, or if its index is full, the log rolls over to a new file; otherwise the current active segment is returned unchanged. A small sketch of the size condition with concrete numbers follows.
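As a quick illustration of the size condition alone (the numbers and the helper name wouldRollBySize are made up for this example):

// With a 1 GB segment limit, a segment holding 1 GB - 512 KB must roll before a 1 MB batch,
// because segment.size > config.segmentSize - messagesSize.
def wouldRollBySize(segmentBytes: Long, segmentSizeConfig: Long, incomingBytes: Long): Boolean =
  segmentBytes > segmentSizeConfig - incomingBytes

val segmentSizeConfig = 1024L * 1024 * 1024                    // log.segment.bytes = 1 GB
val currentSegmentBytes = segmentSizeConfig - 512L * 1024      // segment is almost full
println(wouldRollBySize(currentSegmentBytes, segmentSizeConfig, 1024L * 1024))  // true, so roll() runs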

The roll code, with comments, is as follows:

/**
 * Roll the log over to a new active segment starting with the current logEndOffset.
 * This will trim the index to the exact size of the number of entries it currently contains.
 * @return The newly rolled segment
 */
def roll(): LogSegment = {
  val start = time.nanoseconds
  lock synchronized {
    val newOffset = logEndOffset
    val logFile = logFilename(dir, newOffset)
    val indexFile = indexFilename(dir, newOffset)
    // if the files already exist, delete them first
    for(file <- List(logFile, indexFile); if file.exists) {
      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
      file.delete()
    }
    segments.lastEntry() match {
      case null =>
      case entry => entry.getValue.index.trimToValidSize()
    }
    // create a new LogSegment
    val segment = new LogSegment(dir,
                                 startOffset = newOffset,
                                 indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
                                 time = time)
    // add it to the segments list
    val prev = addSegment(segment)
    if(prev != null)
      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
    // schedule an asynchronous flush of the old segment
    scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
    info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
    segment
  }
}

Let us now see what the asynchronous flush actually does:

/**
 * Flush log segments for all offsets up to offset-1
 * @param offset The offset to flush up to (non-inclusive); the new recovery point
 */
def flush(offset: Long) : Unit = {
  if (offset <= this.recoveryPoint)
    return
  debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
    time.milliseconds + " unflushed = " + unflushedMessages)
  for(segment <- logSegments(this.recoveryPoint, offset))
    segment.flush()
  lock synchronized {
    if(offset > this.recoveryPoint) {
      this.recoveryPoint = offset
      lastflushedTime.set(time.milliseconds)
    }
  }
}

The main job of this function is to flush the log segments between recoveryPoint and offset. Segment.flush in turn calls the flush functions of FileMessageSet and OffsetIndex to sync the data file and the index file to disk. Once everything is flushed, recoveryPoint is advanced to offset; the sketch below shows how the unflushed message count relates to these two values.
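The unflushedMessages counter checked in Log.append is, in this version of Log.scala, essentially the distance between the log end offset and the recovery point; a rough sketch of that relationship (not the real method):

// Messages written to the log but not yet guaranteed to be on disk.
// flush() advances recoveryPoint, which drives this difference back toward zero.
def unflushedMessages(logEndOffset: Long, recoveryPoint: Long): Long =
  logEndOffset - recoveryPoint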

4.2 Segment.append

This is LogSegment's append method. It appends a set of messages to the log segment starting at the given offset. Whenever more than log.index.interval.bytes (4096 by default) have been written to the log since the last index entry, an additional entry is written to the index file. The method is not thread safe, so callers must protect it with a lock, which is why Log.append above holds lock while calling it. A sketch of how these sparse index entries are used on the read path follows the code.

/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param offset The first offset in the message set.
 * @param messages The messages to append.
 */
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed): if more than indexIntervalBytes have been written
    // to the log since the last index entry, record the current offset in the index file
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // append the messages: FileMessageSet.append writes the messages into the underlying channel
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}
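Because an index entry is only written once every indexIntervalBytes of log data, the offset index is sparse: a read for an arbitrary offset first finds the greatest indexed offset not larger than the target and then scans the log forward from that file position. A minimal sketch of that lookup over an in-memory list (the types and the helper name are illustrative, not the OffsetIndex API):

// Each pair is (offset, physical position in the log file), kept in ascending offset order.
case class IndexEntry(offset: Long, position: Int)

// Return the file position to start scanning from when looking for targetOffset.
def lookup(entries: Seq[IndexEntry], targetOffset: Long): Int =
  entries.takeWhile(_.offset <= targetOffset).lastOption.map(_.position).getOrElse(0)

val entries = Seq(IndexEntry(0, 0), IndexEntry(150, 8192), IndexEntry(310, 16384))
println(lookup(entries, 200))   // prints 8192: scan the log file from there to reach offset 200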

