消息隊列最佳實踐】消息恰好被消費一次

      網友投稿 851 2022-05-28

      對系統增加MQ對峰值寫流量做削峰填谷,對次要業務邏輯做異步,對不同系統模塊做解耦。

      因為業務邏輯從同步代碼中移除了,所以也要有相應隊列處理程序處理消息、執行業務邏輯。

      隨著業務邏輯復雜,會引入更多外部系統和服務,就會越來越多使用MQ

      與外部系統解耦合以及提升系統性能。

      比如系統要加紅包功能:用戶在購買一定數量商品后,系統給用戶發一個現金紅包鼓勵用戶消費。由于發放紅包的過程不應在購買商品的主流程,所以考慮MQ異步。

      但發現一個問題:

      若消息在投遞過程丟失

      用戶就會因沒有得到紅包而投訴

      消息在投遞過程出現重復

      就會因為發送兩個紅包而損失

      消息為什么會丟失

      消息從被寫入到MQ到被消費者消費完成,這個鏈路上會有哪些地方存在丟失消息的可能呢?其實主要存在三個場景:

      消息從生產者寫入到消息隊列的過程

      消息在消息隊列中的存儲場景

      消息被消費者消費的過程。

      在消息生產的過程中丟失消息

      兩種情況。

      首先,消息的生產者一般是業務服務器,MQ獨立部署在單獨服務器。二者間的網絡雖是內網,但也存在抖動可能,一旦發生抖動,消息就可能因網絡錯誤而丟失。

      推薦消息重傳,即當你發現發送超時后,就將消息重發一次,但也不能無限重發。一般若不是MQ故障或到MQ的網絡斷開了,重試2~3次即可。

      但這種方案可能造成消息重復,從而在消費時重復消費同樣的消息。

      比方說消息生產時,由于MQ處理慢或網絡抖動,導致雖最終寫入MQ成功,但在生產端卻超時,生產者重傳這條消息就會形成重復消息,你就收到了兩個現金紅包!

      在MQ中丟失消息

      消息在Kafka是存在本地磁盤的,而為了減少消息存儲時對磁盤的隨機I/O,一般會將消息先寫到os的Page Cache,然后再找合適時機刷盤。

      比如Kafka可以配置異步刷盤時機:

      當達到某一時間間隔

      或累積一定消息數量

      假如你經營一個圖書館,讀者每還一本書你都要去把圖書歸位,不僅工作量大而且效率低下,但是如果你可以選擇每隔3小時或者圖書達到一定數量的時候再把圖書歸位,這樣可以把同一類型的書一起歸位,節省了查找圖書位置的時間,可以提高效率。

      不過如果發生掉電或異常重啟,Page Cache中還沒有來得及刷盤的消息就會丟失了。那么怎么解決呢?

      你可能會:

      把刷盤的間隔設置很短

      或設置累積一條消息

      就刷盤,但頻繁刷盤會對很影響性能,而且宕機或掉電幾率也不高,不推薦。

      如果你的系統對消息丟失容忍度很低,可考慮集群部署Kafka,通過部署多個副本備份數據,保證消息盡量不丟失。

      Kafka集群中有一個Leader負責消息的寫入和消費,可以有多個Follower負責數據的備份。Follower中有一個特殊的集合叫做ISR(in-sync replicas),當Leader故障時,新選舉出來的Leader會從ISR中選擇,默認Leader的數據會異步地復制給Follower,這樣在Leader發生掉電或者宕機時,Kafka會從Follower中消費消息,減少消息丟失的可能。

      由于默認消息是異步地從Leader復制到Follower的,所以一旦Leader宕機,那些還沒有來得及復制到Follower的消息還是會丟失。

      為解決這個問題,Kafka為生產者提供“acks”,當這個選項被設置為“all”時,生產者發送的每一條消息除了發給Leader外還會發給所有的ISR,并且必須得到Leader和所有ISR的確認后才被認為發送成功。這樣,只有Leader和所有的ISR都掛了消息才會丟失。

      當設置“acks=all”時,需要同步執行1、3、4三個步驟,對于消息生產的性能來說也是有比較大的影響的,所以你在實際應用中需要仔細地權衡考量。我給你的建議是:

      1.如果你需要確保消息一條都不能丟失,那么建議不要開啟消息隊列的同步刷盤,而是用集群的方式來解決,可以配置當所有ISR Follower都接收到消息才返回成功。

      2.如果對消息的丟失有一定的容忍度,那么建議不部署集群,即使以集群方式部署,也建議配置只發送給一個Follower就可以返回成功了。

      3.我們的業務系統一般對于消息的丟失有一定的容忍度,比如說以上面的紅包系統為例,如果紅包消息丟失了,我們只要后續給沒有發送紅包的用戶補發紅包就好了。

      在消費的過程中存在消息丟失的可能

      一個消費者消費消息的進度是記錄在消息隊列集群中的,而消費的過程分為三步:接收消息、處理消息、更新消費進度。

      這里面接收消息和處理消息的過程都可能會發生異常或者失敗,比如消息接收時網絡發生抖動,導致消息并沒有被正確的接收到;處理消息時可能發生一些業務的異常導致處理流程未執行完成,這時如果更新消費進度,這條失敗的消息就永遠不會被處理了,也可以認為是丟失了。

      所以,在這里你需要注意的是,一定要等到消息接收和處理完成后才能更新消費進度,但是這也會造成消息重復的問題,比方說某一條消息在處理之后消費者恰好宕機了,那么因為沒有更新消費進度,所以當這個消費者重啟之后還會重復地消費這條消息。

      如何保證消息只被消費一次

      從上面的分析中你能發現,為了避免消息丟失我們需要付出兩方面的代價:一方面是性能的損耗,一方面可能造成消息重復消費。

      性能的損耗我們還可以接受,因為一般業務系統只有在寫請求時才會有發送消息隊列的操作,而一般系統的寫請求的量級并不高,但是消息一旦被重復消費就會造成業務邏輯處理的錯誤。那么我們要如何避免消息的重復呢?

      想要完全的避免消息重復的發生是很難做到的,因為網絡的抖動、機器的宕機和處理的異常都是比較難以避免的,在工業上并沒有成熟的方法,因此我們會把要求放寬,只要保證即使消費到了重復的消息,從消費的最終結果來看和只消費一次是等同的就好了,也就是保證在消息的生產和消費的過程是“冪等”的。

      冪等

      多次執行同一個操作和執行一次操作,最終得到的結果是相同的。

      如果消費一條消息,要將庫存數減1,那么如消費兩條相同消息,庫存數減2,這就非冪等。

      而如果消費一條消息后處理邏輯是將庫存的數置0,

      或如果當前庫存數是10,則減1,這樣在消費多條消息時所得到的結果就是相同的,這就是冪等。

      一件事兒無論做多少次都和做一次產生的結果是一樣的,那么這件事兒就具有冪等性。

      生產、消費過程增加消息冪等

      消息在生產和消費的過程中都可能重復,所以要在生產、消費過程增加消息冪等性保證,這樣就可認為從“最終結果上來看”消息實際上是只被消費一次。

      消息生產過程中,在Kafka0.11和Pulsar都支持“producer idempotency”,即生產過程的冪等性,這種特性保證消息雖然可能在生產端產生重復,但最終在MQ 存儲時只會存一份。

      這是怎么做到的呢?

      給每個生產者一個唯一ID,并為生產的每條消息賦予一個唯一ID,MQ服務端會存儲<生產者ID,最后一條消息ID>映射。

      當某生產者產生新消息,MQ服務端比對消息ID是否與存儲的最后一條ID一致,若一致,就認為是重復消息,服務端自動丟棄。

      在消費端,冪等可從如下兩方面考慮:

      通用層

      可在消息被生產時,使用發號器給它生成一個全局唯一消息ID,消息被處理后,把這個ID存儲在DB,在處理下一條消息前,先從DB查詢該全局ID是否被消費過,若被消費過就放棄消費。

      無論是生產端的冪等保證還是消費端通用的冪等性保證,它們的共同特點都是為每個消息生成唯一ID,然后在使用這個消息時,先比對ID是否已存在,存在則認為消息已被使用。

      所以這種方式是一種標準的實現冪等的方式,實戰中可直接使用,偽代碼如 下:

      // 判斷ID是否存在 boolean isIDExisted = selectByID(ID); if(isIDExisted) { // 存在則直接返回 return; } else { // 不存在,則處理消息 process(message); // 存儲ID saveID(ID); }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      不過這樣會有個問題:如果消息在處理之后,還沒有來得及寫入DB,消費者宕機了,重啟后發現DB并無這條消息,還是會重復執行兩次消費邏輯,這時就需要引入事務,保證消息處理和寫入DB必須同時成功或失敗,但這樣消息處理成本更高,所以如果對消息重復沒有特別嚴格要求,可直接使用這種通用方案,而不考慮引入事務。

      業務層

      有很多種處理方式,有一種是增加樂觀鎖。比如你的消息處理程序需要給一個人的賬號加錢。

      具體操作:

      給每個人的賬號數據加個版本號,在生產消息時先查詢該賬戶的版本號,并將版本號連同消息一起發給MQ。消費端拿到消息和版本號后,在執行更新賬戶金額SQL的時候帶上版本號:

      update user set amount = amount + 20, version=version+1 where userId=1 and version=1;

      1

      2

      3

      4

      5

      更新數據時,給數據加樂觀鎖,這樣在消費第一條消息時,version值為1,SQL可以執行成功,并且同時把version值改為2。

      在執行第二條相同消息時,由于version值不再是1,所以這條SQL不能執行成功,實現了消息冪等。

      總結

      消息的丟失可以通過生產端的重試、消息隊列配置集群模式以及消費端合理處理消費進度三個方式來解決;

      為了解決消息的丟失通常會造成性能上的問題以及消息的重復問題;

      通過保證消息處理的冪等性可以解決消息的重復問題。

      并不是說消息丟失一定不能被接受,畢竟你可以看到在允許消息丟失的情況下,消息隊列的性能更好,方案實現的復雜度也最低。比如像是日志處理的場景,日志存在的意義在于排查系統的問題,而系統出現問題的幾率不高,偶發的丟失幾條日志是可以接受的。

      【消息隊列最佳實踐】消息恰好被消費一次

      方案設計看場景,你不能把所有的消息隊列都配置成防止消息丟失的方式,也不能要求所有的業務處理邏輯都要支持冪等性,這樣會給開發和運維帶來額外負擔。

      Kafka 網絡

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:kudu參數優化設置,讓集群飛起來~
      下一篇:Adobe Illustrator CC 2019破解版安裝
      相關文章
      麻豆狠色伊人亚洲综合网站| 亚洲a级在线观看| 亚洲第一成人影院| 亚洲色大成网站www久久九| 国产亚洲中文日本不卡二区| 亚洲人成黄网在线观看| 亚洲精品mv在线观看| 亚洲精品成人久久| 亚洲白色白色永久观看| 亚洲最新在线视频| 亚洲精品美女久久久久9999| 亚洲成人黄色网址| 亚洲av无码国产综合专区| 亚洲成a人片在线不卡| 日本亚洲色大成网站www久久| 亚洲国产精品免费观看| 在线观看亚洲AV每日更新无码| 亚洲欧美一区二区三区日产| 亚洲AV噜噜一区二区三区| 国产亚洲精品成人久久网站| 亚洲国产黄在线观看| 一本色道久久综合亚洲精品| 国产精品亚洲精品日韩已满| 亚洲成年轻人电影网站www| 78成人精品电影在线播放日韩精品电影一区亚洲 | 久久久久se色偷偷亚洲精品av| 亚洲高清一区二区三区| 亚洲人av高清无码| 国产亚洲人成在线影院| 国产亚洲成人在线播放va| 亚洲狠狠婷婷综合久久久久| 中文字幕亚洲免费无线观看日本| 亚洲欧洲日产国码二区首页| 国产亚洲福利在线视频| 国产精品观看在线亚洲人成网| 亚洲熟女乱综合一区二区| 国产亚洲3p无码一区二区| 亚洲春色另类小说| 亚洲日韩精品无码专区加勒比☆| 国产亚洲美女精品久久久久| 国产AⅤ无码专区亚洲AV|