Kafka Source Code Analysis, Part 3: Log Module Read/Write Source Code (4)

      Reader submission   1067   2025-04-04

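      The code of FileMessageSet.append is fairly simple: it writes the messages directly into its FileChannel and adds the number of bytes written to _size.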

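```scala
/**
 * Append these messages to the message set
 */
def append(messages: ByteBufferMessageSet) {
  // write the whole buffer into the underlying FileChannel; writeTo returns the number of bytes written
  val written = messages.writeTo(channel, 0, messages.sizeInBytes)
  // keep the in-memory size of the file in step with what was written
  _size.getAndAdd(written)
}
```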
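The append path boils down to a positional write plus a size counter. Below is a minimal, self-contained sketch using plain java.nio; `SimpleMessageFile` is a hypothetical class, not part of Kafka, and it assumes a single writer at a time. Like FileMessageSet.append, it writes the bytes into a FileChannel and adds the number of bytes actually written to an AtomicInteger.

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for FileMessageSet (not Kafka's class): an append-only file plus a size counter.
// Assumes a single writer at a time, so reading the current size and writing are not raced.
class SimpleMessageFile(file: File) {
  private val channel: FileChannel =
    FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  // in-memory size, playing the role of FileMessageSet._size
  private val _size = new AtomicInteger(channel.size().toInt)

  // Write every remaining byte of `messages` at the current end of the file,
  // then add the number of bytes written to the size counter.
  def append(messages: ByteBuffer): Int = {
    val start = _size.get()
    var written = 0
    while (messages.hasRemaining()) {
      written += channel.write(messages, start + written)
    }
    _size.getAndAdd(written)
    written
  }

  def sizeInBytes: Int = _size.get()
  def close(): Unit = channel.close()
}
```

Each append writes at the current logical end of the file (the counter), so the file length and the counter grow together.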

      The code of OffsetIndex.append is as follows:

```scala
/**
 * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
 */
def append(offset: Long, position: Int) {
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + size + ").")
    if (size.get == 0 || offset > lastOffset) {
      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
      // 4 bytes: the offset relative to this segment's baseOffset
      this.mmap.putInt((offset - baseOffset).toInt)
      // 4 bytes: the message's physical position in the log file
      this.mmap.putInt(position)
      this.size.incrementAndGet()
      this.lastOffset = offset
      // every entry is 8 bytes, so the mmap position must equal entries * 8
      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
    } else {
      throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
        .format(offset, entries, lastOffset, file.getAbsolutePath))
    }
  }
}
```

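      The code is straightforward. Following the description of the OffsetIndex write format in Section 2, each entry first stores 4 bytes holding offset - baseOffset, and then 4 bytes holding the message's position in the log file. Entries must arrive in increasing offset order: an offset that is not larger than the last one appended raises InvalidOffsetException.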
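The 8-byte entry layout can be illustrated with an in-memory sketch, assuming a heap ByteBuffer in place of the memory-mapped index file. `IndexSketch` and its methods are hypothetical names. As in OffsetIndex.append, each entry stores the offset relative to baseOffset followed by the physical position, and offsets must strictly increase.

```scala
import java.nio.ByteBuffer

// Hypothetical in-memory version of the OffsetIndex layout (not Kafka's class).
class IndexSketch(val baseOffset: Long, maxEntries: Int) {
  private val buf = ByteBuffer.allocate(maxEntries * 8)
  private var entries = 0
  private var lastOffset = -1L

  def append(offset: Long, position: Int): Unit = {
    require(entries < maxEntries, "index is full")
    require(entries == 0 || offset > lastOffset, s"offset $offset is not larger than $lastOffset")
    buf.putInt((offset - baseOffset).toInt) // 4 bytes: offset relative to baseOffset
    buf.putInt(position)                    // 4 bytes: position in the log file
    entries += 1
    lastOffset = offset
  }

  // Absolute reads: entry n occupies bytes [n * 8, n * 8 + 8).
  def offsetAt(n: Int): Long = baseOffset + buf.getInt(n * 8)
  def positionAt(n: Int): Int = buf.getInt(n * 8 + 4)
  def size: Int = entries
}
```

Storing the offset relative to baseOffset is what lets it fit in 4 bytes, as long as a segment spans fewer than 2^31 offsets.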

      5. Log.read flow

      [Figure: Log.read flowchart]

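      Given a startOffset to begin reading from, a maximum number of bytes to read and other parameters, this function returns a FetchDataInfo. The idea is simple: in the segments map, find the segment with the greatest baseOffset that is less than or equal to startOffset, and start searching for and reading data from there.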

```scala
/**
 * Read messages from the log
 *
 * @param startOffset The offset to begin reading at
 * @param maxLength The maximum number of bytes to read
 * @param maxOffset The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
 *
 * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
 * @return The fetch data information including fetch starting offset metadata and messages read
 */
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
  trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

  // check if the offset is valid and in range
  // If startOffset equals the next offset to be assigned (the log end offset),
  // no message exists there yet, so return an empty message set.
  val next = nextOffsetMetadata.messageOffset
  if(startOffset == next)
    return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)

  // the Map.Entry(baseOffset, LogSegment) with the greatest baseOffset <= startOffset
  var entry = segments.floorEntry(startOffset)

  // attempt to read beyond the log end offset is an error
  if(startOffset > next || entry == null)
    throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))

  // do the read on the segment with a base offset less than the target offset
  // but if that segment doesn't contain any messages with an offset greater than that
  // continue to read from successive segments until we get some messages or we reach the end of the log
  while(entry != null) {
    // call LogSegment.read to get the fetchInfo
    val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
    // null means this segment has nothing at or after startOffset: move on to the next segment
    if(fetchInfo == null) {
      entry = segments.higherEntry(entry.getKey)
    } else {
      return fetchInfo
    }
  }

  // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
  // this can happen when all messages with offset larger than start offsets have been deleted.
  // In this case, we will return the empty set with log end offset metadata
  FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
}
```
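The segment-selection part of Log.read can be reproduced with a java.util.concurrent.ConcurrentSkipListMap keyed by baseOffset, assuming that is how the segments map is stored. In this sketch each "segment" is only a sorted vector of message offsets, `SegmentWalkSketch` and `readFrom` are hypothetical, and Option replaces the null returned by LogSegment.read.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentWalkSketch {
  // Hypothetical stand-in for LogSegment.read: the first offset >= startOffset, if any.
  def readFrom(segment: Vector[Long], startOffset: Long): Option[Long] =
    segment.find(_ >= startOffset)

  // Mirrors the shape of Log.read: floorEntry first, then higherEntry until data is found.
  def read(segments: ConcurrentSkipListMap[java.lang.Long, Vector[Long]],
           startOffset: Long, logEndOffset: Long): Option[Long] = {
    if (startOffset == logEndOffset) return None // nothing written at this offset yet
    var entry = segments.floorEntry(startOffset)
    if (startOffset > logEndOffset || entry == null) // stand-in for OffsetOutOfRangeException
      throw new IndexOutOfBoundsException(s"offset $startOffset out of range")
    while (entry != null) {
      readFrom(entry.getValue(), startOffset) match {
        case found @ Some(_) => return found
        case None            => entry = segments.higherEntry(entry.getKey())
      }
    }
    None // start offset is in range, but everything after it has been deleted
  }
}
```

With segments starting at 0 and 10, reading from offset 5 first picks segment 0 via floorEntry; if that segment holds nothing at or after 5, higherEntry moves the read to segment 10.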

      5.1 LogSegment.read

```scala
/**
 * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
 * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
 *
 * @param startOffset A lower bound on the first offset to include in the message set we read
 * @param maxSize The maximum number of bytes to include in the message set we read
 * @param maxOffset An optional maximum offset for the message set we read
 *
 * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 */
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
  if(maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  // size of this segment's log file
  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  // translate the logical offset into a physical position; the returned OffsetPosition is for
  // the first message whose offset is >= startOffset (translateOffset is covered in detail below)
  val startPosition = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if(startPosition == null)
    return null

  // LogOffsetMetadata holds the logical offset, the segment's base offset and the physical position
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

  // if the size is zero, still return a log segment but with zero size
  if(maxSize == 0)
    return FetchDataInfo(offsetMetadata, MessageSet.Empty)

  // the caller may bound the read with maxOffset, so compute the actual number of bytes to read
  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val length =
    maxOffset match {
      case None =>
        // no max offset, just use the max size they gave unmolested
        maxSize
      case Some(offset) => {
        // there is a max offset, translate it to a file position and use that to calculate the max read size
        if(offset < startOffset)
          throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
        // translate maxOffset into a physical position as well, then read
        // min(bytes between startPosition and that position, maxSize)
        val mapping = translateOffset(offset, startPosition.position)
        val endPosition =
          if(mapping == null)
            logSize // the max offset is off the end of the log, use the end of the file
          else
            mapping.position
        min(endPosition - startPosition.position, maxSize)
      }
    }

  // return the FetchDataInfo
  FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
```
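The length calculation at the end of LogSegment.read reduces to a small pure function. `readLength` below is a hypothetical helper, not a Kafka method; it receives positions that translateOffset would already have produced, with `endPosition = None` standing for a null mapping (maxOffset beyond the end of the segment), in which case the end of the file is used.

```scala
// Hypothetical helper (not a Kafka method) isolating the read-length rule of LogSegment.read.
object ReadLengthSketch {
  // startPosition: physical position of the first message to read
  // endPosition:   physical position of maxOffset, or None if maxOffset is past the end of the segment
  // logSize:       size of the segment's log file captured at the start of the read
  def readLength(maxOffset: Option[Long], startOffset: Long, startPosition: Int,
                 endPosition: Option[Int], logSize: Int, maxSize: Int): Int = {
    require(maxSize >= 0, s"Invalid max size for log read ($maxSize)")
    maxOffset match {
      case None => maxSize // no upper offset bound: only maxSize limits the read
      case Some(offset) =>
        require(offset >= startOffset, s"maximum offset $offset is less than start offset $startOffset")
        math.min(endPosition.getOrElse(logSize) - startPosition, maxSize)
    }
  }
}
```

For example, with startPosition = 200, maxOffset mapped to position 500 and maxSize = 1024, only 300 bytes are read; if maxOffset lies beyond the segment, the bound becomes logSize - startPosition, still capped at maxSize.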

      A key function in LogSegment.read is translateOffset, which converts a logical offset into an actual physical file position:

```scala
/**
 * Find the physical file position for the first message with offset >= the requested offset.
 *
 * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
 * in the file higher than the greatest-lower-bound from the index.
 *
 * @param offset The offset we want to translate
 * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
 * when omitted, the search will begin at the position in the offset index.
 *
 * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
 */
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
  // find the index entry with the greatest offset <= the requested offset
  val mapping = index.lookup(offset)
  // from that physical position (or startingFilePosition, if larger), scan the log file forward
  // for the first message whose offset is >= the requested offset
  log.searchFor(offset, max(mapping.position, startingFilePosition))
}
```

      The code of OffsetIndex.lookup:

```scala
def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    // Duplicate the ByteBuffer: the copy shares its content with mmap, and its position, limit,
    // mark and capacity start out equal to the original's, but are independent from then on.
    val idx = mmap.duplicate
    // binary-search the index buffer for the slot with the greatest offset <= targetOffset
    val slot = indexSlotFor(idx, targetOffset)
    // build the OffsetPosition for that slot; -1 means no such entry, so start at the beginning of the segment
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  }
}
```
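The behaviour described in the comment on mmap.duplicate can be checked with plain java.nio. This sketch uses a heap ByteBuffer instead of the index's MappedByteBuffer, and `DuplicateDemo` is only an illustration. The duplicate sees writes made through the original, but moving its position leaves the original's position untouched, which is what lets lookup work on its own view of the buffer without disturbing the position that append writes at.

```scala
import java.nio.ByteBuffer

// Illustration only: a heap buffer stands in for the index's MappedByteBuffer.
object DuplicateDemo {
  // Returns (value read through the duplicate, duplicate's position, original's position).
  def run(): (Int, Int, Int) = {
    val original = ByteBuffer.allocate(16)
    original.putInt(42)             // original's position is now 4
    val copy = original.duplicate() // shares content; copy's position also starts at 4
    original.putInt(0, 7)           // an absolute write through the original...
    val seen = copy.getInt(0)       // ...is visible through the copy
    copy.position(12)               // moving the copy's position...
    (seen, copy.position(), original.position()) // ...leaves the original's at 4
  }
}
```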

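      The relativeOffset and physical helpers called in lookup take a slot number n and return, respectively, the relative logical offset and the physical position stored in that slot. Since every entry is 8 bytes, slot n starts at byte n * 8: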

```scala
/* return the nth offset relative to the base offset */
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

/* return the nth physical position */
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
```
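The body of indexSlotFor is not listed here; its comment only says it binary-searches the index for the largest entry whose offset is less than or equal to targetOffset. A minimal sketch of such a search over the 8-byte layout follows. `SlotSearchSketch.slotFor` is a hypothetical name, and it assumes entries are sorted by relative offset (which append enforces). It returns -1 when the index is empty or the target is below the first entry, the case that lookup maps to OffsetPosition(baseOffset, 0).

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for indexSlotFor, whose body is not shown in the article.
object SlotSearchSketch {
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

  // Largest slot whose relative offset is <= targetOffset - baseOffset, or -1 if there is none.
  def slotFor(buffer: ByteBuffer, entries: Int, baseOffset: Long, targetOffset: Long): Int = {
    val relTarget = targetOffset - baseOffset
    if (entries == 0 || relativeOffset(buffer, 0) > relTarget) return -1
    var lo = 0 // invariant: relativeOffset(lo) <= relTarget
    var hi = entries - 1
    while (lo < hi) {
      val mid = (lo + hi + 1) >>> 1 // round up so that lo always advances
      if (relativeOffset(buffer, mid) <= relTarget) lo = mid else hi = mid - 1
    }
    lo
  }
}
```

Rounding mid up keeps the loop from stalling when lo and hi are adjacent.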

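      Next, FileMessageSet.searchFor: starting at startingPosition, it scans the messages in order until it finds the first one whose offset is greater than or equal to the target offset, and returns its position as an OffsetPosition.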

```scala
/**
 * Search forward for the file position of the first offset that is greater than or equal to the target offset
 * and return its physical position. If no such offsets are found, return null.
 * @param targetOffset The offset to search for.
 * @param startingPosition The starting position in the file to begin searching from.
 */
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
  var position = startingPosition
  // buffer for one entry header: LogOverhead is 12 bytes (an 8-byte offset followed by a 4-byte message size)
  val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
  val size = sizeInBytes()
  while(position + MessageSet.LogOverhead < size) {
    buffer.rewind()
    channel.read(buffer, position)
    // if even the header cannot be read in full, throw
    if(buffer.hasRemaining)
      throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
        .format(targetOffset, startingPosition, file.getAbsolutePath))
    buffer.rewind()
    val offset = buffer.getLong()
    // first message with offset >= targetOffset: return its OffsetPosition
    if(offset >= targetOffset)
      return OffsetPosition(offset, position)
    val messageSize = buffer.getInt()
    if(messageSize < Message.MessageOverhead)
      throw new IllegalStateException("Invalid message size: " + messageSize)
    // advance position to the next message (see the message layout in the earlier sections)
    position += MessageSet.LogOverhead + messageSize
  }
  null
}
```
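The forward scan in searchFor can be tried on an in-memory buffer. The sketch assumes the entry layout that searchFor reads (an 8-byte offset, a 4-byte size, then that many bytes of message) and uses a ByteBuffer instead of a FileChannel; `LogScanSketch` and `build` are hypothetical, payloads are arbitrary bytes rather than real Kafka messages, and Option replaces the null return.

```scala
import java.nio.ByteBuffer

// Hypothetical in-memory version of FileMessageSet.searchFor (not Kafka's code).
object LogScanSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte size, matching MessageSet.LogOverhead

  // Build a buffer of (offset, payload) entries laid out as [offset:8][size:4][payload].
  def build(entries: Seq[(Long, Array[Byte])]): ByteBuffer = {
    val buf = ByteBuffer.allocate(entries.map { case (_, p) => LogOverhead + p.length }.sum)
    for ((offset, payload) <- entries) {
      buf.putLong(offset).putInt(payload.length).put(payload)
    }
    buf.flip()
    buf
  }

  // (offset, position) of the first entry with offset >= targetOffset, or None.
  def searchFor(log: ByteBuffer, targetOffset: Long, startingPosition: Int): Option[(Long, Int)] = {
    var position = startingPosition
    val size = log.limit()
    while (position + LogOverhead <= size) { // the whole header must fit in the remaining bytes
      val offset = log.getLong(position)     // absolute read of the entry's offset
      if (offset >= targetOffset) return Some((offset, position))
      val messageSize = log.getInt(position + 8)
      if (messageSize < 0) throw new IllegalStateException(s"Invalid message size: $messageSize")
      position += LogOverhead + messageSize  // skip header and payload to the next entry
    }
    None
  }
}
```

The loop uses `<=` rather than Kafka's `<` so that a final entry with an empty payload is still visited; in Kafka every message is at least Message.MessageOverhead bytes, so the difference does not arise there.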

      This completes the analysis of the Log module's basic read and write functions.


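      Copyright notice: This article was submitted by an online user, and its copyright belongs to the original author. This site does not own the copyright and assumes no related legal liability. If you find any content on this site that is suspected of plagiarism or is inaccurate, please contact us at jiasou666@gmail.com; once verified, the infringing content will be removed within 24 hours.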
