為什么大家都在抵制定時任務關閉超時訂單

      網友投稿 888 2025-03-31

      哈嘍大家好,我是阿Q!

      前幾天領導突然宣布幾年前停用的電商項目又重新啟動了,帶著復雜的心情仔細賞閱“兒時”的代碼,心中的酸楚只有自己能夠體會。

      這不,昨天又被領導叫進了“小黑屋”,讓我把代碼重構下進行升級。看到這么“可愛”的代碼,心中一萬只“xx馬”疾馳而過。

      讓我最深惡痛覺的就是里邊竟然用定時任務實現了“關閉超時訂單”的功能,現在想來,哭笑不得。我們先分析一波為什么大家都在抵制用定時任務來實現該功能。

      定時任務

      關閉超時訂單是在創建訂單之后的一段時間內未完成支付而關閉訂單的操作,該功能一般要求每筆訂單的超時時間是一致的。

      如果我們使用定時任務來進行該操作,很難把握定時任務輪詢的時間間隔:

      時間間隔足夠小,在誤差允許的范圍內可以達到我們說的時間一致性問題,但是頻繁掃描數據庫,執行定時任務,會造成網絡IO和磁盤IO的消耗,對實時交易造成一定的沖擊;

      時間間隔比較大,由于每個訂單創建的時間不一致,所以上邊的一致性要求很難達到,舉例如下:

      假設30分鐘訂單超時自動關閉,定時任務的執行間隔時間為30分鐘:

      我們在第5分鐘進行下單操作;

      當時間來到第30分鐘時,定時任務執行一次,但是我們的訂單未滿足條件,不執行;

      當時間來到第35分鐘時,訂單達到關閉條件,但是定時任務未執行,所以不執行;

      當時間來到第60分鐘時,開始執行我們的訂單關閉操作,而此時,誤差達到25分鐘。

      經此種種,我們需要舍棄該方式。

      延時隊列

      為了滿足領導的需求,我便將手伸向了消息隊列:RabbitMQ。盡管它本身并沒有提供延時隊列的功能,但是我們可以利用它的存活時間和死信交換機的特性來間接實現。

      首先我們先來簡單介紹下什么是存活時間?什么是死信交換機?

      存活時間的全拼是Time To Live,簡稱 TTL。它既支持對消息本身進行設置(延遲隊列的關鍵),又支持對隊列進行設置(該隊列中所有消息存在相同的過期時間)。

      對消息本身進行設置:即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判定的;

      對隊列進行設置:一旦消息過期,就會從隊列中抹去;

      如果同時使用這兩種方法,那么以過期時間小的那個數值為準。當消息達到過期時間還沒有被消費,那么該消息就“死了”,我們把它稱為 死信 消息。

      消息變為死信的條件:

      為什么大家都在抵制定時任務關閉超時訂單

      消息被拒絕(basic.reject/basic.nack),并且requeue=false;

      消息的過期時間到期了;

      隊列達到最大長度;

      隊列設置注意事項

      隊列中該屬性的設置要在第一次聲明隊列的時候設置才有效,如果隊列一開始已存在且沒有這個屬性,則要刪掉隊列再重新聲明才可以;

      隊列的 ttl 只能被設置為某個固定的值,一旦設置后則不能更改,否則會拋出異常;

      死信交換機全拼Dead-Letter-Exchange,簡稱DLX。

      當消息在一個隊列中變成死信之后,如果這個消息所在的隊列設置了x-dead-letter-exchange參數,那么它會被發送到x-dead-letter-exchange對應值的交換機上,這個交換機就稱之為死信交換機,與這個死信交換器綁定的隊列就是死信隊列。

      x-dead-letter-exchange:出現死信之后將死信重新發送到指定交換機;

      x-dead-letter-routing-key:出現死信之后將死信重新按照指定的routing-key發送,如果不設置默認使用消息本身的routing-key

      死信隊列與普通隊列的區別就是它的RoutingKey和Exchange需要作為參數,綁定到正常的隊列上。

      實戰教學

      先來張圖感受下我們的整體思路

      生產者發送帶有 ttl 的消息放入交換機路由到延時隊列中;

      在延時隊列中綁定死信交換機與死信轉發的routing-key;

      等延時隊列中的消息達到延時時間之后變成死信轉發到死信交換機并路由到死信隊列中;

      最后供消費者消費。

      我們在上文的基礎上進行代碼實現:

      @Configuration public class DelayQueueRabbitConfig { public static final String DLX_QUEUE = "queue.dlx";//死信隊列 public static final String DLX_EXCHANGE = "exchange.dlx";//死信交換機 public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信隊列與死信交換機綁定的routing-key public static final String ORDER_QUEUE = "queue.order";//訂單的延時隊列 public static final String ORDER_EXCHANGE = "exchange.order";//訂單交換機 public static final String ORDER_ROUTING_KEY = "routingkey.order";//延時隊列與訂單交換機綁定的routing-key /** * 定義死信隊列 **/ @Bean public Queue dlxQueue(){ return new Queue(DLX_QUEUE,true); } /** * 定義死信交換機 **/ @Bean public DirectExchange dlxExchange(){ return new DirectExchange(DLX_EXCHANGE, true, false); } /** * 死信隊列和死信交換機綁定 * 設置路由鍵:routingkey.dlx **/ @Bean Binding bindingDLX(){ return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY); } /** * 訂單延時隊列 * 設置隊列里的死信轉發到的DLX名稱 * 設置死信在轉發時攜帶的 routing-key 名稱 **/ @Bean public Queue orderQueue() { Map params = new HashMap<>(); params.put("x-dead-letter-exchange", DLX_EXCHANGE); params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(ORDER_QUEUE, true, false, false, params); } /** * 訂單交換機 **/ @Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); } /** * 把訂單隊列和訂單交換機綁定在一起 **/ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY); } }

      @RequestMapping("/order") public class OrderSendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage") public String sendMessage(){ String delayTime = "10000"; //將消息攜帶路由鍵值 rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY, "發送消息!",message->{ message.getMessageProperties().setExpiration(delayTime); return message; }); return "ok"; } }

      @Component @RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//監聽隊列名稱 public class OrderMQReciever { @RabbitHandler public void process(String message){ System.out.println("OrderMQReciever接收到的消息是:"+ message); } }

      通過調用接口,發現10秒之后才會消費消息

      問題升級

      由于開發環境和測試環境使用的是同一個交換機和隊列,所以發送的延時時間都是30分鐘。但是為了在測試環境讓測試同學方便測試,故手動將測試環境的時間改為了1分鐘。

      接著問題就來了:延時時間為1分鐘的消息并沒有立即被消費,而是等30分鐘的消息被消費完之后才被消費了。至于原因,我們下邊再分析,先用代碼來給大家復現下該問題。

      @GetMapping("/sendManyMessage") public String sendManyMessage(){ send("延遲消息睡10秒",10000+""); send("延遲消息睡2秒",2000+""); send("延遲消息睡5秒",5000+""); return "ok"; } private void send(String msg, String delayTime){ rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY, msg,message->{ message.getMessageProperties().setExpiration(delayTime); return message; }); }

      執行結果如下:

      OrderMQReciever接收到的消息是:延遲消息睡10秒 OrderMQReciever接收到的消息是:延遲消息睡2秒 OrderMQReciever接收到的消息是:延遲消息睡5秒

      原因就是延時隊列也滿足隊列先進先出的特征,當10秒的消息未出隊列時,后邊的消息不能順利出隊,造成后邊的消息阻塞了,未能達到精準延時。

      我們可以利用x-delay-message插件來解決該問題

      消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設置的范圍為 (2^32)-1 毫秒)

      生產者發送消息到交換機時,并不會立即進入,而是先將消息持久化到 Mnesia(一個分布式數據庫管理系統);

      插件將會嘗試確認消息是否過期;

      如果消息過期,消息會通過 x-delayed-type 類型標記的交換機投遞至目標隊列,供消費者消費;

      官網下載,我這邊使用的是v3.8.0.ez,將文件下載下來放到服務器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。

      出現如圖所示,代表安裝成功。

      配置類

      @Configuration public class XDelayedMessageConfig { public static final String DIRECT_QUEUE = "queue.direct";//隊列 public static final String DELAYED_EXCHANGE = "exchange.delayed";//延遲交換機 public static final String ROUTING_KEY = "routingkey.bind";//綁定的routing-key /** * 定義隊列 **/ @Bean public Queue directQueue(){ return new Queue(DIRECT_QUEUE,true); } /** * 定義延遲交換機 * args:根據該參數進行靈活路由,設置為“direct”,意味著該插件具有與直連交換機具有相同的路由行為, * 如果想要不同的路由行為,可以更換現有的交換類型如:“topic” * 交換機類型為 x-delayed-message **/ @Bean public CustomExchange delayedExchange(){ Map args = new HashMap(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args); } /** * 隊列和延遲交換機綁定 **/ @Bean public Binding orderBinding() { return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); } }

      發送消息

      @RestController @RequestMapping("/delayed") public class DelayedSendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendManyMessage") public String sendManyMessage(){ send("延遲消息睡10秒",10000); send("延遲消息睡2秒",2000); send("延遲消息睡5秒",5000); return "ok"; } private void send(String msg, Integer delayTime){ //將消息攜帶路由鍵值 rabbitTemplate.convertAndSend( XDelayedMessageConfig.DELAYED_EXCHANGE, XDelayedMessageConfig.ROUTING_KEY, msg, message->{ message.getMessageProperties().setDelay(delayTime); return message; }); } }

      消費消息

      @Component @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//監聽隊列名稱 public class DelayedMQReciever { @RabbitHandler public void process(String message){ System.out.println("DelayedMQReciever接收到的消息是:"+ message); } }

      測試

      DelayedMQReciever接收到的消息是:延遲消息睡2秒 DelayedMQReciever接收到的消息是:延遲消息睡5秒 DelayedMQReciever接收到的消息是:延遲消息睡10秒

      這樣我們的問題就順利解決了。

      延遲的消息存儲在一個Mnesia表中,當前節點上只有一個磁盤副本,它們將在節點重啟后存活。

      雖然觸發計劃交付的計時器不會持久化,但它將在節點啟動時的插件激活期間重新初始化。顯然,集群中只有一個預定消息的副本意味著丟失該節點或禁用其上的插件將丟失駐留在該節點上的消息。

      該插件的當前設計并不適合延遲消息數量較多的場景(如數萬條或數百萬條),另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統中使用了一定數量的長時間計時器之后,它們開始爭用調度程序資源,并且時間漂移不斷累積。

      回復“rabbitMQ”獲取源碼!

      今天的內容就講這了,如果你有不同的意見或者更好的idea,歡迎聯系阿Q,

      阿Q將持續更新java實戰方面的文章,感興趣的可以關注下阿Q,也可以來技術群討論問題呦,之交值得深交!

      NAT

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:2022年國內十大低代碼平臺盤點,哪個值得一試?
      下一篇:中微公司研發的等離子刻蝕設備已經進入客戶的5nm生產線
      相關文章
      亚洲A∨精品一区二区三区下载| 亚洲妇女无套内射精| 亚洲欧洲中文日韩久久AV乱码| 亚洲国产成人久久精品软件 | 亚洲伊人久久大香线蕉AV| 亚洲另类古典武侠| 亚洲中文字幕无码av在线| 亚洲国产精品成人综合久久久| 亚洲精品动漫在线| 久久久久亚洲AV无码永不| 久久久亚洲欧洲日产国码aⅴ| 亚洲激情中文字幕| 亚洲日本在线看片| 久久久久亚洲AV无码专区首JN| 亚洲色图视频在线观看| 亚洲熟妇av一区二区三区下载| 亚洲人成日本在线观看| 亚洲香蕉在线观看| 亚洲色成人四虎在线观看| 亚洲成AV人片高潮喷水| 亚洲日本VA午夜在线影院| 亚洲AV无码一区二区三区电影| 综合偷自拍亚洲乱中文字幕| 国产成人亚洲精品蜜芽影院| 亚洲国产精品一区二区九九| 国产亚洲人成网站在线观看| 国产亚洲精品岁国产微拍精品| 久久久久久a亚洲欧洲aⅴ| 亚洲A∨无码一区二区三区| 久久亚洲AV无码精品色午夜麻豆| 亚洲美女视频一区二区三区| 亚洲午夜久久久久久尤物| 亚洲综合色丁香婷婷六月图片| 中文字幕在线观看亚洲日韩| 久久人午夜亚洲精品无码区| 亚洲男女内射在线播放| 亚洲日本乱码在线观看| 亚洲国产精品国自产拍电影| 亚洲噜噜噜噜噜影院在线播放| 亚洲 暴爽 AV人人爽日日碰| 狠狠入ady亚洲精品|