RocketMQ一個新的消費組初次啟動時從何處開始消費呢?

      網友投稿 1626 2025-04-01

      本文目錄

      1、拋出問題

      1.1 環境準備

      1.2 消息發送者代碼

      1.3 消費端驗證代碼

      2、探究CONSUME_FROM_MAX_OFFSET實現原理

      2.1 CONSUME_FROM_LAST_OFFSET計算邏輯

      2.2 CONSUME_FROM_FIRST_OFFSET

      2.4 CONSUME_FROM_TIMESTAMP

      3、猜想與驗證

      4、解決方案

      1、拋出問題

      一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條消息開始消費呢?

      首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼簾,從字面意思來看是設置消費者從哪里開始消費,正是解開該問題的”鑰匙“。ConsumeFromWhere枚舉類圖如下:

      CONSUME_FROM_MAX_OFFSET

      從消費隊列最大的偏移量開始消費。

      CONSUME_FROM_FIRST_OFFSET

      從消費隊列最小偏移量開始消費。

      CONSUME_FROM_TIMESTAMP

      從指定的時間戳開始消費,默認為消費者啟動之前的30分鐘處開始消費??梢酝ㄟ^DefaultMQPushConsumer#setConsumeTimestamp。

      是不是點小激動,還不快試試。

      需求:新的消費組啟動時,從隊列最后開始消費,即只消費啟動后發送到消息服務器后的最新消息。

      1.1 環境準備

      本示例所用到的Topic路由信息如下:

      Broker的配置如下(broker.conf)

      brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog namesrvAddr=127.0.0.1:9876 autoCreateTopicEnable=false mapedFileSizeCommitLog=10240 mapedFileSizeConsumeQueue=2000

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      其中重點修改了如下兩個參數:

      mapedFileSizeCommitLog

      單個commitlog文件的大小,這里使用10M,方便測試用。

      mapedFileSizeConsumeQueue

      單個consumequeue隊列長度,這里使用1000,表示一個consumequeue文件中包含1000個條目。

      1.2 消息發送者代碼

      public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 300; i++) { try { Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      通過上述,往TopicTest發送300條消息,發送完畢后,RocketMQ Broker存儲結構如下:

      1.3 消費端驗證代碼

      public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      執行上述代碼后,按照期望,應該是不會消費任何消息,只有等生產者再發送消息后,才會對消息進行消費,事實是這樣嗎?執行效果如圖所示:

      令人意外的是,竟然從隊列的最小偏移量開始消費了,這就“尷尬”了。難不成是RocketMQ的Bug。帶著這個疑問,從源碼的角度嘗試來解讀該問題,并指導我們實踐。

      2、探究CONSUME_FROM_MAX_OFFSET實現原理

      對于一個新的消費組,無論是集群模式還是廣播模式都不會存儲該消費組的消費進度,可以理解為-1,此時就需要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先我們需要找到其對應的處理入口。我們知道,消息消費者從Broker服務器拉取消息時,需要進行消費隊列的負載,即RebalanceImpl。

      溫馨提示:本文不會詳細介紹RocketMQ消息隊列負載、消息拉取、消息消費邏輯,只會展示出通往該問題的簡短流程,如想詳細了解消息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。

      RebalancePushImpl#computePullFromWhere

      public long computePullFromWhere(MessageQueue mq) { long result = -1; // @1 final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { // @2 // 省略部分代碼 break; } case CONSUME_FROM_FIRST_OFFSET: { // @3 // 省略部分代碼 break; } case CONSUME_FROM_TIMESTAMP: { //@4 // 省略部分代碼 break; } default: break; } return result; // @5 }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      代碼@1:先解釋幾個局部變量。

      result

      最終的返回結果,默認為-1。

      consumeFromWhere

      消息消費者開始消費的策略,即CONSUME_FROM_LAST_OFFSET等。

      offsetStore

      offset存儲器,消費組消息偏移量存儲實現器。

      代碼@2:CONSUME_FROM_LAST_OFFSET(從隊列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。

      代碼@3:CONSUME_FROM_FIRST_OFFSET(從隊列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。

      代碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。

      代碼@5:返回最后計算的偏移量,從該偏移量出開始消費。

      2.1 CONSUME_FROM_LAST_OFFSET計算邏輯

      case CONSUME_FROM_LAST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1 if (lastOffset >= 0) { // @2 result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { // @3 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { // @4 result = -1; } } } else { result = -1; } break; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      代碼@1:使用offsetStore從消息消費進度文件中讀取消費消費進度,本文將以集群模式為例展開。稍后詳細分析。

      代碼@2:如果返回的偏移量大于等于0,則直接使用該offset,這個也能理解,大于等于0,表示查詢到有效的消息消費進度,從該有效進度開始消費,但我們要特別留意lastOffset為0是什么場景,因為返回0,并不會執行CONSUME_FROM_LAST_OFFSET(語義)。

      代碼@3:如果lastOffset為-1,表示當前并未存儲其有效偏移量,可以理解為第一次消費,如果是消費組重試主題,從重試隊列偏移量為0開始消費;如果是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。

      代碼@4:如果從遠程服務拉取最大偏移量拉取異?;蚱渌闆r,則使用-1作為第一次拉取偏移量。

      分析,上述執行的現象,雖然設置的是CONSUME_FROM_LAST_OFFSET,但現象是從隊列的第一條消息開始消費,根據上述源碼的分析,只有從消費組消費進度存儲文件中取到的消息偏移量為0時,才會從第一條消息開始消費,故接下來重點分析消息消費進度存儲器(OffsetStore)在什么情況下會返回0。

      接下來我們將以集群模式來查看一下消息消費進度的查詢邏輯,集群模式的消息進度存儲管理器實現為:

      RemoteBrokerOffsetStore,最終Broker端的命令處理類為:ConsumerManageProcessor。

      ConsumerManageProcessor#queryConsumerOffset private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); long offset = this.brokerController.getConsumerOffsetManager().queryOffset( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); // @1 if (offset >= 0) { // @2 responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { // @3 long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); // @4 if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( // @5 requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { // @6 response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"); } } return response; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      代碼@1:從消費消息進度文件中查詢消息消費進度。

      代碼@2:如果消息消費進度文件中存儲該隊列的消息進度,其返回的offset必然會大于等于0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。

      代碼@3:如果未從消息消費進度文件中查詢到其進度,offset為-1。則首先獲取該主題、消息隊列當前在Broker服務器中的最小偏移量(@4)。如果小于等于0(返回0則表示該隊列的文件還未曾刪除過)并且其最小偏移量對應的消息存儲在內存中而不是存在磁盤中,則返回偏移量0,這就意味著ConsumeFromWhere中定義的三種枚舉類型都不會生效,直接從0開始消費,到這里就能解開其謎團了(@5)。

      代碼@6:如果偏移量小于等于0,但其消息已經存儲在磁盤中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中得到的偏移量為-1。

      看到這里,大家應該能回答文章開頭處提到的問題了吧?

      看到這里,大家應該明白了,為什么設置的CONSUME_FROM_LAST_OFFSET,但消費組是從消息隊列的開始處消費了吧,原因就是消息消費進度文件中并沒有找到其消息消費進度,并且該隊列在Broker端的最小偏移量為0,說的更直白點,consumequeue/topicName/queueNum的第一個消息消費隊列文件為00000000000000000000,并且消息其對應的消息緩存在Broker端的內存中(pageCache),其返回給消費端的偏移量為0,故會從0開始消費,而不是從隊列的最大偏移量處開始消費。

      為了知識體系的完備性,我們順便來看一下其他兩種策略的計算邏輯。

      2.2 CONSUME_FROM_FIRST_OFFSET

      case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1 if (lastOffset >= 0) { // @2 result = lastOffset; } else if (-1 == lastOffset) { // @3 result = 0L; } else { result = -1; // @4 } break; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      從隊列的開始偏移量開始消費,其計算邏輯如下:

      代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

      代碼@2:如果大于等于0,則從當前該偏移量開始消費。

      代碼@3:如果遠程返回-1,表示并沒有存儲該隊列的消息消費進度,從0開始。

      代碼@4:否則從-1開始消費。

      2.4 CONSUME_FROM_TIMESTAMP

      從指定時戳后的消息開始消費。

      case CONSUME_FROM_TIMESTAMP: { ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1 if (lastOffset >= 0) { // @2 result = lastOffset; } else if (-1 == lastOffset) { // @3 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      RocketMQ一個新的消費組初次啟動時從何處開始消費呢?

      23

      24

      25

      其基本套路與CONSUME_FROM_LAST_OFFSET一樣:

      代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

      代碼@2:如果大于等于0,則從當前該偏移量開始消費。

      代碼@3:如果遠程返回-1,表示并沒有存儲該隊列的消息消費進度,如果是重試主題,則從當前隊列的最大偏移量開始消費,如果是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。

      原理就介紹到這里,下面根據上述理論對其進行驗證。

      3、猜想與驗證

      根據上述理論分析我們得知設置CONSUME_FROM_LAST_OFFSET但并不是從消息隊列的最大偏移量開始消費的“罪魁禍首”是因為消息消費隊列的最小偏移量為0,如果不為0,則就會符合預期,我們來驗證一下這個猜想。

      首先我們刪除commitlog目錄下的文件,如圖所示:

      其消費隊列截圖如下:

      消費端的驗證代碼如下:

      public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      運行結果如下:

      并沒有消息存在的消息,符合預期。

      4、解決方案

      如果在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設置CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName通常不會是00000000000000000000,如是是上面文件名,想要實現從隊列的最后開始消費,該如何做呢?那就走自動創建消費組的路子,執行如下命令:

      ./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05 //克隆一個訂閱了該topic的消費組消費進度 ./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest //重置消費進度到當前隊列的最大值 ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1

      1

      2

      3

      4

      5

      6

      7

      按照上上述命令后,即可實現其目的。

      您都看到這里了,麻煩幫忙點個贊,謝謝您的認可與鼓勵。

      見文如面,我是威哥,熱衷于成體系剖析JAVA主流中間件,關注公眾號『中間件興趣圈』,回復專欄可獲取成體系專欄導航,回復資料可以獲取筆者的學習思維導圖。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:2007版excel表格怎么設置打印區域(excel 2007怎么設置打印區域)
      下一篇:微軟無代碼開發平臺教程(微軟無代碼開發平臺教程下載)
      相關文章
      亚洲va久久久噜噜噜久久狠狠| 在线观看亚洲精品专区| 亚洲中文字幕久久久一区| 香蕉蕉亚亚洲aav综合| 亚洲无线一二三四区手机| 另类专区另类专区亚洲| 亚洲国产日韩a在线播放| 亚洲色中文字幕在线播放| 亚洲免费视频网址| 亚洲最大黄色网站| 亚洲欧洲日韩综合| 亚洲毛片一级带毛片基地| 久久亚洲春色中文字幕久久久 | 国产亚洲精品自在久久| 国产AV无码专区亚洲AV漫画| 久久久久久久亚洲精品| 亚洲最大av无码网址| 久久久久无码专区亚洲av| 不卡一卡二卡三亚洲| 中国亚洲女人69内射少妇| 亚洲色爱图小说专区| 亚洲国产精品福利片在线观看| 国产亚洲综合色就色| 婷婷久久久亚洲欧洲日产国码AV| 亚洲国产精品无码久久久蜜芽| 久久被窝电影亚洲爽爽爽| 亚洲av无码不卡| 久久精品国产亚洲AV大全| 亚洲人成网址在线观看| 亚洲色偷偷av男人的天堂| 亚洲国产精品白丝在线观看| 久久国产亚洲精品| 亚洲av永久中文无码精品综合| 国产精品手机在线亚洲| 亚洲人成国产精品无码| 亚洲欭美日韩颜射在线二| 亚洲国产精品一区二区成人片国内| 亚洲AV无码久久| 亚洲国产精品一区二区久| 亚洲第一区二区快射影院| 亚洲av成本人无码网站|