分布式進階(十五)ZMQ(分布式進階小冊)

      網友投稿 1049 2022-05-30

      我們為什么需要ZMQ

      目前的應用程序很多都會包含跨網絡的組件,無論是局域網還是因特網。這些程序的開發者都會用到某種消息通信機制。有些人會使用某種消息隊列產品,而大多數人則會自己手工來做這些事,使用TCP或UDP協議。這些協議使用起來并不困難,但是,簡單地將消息從A發給B,和在任何情況下都能進行可靠的消息傳輸,這兩種情況顯然是不同的。

      讓我們看看在使用純TCP協議進行消息傳輸時會遇到的一些典型問題。任何可復用的消息傳輸層肯定或多或少地會要解決以下問題:

      如何處理I/O?是讓程序阻塞等待響應,還是在后臺處理這些事?這是軟件設計的關鍵因素。阻塞式的I/O操作會讓程序架構難以擴展,而后臺處理I/O也是比較困難的。

      如何處理那些臨時的、來去自由的組件?我們是否要將組件分為客戶端和服務端兩種,并要求服務端永不消失?那如果我們想要將服務端相連怎么辦?我們要每隔幾秒就進行重連嗎?

      我們如何表示一條消息?我們怎樣通過拆分消息,讓其變得易讀易寫,不用擔心緩存溢出,既能高效地傳輸小消息,又能勝任視頻等大型文件的傳輸?

      如何處理那些不能立刻發送出去的消息?比如我們需要等待一個網絡組件重新連接的時候?我們是直接丟棄該條消息,還是將它存入數據庫,或是內存中的一個隊列?

      要在哪里保存消息隊列?如果某個組件讀取消息隊列的速度很慢,造成消息的堆積怎么辦?我們要采取什么樣的策略?

      如何處理丟失的消息?我們是等待新的數據,請求重發,還是需要建立一套新的可靠性機制以保證消息不會丟失?如果這個機制自身崩潰了呢?

      如果我們想換一種網絡連接協議,如用廣播代替TCP單播?或者改用IPv6?我們是否需要重寫所有的應用程序,或者將這種協議抽象到一個單獨的層中?

      我們如何對消息進行路由?我們可以將消息同時發送給多個節點嗎?是否能將應答消息返回給請求的發送方?

      我們如何為另一種語言寫一個API?我們是否需要完全重寫某項協議,還是重新打包一個類庫?

      怎樣才能做到在不同的架構之間傳送消息?是否需要為消息規定一種編碼?

      我們如何處理網絡通信錯誤?等待并重試,還是直接忽略或取消?

      ZMQ就是這樣一種軟件:它高效,提供了嵌入式的類庫,使應用程序能夠很好地在網絡中擴展,成本低廉。

      ZMQ的主要特點有:

      ZMQ會在后臺線程異步地處理I/O操作,它使用一種不會死鎖的數據結構來存儲消息。

      網絡組件可以來去自如,ZMQ會負責自動重連,這就意味著你可以以任何順序啟動組件;用它創建的面向服務架構(SOA)中,服務端可以隨意地加入或退出網絡。

      ZMQ會在有必要的情況下自動將消息放入隊列中保存,一旦建立了連接就開始發送。

      ZMQ有閾值(HWM)的機制,可以避免消息溢出。當隊列已滿,ZMQ會自動阻塞發送者,或丟棄部分消息,這些行為取決于你所使用的消息模式。

      ZMQ可以讓你用不同的通信協議進行連接,如TCP、廣播、進程內、進程間。改變通信協議時你不需要去修改代碼。

      ZMQ會恰當地處理速度較慢的節點,會根據消息模式使用不同的策略。

      ZMQ提供了多種模式進行消息路由,如請求-應答模式、發布-訂閱模式等。這些模式可以用來搭建網絡拓撲結構。

      ZMQ中可以根據消息模式建立起一些中間裝置(很小巧),可以用來降低網絡的復雜程度。

      ZMQ會發送整個消息,使用消息幀的機制來傳遞。如果你發送了10KB大小的消息,你就會收到10KB大小的消息。

      ZMQ不強制使用某種消息格式,消息可以是0字節的,或是大到GB級的數據。當你表示這些消息時,可以選用諸如谷歌的protocol?buffers,XDR等序列化產品。

      ZMQ能夠智能地處理網絡錯誤,有時它會進行重試,有時會告知你某項操作發生了錯誤。

      ZMQ甚至可以降低對環境的污染,因為節省了CPU時間意味著節省了電能。

      其實ZMQ可以做的還不止這些,它會顛覆人們編寫網絡應用程序的模式。雖然從表面上看,它不過是提供了一套處理套接字的API,能夠用zmq_recv()和zmq_send()進行消息的收發,但是,消息處理將成為應用程序的核心部分,很快你的程序就會變成一個個消息處理模塊,這既美觀又自然。它的擴展性還很強,每項任務由一個節點(節點是一個線程)、同一臺機器上的兩個節點(節點是一個進程)、同一網絡上的兩臺機器(節點是一臺機器)來處理,而不需要改動應用程序。

      一、ZeroMQ的背景介紹

      官方:“ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket?library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網絡協議棧的一部分,之后進入Linux內核”。現在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統”BSD套接字之上的一層封裝。ZMQ讓編寫高性能網絡應用程序極為簡單和有趣。”

      與其他消息中間件相比,ZMQ并不像是一個傳統意義上的消息隊列服務器,事實上,它也根本不是一個服務器,它更像是一個底層的網絡通訊庫,在Socket?API之上做了一層封裝,將網絡通訊、進程通訊和線程通訊抽象為統一的API接口。

      二、ZMQ是什么

      閱讀了ZMQ的Guide文檔后,我的理解是,這是個類似于Socket的一系列接口,他跟Socket的區別是:普通的socket是端到端的(1:1的關系),而ZMQ卻是可以N:M?的關系,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網絡編程更為簡單。ZMQ用于node與node間的通信,node可以是主機或者是進程。

      三、三種模型

      參考網址:http://blog.csdn.net/whuqin/article/details/8442919/

      a)?應答模式:

      使用REQ-REP套接字發送和接受消息是需要遵循一定規律的。客戶端首先使用zmq_send()發送消息,再用zmq_recv()接收,如此循環。如果打亂了這個順序(如連續發送兩次)則會報錯。類似地,服務端必須先進行接收,后進行發送。

      b)?訂閱發布模式

      PUB-SUB套接字組合是異步的。客戶端在一個循環體中使用recv?()接收消息,如果向SUB套接字發送消息則會報錯;類似地,服務端可以不斷地使用send?()發送消息,但不能再PUB套接字上使用recv?()。

      關于PUB-SUB套接字,還有一點需要注意:你無法得知SUB是何時開始接收消息的。就算你先打開了SUB套接字,后打開PUB發送消息,這時SUB還是會丟失一些消息的,因為建立連接是需要一些時間的。很少,但并不是零。解決此問題需要在PUB端加入sleep。

      c)?基于分布式處理(管道模式)

      這篇博客對ZMQ有一個初步的介紹,下篇博客介紹如何通過JAVA來調用ZMQ實現消息處理。

      下面貼出PUB_SUB(應答模式)模式下的代碼:

      發布端:

      package?cn.edu.ujn.pub_sub;

      import?org.zeromq.ZMQ;

      import?org.zeromq.ZMQ.Context;

      import?org.zeromq.ZMQ.Socket;

      /**

      *?Pubsub?envelope?publisher

      */

      public?class?psenvpub?{

      public?static?void?main?(String[]?args)?throws?Exception?{

      //?Prepare?our?context?and?publisher

      Context?context?=?ZMQ.context(1);

      Socket?publisher?=?context.socket(ZMQ.PUB);

      publisher.bind("tcp://*:5563");

      while?(!Thread.currentThread?().isInterrupted?())?{

      //?Write?two?messages,?each?with?an?envelope?and?content

      publisher.sendMore?("A");

      publisher.send?("We?don't?want?to?see?this");

      publisher.sendMore?("B");

      publisher.send("We?would?like?to?see?this");

      }

      publisher.close?();

      context.term?();

      }

      }

      發布端需要通過context.socket(ZMQ.PUB)表示為發布端,通過bind方法來創建發布端連接,等待訂閱者連接。

      之后通過send方法將數據發送到出去。

      之后來寫訂閱端代碼

      package?cn.edu.ujn.pub_sub;

      import?org.zeromq.ZMQ;

      import?org.zeromq.ZMQ.Context;

      import?org.zeromq.ZMQ.Socket;

      /**

      *?Pubsub?envelope?subscriber

      */

      public?class?psenvsub?{

      public?static?void?main?(String[]?args)?{

      //?Prepare?our?context?and?subscriber

      Context?context?=?ZMQ.context(1);

      Socket?subscriber?=?context.socket(ZMQ.SUB);

      subscriber.connect("tcp://localhost:5563");

      subscriber.subscribe("B".getBytes());

      while?(!Thread.currentThread?().isInterrupted?())?{

      //?Read?envelope?with?address

      String?address?=?subscriber.recvStr?();

      //?Read?message?contents

      String?contents?=?subscriber.recvStr?();

      System.out.println(address?+?"?:?"?+?contents);

      }

      subscriber.close?();

      context.term?();

      }

      }

      客戶端通過connect進行連接,之后通過recv來進行數據接收。

      下面貼出REQ_REP(訂閱發布模式)模式下的代碼:

      發送端:

      package?cn.edu.ujn.req_rep;

      //

      //??Hello?World?server?in?Java

      //??Binds?REP?socket?to?tcp://*:5555

      //??Expects?"Hello"?from?client,?replies?with?"World"

      //

      import?org.zeromq.ZMQ;

      public?class?hwserver?{

      private?static?int?i?=?0;

      public?static?void?main(String[]?args)?throws?Exception?{

      ZMQ.Context?context?=?ZMQ.context(1);

      //??Socket?to?talk?to?clients

      ZMQ.Socket?responder?=?context.socket(ZMQ.REP);

      responder.bind("tcp://*:5555");

      while?(!Thread.currentThread().isInterrupted())?{

      //?Wait?for?next?request?from?the?client

      byte[]?request?=?responder.recv(0);

      System.out.println("Received?"?+?new?String(request)?+?i++);

      //?Do?some?'work'

      Thread.sleep(1000);

      //?Send?reply?back?to?client

      String?reply?=?"World";

      responder.send(reply.getBytes(),?0);

      }

      responder.close();

      context.term();

      }

      }

      接收端:

      package?cn.edu.ujn.req_rep;

      //

      //??Hello?World?client?in?Java

      //??Connects?REQ?socket?to?tcp://localhost:5555

      //??Sends?"Hello"?to?server,?expects?"World"?back

      //

      import?org.zeromq.ZMQ;

      public?class?hwclient?{

      public?static?void?main(String[]?args)?{

      ZMQ.Context?context?=?ZMQ.context(1);

      //??Socket?to?talk?to?server

      System.out.println("Connecting?to?hello?world?server…");

      ZMQ.Socket?requester?=?context.socket(ZMQ.REQ);

      requester.connect("tcp://localhost:5555");

      for?(int?requestNbr?=?0;?requestNbr?!=?10;?requestNbr++)?{

      String?request?=?"Hello";

      System.out.println("Sending?Hello?"?+?requestNbr);

      requester.send(request.getBytes(),?0);

      byte[]?reply?=?requester.recv(0);

      System.out.println("Received?"?+?new?String(reply)?+?"?"?+?requestNbr);

      }

      requester.close();

      context.term();

      }

      }

      下面貼出Para_Pipe(基于分布式處理(管道模式))模式下的代碼:

      發送端:

      package?cn.edu.ujn.para_pipe;

      import?java.util.Random;

      import?org.zeromq.ZMQ;

      //

      //??Task?ventilator?in?Java

      //??Binds?PUSH?socket?to?tcp://localhost:5557

      //??Sends?batch?of?tasks?to?workers?via?that?socket

      //

      public?class?taskvent?{

      public?static?void?main?(String[]?args)?throws?Exception?{

      ZMQ.Context?context?=?ZMQ.context(1);

      //??Socket?to?send?messages?on

      ZMQ.Socket?sender?=?context.socket(ZMQ.PUSH);

      sender.bind("tcp://*:5557");

      //??Socket?to?send?messages?on

      ZMQ.Socket?sink?=?context.socket(ZMQ.PUSH);

      sink.connect("tcp://localhost:5558");

      System.out.println("Press?Enter?when?the?workers?are?ready:?");

      System.in.read();

      System.out.println("Sending?tasks?to?workers\n");

      //??The?first?message?is?"0"?and?signals?start?of?batch

      sink.send("0",?0);

      //??Initialize?random?number?generator

      Random?srandom?=?new?Random(System.currentTimeMillis());

      //??Send?100?tasks

      int?task_nbr;

      int?total_msec?=?0;?????//??Total?expected?cost?in?msecs

      分布式進階(十五)ZMQ(分布式進階小冊)

      for?(task_nbr?=?0;?task_nbr?

      int?workload;

      //??Random?workload?from?1?to?100msecs

      workload?=?srandom.nextInt(100)?+?1;

      total_msec?+=?workload;

      System.out.print(workload?+?".");

      String?string?=?String.format("%d",?workload);

      sender.send(string,?0);

      }

      System.out.println("Total?expected?cost:?"?+?total_msec?+?"?msec");

      Thread.sleep(1000);??????????????//??Give?0MQ?time?to?deliver

      sink.close();

      sender.close();

      context.term();

      }

      }

      中介端:

      package?cn.edu.ujn.para_pipe;

      import?org.zeromq.ZMQ;

      //

      //??Task?worker?in?Java

      //??Connects?PULL?socket?to?tcp://localhost:5557

      //??Collects?workloads?from?ventilator?via?that?socket

      //??Connects?PUSH?socket?to?tcp://localhost:5558

      //??Sends?results?to?sink?via?that?socket

      //

      public?class?taskwork?{

      public?static?void?main?(String[]?args)?throws?Exception?{

      ZMQ.Context?context?=?ZMQ.context(1);

      //??Socket?to?receive?messages?on

      ZMQ.Socket?receiver?=?context.socket(ZMQ.PULL);

      receiver.connect("tcp://localhost:5557");

      //??Socket?to?send?messages?to

      ZMQ.Socket?sender?=?context.socket(ZMQ.PUSH);

      sender.connect("tcp://localhost:5558");

      //??Process?tasks?forever

      while?(!Thread.currentThread?().isInterrupted?())?{

      String?string?=?new?String(receiver.recv(0)).trim();

      long?msec?=?Long.parseLong(string);

      //??Simple?progress?indicator?for?the?viewer

      System.out.flush();

      System.out.print(string?+?'.');

      //??Do?the?work

      Thread.sleep(msec);

      //??Send?results?to?sink

      sender.send("".getBytes(),?0);

      }

      sender.close();

      receiver.close();

      context.term();

      }

      }

      接收端:

      package?cn.edu.ujn.para_pipe;

      import?org.zeromq.ZMQ;

      //

      //??Task?sink?in?Java

      //??Binds?PULL?socket?to?tcp://localhost:5558

      //??Collects?results?from?workers?via?that?socket

      //

      public?class?tasksink?{

      public?static?void?main?(String[]?args)?throws?Exception?{

      //??Prepare?our?context?and?socket

      ZMQ.Context?context?=?ZMQ.context(1);

      ZMQ.Socket?receiver?=?context.socket(ZMQ.PULL);

      receiver.bind("tcp://*:5558");

      //??Wait?for?start?of?batch

      String?string?=?new?String(receiver.recv(0));

      //??Start?our?clock?now

      long?tstart?=?System.currentTimeMillis();

      //??Process?100?confirmations

      int?task_nbr;

      int?total_msec?=?0;?????//??Total?calculated?cost?in?msecs

      for?(task_nbr?=?0;?task_nbr?

      string?=?new?String(receiver.recv(0)).trim();

      if?((task_nbr?/?10)?*?10?==?task_nbr)?{

      System.out.print(":");

      }?else?{

      System.out.print(".");

      }

      }

      //??Calculate?and?report?duration?of?batch

      long?tend?=?System.currentTimeMillis();

      System.out.println("\nTotal?elapsed?time:?"?+?(tend?-?tstart)?+?"?msec");

      receiver.close();

      context.term();

      }

      }

      到此為止發布消息的三種模式就寫完了,希望通過這篇博客讀者能夠對ZMQ有初步的認識和簡單實用,希望這篇博客對學習zmq的讀者有所幫助。

      正確地使用上下文

      ZMQ應用程序的一開始總是會先創建一個上下文,并用它來創建套接字。在C語言中,創建上下文的函數是zmq_init()。一個進程中只應該創建一個上下文。從技術的角度來說,上下文是一個容器,包含了該進程下所有的套接字,并為inproc協議提供實現,用以高速連接進程內不同的線程。如果一個進程中創建了兩個上下文,那就相當于啟動了兩個ZMQ實例。如果這正是你需要的,那沒有問題,但一般情況下:

      在一個進程中使用zmq_init()函數創建一個上下文,并在結束時使用zmq_term()函數關閉它。

      如果你使用了fork()系統調用,那每個進程需要自己的上下文對象。如果在調用fork()之前調用了zmq_init()函數,那每個子進程都會有自己的上下文對象。通常情況下,你會需要在子進程中做些有趣的事,而讓父進程來管理它們。

      正確地退出和清理

      程序員的一個良好習慣是:總是在結束時進行清理工作。當你使用像Python那樣的語言編寫ZMQ應用程序時,系統會自動幫你完成清理。但如果使用的是C語言,那就需要小心地處理了,否則可能發生內存泄露、應用程序不穩定等問題。

      內存泄露只是問題之一,其實ZMQ是很在意程序的退出方式的。個中原因比較復雜,但簡單的來說,如果仍有套接字處于打開狀態,調用zmq_term()時會導致程序掛起;就算關閉了所有的套接字,如果仍有消息處于待發送狀態,zmq_term()也會造成程序的等待。只有當套接字的LINGER選項設為0時才能避免。

      我們需要關注的ZMQ對象包括:消息、套接字、上下文。好在內容并不多,至少在一般的應用程序中是這樣:

      處理完消息后,記得用zmq_msg_close()函數關閉消息;

      如果你同時打開或關閉了很多套接字,那可能需要重新規劃一下程序的結構了;

      退出程序時,應該先關閉所有的套接字,最后調用zmq_term()函數,銷毀上下文對象。

      警告:你的想法可能會被顛覆!

      傳統網絡編程的一個規則是套接字只能和一個節點建立連接。雖然也有廣播的協議,但畢竟是第三方的。當我們認定“一個套接字?=?一個連接”的時候,我們會用一些特定的方式來擴展應用程序架構:我們為每一塊邏輯創建線程,該線程獨立地維護一個套接字。

      但在ZMQ的世界里,套接字是智能的、多線程的,能夠自動地維護一組完整的連接。你無法看到它們,甚至不能直接操縱這些連接。當你進行消息的收發、輪詢等操作時,只能和ZMQ套接字打交道,而不是連接本身。所以說,ZMQ世界里的連接是私有的,不對外部開放,這也是ZMQ易于擴展的原因之一。

      由于你的代碼只會和某個套接字進行通信,這樣就可以處理任意多個連接,使用任意一種網絡協議。而ZMQ的消息模式又可以進行更為廉價和便捷的擴展。

      這樣一來,傳統的思維就無法在ZMQ的世界里應用了。在你閱讀示例程序代碼的時候,也許你腦子里會想方設法地將這些代碼和傳統的網絡編程相關聯:當你讀到“套接字”的時候,會認為它就表示與另一個節點的連接——這種想法是錯誤的;當你讀到“線程”時,會認為它是與另一個節點的連接——這也是錯誤的。

      如果你是第一次閱讀本指南,使用ZMQ進行了一兩天的開發(或者更長),可能會覺得疑惑,ZMQ怎么會讓事情便得如此簡單。你再次嘗試用以往的思維去理解ZMQ,但又無功而返。最后,你會被ZMQ的理念所折服,撥云見霧,開始享受ZMQ帶來的樂趣。

      若朋友們有疑問,可留言,以求共進!

      任務調度 分布式

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Vue-Router路由快速了解與應用(vue-router路由的作用)
      下一篇:還不懂三次握手四次揮手?看這一篇絕對包你會!(四次揮手只有三個包)
      相關文章
      亚洲国产av高清无码| 亚洲国产成人一区二区精品区 | 国产成人高清亚洲| 亚洲欧美日韩中文无线码 | 伊伊人成亚洲综合人网7777| 日日摸日日碰夜夜爽亚洲| 亚洲色成人网站WWW永久四虎| 亚洲欧洲精品一区二区三区| 亚洲午夜精品一区二区| 亚洲国产二区三区久久| 久久亚洲精品视频| 亚洲成AV人片一区二区密柚| 亚洲婷婷五月综合狠狠爱| 国产精品亚洲mnbav网站| 国产啪亚洲国产精品无码 | 久久精品国产亚洲77777| 久久亚洲私人国产精品vA| 亚洲尹人香蕉网在线视颅| 久久精品a亚洲国产v高清不卡| 亚洲日本在线看片| 亚洲精品国产肉丝袜久久| 亚洲小说区图片区| 麻豆狠色伊人亚洲综合网站| 亚洲六月丁香婷婷综合| 在线亚洲高清揄拍自拍一品区| 亚洲一卡2卡三卡4卡无卡下载| 亚洲色大网站WWW永久网站| 亚洲狠狠婷婷综合久久| 国产精品成人亚洲| 区三区激情福利综合中文字幕在线一区亚洲视频1 | 亚洲s色大片在线观看| 亚洲av午夜福利精品一区| 亚洲AV无码久久精品狠狠爱浪潮| 久久亚洲精品国产精品黑人| 亚洲天堂在线播放| 亚洲一区二区三区无码国产| 亚洲午夜成人精品无码色欲| 国产精品亚洲一区二区三区 | 亚洲色偷偷综合亚洲AV伊人| 国产综合亚洲专区在线| 亚洲国产精久久久久久久|