業界消息總線技術分析-ZeroMQ

      網友投稿 1398 2022-05-29

      1.????? ZeroMQ的設計理念

      引用官方的說法: “ZMQ (以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ 的明確目標是“成為標準網絡協議棧的一部分,之后進入 Linux 內核”。現在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統”BSD 套接字之上的一層封裝。ZMQ 讓編寫高性能網絡應用程序極為簡單和有趣。”

      ZeroMQ從設計開始就打算設計成沒有消息代理,其中Zero表示零 broker,同時也表示接近零時延、零管理、零代價、零浪費。所以ZeroMQ采用極簡的方式實現消息隊列的功能,追求性能的極致,用戶基于zeromq可以很快構建消息隊列的基本能力,其他的一些能力也可以用戶自定義實現。其開源協議是I-GPL協議,需要采用動態鏈接的方式才能避免使用后也被開源的風險。

      其架構可以更容易支持低時延與高吞吐。

      2.????? 為什么需要ZeroMQ

      目前很多應用程序由跨越某種網絡的組件組成,不是局域網就是互聯網。那么多的程序員最終都在從事某種消息傳遞。一些開發者使用消息隊列產品,但大多是用TCP或UDP來自己開發。這些協議不難使用,但是從A到B發送少量字節和任何可靠方式的消息傳遞之間是有非常大的區別的。

      讓我們看看當我們開始用原始TCP來連接時面臨的典型問題。任何可重用的消息層都需要解決全部或大部分以下問題:

      我們如何處理I/O?你的程序阻塞嗎,還是在后臺處理I/O?這是設計上的一個關鍵。阻塞I/O創建的架構不能很好擴展。但是后臺I/O要做好是非常困難的。

      我們如何處理動態組件,就是會暫時離開的部分?我們是否要在形式上將組件劃分為“客戶端”和“服務器”,并要求服務器不能消失?那如果我們想要服務器連接服務器怎么辦?我們是否要每隔幾秒就嘗試重新連接?

      我們如何表述線上的消息?我們如何將數據組幀才能讓它易寫易讀,緩沖溢出也很安全,對小型消息也很高效,又能夠勝任關于戴著狂歡帽的跳舞貓的超大視頻?

      無法立刻投遞的消息我們又如何處理?尤其是正當我們等著一個組件回到在線狀態?我們是放棄消息,扔到數據庫里,還是放到一個內存隊列中?

      我們把消息隊列存到哪兒去?如果組件從隊列讀取的速度很慢導致隊列堆積是什么原因?這種情況下我們的對策又是什么?

      丟失的消息我們如何處理?我們是等待新數據,請求重發,還是建造某種可靠性層來保證消息無法丟失?那如果這個層自身崩潰了又怎么辦?

      假設我們要使用不同的網絡傳輸會如何。比如說,用多播來替代TCP單播?或者IPv6?我們需要重寫程序嗎,或者傳輸已經抽象到某個層了嗎?

      我們如何路由消息?我們能將同一消息發送到多個對等點嗎?

      我們如何寫一個用于其它語言的API?我們是重新實現一個線路級協議還是重新打包一個庫?如果是前者,如何保證效率和穩定堆疊?如果是后者,如何保證互用性?

      我們如何表述數據讓它能在不同架構間讀取?我們要對數據類型強制一種特定編碼嗎?這是消息系統的工作嗎,難道不該是更高層的事嗎?

      如何處理網絡錯誤?我們是等待重試,悄然忽略,還是中斷?

      看一個典型的開源項目如HadoopZookeeper,參見src/c/src/zookeeper.c里的C API。當此文寫作時,2010年,已有3200行神秘代碼,里面有個未公開的客戶端服務器網絡通信協議。我明白它很有效率因為使用了poll()而不是select()。但實際上,Zookeeper應該使用一個通用的消息層和顯式公開的線路級協議。對于團隊來說要一遍一遍的建造這個獨特的輪子真是個驚人的浪費。

      但是如何制作可重用消息層?為何當那么多項目需要這項技術,人們還是在用困難的辦法,通過在代碼中驅使TCP套接字,并解決著那個長長列表中的難題,一遍一遍?

      圖1 – 開始時的消息傳遞Messaging as it Starts

      事實證明建造可重用消息傳遞系統真的很難,這就是為何只有少數FOSS(自由開源軟件)嘗試過,而商業的消息傳遞產品為何復雜、昂貴、僵化、脆弱。2006年iMatix公司設計了AMQP(高級消息隊列協議),它開始給予FOSS開發者或許是第一個消息傳遞系統的可重用處方。AMQP比很多其它設計都工作的更好,但還是相對復雜、昂貴、脆弱。需要花數周時間來學習使用,數月時間才能創造出當事情變得復雜時不至于崩潰的穩定架構。

      大部分消息傳遞項目,例如AMQP,在嘗試以可重用方式解決這個長列表中的難題時,是通過發明一個新概念,“中介”,來做尋址、路由、和隊列。這導致了一個客戶端服務器協議或者一組API構建在一些未公開協議之上,來讓程序與中介交談。中介在減少大型網絡復雜度方面是非常出色的。但是將基于中介的消息傳遞添加到產品例如Zookeeper將使它更糟,而不是更好。這將意味著添加一個額外的大型機,和一個新的單一故障點。中介迅速的成為一個瓶頸和一個新的管理風險。如果軟件支持,我們能添加第二、第三、第四個中介,還能做一些容錯方案。人們這么做著。創建了更多的移動部件、更多復雜度、更多故障。

      并且以中介為中心的模式需要它自己的操作團隊。你真的需要日夜觀察著中介,當它行為不當時用棍子抽打。你需要機子,還需要備份的機子,還有管理這些機子的人。只有在做有很多移動部件、多個團隊人員建造的、跨越多年的大型程序時才值得這么做。

      所以中小型程序開發者被困住了。要么他們避免網絡編程,去做無需縮放的整體應用。要么他們跳入網絡編程去做脆弱、難以維護的復雜程序。要么他們下賭注在一個消息傳遞產品上,最終可擴展性程序基于昂貴、易碎的技術。真沒有什么好的選擇,這也許是為何消息傳遞在上世紀死死卡住并激起強烈情緒。對于用戶來說是負面的,對于依靠支持和授權來盈利的人則是興奮而愉悅的。

      圖 2 – 消息傳遞變成了Messaging as it Becomes

      我們需要的是能做消息傳遞但方式上如此簡單而廉價,它能夠以接近零的成本,工作在任何程序中。它應該是一個只需要鏈接的函數庫,無需任何其它依賴。沒有附加的移動部件,所以也沒有附加的風險。應當能運行在任何操作系統和任何編程語言。

      而這就是?MQ:一個高效的、可嵌入庫,解決了讓一個程序變得富有彈性的跨過網絡的大部分難題,成本不高。

      特別是:

      它在后臺線程異步的處理I/O。這些后臺線程使用無鎖數據結構與程序線程交流,所以并發?MQ程序不需要鎖、信號量、或其它等待狀態。

      組件可以動態的來來去去,而?MQ會自動重連。這意味著你可以按任意順序啟動組件。你可以創建“面向服務架構”(SOAs),服務可以隨時加入和離開網絡。

      當需要時它自動將消息排入隊列。以智能的方式,消息排入隊列前推送消息到盡可能靠近接收者。

      它有幾種辦法處理滿溢隊列(稱為“高水位線”)。當隊列填滿時,?MQ自動阻塞發送者,或丟棄消息,取決于你用的消息傳遞方式(所謂的“模式”)。

      它讓你的程序用任意傳輸方式來相互交談:TCP、多播、進程內、進程間。更改傳輸方式時無需更改代碼。

      安全處理低速/阻塞的讀者,使用的是取決于消息傳遞模式的不同策略。

      它讓你路由消息使用各種模式如請求-應答和發布-訂閱。這些模式是你創建拓撲、網絡結構的方式。

      它讓你用一個調用就能創建代理來做隊列、轉發、或捕獲消息。代理可以降低網絡的互聯復雜度。

      它使用簡單的線上組幀,轉發整個消息并精確重現其發送時的樣子。如果你寫入一個10K的消息,就能接收一個10K的消息。

      它不在消息上強加任何格式。消息就是零到千兆大小的二進制大對象。想要描述數據時你可以在其上選擇一些其它產品,例如谷歌的協議緩沖(protocol buffers)、外部數據表示法(XDR)、或其它。

      它智能的處理網絡錯誤。有時它會重試,有時它告知你一個操作失敗了。

      它減少你的碳排放。用更低的CPU消耗做更多事意味著你的機子使用了更少的能源,并且可以讓你的舊機器使用的更久。阿爾·戈爾會很喜歡?MQ。

      事實上?MQ做的比這更多。對于如何開發支持網絡的程序方面具有顛覆性效果。表面上它是一個受到套接字啟發的API,你通過它做zmq_msg_recv()和zmq_msg_send()。但消息處理很快變成了中心循環,而你的程序馬上分解成一組消息處理任務。優雅而自然。并擴展著:每個任務映射到一個節點,節點們通過任意傳輸方式相互交談。進程內的兩個節點(節點是線程),機子上的兩個節點(節點是進程),或網絡上的兩臺機子(節點是機子)——都是一樣的,程序代碼沒有變化。

      3.????? 應用場景

      ZeroMQ 是一個非常輕量級的消息系統,專門為高吞吐量/低延遲的場景開發,在金融界的應用中經常可以發現它。與RabbitMQ相比,ZeroMQ支持許多高級消 息場景,但是你必須實現ZeroMQ框架中的各個塊(比如Socket或Device等)。

      也可以很輕松應用到其他需要消息通訊的場景例如:應用ZeroMQ的Push-Pull模型實現聯眾游戲服務器的“熱插拔”、負載均衡和消息派發。按照如圖3部署服務器,Push端充當Gateway,作為一組游戲服務器集群最上層的一個Proxy,起負載均衡的作用,所有Gameserver作為Pull端。當一個請求到達Push端(Gateway)時,Push端根據一定的分配策略將任務派發到Pull端(Gameserver)。以聯眾某款游戲A為例,游戲A剛上線時,預計最大同時在線人數是10W,單臺Gameserver并發處理能力為1W,需要10臺Gameserver,由于游戲A可玩性非常好,半個月后最大同時在線人數暴增到50W,那么不需要在某天的凌晨將Gateway和Gameserver停機,只需要隨時在機房新添加40臺Gameserver,啟動并連接到Gateway即可。

      ZeroMQ中對Client和Server的啟動順序沒有要求,Gameserver之間如果需要通信的話,Gameserver的應用層不需要管理這些細節,ZeroMQ已經做了重連處理。

      因為沒有Broker,ZeroMQ不太適合削峰填谷、消息堆積等場景

      4.????? 總體架構圖

      zeromq幾乎所有I/O操作都是異步的,每個zmq i/o?線程(與實際線程不同)都有與之綁定的Poller,Poller采用經典的Reactor模式實現,Poller根據不同操作系統平臺使用不同的網絡I/O模型(select、poll、epoll、devpoll、kequeue等)。在zeromq中,zmq_socket也被看成是一個zmq io線程。每個線程內含一個信箱,用于線程與線程間傳遞命令(后面會詳細講),在創建zmq io線程時,會把信箱句柄加到Poller中,用于監聽是否有命令到達。當client端開始發起連接或者server端開始監聽時,會在主線程創建zmq_connector或者zmq_listener,主線程使用zmq_socket的mailbox發送命令給io線程,將其綁定到io線程中,io線程會把zmq_connector或者zmq_listener含有的句柄加入Poller中,以偵聽讀寫事件。Client端與Server端都是通過Session來管理連接和通信,一個session代表一次會話,每個Session都會關聯到相應的讀/寫管道,?主線程收發消息只是分別從管道中讀/寫數據。Session并不實際跟kernel交換I/O數據,而是通過plugin到Session中的Engine來與kernel交換I/O數據。

      圖4.2 基本流程圖

      5.????? 線性模型

      zeromq的線程分為一個主線程(用戶線程),一個回收線程,以及若干io線程。每一條線程都擁有一個mailbox_t用于接收命令。

      從操作系統來看,ZMQ中只有兩種線程,應用線程和I/O線程。應用線程在ZMQ外部創建,訪問ZMQ的API。I / O線程在ZMQ內部創建,用于在后臺發送和接收消息。thread_t是系統級線程的抽象,可以以OS無關的方式創建線程。

      而從ZMQ的觀點來看,線程只是一個擁有郵箱(mailbox_t)的對象。郵箱存儲發送給居住在當前線程上所有對象的信件(命令command_t),所有這些對象公用線程上的郵箱。線程從郵箱中按序獲取命令并交給其上的對象進行處理。

      目前ZMQ內部使用兩種不同類型的線程(擁有郵箱的對象):I/O線程(io_thread_t)和socket(socket_base_t)。

      I / O線程很容易理解,每個?I/O?線程與一個系統級線程一一對應。I / O線程運行在自己的系統線程上,并且擁有獨立的獲取命令的郵箱。

      socket在某種程度上顯得復雜一些。每個ZMQ?socket擁有自己的接收命令的郵箱,因此socket可被ZMQ視為分離線程。而實際上,一個應用程序線程可以創建多個套接字,也就是說多個ZMQ?socket被映射到同一個系統線程。更加復雜的是,ZMQ socket可以在系統線程之間遷移。例如,Java語言綁定可以在單線程中使用ZMQ socket,而當線程結束時,ZMQ socket會傳遞給垃圾回收線程,并在垃圾回收線程上銷毀。

      5.1???? I/O線程

      I / O線程(io_thread_t)是ZMQ異步處理網絡IO的后臺線程。它的實現非常簡潔。io_thread_t實現繼承object_t?,并實現?i_poll_events?接口,其內部包含一個郵箱(mailbox_t)和一個poller對象(poller_t)。

      繼承object_t使得io_thread_t能夠發送和接收command(如?stop?命令,當收到該命令時,I / O線程將被終止)。

      i_poll_events?接口定義了文件描述符和計時器事件就緒時的回調處理函數(in_event/out_event/timer_event)。io_thread_t?實現此接口(in_event)來處理來自mailbox的事件。當mailbox_t事件觸發時,io線程從mailbox中獲取命令,并讓命令的接收者進行處理。

      mailbox_t?用來存儲發送給任何居住在io_thread_t?上的object_t?的命令,每個io_thread_t?上有多個對象,這些對象公用同一個郵箱,郵箱的收件人就是對象。mailbox_t本質是一個具有就緒通知功能的存儲命令的隊列。就緒通知機制由signaler_t提供的文件描述符實現。隊列是由ypipe_t實現的無鎖無溢出隊列。

      poller_t?是從不同操作系統提供的事件通知機制中抽象出來的概念,用來通知描述符和計時器事件,poller_t?通過typedef定義為操作系統首選的通知機制(select_t/poll_t/epoll_t?等)。所有運行在?io_thread_t上的對象都繼承自輔助類?io_object_t,該類實現了向io_thread_t注冊/刪除文件描述符?(add_fd/rm_fd)和計時器(add_timer/cancel_timer)事件的功能,同時io_object_t?還繼承了?i_poll_events?接口來實現事件回調功能。

      圖3 – io_thread_t示意圖

      5.2???? 回收線程

      上一小節描述了與銷毀相關的機制。而銷毀任何一個指定的對象(包括socket)消耗的時間是不確定的。然而,我們希望close有類似于POSIX的行為:當關閉TCP套接字時,即使在后臺還有沒有完全發出的數據,調用也會立即返回。

      所以,應用程序線程調用?close?時,ZMQ應該關閉對應的套接字,但是,我們不能依賴應用線程來完成socket子對象的銷毀(銷毀可能需要多次命令交互)。同時應用線程調用zmq_close以后不會在繼續使用該socket,甚至可能永遠不會再調用?ZMQ庫函數。因此,ZMQ socket應該從應用線程遷移到一個工作線程來處理銷毀的邏輯。一個可能的解決方案是將socket遷移到某個后臺的I/O線程上去,然而ZMQ可以初始化為具有零個I / O線程(適用于只在進程間通信的情況),因此,我們需要一個專門的回收線程來執行銷毀任務。

      回收線程由類reaper_t實現。socket通過(send_reap)向回收線程發送回收命令,回收線程收到命令后會將socket從應用線程遷移到回收線程上,這樣socket就可以在回收線程上處理命令(term/term_ack),直到socket的所有子對象都成功銷毀時,socket就會在回收線程上銷毀。實際上回收線程只是待回收對象駐留的線程,對象的處理邏輯仍然由對象自身處理。

      6.????? 特性:

      6.1???? 消息分配

      對于小型的消息,拷貝操作比內存分配要經濟的多。只要有需要,完全不分配新的內存塊而直接把消息拷貝到預分配好的內存塊上,這么做是有道理的。另一方面,對于大型的消息,拷貝操作比內存分配的開銷又要昂貴的多。為消息體分配一次內存,然后傳遞指向分配塊的指針,而不是拷貝整個數據。這種方式被稱為“零拷貝”。

      業界消息總線技術分析-ZeroMQ

      ?MQ以透明的方式同時處理這兩種情況。一條?MQ消息由一個不透明的句柄來表示。對于非常短小的消息,其內容被直接編碼到句柄中。因此,對句柄的拷貝實際上就是對消息數據的拷貝。當遇到較大的消息時,它被分配到一個單獨的緩沖區內,而句柄只包含一個指向緩沖區的指針。對句柄的拷貝并不會造成對消息數據的拷貝,當消息有數兆字節長時,這么處理是很有道理的(圖6-1)。需要提醒的是,后一種情況里緩沖區是按引用計數的,因此可以做到被多個句柄引用而不必拷貝數據。

      圖6-1– 針對不同消息不同處理方式

      6.2? ??批量處理

      前面已經提到過,在消息通信系統中,系統調用的數量太多的話會導致出現性能瓶頸。實際上,這個問題絕非一般。當需要遍歷調用棧時會有不小的性能損失,因此,明智的做法是,當創建高性能的應用時應該盡可能多的去避免遍歷調用棧。

      參見圖6.2,為了發送4條消息,你不得不遍歷整個網絡協議棧4次(也就是,?MQ、glibc、用戶/內核空間邊界、TCP實現、IP實現、以太網鏈路層、網卡本身,然后反過來再來一次)。

      圖6.2??發送4條消息

      但是,如果你決定將這些消息集合到一起成為一個單獨的批次,那么就只需要遍歷一次調用棧了(圖6.2.2)。這種處理方式對消息吞吐量的影響是巨大的:可大至2個數量級,尤其是如果消息都比較短小,數百個這樣的短消息才能包裝成一個批次。

      圖6.2.2??批量處理消息

      另一方面,批量處理會對時延帶來負面影響。我們來分析一下,比如,TCP實現中著名的Nagle算法。它為待發出的消息延遲一定的時間,然后將所有的數據合并成一個單獨的數據包。顯然,數據包中的第一條消息,其端到端的時延要比最后一條消息嚴重的多。因此,如果應用程序需要持續的低時延的話,常見做法是將Nagle算法關閉。更常見的是取消整個調用棧層次上的批量處理(比如,網卡的中斷匯聚功能)。

      但同樣,不做批量處理就意味著需要大量穿越整個調用棧,這會導致消息吞吐量降低。似乎我們被困在吞吐量和時延的兩難境地中了。

      ?MQ嘗試采用以下策略來提供一致性的低時延和高吞吐量。當消息流比較稀疏,不超過網絡協議棧的帶寬時,?MQ關閉所有的批量處理以改善時延。這里的權衡是CPU的使用率會變得略高——我們仍然需要經常穿越整個調用棧。但是在大多數情況下,這并不是個問題。

      當消息的速率超過網絡協議棧的帶寬時,消息就必須進行排隊處理了——保存在內存中直到協議棧準備好接收它們。排隊處理就意味著時延的上升。如果消息在隊列中要花費1秒時間,端到端的時延就至少會達到1秒。更糟糕的是,隨著隊列長度的增長,時延會顯著提升。如果隊列的長度沒有限制的話,時延就會超過任何限定值。

      據觀察,即使調整網絡協議棧以追求最低的時延(關閉Nagle算法,關閉網卡中斷匯聚功能,等等),由于受前文所述的隊列的影響,時延仍然會比較高。在這種情況下,積極的采取批量化處理是有意義的。反正時延已經比較高了,也沒什么好顧慮的了。另一方面,積極的采用批量處理能夠提高吞吐量,而且可以清空隊列中等待的消息——這反過來又意味著時延將逐步降低,因為正是排隊才造成了時延的上升。一旦隊列中沒有未發送的消息了,就可以關閉批量處理,進一步的改善時延。

      我們觀察到批量處理只應該在最高層進行,這是需要額外注意的一點。如果消息在最高層匯聚為批次,在低層次上就沒什么可做批量處理的了,而且所有低層次的批量處理算法除了會增加總體時延外什么都沒做。 我們從中學到了:在一個異步系統中,要獲得最佳的吞吐量和響應時間,需要在調用棧的底層關閉批量處理算法,而在高層開啟。僅在新數據到達的速率快于它們被處理的速率時才做批量處理。

      6.3???? 并發模型

      ?MQ需要充分利用多核的優勢,換句話說就是隨著CPU核心數的增長能夠線性的擴展吞吐量。以我們之前對消息通信系統的經驗表明,采用經典的多線程方式(臨界區、信號量等等)并不會使性能得到較大提升。事實上,就算是在多核環境下,一個多線程版的消息通信系統可能會比一個單線程的版本還要慢。有太多時間都花在等待其他線程上了,同時,引入了大量的上下文切換拖慢了整個系統。

      針對這些問題,我們決定采用一種不同的模型。目標是完全避免鎖機制,并讓每個線程能夠全速運行。線程間的通信是通過在線程間傳遞異步消息(事件)來實現的。內行人都應該知道,這就是經典的actor模式。

      事實證明,要以一種清晰的方式關閉一個全異步的系統是一個相當復雜的任務。試圖關閉一個有著上千個運轉著的部分的系統,其中有的正在工作中,有的處于空閑狀態,有的正在初始化過程中,有的已經自行關閉了,此時極易出現各種競態條件、資源泄露等諸如此類的情況。?MQ中最為復雜的部分肯定就是這個關閉子系統了。快速檢查一下bug跟蹤系統的記錄顯示,約30%到50%的bug都同關閉有某種聯系。

      我們從中學到的是:當要追求極端的性能和可擴展性時,考慮采用actor模型。在這種情況下這幾乎是你唯一的選擇。不過,如果不使用像Erlang或者?MQ這種專門的系統,你將不得不手工編寫并調試大量的基礎組件。此外,從一開始就要好好思考關于系統關閉的步驟。這將是代碼中最為復雜的部分,而如果你沒有清晰的思路該如何實現它,你可能應該重新考慮在一開始就使用actor模型。

      6.4? ??無鎖隊列

      最近比較流行使用無鎖算法。它們是用于線程間通信的一種簡單機制,同時并不會依賴于操作系統內核提供的同步原語,如互斥鎖和信號量。相反,它們通過使用CPU原子操作來實現同步,比如原子化的CAS指令(比較并交換)。我們應該理解清楚的是它們并不是字面意義上的無鎖——相反,鎖機制是在硬件層面實現的。

      首先,每個隊列只有一個寫線程,也只有一個讀線程。如果有1對多的通信需求,那么就創建多個隊列(圖8)。鑒于采用這種方式時隊列不需要考慮對寫線程和讀線程的同步(只有一個寫線程,也只有一個讀線程),因此可以以非常高效的方式來實現。

      圖8 隊列

      其次,盡管我們意識到無鎖算法要比傳統的基于互斥鎖的算法更加高效,CPU的原子操作開銷仍然非常高昂(尤其是當CPU核心之間有競爭時),對每條消息的讀或者寫都采用原子操作的話,效率將低于我們所能接受的水平。

      提高速度的方法——再次采用批量處理。假設你有10條消息要寫入到隊列。比如,可能會出現當你收到一個網絡數據包時里面包含有10條小型的消息的情況。由于接收數據包是一個原子事件,你不能只接收一半,因此這個原子事件導致需要寫10條消息到無鎖隊列中。那么對每條消息都采用一次原子操作就顯得沒什么道理了。相反,你可以讓寫線程擁有一塊自己獨占的“預寫”區域,讓它先把消息都寫到這里,然后再用一次單獨的原子操作,整體刷入隊列。

      同樣的方法也適用于從隊列中讀取消息。假設上面提到的10條消息已經刷新到隊列中了。讀線程可以對每條消息采用一個原子操作來讀取,但是,這種做法過于重量級了。相反,讀線程可以將所有待讀取的消息用一個單獨的原子操作移動到隊列的“預讀取”部分。之后就可以從“預讀”緩存中一條一條的讀取消息了。“預讀取”部分只能由讀線程單獨訪問,因此這里沒有什么所謂的同步需求。

      圖9中左邊的箭頭展示了如何通過簡單地修改一個指針來將預寫入緩存刷新到隊列中的。右邊的箭頭展示了隊列的整個內容是如何通過修改另一個指針來移動到預讀緩存中的。

      圖9 無鎖隊列

      我們從中學到的是:發明新的無鎖算法是很困難的,而且實現起來很麻煩,幾乎不可能對其調試。如果可能的話,可以使用現有的成熟算法而不是自己來發明輪子。當需要追求極度的性能時,不要只依靠無鎖算法。雖然它們的速度很快,但可以在其之上通過智能化的批量處理來顯著提高性能

      6.5? ??多種通訊模式

      支持多種的socket類型,通過這些socket類型可以組合成多種通訊模式,其中的socket類型有:

      Socket類型

      說明

      REQ

      請求類型socket,只允許send/recv交替使用

      REP

      響應類型socket, 只允許recv/send交替使用

      PUB

      發布類型socket,只能發布消息無法接收消息

      SUB

      訂閱類型socket,只能訂閱消息無法發送消息

      DEALER

      擴展請求/響應模式的高級模式。每個消息按照round-robin方式發送,按照fair-queue方式接收

      ROUTER

      擴展請求/響應模式的高級模式。按照fair-queue方式接收數據,根據消息目的地進行路由。

      PUSH

      推送類型socket,單向發送消息

      PULL

      拉取類型socket,單向消費消息,可以支持并發消費。

      Request-reply 是 ZeroMQ 提供的最常用的消息傳遞模式之一。在這種模式下,客戶端進程發起請求,服務器端進程接受請求并返回響應給客戶端。客戶端和服務器端進程都可以有多個。

      圖10: 請求響應模式

      清單 1 和清單 2 實現了一個簡單的“請求-應答”應用的服務器端和客戶端。在 HelloWorldServer.py 中,我們首先創建了一個 socket 對象,將它綁定到一個特定的地址。一旦接受到客戶端的請求,就發送內容為”World”的回復。

      從表面上看這種風格與傳統的 socket 十分相似,但實際上它們有重大的差別。首先,ZeroMQ 的 socket 是面向消息的,我們從 socket 里直接獲得消息字符串,而非字節流,發送亦然。其次,開發者無需關心負責底層通訊的連接的管理,這種連接可能是傳統的 socket 連接,也可能基于其他協議。這些底層連接的創建,銷毀,重連以及它如何確保消息被有效的發送,都由 ZeroMQ 負責管理。最后,ZeroMQ 的 socket 之間的連接不受任何限制,而傳統的 socket 之間往往無法建立多對多的連接。因此,ZeroMQ 的 socket 可以被看作一個功能完善的消息隊列。

      REQ 類型的 socket 通常被用來發送請求,并且只有在收到第一個請求的回復之后,才能發送第二個請求。在 HelloWorldClient.py 中該 REQ 類型的 socket 只連接到了一個地址,但它也可以連接多個地址。在這種情況下,ZeroMQ 將確保消息被均勻的發送給每個地址,但每次只有一個地址會受到請求。REP 類型的 socket 用于接受請求。它必須在發送第一個請求的回復之后才能接受第二個請求。尚未來得及處理的請求按順序被置于隊列中。

      REQ 和 REP 類型的 socket 在消息發送和接受的操作序列上存在嚴格限制,為了應對更復雜的情況,ZeroMQ 也提供了更為靈活的 socket 類型,這就是 DEALER 和 ROUTER。

      DEALER 和 REQ 的區別在于,它可以按照任意的次序執行發送消息和接受消息的操作,而不必等待上一個請求的回復。同樣,ROUTER 也不必等待發送上一次請求的響應完成就能接受第二個請求。此外,ROUTER 會為請求加上標識以記錄最初請求者的身份。這樣一來它可以將該請求發送給其他進程處理,得到返回結果后,仍可以根據消息中的身份標識將該請求準確的返回給最初請求者。因此 ROUTER 和 DEALER 可以被用來實現類似于傳統消息隊列架構中的消息服務器的進程。

      Publish-subscribe 是用于廣播消息的模式,在這種模式下發布的消息將同時發送給多個節點。它包含 PUB 和 SUB 兩種 socket 類型。與 Request-reply 不同,PUB 和 SUB 都只能進行單向的消息傳遞。PUB 只能發送消息,而 SUB 只能接受消息。

      圖11: 發布訂閱模式

      清單 3,清單 4 是一個簡單的 Publish-subscribe 模式的實現。從中我們可以看到,作為消息訂閱者的 syncsub.py,將一個 SUB 類型 socket 綁定到‘tcp://localhost:5561’,這代表了一個單一的地址。而作為消息發布者的 syncpub.py,將一個 PUB 類型的 socket 綁定到‘tcp://*:5561’,這實際上匹配了多個地址。也就是說,凡是綁定到符合格式‘tcp://*:5561’地址的任何 SUB 類型的 socket 都可以接收到該 PUB 進程發布的消息。

      在這一實現中,我們還使用 REQ 和 REP 類型的 socket 對 SUB 和 PUB 進程進行了同步,僅當出現 10 個 SUB 進程時,PUB 進程才會開始發送消息。

      Pipeline 模式通常用于實現工作流的概念,每個進程負責整個處理流程中的一個步驟。每個步驟接受上一步的處理結果,并將自己的處理結果傳遞給下一步。每一步可以有多個備選進程。它包含 PUSH 和 PULL 兩種 socket 類型。與 PUB 和 SUB 類型的 socket 類似,這兩種 socket 都只能做單向消息傳遞,PUSH 只能發送消息,PULL 只能接受消息。因此通常一個進程需要同時包含這兩種類型的 socket。此外,與 PUB 不同的是,PUSH 只會將消息發送給單個 PULL 節點。如下圖:

      圖12: 并行PipeLine模式

      這種模式僅適用于兩個特定節點之間互相傳遞消息的場合。不要和正常的socket對混淆。

      7.???? 性能分析

      目前,市面上類似的產品不少,主要有4種:MSMQ(微軟產品)、ActiveMQ(Java)、RabbitMQ(Erlang)、ZeroMQ(C++)。除ZeroMQ外,其它3款產品都是一個單獨服務或者進程,需要單獨安裝和運行,且對環境有一定依賴。其中,MSMQ在非Windows平臺下安裝非常復雜,ActiveMQ需要目標機器上已經安裝了Java,RabbitMQ需要Erlang環境。而ZeroMQ是以庫的形式存在,由應用程序加載、運行即可。但是ZeroMQ僅提供非持久性的消息隊列。

      圖7是來自于Internet的性能測試數據。顯示的是每秒鐘發送和接受的消息數。整個過程共產生1百萬條1K的消息,測試環境為Windows Vista。從測試數據可以看出,ZeroMQ的性能遠遠高于其它3個MQ。

      但是測試數據僅供參考,因為缺少必須的環境參數和性能指標,比如:CPU參數、內存參數、消息模型、通信協議、極限時消耗CPU百分比、極限時消耗內存百分比等。

      8.????? 橫向對比

      ActiveMQ

      RabbitMQ

      RocketMq

      ZeroMQ

      Kafka

      關注度

      成熟度

      成熟

      成熟

      比較成熟

      成熟

      比較成熟

      所屬社區/公司

      Apache

      Mozilla

      Public

      License

      Alibaba

      iMatix

      LinkedIn

      社區活躍度

      文檔

      特點

      功能齊全,被大量開源項目使用

      由于Erlang?語言的并發能力,性能很好

      各個環節分布式擴展設計,主從?HA;支持上萬個隊列;多種消費模式;性能很好

      低延時,高性能,最高?43萬條消息每秒

      1、快速持久化,O(1)系統開銷。

      2、高吞吐,高性能。

      3、完全的分布式系統。

      授權方式

      開源

      開源

      開源

      IGPL

      Apache License

      開發語言

      Java

      Erlang

      Java

      C++

      Scala

      支持的協議

      OpenWire、

      STOMP、

      REST、XMPP、

      AMQP

      AMQP

      自己定義的一

      套(社區提供

      JMS--不成熟)

      TCP、UDP、IPC、廣播

      TCP

      客戶端支持語言

      Java、C、

      C++、

      Python、

      PHP、

      Perl、.net?等

      Java、C、

      C++、

      Python、?PHP、Perl?等

      Java

      C++(不成熟)

      python、?java、?php、.net?等

      Java、Scala

      持久化

      內存、文件、數據庫

      內存、文件

      磁盤文件

      在消息發送端保存

      磁盤文件

      事務

      支持

      不支持

      支持

      不支持

      不支持

      集群

      支持

      支持

      支持

      不支持

      支持

      負載均衡

      支持

      支持

      支持

      不支持

      不支持

      9.????? 總結

      1)???? 簡單

      1、僅僅提供24個API接口,風格類似于BSD Socket。

      2、處理了網絡異常,包括連接異常中斷、重連等。

      3、改變TCP基于字節流收發數據的方式,處理了粘包、半包等問題,以msg為單位收發數據,結合Protocol Buffers,可以對應用層徹底屏蔽網絡通信層。

      4、對大數據通過SENDMORE/RECVMORE提供分包收發機制。

      5、通過線程間數據流動來保證同一時刻任何數據都只會被一個線程持有,以此實現多線程的“去鎖化”。

      6、通過高水位HWM來控制流量,用交換SWAP來轉儲內存數據,彌補HWM丟失數據的缺陷。

      7、服務器端和客戶端的啟動沒有先后順序。

      2)???? 靈活

      1、支持多種通信協議,可以靈活地適應多種通信環境,包括進程內、進程間、機器間、廣播。

      2、支持多種消息模型,消息模型之間可以相互組合,形成特定的解決方案。

      3)???? 跨平臺

      支持Linux、Windows、OS X等。

      4)???? 多語言

      可以綁定C、C++、Java、.NET、Python等30多種開發語言。

      5)???? 高性能

      相對同類產品,性能卓越。

      10.????? 參考資料

      ZeroMQ官網:

      http://zguide.zeromq.org/page:all

      ZeroMQ:

      http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

      zeromq源碼分析筆記之架構(1):

      http://www.cnblogs.com/zengzy/archive/2016/01/13/5122634.html

      From Kafka to ZeroMQ for real-time log aggregation:

      http://www.tuicool.com/articles/aiuIja6

      分布式消息服務

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:MVCC:聽說有人好奇我的底層實現
      下一篇:物聯網網關開發:基于MQTT消息總線的設計過程丨【拜托了,物聯網!】
      相關文章
      国产99在线|亚洲| 亚洲人成网站在线播放影院在线| 91亚洲精品第一综合不卡播放| 国产亚洲色婷婷久久99精品91| 午夜在线亚洲男人午在线| 亚洲综合精品成人| 亚洲激情视频图片| 亚洲激情视频图片| 亚洲色中文字幕在线播放| 在线综合亚洲欧洲综合网站| 亚洲av永久无码精品三区在线4| 亚洲六月丁香六月婷婷蜜芽| 亚洲国产精品成人精品软件| 亚洲国产成人久久77| 亚洲av乱码一区二区三区香蕉| 亚洲人成电影在线观看青青| 色婷五月综激情亚洲综合| 国产成人亚洲合集青青草原精品| 2020国产精品亚洲综合网 | 亚洲日产2021三区在线 | 亚洲国产精品自产在线播放| www亚洲一级视频com| 亚洲精品国产成人影院| 精品国产香蕉伊思人在线在线亚洲一区二区 | 亚洲色欲一区二区三区在线观看 | 亚洲免费观看视频| 国产AV无码专区亚洲精品| 欧洲亚洲国产清在高| 亚洲AV成人片色在线观看 | 男人的天堂av亚洲一区2区| 亚洲片一区二区三区| 亚洲色偷偷综合亚洲AVYP| 亚洲AV无码专区电影在线观看| 亚洲欧洲国产日韩精品| 亚洲日韩中文字幕| 亚洲精品一卡2卡3卡四卡乱码| 亚洲av高清在线观看一区二区| 国产亚洲精品不卡在线| 亚洲AV永久无码精品一百度影院| 久久精品亚洲精品国产色婷| 亚洲xxxxxx|