RabbitMQ 第3章 RabbitMQ Work Queues(工作隊列)
一、概述
工作隊列(Work queues)
(使用Java客戶端)
它在web應用中是非常有用的,因為在很短的時間內(nèi)http請求窗口處理一個復雜的任務是不可能實現(xiàn)的,它的結構如下圖-1所示:
二、實現(xiàn)步驟
2.1、準備
在本部分內(nèi)容之前,已經(jīng)實現(xiàn)了發(fā)送單條”Hello World!“的消息,現(xiàn)在將發(fā)送一些復雜的字符串任務,由于沒有一個真實的生產(chǎn)環(huán)境來模擬,我們可以通過使用Thread.sleep()函數(shù)來假設任務通過描述字符串hello...將要花費三秒鐘的時間。
從之前的實例中,我們只需要稍微修改Sender01.java代碼,允許任意的消息通過命令行發(fā)送,這一計劃將安排到工作隊列中的任務,讓我們將
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
消息處理:
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
之前的舊的Recv01.java也需要做一些改變。
while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); }
模擬假任務執(zhí)行時間:
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
2.2、輪詢調(diào)度
使用消息隊列的優(yōu)點之一就是能夠方便地并行工作,如果我們建立了大量的工作任務,我們就可以添加更多的worker,這樣大規(guī)模應用就顯的比較容易,首先讓我們嘗試同時運行兩個Worker.java腳本,他們都將獲得隊列中的消息,但是具體情況如下,我們需要打開三個控制臺,兩個將運行Worker.java的腳本,這些控制臺將顯示兩個消費者C1和C2,第三個控制臺將發(fā)布一個新的任務,一旦啟動了消費者,你將可以通過第三個控制臺發(fā)布一些消息,在默認情況下RabbitMQ將發(fā)送每條消息給下一個消費者,在隊列里每個消費者將獲得同樣數(shù)量的消息,這種方式被稱之輪詢調(diào)度。
2.3、消息確認
為了確保每條消息不丟失,RabbitMQ支持消息的確認(Acknowledgments),一個確認被送回消費者告訴RabbitMQ的一個特定的消息一經(jīng)被接收和處理,RabbitMQ此時才可以將該特定的消息刪除。
如果消費者進程被殺掉而沒有發(fā)送一個確認給RabbitMQ服務器,RabbitMQ會明白這個消息是沒有被正常完成處理。
QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
2.4、消息持久化
前面已經(jīng)給出了如何確保消費者down了,任務也不會丟失,但是我們的任務在RabbitMQ服務器停止時還是可能會被丟失,當RabbitMQ服務器宕機或者奔潰時,它會丟失所有的隊列和消息,除非告訴它不要這么做,需要做兩件事情確保消息不會被丟失,我們需要標記隊列和消息持久化,首先我們需要確保RabbitMQ永遠不會丟失隊列,為了做到這點,我們需要將隊列聲明為持久化:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
此時queueDeclare變化需要被兼容到生產(chǎn)者和消費者的代碼中,在這一點上,我們肯定不會丟失隊列,即使task_queue隊列所屬的RabbitMQ服務器重新啟動,現(xiàn)在我們需要將消息標記為持久性,通過設置MessageProperties值為 PERSISTENT_TEXT_PLAN來實現(xiàn)。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消息持久化需要注意的事項:
標記為持久化的消息并不能完全保證消息不會被丟失,雖然它告訴RabbitMQ的消息保存到磁盤,仍然有一個很短的時間內(nèi),RabbitMQ的消息一經(jīng)接收了,但是并沒有成功保存到磁盤,而是保存在緩存匯總,持久化的保證能力不足,但它是我們簡單任務隊列已經(jīng)足以滿足需求了。
2.5、公平調(diào)度
int prefetchCount = 1; channel.basicQos(prefetchCount);
它的結構圖如下圖-2所示:
2.6、完整的代碼清單:
package com.xuz.work; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); /** * true: * 這樣設置之后,服務器收到消息后就會立刻將消息寫入到硬盤,就可以防止突然服務器掛掉,而引起數(shù)據(jù)丟失了,但是服務器如果剛收到消息,還沒有來得寫入硬盤,就掛掉了,這樣 * 無法避免消息得丟失。 */ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("waiting for message.To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); /** * false:設置確認消息,true表示接收到消息之后,將返回給服務端確定消息 */ channel.basicConsume(TASK_QUEUE_NAME, false,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Received:["+message+"] from Task"); doWork(message); System.out.println("Done!"); //設置消息確認機制,如將如下代碼注釋掉,則 //一旦將autoAck關閉之后,一定要記得處理完消息之后,向服務器確認消息。否則服務器將會一直轉發(fā)該消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String message) throws InterruptedException { for (char ch : message.toCharArray()) { if(ch == '.')Thread.sleep(1000); } } }
Java RabbitMQ
版權聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權內(nèi)容。