Kafka中的offset總結

      網友投稿 4144 2025-03-31

      1.?????? 生產端offset

      Kafka接收到生產者發送的消息實際上是以日志文件的形式保存在對應分區的磁盤上。每條消息都有一個offset值來表示它在分區中的位置。每次寫入都是追加到文件的末尾,如下圖虛線框表示。

      如上圖所示,它代表一個日志文件,這個日志文件中有 9 條消息,第一條消息的 offset( logStartOffset)為 0,最后一條消息的 offset 為 8,LEO(Log End Offset)為 9 ,代表下一條待寫入的消息。日志文件的 HW(Low Watermark)為 6,表示消費者只能拉取到 offset 在 0 至 5 之間的消息, 而 offset 為 6 的消息對消費者而言是不可見的。

      每個分區副本都會維護自身的LEO,而ISR集合中最小的LEO即為分區的HW。

      2.????? 消費端offset

      消費者在消費時,也維護一個offset,表示消費到分區中的某個消息所在的位置。

      如上圖所示,ConsumerA的offset=9,表示ConsumerA已經消費完offset為8的那條數據,提交的offset值為9,下次消費從offset為9的數據開始消費。

      消費者提交的offset值維護在__consumer_offsets這個Topic中,具體維護在哪個分區中,是由消費者所在的消費者組groupid決定,計算方式是:groupid的hashCode值對50取余。當kafka環境正常而消費者不能消費時,有可能是對應的__consumer_offsets分區leader為none或-1,或者分區中的日志文件損壞導致。

      消費者提交offset方式可以是手動提交也可以是自動提交,相關的參數設置是enable.auto.commit,參數默認為true,表示每5秒拉取分區中最大的消息位移進行提交。參數設置為false時,需要手動提交offset,提交方式有同步提交(commitSync)和異步提交(commitAsync)兩種方式。同步提交會根據poll方法拉取最新位移進行提交,只要沒有發生不可恢復的錯誤,它就會阻塞消費線程直至提交完成。異步提交執行時不會阻塞消費線程,但有可能出現先提交的位移失敗了而后提交的位移成功了,如果重試,就會發生重復消費。對此,可設置遞增的序號來維護異步提交順序,也可以在退出或者rebalance前使用同步提交。

      消費者消費時,如果沒有對應的offset記錄會按auto.offset.reset的配置來消費,默認值為latest,表示從分區末尾開始消費。如果配置為earliest表示從分區起始處開始消費。在代碼中也可以通過seek()方法指定分區具體的offset處開始消費。另外,我們也可以重置消費者組的offset,具體方式見4重置Consumer offset。

      消費者消費提交的offset也會被定期清理,對應的參數是:

      offsets.retention.check.interval.ms:offset定期檢查數據過期周期

      Kafka中的offset總結

      offsets.retention.minutes:offset保留時長

      超過offsets.retention.minutes時間且offset沒有改變時,消費者提交的offset會被清理掉,再次消費時會按auto.offset.reset配置去消費。此時,會有數據丟失或者重復,可通過重置offset來解決。

      3.????? 數據目錄中checkpoint維護的offset

      Kafka每一個數據根目錄都會包含最基本的N個檢查點文件(xxx- checkpoint,之所以是N個,是因為隨著版本的更新在不斷新增checkpoint文件)和meta.properties文件,在創建topic的時候,如果當前broker中不止配置了一個data目錄,那么會挑選分區數量最少的那個data目錄來完成本次創建任務。

      各個checkpoint作用如下:

      recovery-point-offset-checkpoint:表示已經刷寫到磁盤的offset信息,對應LEO信息。kafka中會有一個定時任務負責將所有分區的LEO刷寫到恢復點文件recovery-point-offset-checkpoint中,定時周期由broker端參數log.flush.offset.checkpoint.interval.ms配置,默認值60000,即60s。Kafka在啟動時會檢查文件的完整性,如果沒有.kafka_cleanshutdown這個文件,就會進入一個recover邏輯,recover就是從此文件中的offset開始。

      replication-offset-checkpoint:用來存儲每個replica的HW,表示已經被commited的offset信息。失敗的follower開始恢復時,會首先將自己的日志截斷到上次的checkpointed時刻的HW,然后向leader拉取消息。kafka有一個定時任務負責將所有分區的HW刷寫到復制點文件replication-offset-checkpoint中,定時周期由broker端參數replica.high.watermark.checkpoint.interval.ms配置,默認值5000,即5s。

      log-start-offset-checkpoint:對應logStartOffset,用來標識日志的起始偏移量。kafka中有一個定時任務負責將所有分區的logStartOffset刷寫到起始點文件log-start-offset-checkpoint中,定時周期有broker端參數log.flush.start.offset.checkpoint.interval.ms配置,默認值60000,即60s。

      cleaner-offset-checkpoint:存了每個log的最后清理offset。

      4. 重置Consumer offset

      更新Offset由三個維度決定:Topic的作用域、重置策略、執行方案。

      Topic的作用域

      ?? --all-topics:為consumer group下所有topic的所有分區調整位移)

      ?? --topic t1 --topic t2:為指定的若干個topic的所有分區調整位移

      ?? --topic t1:0,1,2:為指定的topic分區調整位移

      重置策略

      ?? --to-earliest:把位移調整到分區當前最小位移

      ?? --to-latest:把位移調整到分區當前最新位移

      ?? --to-current:把位移調整到分區當前位移

      ?? --to-offset : 把位移調整到指定位移處

      ?? --shift-by N: 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動

      ?? --to-datetime :把位移調整到大于給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

      ?? --by-duration :把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,比如PT0H5M0S

      ?? --from-file :從CSV文件中讀取調整策略

      確定執行方案

      ?? 什么參數都不加:只是打印出位移調整方案,不具體執行

      ?? --execute:執行真正的位移調整

      ?? --export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續直接使用

      常用示例

      ?? 更新到當前group最初的offset位置

      bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-earliest --execute

      ?? 更新到指定的offset位置

      bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-offset 500000 --execute

      ?? 更新到當前offset位置(解決offset的異常)

      bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-current --execute

      ?? offset位置按設置的值進行位移

      bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --shift-by -100000 --execute

      ?? offset設置到指定時刻開始

      bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000

      參考文檔:

      深入理解Kafka核心設計與實踐原理? ?朱忠華著

      https://cloud.tencent.com/developer/article/1436988

      Kafka

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:項目管理進度是甘特圖嗎(項目進度表 甘特圖)
      下一篇:如何在Excel中按姓氏對全名進行排序?
      相關文章
      亚洲久本草在线中文字幕| 国产亚洲精品美女久久久久 | 亚洲日韩小电影在线观看| 亚洲精品视频在线观看视频| 亚洲AV永久无码精品一百度影院| 亚洲精品专区在线观看| 亚洲人成欧美中文字幕| 亚洲狠狠综合久久| 亚洲精品无码成人| 日韩欧美亚洲国产精品字幕久久久| 国产亚洲福利精品一区| 国产亚洲老熟女视频| 国产亚洲情侣一区二区无| 国产成人综合亚洲| 中文字幕日韩亚洲| 亚洲国产成人高清在线观看 | 亚洲欧洲日本国产| 亚洲综合色丁香麻豆| 亚洲国产一区二区a毛片| 亚洲精品乱码久久久久久蜜桃不卡| 亚洲熟伦熟女新五十路熟妇 | 无码色偷偷亚洲国内自拍| 亚洲国产精品综合久久20| 亚洲一级毛片免观看| 亚洲国产成人精品久久| 亚洲中文字幕无码av在线| 久久精品国产亚洲av水果派| 精品国产综合成人亚洲区| 国产亚洲精品无码成人| 亚洲男女内射在线播放| 亚洲第一区精品观看| 亚洲成av人片在线观看天堂无码 | 亚洲色婷婷六月亚洲婷婷6月| 精品国产亚洲一区二区在线观看| 亚洲老熟女五十路老熟女bbw| jlzzjlzz亚洲jzjzjz| 亚洲综合久久1区2区3区| 久久久久亚洲精品天堂| 久久亚洲精品成人AV| 亚洲av片不卡无码久久| 亚洲人成电影福利在线播放|