RabbitMQ消息模型詳解
目錄
一、消息隊列
什么是消息隊列
AMQP和JMS
常見MQ產品
二、RabbitMQ
三、五種消息模型
四、簡單消息模型
代碼演示
獲取連接
生產者
消費者
手動ACK
五、工作模式
代碼演示:
生產者
消費者1和消費者2
六、發布訂閱模式
七、訂閱模型-Fanout
演示代碼:
生產者
消費者1和消費者2
八、訂閱模型-Direct
代碼演示:
生產者
消費者1和消費者2
九、訂閱模型-Topic
代碼演示
生產者
消費者1和消費者2
十、如何避免消息丟失?---持久化
交換機持久化
隊列持久化
隊列持久化
一、消息隊列
什么是消息隊列
消息隊列,即MQ,Message Queue。
消息隊列是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦
AMQP和JMS
MQ是消息通信的模型,并不是具體實現。現在實現MQ的有兩種主流方式:AMQP、JMS。
兩者間的區別和聯系:
JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
JMS規定了兩種消息模型;而AMQP的消息模型更加豐富
常見MQ產品
ActiveMQ:基于JMS
RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會
Kafka:分布式消息系統,高吞吐量
二、RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系統
官網: Messaging that just works — RabbitMQ
官方教程:RabbitMQ Tutorials — RabbitMQ
安裝教程:小小張自由—>張有博_CSDN博客-C#編程基礎,項目實戰,Java進階領域博主
RabbitMQ 基本概念
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
Binding
綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
Connection
網絡連接,比如一個TCP連接。
Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
Broker
表示消息隊列服務器實體。
三、五種消息模型
RabbitMQ提供了6種消息模型,但是第6種其實是RPC,并不是MQ,因此不予學習。那么也就剩下5種。
但是其實3、4、5這三種都屬于訂閱模型,只不過進行路由的方式不同。
四、簡單消息模型
RabbitMQ是一個消息代理:它接受和轉發消息。 你可以把它想象成一個郵局:當你把郵件放在郵箱里時,你可以確定郵差先生最終會把郵件發送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,存儲和轉發數據消息的二進制數據塊。
P(producer/ publisher):生產者,一個發送消息的用戶應用程序。
C(consumer):消費者,消費和接收有類似的意思,消費者是一個主要用來等待接收消息的用戶應用程序
隊列(紅色區域):rabbitmq內部類似于郵箱的一個概念。雖然消息流經rabbitmq和你的應用程序,但是它們只能存儲在隊列中。隊列只受主機的內存和磁盤限制,實質上是一個大的消息緩沖區。許多生產者可以發送消息到一個隊列,許多消費者可以嘗試從一個隊列接收數據。
總之:生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。
代碼演示
引入依賴
配置文件
spring: rabbitmq: host: 192.168.99.99 username: leyou password: leyou virtual-host: /leyou
獲取連接
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { /** * 建立與RabbitMQ的連接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("192.168.99.99"); //端口 factory.setPort(5672); //設置賬號信息,用戶名、密碼、vhost factory.setVirtualHost("/leyou"); factory.setUsername("leyou"); factory.setPassword("leyou"); // 通過工程獲取連接 Connection connection = factory.newConnection(); return connection; } }
生產者
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者 */ public class Send { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 從連接中創建通道,使用通道才能完成消息相關的操作 Channel channel = connection.createChannel(); // 聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; for (int i = 0; i < 10; i++) { // 向指定的隊列中發送消息 message=message+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //關閉通道和連接 channel.close(); connection.close(); } }
消費者
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者 */ public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); } }; // 監聽隊列,第二個參數:是否自動進行消息確認。 channel.basicConsume(QUEUE_NAME, true, consumer); } }
上述代碼中:消息一旦被消費者接收,隊列中的消息就會被刪除。
如果消費者領取消息后,還沒執行操作就掛掉了呢?或者拋出了異常?消息消費失敗,但是RabbitMQ無從得知,這樣消息就丟失了!
因此,RabbitMQ有一個ACK機制。當消費者獲取消息后,會向RabbitMQ發送回執ACK,告知消息已經被接收。不過這種回執ACK分兩種情況:
自動ACK:消息一旦被接收,消費者自動發送ACK
手動ACK:消息接收后,不會發送ACK,需要手動調用
手動ACK
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者,手動進行ACK */ public class Recv2 { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 //int i=1/0; String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手動進行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數false,手動進行ACK // 如果第二個參數為true,則會自動進行ACK;如果為false,則需要手動ACK。方法的聲明: channel.basicConsume(QUEUE_NAME, false, consumer); } }
五、工作模式
工作隊列或者競爭消費者模式
工作隊列,又稱任務隊列。主要思想就是避免執行資源密集型任務時,必須等待它執行完成。相反我們稍后完成任務,我們將任務封裝為消息并將其發送到隊列。 在后臺運行的工作進程將獲取任務并最終執行作業。當你運行許多消費者時,任務將在他們之間共享,但是一個消息只能被一個消費者獲取。
這個概念在Web應用程序中特別有用,因為在短的HTTP請求窗口中無法處理復雜的任務。
接下來我們來模擬這個流程:
P:生產者:任務的發布者
C1:消費者,領取任務并且完成任務,假設完成速度較快
C2:消費者2:領取任務并完成任務,假設完成速度慢
我們可以使用basicQos方法和prefetchCount = 1設置。 這告訴RabbitMQ一次不要向工作人員發送多于一條消息。 或者換句話說,不要向工作人員發送新消息,直到它處理并確認了前一個消息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。
代碼演示:
生產者
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; // 生產者 public class Send { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循環發布任務 for (int i = 0; i < 50; i++) { // 消息內容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 2); } // 關閉通道和連接 channel.close(); connection.close(); } }
消費者1和消費者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; // 消費者1 public class Recv { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 設置每個消費者同時只能處理一條消息 //channel.basicQos(1); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); try { // 模擬完成任務的耗時:1000ms Thread.sleep(1000); } catch (InterruptedException e) { } // 手動ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列。 channel.basicConsume(QUEUE_NAME, false, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; //消費者2 public class Recv2 { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 設置每個消費者同時只能處理一條消息 //channel.basicQos(1); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); // 手動ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }
面試題:避免消息堆積?
1)采用workqueue,多個消費者監聽同一隊列。
2)接收到消息以后,而是通過線程池,異步消費。
六、發布訂閱模式
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的
X(Exchanges):交換機一方面:接收生產者發送的消息。另一方面:知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
Exchange類型有以下幾種:
Fanout:廣播,將消息交給所有綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
七、訂閱模型-Fanout
Fanout,也稱為廣播。
流程圖:
在廣播模式下,消息發送流程是這樣的:
1) 可以有多個消費者
2) 每個消費者有自己的queue(隊列)
3) 每個隊列都要綁定到Exchange(交換機)
4) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。
5) 交換機把消息發送給綁定過的所有隊列
6) 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費
演示代碼:
生產者
1) 聲明Exchange,不再聲明Queue
2) 發送消息到Exchange,不再發送到Queue
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class Send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true); // 消息內容 String message = "Hello everyone"; // 發布消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [生產者] Sent '" + message + "'"); channel.close(); connection.close(); } }
消費者1和消費者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; //消費者1 public class Recv { private final static String QUEUE_NAME = "fanout_exchange_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; // 消費者2 public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
八、訂閱模型-Direct
在廣播模式中,生產者發布消息,所有消費者都可以獲取所有消息。
在路由模式中,我們將添加一個功能 - 我們將只能訂閱一部分消息。
但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下,隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
消息的發送方在向Exchange發送消息時,也必須指定消息的routing key。
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
代碼演示:
生產者
此處我們模擬商品的增刪改,發送消息的RoutingKey分別是:insert、update、delete
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者,模擬為商品服務 */ public class Send { private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容 String message = "商品刪除了, id = 1001"; // 發送消息,并且指定routing key 為:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes()); System.out.println(" [商品服務:] Sent '" + message + "'"); channel.close(); connection.close(); } }
消費者1和消費者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者1 */ public class Recv { private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者2 */ public class Recv2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
九、訂閱模型-Topic
Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:
`#`:匹配一個或多個詞
`*`:匹配不多不少恰好1個詞
代碼演示
生產者
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者,模擬為商品服務 */ public class Send { private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "新增商品 : id = 1001"; // 發送消息,并且指定routing key 為:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服務:] Sent '" + message + "'"); channel.close(); connection.close(); } }
消費者1和消費者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者1 */ public class Recv { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。需要 update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
十、如何避免消息丟失?---持久化
1) 消費者的ACK機制。可以防止消費者丟失消息。
2) 但是,如果在消費者消費之前,MQ就宕機了,消息就沒了。
是可以將消息進行持久化
要將消息持久化,前提是:隊列、Exchange都持久化
交換機持久化
隊列持久化
隊列持久化
如果本篇博客對您有一定的幫助,大家記得留言++哦。
分布式消息隊列 RabbitMQ
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。