Kafka中的offset總結
1.?????? 生產端offset
Kafka接收到生產者發送的消息實際上是以日志文件的形式保存在對應分區的磁盤上。每條消息都有一個offset值來表示它在分區中的位置。每次寫入都是追加到文件的末尾,如下圖虛線框表示。
如上圖所示,它代表一個日志文件,這個日志文件中有 9 條消息,第一條消息的 offset( logStartOffset)為 0,最后一條消息的 offset 為 8,LEO(Log End Offset)為 9 ,代表下一條待寫入的消息。日志文件的 HW(Low Watermark)為 6,表示消費者只能拉取到 offset 在 0 至 5 之間的消息, 而 offset 為 6 的消息對消費者而言是不可見的。
每個分區副本都會維護自身的LEO,而ISR集合中最小的LEO即為分區的HW。
2.????? 消費端offset
消費者在消費時,也維護一個offset,表示消費到分區中的某個消息所在的位置。
如上圖所示,ConsumerA的offset=9,表示ConsumerA已經消費完offset為8的那條數據,提交的offset值為9,下次消費從offset為9的數據開始消費。
消費者提交的offset值維護在__consumer_offsets這個Topic中,具體維護在哪個分區中,是由消費者所在的消費者組groupid決定,計算方式是:groupid的hashCode值對50取余。當kafka環境正常而消費者不能消費時,有可能是對應的__consumer_offsets分區leader為none或-1,或者分區中的日志文件損壞導致。
消費者提交offset方式可以是手動提交也可以是自動提交,相關的參數設置是enable.auto.commit,參數默認為true,表示每5秒拉取分區中最大的消息位移進行提交。參數設置為false時,需要手動提交offset,提交方式有同步提交(commitSync)和異步提交(commitAsync)兩種方式。同步提交會根據poll方法拉取最新位移進行提交,只要沒有發生不可恢復的錯誤,它就會阻塞消費線程直至提交完成。異步提交執行時不會阻塞消費線程,但有可能出現先提交的位移失敗了而后提交的位移成功了,如果重試,就會發生重復消費。對此,可設置遞增的序號來維護異步提交順序,也可以在退出或者rebalance前使用同步提交。
消費者消費時,如果沒有對應的offset記錄會按auto.offset.reset的配置來消費,默認值為latest,表示從分區末尾開始消費。如果配置為earliest表示從分區起始處開始消費。在代碼中也可以通過seek()方法指定分區具體的offset處開始消費。另外,我們也可以重置消費者組的offset,具體方式見4重置Consumer offset。
消費者消費提交的offset也會被定期清理,對應的參數是:
offsets.retention.check.interval.ms:offset定期檢查數據過期周期
offsets.retention.minutes:offset保留時長
超過offsets.retention.minutes時間且offset沒有改變時,消費者提交的offset會被清理掉,再次消費時會按auto.offset.reset配置去消費。此時,會有數據丟失或者重復,可通過重置offset來解決。
3.????? 數據目錄中checkpoint維護的offset
Kafka每一個數據根目錄都會包含最基本的N個檢查點文件(xxx- checkpoint,之所以是N個,是因為隨著版本的更新在不斷新增checkpoint文件)和meta.properties文件,在創建topic的時候,如果當前broker中不止配置了一個data目錄,那么會挑選分區數量最少的那個data目錄來完成本次創建任務。
各個checkpoint作用如下:
recovery-point-offset-checkpoint:表示已經刷寫到磁盤的offset信息,對應LEO信息。kafka中會有一個定時任務負責將所有分區的LEO刷寫到恢復點文件recovery-point-offset-checkpoint中,定時周期由broker端參數log.flush.offset.checkpoint.interval.ms配置,默認值60000,即60s。Kafka在啟動時會檢查文件的完整性,如果沒有.kafka_cleanshutdown這個文件,就會進入一個recover邏輯,recover就是從此文件中的offset開始。
replication-offset-checkpoint:用來存儲每個replica的HW,表示已經被commited的offset信息。失敗的follower開始恢復時,會首先將自己的日志截斷到上次的checkpointed時刻的HW,然后向leader拉取消息。kafka有一個定時任務負責將所有分區的HW刷寫到復制點文件replication-offset-checkpoint中,定時周期由broker端參數replica.high.watermark.checkpoint.interval.ms配置,默認值5000,即5s。
log-start-offset-checkpoint:對應logStartOffset,用來標識日志的起始偏移量。kafka中有一個定時任務負責將所有分區的logStartOffset刷寫到起始點文件log-start-offset-checkpoint中,定時周期有broker端參數log.flush.start.offset.checkpoint.interval.ms配置,默認值60000,即60s。
cleaner-offset-checkpoint:存了每個log的最后清理offset。
4. 重置Consumer offset
更新Offset由三個維度決定:Topic的作用域、重置策略、執行方案。
Topic的作用域
?? --all-topics:為consumer group下所有topic的所有分區調整位移)
?? --topic t1 --topic t2:為指定的若干個topic的所有分區調整位移
?? --topic t1:0,1,2:為指定的topic分區調整位移
重置策略
?? --to-earliest:把位移調整到分區當前最小位移
?? --to-latest:把位移調整到分區當前最新位移
?? --to-current:把位移調整到分區當前位移
?? --to-offset
?? --shift-by N: 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
?? --to-datetime
?? --by-duration
?? --from-file
確定執行方案
?? 什么參數都不加:只是打印出位移調整方案,不具體執行
?? --execute:執行真正的位移調整
?? --export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續直接使用
常用示例
?? 更新到當前group最初的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-earliest --execute
?? 更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
?? 更新到當前offset位置(解決offset的異常)
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-current --execute
?? offset位置按設置的值進行位移
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
?? offset設置到指定時刻開始
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
參考文檔:
深入理解Kafka核心設計與實踐原理? ?朱忠華著
https://cloud.tencent.com/developer/article/1436988
Kafka
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。