RocketMQ本地環(huán)境搭建
RocketMQ在mac+docker環(huán)境的本地搭建,以及用go語言實(shí)現(xiàn)一個簡單的生產(chǎn)和消費(fèi)案例

1.創(chuàng)建NameServer服務(wù)
先用命令docker search rocketmq搜索rocketmq相關(guān)鏡像
這一步先拉取rocketmqinc/rocketmq鏡像,docker pull rocketmqinc/rocketmq
然后在本地創(chuàng)建數(shù)據(jù)存儲路徑,因為是在本地電腦搭建環(huán)境,所以要用絕對路徑
mkdir -p /Users/mymac/docker/rocketmq/data/namesrv/logs /Users/mymac/docker/rocketmq/data/namesrv/store
然后構(gòu)建namesrv容器,-v也是用剛剛創(chuàng)建的絕對路徑
docker run -d \ --restart=always \ --name rmqnamesrv \ -p 9876:9876 \ -v /Users/mymac/docker/rocketmq/data/namesrv/logs:/root/logs \ -v /Users/mymac/docker/rocketmq/data/namesrv/store:/root/store \ -e "MAX_POSSIBLE_HEAP=100000000" \ rocketmqinc/rocketmq \ sh mqnamesrv
2.創(chuàng)建broker結(jié)點(diǎn)
先在本地創(chuàng)建數(shù)據(jù)存儲路徑
mkdir -p /Users/mymac/docker/rocketmq/data/broker/logs /Users/mymac/docker/rocketmq/data/broker/store /Users/mymac/docker/rocketmq/conf
然后在conf文件夾里面創(chuàng)建一個配置文件夾broker.conf,編輯里面的內(nèi)容如下
#集群名稱 brokerClusterName = DefaultCluster #broker名稱,master和slave名稱相同 brokerName = broker-a #0表示master,大于0表示各個slave brokerId = 0 #默認(rèn)凌晨4點(diǎn)消息刪除 deleteWhen = 04 #消息在磁盤保留時長,單位小時 fileReservedTime = 48 #broker角色復(fù)制方式:SYNC_MASTER,ASYNC_MASTER,SLAVE;即 Master同步復(fù)制、Master異步Master、Slave之間同步數(shù)據(jù) brokerRole = ASYNC_MASTER #刷盤策略:ASYNC_FLUSH,SYNC_FLUSH;表示同步刷盤和異步刷盤 flushDiskType = ASYNC_FLUSH #nameserver地址,其中10.0.54.77是我本機(jī)的ip地址(因為我在本機(jī)測試),通過ifconfig的en0可以查出 namesrvAddr = 10.0.54.77:9876 #broker結(jié)點(diǎn)所在服務(wù)器ip地址,因為我在本機(jī)測試,所以填寫本機(jī)ip 10.0.54.77 brokerIP1 = 10.0.54.77 # 監(jiān)聽端口,默認(rèn)是10911 listenPort = 10911
然后可以創(chuàng)建broker容器了,-v用剛剛創(chuàng)建的絕對路徑
docker run -d \ --name rmqbroker \ --link rmqnamesrv:namesrv \ -p 10911:10911 \ -p 10909:10909 \ -v /Users/mymac/docker/rocketmq/data/broker/logs:/root/logs \ -v /Users/mymac/docker/rocketmq/data/broker/store:/root/store \ -v /Users/mymac/docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \ -e "NAMESRV_ADDR=namesrv:9876" \ -e "MAX_POSSIBLE_HEAP=200000000" \ rocketmqinc/rocketmq \ sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
3.創(chuàng)建rockermq-console服務(wù)
先拉取styletang/rocketmq-console-ng鏡像,docker pull styletang/rocketmq-console-ng
然后創(chuàng)建容器,其中9999是在本地訪問的端口
docker run -d \ --name rmqconsole \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.0.54.77:9876 \ -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ -p 9999:8080 \ styletang/rocketmq-console-ng
此時三個容器都創(chuàng)建好了,可以看到如下圖
此時在瀏覽器輸入127.0.0.1:9999可以看到如下場景
4.創(chuàng)建生產(chǎn)者
先在rockermq-console瀏覽器里面創(chuàng)建一個叫kevintest的topic,然后運(yùn)行如下代碼
func main() { topic := "kevintest" p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})), producer.WithRetry(2), ) err := p.Start() if err != nil { log.Printf("start producer error: %s \n", err.Error()) os.Exit(1) } for i := 0; i < 10; i++ { msg := &primitive.Message{ Topic: topic, Body: []byte("啦啦啦啦啦啦啦啦啦" + strconv.Itoa(i)), } res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
運(yùn)行之后可以看到如下信息
INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":8,\"writeQueueNums\":8,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"10.0.54.89:10911\"}}]}" changedFrom="
在rockermq-console瀏覽器的Message也可以看到消息發(fā)送成功
5.創(chuàng)建消費(fèi)者
現(xiàn)在來編寫消費(fèi)者的代碼
func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})), ) err := c.Subscribe("kevintest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } //不能馬上退出,要等到收到消息 time.Sleep(time.Millisecond*30000) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }
運(yùn)行上面代碼之后會到的如下結(jié)果
WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=0 INFO[0000] the MessageQueue changed, version also updated changeTo=1643080672878951000 changedFrom=0 INFO[0000] The PullThresholdForTopic is changed changeTo=102400 changedFrom=102400 INFO[0000] The PullThresholdSizeForTopic is changed changeTo=51200 changedFrom=51200 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880004], TransactionId=], MsgId=0A00364DF14300000000799d60880004, Offse00364D00002A9F00000000000008A2,QueueId=4, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757033, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757044, StoreHost=10.0.54.77:10911, CommitLogOffset=2210, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00004], TransactionId=], MsgId=0A003659F951000000007e00b6c00004, Offse00365900002A9F0000000000000F46,QueueId=4, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376546, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376569, StoreHost=10.0.54.89:10911, CommitLogOffset=3910, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880005], TransactionId=], MsgId=0A00364DF14300000000799d60880005, Offse00364D00002A9F000000000000094C,QueueId=5, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757036, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757047, StoreHost=10.0.54.77:10911, CommitLogOffset=2380, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00005], TransactionId=], MsgId=0A003659F951000000007e00b6c00005, Offse00365900002A9F0000000000000FF0,QueueId=5, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376550, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376573, StoreHost=10.0.54.89:10911, CommitLogOffset=4080, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER offset=1 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880006], TransactionId=], MsgId=0A00364DF14300000000799d60880006, Offse00364D00002A9F00000000000009F6,QueueId=6, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757039, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757051, StoreHost=10.0.54.77:10911, CommitLogOffset=2550, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00006], TransactionId=], MsgId=0A003659F951000000007e00b6c00006, Offse00365900002A9F000000000000109A,QueueId=6, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376554, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376577, StoreHost=10.0.54.89:10911, CommitLogOffset=4250, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER offset=2 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880007], TransactionId=], MsgId=0A00364DF14300000000799d60880007, Offse00364D00002A9F0000000000000AA0,QueueId=7, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757043, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757054, StoreHost=10.0.54.77:10911, CommitLogOffset=2720, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00007], TransactionId=], MsgId=0A003659F951000000007e00b6c00007, Offse00365900002A9F0000000000001144,QueueId=7, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376558, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376581, StoreHost=10.0.54.89:10911, CommitLogOffset=4420, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880008], TransactionId=], MsgId=0A00364DF14300000000799d60880008, Offse00364D00002A9F0000000000000B4A,QueueId=0, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757046, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757057, StoreHost=10.0.54.77:10911, CommitLogOffset=2890, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00008], TransactionId=], MsgId=0A003659F951000000007e00b6c00008, Offse00365900002A9F00000000000011EE,QueueId=0, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376562, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376585, StoreHost=10.0.54.89:10911, CommitLogOffset=4590, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER offset=2 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880001], TransactionId=], MsgId=0A00364DF14300000000799d60880001, Offse00364D00002A9F00000000000006A4,QueueId=1, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757016, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757031, StoreHost=10.0.54.77:10911, CommitLogOffset=1700, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880009], TransactionId=], MsgId=0A00364DF14300000000799d60880009, Offse00364D00002A9F0000000000000BF4,QueueId=1, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757048, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757059, StoreHost=10.0.54.77:10911, CommitLogOffset=3060, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00001], TransactionId=], MsgId=0A003659F951000000007e00b6c00001, Offse00365900002A9F0000000000000D48,QueueId=1, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376514, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376550, StoreHost=10.0.54.89:10911, CommitLogOffset=3400, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00009], TransactionId=], MsgId=0A003659F951000000007e00b6c00009, Offse00365900002A9F0000000000001298,QueueId=1, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376566, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376589, StoreHost=10.0.54.89:10911, CommitLogOffset=4760, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER offset=1 INFO[0000] the MessageQueue changed, version also updated changeTo=1643080672937325000 changedFrom=0 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880002], TransactionId=], MsgId=0A00364DF14300000000799d60880002, Offse00364D00002A9F000000000000074E,QueueId=2, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757023, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757037, StoreHost=10.0.54.77:10911, CommitLogOffset=1870, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0000] The PullThresholdForTopic is changed changeTo=11377 changedFrom=102400 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d6088000a], TransactionId=], MsgId=0A00364DF14300000000799d6088000a, Offse00364D00002A9F0000000000000C9E,QueueId=2, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757051, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757062, StoreHost=10.0.54.77:10911, CommitLogOffset=3230, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0000] The PullThresholdSizeForTopic is changed changeTo=5688 changedFrom=51200 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00002], TransactionId=], MsgId=0A003659F951000000007e00b6c00002, Offse00365900002A9F0000000000000DF2,QueueId=2, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376538, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376561, StoreHost=10.0.54.89:10911, CommitLogOffset=3570, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c0000a], TransactionId=], MsgId=0A003659F951000000007e00b6c0000a, Offse00365900002A9F0000000000001342,QueueId=2, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376570, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376593, StoreHost=10.0.54.89:10911, CommitLogOffset=4930, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880003], TransactionId=], MsgId=0A00364DF14300000000799d60880003, Offse00364D00002A9F00000000000007F8,QueueId=3, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757029, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757040, StoreHost=10.0.54.77:10911, CommitLogOffset=2040, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00003], TransactionId=], MsgId=0A003659F951000000007e00b6c00003, Offse00365900002A9F0000000000000E9C,QueueId=3, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376542, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376566, StoreHost=10.0.54.89:10911, CommitLogOffset=3740, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] 進(jìn)程 已完成,退出代碼為 0
可見消息消費(fèi)成功
Docker
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。