從構建分布式秒殺系統聊聊Disruptor高性能隊列

      網友投稿 913 2025-04-02

      前言


      秒殺架構持續優化中,基于自身認知不足之處在所難免,也請大家指正,共同進步。文章標題來自碼友的建議,希望可以把阻塞隊列ArrayBlockingQueue這個隊列替換成Disruptor,由于之前曾接觸過這個東西,聽說很不錯,正好借此機會整合進來。

      簡介

      LMAX Disruptor是一個高性能的線程間消息庫。它源于LMAX對并發性,性能和非阻塞算法的研究,如今構成了Exchange基礎架構的核心部分。

      Disruptor它是一個開源的并發框架,并獲得2011 Duke’s 程序框架創新獎,能夠在無鎖的情況下實現網絡的Queue并發操作。

      Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。

      在這里你可以跟BlockingQueue隊列作比對,簡單的理解為它是一種高效的"生產者-消費者"模型,先了解后深入底層原理。

      核心

      寫代碼案例之前,大家最好先了解 Disruptor 的核心概念,至少知道它是如何運作的。

      Ring Buffer

      如其名,環形的緩沖區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。

      Sequence Disruptor

      通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用于標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存偽共享(Flase Sharing)問題。

      Sequencer

      Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的并發算法。

      Sequence Barrier

      用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。

      Wait Strategy

      定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)

      Event

      在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定。

      EventProcessor

      EventProcessor 持有特定消費者(Consumer)的 Sequence,并提供用于調用事件處理實現的事件循環(Event Loop)。

      EventHandler

      Disruptor 定義的事件處理接口,由用戶實現,用于處理事件,是 Consumer 的真正實現。

      Producer

      即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。

      優點

      剖析Disruptor:為什么會這么快?(一)鎖的缺點

      剖析Disruptor:為什么會這么快?(二)神奇的緩存行填充

      剖析Disruptor:為什么會這么快?(三)偽共享

      剖析Disruptor:為什么會這么快?(四)揭秘內存屏障

      有興趣的參考:

      https://coolshell.cn/articles/9169.html

      https://www.cnblogs.com/daoqidelv/p/6995888.html

      使用案例

      這里以我們系統中的秒殺作為案例,后面有相對復雜的場景介紹。

      定義秒殺事件對象:

      /** ?*?事件對象(秒殺事件) ?*?創建者?科幫網 ?*/public?class?SeckillEvent?implements?Serializable?{????private?static?final?long?serialVersionUID?=?1L;????private?long?seckillId;????private?long?userId;????public?SeckillEvent(){ ????}????public?long?getSeckillId()?{????????return?seckillId; ????}????public?void?setSeckillId(long?seckillId)?{????????this.seckillId?=?seckillId; ????}????public?long?getUserId()?{????????return?userId; ????}????public?void?setUserId(long?userId)?{????????this.userId?=?userId; ????} }

      為了讓Disruptor為我們預先分配這些事件,我們需要一個將執行構造的EventFactory:

      /** ?*?事件生成工廠(用來初始化預分配事件對象) ?*?創建者?科幫網 ?*/public?class?SeckillEventFactory?implements?EventFactory?{????public?SeckillEvent?newInstance()?{????????return?new?SeckillEvent(); ????} }

      然后,我們需要創建一個處理這些事件的消費者:

      /** ?*?消費者(秒殺處理器) ?*?創建者?科幫網 ?*/public?class?SeckillEventConsumer?implements?EventHandler?{????//業務處理、這里是無法注入的,需要手動獲取,見源碼 ????private?ISeckillService?seckillService?=?(ISeckillService)?SpringUtil.getBean("seckillService");????public?void?onEvent(SeckillEvent?seckillEvent,?long?seq,?boolean?bool)?throws?Exception?{ ????????seckillService.startSeckil(seckillEvent.getSeckillId(),?seckillEvent.getUserId()); ????} }

      既然有消費者,我們將需要這些秒殺事件的來源:

      /** ?*?使用translator方式生產者 ?*?創建者?科幫網 ?*/public?class?SeckillEventProducer?{????private?final?static?EventTranslatorVararg?translator?=?new?EventTranslatorVararg()?{????????public?void?translateTo(SeckillEvent?seckillEvent,?long?seq,?Object...?objs)?{ ????????????seckillEvent.setSeckillId((Long)?objs[0]); ????????????seckillEvent.setUserId((Long)?objs[1]); ????????} ????};????private?final?RingBuffer?ringBuffer;????public?SeckillEventProducer(RingBuffer?ringBuffer){????????this.ringBuffer?=?ringBuffer; ????}????public?void?seckill(long?seckillId,?long?userId){????????this.ringBuffer.publishEvent(translator,?seckillId,?userId); ????} }

      最后,我們來寫一個測試類,運行一下(跑不通,需要修改消費者):

      /** ?*?測試類 ?*?創建者?科幫網 ?*/public?class?SeckillEventMain?{????public?static?void?main(String[]?args)?{ ????????producerWithTranslator(); ????}????public?static?void?producerWithTranslator(){ ????????SeckillEventFactory?factory?=?new?SeckillEventFactory();????????int?ringBufferSize?=?1024; ????????ThreadFactory?threadFactory?=?new?ThreadFactory()?{????????????public?Thread?newThread(Runnable?runnable)?{????????????????return?new?Thread(runnable); ????????????} ????????};????????//創建disruptor ????????Disruptor?disruptor?=?new?Disruptor(factory,?ringBufferSize,?threadFactory);????????//連接消費事件方法 ????????disruptor.handleEventsWith(new?SeckillEventConsumer());????????//啟動 ????????disruptor.start(); ????????RingBuffer?ringBuffer?=?disruptor.getRingBuffer(); ????????SeckillEventProducer?producer?=?new?SeckillEventProducer(ringBuffer);????????for(long?i?=?0;?i<10;?i++){ ????????????producer.seckill(i,?i); ????????} ????????disruptor.shutdown();//關閉?disruptor,方法會堵塞,直至所有的事件都得到處理; ????} }

      使用場景

      PCP (生產者-消費者問題)

      網上搜了下國內實戰案例并不多,大廠可能有在使用

      這里舉一個大家日常的例子,停車場景。當汽車進入停車場時(A),系統首先會記錄汽車信息(B)。同時也會發送消息到其他系統處理相關業務(C),最后發送短信通知車主收費開始(D)。

      一個生產者A與三個消費者B、C、D,D的事件處理需要B與C先完成。則該模型結構如下:

      在這個結構下,每個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。

      從構建分布式秒殺系統聊聊Disruptor高性能隊列

      代碼案例:從0到1構建分布式秒殺系統

      參考:

      https://github.com/LMAX-Exchange/disruptor/wiki

      https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

      http://wiki.jikexueyuan.com/project/disruptor-getting-started/lmax-framework.html

      分布式

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:文檔網頁端中如何把文件生成「鏈接」分享?(怎樣上傳云文檔)
      下一篇:通過antd-charts可視化對比科比和詹姆斯誰更強
      相關文章
      性xxxx黑人与亚洲| www.亚洲成在线| 久久亚洲精品无码网站| 亚洲国产精品成人综合色在线婷婷| 亚洲中文字幕久久精品无码喷水 | 亚洲日韩一页精品发布| 亚洲精品视频免费| 亚洲av午夜成人片精品电影 | 亚洲熟妇无码爱v在线观看| 亚洲精品熟女国产| 亚洲欧洲日产韩国在线| 亚洲明星合成图综合区在线| 亚洲第一永久在线观看| 亚洲视频中文字幕在线| 亚洲成AV人片久久| 亚洲深深色噜噜狠狠网站| 久久亚洲精品国产精品婷婷| 亚洲国产欧洲综合997久久| 亚洲成av人无码亚洲成av人| 国产亚洲高清在线精品不卡| 亚洲AV无码一区二区三区国产| 亚洲伦乱亚洲h视频| 亚洲国产三级在线观看| 亚洲av无码乱码国产精品fc2| 久久综合图区亚洲综合图区| 亚洲一区二区三区首页| 亚洲制服在线观看| 亚洲夂夂婷婷色拍WW47| 日本亚洲中午字幕乱码| 亚洲午夜国产片在线观看| 丁香五月亚洲综合深深爱| 亚洲av最新在线网址| 亚洲色av性色在线观无码| 亚洲不卡中文字幕| 亚洲狠狠婷婷综合久久| 亚洲精品成a人在线观看| 亚洲成A人片在线观看无码不卡| 亚洲一级二级三级不卡| 亚洲中文字幕一二三四区苍井空| 亚洲第一成年网站视频| 国产成人毛片亚洲精品|