基于Redis實現DelayQueue延遲隊列設計方案

      網友投稿 1115 2022-05-29

      應用場景

      創建訂單10分鐘之后自動支付

      訂單超時取消

      …等等…

      實現方式

      最簡單的方式,定時掃表;例如每分鐘掃表一次十分鐘之后未支付的訂單進行主動支付 ;

      優點: 簡單

      缺點: 每分鐘全局掃表,浪費資源,有一分鐘延遲

      使用RabbitMq 實現 RabbitMq實現延遲隊列

      優點: 開源,現成的穩定的實現方案;

      缺點: RabbitMq是一個消息中間件;延遲隊列只是其中一個小功能,如果團隊技術棧中本來就是使用RabbitMq那還好,如果不是,那為了使用延遲隊列而去部署一套RabbitMq成本有點大;

      使用Java中的延遲隊列,DelayQueue

      優點: java.util.concurrent包下一個延遲隊列,簡單易用;拿來即用

      缺點: 單機、不能持久化、宕機任務丟失等等;

      基于Redis自研延遲隊列

      既然上面沒有很好的解決方案,因為Redis的zset、list的特性,我們可以利用Redis來實現一個延遲隊列 RedisDelayQueue

      設計目標

      實時性: 允許存在一定時間內的秒級誤差

      高可用性:支持單機,支持集群

      支持消息刪除:業務費隨時刪除指定消息

      消息可靠性: 保證至少被消費一次

      消息持久化: 基于Redis自身的持久化特性,上面的消息可靠性基于Redis的持久化,所以如果redis數據丟失,意味著延遲消息的丟失,不過可以做主備和集群保證;

      數據結構

      Redis_Delay_Table: 是一個Hash_Table結構;里面存儲了所有的延遲隊列的信息;KV結構;K=TOPIC:ID V=CONENT; V由客戶端傳入的數據,消費的時候回傳;

      RD_ZSET_BUCKET: 延遲隊列的有序集合; 存放member=TOPIC:ID 和score=執行時間戳; 根據時間戳排序;

      RD_LIST_TOPIC: list結構; 每個Topic一個list;list存放的都是當前需要被消費的延遲Job;

      設計圖

      任務的生命周期

      新增一個Job,會在Redis_Delay_Table中插入一條數據,記錄了業務消費方的 數據結構; RD_ZSET_BUCKET 也會插入一條數據,記錄了執行時間戳;

      搬運線程會去RD_ZSET_BUCKET中查找哪些執行時間戳runTimeMillis比現在的時間小;將這些記錄全部刪除;同時會解析出來每個任務的Topic是什么,然后將這些任務push到Topic對應的列表RD_LIST_TOPIC中;

      每個Topic的List都會有一個監聽線程去批量獲取List中的待消費數據;獲取到的數據全部扔給這個Topic的消費線程池

      消息線程池執行會去Redis_Delay_Table查找數據結構,返回給回調接口,執行回調方法;

      以上所有操作,都是基于Lua腳本做的操作,Lua腳本執行的優點在于,批量命令執行具有原子性,事務性, 并且降低了網絡開銷,畢竟只有一次網絡開銷;

      搬運線程操作流程圖

      設計細節

      搬運操作

      1.搬運操作的時機

      為了避免頻繁的執行搬運操作, 我們基于 wait(time)/notify 的方式來通知執行搬運操作;

      我們用一個AtomicLong nextTime 來保存下一次將要搬運的時間;服務啟動的時候nextTime=0;所以肯定比當前時間小,那么就會先去執行一次搬運操作,然后返回搬運操作之后的ZSET的表頭時間戳,這個時間戳就是下一次將要執行的時間戳, 把這個時間戳賦值給 nextTime; 如果表中沒有元素了則將nextTime=Long.MaxValue ;因為while循環,下一次又會跟當前時間對比;如果nextTime比當前時間大,則說明需要等待; 那么我們wait(nextTime-System.currentTimeMills()); 等到時間到了之后,再次去判斷一下,就會比當前時間小,就會執行一次搬運操作;

      那么當有新增延遲任務Job的時間怎么辦,這個時候又會將當前新增Job的執行時間戳跟nextTime做個對比;如果小的話就重新賦值;

      重新賦值之后,還是調用一下 notifyAll() 通知一下搬運線程;讓他重新去判斷一下 新的時間是否比當前時間小;如果還是大的話,那么就繼續wait(nextTime-System.currentTimeMills()); 但是這個時候wait的時間又會變小;更精準;

      2.一次搬運操作的最大數量

      redis的執行速度非常快,在一個Lua里面循環遍歷1000個10000個根本沒差; 而且是在Lua里面操作,就只有一次網絡開銷;一次操作多少個元素根本就不會是問題;

      搬運操作的防護機制

      1.每分鐘喚醒定時線程

      在消費方多實例部署的情況下, 如果某一臺機器掛掉了,但是這臺機器的nextTime是最小的,就在一分鐘之后( 新增job的時候落到這臺機器,剛好時間戳很小), 其他機器可能是1個小時之后執行搬運操作; 如果這臺機器立馬重啟,那么還會立馬執行一次搬運操作;萬一他沒有重啟;那可能就會很久之后才會搬運;

      所以我們需要一種防護手段來應對這種極端情況;

      比如每分鐘將nextTime=0;并且喚醒wait;

      那么就會至少每分鐘會執行一次搬運操作! 這是可以接受的

      LrangeAndLTrim 批量獲取且刪除待消費任務

      1.執行時機以及如何防止頻繁請求redis

      這是一個守護線程,循環去做這樣的操作,把拿到的數據給線程池去消費;

      但是也不能一直不停的去執行操作,如果list已經沒有數據了去操作也沒有任何意義,不然就太浪費資源了,幸好List中有一個BLPOP阻塞原語,如果list中有數據就會立馬返回,如果沒有數據就會一直阻塞在那里,直到有數據返回,可以設置阻塞的超時時間,超時會返回NULL;

      第一次去獲取N個待消費的任務扔進到消費線程池中;如果獲取到了0個,那么我們就立馬用BLPOP來阻塞,等有元素的時候 BLPOP就返回數據了,下次就可以嘗試去LrangeAndLTrim一次了. 通過BLPOP阻塞,我們避免了頻繁的去請求redis,并且更重要的是提高了實時性;

      2.批量獲取的數量和消費線程池的阻塞隊列

      執行上面的一次獲取N個元素是不定的,這個要看線程池的maxPoolSize 最大線程數量; 因為避免消費的任務過多而放入線程池的阻塞隊列, 放入阻塞隊列有宕機丟失任務的風險,關機重啟的時候還要講阻塞隊列中的任務重新放入List中增加了復雜性;

      所以我們每次LrangeAndLTrim獲取的元素不能大于當前線程池可用的線程數; 這樣的一個控制可用用信號量Semaphore來做

      Codis集群對BLPOP的影響

      如果redis集群用了codis方案或者Twemproxy方案; 他們不支持BLPOP的命令;

      codis不支持的命令集合

      那么就不能利用BLPOP來防止頻繁請求redis;那么退而求其次改成每秒執行一次LrangeAndLTrim操作;

      集群對Lua的影響

      Lua腳本的執行只能在單機器上, 集群的環境下如果想要執行Lua腳本不出錯,那么Lua腳本中的所有key必須落在同一臺機器;

      為了支持集群操作Lua,我們利用hashtag; 用{}把三個jey的關鍵詞包起來;

      {projectName}:Redis_Delay_Table

      {projectName}:Redis_Delay_Table

      {projectName}:RD_LIST_TOPIC

      那么所有的數據就會在同一臺機器上了

      重試機制

      消費者回調接口如果拋出異常了,或者執行超時了,那么會將這個Job重新放入到RD_LIST_TOPIC中等待被下一次消費;默認重試2次;可以設置不重試;

      超時機制

      超時機制的主要思路都一樣,就是監聽一個線程的執行時間超過設定值之后拋出異常打斷方法的執行;

      這是使用的方式是 利用Callable接口實現異步超時處理

      public class TimeoutUtil { /**執行用戶回調接口的 線程池; 計算回調接口的超時時間 **/ private static ExecutorService executorService = Executors.newCachedThreadPool(); /** * 有超時時間的方法 * @param timeout 時間秒 * @return */ public static void timeoutMethod(long timeout, Function function) throws InterruptedException, ExecutionException, TimeoutException { FutureTask futureTask = new FutureTask(()->(function.apply(""))); executorService.execute(futureTask); //new Thread(futureTask).start(); try { futureTask.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { //e.printStackTrace(); futureTask.cancel(true); throw e; } } }

      這種方式有一點不好就是太費線程了,相當于線程使用翻了一倍;但是相比其他的方式,這種算是更好一點的

      優雅停機

      在Jvm那里注冊一個 Runtime.getRuntime().addShutdownHook(Runnable)停機回調接口;在這里面做好善后工作;

      關閉異步AddJob線程池

      關閉每分鐘喚醒線程

      關閉搬運線程 while(!stop)的形式

      關閉所有的topic監聽線程 while(!stop)的形式

      關閉關閉所有topic的消費線程 ;先調用shutdown;再executor.awaitTermination(20, TimeUnit.SECONDS);檢查是否還有剩余的線程任務沒有執行完; 如果還沒有執行完則等待執行完;最多等待20秒之后強制調用shutdownNow強制關閉;

      關閉重試線程 while(!stop)的形式

      關閉 異常未消費Job重入List線程池

      優雅停止線程一般是用下面的方式

      ①、 while(!stop)的形式 用標識位來停止線程

      ②.先 調用executor.shutdown(); 阻止接受新的任務;然后等待當前正在執行的任務執行完; 如果有阻塞則需要調用executor.shutdownNow()強制結束;所以要給一個等待時間;

      /** * shutdownNow 終止線程的方法是通過調用Thread.interrupt()方法來實現的 * 如果線程中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的線程的。 * 上面的情況中斷之后還是可以再執行finally里面的方法的; * 但是如果是其他的情況 finally是不會被執行的 * @param executor */ public static void closeExecutor(ExecutorService executor, String executorName) { try { //新的任務不進隊列 executor.shutdown(); //給10秒鐘沒有停止完強行停止; if(!executor.awaitTermination(20, TimeUnit.SECONDS)) { logger.warn("線程池: {},{}沒有在20秒內關閉,則進行強制關閉",executorName,executor); List droppedTasks = executor.shutdownNow(); logger.warn("線程池: {},{} 被強行關閉,阻塞隊列中將有{}個將不會被執行.", executorName,executor,droppedTasks.size() ); } logger.info("線程池:{},{} 已經關閉...",executorName,executor); } catch (InterruptedException e) { logger.info("線程池:{},{} 打斷...",executorName,executor); } }

      BLPOP阻塞的情況如何優雅停止監聽redis的線程

      如果不是在codis集群的環境下,BLPOP是可以很方便的阻塞線程的;但是停機的時候可能會有點問題;

      基于Redis實現DelayQueue延遲隊列設計方案

      假如正在關機,當前線程正在BLPOP阻塞, 那關機線程等我們20秒執行, 剛好在倒數1秒的時候BLPOP獲取到了數據,丟給消費線程去消費;如果消費線程1秒執行不完,那么20秒倒計時到了,強制關機,那么這個任務就會被丟失了; 怎么解決這個問題呢?

      ①. 不用BLPOP, 每次都sleep一秒去調用LrangeAndLTrim操作;

      ②.關機的時候殺掉 redis的blpop客戶端; 殺掉之后 BLPOP會立馬返回null; 進入下一個循環體;

      不足

      因為Redis的持久化特性,做不到消息完全不丟失,如果要保證完成不丟失,Redis的持久化刷盤策略要收緊

      因為Codis不能使用BLPOP這種阻塞的形式,在獲取消費任務的時候用了每秒一次去獲取,有點浪費性能;

      支持消費者多實例部署,但是可能存在不能均勻的分配到每臺機器上去消費;

      雖然支持redis集群,但是其實是偽集群,因為Lua腳本的原因,讓他們都只能落在一臺機器上;

      總結

      實時性

      正常情況下 消費的時間誤差不超過1秒鐘; 極端情況下,一臺實例宕機,另外的實例nextTime很遲; 那么最大誤差是1分鐘; 真正的誤差來自于業務方的接口的消費速度

      QPS

      完全視業務方的消費速度而定; 延遲隊列不是瓶頸

      Redis 任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:C語言學習,這一篇就夠了!(一)
      下一篇:kafka源碼解析之二:Producer代碼分析(scala版本)(下)
      相關文章
      亚洲成色www久久网站夜月| 亚洲欧美成人av在线观看| 亚洲性色精品一区二区在线| 亚洲精品在线免费看| 亚洲乱码无码永久不卡在线 | 亚洲大片免费观看| 亚洲综合色丁香麻豆| 少妇中文字幕乱码亚洲影视| 亚洲国产人成网站在线电影动漫| 久久亚洲国产中v天仙www| 亚洲国产综合无码一区| 国产亚洲一区二区手机在线观看 | 亚洲中文久久精品无码| 亚洲综合AV在线在线播放| 久久亚洲色一区二区三区| 国产亚洲欧洲Aⅴ综合一区| 国产亚洲午夜高清国产拍精品| 国产亚洲精品福利在线无卡一| 国产亚洲精品免费视频播放| 亚洲一区二区女搞男| 亚洲AV无码码潮喷在线观看| 亚洲电影免费在线观看| 亚洲视频精品在线| 亚洲毛片基地日韩毛片基地| 亚洲午夜电影一区二区三区| 亚洲成a人片在线看| 亚洲国产欧洲综合997久久| 国产亚洲Av综合人人澡精品| 亚洲精品国产成人影院| 亚洲一区二区三区AV无码| 亚洲成年人在线观看| 亚洲成人黄色在线| 一本色道久久综合亚洲精品蜜桃冫 | 久久精品国产亚洲AV忘忧草18| 亚洲精品123区在线观看| 亚洲av日韩aⅴ无码色老头| 亚洲国产成人五月综合网| 在线播放亚洲第一字幕| 亚洲AV成人片色在线观看| 亚洲国产理论片在线播放| 亚洲国产成人无码AV在线 |