五分鐘帶你玩轉rabbitmq(七)怎么保證消息不丟失
先來說明一個概念,什么是可靠投遞呢?在RabbitMQ中,一個消息從生產者發送到RabbitMQ服務器,需要經歷這么幾個步驟:
生產者準備好需要投遞的消息。
生產者與RabbitMQ服務器建立連接。
生產者發送消息。
RabbitMQ服務器接收到消息,并將其路由到指定隊列。
RabbitMQ服務器發起回調,告知生產者消息發送成功。
所謂可靠投遞,就是確保消息能夠百分百從生產者發送到服務器。
隊列存在的以下問題:消息丟失問題?重復消費問題 以下為解決點
1:隊列持久化硬盤
丟失的過程就只有在內存發送到磁盤時會丟失消息?如果保存到磁盤后 重啟服務消息不會丟失 但是會影效率
new Queue("demo_queue", true, false, false, args); 第二個參數為true
2:手動ack
告知生產者消息成功/失敗,否則,如果失敗此隊列會保持掛起狀態,他們消息會等待。所以在消費完成之后通知生產者消費是否成功/失敗,ack/nack
配置文件
rabbitmq:
host: 192.168.xx.xx
port: 5672
username: root
password: root
virtual-host: /
listener:
simple:
acknowledge-mode: manual #手動應答
prefetch: 1 # 每次只處理一個信息
publisher-confirms: true #開啟消息確認機制
publisher-returns: true #支持消息發送失敗返回隊列
@RabbitListener(queues = "demo_queue")
protected void consumerDead(Message message, Channel channel) throws Exception {
RabbitEnum ackSign = RabbitEnum.ACCEPT;
try {
int i = 10 / 0;
} catch (Exception e) {
ackSign = RabbitEnum.RETRY;
throw e;
} finally {
// 通過finally塊來保證Ack/Nack會且只會執行一次
if (ackSign == RabbitEnum.ACCEPT) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (ackSign == RabbitEnum.RETRY) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
3:確認是否發送成功
判斷消息是否發送到交換機
@Bean
public RabbitTemplate rabbitTemplate() {
Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息發送失敗返回到隊列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 發送消息確認, yml需要配置 publisher-confirms: true 消息是否發送的核心配置
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
// 消息返回, yml需要配置 publisher-returns: true 消息返回的配置
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId().toString();
logger.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange,
routingKey);
});
return rabbitTemplate;
}
/**
* 確認發送消息是否成功
*
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 回調方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回調id:" + correlationData);
if (ack) {
System.out.println("消息發送成功");
} else {
//可以將消息寫入本地,使用定時任務重新發送
System.out.println("消息發送失敗:" + cause + "\n重新發送");
}
}
}
4:集群化處理
5:異地容災
6:發送消息持久化到db中 進行消息的重新發送
7:消費者消息固話到db中 通過消息id判斷是否重復消費
參考:https://www.freesion.com/article/1880596463/
RabbitMQ
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。