14 rocketmq整合SpringCloudStream
14 RocketMQ整合springCloudStream
發(fā)送消息
消費(fèi)消息:
spring Cloud Stream
14 RocketMQ整合SpringCloudStream
發(fā)送消息
消費(fèi)消息:
Spring Cloud Stream
14 rocketmq整合SpringCloudStream
發(fā)送消息
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: destination: TopicTest group: PRODUCER_GROUP_TOPIC_TEST
@SpringBootApplication @EnableBinding({ Source.class }) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
@Component public class ProduceController { @Autowired private Source source; @PostConstruct private void init() throws InterruptedException { MessageBuilder builder = MessageBuilder.withPayload("init..."); Message message = builder.build(); source.output().send(message); System.out.println("init..."); } }
@EnableBinding({ Source.class })表示綁定配置文件中名稱為output的消息通道Binding,Source類中定義的消息通道名稱為output。
消費(fèi)消息:
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: destination: TopicTest2 group: CONSUER_GROUP_DEMO_1
name-server是RocketMq的NameServer地址,destination指定Topic名稱,指定名稱為input的Binding接收TopicTest的消息
消息監(jiān)聽:
@EnableBinding({ Sink.class}) @SpringBootApplication public class Application { @StreamListener(value = InputChannel.ORDER_INPUT) public void receive(String receiveMsg) { System.out.println("receive: " + receiveMsg); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
@EnableBinding({ Sink.class})表示綁定配置文件名稱為input的消息通道Binding,Sink類中定義的消息通道名稱為input,@StreamListener表示定義一個(gè)消息-,接收RocketMQ中的消息。
Spring Cloud Stream
Spring Cloud Stream是構(gòu)建與共享消息傳遞系統(tǒng)連接的高度可伸縮的事件驅(qū)動(dòng)微服務(wù),目的是簡(jiǎn)化消息業(yè)務(wù)在Spring Cloud應(yīng)用程序中的開發(fā)。
通過Spring Cloud Stream注入的輸入通道inputs和輸出通道outputs與消息中間件Middleware通信,消息通道通過特定中間件綁定器Binder實(shí)現(xiàn)連接到外部代理。
Spring Cloud Stream實(shí)現(xiàn)基于發(fā)布/訂閱機(jī)制,核心四個(gè)部分組成:Spring Framework中的Spring Messaging和Spring Integration,Spring Cloud Stream中的Binders和Bindings。
Spring Messaging:Spring Framework中的統(tǒng)一消息編程模型
Message:消息對(duì)象,包含消息頭Header和消息體Payload
MessageChannel:消息通道接口,用于接收消息,提供send方法將消息發(fā)送至消息通道。
MessageHandler:消息處理器接口,用于處理消息邏輯。
Spring Integration:支持企業(yè)集成的擴(kuò)展機(jī)制,提供簡(jiǎn)單的模型來(lái)構(gòu)建企業(yè)集成解決方案,對(duì)Spring Messaging進(jìn)行擴(kuò)展。
MessageDispatcher:消息分發(fā)接口,用于分發(fā)消息和添加刪除消息處理器
MessageRouter:消息路由接口,定義默認(rèn)的輸出消息通道。
Filter:消息過濾注解,用于配置消息過濾表達(dá)式
Aggregator:消息的聚合注解,用于將一條消息拆分成多條。
Splitter:消息分割,用于將一條消息拆分成多條。
Binders:目標(biāo)綁定器,負(fù)責(zé)與外部消息中間件系統(tǒng)集成的組件。
doBindProducer:綁定消息中間件客戶端發(fā)送消息模塊。
doBindConsumer:綁定消息中間件客戶端接收消息模塊。
Bindings:外部消息中間件系統(tǒng)與應(yīng)用程序提供的消息生產(chǎn)者和消費(fèi)者之間的橋梁。
Spring Cloud Alibaba RocketMQ架構(gòu)圖
MessageChannel(output):消息通道,用于發(fā)送消息,Spring Cloud Stream的標(biāo)準(zhǔn)接口
MessageChannel(input):消息通道,用于訂閱消息,Spring Cloud Stream的標(biāo)準(zhǔn)接口
Binder bindProducer:目標(biāo)綁定器,將發(fā)送通道發(fā)過來(lái)的消息發(fā)送到RocketMQ消息服務(wù)器
Binder bindConsumer:目標(biāo)綁定器,將接收到RocketMQ消息服務(wù)器的消息推送給訂閱通道
Spring
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。