XML DOM 獲取節點值
717
2025-03-31
(使用Java客戶端)
一、概述
在前面的學習中,構建了一個簡單的日志記錄系統,能夠廣播所有的日志給多個接收者,在該部分學習中,將添加一個新的特點,就是可以只訂閱一個特定的消息源,也就是說能夠直接把關鍵的錯誤日志消息發送到日志文件保存起來,不重要的日志信息文件不保存在磁盤中,但是仍然能夠在控制臺輸出,那么這便是我們這部分要學習的消息的路由分發機制。
二、路由功能實現
2.1、綁定(bindings)
在前面的學習中已經創建了綁定(bindings),代碼如下:
channel.queueBind(queueName, EXCHANGE_NAME, "")
一個綁定就是一個關于exchange和queue的關系,它可以簡單的被理解為:隊列是從這個exchange中獲取消息的。
綁定可以采取一個額外的routingKey的參數,為了避免與basicPublish參數沖突,稱之為一個綁定Key,這是如何創建一個帶routingKey的綁定的關鍵。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
一個綁定Key依賴于exchange的類型,像之前使用fanout類型的exchange,完全忽略了該綁定key的值。
2.2、直接交換(Direct exchange)
前面實現的日志記錄系統中廣播所有的消息給所有的消費者,現在對其進行擴展,允許根據信息的嚴重程度來對消息進行過濾,比如,希望一個程序寫入到磁盤的日志消息只接收錯誤的消息,而不是浪費磁盤保存所有的日志消息。
為了實現這個目標,使用一個fanout類型的exchange,顯然是不能夠滿足這樣的需求的,因為它只能廣播所有的消息。
為此將使用一個direct exchange來代替fanout exchange,direct exchange使用簡單的路由算法,將消息通過綁定的Key匹配將要到達的隊列。
從上面的結構圖中可以看出direct exchange X綁定著兩個queue(Q1,Q2),第一個queue綁定的routingKey為orange,第二個有兩個routingKey被綁定,一個routingKey為black,另外一個routingKey為green.
說明:發送帶有routingKey為orange的消息到X(exchange)中,X將該消息路由到Q1中,發送帶有routingKey為black和green的消息都將被路由到Q2中,其他所有消息將會被丟棄。
2.3、多綁定(Multiple bindings)
多個隊列綁定相同的routingKey是允許的,在上述實例中,可以把X和Q1用routingKey:black綁定起來,這種情況下,direct exchange將像fanout類型的exchange一樣會將消息廣播都到所有匹配的queues中,即一個routingKey為black的消息將會被發送到Q1和Q2中。
2.4、發送的日志
使用direct代替fanout類型的exchange,發送消息到一個direct exchange中,將根據消息的重要程度作為routingKey,這樣接收程序能夠選擇它想要接收的日志信息,首先必須先創建一個exchange.
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
其次,發送一條信息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
為了簡化程序,將severity設定為info、warning、error三種類型中的一種。
2.5、訂閱消息(Subscribing Message)
接收者根據自己感興趣的severity來創建一個新到的綁定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
2?.6、代碼實現
EmitLogDirect.java代碼清單如下:
package com.xuz.route; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //日志類型 String[] msgType = new String[]{"info","warning","error"}; String severity = getSeverity(msgType); //測試信息 String[] msg = new String[]{"xuz RabbitMQ Routing Test!","very Good!","This is a Info"}; String message = getMessage(msg); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println("send:["+severity+":"+message+"]"); channel.close(); conn.close(); } private static String getMessage(String[] strings) { if(strings.length<2){ return "Hello World!"; } return joinStrings(strings,"",1); } private static String joinStrings(String[] strings, String string,int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(string).append(strings[i]); } return words.toString(); } private static String getSeverity(String[] strings) { if(strings.length<1){ return "info"; } return strings[0]; } }
ReceiveLogsDirect代碼清單如下:
package com.xuz.route; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 接收所有類型的消息 * @author Administrator * */ public class ReceiveLogsDirectAll { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); String[] msgType = new String[]{"info","warning","error"}; if (msgType.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } /** * 綁定多種類型包括:info、warning、error */ for(String severity : msgType){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" ReceiveLogsDirectAll---->Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("ReceiveLogsDirectAll----->Received '" + routingKey + "':'" + message + "'"); } } }
NAT RabbitMQ
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。