【RabbitMQ】Go語言實現六種消息中間件模型
文章目錄

寫在前面
1. 介紹
1.1 什么是MQ
1.2 什么是RabbitMQ
1.3 AMQP 協議
2. Go語言操作RabbitMQ
2.1 下載
2.2 引入驅動
2.3 HelloWorld 模型
2.3.1 生產者
2.3.2 消費者
2.3.3 結果
2.4 Work Queues 模型
2.4.1 生產者
2.4.2 消費者
2.4.3 結果
2.5 Publish/Subscribe 模型
2.5.1 生產者
2.5.2 消費者
2.5.3 結果
2.6 Routing 模型
2.6.1 生產者
2.6.2 消費者
2.7 Topics 模型
2.7.1 生產者
2.7.2 消費者
2.8 RPC 模型
寫在前面
本文是使用Go語言實現各種RabbitMQ的中間件模型
1. 介紹
1.1 什么是MQ
MQ(Message Quene) : 翻譯為 消息隊列,通過典型的 生產者和消費者模型,生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,輕松的實現系統間解耦。
別名為 消息中間件 通過利用高效可靠的消息傳遞機制進行平臺無關的數據交流,并基于數據通信來進行分布式系統的集成。
目前市面上有很多消息中間件:RabbitMQ,RocketMQ,Kafka等等…
1.2 什么是RabbitMQ
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協議來實現。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求可能比較低了。
1.3 AMQP 協議
AMQP(advanced message queuing protocol) 在2003年時被提出,最早用于解決金融領不同平臺之間的消息傳遞交互問題。
顧名思義,AMQP是一種協議,更準確的說是一種binary wire-level protocol(鏈接協議)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式。這使得實現了AMQP的provider天然性就是跨平臺的。以下是AMQP協議模型:
2. Go語言操作RabbitMQ
2.1 下載
下載rabbitmq過程就省了,可以直接到官網網站下載安裝,像安裝qq一樣。
2.2 引入驅動
驅動
go get github.com/streadway/amqp
連接
var MQ *amqp.Connection // RabbitMQ 鏈接 func RabbitMQ(connString string) { conn, err := amqp.Dial(connString) if err != nil { panic(err) } MQ = conn }
1
2
3
4
5
6
7
8
9
10
2.3 HelloWorld 模型
P代表生產者,C代表消費者,紅色部分是隊列。
生產者生成消息到隊列中,消費者進行消費,直連單點模式。
2.3.1 生產者
聲明連接對象
var ProductMQ *amqp.Connection
1
聲明通道
ch, err := ProductMQ.Channel()
1
創建隊列
q, err := ch.QueueDeclare("hello", // 隊列名字 false, // 是否持久化, false, // 不用的時候是否自動刪除 false, // 用來指定是否獨占隊列 false, // no-wait nil, // 其他參數 )
1
2
3
4
5
6
7
參數1(name):隊列名字
參數2(durable):持久化,隊列中所有的數據都是在內存中的,如果為true的話,這個通道關閉之后,數據就會存在磁盤中持久化,false的話就會丟棄
參數3(autoDelete):不需要用到隊列的時候,是否將消息刪除
參數4(exclusive):是否獨占隊列,true的話,就是只能是這個進程獨占這個隊列,其他都不能對這個隊列進行讀寫
參數5(noWait):是否阻塞
參數6(args):其他參數
發布消息
body := "Hello World!" err = ch.Publish( "", // 交換機 q.Name, // 隊列名字 false, // 是否強制性 // 當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者 // 當mandatory設置為false時,出現上述情形broker會直接將消息扔掉 false, //當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對于的queue上么有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者 // 是否立刻 /** 概括來說,mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。 **/ amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), // 發送的消息 })
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
參數1(exchange):交換機,后續會講到
參數2(route-key):隊列名字
參數3(mandatory):是否強制性,
當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者
當mandatory設置為false時,出現上述情形broker會直接將消息扔掉
參數4(immediate):是否立即處理
當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對于的queue上么有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者
也就是說,mandatory 標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
參數5(msg):發布的消息,ContentType是傳輸類型,Body是發送的消息。
2.3.2 消費者
聲明通道
ch, err := ConsumerMQ.Channel()
1
創建隊列
q, err := ch.QueueDeclare( "hello", false, false, false, false, nil, )
1
2
3
4
5
6
7
8
讀取隊列消息
msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, )
1
2
3
4
5
6
7
8
9
由于消費者端需要一直監聽,所以我們要用一個for循環+channel去阻塞主進程,使得主進程一直處于監聽狀態。
forever := make(chan bool) go func() { for d := range msgs { fmt.Printf("Received a message: %s", d.Body) } }() fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
1
2
3
4
5
6
7
8
2.3.3 結果
生產者
消費者
2.4 Work Queues 模型
Work queues,也被稱為(Task queues),任務模型。當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work queues模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重復執行的。
2.4.1 生產者
生成10條消息到隊列中
body := "Hello World! " for i := 0; i < 10; i++ { msg := strconv.Itoa(i) err = ch.Publish( "", // 交換機 q.Name, // 隊列名字 false, // 是否強制性 false, // 是否立刻 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body+msg), // 發送的消息 }) }
1
2
3
4
5
6
7
8
9
10
11
12
13
2.4.2 消費者
創建兩個一樣的消費者進行監聽消費,與上面2.3.2的消費者保持一致
2.4.3 結果
消費者1號
消費者2號
2.5 Publish/Subscribe 模型
fanout 扇出 也稱為廣播
在廣播模式下,消息發送流程如下:
可以有多個消費者
每個消費者有自己的queue(隊列)
每個隊列都要綁定到Exchange(交換機)
生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。
交換機把消息發送給綁定過的所有隊列
隊列的消費者都能拿到消息。實現一條消息被多個消費者消費
2.5.1 生產者
聲明交換機
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
1
參數1(name):交換機名稱
參數2(kind):交換機類型
生產消息
_ = ch.Publish("logs", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
1
2
3
4
5
2.5.2 消費者
聲明交換機
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )
1
聲明隊列
q, _ := ch.QueueDeclare("", false, false, true, false, nil, )
1
綁定交換機
_ = ch.QueueBind(q.Name, "", "logs", false, nil, )
1
消費消息
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
1
2.5.3 結果
生產者
消費者
2.6 Routing 模型
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
在fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey。
Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息
2.6.1 生產者
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, ) body := "Hello World " _ = ch.Publish("logs_direct", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
1
2
3
4
5
6
7
2.6.2 消費者
只接受warn
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
1
2
3
4
只接受info
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
1
2
3
4
2.7 Topics 模型
Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!
這種模型Routingkey 一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
統配符
* 匹配不多不少恰好1個詞
# 匹配一個或多個詞
如:
fan.# 匹配 fan.one.two 或者 fan.one 等
fan.* 只能匹配 fan.one
2.7.1 生產者
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, ) body := "Hello World " _ = ch.Publish("logs_topic", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
1
2
3
4
5
6
7
2.7.2 消費者
只接受*.one
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
1
2
3
4
只接受*.fan
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
1
2
3
4
2.8 RPC 模型
日后補充
Go RabbitMQ
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。