RocketMQ本地環(huán)境搭建

      網(wǎng)友投稿 863 2025-04-03

      RocketMQ在mac+docker環(huán)境的本地搭建,以及用go語言實(shí)現(xiàn)一個簡單的生產(chǎn)和消費(fèi)案例


      1.創(chuàng)建NameServer服務(wù)

      先用命令docker search rocketmq搜索rocketmq相關(guān)鏡像

      這一步先拉取rocketmqinc/rocketmq鏡像,docker pull rocketmqinc/rocketmq

      然后在本地創(chuàng)建數(shù)據(jù)存儲路徑,因為是在本地電腦搭建環(huán)境,所以要用絕對路徑

      mkdir -p /Users/mymac/docker/rocketmq/data/namesrv/logs /Users/mymac/docker/rocketmq/data/namesrv/store

      然后構(gòu)建namesrv容器,-v也是用剛剛創(chuàng)建的絕對路徑

      docker run -d \ --restart=always \ --name rmqnamesrv \ -p 9876:9876 \ -v /Users/mymac/docker/rocketmq/data/namesrv/logs:/root/logs \ -v /Users/mymac/docker/rocketmq/data/namesrv/store:/root/store \ -e "MAX_POSSIBLE_HEAP=100000000" \ rocketmqinc/rocketmq \ sh mqnamesrv

      2.創(chuàng)建broker結(jié)點(diǎn)

      先在本地創(chuàng)建數(shù)據(jù)存儲路徑

      mkdir -p /Users/mymac/docker/rocketmq/data/broker/logs /Users/mymac/docker/rocketmq/data/broker/store /Users/mymac/docker/rocketmq/conf

      然后在conf文件夾里面創(chuàng)建一個配置文件夾broker.conf,編輯里面的內(nèi)容如下

      #集群名稱 brokerClusterName = DefaultCluster #broker名稱,master和slave名稱相同 brokerName = broker-a #0表示master,大于0表示各個slave brokerId = 0 #默認(rèn)凌晨4點(diǎn)消息刪除 deleteWhen = 04 #消息在磁盤保留時長,單位小時 fileReservedTime = 48 #broker角色復(fù)制方式:SYNC_MASTER,ASYNC_MASTER,SLAVE;即 Master同步復(fù)制、Master異步Master、Slave之間同步數(shù)據(jù) brokerRole = ASYNC_MASTER #刷盤策略:ASYNC_FLUSH,SYNC_FLUSH;表示同步刷盤和異步刷盤 flushDiskType = ASYNC_FLUSH #nameserver地址,其中10.0.54.77是我本機(jī)的ip地址(因為我在本機(jī)測試),通過ifconfig的en0可以查出 namesrvAddr = 10.0.54.77:9876 #broker結(jié)點(diǎn)所在服務(wù)器ip地址,因為我在本機(jī)測試,所以填寫本機(jī)ip 10.0.54.77 brokerIP1 = 10.0.54.77 # 監(jiān)聽端口,默認(rèn)是10911 listenPort = 10911

      然后可以創(chuàng)建broker容器了,-v用剛剛創(chuàng)建的絕對路徑

      docker run -d \ --name rmqbroker \ --link rmqnamesrv:namesrv \ -p 10911:10911 \ -p 10909:10909 \ -v /Users/mymac/docker/rocketmq/data/broker/logs:/root/logs \ -v /Users/mymac/docker/rocketmq/data/broker/store:/root/store \ -v /Users/mymac/docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \ -e "NAMESRV_ADDR=namesrv:9876" \ -e "MAX_POSSIBLE_HEAP=200000000" \ rocketmqinc/rocketmq \ sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

      3.創(chuàng)建rockermq-console服務(wù)

      先拉取styletang/rocketmq-console-ng鏡像,docker pull styletang/rocketmq-console-ng

      然后創(chuàng)建容器,其中9999是在本地訪問的端口

      docker run -d \ --name rmqconsole \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.0.54.77:9876 \ -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ -p 9999:8080 \ styletang/rocketmq-console-ng

      RocketMQ本地環(huán)境搭建

      此時三個容器都創(chuàng)建好了,可以看到如下圖

      此時在瀏覽器輸入127.0.0.1:9999可以看到如下場景

      4.創(chuàng)建生產(chǎn)者

      先在rockermq-console瀏覽器里面創(chuàng)建一個叫kevintest的topic,然后運(yùn)行如下代碼

      func main() { topic := "kevintest" p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})), producer.WithRetry(2), ) err := p.Start() if err != nil { log.Printf("start producer error: %s \n", err.Error()) os.Exit(1) } for i := 0; i < 10; i++ { msg := &primitive.Message{ Topic: topic, Body: []byte("啦啦啦啦啦啦啦啦啦" + strconv.Itoa(i)), } res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }

      運(yùn)行之后可以看到如下信息

      INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":8,\"writeQueueNums\":8,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"10.0.54.89:10911\"}}]}" changedFrom="" topic=kevintest send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00001, offsetMsgId=0A00365900002A9F0000000000000D48, queueOffset=4, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00002, offsetMsgId=0A00365900002A9F0000000000000DF2, queueOffset=4, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00003, offsetMsgId=0A00365900002A9F0000000000000E9C, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00004, offsetMsgId=0A00365900002A9F0000000000000F46, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00005, offsetMsgId=0A00365900002A9F0000000000000FF0, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00006, offsetMsgId=0A00365900002A9F000000000000109A, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00007, offsetMsgId=0A00365900002A9F0000000000001144, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00008, offsetMsgId=0A00365900002A9F00000000000011EE, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00009, offsetMsgId=0A00365900002A9F0000000000001298, queueOffset=5, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c0000a, offsetMsgId=0A00365900002A9F0000000000001342, queueOffset=5, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]] INFO[0000] will remove client from clientMap clientID=10.0.54.89@63825 進(jìn)程 已完成,退出代碼為 0

      在rockermq-console瀏覽器的Message也可以看到消息發(fā)送成功

      5.創(chuàng)建消費(fèi)者

      現(xiàn)在來編寫消費(fèi)者的代碼

      func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})), ) err := c.Subscribe("kevintest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } //不能馬上退出,要等到收到消息 time.Sleep(time.Millisecond*30000) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }

      運(yùn)行上面代碼之后會到的如下結(jié)果

      WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=0 INFO[0000] the MessageQueue changed, version also updated changeTo=1643080672878951000 changedFrom=0 INFO[0000] The PullThresholdForTopic is changed changeTo=102400 changedFrom=102400 INFO[0000] The PullThresholdSizeForTopic is changed changeTo=51200 changedFrom=51200 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880004], TransactionId=], MsgId=0A00364DF14300000000799d60880004, Offse00364D00002A9F00000000000008A2,QueueId=4, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757033, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757044, StoreHost=10.0.54.77:10911, CommitLogOffset=2210, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00004], TransactionId=], MsgId=0A003659F951000000007e00b6c00004, Offse00365900002A9F0000000000000F46,QueueId=4, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376546, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376569, StoreHost=10.0.54.89:10911, CommitLogOffset=3910, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880005], TransactionId=], MsgId=0A00364DF14300000000799d60880005, Offse00364D00002A9F000000000000094C,QueueId=5, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757036, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757047, StoreHost=10.0.54.77:10911, CommitLogOffset=2380, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00005], TransactionId=], MsgId=0A003659F951000000007e00b6c00005, Offse00365900002A9F0000000000000FF0,QueueId=5, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376550, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376573, StoreHost=10.0.54.89:10911, CommitLogOffset=4080, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER offset=1 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880006], TransactionId=], MsgId=0A00364DF14300000000799d60880006, Offse00364D00002A9F00000000000009F6,QueueId=6, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757039, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757051, StoreHost=10.0.54.77:10911, CommitLogOffset=2550, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00006], TransactionId=], MsgId=0A003659F951000000007e00b6c00006, Offse00365900002A9F000000000000109A,QueueId=6, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376554, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376577, StoreHost=10.0.54.89:10911, CommitLogOffset=4250, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER offset=2 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880007], TransactionId=], MsgId=0A00364DF14300000000799d60880007, Offse00364D00002A9F0000000000000AA0,QueueId=7, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757043, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757054, StoreHost=10.0.54.77:10911, CommitLogOffset=2720, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00007], TransactionId=], MsgId=0A003659F951000000007e00b6c00007, Offse00365900002A9F0000000000001144,QueueId=7, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376558, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376581, StoreHost=10.0.54.89:10911, CommitLogOffset=4420, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880008], TransactionId=], MsgId=0A00364DF14300000000799d60880008, Offse00364D00002A9F0000000000000B4A,QueueId=0, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757046, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757057, StoreHost=10.0.54.77:10911, CommitLogOffset=2890, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00008], TransactionId=], MsgId=0A003659F951000000007e00b6c00008, Offse00365900002A9F00000000000011EE,QueueId=0, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376562, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376585, StoreHost=10.0.54.89:10911, CommitLogOffset=4590, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER offset=2 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880001], TransactionId=], MsgId=0A00364DF14300000000799d60880001, Offse00364D00002A9F00000000000006A4,QueueId=1, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757016, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757031, StoreHost=10.0.54.77:10911, CommitLogOffset=1700, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880009], TransactionId=], MsgId=0A00364DF14300000000799d60880009, Offse00364D00002A9F0000000000000BF4,QueueId=1, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757048, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757059, StoreHost=10.0.54.77:10911, CommitLogOffset=3060, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00001], TransactionId=], MsgId=0A003659F951000000007e00b6c00001, Offse00365900002A9F0000000000000D48,QueueId=1, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376514, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376550, StoreHost=10.0.54.89:10911, CommitLogOffset=3400, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00009], TransactionId=], MsgId=0A003659F951000000007e00b6c00009, Offse00365900002A9F0000000000001298,QueueId=1, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376566, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376589, StoreHost=10.0.54.89:10911, CommitLogOffset=4760, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER offset=1 INFO[0000] the MessageQueue changed, version also updated changeTo=1643080672937325000 changedFrom=0 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880002], TransactionId=], MsgId=0A00364DF14300000000799d60880002, Offse00364D00002A9F000000000000074E,QueueId=2, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757023, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757037, StoreHost=10.0.54.77:10911, CommitLogOffset=1870, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0000] The PullThresholdForTopic is changed changeTo=11377 changedFrom=102400 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d6088000a], TransactionId=], MsgId=0A00364DF14300000000799d6088000a, Offse00364D00002A9F0000000000000C9E,QueueId=2, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757051, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757062, StoreHost=10.0.54.77:10911, CommitLogOffset=3230, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0000] The PullThresholdSizeForTopic is changed changeTo=5688 changedFrom=51200 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00002], TransactionId=], MsgId=0A003659F951000000007e00b6c00002, Offse00365900002A9F0000000000000DF2,QueueId=2, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376538, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376561, StoreHost=10.0.54.89:10911, CommitLogOffset=3570, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c0000a], TransactionId=], MsgId=0A003659F951000000007e00b6c0000a, Offse00365900002A9F0000000000001342,QueueId=2, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376570, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376593, StoreHost=10.0.54.89:10911, CommitLogOffset=4930, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880003], TransactionId=], MsgId=0A00364DF14300000000799d60880003, Offse00364D00002A9F00000000000007F8,QueueId=3, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757029, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757040, StoreHost=10.0.54.77:10911, CommitLogOffset=2040, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00003], TransactionId=], MsgId=0A003659F951000000007e00b6c00003, Offse00365900002A9F0000000000000E9C,QueueId=3, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376542, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376566, StoreHost=10.0.54.89:10911, CommitLogOffset=3740, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] 進(jìn)程 已完成,退出代碼為 0

      可見消息消費(fèi)成功

      Docker

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:微軟商城宣布Office365訂閱版包含Office 2016全新應(yīng)用(Microsoft365怎么訂閱)
      下一篇:excel2013表格設(shè)置下拉選項(2013excel表格下拉選項怎么設(shè)置)
      相關(guān)文章
      337P日本欧洲亚洲大胆精品| 亚洲人成网网址在线看| 亚洲人成网站日本片| 亚洲视频在线观看不卡| 亚洲AV日韩AV鸥美在线观看| 亚洲AV无码乱码国产麻豆穿越| 亚洲国产高清在线一区二区三区| 亚洲精品成a人在线观看☆| 亚洲欧美成人一区二区三区| 亚洲AV无码成人专区| 亚洲六月丁香六月婷婷蜜芽| 亚洲成av人片在线看片| 亚洲福利一区二区精品秒拍| 亚洲理论片在线中文字幕| 亚洲男人的天堂在线| 亚洲国产成人精品无码区在线网站| 亚洲一区二区成人| 亚洲视频在线观看不卡| 亚洲最大视频网站| 精品亚洲AV无码一区二区三区| 亚洲国产综合精品| 亚洲av无码一区二区三区观看| 亚洲av成人一区二区三区| 日韩亚洲国产综合高清| 亚洲区日韩精品中文字幕| 亚洲国产精品久久久久秋霞小| 亚洲av午夜国产精品无码中文字| 国产精品亚洲天堂| 亚洲人AV永久一区二区三区久久| 亚洲天堂中文字幕在线| 国产亚洲精AA在线观看SEE| 亚洲AV综合色区无码一区| 久久精品国产亚洲77777| 亚洲大香伊人蕉在人依线| 成人区精品一区二区不卡亚洲| 亚洲人成自拍网站在线观看| 亚洲AV网一区二区三区 | 亚洲色图国产精品| 亚洲男人电影天堂| 亚洲乱码日产精品一二三| 国产成人va亚洲电影|