RabbitMQ 第3章 RabbitMQ Work Queues(工作隊列)

      網(wǎng)友投稿 702 2022-05-28

      一、概述

      工作隊列(Work queues)

      (使用Java客戶端)

      它在web應用中是非常有用的,因為在很短的時間內(nèi)http請求窗口處理一個復雜的任務是不可能實現(xiàn)的,它的結構如下圖-1所示:

      二、實現(xiàn)步驟

      2.1、準備

      在本部分內(nèi)容之前,已經(jīng)實現(xiàn)了發(fā)送單條”Hello World!“的消息,現(xiàn)在將發(fā)送一些復雜的字符串任務,由于沒有一個真實的生產(chǎn)環(huán)境來模擬,我們可以通過使用Thread.sleep()函數(shù)來假設任務通過描述字符串hello...將要花費三秒鐘的時間。

      從之前的實例中,我們只需要稍微修改Sender01.java代碼,允許任意的消息通過命令行發(fā)送,這一計劃將安排到工作隊列中的任務,讓我們將

      String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

      消息處理:

      private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }

      之前的舊的Recv01.java也需要做一些改變。

      while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); }

      模擬假任務執(zhí)行時間:

      private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }

      2.2、輪詢調(diào)度

      使用消息隊列的優(yōu)點之一就是能夠方便地并行工作,如果我們建立了大量的工作任務,我們就可以添加更多的worker,這樣大規(guī)模應用就顯的比較容易,首先讓我們嘗試同時運行兩個Worker.java腳本,他們都將獲得隊列中的消息,但是具體情況如下,我們需要打開三個控制臺,兩個將運行Worker.java的腳本,這些控制臺將顯示兩個消費者C1和C2,第三個控制臺將發(fā)布一個新的任務,一旦啟動了消費者,你將可以通過第三個控制臺發(fā)布一些消息,在默認情況下RabbitMQ將發(fā)送每條消息給下一個消費者,在隊列里每個消費者將獲得同樣數(shù)量的消息,這種方式被稱之輪詢調(diào)度。

      2.3、消息確認

      為了確保每條消息不丟失,RabbitMQ支持消息的確認(Acknowledgments),一個確認被送回消費者告訴RabbitMQ的一個特定的消息一經(jīng)被接收和處理,RabbitMQ此時才可以將該特定的消息刪除。

      如果消費者進程被殺掉而沒有發(fā)送一個確認給RabbitMQ服務器,RabbitMQ會明白這個消息是沒有被正常完成處理。

      QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

      2.4、消息持久化

      前面已經(jīng)給出了如何確保消費者down了,任務也不會丟失,但是我們的任務在RabbitMQ服務器停止時還是可能會被丟失,當RabbitMQ服務器宕機或者奔潰時,它會丟失所有的隊列和消息,除非告訴它不要這么做,需要做兩件事情確保消息不會被丟失,我們需要標記隊列和消息持久化,首先我們需要確保RabbitMQ永遠不會丟失隊列,為了做到這點,我們需要將隊列聲明為持久化:

      boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);

      此時queueDeclare變化需要被兼容到生產(chǎn)者和消費者的代碼中,在這一點上,我們肯定不會丟失隊列,即使task_queue隊列所屬的RabbitMQ服務器重新啟動,現(xiàn)在我們需要將消息標記為持久性,通過設置MessageProperties值為 PERSISTENT_TEXT_PLAN來實現(xiàn)。

      import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

      消息持久化需要注意的事項:

      標記為持久化的消息并不能完全保證消息不會被丟失,雖然它告訴RabbitMQ的消息保存到磁盤,仍然有一個很短的時間內(nèi),RabbitMQ的消息一經(jīng)接收了,但是并沒有成功保存到磁盤,而是保存在緩存匯總,持久化的保證能力不足,但它是我們簡單任務隊列已經(jīng)足以滿足需求了。

      2.5、公平調(diào)度

      RabbitMQ 第3章 RabbitMQ Work Queues(工作隊列)

      int prefetchCount = 1; channel.basicQos(prefetchCount);

      它的結構圖如下圖-2所示:

      2.6、完整的代碼清單:

      package com.xuz.work; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); /** * true: * 這樣設置之后,服務器收到消息后就會立刻將消息寫入到硬盤,就可以防止突然服務器掛掉,而引起數(shù)據(jù)丟失了,但是服務器如果剛收到消息,還沒有來得寫入硬盤,就掛掉了,這樣 * 無法避免消息得丟失。 */ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("waiting for message.To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); /** * false:設置確認消息,true表示接收到消息之后,將返回給服務端確定消息 */ channel.basicConsume(TASK_QUEUE_NAME, false,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Received:["+message+"] from Task"); doWork(message); System.out.println("Done!"); //設置消息確認機制,如將如下代碼注釋掉,則 //一旦將autoAck關閉之后,一定要記得處理完消息之后,向服務器確認消息。否則服務器將會一直轉發(fā)該消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String message) throws InterruptedException { for (char ch : message.toCharArray()) { if(ch == '.')Thread.sleep(1000); } } }

      Java RabbitMQ

      版權聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權內(nèi)容。

      上一篇:使用 Python 進行穩(wěn)定可靠的文件操作
      下一篇:云遷移實踐:VMware虛擬機遷移到華為云
      相關文章
      亚洲邪恶天堂影院在线观看| 亚洲中文字幕无码久久精品1| 亚洲一区二区高清| 亚洲 暴爽 AV人人爽日日碰| 亚洲国语在线视频手机在线| 亚洲色图在线播放| 久久噜噜噜久久亚洲va久| 国产亚洲视频在线| 国产亚洲福利一区二区免费看| 欧洲亚洲综合一区二区三区| 小说专区亚洲春色校园| 亚洲爆乳无码专区www| 亚洲精华液一二三产区| 亚洲Av无码国产一区二区| 老司机亚洲精品影院在线观看| 国产精品久久久久久亚洲影视| 亚洲AV色欲色欲WWW| 国产综合成人亚洲区| 国产亚洲人成在线影院| 亚洲免费无码在线| 久久亚洲中文字幕精品一区| 亚洲精品中文字幕无码蜜桃| 亚洲AV无码精品色午夜果冻不卡| 国产AV无码专区亚洲A∨毛片| 亚洲A∨无码一区二区三区| 在线观看亚洲人成网站| 亚洲国产成人资源在线软件| 色老板亚洲视频免在线观| 亚洲日本va一区二区三区 | 亚洲AV午夜成人片| 亚洲高清在线播放| 亚洲日韩在线视频| 亚洲视频在线观看2018| 亚洲AV无码一区二区三区性色| 五月天婷亚洲天综合网精品偷| 中文字幕第13亚洲另类| 亚洲AV日韩AV高潮无码专区| 亚洲日本视频在线观看| 亚洲色大成网站www| 亚洲国产成人久久笫一页| 亚洲日韩中文字幕在线播放|