五分鐘帶你玩轉rabbitmq(五)死信隊列
文件目錄如下
業務背景:
如果有有錯誤消息 如果手動nack同時將消息放回到隊列中 那么這條消息會反復消費 留在隊列中
如果nack后將消息丟棄 那么如果碰到網絡抖動 消息也會丟失 。 所以 建立死信隊列避免消息丟失。
原理 :
當消息進入進入業務隊列后 如果收到nack那么就將這條消息放入另一條隊列中 。
1.pom文件
2.配置文件
server:
port: 8088
spring:
host: 192.168.*.*
port: 5672
username: root
password: root
virtual-host: /
listener:
simple:
acknowledge-mode: manual #手動應答
prefetch: 1 # 每次只處理一個信息
publisher-confirms: true #開啟消息確認機制
publisher-returns: true #支持消息發送失敗返回隊列
3.RabbitMQ的配置
@Configuration
public class RabbitMqConfig {
/**
* 連接工廠
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 定制化amqp模版
*
* ConfirmCallback接口用于實現消息發送到RabbitMQ交換器后接收ack回調 即消息發送到exchange ack
* ReturnCallback接口用于實現消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調 即消息發送不到任何一個隊列中 ack
*/
@Bean
public RabbitTemplate rabbitTemplate() {
Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息發送失敗返回到隊列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 發送消息確認, yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
// 消息返回, yml需要配置 publisher-returns: true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId().toString();
logger.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange,
routingKey);
});
return rabbitTemplate;
}
/**
* 確認發送消息是否成功(調用util方法)
*
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
}
util發送回調方法
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 回調方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回調id:" + correlationData);
if (ack) {
System.out.println("消息發送成功");
} else {
//可以將消息寫入本地,使用定時任務重新發送
System.out.println("消息發送失敗:" + cause + "\n重新發送");
}
}
}
這里有一個點 如果想做實現消息失敗重新發送 在注釋處可以實現
需要將消息寫入本地 如果失敗從本地讀取 然后發送 如果成功刪除本地信息
4.業務隊列(如:訂單業務)
這里聲明了一個業務隊列
關鍵點在于x-dead-letter-exchange,x-dead-letter-routing-key 兩個參數
@Configuration
public class BusinessConfig {
/**
* 業務1模塊direct交換機的名字
*/
public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
/**
* 業務1 demo業務的隊列名稱
*/
public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
/**
* 業務1 demo業務的routekey
*/
public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
@Bean
public Queue yewu1DemoDeadQueue() {
// 將普通隊列綁定到死信隊列交換機上
Map
args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
return new Queue("yewu1_demo_dead_queue", true, false, false, args);
}
/**
* 將消息隊列和交換機進行綁定
*/
@Bean
public Binding binding_one() {
return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
.with("yewu1_demo_dead_key");
}
}
這里有一個點如果想持久化消息到磁盤 需要新建隊列時?new Queue將第二個參數輸入為true 但是面對大并發時效率會變低
5.死信隊列
@Configuration
public class DeadConfig {
/**
* 死信隊列
*/
public final static String FAIL_QUEUE_NAME = "fail_queue";
/**
* 死信交換機
*/
public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
/**
* 死信routing
*/
public final static String FAIL_ROUTING_KEY = "fail_routing";
/**
* 創建配置死信隊列
*
*/
@Bean
public Queue deadQueue() {
return new Queue(FAIL_QUEUE_NAME, true, false, false);
}
/**
* 死信交換機
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
return directExchange;
}
/**
* 綁定關系
*
* @return
*/
@Bean
public Binding failBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
}
}
6.生產者消費者
public enum RabbitEnum {
/**
* 處理成功
*/
ACCEPT,
/**
* 可以重試的錯誤
*/
RETRY,
/**
* 無需重試的錯誤
*/
REJECT
@RequestMapping("/sendDirectDead")
String sendDirectDead(@RequestBody String message) throws Exception {
System.out.println("開始生產");
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
message, data);
System.out.println("結束生產");
System.out.println("發送id:" + data);
return "OK,sendDirect:" + message;
}
@RabbitListener(queues = "yewu1_demo_dead_queue")
protected void consumerDead(Message message, Channel channel) throws Exception {
RabbitEnum ackSign = RabbitEnum.RETRY;
try {
int i = 10 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
ackSign = RabbitEnum.RETRY;
throw e;
} finally {
// 通過finally塊來保證Ack/Nack會且只會執行一次
if (ackSign == RabbitEnum.ACCEPT) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (ackSign == RabbitEnum.RETRY) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
7.實驗
當發送yewu1_demo_dead_queue隊列時 如果拋出異常 會放入死信隊列中。
RabbitMQ
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。