五分鐘帶你玩轉rabbitmq(五)死信隊列

      網友投稿 847 2025-04-01

      文件目錄如下

      業務背景:

      如果有有錯誤消息 如果手動nack同時將消息放回到隊列中 那么這條消息會反復消費 留在隊列中

      如果nack后將消息丟棄 那么如果碰到網絡抖動 消息也會丟失 。 所以 建立死信隊列避免消息丟失。

      原理 :

      當消息進入進入業務隊列后 如果收到nack那么就將這條消息放入另一條隊列中 。

      1.pom文件

      org.springframework.boot

      spring-boot-starter-amqp

      2.配置文件

      server:

      port: 8088

      spring:

      RabbitMQ:

      host: 192.168.*.*

      port: 5672

      username: root

      password: root

      virtual-host: /

      listener:

      simple:

      acknowledge-mode: manual #手動應答

      prefetch: 1 # 每次只處理一個信息

      publisher-confirms: true #開啟消息確認機制

      publisher-returns: true #支持消息發送失敗返回隊列

      3.RabbitMQ的配置

      @Configuration

      public class RabbitMqConfig {

      /**

      * 連接工廠

      */

      @Autowired

      private ConnectionFactory connectionFactory;

      /**

      * 定制化amqp模版

      *

      * ConfirmCallback接口用于實現消息發送到RabbitMQ交換器后接收ack回調 即消息發送到exchange ack

      * ReturnCallback接口用于實現消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調 即消息發送不到任何一個隊列中 ack

      */

      @Bean

      public RabbitTemplate rabbitTemplate() {

      Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);

      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

      // 消息發送失敗返回到隊列中, yml需要配置 publisher-returns: true

      rabbitTemplate.setMandatory(true);

      // 發送消息確認, yml需要配置 publisher-confirms: true

      rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());

      // 消息返回, yml需要配置 publisher-returns: true

      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

      String correlationId = message.getMessageProperties().getCorrelationId().toString();

      logger.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange,

      routingKey);

      });

      return rabbitTemplate;

      }

      /**

      * 確認發送消息是否成功(調用util方法)

      *

      * @return

      */

      @Bean

      public MsgSendConfirmCallBack msgSendConfirmCallBack() {

      return new MsgSendConfirmCallBack();

      }

      }

      util發送回調方法

      public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

      /**

      * 回調方法

      * @param correlationData

      * @param ack

      * @param cause

      */

      @Override

      public void confirm(CorrelationData correlationData, boolean ack, String cause) {

      System.out.println("MsgSendConfirmCallBack , 回調id:" + correlationData);

      if (ack) {

      System.out.println("消息發送成功");

      } else {

      //可以將消息寫入本地,使用定時任務重新發送

      System.out.println("消息發送失敗:" + cause + "\n重新發送");

      }

      }

      }

      這里有一個點 如果想做實現消息失敗重新發送 在注釋處可以實現

      需要將消息寫入本地 如果失敗從本地讀取 然后發送 如果成功刪除本地信息

      4.業務隊列(如:訂單業務)

      這里聲明了一個業務隊列

      關鍵點在于x-dead-letter-exchange,x-dead-letter-routing-key 兩個參數

      @Configuration

      public class BusinessConfig {

      /**

      * 業務1模塊direct交換機的名字

      */

      public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";

      /**

      * 業務1 demo業務的隊列名稱

      */

      public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";

      /**

      * 業務1 demo業務的routekey

      */

      public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";

      @Bean

      public Queue yewu1DemoDeadQueue() {

      // 將普通隊列綁定到死信隊列交換機上

      Map args = new HashMap<>(2);

      args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);

      args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);

      return new Queue("yewu1_demo_dead_queue", true, false, false, args);

      }

      /**

      * 將消息隊列和交換機進行綁定

      */

      @Bean

      public Binding binding_one() {

      return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())

      .with("yewu1_demo_dead_key");

      }

      }

      這里有一個點如果想持久化消息到磁盤 需要新建隊列時?new Queue將第二個參數輸入為true 但是面對大并發時效率會變低

      5.死信隊列

      @Configuration

      public class DeadConfig {

      /**

      * 死信隊列

      */

      public final static String FAIL_QUEUE_NAME = "fail_queue";

      /**

      * 死信交換機

      */

      public final static String FAIL_EXCHANGE_NAME = "fail_exchange";

      /**

      五分鐘帶你玩轉rabbitmq(五)死信隊列

      * 死信routing

      */

      public final static String FAIL_ROUTING_KEY = "fail_routing";

      /**

      * 創建配置死信隊列

      *

      */

      @Bean

      public Queue deadQueue() {

      return new Queue(FAIL_QUEUE_NAME, true, false, false);

      }

      /**

      * 死信交換機

      *

      * @return

      */

      @Bean

      public DirectExchange deadExchange() {

      DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);

      return directExchange;

      }

      /**

      * 綁定關系

      *

      * @return

      */

      @Bean

      public Binding failBinding() {

      return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);

      }

      }

      6.生產者消費者

      public enum RabbitEnum {

      /**

      * 處理成功

      */

      ACCEPT,

      /**

      * 可以重試的錯誤

      */

      RETRY,

      /**

      * 無需重試的錯誤

      */

      REJECT

      @RequestMapping("/sendDirectDead")

      String sendDirectDead(@RequestBody String message) throws Exception {

      System.out.println("開始生產");

      CorrelationData data = new CorrelationData(UUID.randomUUID().toString());

      rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",

      message, data);

      System.out.println("結束生產");

      System.out.println("發送id:" + data);

      return "OK,sendDirect:" + message;

      }

      @RabbitListener(queues = "yewu1_demo_dead_queue")

      protected void consumerDead(Message message, Channel channel) throws Exception {

      RabbitEnum ackSign = RabbitEnum.RETRY;

      try {

      int i = 10 / 0;

      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

      } catch (Exception e) {

      ackSign = RabbitEnum.RETRY;

      throw e;

      } finally {

      // 通過finally塊來保證Ack/Nack會且只會執行一次

      if (ackSign == RabbitEnum.ACCEPT) {

      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

      } else if (ackSign == RabbitEnum.RETRY) {

      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

      }

      }

      }

      7.實驗

      當發送yewu1_demo_dead_queue隊列時 如果拋出異常 會放入死信隊列中。

      RabbitMQ

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:在整列表的身份證號碼中如何提去6位年月數字(怎樣用身份證算出一列數字的年齡)
      下一篇:word2010怎么合并單元格(word表格怎么合并單元格)
      相關文章
      久久久久亚洲AV无码专区桃色 | 亚洲精品国产自在久久| 亚洲精品午夜无码电影网| 色欲aⅴ亚洲情无码AV| 亚洲精品无码你懂的| 亚洲一本之道高清乱码| 亚洲无圣光一区二区| 亚洲国产精品不卡在线电影| 亚洲av之男人的天堂网站| 亚洲a在线视频视频| 久久久久久A亚洲欧洲AV冫| 亚洲精品伦理熟女国产一区二区 | 亚洲AV无码乱码国产麻豆穿越| 亚洲福利中文字幕在线网址| 亚洲Av无码乱码在线播放| 国产精品无码亚洲一区二区三区| 亚洲精品无码av片| 亚洲免费在线观看视频| 亚洲大片在线观看| 精品亚洲麻豆1区2区3区| 亚洲电影中文字幕| 久久久久亚洲精品无码蜜桃| 亚洲精品无码Av人在线观看国产| 亚洲国产午夜福利在线播放| 国产L精品国产亚洲区久久| 婷婷亚洲天堂影院| 亚洲成a人无码av波多野按摩| 亚洲国产香蕉人人爽成AV片久久 | 亚洲黄色一级毛片| 亚洲一级大黄大色毛片| 91亚洲性爱在线视频| 亚洲午夜无码久久久久小说| 亚洲jjzzjjzz在线播放| 亚洲日韩中文字幕一区| 婷婷国产偷v国产偷v亚洲| 亚洲午夜爱爱香蕉片| 亚洲av永久无码制服河南实里| 亚洲精品午夜视频| 日本亚洲精品色婷婷在线影院| 亚洲人成网站在线观看播放青青| 亚洲欧洲日产国码久在线|