如何處理消費過程中的重復消息?
如何處理消費過程中的重復消息?
消息傳遞過程中若失敗,則發送方會執行重試,重試就可能產生重復消息。若不處理重復消息,可能收獲驚喜。比如一個消費訂單消息,統計下單金額的微服務。若不正確處理重復消息,就會出現重復統計。那僅靠MQ能保證消息不重復嗎?
消息重復必然存在,在MQTT協議,給出三種傳遞消息時能夠提供的
1 服務質量標準
服務質量從低到高:
At most once
至多一次。消息在傳遞時,最多被送達一次。即沒什么消息可靠性保證,允許丟消息。一般都是一些對消息可靠性要求不太高的監控場景使用,比如每分鐘上報一次機房溫度數據,可接受數據少量丟失
At least once
至少一次。消息在傳遞時,至少會被送達一次。即不允許丟消息,但允許少量重復消息
Exactly once
恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復
服務質量標準不僅適于MQTT,對所有MQ都適用。大部分MQ提供服務質量都是At least once,如RocketMQ、RabbitMQ和Kafka。可以說MQ本身并不保證消息不重復。
沒錯,Kafka的確支持Exactly once,但本文說的也沒問題。Kafka的“Exactly once”和消息傳遞服務質量標準中的“Exactly once”不同,它是Kafka提供的另一特性,Kafka中支持的事務也和通常理解的事務有差異。Kafka中的事務和Excactly once主要為配合流計算。
既然MQ無法保證消息不重復,就得消費代碼接受“消息可能重復”這個現實,通過業務代碼解決重復消息對業務的影響。
2 冪等性
一般解決重復消息方案就是在消費端,讓消費消息的操作具備冪等性(Idempotence):
描述一個操作、方法或者服務,其任意多次執行所產生的影響均與一次執行的影響相同。
一個冪等的方法,使用同樣參數,對它進行多次調用和一次調用,對系統產生影響一樣。所以,對冪等方法,無需擔心重復執行會改變系統。
示例
不考慮并發,“將賬戶X的余額設為100元”,執行一次后對系統的影響是,賬戶X的余額變成了100元。只要提供參數100元不變,執行多少次,賬戶X余額始終100,這操作就是個冪等操作。
“將賬戶X余額加100元”,這操作就不是冪等,每執行次,賬戶余額增加100,執行多次和執行一次對系統的影響(即賬戶余額)不同。
若系統消費消息的業務邏輯具冪等性,那就不用擔心消息重復,因為同一消息,消費一次和多次對系統影響一樣。即消費多次等于消費一次。
從對系統影響結果:At least once + 冪等消費 = Exactly once。
3 冪等實現方案
最好從業務邏輯入手,將消費業務設計成具備冪等性的操作。但也不是所有業務都天然冪等,需要一些技巧。
3.1 數據庫唯一約束
比如對于:將賬戶X余額加100。
可限制對每個轉賬單,每個賬戶只能執行一次變更操作。最簡單的,在DB中建一張【轉賬流水表】:
轉賬單ID
賬戶ID
變更金額
然后給【轉賬單ID,賬戶ID】聯合起來創建唯一約束,這樣相同轉賬單ID、賬戶ID,表里至多只存在一條記錄。
消費消息邏輯可變為:“在【轉賬流水表】增加一條轉賬記錄,再根據轉賬記錄,異步更新用戶余額。”
在轉賬流水表加條轉賬記錄操作中,由于【轉賬單ID,賬戶ID】唯一約束,對同一轉賬單,同一賬戶只能插一條記錄,后續重復插入操作都會失敗,這就實現了冪等。
所以,只要是支持類似“INSERT IF NOT EXIST”語義的存儲系統都可實現冪等。
比如,可用
替代數據庫中的唯一約束,實現冪等消費。
3.2 為更新的數據設前置條件(類似CAS)
給數據變更設置一個前置條件:
滿足條件就更新數據
否則拒絕更新數據
更新數據時,同時變更前置條件中需要判斷的數據。于是,重復執行該操作時,由于第一次更新數據時,已變更前置條件中的判斷數據,不滿足前置條件,則不會再執行更新。
“將賬戶X的余額增加100元”,這操作加個前置條件,變為:“若賬戶X當前余額為500元,將余額加100元”就具備冪等性。對應到MQ消息,在消息體中帶上當前余額,消費時判斷DB中當前余額==消息中的余額,相等時才執行更新。
但要更新數據不是數值,或要做個復雜的更新操作咋辦?前置判斷條件是啥呢?
更通用的,是給數據增加版本號version屬性,每次更新數據前,比較
當前數據version == 消息中的version
不一致,拒絕更新
一致,更新數據同時將版本號+1,一樣則可實現冪等更新
3.3 記錄并檢查操作
若前兩種方案都不適用,還有通用性最強、適用范圍最廣方案:記錄并檢查操作,也稱“Token機制或GUID(全局唯一ID)機制”,執行數據更新操作前,先檢查是否執行過這更新操作。
發消息時,給每條消息指定全局唯一ID
消費時,先根據ID檢查消息是否被消費過,若沒有,才更新數據并將消費狀態置為已消費
但分布式系統下很難實現:
首先,給每個消息指定一個全局唯一ID,方法很多,但都不太好同時滿足簡單、高可用和高性能,或多或少都有犧牲
更麻煩的,“檢查消費狀態,然后更新數據并設置消費狀態”,三個操作必須作為一組操作,保證原子性,才能真正實現冪等,否則就是Bug
比如對于同一消息:“全局ID為8,操作為:給ID為666賬戶增加100元”,可能出現這樣情況:
t0時刻:Consumer A 收到條消息,檢查消息執行狀態,發現消息未處理過,開始執行“賬戶增加100元”
t1時刻:Consumer B 收到條消息,檢查消息執行狀態,發現消息未處理過,因這時刻,Consumer A還未來得及更新消息執行狀態
這樣就導致賬戶被錯誤地增加了兩次100元,這是一個在分布式系統中非常容易犯的錯誤
對此,可以用事務實現,也可以鎖,但在分布式系統下,分布式事務、分布式鎖都會引入高復雜度。所以一般不推薦。
總結
這些冪等方案不僅可用于解決重復消息問題,也可解決重復請求或重復調用問題。比如:
將HTTP服務設計成冪等的,解決前端或APP重復提交表單數據的問題
將一個微服務設計成冪等的,解決RPC框架自動重試導致的重復調用問題
為何MQ都只提供At least once服務質量,而非Exactly once
若MQ實現exactly once,會引發:
消費端pull時,需檢測此消息是否被消費,這檢測機制無疑拉低消息消費速度。隨消息劇增,消費性能勢必急劇下降,導致消息積壓
檢查機制還需業務端去配合實現,若一條消息長時間未返回ack,MQ需要去回調看下消費結果(類似事務消息的回查機制)。這就增加業務端的壓力與未知因素。
為了確保消息沒有被丟失或者重復,隊列需采取一定的類似回查的手段,檢測消費者是否有收到消息進行處理,在一定程度上會導致隊列堆積等一系列問題,并且隊列實現的復雜度上升
從消費者的角度而言,因為消費者端和Broker Service端都是會各自集群,消費者端可能會存在網絡抖動,導致Broker Service為了確保消息不丟失和重復,需要一直進行回查類似的操作,但是由于網絡問題,導致隊列堆積。
所以,MQ不實現exactly once,而是at least once + 冪等性,而冪等性我們消費端業務代碼自己處理。
MQ即使做到Exactly once級別,Con也要做冪等。因為Con從MQ取消息時,若Con消費成功,但ack失敗,Con還是會取到重復消息,所以MQ費力做成Exactly once無法避免業務側消息重復問題。
使用DB的唯一索引防止消息被重復消費,若業務系統存在分庫分表,消費消息被路由到不同庫或表,還是會存在問題?
一般也不會有問題,因為使用我們的方法,一條具體消息,總會落到確定的庫表,其重復消息也會落地同樣庫表。
若隊列實現At least once,但為不丟消息,Broker Service會進行一定重試,但不可能一直重試,若就是一直重試還是失敗怎么處理?
有的MQ會有個特殊隊列,保存這些總是消費失敗的“壞消息”,然后繼續消費之后的消息,避免這些壞消息卡死隊列。這種壞消息一般不會是因為網絡原因或消費者宕機導致的,大多都是因為消息數據本身有問題,消費者的業務邏輯無法處理。
exactly once,實現有性能損耗,并發高時易出現消息堆積;消息隊列設計初衷是解決解耦,而解耦的對象往往是高并發,對性能要求較高的,從產品需求層面講,消息隊列設計更注重性能,而非精準(exactly once);基礎架構角度來說,關注點是占比大的需求(不能不發,可以重發),占比極小的需求(敏感型,只能觸發一次)可以單獨抽出來另外實現。最后,請教老師有沒有比較具體的業務場景,非用這種exactly once不可的
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。