SpringBoot教程(十五) | SpringBoot集成RabbitMq

      網友投稿 891 2025-04-01

      RabbitMq是我們在開發過程中經常會使用的一種消息隊列。今天我們來研究研究rabbitMq的使用。


      rabbitMq的官網: rabbitmq.com/

      rabbitMq的安裝這里先略過,因為我嘗試了幾次都失敗了,后面等我安裝成功了會把詳細的文章發出來。目前是使用公司的環境進行的調試。

      1. 一些概念

      RabbitMQ是一個開源的消息代理和隊列服務器,用來實現各個應用服務間的數據共享(跨平臺 ,跨語言)。RabbitMQ是使用erlang語言編寫的,并且基于AMQP協議實現。

      所有的消息隊列產品模型抽象上來說,都是類似的過程。生產者創建消息,然后發布到消息隊列中,由消費者進行消費。

      而rabbitMQ也是類似的,有生產者,消費者角色。其內部結構如下圖所示。

      那么接下來我們就來介紹一下RabbitMQ中的這些概念。

      消息,就是我們需要傳遞和共享的信息,消息由一些列的可選屬性組成,包括路由鍵,優先級,是否持久化等信息

      消息的生產者,也是一個向交換機發布消息的客戶端應用程序。

      交換機,這是RabbitMQ中的一個非常重要的概念,在rabbitMq中,生產者產生的消息都不是直接發送到隊列中去的,而是發送到了交換機中,交換機會通過一定的規則綁定隊列,交換機會根據相應的路由規則發送給對服務器中的隊列。

      綁定, 用于交換機和消息列隊之間的關聯。一個綁定就是基于路由鍵(routing-key)將交換機和消息隊列連接起來的路由規則。所以可以將交換機理解成一個有綁定有成的路由表。

      消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可以投入一個或多個隊列中。消息一直在對隊列里邊,等待消費者連接到這個隊列將其消費。

      網絡連接,比如一個TCP連接。

      信道,多路復用連接中的一條獨立的雙向數據流通道。信道是簡歷在真實的TCP連接內的虛擬連接。AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成的。因為對于操作系統過來說建立和銷毀TCP都是非常昂貴的開銷,所以引入了信道的概念,以復用一條TCP連接。

      消息的消費者,表示一個從消息隊列中取得消息的客戶端應用。

      虛擬主機,標識一批交換機、消息隊列和相關對象。 虛擬主機是相同的身份認證和加密環境的獨立服務器域。 每個vhost本質就是一個mini版的rabbitMQ服務器,擁有自己的隊列,交換機,綁定和權限機制。vhost是AMQP概念的基礎,必須在連接時指定,RabbitMQ的默認vhost是/.

      標識消息隊列服務器實體。

      2. Exchange類型

      Exchange分發消息的時候根據類型的不同分發策略有所區別,目前常見的有四種類型: direct、fanout、topic、headers。 headers匹配AMQP消息的header而不是路由鍵,此外headers交換機和direct交換機完成一直但是性能差很多,幾乎用不到了,所以直接看另外三種類型。

      2.1 direct交換機

      消息中的路由鍵(routing key)如果和Binding中的bing key一致,交換機就將消息發送到隊列的隊列中。路由鍵要完全匹配,單個傳播。

      2.2 fanout

      每個發到fanout類型交換機的消息都會分到所有綁定的隊列上去。fanout交換器不處理路由鍵,只是簡單的將隊列綁定到交換機上,每個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout類型轉發消息是最快的。

      2.3 topic

      topic交換機通過模式匹配分配路由的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用.隔開。它同樣會識別兩個通配符: # 和* 。 #匹配0個或多個單詞, * 匹配一個單詞

      3. springboot集成RabbitMQ

      springboot集成rabbitMQ還是比較簡單的,因為springBoot使用RabbitTemplate對常用操作進行了封裝。

      接下來我們來看一下集成過程。首先導入依賴。

      org.springframework.boot spring-boot-starter-amqp

      然后在springBoot的配置文件 application.yml中配置rabbitMQ連接信息

      server: port: 7890 spring: rabbitmq: host: 172.15.33.52 port: 5672 username: root password: 123456

      SpringBoot教程(十五) | SpringBoot集成RabbitMq

      接下來我們我們分三種交換機進行演示。

      3.1 direct

      首先是配置類,在配置類中我們需要聲明交換機,隊列和綁定關系。

      @Configuration public class DirectExchangeConfig { public static final String DIRECT_QUEUE = "directQueue"; public static final String DIRECT_QUEUE2 = "directQueue2"; public static final String DIRECT_EXCHANGE = "directExchange"; public static final String DIRECT_ROUTING_KEY = "direct"; @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE, true); } @Bean public Queue directQueue2() { return new Queue(DIRECT_QUEUE2, true); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE, true, false); } @Bean public Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY); } @Bean public Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY); } }

      這里我們創建了一個叫directExchange的交換機,綁定了directQueue和directQueue2兩個隊列,路由鍵是direct.

      消息的生產者,我們通過一個Controller來進行模擬,直接引用rabbitTemplate

      @RestController @Slf4j @RequestMapping("/direct") public class DirectController { private final RabbitTemplate rabbitTemplate; public DirectController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * direct交換機為直連模式交換機 * 根據消息攜帶的路由鍵將消息投遞給對應隊列 * * * @return */ @GetMapping("send") public Object sendMsg() { rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "發送一條測試消息:direct"); return "direct消息發送成功!!"; }

      當我在瀏覽器訪問對應連接的時候,就會生產一條消息發送到directExchange交換機,路由key為:direct, 消息內容為:發送一條測試消息:direct

      接下來我們來看消息的消費者。

      package com.lsqingfeng.action.rabbitmq.direct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @className: DirectQueueListener * @description: 直連交換機的- * @author: sh.Liu * @date: 2021-08-23 16:03 */ @Slf4j @Component public class DirectQueueListener { /** * 盡管設置了兩個消費者,但是只有一個能夠消費成功 * 多次發送則輪訓消費: * DirectReceiver消費者收到消息1 : 發送一條測試消息:direct * DirectReceiver消費者收到消息2 : 發送一條測試消息:direct * DirectReceiver消費者收到消息1 : 發送一條測試消息:direct * DirectReceiver消費者收到消息2 : 發送一條測試消息:direct * * 一個交換機可以綁定多個隊列。如果通過路由key可以匹配到多個隊列,消費的時候也只能有一個進行消費 * @param testMessage */ @RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE) public void process(String testMessage) { System.out.println("DirectReceiver消費者收到消息1 : " + testMessage); } @RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE) public void process2(String testMessage) { System.out.println("DirectReceiver消費者收到消息2 : " + testMessage); } @RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2) public void process3(String testMessage) { System.out.println("DirectReceiver消費者收到消息3 : " + testMessage); } }

      當我們訪問瀏覽器生產消息會,觀察控制臺結果:

      DirectReceiver消費者收到消息1 : 發送一條測試消息:direct DirectReceiver消費者收到消息3 : 發送一條測試消息:direct

      在發送一次:

      DirectReceiver消費者收到消息3 : 發送一條測試消息:direct DirectReceiver消費者收到消息2 : 發送一條測試消息:direct

      由于我們又兩個隊列都綁定了交換機,且routeKey一樣,所以會打印兩條。要注意direct只有routeKey完全匹配的時候才能被消費,同時每個隊列中的消息只會 被消費一次。

      3.2 fanout

      配置類:

      @Configuration public class FanoutExchangeConfig { public static final String FANOUT_QUEUE = "fanoutQueue"; public static final String FANOUT_QUEUE2 = "fanoutQueue2"; public static final String FANOUT_QUEUE3 = "fanoutQueue3"; public static final String FANOUT_EXCHANGE = "fanoutExchange"; public static final String FANOUT_ROUTING_KEY = "fanout"; @Bean public Queue fanoutQueue() { return new Queue(FANOUT_QUEUE, true); } @Bean public Queue fanoutQueue2() { return new Queue(FANOUT_QUEUE2, true); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE, true, false); } @Bean public Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue).to(fanoutExchange); } @Bean public Binding bindingFanoutExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); }

      這里也是用一個Fanout類型的交換機綁定了兩個隊列,要注意在這種模式下,是不需要指定routing-Key的,因為所有綁定的隊列都會收到消息。

      生產者代碼如下:

      @RestController @Slf4j @RequestMapping("/fanout") public class FanoutController { private final RabbitTemplate rabbitTemplate; public FanoutController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * fanout交換機為扇形模式交換機 * 消息會發送到所有綁定的隊列上。 * @return */ @GetMapping("send") public Object sendMsg() { rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, "發送一條測試消息:fanout"); return "fanout消息發送成功!!"; } }

      消息的消費者:

      @Slf4j @Component public class FanoutQueueListener { /** * fanout交換機: 扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列 * 同一個隊列監聽多次,只會消費一次。 * 交換機綁定的多個隊列都可以收到消息 * @param testMessage */ @RabbitHandler @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE) public void process(String testMessage) { System.out.println("FanoutReceiver消費者收到消息1 : " + testMessage); } @RabbitHandler @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE) public void process2(String testMessage) { System.out.println("FanoutReceiver消費者收到消息2 : " + testMessage); } @RabbitHandler @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2) public void process3(String testMessage) { System.out.println("FanoutReceiver消費者收到消息3 : " + testMessage); } }

      打印結果:

      FanoutReceiver消費者收到消息1 : 發送一條測試消息:fanout FanoutReceiver消費者收到消息3 : 發送一條測試消息:fanout

      因為方法1和方法2監聽的是同一個隊列,只有一個可以消費成功。多次執行,兩個方法交替執行。

      3.3 topic

      主題交換機,會根據routing-Key的匹配規則,將消息發送到符合規則的隊列中。

      配置類:

      /** * @className: TopicExchangeConfig * @description: * * (星號) 用來表示一個單詞 (必須出現的) * # (井號) 用來表示任意數量(零個或多個)單詞 * @author: sh.Liu * @date: 2021-08-23 15:49 */ @Configuration public class TopicExchangeConfig { public static final String TOPIC_QUEUE = "topicQueue"; public static final String TOPIC_QUEUE2 = "topicQueue2"; public static final String TOPIC_QUEUE3 = "topicQueue3"; public static final String TOPIC_EXCHANGE = "topicExchange"; public static final String TOPIC_ROUTING_KEY = "topic*"; @Bean public Queue topicQueue() { return new Queue(TOPIC_QUEUE, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, true); } @Bean public Queue topicQueue3() { return new Queue(TOPIC_QUEUE3, true); } @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE, true, false); } @Bean public Binding bindingTopicExchange(Queue topicQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#"); } @Bean public Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2).to(topicExchange).with("test.#"); } @Bean public Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue3).to(topicExchange).with("#"); } }

      這里要注意我們的綁定管關系。分別是topic.#, test.*, #

      #: 代表所有,* 代表有且只有一個。

      消息的發送者,我們將routingKey作為參數方便我們看效果:

      @RestController @Slf4j @RequestMapping("/topic") public class TopicController { private final RabbitTemplate rabbitTemplate; public TopicController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @GetMapping("send") public Object sendMsg(String routingKey) { rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, "發送一條測試消息:topic"); return "topic消息發送成功!!"; } }

      消息的消費者:

      /** * @className: TopicQueueListener * @description: 主題交換機的- * @author: sh.Liu * @date: 2021-08-23 16:03 */ @Slf4j @Component public class TopicQueueListener { /** * topic: 主題交換機 * @param testMessage */ @RabbitHandler @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE) public void process(String testMessage) { System.out.println("TopicReceiver消費者收到消息1 : " + testMessage); } @RabbitHandler @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE) public void process2(String testMessage) { System.out.println("TopicReceiver消費者收到消息2 : " + testMessage); } @RabbitHandler @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2) public void process3(String testMessage) { System.out.println("TopicReceiver消費者收到消息3 : " + testMessage); } @RabbitHandler @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3) public void process4(String testMessage) { System.out.println("TopicReceiver消費者收到消息4 : " + testMessage); } }

      請求:http://localhost:7890/topic/send?routingKey=test.a

      結果:

      TopicReceiver消費者收到消息3 : 發送一條測試消息:topic TopicReceiver消費者收到消息4 : 發送一條測試消息:topic

      代表: test.* 和 # 與路由key匹配成功

      請求:http://localhost:7890/topic/send?routingKey=topic.123

      TopicReceiver消費者收到消息1 : 發送一條測試消息:topic TopicReceiver消費者收到消息4 : 發送一條測試消息:topic

      代表: topic.# 和 # 匹配成功

      請求: http://localhost:7890/topic/send?routingKey=test

      TopicReceiver消費者收到消息4 : 發送一條測試消息:topic

      test.* 后面必須要有一個單詞

      請求: http://localhost:7890/topic/send?routingKey=test.aaa

      TopicReceiver消費者收到消息4 : 發送一條測試消息:topic TopicReceiver消費者收到消息3 : 發送一條測試消息:topic

      test.*和 #匹配成功

      請求:http://localhost:7890/topic/send?routingKey=test.aaa.b

      TopicReceiver消費者收到消息4 : 發送一條測試消息:topic

      只對# 匹配成功, 因為test.*只能匹配一個單詞,aaa.b代表兩個

      RabbitMQ Spring Boot

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:如何排序呢(怎么把排序)
      下一篇:excel表格怎樣設置自動保存
      相關文章
      亚洲美女视频一区| 亚洲精品成人片在线播放| 亚洲va中文字幕无码| 亚洲白嫩在线观看| 亚洲码国产精品高潮在线| 精品国产亚洲一区二区在线观看| 亚洲国产成人爱av在线播放| www.亚洲一区| 亚洲成av人在片观看| 亚洲人成电影网站国产精品| 亚洲精品国产电影| 国产成人亚洲精品电影| 国产成人va亚洲电影| 亚洲精品国产高清嫩草影院| 久久激情亚洲精品无码?V | 亚洲AⅤ视频一区二区三区| 亚洲国产成人久久一区久久| 亚洲男人av香蕉爽爽爽爽| 亚洲无码高清在线观看| 亚洲人成伊人成综合网久久久| 亚洲熟妇无码八AV在线播放| 亚洲国产精品无码成人片久久| 久久夜色精品国产嚕嚕亚洲av| 亚洲丝袜美腿视频| 亚洲福利电影在线观看| 国产99在线|亚洲| 亚洲精品精华液一区二区| 精品国产亚洲一区二区三区在线观看| 国产精品国产亚洲区艳妇糸列短篇 | 亚洲首页国产精品丝袜| 亚洲综合激情五月色一区| 亚洲AV无码一区二区乱子仑| 无码欧精品亚洲日韩一区夜夜嗨 | 综合亚洲伊人午夜网 | 亚洲国产精品一区二区久久hs| 亚洲一区二区在线视频| 亚洲中文字幕在线无码一区二区| 亚洲а∨天堂久久精品9966| 自拍偷自拍亚洲精品播放| 中文国产成人精品久久亚洲精品AⅤ无码精品 | 亚洲国产日韩在线视频|