RabbitMQ實踐【綻放吧!數(shù)據(jù)庫】(rabbitmq筆記)
一 概述
1.1 背景
近期在做告警集成平臺,其中需要告警消息發(fā)送,類型需要涵蓋目前市場主流的消息接受端,例如微信/企業(yè)微信/釘釘/郵件/短信/電話等等,這勢必要利用到MQ,在眾多的消息中間件中,經(jīng)過調(diào)研此場景并不象大數(shù)據(jù)處理場景需要kafka,同時需要較高性能和確認機制,數(shù)據(jù)的可靠性和活躍的社區(qū),支持消息的持久化于中間件的高可用部署,最終選型了RabbitMQ來作為應用的中間件。
1.2 概念
MQ全稱為Message Queue, 即消息隊列。MQ是一種應用程序?qū)贸绦虻耐ㄐ欧椒ā贸绦蛲ㄟ^讀寫出入隊列的消息(針對應用程序的數(shù)據(jù))來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠程過程調(diào)用的技術(shù)。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發(fā)送應用程序同時執(zhí)行的要求。RabbitMQ則是一個在AMQP基礎上完整的,可復用的企業(yè)消息系統(tǒng)。
1.3 功能
應用解耦:mq基于數(shù)據(jù)的接口層,將耦合的應用來分解開,兩邊都實現(xiàn)這個接口,這樣就允許獨立的修改或者擴展兩邊的處理過程,只要兩邊遵守相同的接口約束即可。
流量削峰:在高并發(fā),大流量的場景下,rabbitmq可以減少突發(fā)訪問壓力,不會因為突發(fā)的超時負荷要求而崩潰
異步通信:通過把把消息發(fā)送給消息中間件,將不是實時的業(yè)務異步處理
1.4 特點
可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認及發(fā)布確認等。
靈活的路由:在消息進入隊列之前,通過交換器來路由消息。對于典型的路由功能,RabbitMQ己經(jīng)提供了一些內(nèi)置的交換器來實現(xiàn)。針對更復雜的路由功能,可以將多個交換器綁定在一起,也可以通過插件機制來實現(xiàn)自己的交換器。
擴展性:多個RabbitMQ節(jié)點可以組成一個集群,也可以根據(jù)實際業(yè)務情況動態(tài)地擴展集群中節(jié)點。
高可用性:隊列可以在集群中的機器上設置鏡像,使得在部分節(jié)點出現(xiàn)問題的情況下隊仍然可用。
多種協(xié)議:RabbitMQ除了原生支持AMQP協(xié)議,還支持STOMP,MQTT等多種消息中間件協(xié)議。
多語言客戶端:RabbitMQ幾乎支持所有常用語言,比如Jav a、Python、Ruby、PHP、C#、JavaScript等。
管理界面:RabbitMQ提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息、集群中的節(jié)點等。
插件機制:RabbitMQ提供了許多插件,以實現(xiàn)從多方面進行擴展,當然也可以編寫自己的插件。
二 架構(gòu)
2.1 架構(gòu)圖
RabbitMQ Server
也叫Broker Server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn’t a food truck, it’s a delivery service. 它的角色就是維護一條從Producer到Consumer的路線,保證數(shù)據(jù)能夠按照指定的方式進行傳輸。雖然這個保證也不是100%的保證,但是對于普通的應用來說這已經(jīng)足夠了。當然對于商業(yè)系統(tǒng)來說,可以再做一層數(shù)據(jù)一致性的guard,就可以徹底保證系統(tǒng)的一致性了。
Client P
也叫Producer,數(shù)據(jù)的發(fā)送方。Create messages and publish (send) them to a Broker Server (RabbitMQ)。一個Message有兩個部分:payload(有效載荷)和label(標簽)。payload顧名思義就是傳輸?shù)臄?shù)據(jù)。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發(fā)給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規(guī)則。
Client C
也叫Consumer,數(shù)據(jù)的接收方。Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱后,RabbitMQ把它發(fā)送給它的某個訂閱者即Consumer。當然可能會把同一個Message發(fā)送給很多的Consumer。在這個Message中,只有payload,label已經(jīng)被刪掉了。對于Consumer來說,它是不知道誰發(fā)送的這個信息的,就是協(xié)議本身不支持。當然了,如果Producer發(fā)送的payload包含了Producer的信息就另當別論了。
Connection
就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
Channel
虛擬連接。它建立在上述的TCP連接中。數(shù)據(jù)流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
那么,為什么使用Channel,而不是直接使用TCP連接?
對于OS來說,建立和關(guān)閉TCP連接是有代價的,頻繁的建立關(guān)閉TCP連接對于系統(tǒng)的性能有很大的影響,而且TCP的連接數(shù)也有限制,這也限制了系統(tǒng)處理高并發(fā)的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對于Producer或者Consumer來說,可以并發(fā)的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數(shù)據(jù)可以Publish10K的數(shù)據(jù)包。當然對于不同的硬件環(huán)境,不同的數(shù)據(jù)包大小這個數(shù)據(jù)肯定不一樣,但是我只想說明,對于普通的Consumer或者Producer來說,這已經(jīng)足夠了。如果不夠用,你考慮的應該是如何細化SPLIT你的設計。
2.2 相關(guān)定義
Broker: 簡單來說就是消息隊列服務器實體
Exchange: 消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列
Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列
Binding: 綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來
Routing Key: 路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進行消息投遞
VHost: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權(quán)限分離。
Producer: 消息生產(chǎn)者,就是投遞消息的程序
Consumer: 消息消費者,就是接受消息的程序
Channel: 消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務
由Exchange、Queue、RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。
2.3 基本概念
Connection Factory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。Connection Factory則是Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業(yè)務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。
Queue
Queue(隊列)是RabbitMQ的內(nèi)部對象,用于存儲消息,如下圖表示。
RabbitMQ中的消息都只能存儲在Queue中,生產(chǎn)者(下圖中的P)生產(chǎn)消息并最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息并消費。
多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。
Message acknowledgment
在實際應用中,可能會發(fā)生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現(xiàn)其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發(fā)生,我們可以要求消費者在消費完消息后發(fā)送一個回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknowledgment)后才將該消息從Queue中移除。
如果RabbitMQ沒有收到回執(zhí)并檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發(fā)送給其他消費者(如果存在多個消費者)進行處理。這里不存在timeout,一個消費者處理消息時間再長也不會導致該消息被發(fā)送給其他消費者,除非它的RabbitMQ連接斷開。
這里會產(chǎn)生另外一個問題,如果我們的開發(fā)人員在處理完業(yè)務邏輯后,忘記發(fā)送回執(zhí)給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多。消費者重啟后會重復消費這些消息并重復執(zhí)行業(yè)務邏輯。
另外publish message 是沒有ACK的。
Message durability
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發(fā)生(比如RabbitMQ服務器已經(jīng)接收到生產(chǎn)者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由于這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關(guān)的事務。
Prefetch count
前面我們講到如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作并一直空閑的情況。我們可以通過設置Prefetch count來限制Queue每次發(fā)送給每個消費者的消息數(shù),比如我們設置prefetchCount=1,則Queue每次給每個消費者發(fā)送一條消息;消費者處理完這條消息后Queue會再給該消費者發(fā)送一條消息。
Exchange
在上一節(jié)我們看到生產(chǎn)者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發(fā)生。實際的情況是,生產(chǎn)者將消息發(fā)送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。
Exchange是按照什么邏輯將消息路由到Queue的?這個將在Binding一節(jié)中介紹。
RabbitMQ中的Exchange有四種類型,不同的類型有著不同的路由策略,這將在Exchange Types一節(jié)介紹。
Routing Key
生產(chǎn)者在將消息發(fā)送給Exchange的時候,一般會指定一個Routing Key,來指定這個消息的路由規(guī)則,而這個Routing Key需要與Exchange Type及Binding key聯(lián)合使用才能最終生效。
在Exchange Type與Binding key固定的情況下(在正常使用時一般這些內(nèi)容都是固定配置好的),我們的生產(chǎn)者就可以在發(fā)送消息給Exchange時,通過指定Routing Key來決定消息流向哪里。
RabbitMQ為Routing Key設定的長度限制為255 bytes。
Binding
RabbitMQ中通過Binding將Exchange與Queue關(guān)聯(lián)起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
Binding key
在綁定(Binding)Exchange與Queue的同時,一般會指定一個Binding key。消費者將消息發(fā)送給Exchange時,一般會指定一個Routing Key。當 Binding key與Routing Key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節(jié)會列舉實際的例子加以說明。
在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的Binding key。
Binding key并不是在所有情況下都生效,它依賴于Exchange Type,比如fanout類型的Exchange就會無視Binding key,而是將消息路由到所有綁定到該Exchange的Queue。
Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規(guī)范里還提到兩種Exchange Type,分別為system與自定義,這里不予以描述),下面分別進行介紹。
fanout
fanout類型的Exchange路由規(guī)則非常簡單,它會把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。
上圖中,生產(chǎn)者(P)發(fā)送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,并最終被兩個消費者(C1與C2)消費。
direct
direct類型的Exchange路由規(guī)則也很簡單,它會把消息路由到那些Binding key與Routing key完全匹配的Queue中。
以上圖的配置為例,我們以routingKey="error"發(fā)送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以Routing Key="info"或routingKey="warning"來發(fā)送消息,則消息只會路由到Queue2。如果我們以其他Routing Key發(fā)送消息,則消息不會路由到這兩個Queue中。
topic
前面講到direct類型的Exchange路由規(guī)則是完全匹配Binding Key與Routing Key,但這種嚴格的匹配方式在很多情況下不能滿足實際業(yè)務需求。topic類型的Exchange在匹配規(guī)則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到Binding Key與Routing Key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定:
Routing Key為一個句點號“.”分隔的字符串(我們將被句點號". “分隔開的每一段獨立的字符串稱為一個單詞),如"stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。Binding Key與Routing Key一樣也是句點號“. ”分隔的字符串。
Binding Key中可以存在兩種特殊字符"“與”#",用于做模糊匹配,其中"“用于匹配一個單詞,”#"用于匹配多個單詞(可以是零個)。
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
headers
headers類型的Exchange不依賴于Routing Key與Binding Key的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發(fā)送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對。如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。
RPC
MQ本身是基于異步的消息處理,前面的示例中所有的生產(chǎn)者(P)將消息發(fā)送到RabbitMQ后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成后再進行下一步處理。這相當于RPC(Remote Procedure Call,遠程過程調(diào)用)。在RabbitMQ中也支持RPC。
RabbitMQ中實現(xiàn)RPC的機制是:
客戶端發(fā)送請求(消息)時,在消息的屬性(Message Properties,在AMQP協(xié)議中定義了14種properties,這些屬性會隨著消息一起發(fā)送)中設置兩個值replyTo(一個Queue名稱,用于告訴服務器處理完成后將通知我的消息發(fā)送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據(jù)這個id了解哪條請求被成功執(zhí)行了或執(zhí)行失敗)。服務器端收到消息處理完后,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性。客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息后,根據(jù)其中的correlationId屬性分析哪條請求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進行后續(xù)業(yè)務處理。
2.4 細節(jié)闡明
默認情況下,如果Message 已經(jīng)被某個Consumer正確的接收到了,那么該Message就會被從Queue中移除。當然也可以讓同一個Message發(fā)送到很多的Consumer。
如果一個Queue沒被任何的Consumer Subscribe(訂閱),當有數(shù)據(jù)到達時,這個數(shù)據(jù)會被cache,不會被丟棄。當有Consumer時,這個數(shù)據(jù)會被立即發(fā)送到這個Consumer。這個數(shù)據(jù)被Consumer正確收到時,這個數(shù)據(jù)就被從Queue中刪除。
那么什么是正確收到呢?通過ACK。每個Message都要被acknowledged(確認,ACK)。我們可以顯示的在程序中去ACK,也可以自動的ACK。如果有數(shù)據(jù)沒有被ACK,那么RabbitMQ Server會把這個信息發(fā)送到下一個Consumer。
如果這個APP有bug,忘記了ACK,那么RabbitMQ Server不會再發(fā)送數(shù)據(jù)給它,因為Server認為這個Consumer處理能力有限。而且ACK的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數(shù)據(jù)后發(fā)送ACK,甚至在額外的延時后發(fā)送ACK,將有效的balance Consumer的load。
當然對于實際的例子,比如我們可能會對某些數(shù)據(jù)進行merge,比如merge 4s內(nèi)的數(shù)據(jù),然后sleep 4s后再獲取數(shù)據(jù)。特別是在監(jiān)聽系統(tǒng)的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端用戶也不會感覺到。
有兩種方式,第一種的Reject可以讓RabbitMQ Server將該Message 發(fā)送到下一個Consumer。第二種是從Queue中立即刪除該Message。
Consumer和Procuder都可以通過 queue.declare 創(chuàng)建queue。對于某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以創(chuàng)建私有的queue。這樣只有APP本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最后一個Consumer unsubscribe后就會被自動刪除。那么如果是創(chuàng)建一個已經(jīng)存在的queue呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會被修改。
那么誰應該負責創(chuàng)建這個queue呢?是Consumer,還是Producer?
如果queue不存在,當然Consumer不會得到任何的Message。那么Producer Publish的Message會被丟棄。所以,還是為了數(shù)據(jù)不丟失,Consumer和Producer都try to create the queue!反正不管怎么樣,這個接口都不會出問題。
queue對load balance的處理是完美的。對于多個Consumer來說,RabbitMQ 使用循環(huán)的方式(round-robin)的方式均衡的發(fā)送給不同的Consumer。
從架構(gòu)圖可以看出,Procuder Publish的Message進入了Exchange。接著通過"routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue里。queue也是通過這個routing keys來做的綁定。有三種類型的Exchanges:direct, fanout,topic。 每個實現(xiàn)了不同的路由算法(routing algorithm)。
Direct exchange:**如果 routing key 匹配,那么Message就會被傳遞到相應的queue中。其實在queue創(chuàng)建時,它會自動的以queue的名字作為routing key來綁定那個exchange。
Fanout exchange: 會向響應的queue廣播。
**Topic exchange:**對key進行模式匹配,比如ab可以傳遞到所有ab的queue。
每個virtual host本質(zhì)上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的Application中使用RabbitMQ。
tcp創(chuàng)建銷毀有三次握手和四次揮手,開銷太大
操作系統(tǒng)tcp鏈接有限制,如果使用tcp鏈接,高峰期每秒成千上萬的鏈接造成資源浪費
channel的原理一個進程一條通道,多條進程多條通道公用一條tcp鏈接,一條tcp鏈接可以容納無限的channel,不會有性能瓶頸。
三 部署
此文檔為centos7 安裝部署
3.1 安裝erlang
# 配置yum源 cat > /etc/yum.repos.d/erlang.repo << EOF [rabbitmq_erlang] name=rabbitmq_erlang baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch repo_gpgcheck=1 gpgcheck=0 enabled=1 gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt metadata_expire=300 [rabbitmq_erlang-source] name=rabbitmq_erlang-source baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS repo_gpgcheck=1 gpgcheck=0 enabled=1 gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt metadata_expire=300 EOF
3.2 配置yum源
cat > /etc/yum.repos.d/rabbitmq.repo < 3.3 rabbitmq服務 yum -y install rabbitmq-server chkconfig rabbitmq-server on # 更改rabbitmq數(shù)據(jù)和日志存儲目錄 # 創(chuàng)建數(shù)據(jù)和日志目錄 mkdir -pv /data/rabbitmq/mnesia mkdir -pv /data/rabbitmq/log chown rabbitmq.rabbitmq /data/rabbitmq/* -R # 創(chuàng)建配置文件 cat >/etc/rabbitmq/rabbitmq-env.conf < 3.4 配置 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app 3.5 插件 rabbitmq-plugins enable rabbitmq_management # rabbitmq 為了安全guest用戶只能localhost訪問,開啟guest/guest登陸 cat > /etc/rabbitmq/rabbitmq.config < 四 使用 由于技術(shù)棧為python,此處簡單舉例python中rabbitmq的使用 Introduction 由于AMQP是雙向RPC協(xié)議,客戶端可以向服務器發(fā)送請求,服務器可以向客戶端發(fā)送請求,因此Pika在其每個異步連接適配器中實現(xiàn)或擴展IO循環(huán)。這些IO循環(huán)是阻塞循環(huán)和偵聽事件的方法。每個異步適配器都遵循相同的標準來調(diào)用IO循環(huán)。創(chuàng)建連接適配器時會創(chuàng)建IO循環(huán)。要為任何給定的適配器啟動IO循環(huán),請調(diào)用connection.ioloop.start()方法。 install pip install pika demo We start by creating our connection object, then starting our event loop. When we are connected, the on_connected method is called. In that method we create a channel. When the channel is created, the on_channel_open method is called. In that method we declare a queue. When the queue is declared successfully, on_queue_declared is called. In that method we call channel.basic_consume telling it to call the handle_delivery for each message RabbitMQ delivers to us. When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame, and Body. import pika # Create a global channel variable to hold our channel object in channel = None # Step #2 def on_connected(connection): """Called when we are fully connected to RabbitMQ""" # Open a channel connection.channel(on_open_callback=on_channel_open) # Step #3 def on_channel_open(new_channel): """Called when our channel has opened""" global channel channel = new_channel channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared) # Step #4 def on_queue_declared(frame): """Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ""" channel.basic_consume('test', handle_delivery) # Step #5 def handle_delivery(channel, method, header, body): """Called when we receive a message from RabbitMQ""" print(body) # Step #1: Connect to RabbitMQ using the default parameters parameters = pika.ConnectionParameters() connection = pika.SelectConnection(parameters, on_open_callback=on_connected) try: # Loop so we can communicate with RabbitMQ connection.ioloop.start() except KeyboardInterrupt: # Gracefully close the connection connection.close() # Loop until we're fully closed, will stop on its own connection.ioloop.start() 五 消息發(fā)送服務設計 最總消費者和生產(chǎn)者總體均跑在k8s集群總,對于消息發(fā)送服務生產(chǎn)者發(fā)送消息攜帶routing_key,使用confirm確認,exchange使用direct模式,對應bind_key發(fā)送到對應queue中,在對英queue的connection中啟動多個channel,每個對應自己多個consumer來提高并發(fā)。 參考鏈接 https://github.com/rabbitmq/erlang-rpm https://www.rabbitmq.com/install-rpm.html https://juejin.im/entry/6844903492549787661 https://blog.csdn.net/weixin_38608626/column/info/43439 https://www.bilibili.com/video/av57640471/?p=9 【綻放吧!數(shù)據(jù)庫】有獎征文火熱進行中:https://bbs.huaweicloud.com/blogs/285617 RabbitMQ 數(shù)據(jù)庫
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。