微吼云上線多路互動直播服務 加速多場景互動直播落地
814
2025-04-01
寫在前面
嗯,陸續的整理一些中間件的筆記
今天和小伙伴們分享RabbitMQ相關的筆記
博文內容涉及:
《分布式消息中間件實踐》 RabbitMQ部分讀書筆記
RabbitMQ的簡單介紹
AMQP協議標準介紹
RabbitMQ部分場景API Demo
寫在前面
嗯,陸續的整理一些中間件的筆記
今天和小伙伴們分享RabbitMQ相關的筆記
博文內容涉及:
《分布式消息中間件實踐》 RabbitMQ部分讀書筆記
RabbitMQ的簡單介紹
AMQP協議標準介紹
RabbitMQ部分場景API Demo
《分布式消息中間件實踐》 RabbitMQ部分讀書筆記
RabbitMQ的簡單介紹
AMQP協議標準介紹
RabbitMQ部分場景API Demo
「 傍晚時分,你坐在屋檐下,看著天慢慢地黑下去,心里寂寞而凄涼,感到自己的生命被剝奪了。當時我是個年輕人,但我害怕這樣生活下去,衰老下去。在我看來,這是比死亡更可怕的事。--------王小波」
RabbitMQ
RabbitMQ簡介
RabbitMQ是一個由Erlang語言開發的基于AMOP標準的開源消息中間件。RabbitMQ最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。其具體特點包括:
保證可靠性( Reliability), RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認等。
具有靈活的路由(Flexible Routing)功能。在消息進入隊列之前,是通過Exchange (交換器)來路由消息的。對于典型的路由功能, RabbitMQ已經提供了一些內置的Exchange來實現。針對更復雜的路由功能,可以將多個Exchange綁定在一起,也可以通過插件機制來實現自己的Exchange.
支持消息集群(Clustering),多臺RabbiMQ服務器可以組成一個集群,形成一個邏輯Broker.
具有高可用性(Highly Available),隊列可以在集群中的機器上進行鏡像,使得在部分節點出現問題的情況下隊列仍然可用。
支持多種協議(Multi-protocol), RabbitMQ除支持AMQP協議之外,還通過插件的方式支持其他消息隊列協議,比如STOMP, MQTT等。
支持多語言客戶端(Many Client),RabbitMQ幾乎支持所有常用的語言,比如Java. .NET, Ruby等
提供管理界面(Management UI), RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息Broker的許多方面
提供跟蹤機制(Tracing), RabbitMQ提供了消息跟蹤機制,如果消息異常,使用者可以查出發生了什么情況。
提供插件機制(Plugin System), RabbitMQ提供了許多插件,從多方面進行擴展,也可以編寫自己的插件.
上面講到了AMOP協議,這里我們簡單了解下
AMQP標準
在2004年,摩根大通和iMatrix開始著手Advanced Message Queuing Protocol (AMQP)開放標準的開發。2006年,發布了AMQP規范。目前AMQP協議的版本為1.0。
「一般來說,將AMQP協議的內容分為三部分:基本概念、功能命令和傳輸層協議」 。
基本概念:指AMQP內部定義的各組件及組件的功能說明。
功能命令:指該協議所定義的一系列命令,應用程序可以基于這些命令來實現相應的功能。
傳輸層協議(TCP/UDP):是一個網絡級協議,它定義了數據的傳輸格式,消息隊列的客戶端可以基于這個協議與消息代理和AMQP的相關模型進行交互通信,該協議的內容包括數據幀處理、信道復用、內容編碼、心跳檢測、數據表示和錯誤處理等。
「主要概念」
Message (消息) :消息服務器所處理數據的原子單元。消息可以攜帶內容,從格式上看,消息包括一個內容頭、一組屬性和一個內容體。
這里所說的消息可以對應到許多不同應用程序的實體,比如一個應用程序級消息、一個傳輸文件、一個數據流幀等。消息可以被保存到磁盤上,這樣即使發生嚴重的網絡故障、服務器崩潰也可確保投遞消息可以有優先級,高優先級的消息會在等待同一個消息隊列時在低優先級的消息之前發送,當消息必須被丟棄以確保消息服務器的服務質量時,服務器將會優先丟棄低優先級的消息。消息服務器不能修改所接收到的并將傳遞給消費者應用程序的消息內容體。消息服務器可以在內容頭中添加額外信息,但不能刪除或修改現有信息。
Publisher (消息生產者):也是一個向交換器發布消息的客戶端應用程序。
Exchange (交換器):用來接收消息生產者所發送的消息并將這些消息路由給服務器中的隊列。
Binding (綁定):用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則
所以可以將交換器理解成一個由綁定構成的路由表(路由控制表,IP尋址)。
Virtual Host (虛擬主機):它是消息隊列以及相關對象的集合,是共享同一個身份驗證和加密環境的獨立服務器域。每個虛擬主機本質上都是一個mini版的消息服務器,擁有自己的隊列、交換器、綁定和權限機制。
Broker (消息代理):表示消息隊列服務器,接受客戶端連接,實現AMQP消息隊列和路由功能的過程。
Routing Key (路由規則):虛擬機可用它來確定如何路由一個特定消息。
Queue (消息隊列):用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可被投入一個或多個隊列中。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
Connection (連接):可以理解成客戶端和消息隊列服務器之間的一個TCP連接。
Channel (信道):僅僅當創建了連接后,若客戶端還是不能發送消息,則需要為連接創建一個信道。信道是一條獨立的雙向數據流通道,它是建立在真實的TCP連接內的虛擬連接。
AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,它們都通過信道完成。一個連接可以包含多個信道,之所以需要信道,是因為TCP連接的建立和釋放都是十分昂貴的,如果客戶端的每一個線程都需要與消息服務器交互,如果每一個線程都建立了一個TCP連接,則暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。
Consumer (消息消費者):表示一個從消息隊列中取得消息的客戶端應用程序。
「消息的生命周期,一條消息的流轉過程通常是這樣的」:
Publisher(消息生產者)產生一條數據,發送到Broker(消息代理), Broker中的Exchange(交換器)可以被理解為一個規則表(Routing Key和Queue的映射關系-Binding), Broker收到消息后根據Routing Key查詢投遞的目標Queue.
Consumer向Broker發送訂閱消息時會指定自己監聽哪個Queue,當有數據到達Queue時Broker會推送數據到Consumer.
「交換器的生命周期」
每臺AMQP服務器都預先創建了許多交換器實例,它們在服務器啟動時就存在并且不能被銷毀。如果你的應用程序有特殊要求,則可以選擇自己創建交換器,并在完成工作后進行銷毀。
「隊列的生命周期」
這里主要有兩種消息隊列的生命周期,即持久化消息隊列和臨時消息隊列。持久化消息隊列可被多個消費者共享,不管是否有消費者接收,它們都可以獨立存在。臨時消息隊列對某個消費者是私有的,只能綁定到此消費者,當消費者斷開連接時,該消息隊列將被刪除。
「功能命令」
AMQP協議文本是分層描述的,在不同主版本中劃分的層次是有一定區別的。
「0-9」 版本共分兩層: Functional Layer (功能層)和Transport Layer (傳輸層)
功能層定義了一系列命令,這些命令按功能邏輯組合成不同的類(Class),客戶端應用可以利用它們來實現自己的業務功能。
傳輸層將功能層所接收的消息傳遞給服務器經過相應處理后再返回,處理的事情包括信道復用、幀同步、內容編碼、心跳檢測、數據表示和錯誤處理等.
「0-10」 版本則分為三層: Model Layer (模型層)、Session Layer (會話層)和Transport Layer(傳輸層)。
模型層定義了一套命令,客戶端應用利用這些命令來實現業務功能。
會話層負責將命令從客戶端應用傳遞給服務器,再將服務器的響應返回給客戶端應用,會話層為這個傳遞過程提供了可靠性、同步機制和錯誤處理。
傳輸層負責提供幀處理、信道復用、錯誤檢測和數據表示
所有的消息必須有特定的格式來支持,這部分就是在傳輸層中定義的。AMQP是二進制協議,協議的不同版本在該部分的描述有所不同。0-9-1版本為例,看一下該版本中的消息格式
所有的消息數據都被組織成各種類型的幀(Frame),幀可以攜帶協議方法和其他信息,所有幀都有同樣的格式,都由一個幀頭(header, 7個字節)、任意大小的負載(payload)和一個檢測錯誤的結束幀(frame-end)字節組成。其中:
幀頭包括一個type字段、一個channel字段和一個size字段;
幀負載的格式依賴幀類型(type)
要讀取一個幀需要三步。
①讀取幀頭,檢查幀類型和通道(channel).
②根據幀類型讀取幀負載并進行處理。
③讀取結束幀字節。
AMQP定義了如下幀類型。
type=1, "METHOD":方法幀;
type=2, "HEADER":內容頭幀;
type=3,"BODY":內容體幀;
type=4, "HEARTBEAT":心跳幀通道
編號為0的代表全局連接中的所有幀, 1-65535代表特定通道的幀。size字段是指幀負載的大小,它的數值不包括結束幀字節。AMQP使用結束幀來檢測錯誤客戶端和服務器實現引起的錯誤。
RabbitMQ基本概念
如圖是RabbitMQ的整體架構圖。
Message (消息):消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列可選屬性組成,這些屬性包括 routing-key (路由鍵),priority (相對于其他消息的優先級)、 delivery-mode (指出該消息可能需要持久化存儲)等。
Publisher (消息生產者):一個向交換器發布消息的客戶端應用程序。
Exchange (交換器):用來接收生產者發送的消息,并將這些消息路由給服務器中的隊列。.
RabbitMQ是AMQP協議的一個開源實現,所以其基本概念也就是AMQPt中的基本概念。關于其他的概念小伙伴可以看上面。
(1) AMQP中的消息路由
在AMQP中增加了Exchange和Binding的角色。生產者需要把消息發布到Exchange上,消息最終到達隊列并被消費者接收,而Binding決定交換器上的消息應該被發送到哪個隊列中。
(2)交換器類型
不同類型的交換器分發消息的策略也不同,目前交換器有4種類型: Direct, Fanout, Topic,Headers。其中Headers交換器匹配AMQP消息的Header而不是路由鍵。此外, Headers交換器和Direct交換器完全一致,但性能相差很多,目前幾乎不用了,所以下面我們看另外三種類型。
「如果消息中的路由鍵(routing key)和Binding中的綁定鍵(binding key)一致,交換器就將消息發送到對應的隊列中」.
路由鍵與隊列名稱要完全匹配,如果將一個隊列綁定到交換機要求路由鍵為“dog",則只轉發routing key標記為"dog"的消息,不會轉發"dog.puppy"消息,也不會轉發"dog.guard "消息等。Direct交換器是完全匹配、單播的模式。
Fanout交換器
「Fanout交換器不處理路由鍵,只是簡單地將隊列綁定到交換器」 發送到交換器的每條消息都會被轉發到與該交換器綁定的所有隊列中,這很像子網廣播,子網內的每個主機都獲得了一份復制的消息。通過Fanout交換器轉發消息是最快的。
Topic交換器
「Topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某種模式進行匹配,此時隊列需要綁定一種模式?!?/p>
Topic交換器將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點"."隔開,該交換器會識別兩個通配符: “#”和“*”,其中“#”匹配0個或多個單詞, “*”匹配不多不少一個單詞。
RabbitMQ Demo
RabbitMQ官網:https://www.rabbitmq.com/
RabbitMQ服務安裝
基于Docker的安裝:
RabbitMQ鏡像 :https://registry.hub.docker.com/_/rabbitmq?tab=description&page=2&ordering=last_updated
# 啟動docker服務 [root@liruilong ~]# systemctl restart docker # 查看鏡像 [root@liruilong ~]# docker images #指定版本,該版本包含了web控制頁面 [root@liruilong ~]# docker pull rabbitmq:management
運行容器:
方式一:默認guest 用戶,密碼也是 guest
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
方式二:設置用戶名和密碼
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
發布服務,將端口映射到15672,5672
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management 2189f2fa53f1e76306a2ad422e0fa33bca1ae0f3ee77514573d71aca9ce24801 [root@liruilong ~]#
「這里需要注意的是端口綁定,需要把訪問端口和管理端口同時綁定。如果是ESC的話,需要配置安全組」
訪問路徑:http://localhost:15672/ 登錄
Hello World!
「Java客戶端訪問RabbitMQ實例」
RabbitMQ支持多種語言訪問。使用java需要添加的maven依賴,下面我們看一個簡單的Demo
「消息生產者」
package msg_queue.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liruilong * @Description TODO 消息生產者 * @date 2022/4/20 20:46 **/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //默認訪問5672端口 factory.setPort(5672); factory.setVirtualHost("/"); //建立到代理服務器到連接 try (Connection conn = factory.newConnection(); //創建信道 Channel channel = conn.createChannel()) { //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); // 定義 路由鍵 String routingKey = "testRoutingKey"; //發布消息 byte[] messageBodyBytes = "學習Rabbitmq".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); } } }
首先創建一個連接工廠,再根據連接工廠創建連接,之后從連接中創建信道,接著聲明一個交換器和指定路由鍵,然后才發布消息,最后將所創建的信道、連接等資源關閉。代碼中的ConnectionFactory, Connection、 Channel都是RabbitMQ提供的API中最基本的類。
ConnectionFactory是Connection的制造工廠
Connection代表RabbitMQ的Socket連接,它封裝了Socket操作的相關邏輯。
Channel是與RabbitMQ打交道的最重要的接口,大部分業務操作都是在Channel中完成的,比如定義隊列、定義交換器、隊列與交換器的綁定、發布消息等。
「消息消費者」
package msg_queue.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liruilong * @Description TODO 消息消費者 * @date 2022/4/20 20:48 **/ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("127.0.0.1"); factory.setVirtualHost("/"); //建立到代理服務器到連接 try (Connection conn = factory.newConnection(); //創建信道 final Channel channel = conn.createChannel()) { //聲明交換器 String exchangeName = "hello-exchange"; // true 設置是否持久化 channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "testRoutingKey"; //綁定隊列,通過鍵 testRoutingKey 將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey); //消費消息 while (true) { // 設置是否自動確認,當消費者接收到消息后要告訴 mq 消息已接收,如果將此參數設置為 true 表示會自動回復 mq,如果設置為 false,要通過編程實現回復 boolean autoAck = false; channel.basicConsume(queueName, autoAck // 設置消費者獲取消息成功的回調函數 , (consumerTag, delivery) -> { System.out.printf("消費的消息體內容:%s\n", new String(delivery.getBody(), "UTF-8")); System.out.println("消費的路由鍵:" + delivery.getEnvelope().getRoutingKey()); System.out.println("消費的內容類型:" + delivery.getProperties().getContentType()); System.out.println("consumerTag:"+consumerTag); //確認消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 設置消費者獲取消息失敗的回調函數 }, consumerTag -> { System.out.println("consumerTag:"+consumerTag); }); } } } }
消費的消息體內容:學習Rabbitmq 消費的路由鍵:testRoutingKey 消費的內容類型:null consumerTag:amq.ctag-rC_49IlY-Awwj7G_hXIR_Q
通道
消息客戶端和消息服務器之間的通信是雙向的,不管是對客戶端還是服務器來說,保持它們之間的網絡連接是很耗費資源的。為了在不占用大量TCP/P連接的情況下也能有大量的邏輯連接, AMQP增加了通道(Channel)的概念..
RabbitMQ支持并鼓勵在一個連接中創建多個通道,因為相對來說創建和銷毀通道的代價會小很多。需要提醒的是,作為經驗法則,應該盡量避免在線程之間共享通道,你的應用應該使用每個線程單獨的通道,而不是在多個線程上共享同一個通道,因為大多數客戶端不會讓通道線程安全(因為這將對性能產生嚴重的負面影響)。
總結
個人認為, RabbitMQ最大的優勢在于提供了比較靈活的消息路由策略、高可用性、可靠性,以及豐富的插件、多種平臺支持和完善的文檔。不過,由于AMQP協議本身導致它的實現比較重量,從而使得與其他MQ (比如Kafka)對比其吞吐量處于下風。在選擇MQ時關鍵還是看需求-是更看重消息的吞吐量、消息堆積能力還是消息路由的靈活性、高可用性、可靠性等方面,先確定場景,再對不同產品進行有針對性的測試和分析,最終得到的結論才能作為技術選型的依據
RabbitMQ TCP/IP 分布式
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。