RabbitMQ實(shí)踐(rabbitmq實(shí)踐指南pdf)
一 概述
1.1 背景
近期在做告警集成平臺(tái),其中需要告警消息發(fā)送,類型需要涵蓋目前市場(chǎng)主流的消息接受端,例如微信/企業(yè)微信/釘釘/郵件/短信/電話等等,這勢(shì)必要利用到MQ,在眾多的消息中間件中,經(jīng)過調(diào)研此場(chǎng)景并不象大數(shù)據(jù)處理場(chǎng)景需要kafka,同時(shí)需要較高性能和確認(rèn)機(jī)制,數(shù)據(jù)的可靠性和活躍的社區(qū),支持消息的持久化于中間件的高可用部署,最終選型了RabbitMQ來作為應(yīng)用的中間件。
1.2 概念
MQ全稱為Message Queue, 即消息隊(duì)列。MQ是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。應(yīng)用程序通過讀寫出入隊(duì)列的消息(針對(duì)應(yīng)用程序的數(shù)據(jù))來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進(jìn)行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠(yuǎn)程過程調(diào)用的技術(shù)。排隊(duì)指的是應(yīng)用程序通過隊(duì)列來通信。隊(duì)列的使用除去了接收和發(fā)送應(yīng)用程序同時(shí)執(zhí)行的要求。RabbitMQ則是一個(gè)在AMQP基礎(chǔ)上完整的,可復(fù)用的企業(yè)消息系統(tǒng)。
1.3 功能
應(yīng)用解耦:mq基于數(shù)據(jù)的接口層,將耦合的應(yīng)用來分解開,兩邊都實(shí)現(xiàn)這個(gè)接口,這樣就允許獨(dú)立的修改或者擴(kuò)展兩邊的處理過程,只要兩邊遵守相同的接口約束即可。
流量削峰:在高并發(fā),大流量的場(chǎng)景下,rabbitmq可以減少突發(fā)訪問壓力,不會(huì)因?yàn)橥话l(fā)的超時(shí)負(fù)荷要求而崩潰
異步通信:通過把把消息發(fā)送給消息中間件,將不是實(shí)時(shí)的業(yè)務(wù)異步處理
1.4 特點(diǎn)
可靠性:RabbitMQ使用一些機(jī)制來保證可靠性,如持久化、傳輸確認(rèn)及發(fā)布確認(rèn)等。
靈活的路由:在消息進(jìn)入隊(duì)列之前,通過交換器來路由消息。對(duì)于典型的路由功能,RabbitMQ己經(jīng)提供了一些內(nèi)置的交換器來實(shí)現(xiàn)。針對(duì)更復(fù)雜的路由功能,可以將多個(gè)交換器綁定在一起,也可以通過插件機(jī)制來實(shí)現(xiàn)自己的交換器。
擴(kuò)展性:多個(gè)RabbitMQ節(jié)點(diǎn)可以組成一個(gè)集群,也可以根據(jù)實(shí)際業(yè)務(wù)情況動(dòng)態(tài)地?cái)U(kuò)展集群中節(jié)點(diǎn)。
高可用性:隊(duì)列可以在集群中的機(jī)器上設(shè)置鏡像,使得在部分節(jié)點(diǎn)出現(xiàn)問題的情況下隊(duì)仍然可用。
多種協(xié)議:RabbitMQ除了原生支持AMQP協(xié)議,還支持STOMP,MQTT等多種消息中間件協(xié)議。
多語言客戶端:RabbitMQ幾乎支持所有常用語言,比如Jav a、Python、Ruby、PHP、C#、JavaScript等。
管理界面:RabbitMQ提供了一個(gè)易用的用戶界面,使得用戶可以監(jiān)控和管理消息、集群中的節(jié)點(diǎn)等。
插件機(jī)制:RabbitMQ提供了許多插件,以實(shí)現(xiàn)從多方面進(jìn)行擴(kuò)展,當(dāng)然也可以編寫自己的插件。
二 架構(gòu)
2.1 架構(gòu)圖
RabbitMQ Server
也叫Broker Server,它不是運(yùn)送食物的卡車,而是一種傳輸服務(wù)。原話是RabbitMQ isn't a food truck, it's a delivery service. 它的角色就是維護(hù)一條從Producer到Consumer的路線,保證數(shù)據(jù)能夠按照指定的方式進(jìn)行傳輸。雖然這個(gè)保證也不是100%的保證,但是對(duì)于普通的應(yīng)用來說這已經(jīng)足夠了。當(dāng)然對(duì)于商業(yè)系統(tǒng)來說,可以再做一層數(shù)據(jù)一致性的guard,就可以徹底保證系統(tǒng)的一致性了。
Client P
也叫Producer,數(shù)據(jù)的發(fā)送方。Create messages and publish (send) them to a Broker Server (RabbitMQ)。一個(gè)Message有兩個(gè)部分:payload(有效載荷)和label(標(biāo)簽)。payload顧名思義就是傳輸?shù)臄?shù)據(jù)。label是exchange的名字或者說是一個(gè)tag,它描述了payload,而且RabbitMQ也是通過這個(gè)label來決定把這個(gè)Message發(fā)給哪個(gè)Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個(gè)label的規(guī)則。
Client C
也叫Consumer,數(shù)據(jù)的接收方。Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue。把queue比作是一個(gè)有名字的郵箱。當(dāng)有Message到達(dá)某個(gè)郵箱后,RabbitMQ把它發(fā)送給它的某個(gè)訂閱者即Consumer。當(dāng)然可能會(huì)把同一個(gè)Message發(fā)送給很多的Consumer。在這個(gè)Message中,只有payload,label已經(jīng)被刪掉了。對(duì)于Consumer來說,它是不知道誰發(fā)送的這個(gè)信息的,就是協(xié)議本身不支持。當(dāng)然了,如果Producer發(fā)送的payload包含了Producer的信息就另當(dāng)別論了。
Connection
就是一個(gè)TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個(gè)TCP連接。
Channel
虛擬連接。它建立在上述的TCP連接中。數(shù)據(jù)流動(dòng)都是在Channel中進(jìn)行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個(gè)Channel。
那么,為什么使用Channel,而不是直接使用TCP連接?
對(duì)于OS來說,建立和關(guān)閉TCP連接是有代價(jià)的,頻繁的建立關(guān)閉TCP連接對(duì)于系統(tǒng)的性能有很大的影響,而且TCP的連接數(shù)也有限制,這也限制了系統(tǒng)處理高并發(fā)的能力。但是,在TCP連接中建立Channel是沒有上述代價(jià)的。對(duì)于Producer或者Consumer來說,可以并發(fā)的使用多個(gè)Channel進(jìn)行Publish或者Receive。有實(shí)驗(yàn)表明,1s的數(shù)據(jù)可以Publish10K的數(shù)據(jù)包。當(dāng)然對(duì)于不同的硬件環(huán)境,不同的數(shù)據(jù)包大小這個(gè)數(shù)據(jù)肯定不一樣,但是我只想說明,對(duì)于普通的Consumer或者Producer來說,這已經(jīng)足夠了。如果不夠用,你考慮的應(yīng)該是如何細(xì)化SPLIT你的設(shè)計(jì)。
2.2 相關(guān)定義
Broker: 簡(jiǎn)單來說就是消息隊(duì)列服務(wù)器實(shí)體
Exchange: 消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列
Queue: 消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列
Binding: 綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來
Routing Key: 路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞
VHost: 虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
Producer: 消息生產(chǎn)者,就是投遞消息的程序
Consumer: 消息消費(fèi)者,就是接受消息的程序
Channel: 消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)
由Exchange、Queue、RoutingKey三個(gè)才能決定一個(gè)從Exchange到Queue的唯一的線路。
2.3 基本概念
Connection Factory、Connection、Channel都是RabbitMQ對(duì)外提供的API中最基本的對(duì)象。Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。Connection Factory則是Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個(gè)接口,我們大部分的業(yè)務(wù)操作是在Channel這個(gè)接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。
Queue
Queue(隊(duì)列)是RabbitMQ的內(nèi)部對(duì)象,用于存儲(chǔ)消息,如下圖表示。
RabbitMQ中的消息都只能存儲(chǔ)在Queue中,生產(chǎn)者(下圖中的P)生產(chǎn)消息并最終投遞到Queue中,消費(fèi)者(下圖中的C)可以從Queue中獲取消息并消費(fèi)。
多個(gè)消費(fèi)者可以訂閱同一個(gè)Queue,這時(shí)Queue中的消息會(huì)被平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理。
Message acknowledgment
在實(shí)際應(yīng)用中,可能會(huì)發(fā)生消費(fèi)者收到Queue中的消息,但沒有處理完成就宕機(jī)(或出現(xiàn)其他意外)的情況,這種情況下就可能會(huì)導(dǎo)致消息丟失。為了避免這種情況發(fā)生,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknowledgment)后才將該消息從Queue中移除。
如果RabbitMQ沒有收到回執(zhí)并檢測(cè)到消費(fèi)者的RabbitMQ連接斷開,則RabbitMQ會(huì)將該消息發(fā)送給其他消費(fèi)者(如果存在多個(gè)消費(fèi)者)進(jìn)行處理。這里不存在timeout,一個(gè)消費(fèi)者處理消息時(shí)間再長(zhǎng)也不會(huì)導(dǎo)致該消息被發(fā)送給其他消費(fèi)者,除非它的RabbitMQ連接斷開。
這里會(huì)產(chǎn)生另外一個(gè)問題,如果我們的開發(fā)人員在處理完業(yè)務(wù)邏輯后,忘記發(fā)送回執(zhí)給RabbitMQ,這將會(huì)導(dǎo)致嚴(yán)重的bug——Queue中堆積的消息會(huì)越來越多。消費(fèi)者重啟后會(huì)重復(fù)消費(fèi)這些消息并重復(fù)執(zhí)行業(yè)務(wù)邏輯。
另外publish message 是沒有ACK的。
Message durability
如果我們希望即使在RabbitMQ服務(wù)重啟的情況下,也不會(huì)丟失消息,我們可以將Queue與Message都設(shè)置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會(huì)丟失。但依然解決不了小概率丟失事件的發(fā)生(比如RabbitMQ服務(wù)器已經(jīng)接收到生產(chǎn)者的消息,但還沒來得及持久化該消息時(shí)RabbitMQ服務(wù)器就斷電了),如果我們需要對(duì)這種小概率事件也要管理起來,那么我們要用到事務(wù)。由于這里僅為RabbitMQ的簡(jiǎn)單介紹,所以這里將不講解RabbitMQ相關(guān)的事務(wù)。
Prefetch count
前面我們講到如果有多個(gè)消費(fèi)者同時(shí)訂閱同一個(gè)Queue中的消息,Queue中的消息會(huì)被平攤給多個(gè)消費(fèi)者。這時(shí)如果每個(gè)消息的處理時(shí)間不同,就有可能會(huì)導(dǎo)致某些消費(fèi)者一直在忙,而另外一些消費(fèi)者很快就處理完手頭工作并一直空閑的情況。我們可以通過設(shè)置Prefetch count來限制Queue每次發(fā)送給每個(gè)消費(fèi)者的消息數(shù),比如我們?cè)O(shè)置prefetchCount=1,則Queue每次給每個(gè)消費(fèi)者發(fā)送一條消息;消費(fèi)者處理完這條消息后Queue會(huì)再給該消費(fèi)者發(fā)送一條消息。
Exchange
在上一節(jié)我們看到生產(chǎn)者將消息投遞到Queue中,實(shí)際上這在RabbitMQ中這種事情永遠(yuǎn)都不會(huì)發(fā)生。實(shí)際的情況是,生產(chǎn)者將消息發(fā)送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)。
Exchange是按照什么邏輯將消息路由到Queue的?這個(gè)將在Binding一節(jié)中介紹。
RabbitMQ中的Exchange有四種類型,不同的類型有著不同的路由策略,這將在Exchange Types一節(jié)介紹。
Routing Key
生產(chǎn)者在將消息發(fā)送給Exchange的時(shí)候,一般會(huì)指定一個(gè)Routing Key,來指定這個(gè)消息的路由規(guī)則,而這個(gè)Routing Key需要與Exchange Type及Binding key聯(lián)合使用才能最終生效。
在Exchange Type與Binding key固定的情況下(在正常使用時(shí)一般這些內(nèi)容都是固定配置好的),我們的生產(chǎn)者就可以在發(fā)送消息給Exchange時(shí),通過指定Routing Key來決定消息流向哪里。
RabbitMQ為Routing Key設(shè)定的長(zhǎng)度限制為255 bytes。
Binding
RabbitMQ中通過Binding將Exchange與Queue關(guān)聯(lián)起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
Binding key
在綁定(Binding)Exchange與Queue的同時(shí),一般會(huì)指定一個(gè)Binding key。消費(fèi)者將消息發(fā)送給Exchange時(shí),一般會(huì)指定一個(gè)Routing Key。當(dāng) Binding key與Routing Key相匹配時(shí),消息將會(huì)被路由到對(duì)應(yīng)的Queue中。這個(gè)將在Exchange Types章節(jié)會(huì)列舉實(shí)際的例子加以說明。
在綁定多個(gè)Queue到同一個(gè)Exchange的時(shí)候,這些Binding允許使用相同的Binding key。
Binding key并不是在所有情況下都生效,它依賴于Exchange Type,比如fanout類型的Exchange就會(huì)無視Binding key,而是將消息路由到所有綁定到該Exchange的Queue。
Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規(guī)范里還提到兩種Exchange Type,分別為system與自定義,這里不予以描述),下面分別進(jìn)行介紹。
fanout
fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。
上圖中,生產(chǎn)者(P)發(fā)送到Exchange(X)的所有消息都會(huì)路由到圖中的兩個(gè)Queue,并最終被兩個(gè)消費(fèi)者(C1與C2)消費(fèi)。
direct
direct類型的Exchange路由規(guī)則也很簡(jiǎn)單,它會(huì)把消息路由到那些Binding key與Routing key完全匹配的Queue中。
以上圖的配置為例,我們以routingKey="error"發(fā)送消息到Exchange,則消息會(huì)路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動(dòng)生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以Routing Key="info"或routingKey="warning"來發(fā)送消息,則消息只會(huì)路由到Queue2。如果我們以其他Routing Key發(fā)送消息,則消息不會(huì)路由到這兩個(gè)Queue中。
topic
前面講到direct類型的Exchange路由規(guī)則是完全匹配Binding Key與Routing Key,但這種嚴(yán)格的匹配方式在很多情況下不能滿足實(shí)際業(yè)務(wù)需求。topic類型的Exchange在匹配規(guī)則上進(jìn)行了擴(kuò)展,它與direct類型的Exchage相似,也是將消息路由到Binding Key與Routing Key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定:
Routing Key為一個(gè)句點(diǎn)號(hào)“.”分隔的字符串(我們將被句點(diǎn)號(hào)". "分隔開的每一段獨(dú)立的字符串稱為一個(gè)單詞),如"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"。Binding Key與Routing Key一樣也是句點(diǎn)號(hào)“. ”分隔的字符串。
Binding Key中可以存在兩種特殊字符""與"#",用于做模糊匹配,其中""用于匹配一個(gè)單詞,"#"用于匹配多個(gè)單詞(可以是零個(gè))。
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會(huì)同時(shí)路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會(huì)路由到Q1,routingKey=”lazy.brown.fox”的消息會(huì)路由到Q2,routingKey=”lazy.pink.rabbit”的消息會(huì)路由到Q2(只會(huì)投遞給Q2一次,雖然這個(gè)routingKey與Q2的兩個(gè)bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會(huì)被丟棄,因?yàn)樗鼈儧]有匹配任何bindingKey。
headers
headers類型的Exchange不依賴于Routing Key與Binding Key的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。
在綁定Queue與Exchange時(shí)指定一組鍵值對(duì);當(dāng)消息發(fā)送到Exchange時(shí),RabbitMQ會(huì)取到該消息的headers(也是一個(gè)鍵值對(duì)的形式),對(duì)比其中的鍵值對(duì)是否完全匹配Queue與Exchange綁定時(shí)指定的鍵值對(duì)。如果完全匹配則消息會(huì)路由到該Queue,否則不會(huì)路由到該Queue。
該類型的Exchange沒有用到過(不過也應(yīng)該很有用武之地),所以不做介紹。
RPC
MQ本身是基于異步的消息處理,前面的示例中所有的生產(chǎn)者(P)將消息發(fā)送到RabbitMQ后不會(huì)知道消費(fèi)者(C)處理成功或者失敗(甚至連有沒有消費(fèi)者來處理這條消息都不知道)。
但實(shí)際的應(yīng)用場(chǎng)景中,我們很可能需要一些同步處理,需要同步等待服務(wù)端將我的消息處理完成后再進(jìn)行下一步處理。這相當(dāng)于RPC(Remote Procedure Call,遠(yuǎn)程過程調(diào)用)。在RabbitMQ中也支持RPC。
RabbitMQ中實(shí)現(xiàn)RPC的機(jī)制是:
客戶端發(fā)送請(qǐng)求(消息)時(shí),在消息的屬性(Message Properties,在AMQP協(xié)議中定義了14種properties,這些屬性會(huì)隨著消息一起發(fā)送)中設(shè)置兩個(gè)值replyTo(一個(gè)Queue名稱,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到這個(gè)Queue中)和correlationId(此次請(qǐng)求的標(biāo)識(shí)號(hào),服務(wù)器處理完成后需要將此屬性返還,客戶端將根據(jù)這個(gè)id了解哪條請(qǐng)求被成功執(zhí)行了或執(zhí)行失敗)。服務(wù)器端收到消息處理完后,將生成一條應(yīng)答消息到replyTo指定的Queue,同時(shí)帶上correlationId屬性。客戶端之前已訂閱replyTo指定的Queue,從中收到服務(wù)器的應(yīng)答消息后,根據(jù)其中的correlationId屬性分析哪條請(qǐng)求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理。
2.4 細(xì)節(jié)闡明
默認(rèn)情況下,如果Message 已經(jīng)被某個(gè)Consumer正確的接收到了,那么該Message就會(huì)被從Queue中移除。當(dāng)然也可以讓同一個(gè)Message發(fā)送到很多的Consumer。
如果一個(gè)Queue沒被任何的Consumer Subscribe(訂閱),當(dāng)有數(shù)據(jù)到達(dá)時(shí),這個(gè)數(shù)據(jù)會(huì)被cache,不會(huì)被丟棄。當(dāng)有Consumer時(shí),這個(gè)數(shù)據(jù)會(huì)被立即發(fā)送到這個(gè)Consumer。這個(gè)數(shù)據(jù)被Consumer正確收到時(shí),這個(gè)數(shù)據(jù)就被從Queue中刪除。
那么什么是正確收到呢?通過ACK。每個(gè)Message都要被acknowledged(確認(rèn),ACK)。我們可以顯示的在程序中去ACK,也可以自動(dòng)的ACK。如果有數(shù)據(jù)沒有被ACK,那么RabbitMQ Server會(huì)把這個(gè)信息發(fā)送到下一個(gè)Consumer。
如果這個(gè)APP有bug,忘記了ACK,那么RabbitMQ Server不會(huì)再發(fā)送數(shù)據(jù)給它,因?yàn)镾erver認(rèn)為這個(gè)Consumer處理能力有限。而且ACK的機(jī)制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數(shù)據(jù)后發(fā)送ACK,甚至在額外的延時(shí)后發(fā)送ACK,將有效的balance Consumer的load。
當(dāng)然對(duì)于實(shí)際的例子,比如我們可能會(huì)對(duì)某些數(shù)據(jù)進(jìn)行merge,比如merge 4s內(nèi)的數(shù)據(jù),然后sleep 4s后再獲取數(shù)據(jù)。特別是在監(jiān)聽系統(tǒng)的state,我們不希望所有的state實(shí)時(shí)的傳遞上去,而是希望有一定的延時(shí)。這樣可以減少某些IO,而且終端用戶也不會(huì)感覺到。
有兩種方式,第一種的Reject可以讓RabbitMQ Server將該Message 發(fā)送到下一個(gè)Consumer。第二種是從Queue中立即刪除該Message。
Consumer和Procuder都可以通過 queue.declare 創(chuàng)建queue。對(duì)于某個(gè)Channel來說,Consumer不能declare一個(gè)queue,卻訂閱其他的queue。當(dāng)然也可以創(chuàng)建私有的queue。這樣只有APP本身才可以使用這個(gè)queue。queue也可以自動(dòng)刪除,被標(biāo)為auto-delete的queue在最后一個(gè)Consumer unsubscribe后就會(huì)被自動(dòng)刪除。那么如果是創(chuàng)建一個(gè)已經(jīng)存在的queue呢?那么不會(huì)有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會(huì)被修改。
那么誰應(yīng)該負(fù)責(zé)創(chuàng)建這個(gè)queue呢?是Consumer,還是Producer?
如果queue不存在,當(dāng)然Consumer不會(huì)得到任何的Message。那么Producer Publish的Message會(huì)被丟棄。所以,還是為了數(shù)據(jù)不丟失,Consumer和Producer都try to create the queue!反正不管怎么樣,這個(gè)接口都不會(huì)出問題。
queue對(duì)load balance的處理是完美的。對(duì)于多個(gè)Consumer來說,RabbitMQ 使用循環(huán)的方式(round-robin)的方式均衡的發(fā)送給不同的Consumer。
從架構(gòu)圖可以看出,Procuder Publish的Message進(jìn)入了Exchange。接著通過"routing keys”, RabbitMQ會(huì)找到應(yīng)該把這個(gè)Message放到哪個(gè)queue里。queue也是通過這個(gè)routing keys來做的綁定。有三種類型的Exchanges:direct, fanout,topic。 每個(gè)實(shí)現(xiàn)了不同的路由算法(routing algorithm)。
Direct exchange:**如果 routing key 匹配,那么Message就會(huì)被傳遞到相應(yīng)的queue中。其實(shí)在queue創(chuàng)建時(shí),它會(huì)自動(dòng)的以queue的名字作為routing key來綁定那個(gè)exchange。
Fanout exchange: 會(huì)向響應(yīng)的queue廣播。
**Topic exchange:**對(duì)key進(jìn)行模式匹配,比如ab可以傳遞到所有ab的queue。
每個(gè)virtual host本質(zhì)上都是一個(gè)RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個(gè)不同的Application中使用RabbitMQ。
tcp創(chuàng)建銷毀有三次握手和四次揮手,開銷太大
操作系統(tǒng)tcp鏈接有限制,如果使用tcp鏈接,高峰期每秒成千上萬的鏈接造成資源浪費(fèi)
channel的原理一個(gè)進(jìn)程一條通道,多條進(jìn)程多條通道公用一條tcp鏈接,一條tcp鏈接可以容納無限的channel,不會(huì)有性能瓶頸。
三 部署
此文檔為centos7 安裝部署
3.1 安裝erlang
#?配置yum源?cat?>?/etc/yum.repos.d/erlang.repo?<
3.2 配置yum源
cat?>?/etc/yum.repos.d/rabbitmq.repo?< 3.3 rabbitmq服務(wù) yum?-y?install?rabbitmq-server?chkconfig?rabbitmq-server?on?#?更改rabbitmq數(shù)據(jù)和日志存儲(chǔ)目錄?#?創(chuàng)建數(shù)據(jù)和日志目錄?mkdir?-pv?/data/rabbitmq/mnesia?mkdir?-pv?/data/rabbitmq/log?chown?rabbitmq.rabbitmq?/data/rabbitmq/*?-R?#?創(chuàng)建配置文件?cat?>/etc/rabbitmq/rabbitmq-env.conf?< 3.4 配置 vim?/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app?復(fù)制代碼 3.5 插件 rabbitmq-plugins?enable?rabbitmq_management?#?rabbitmq?為了安全guest用戶只能localhost訪問,開啟guest/guest登陸?cat?>?/etc/rabbitmq/rabbitmq.config?< 四 使用 由于技術(shù)棧為python,此處簡(jiǎn)單舉例python中rabbitmq的使用 Introduction 由于AMQP是雙向RPC協(xié)議,客戶端可以向服務(wù)器發(fā)送請(qǐng)求,服務(wù)器可以向客戶端發(fā)送請(qǐng)求,因此Pika在其每個(gè)異步連接適配器中實(shí)現(xiàn)或擴(kuò)展IO循環(huán)。這些IO循環(huán)是阻塞循環(huán)和偵聽事件的方法。每個(gè)異步適配器都遵循相同的標(biāo)準(zhǔn)來調(diào)用IO循環(huán)。創(chuàng)建連接適配器時(shí)會(huì)創(chuàng)建IO循環(huán)。要為任何給定的適配器啟動(dòng)IO循環(huán),請(qǐng)調(diào)用connection.ioloop.start()方法。 install pip?install?pika?復(fù)制代碼 demo We start by creating our connection object, then starting our event loop. When we are connected, the on_connected method is called. In that method we create a channel. When the channel is created, the on_channel_open method is called. In that method we declare a queue. When the queue is declared successfully, on_queue_declared is called. In that method we call channel.basic_consume telling it to call the handle_delivery for each message RabbitMQ delivers to us. When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame, and Body. import?pika?#?Create?a?global?channel?variable?to?hold?our?channel?object?in?channel?=?None?#?Step?#2?def?on_connected(connection):?????"""Called?when?we?are?fully?connected?to?RabbitMQ"""?????#?Open?a?channel?????connection.channel(on_open_callback=on_channel_open)?#?Step?#3?def?on_channel_open(new_channel):?????"""Called?when?our?channel?has?opened"""?????global?channel?????channel?=?new_channel?????channel.queue_declare(queue="test",?durable=True,?exclusive=False,?auto_delete=False,?callback=on_queue_declared)?#?Step?#4?def?on_queue_declared(frame):?????"""Called?when?RabbitMQ?has?told?us?our?Queue?has?been?declared,?frame?is?the?response?from?RabbitMQ"""?????channel.basic_consume('test',?handle_delivery)?#?Step?#5?def?handle_delivery(channel,?method,?header,?body):?????"""Called?when?we?receive?a?message?from?RabbitMQ"""?????print(body)?#?Step?#1:?Connect?to?RabbitMQ?using?the?default?parameters?parameters?=?pika.ConnectionParameters()?connection?=?pika.SelectConnection(parameters,?on_open_callback=on_connected)?try:?????#?Loop?so?we?can?communicate?with?RabbitMQ?????connection.ioloop.start()?except?KeyboardInterrupt:?????#?Gracefully?close?the?connection?????connection.close()?????#?Loop?until?we're?fully?closed,?will?stop?on?its?own?????connection.ioloop.start()?復(fù)制代碼 五 消息發(fā)送服務(wù)設(shè)計(jì) 最總消費(fèi)者和生產(chǎn)者總體均跑在k8s集群總,對(duì)于消息發(fā)送服務(wù)生產(chǎn)者發(fā)送消息攜帶routing_key,使用confirm確認(rèn),exchange使用direct模式,對(duì)應(yīng)bind_key發(fā)送到對(duì)應(yīng)queue中,在對(duì)英queue的connection中啟動(dòng)多個(gè)channel,每個(gè)對(duì)應(yīng)自己多個(gè)consumer來提高并發(fā)。 參考鏈接 github.com/rabbitmq/er… www.rabbitmq.com/install-rpm… juejin.im/entry/599e5… blog.csdn.net/weixin_3860… www.bilibili.com/video/av576… 分布式消息隊(duì)列 RabbitMQ
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。