RabbitMQ消息模型詳解

      網友投稿 707 2022-05-29

      目錄

      一、消息隊列

      什么是消息隊列

      AMQP和JMS

      常見MQ產品

      二、RabbitMQ

      三、五種消息模型

      四、簡單消息模型

      代碼演示

      獲取連接

      生產者

      消費者

      手動ACK

      五、工作模式

      代碼演示:

      生產者

      消費者1和消費者2

      六、發布訂閱模式

      七、訂閱模型-Fanout

      演示代碼:

      生產者

      消費者1和消費者2

      八、訂閱模型-Direct

      代碼演示:

      生產者

      消費者1和消費者2

      九、訂閱模型-Topic

      代碼演示

      生產者

      消費者1和消費者2

      十、如何避免消息丟失?---持久化

      交換機持久化

      隊列持久化

      隊列持久化

      一、消息隊列

      什么是消息隊列

      消息隊列,即MQ,Message Queue。

      消息隊列是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦

      AMQP和JMS

      MQ是消息通信的模型,并不是具體實現。現在實現MQ的有兩種主流方式:AMQP、JMS。

      兩者間的區別和聯系:

      JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式

      JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。

      JMS規定了兩種消息模型;而AMQP的消息模型更加豐富

      常見MQ產品

      ActiveMQ:基于JMS

      RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好

      RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會

      Kafka:分布式消息系統,高吞吐量

      二、RabbitMQ

      RabbitMQ是基于AMQP的一款消息管理系統

      官網: Messaging that just works — RabbitMQ

      官方教程:RabbitMQ Tutorials — RabbitMQ

      安裝教程:小小張自由—>張有博_CSDN博客-C#編程基礎,項目實戰,Java進階領域博主

      RabbitMQ 基本概念

      Message

      消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

      Publisher

      消息的生產者,也是一個向交換器發布消息的客戶端應用程序。

      Exchange

      交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。

      Binding

      綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。

      Queue

      消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

      Connection

      網絡連接,比如一個TCP連接。

      Channel

      信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。

      Consumer

      消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

      Virtual Host

      虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。

      Broker

      表示消息隊列服務器實體。

      RabbitMQ消息模型詳解

      三、五種消息模型

      RabbitMQ提供了6種消息模型,但是第6種其實是RPC,并不是MQ,因此不予學習。那么也就剩下5種。

      但是其實3、4、5這三種都屬于訂閱模型,只不過進行路由的方式不同。

      四、簡單消息模型

      RabbitMQ是一個消息代理:它接受和轉發消息。 你可以把它想象成一個郵局:當你把郵件放在郵箱里時,你可以確定郵差先生最終會把郵件發送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。

      RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,存儲和轉發數據消息的二進制數據塊。

      P(producer/ publisher):生產者,一個發送消息的用戶應用程序。

      C(consumer):消費者,消費和接收有類似的意思,消費者是一個主要用來等待接收消息的用戶應用程序

      隊列(紅色區域):rabbitmq內部類似于郵箱的一個概念。雖然消息流經rabbitmq和你的應用程序,但是它們只能存儲在隊列中。隊列只受主機的內存和磁盤限制,實質上是一個大的消息緩沖區。許多生產者可以發送消息到一個隊列,許多消費者可以嘗試從一個隊列接收數據。

      總之:生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。

      代碼演示

      引入依賴

      org.springframework.boot spring-boot-starter-amqp 2.0.6.RELEASE

      配置文件

      spring: rabbitmq: host: 192.168.99.99 username: leyou password: leyou virtual-host: /leyou

      獲取連接

      import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { /** * 建立與RabbitMQ的連接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("192.168.99.99"); //端口 factory.setPort(5672); //設置賬號信息,用戶名、密碼、vhost factory.setVirtualHost("/leyou"); factory.setUsername("leyou"); factory.setPassword("leyou"); // 通過工程獲取連接 Connection connection = factory.newConnection(); return connection; } }

      生產者

      import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者 */ public class Send { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 從連接中創建通道,使用通道才能完成消息相關的操作 Channel channel = connection.createChannel(); // 聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; for (int i = 0; i < 10; i++) { // 向指定的隊列中發送消息 message=message+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //關閉通道和連接 channel.close(); connection.close(); } }

      消費者

      import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者 */ public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); } }; // 監聽隊列,第二個參數:是否自動進行消息確認。 channel.basicConsume(QUEUE_NAME, true, consumer); } }

      上述代碼中:消息一旦被消費者接收,隊列中的消息就會被刪除。

      如果消費者領取消息后,還沒執行操作就掛掉了呢?或者拋出了異常?消息消費失敗,但是RabbitMQ無從得知,這樣消息就丟失了!

      因此,RabbitMQ有一個ACK機制。當消費者獲取消息后,會向RabbitMQ發送回執ACK,告知消息已經被接收。不過這種回執ACK分兩種情況:

      自動ACK:消息一旦被接收,消費者自動發送ACK

      手動ACK:消息接收后,不會發送ACK,需要手動調用

      手動ACK

      import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者,手動進行ACK */ public class Recv2 { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 //int i=1/0; String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手動進行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數false,手動進行ACK // 如果第二個參數為true,則會自動進行ACK;如果為false,則需要手動ACK。方法的聲明: channel.basicConsume(QUEUE_NAME, false, consumer); } }

      五、工作模式

      工作隊列或者競爭消費者模式

      工作隊列,又稱任務隊列。主要思想就是避免執行資源密集型任務時,必須等待它執行完成。相反我們稍后完成任務,我們將任務封裝為消息并將其發送到隊列。 在后臺運行的工作進程將獲取任務并最終執行作業。當你運行許多消費者時,任務將在他們之間共享,但是一個消息只能被一個消費者獲取。

      這個概念在Web應用程序中特別有用,因為在短的HTTP請求窗口中無法處理復雜的任務。

      接下來我們來模擬這個流程:

      P:生產者:任務的發布者

      C1:消費者,領取任務并且完成任務,假設完成速度較快

      C2:消費者2:領取任務并完成任務,假設完成速度慢

      我們可以使用basicQos方法和prefetchCount = 1設置。 這告訴RabbitMQ一次不要向工作人員發送多于一條消息。 或者換句話說,不要向工作人員發送新消息,直到它處理并確認了前一個消息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。

      代碼演示:

      生產者

      import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; // 生產者 public class Send { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循環發布任務 for (int i = 0; i < 50; i++) { // 消息內容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 2); } // 關閉通道和連接 channel.close(); connection.close(); } }

      消費者1和消費者2

      import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; // 消費者1 public class Recv { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 設置每個消費者同時只能處理一條消息 //channel.basicQos(1); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); try { // 模擬完成任務的耗時:1000ms Thread.sleep(1000); } catch (InterruptedException e) { } // 手動ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列。 channel.basicConsume(QUEUE_NAME, false, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; //消費者2 public class Recv2 { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 設置每個消費者同時只能處理一條消息 //channel.basicQos(1); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); // 手動ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }

      面試題:避免消息堆積?

      1)采用workqueue,多個消費者監聽同一隊列。

      2)接收到消息以后,而是通過線程池,異步消費。

      六、發布訂閱模式

      解讀:

      1、1個生產者,多個消費者

      2、每一個消費者都有自己的一個隊列

      3、生產者沒有將消息直接發送到隊列,而是發送到了交換機

      4、每個隊列都要綁定到交換機

      5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的

      X(Exchanges):交換機一方面:接收生產者發送的消息。另一方面:知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

      Exchange類型有以下幾種:

      Fanout:廣播,將消息交給所有綁定到交換機的隊列

      Direct:定向,把消息交給符合指定routing key 的隊列

      Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

      七、訂閱模型-Fanout

      Fanout,也稱為廣播。

      流程圖:

      在廣播模式下,消息發送流程是這樣的:

      1) 可以有多個消費者

      2) 每個消費者有自己的queue(隊列)

      3) 每個隊列都要綁定到Exchange(交換機)

      4) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。

      5) 交換機把消息發送給綁定過的所有隊列

      6) 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費

      演示代碼:

      生產者

      1) 聲明Exchange,不再聲明Queue

      2) 發送消息到Exchange,不再發送到Queue

      import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class Send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true); // 消息內容 String message = "Hello everyone"; // 發布消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [生產者] Sent '" + message + "'"); channel.close(); connection.close(); } }

      消費者1和消費者2

      import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; //消費者1 public class Recv { private final static String QUEUE_NAME = "fanout_exchange_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; // 消費者2 public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }

      八、訂閱模型-Direct

      在廣播模式中,生產者發布消息,所有消費者都可以獲取所有消息。

      在路由模式中,我們將添加一個功能 - 我們將只能訂閱一部分消息。

      但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

      在Direct模型下,隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

      消息的發送方在向Exchange發送消息時,也必須指定消息的routing key。

      P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。

      X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列

      C1:消費者,其所在隊列指定了需要routing key 為 error 的消息

      C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

      代碼演示:

      生產者

      此處我們模擬商品的增刪改,發送消息的RoutingKey分別是:insert、update、delete

      import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者,模擬為商品服務 */ public class Send { private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容 String message = "商品刪除了, id = 1001"; // 發送消息,并且指定routing key 為:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes()); System.out.println(" [商品服務:] Sent '" + message + "'"); channel.close(); connection.close(); } }

      消費者1和消費者2

      import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者1 */ public class Recv { private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者2 */ public class Recv2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

      九、訂閱模型-Topic

      Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!

      Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

      通配符規則:

      `#`:匹配一個或多個詞

      `*`:匹配不多不少恰好1個詞

      代碼演示

      生產者

      import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者,模擬為商品服務 */ public class Send { private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "新增商品 : id = 1001"; // 發送消息,并且指定routing key 為:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服務:] Sent '" + message + "'"); channel.close(); connection.close(); } }

      消費者1和消費者2

      import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消費者1 */ public class Recv { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。需要 update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

      十、如何避免消息丟失?---持久化

      1) 消費者的ACK機制。可以防止消費者丟失消息。

      2) 但是,如果在消費者消費之前,MQ就宕機了,消息就沒了。

      是可以將消息進行持久化

      要將消息持久化,前提是:隊列、Exchange都持久化

      交換機持久化

      隊列持久化

      隊列持久化

      如果本篇博客對您有一定的幫助,大家記得留言++哦。

      分布式消息隊列 RabbitMQ

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:藍牙核心規范(V5.2)7.1-深入詳解之L2CAP(2)
      下一篇:【GCC編譯優化系列】從KEIL轉戰GCC,一個C庫函數讓你的bin文件增大好十幾KB!
      相關文章
      亚洲av成人综合网| 亚洲一区精品中文字幕| 亚洲日产2021三区在线| 亚洲av无码专区在线播放| 不卡一卡二卡三亚洲| 亚洲综合色视频在线观看| 天堂亚洲免费视频| 亚洲av无码成人影院一区 | 亚洲午夜久久久影院伊人| 亚洲精品视频免费观看| www.亚洲精品.com| 亚洲精品视频免费| 亚洲午夜福利717| 久久亚洲国产午夜精品理论片| 亚洲乱码国产一区三区| 亚洲精品亚洲人成人网| 亚洲国产精品无码中文字| 亚洲AV无码乱码国产麻豆穿越 | 亚洲一区爱区精品无码| 亚洲色婷婷一区二区三区| 伊伊人成亚洲综合人网7777| 亚洲色WWW成人永久网址| 久久久久久久综合日本亚洲| 婷婷精品国产亚洲AV麻豆不片 | 国产99久久亚洲综合精品| 亚洲av再在线观看| 中文字幕亚洲专区| 亚洲AV无码乱码在线观看裸奔| 亚洲国产精品线在线观看| 亚洲欧洲免费视频| 亚洲成人网在线观看| 久久久国产亚洲精品| 亚洲精品精华液一区二区| 日韩精品电影一区亚洲| 国产AV无码专区亚洲AV漫画| 久久精品国产亚洲AV果冻传媒| 亚洲精品人成电影网| 亚洲中文字幕一二三四区| 337P日本欧洲亚洲大胆精品| 国产成人亚洲精品影院| 亚洲av无码av制服另类专区|