快速嘗鮮:SpringBoot 集成 RabbitMQ

      網友投稿 918 2025-03-31

      哈嘍大家好,我是阿Q!


      上文我們已經完成了RabbitMQ的安裝,安完就要讓它發揮點作用,今天就在springboot項目里集成一下子,嘗嘗鮮!

      在項目真正開始之前我們先來簡單介紹下RabbitMQ的工作流程:

      生產者往交換機中發送消息;

      交換機通過規則綁定隊列,通過路由鍵將消息存儲到隊列中;

      消費者獲取隊列中的消息進行消費;

      環境:springboot 2.6.3、JDK 1.8

      項目搭建

      首先創建SpringBoot項目 rabbit-mq

      引入依賴

      org.springframework.boot spring-boot-starter-amqp

      yml文件配置

      spring: rabbitmq: host: 127.0.0.1 //rabbitMQ服務地址 port: 15672 //這個地方暫時先用我們之前配置的15672 username: cheetah //自己的賬戶名 password: 123456 //自己的密碼

      直連交換機

      本項目以直連交換機為例,至于其他的交換機類型將在后文中給出詳細介紹。

      @Configuration public class DirectRabbitConfig { /** * 定義交換機 **/ @Bean public DirectExchange directExchange(){ /** * 交換機名稱 * 持久性標志:是否持久化,默認是 true 即聲明一個持久的 exchange,該exchange將在服務器重啟后繼續運行 * 自動刪除標志:是否自動刪除,默認為 false, 如果服務器想在 exchange不再使用時刪除它,則設置為 true **/ return new DirectExchange("directExchange", true, false); } /** * 定義隊列 **/ @Bean public Queue directQueue(){ /** * name:隊列名稱 * durable:是否持久化,默認是 true,持久化隊列,會被存儲在磁盤上,當消息代理重啟時仍然存在 * exclusive:是否排他,默認為 false,true則表示聲明了一個排他隊列(該隊列將僅由聲明者連接使用),如果連接關閉,則隊列被刪除。此參考優先級高于durable * autoDelete:是否自動刪除, 默認是 false,true則表示當隊列不再使用時,服務器刪除該隊列 **/ return new Queue("directQueue",true); } /** * 隊列和交換機綁定 * 設置路由鍵:directRouting **/ @Bean Binding bindingDirect(){ return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting"); } }

      消息發送

      @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage") public String sendMessage(){ //將消息攜帶路由鍵值 rabbitTemplate.convertAndSend("directExchange", "directRouting", "發送消息!"); return "ok"; } }

      我們先啟動程序,在瀏覽器訪問下

      http://127.0.0.1:9001/sendMessage

      報錯如下:

      我們之前已經給該用戶分配過權限了,如果之前未分配,直接在客戶端中配置:

      之所以訪問不到,是因為我們使用的端口號不正確

      所以我們需要將端口改為 5672(如果是阿里云服務器實例,需要將該端口開放權限)

      我們再來訪問下

      http://127.0.0.1:9001/sendMessage

      請求返回"OK",控制臺輸出

      客戶端相關頁面截圖如下:

      消息消費

      @Component @RabbitListener(queues = "directQueue")//監聽隊列名稱 public class MQReciever { @RabbitHandler public void process(String message){ System.out.println("接收到的消息是:"+ message); } }

      啟動項目,發現消息已經被消費。

      為了防止消息丟失,RabbitMQ增加了消息確認機制。

      確認機制

      一、生產者消息確認機制

      在yml中增加配置信息

      spring: rabbitmq: #確認消息已發送到交換機(Exchange) publisher-confirm-type: correlated #確認消息已發送到隊列(Queue) publisher-returns: true

      spring.rabbitmq.publisher-confirm 新版本已被棄用,現在使用?spring.rabbitmq.publisher-confirm-type =?correlated 實現相同效果

      增加回調

      @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //設置開啟 Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相關數據:"+correlationData); System.out.println("ConfirmCallback: "+"確認情況:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){ @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("ReturnCallback: "+"消息:"+returned.getMessage()); System.out.println("ReturnCallback: "+"回應碼:"+returned.getReplyCode()); System.out.println("ReturnCallback: "+"回應信息:"+returned.getReplyText()); System.out.println("ReturnCallback: "+"交換機:"+returned.getExchange()); System.out.println("ReturnCallback: "+"路由鍵:"+returned.getRoutingKey()); } }); return rabbitTemplate; }

      confirm機制是只保證消息到達exchange,并不保證消息可以路由到正確的queue

      當前的exchange不存在或者指定的路由key路由不到才會觸發return機制

      大家可以自行演示以下情況的執行結果:

      不存在交換機和隊列

      存在交換機,不存在隊列

      消息推送成功

      二、消費者消息的確認機制

      默認情況下如果一個消息被消費者正確接收則會從隊列中移除。如果一個隊列沒被任何消費者訂閱,那么這個隊列中的消息會被緩存,當有消費者訂閱時則會立即發送,進而從隊列中移除。

      消費者消息的確認機制可以分為以下3種:

      自動確認

      AcknowledgeMode.NONE 默認為自動確認,不管消費者是否成功處理了消息,消息都會從隊列中被移除。

      根據情況確認

      AcknowledgeMode.AUTO 根據方法的執行情況來決定是否確認還是拒絕(是否重新入隊列)

      如果消息成功被消費(成功的意思是在消費的過程中沒有拋出異常),則自動確認

      當拋出AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且消息不會重回隊列

      當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認

      其他的異常,則消息會被拒絕,并且該消息會重回隊列,如果此時只有一個消費者監聽該隊列,則有發生死循環的風險,多消費端也會造成資源的極大浪費,這個在開發過程中一定要避免的。可以通過 setDefaultRequeueRejected(默認是true)去設置

      可能造成消息丟失,一般是需要我們在try-catch捕捉異常后,打印日志用于追蹤數據,這樣找出對應數據再做后續處理。

      手動確認

      AcknowledgeMode.MANUAL對于手動確認,也是我們工作中最常用到的,它的用法如下:

      /* * 肯定確認 * deliveryTag:消息隊列數據的唯一id * multiple:是否批量 * true :一次性確認所有小于等于deliveryTag的消息 * false:對當前消息進行確認; */ channel.basicAck(long deliveryTag, boolean multiple);

      /* * 否定確認 * multiple:是否批量 * true:一次性拒絕所有小于deliveryTag的消息 * false:對當前消息進行確認; * requeue:被拒絕的是否重新入列, * true:就是將數據重新丟回隊列里,那么下次還會消費這消息; * false:就是拒絕處理該消息,服務器把該消息丟掉即可。 */ channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);

      /* * 用于否定確認,但與basicNack相比有一個限制,一次只能拒絕單條消息 */ channel.basicReject(long deliveryTag, boolean requeue);

      快速嘗鮮:SpringBoot 集成 RabbitMQ

      在yml配置中開啟手動確認模式

      spring: rabbitmq: listener: simple: acknowledge-mode: manual

      或者在代碼中開啟

      @Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MQReciever mqReciever;//消息接收處理類 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //并發使用者的數量 container.setConcurrentConsumers(1); //消費者人數上限 container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息 //設置一個隊列,此處支持設置多個 container.setQueueNames("directQueue"); container.setMessageListener(mqReciever); return container; } }

      消息消費類

      @Component @RabbitListener(queues = "directQueue")//監聽隊列名稱 public class MQReciever implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { String msg = message.toString(); String[] msgArray = msg.split("'");//可以點進Message里面看源碼,單引號直接的數據就是我們的map消息數據 System.out.println("消費的消息內容:"+msgArray[1]); System.out.println("消費的主題消息來自:"+message.getMessageProperties().getConsumerQueue()); //業務處理 ...... channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒絕重新入隊列 channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }

      無ack:效率高,存在丟失大量消息的風險;有ack:效率低,不會丟消息。

      RabbitMQ Spring Boot

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Java并發編程(四)--- 死鎖的發生與避免
      下一篇:如何保護您的ERP數據免受勒索軟件的侵害
      相關文章
      精品久久久久亚洲| 婷婷亚洲综合一区二区| 久久亚洲国产精品五月天婷| 亚洲aⅴ无码专区在线观看春色 | 国产亚洲福利精品一区| 久久久久亚洲av毛片大| 超清首页国产亚洲丝袜| 久久99亚洲综合精品首页| 亚洲国产综合久久天堂| 亚洲国产成人久久一区久久| 亚洲国产精品毛片av不卡在线| 亚洲国产精品无码久久青草| 亚洲精品无码av天堂| 国内精品99亚洲免费高清| 在线a亚洲v天堂网2019无码| 人人狠狠综合久久亚洲88| 亚洲国产成人久久精品动漫 | 老牛精品亚洲成av人片| 亚洲av无码成人精品区一本二本| 亚洲AV永久无码精品一福利| 国产精品亚洲专区一区| 亚洲麻豆精品国偷自产在线91| 亚洲麻豆精品国偷自产在线91| 亚洲色偷偷综合亚洲AVYP| 亚洲AV无码码潮喷在线观看| 亚洲好看的理论片电影| 亚洲精品在线免费观看视频| 亚洲伊人久久大香线蕉结合| 亚洲国产精品无码久久九九大片 | 中文字幕亚洲综合久久| 亚洲精品无码久久毛片波多野吉衣| 亚洲视屏在线观看| 国产成人亚洲合集青青草原精品| 亚洲乱理伦片在线观看中字| 成人亚洲网站www在线观看| 亚洲欧洲自拍拍偷精品 美利坚| 亚洲精品午夜无码电影网| 久久青青草原亚洲av无码app | 91麻豆国产自产在线观看亚洲| 亚洲Av综合色区无码专区桃色| 亚洲视频国产视频|