分布式進階(十五)ZMQ(分布式進階小冊)
我們為什么需要ZMQ
目前的應用程序很多都會包含跨網絡的組件,無論是局域網還是因特網。這些程序的開發者都會用到某種消息通信機制。有些人會使用某種消息隊列產品,而大多數人則會自己手工來做這些事,使用TCP或UDP協議。這些協議使用起來并不困難,但是,簡單地將消息從A發給B,和在任何情況下都能進行可靠的消息傳輸,這兩種情況顯然是不同的。
讓我們看看在使用純TCP協議進行消息傳輸時會遇到的一些典型問題。任何可復用的消息傳輸層肯定或多或少地會要解決以下問題:
如何處理I/O?是讓程序阻塞等待響應,還是在后臺處理這些事?這是軟件設計的關鍵因素。阻塞式的I/O操作會讓程序架構難以擴展,而后臺處理I/O也是比較困難的。
如何處理那些臨時的、來去自由的組件?我們是否要將組件分為客戶端和服務端兩種,并要求服務端永不消失?那如果我們想要將服務端相連怎么辦?我們要每隔幾秒就進行重連嗎?
我們如何表示一條消息?我們怎樣通過拆分消息,讓其變得易讀易寫,不用擔心緩存溢出,既能高效地傳輸小消息,又能勝任視頻等大型文件的傳輸?
如何處理那些不能立刻發送出去的消息?比如我們需要等待一個網絡組件重新連接的時候?我們是直接丟棄該條消息,還是將它存入數據庫,或是內存中的一個隊列?
要在哪里保存消息隊列?如果某個組件讀取消息隊列的速度很慢,造成消息的堆積怎么辦?我們要采取什么樣的策略?
如何處理丟失的消息?我們是等待新的數據,請求重發,還是需要建立一套新的可靠性機制以保證消息不會丟失?如果這個機制自身崩潰了呢?
如果我們想換一種網絡連接協議,如用廣播代替TCP單播?或者改用IPv6?我們是否需要重寫所有的應用程序,或者將這種協議抽象到一個單獨的層中?
我們如何對消息進行路由?我們可以將消息同時發送給多個節點嗎?是否能將應答消息返回給請求的發送方?
我們如何為另一種語言寫一個API?我們是否需要完全重寫某項協議,還是重新打包一個類庫?
怎樣才能做到在不同的架構之間傳送消息?是否需要為消息規定一種編碼?
我們如何處理網絡通信錯誤?等待并重試,還是直接忽略或取消?
ZMQ就是這樣一種軟件:它高效,提供了嵌入式的類庫,使應用程序能夠很好地在網絡中擴展,成本低廉。
ZMQ的主要特點有:
ZMQ會在后臺線程異步地處理I/O操作,它使用一種不會死鎖的數據結構來存儲消息。
網絡組件可以來去自如,ZMQ會負責自動重連,這就意味著你可以以任何順序啟動組件;用它創建的面向服務架構(SOA)中,服務端可以隨意地加入或退出網絡。
ZMQ會在有必要的情況下自動將消息放入隊列中保存,一旦建立了連接就開始發送。
ZMQ有閾值(HWM)的機制,可以避免消息溢出。當隊列已滿,ZMQ會自動阻塞發送者,或丟棄部分消息,這些行為取決于你所使用的消息模式。
ZMQ可以讓你用不同的通信協議進行連接,如TCP、廣播、進程內、進程間。改變通信協議時你不需要去修改代碼。
ZMQ會恰當地處理速度較慢的節點,會根據消息模式使用不同的策略。
ZMQ提供了多種模式進行消息路由,如請求-應答模式、發布-訂閱模式等。這些模式可以用來搭建網絡拓撲結構。
ZMQ中可以根據消息模式建立起一些中間裝置(很小巧),可以用來降低網絡的復雜程度。
ZMQ會發送整個消息,使用消息幀的機制來傳遞。如果你發送了10KB大小的消息,你就會收到10KB大小的消息。
ZMQ不強制使用某種消息格式,消息可以是0字節的,或是大到GB級的數據。當你表示這些消息時,可以選用諸如谷歌的protocol?buffers,XDR等序列化產品。
ZMQ能夠智能地處理網絡錯誤,有時它會進行重試,有時會告知你某項操作發生了錯誤。
ZMQ甚至可以降低對環境的污染,因為節省了CPU時間意味著節省了電能。
其實ZMQ可以做的還不止這些,它會顛覆人們編寫網絡應用程序的模式。雖然從表面上看,它不過是提供了一套處理套接字的API,能夠用zmq_recv()和zmq_send()進行消息的收發,但是,消息處理將成為應用程序的核心部分,很快你的程序就會變成一個個消息處理模塊,這既美觀又自然。它的擴展性還很強,每項任務由一個節點(節點是一個線程)、同一臺機器上的兩個節點(節點是一個進程)、同一網絡上的兩臺機器(節點是一臺機器)來處理,而不需要改動應用程序。
一、ZeroMQ的背景介紹
官方:“ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket?library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網絡協議棧的一部分,之后進入Linux內核”。現在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統”BSD套接字之上的一層封裝。ZMQ讓編寫高性能網絡應用程序極為簡單和有趣。”
與其他消息中間件相比,ZMQ并不像是一個傳統意義上的消息隊列服務器,事實上,它也根本不是一個服務器,它更像是一個底層的網絡通訊庫,在Socket?API之上做了一層封裝,將網絡通訊、進程通訊和線程通訊抽象為統一的API接口。
二、ZMQ是什么
閱讀了ZMQ的Guide文檔后,我的理解是,這是個類似于Socket的一系列接口,他跟Socket的區別是:普通的socket是端到端的(1:1的關系),而ZMQ卻是可以N:M?的關系,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網絡編程更為簡單。ZMQ用于node與node間的通信,node可以是主機或者是進程。
三、三種模型
參考網址:http://blog.csdn.net/whuqin/article/details/8442919/
a)?應答模式:
使用REQ-REP套接字發送和接受消息是需要遵循一定規律的。客戶端首先使用zmq_send()發送消息,再用zmq_recv()接收,如此循環。如果打亂了這個順序(如連續發送兩次)則會報錯。類似地,服務端必須先進行接收,后進行發送。
b)?訂閱發布模式
PUB-SUB套接字組合是異步的。客戶端在一個循環體中使用recv?()接收消息,如果向SUB套接字發送消息則會報錯;類似地,服務端可以不斷地使用send?()發送消息,但不能再PUB套接字上使用recv?()。
關于PUB-SUB套接字,還有一點需要注意:你無法得知SUB是何時開始接收消息的。就算你先打開了SUB套接字,后打開PUB發送消息,這時SUB還是會丟失一些消息的,因為建立連接是需要一些時間的。很少,但并不是零。解決此問題需要在PUB端加入sleep。
c)?基于分布式處理(管道模式)
這篇博客對ZMQ有一個初步的介紹,下篇博客介紹如何通過JAVA來調用ZMQ實現消息處理。
下面貼出PUB_SUB(應答模式)模式下的代碼:
發布端:
package?cn.edu.ujn.pub_sub;
import?org.zeromq.ZMQ;
import?org.zeromq.ZMQ.Context;
import?org.zeromq.ZMQ.Socket;
/**
*?Pubsub?envelope?publisher
*/
public?class?psenvpub?{
public?static?void?main?(String[]?args)?throws?Exception?{
//?Prepare?our?context?and?publisher
Context?context?=?ZMQ.context(1);
Socket?publisher?=?context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5563");
while?(!Thread.currentThread?().isInterrupted?())?{
//?Write?two?messages,?each?with?an?envelope?and?content
publisher.sendMore?("A");
publisher.send?("We?don't?want?to?see?this");
publisher.sendMore?("B");
publisher.send("We?would?like?to?see?this");
}
publisher.close?();
context.term?();
}
}
發布端需要通過context.socket(ZMQ.PUB)表示為發布端,通過bind方法來創建發布端連接,等待訂閱者連接。
之后通過send方法將數據發送到出去。
之后來寫訂閱端代碼
package?cn.edu.ujn.pub_sub;
import?org.zeromq.ZMQ;
import?org.zeromq.ZMQ.Context;
import?org.zeromq.ZMQ.Socket;
/**
*?Pubsub?envelope?subscriber
*/
public?class?psenvsub?{
public?static?void?main?(String[]?args)?{
//?Prepare?our?context?and?subscriber
Context?context?=?ZMQ.context(1);
Socket?subscriber?=?context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.subscribe("B".getBytes());
while?(!Thread.currentThread?().isInterrupted?())?{
//?Read?envelope?with?address
String?address?=?subscriber.recvStr?();
//?Read?message?contents
String?contents?=?subscriber.recvStr?();
System.out.println(address?+?"?:?"?+?contents);
}
subscriber.close?();
context.term?();
}
}
客戶端通過connect進行連接,之后通過recv來進行數據接收。
下面貼出REQ_REP(訂閱發布模式)模式下的代碼:
發送端:
package?cn.edu.ujn.req_rep;
//
//??Hello?World?server?in?Java
//??Binds?REP?socket?to?tcp://*:5555
//??Expects?"Hello"?from?client,?replies?with?"World"
//
import?org.zeromq.ZMQ;
public?class?hwserver?{
private?static?int?i?=?0;
public?static?void?main(String[]?args)?throws?Exception?{
ZMQ.Context?context?=?ZMQ.context(1);
//??Socket?to?talk?to?clients
ZMQ.Socket?responder?=?context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while?(!Thread.currentThread().isInterrupted())?{
//?Wait?for?next?request?from?the?client
byte[]?request?=?responder.recv(0);
System.out.println("Received?"?+?new?String(request)?+?i++);
//?Do?some?'work'
Thread.sleep(1000);
//?Send?reply?back?to?client
String?reply?=?"World";
responder.send(reply.getBytes(),?0);
}
responder.close();
context.term();
}
}
接收端:
package?cn.edu.ujn.req_rep;
//
//??Hello?World?client?in?Java
//??Connects?REQ?socket?to?tcp://localhost:5555
//??Sends?"Hello"?to?server,?expects?"World"?back
//
import?org.zeromq.ZMQ;
public?class?hwclient?{
public?static?void?main(String[]?args)?{
ZMQ.Context?context?=?ZMQ.context(1);
//??Socket?to?talk?to?server
System.out.println("Connecting?to?hello?world?server…");
ZMQ.Socket?requester?=?context.socket(ZMQ.REQ);
requester.connect("tcp://localhost:5555");
for?(int?requestNbr?=?0;?requestNbr?!=?10;?requestNbr++)?{
String?request?=?"Hello";
System.out.println("Sending?Hello?"?+?requestNbr);
requester.send(request.getBytes(),?0);
byte[]?reply?=?requester.recv(0);
System.out.println("Received?"?+?new?String(reply)?+?"?"?+?requestNbr);
}
requester.close();
context.term();
}
}
下面貼出Para_Pipe(基于分布式處理(管道模式))模式下的代碼:
發送端:
package?cn.edu.ujn.para_pipe;
import?java.util.Random;
import?org.zeromq.ZMQ;
//
//??Task?ventilator?in?Java
//??Binds?PUSH?socket?to?tcp://localhost:5557
//??Sends?batch?of?tasks?to?workers?via?that?socket
//
public?class?taskvent?{
public?static?void?main?(String[]?args)?throws?Exception?{
ZMQ.Context?context?=?ZMQ.context(1);
//??Socket?to?send?messages?on
ZMQ.Socket?sender?=?context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
//??Socket?to?send?messages?on
ZMQ.Socket?sink?=?context.socket(ZMQ.PUSH);
sink.connect("tcp://localhost:5558");
System.out.println("Press?Enter?when?the?workers?are?ready:?");
System.in.read();
System.out.println("Sending?tasks?to?workers\n");
//??The?first?message?is?"0"?and?signals?start?of?batch
sink.send("0",?0);
//??Initialize?random?number?generator
Random?srandom?=?new?Random(System.currentTimeMillis());
//??Send?100?tasks
int?task_nbr;
int?total_msec?=?0;?????//??Total?expected?cost?in?msecs
for?(task_nbr?=?0;?task_nbr?100;?task_nbr++)?{
int?workload;
//??Random?workload?from?1?to?100msecs
workload?=?srandom.nextInt(100)?+?1;
total_msec?+=?workload;
System.out.print(workload?+?".");
String?string?=?String.format("%d",?workload);
sender.send(string,?0);
}
System.out.println("Total?expected?cost:?"?+?total_msec?+?"?msec");
Thread.sleep(1000);??????????????//??Give?0MQ?time?to?deliver
sink.close();
sender.close();
context.term();
}
}
中介端:
package?cn.edu.ujn.para_pipe;
import?org.zeromq.ZMQ;
//
//??Task?worker?in?Java
//??Connects?PULL?socket?to?tcp://localhost:5557
//??Collects?workloads?from?ventilator?via?that?socket
//??Connects?PUSH?socket?to?tcp://localhost:5558
//??Sends?results?to?sink?via?that?socket
//
public?class?taskwork?{
public?static?void?main?(String[]?args)?throws?Exception?{
ZMQ.Context?context?=?ZMQ.context(1);
//??Socket?to?receive?messages?on
ZMQ.Socket?receiver?=?context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");
//??Socket?to?send?messages?to
ZMQ.Socket?sender?=?context.socket(ZMQ.PUSH);
sender.connect("tcp://localhost:5558");
//??Process?tasks?forever
while?(!Thread.currentThread?().isInterrupted?())?{
String?string?=?new?String(receiver.recv(0)).trim();
long?msec?=?Long.parseLong(string);
//??Simple?progress?indicator?for?the?viewer
System.out.flush();
System.out.print(string?+?'.');
//??Do?the?work
Thread.sleep(msec);
//??Send?results?to?sink
sender.send("".getBytes(),?0);
}
sender.close();
receiver.close();
context.term();
}
}
接收端:
package?cn.edu.ujn.para_pipe;
import?org.zeromq.ZMQ;
//
//??Task?sink?in?Java
//??Binds?PULL?socket?to?tcp://localhost:5558
//??Collects?results?from?workers?via?that?socket
//
public?class?tasksink?{
public?static?void?main?(String[]?args)?throws?Exception?{
//??Prepare?our?context?and?socket
ZMQ.Context?context?=?ZMQ.context(1);
ZMQ.Socket?receiver?=?context.socket(ZMQ.PULL);
receiver.bind("tcp://*:5558");
//??Wait?for?start?of?batch
String?string?=?new?String(receiver.recv(0));
//??Start?our?clock?now
long?tstart?=?System.currentTimeMillis();
//??Process?100?confirmations
int?task_nbr;
int?total_msec?=?0;?????//??Total?calculated?cost?in?msecs
for?(task_nbr?=?0;?task_nbr?100;?task_nbr++)?{
string?=?new?String(receiver.recv(0)).trim();
if?((task_nbr?/?10)?*?10?==?task_nbr)?{
System.out.print(":");
}?else?{
System.out.print(".");
}
}
//??Calculate?and?report?duration?of?batch
long?tend?=?System.currentTimeMillis();
System.out.println("\nTotal?elapsed?time:?"?+?(tend?-?tstart)?+?"?msec");
receiver.close();
context.term();
}
}
到此為止發布消息的三種模式就寫完了,希望通過這篇博客讀者能夠對ZMQ有初步的認識和簡單實用,希望這篇博客對學習zmq的讀者有所幫助。
正確地使用上下文
ZMQ應用程序的一開始總是會先創建一個上下文,并用它來創建套接字。在C語言中,創建上下文的函數是zmq_init()。一個進程中只應該創建一個上下文。從技術的角度來說,上下文是一個容器,包含了該進程下所有的套接字,并為inproc協議提供實現,用以高速連接進程內不同的線程。如果一個進程中創建了兩個上下文,那就相當于啟動了兩個ZMQ實例。如果這正是你需要的,那沒有問題,但一般情況下:
在一個進程中使用zmq_init()函數創建一個上下文,并在結束時使用zmq_term()函數關閉它。
如果你使用了fork()系統調用,那每個進程需要自己的上下文對象。如果在調用fork()之前調用了zmq_init()函數,那每個子進程都會有自己的上下文對象。通常情況下,你會需要在子進程中做些有趣的事,而讓父進程來管理它們。
正確地退出和清理
程序員的一個良好習慣是:總是在結束時進行清理工作。當你使用像Python那樣的語言編寫ZMQ應用程序時,系統會自動幫你完成清理。但如果使用的是C語言,那就需要小心地處理了,否則可能發生內存泄露、應用程序不穩定等問題。
內存泄露只是問題之一,其實ZMQ是很在意程序的退出方式的。個中原因比較復雜,但簡單的來說,如果仍有套接字處于打開狀態,調用zmq_term()時會導致程序掛起;就算關閉了所有的套接字,如果仍有消息處于待發送狀態,zmq_term()也會造成程序的等待。只有當套接字的LINGER選項設為0時才能避免。
我們需要關注的ZMQ對象包括:消息、套接字、上下文。好在內容并不多,至少在一般的應用程序中是這樣:
處理完消息后,記得用zmq_msg_close()函數關閉消息;
如果你同時打開或關閉了很多套接字,那可能需要重新規劃一下程序的結構了;
退出程序時,應該先關閉所有的套接字,最后調用zmq_term()函數,銷毀上下文對象。
警告:你的想法可能會被顛覆!
傳統網絡編程的一個規則是套接字只能和一個節點建立連接。雖然也有廣播的協議,但畢竟是第三方的。當我們認定“一個套接字?=?一個連接”的時候,我們會用一些特定的方式來擴展應用程序架構:我們為每一塊邏輯創建線程,該線程獨立地維護一個套接字。
但在ZMQ的世界里,套接字是智能的、多線程的,能夠自動地維護一組完整的連接。你無法看到它們,甚至不能直接操縱這些連接。當你進行消息的收發、輪詢等操作時,只能和ZMQ套接字打交道,而不是連接本身。所以說,ZMQ世界里的連接是私有的,不對外部開放,這也是ZMQ易于擴展的原因之一。
由于你的代碼只會和某個套接字進行通信,這樣就可以處理任意多個連接,使用任意一種網絡協議。而ZMQ的消息模式又可以進行更為廉價和便捷的擴展。
這樣一來,傳統的思維就無法在ZMQ的世界里應用了。在你閱讀示例程序代碼的時候,也許你腦子里會想方設法地將這些代碼和傳統的網絡編程相關聯:當你讀到“套接字”的時候,會認為它就表示與另一個節點的連接——這種想法是錯誤的;當你讀到“線程”時,會認為它是與另一個節點的連接——這也是錯誤的。
如果你是第一次閱讀本指南,使用ZMQ進行了一兩天的開發(或者更長),可能會覺得疑惑,ZMQ怎么會讓事情便得如此簡單。你再次嘗試用以往的思維去理解ZMQ,但又無功而返。最后,你會被ZMQ的理念所折服,撥云見霧,開始享受ZMQ帶來的樂趣。
若朋友們有疑問,可留言,以求共進!
任務調度 分布式
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。