RabbitMQ 第5章 RabbitMQ Routing(路由)

      網友投稿 717 2025-03-31

      (使用Java客戶端)


      一、概述

      在前面的學習中,構建了一個簡單的日志記錄系統,能夠廣播所有的日志給多個接收者,在該部分學習中,將添加一個新的特點,就是可以只訂閱一個特定的消息源,也就是說能夠直接把關鍵的錯誤日志消息發送到日志文件保存起來,不重要的日志信息文件不保存在磁盤中,但是仍然能夠在控制臺輸出,那么這便是我們這部分要學習的消息的路由分發機制。

      二、路由功能實現

      2.1、綁定(bindings)

      在前面的學習中已經創建了綁定(bindings),代碼如下:

      channel.queueBind(queueName, EXCHANGE_NAME, "")

      一個綁定就是一個關于exchange和queue的關系,它可以簡單的被理解為:隊列是從這個exchange中獲取消息的。

      綁定可以采取一個額外的routingKey的參數,為了避免與basicPublish參數沖突,稱之為一個綁定Key,這是如何創建一個帶routingKey的綁定的關鍵。

      channel.queueBind(queueName, EXCHANGE_NAME, "black");

      一個綁定Key依賴于exchange的類型,像之前使用fanout類型的exchange,完全忽略了該綁定key的值。

      2.2、直接交換(Direct exchange)

      前面實現的日志記錄系統中廣播所有的消息給所有的消費者,現在對其進行擴展,允許根據信息的嚴重程度來對消息進行過濾,比如,希望一個程序寫入到磁盤的日志消息只接收錯誤的消息,而不是浪費磁盤保存所有的日志消息。

      為了實現這個目標,使用一個fanout類型的exchange,顯然是不能夠滿足這樣的需求的,因為它只能廣播所有的消息。

      為此將使用一個direct exchange來代替fanout exchange,direct exchange使用簡單的路由算法,將消息通過綁定的Key匹配將要到達的隊列。

      從上面的結構圖中可以看出direct exchange X綁定著兩個queue(Q1,Q2),第一個queue綁定的routingKey為orange,第二個有兩個routingKey被綁定,一個routingKey為black,另外一個routingKey為green.

      RabbitMQ 第5章 RabbitMQ Routing(路由)

      說明:發送帶有routingKey為orange的消息到X(exchange)中,X將該消息路由到Q1中,發送帶有routingKey為black和green的消息都將被路由到Q2中,其他所有消息將會被丟棄。

      2.3、多綁定(Multiple bindings)

      多個隊列綁定相同的routingKey是允許的,在上述實例中,可以把X和Q1用routingKey:black綁定起來,這種情況下,direct exchange將像fanout類型的exchange一樣會將消息廣播都到所有匹配的queues中,即一個routingKey為black的消息將會被發送到Q1和Q2中。

      2.4、發送的日志

      使用direct代替fanout類型的exchange,發送消息到一個direct exchange中,將根據消息的重要程度作為routingKey,這樣接收程序能夠選擇它想要接收的日志信息,首先必須先創建一個exchange.

      channel.exchangeDeclare(EXCHANGE_NAME, "direct");

      其次,發送一條信息:

      channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

      為了簡化程序,將severity設定為info、warning、error三種類型中的一種。

      2.5、訂閱消息(Subscribing Message)

      接收者根據自己感興趣的severity來創建一個新到的綁定。

      String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }

      2?.6、代碼實現

      EmitLogDirect.java代碼清單如下:

      package com.xuz.route; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //日志類型 String[] msgType = new String[]{"info","warning","error"}; String severity = getSeverity(msgType); //測試信息 String[] msg = new String[]{"xuz RabbitMQ Routing Test!","very Good!","This is a Info"}; String message = getMessage(msg); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println("send:["+severity+":"+message+"]"); channel.close(); conn.close(); } private static String getMessage(String[] strings) { if(strings.length<2){ return "Hello World!"; } return joinStrings(strings,"",1); } private static String joinStrings(String[] strings, String string,int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(string).append(strings[i]); } return words.toString(); } private static String getSeverity(String[] strings) { if(strings.length<1){ return "info"; } return strings[0]; } }

      ReceiveLogsDirect代碼清單如下:

      package com.xuz.route; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 接收所有類型的消息 * @author Administrator * */ public class ReceiveLogsDirectAll { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); String[] msgType = new String[]{"info","warning","error"}; if (msgType.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } /** * 綁定多種類型包括:info、warning、error */ for(String severity : msgType){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" ReceiveLogsDirectAll---->Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("ReceiveLogsDirectAll----->Received '" + routingKey + "':'" + message + "'"); } } }

      NAT RabbitMQ

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Excel中進行對比兩列數據是否相同的操作技巧(excel中如何對比兩列數據是否相同)
      下一篇:三級嵌入式準備(七)
      相關文章
      亚洲av无码专区在线| 亚洲国产精品久久人人爱| 亚洲中文字幕无码mv| 亚洲国色天香视频| 亚洲老熟女@TubeumTV| 中文字幕亚洲综合精品一区| 亚洲AV区无码字幕中文色 | 久久精品7亚洲午夜a| 亚洲国产精品无码久久久秋霞2 | 亚洲国产精品一区二区第一页| 亚洲精品成人片在线播放| 亚洲国产一成人久久精品| 亚洲s色大片在线观看| 亚洲va中文字幕无码久久| 久久亚洲精品成人777大小说| 亚洲国产精品第一区二区| 亚洲成a人片7777| 亚洲娇小性色xxxx| 亚洲色欲色欲www在线播放 | 亚洲男人第一av网站| 亚洲黄色免费观看| 亚洲综合校园春色| 亚洲精品人成网线在线播放va| 亚洲av成人无码网站…| 亚洲黄片毛片在线观看| 国产亚洲精久久久久久无码AV| 亚洲国产精品无码成人片久久| 亚洲AV日韩AV天堂一区二区三区 | 国产午夜亚洲精品国产成人小说| 亚洲香蕉网久久综合影视| 久久久久亚洲精品美女| 亚洲精品资源在线| 亚洲国产精品一区二区三区在线观看| 亚洲中文字幕乱码AV波多JI| 噜噜噜亚洲色成人网站| 亚洲中文字幕无码一久久区| 亚洲AV日韩AV天堂久久| 亚洲av无码精品网站| 亚洲国产人成在线观看| 亚洲国产日韩精品| 国产天堂亚洲国产碰碰|