Disruptor框架學習(1)--怎么實現(xiàn)

1 Disruptor學習

在上一篇文章中技掏,筆者提到了log4j2中的異步logger。通過測試數(shù)據(jù)來看兴溜,在使用異步logger后侦厚,打印日志的時間明顯縮短,系統(tǒng)響應時間得到了巨大的提升拙徽。

那么刨沦,disruptor究竟是什么,為什么它可以提升系統(tǒng)的性能膘怕?

1.1 Disruptor簡介

Disruptor是一個開源框架想诅,研發(fā)的初衷是為了解決高并發(fā)下列隊鎖的問題,最早由LMAX(一種新型零售金融交易平臺)提出并使用,能夠在無鎖的情況下實現(xiàn)隊列的并發(fā)操作来破,并號稱能夠在一個線程里每秒處理6百萬筆訂單(我是不相信)篮灼。

隊列的特性:先進先出(FIFO)--先進入隊列的元素先出隊列(可以理解為我們生活中的排隊情況,早辦完徘禁,早滾蛋)诅诱。生產(chǎn)者(Producer)往隊列里發(fā)布(publish)事件,消費者(Consumer)獲得通知晌坤,消費事件逢艘;如果隊列中沒有事件時旦袋,消費者堵塞骤菠,直到生產(chǎn)者發(fā)布了新事件。

說到隊列疤孕,那就不得不提到Java中的concurrent包商乎,其主要實現(xiàn)包括ArrayBlockingQueue、LinkedBlockingQueue祭阀、ConcurrentLinkedQueue鹉戚、LinkedTransferQueue。下面专控,簡單介紹下:

ArrayBlockingQueue:基于數(shù)組形式的隊列抹凳,通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全伦腐;

LinkedBlockingQueue:基于鏈表形式的隊列赢底,也通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全柏蘑;

ConcurrentLinkedQueue:基于鏈表形式的隊列幸冻,通過compare and swap(簡稱CAS)協(xié)議的方式,
來保證多線程情況下數(shù)據(jù)的安全咳焚,不加鎖洽损,主要使用了Java中的sun.misc.Unsafe類來實現(xiàn);

LinkedTransferQueue:同上革半;

通過查看以上4個類的源碼碑定,可以發(fā)現(xiàn):

(1)使用CAS協(xié)議實現(xiàn)隊列的類,都是無界的又官,無法保證隊列的長度不傅,理論上來說可以是無限擴展,那么如果生產(chǎn)者生產(chǎn)過快赏胚,消費者還沒來得及消費访娶,最終可能會導致內(nèi)存溢出,影響系統(tǒng)穩(wěn)定觉阅;

(2)而使用加鎖實現(xiàn)隊列的類崖疤,雖然是有界的(可以設置隊列的大忻爻怠),但是有鎖的存在劫哼,性能上有了很大的影響叮趴,線程由于鎖的競爭被掛起,直到鎖的釋放权烧,才能恢復眯亦。此外,由于偽共享的存在般码,也會影響性能

而Disruptor解決了以上的問題妻率,實現(xiàn)了無鎖有界隊列操作。主要是使用了環(huán)形數(shù)組(ringbuffer)板祝、CAS宫静、緩存行填充、解決偽共享等技術(shù)券时,接下來我們一一講解孤里;

1.2 Disruptor結(jié)構(gòu)

在講解disruptor所使用的相關(guān)技術(shù)之前,我覺得有必要簡單的介紹下的Disruptor結(jié)構(gòu)橘洞!

前面介紹了捌袜,Disruptor是一個開源的框架,可以在無鎖的情況下對隊列進行操作炸枣,那么這個隊列的設計就是Disruptor的核心所在虏等;

在Disruptor中,采用了RingBuffer來作為隊列的數(shù)據(jù)結(jié)構(gòu)抛虏,RingBuffer就是一個環(huán)形的數(shù)組博其,既然是數(shù)組,我們便可對其設置大小迂猴。在這個ringBuffer中慕淡,除了數(shù)組之外,還有一個序列號沸毁,是用來指向數(shù)組中的下一個可用元素峰髓,供生產(chǎn)者使用或者消費者使用,也就是生產(chǎn)者可以生產(chǎn)的地方息尺,或者消費者可以消費的地方携兵。(序列號和數(shù)組索引是兩個概念,別搞錯了)

Disruptor使用數(shù)組作為隊列的另一個好處搂誉,就是可以快速定位到所需元素徐紧,通常使用取摸運算(序列號%數(shù)組大小=所需元素角標),但在Disruptor中使用的是位運算(具體實現(xiàn):UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT))),效率更高并级,定位更快拂檩;此外,在Disruptor中數(shù)組內(nèi)的元素并不會被刪除嘲碧,而是新數(shù)據(jù)來覆蓋原有數(shù)據(jù)稻励;

1.3 Disruptor代碼簡單實現(xiàn)

我們就以一個簡單例子來實現(xiàn)Disruptor:生產(chǎn)者傳遞一個long類型變量給消費者,消費者將這個變量打印出來愈涩。

單生產(chǎn)者望抽,單消費者模型:

(1)向ringbuffer中插入的事件元素:就是在對象中放了一個long變量

public class LongEvent {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

(2)事件生產(chǎn)工廠:生產(chǎn)事件存入ringbuffer中

public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {
        return new LongEvent();
    }
}

(3)事件處理器,也就是消費者履婉,就是將事件的值打印出來

public class LongEventHandler implements EventHandler<LongEvent> {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event:"+event.getValue());
    }
}

(4)主函數(shù):創(chuàng)建生產(chǎn)者煤篙,向ringbuffer中填充元素

public class DisruptorMain {

    public static void main(String[] agrs) throws InterruptedException {
        
        //創(chuàng)建線程池:
        Executor executor = Executors.newCachedThreadPool();

        //事件生產(chǎn)工廠:
        LongEventFactory longEventFactory = new LongEventFactory();

        //ringbuffer的大小:
        int bufferSize = 256;

        //實例化disruptor對象:初始化ringbuffer
         Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(longEventFactory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());

        //設置事件的執(zhí)行者:(單消費者)
        disruptor.handleEventsWith(new LongEventHandler());
        
        //disruptor啟動:
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //設置事件單生產(chǎn)者:
        for(int x = 0;x<256; x++){
            // 獲取下一個可用位置的下標
            long sequence = ringBuffer.next();  
            try{
                // 返回可用位置的元素
                LongEvent event = ringBuffer.get(sequence); 
                // 設置該位置元素的值
                event.set(x); 
            }finally{
                //發(fā)布事件 
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

1.4 Disruptor主要實現(xiàn)類

通過以上代碼谐鼎,我們來簡單的分析下Disruptor的構(gòu)成:

Disruptor:Disruptor的入口舰蟆,主要封裝了環(huán)形隊列RingBuffer趣惠、消費者集合ConsumerRepository的引用狸棍;主要提供了獲取環(huán)形隊列、添加消費者味悄、生產(chǎn)者向RingBuffer中添加事件(可以理解為生產(chǎn)者生產(chǎn)數(shù)據(jù))的操作草戈;

RingBuffer:Disruptor中隊列具體的實現(xiàn),底層封裝了Object[]數(shù)組侍瑟;在初始化時唐片,會使用Event事件對數(shù)組進行填充,填充的大小就是bufferSize設置的值涨颜;此外费韭,該對象內(nèi)部還維護了Sequencer(序列生產(chǎn)器)具體的實現(xiàn);

Sequencer:序列生產(chǎn)器庭瑰,分別有MultiProducerSequencer(多生產(chǎn)者序列生產(chǎn)器) 和 SingleProducerSequencer(單生產(chǎn)者序列生產(chǎn)器)兩個實現(xiàn)類星持。上面的例子中,使用的是SingleProducerSequencer弹灭;在Sequencer中督暂,維護了消費者的Sequence(序列對象)和生產(chǎn)者自己的Sequence(序列對象);以及維護了生產(chǎn)者與消費者序列沖突時候的等待策略WaitStrategy穷吮;

Sequence:序列對象逻翁,內(nèi)部維護了一個long型的value,這個序列指向了RingBuffer中Object[]數(shù)組具體的角標捡鱼。生產(chǎn)者和消費者各自維護自己的Sequence八回;但都是指向RingBuffer的Object[]數(shù)組;

Wait Strategy:等待策略。當沒有可消費的事件時缠诅,消費者根據(jù)特定的策略進行等待伟墙;當沒有可生產(chǎn)的地方時,生產(chǎn)者根據(jù)特定的策略進行等待滴铅;

Event:事件對象戳葵,就是我們Ringbuffer中存在的數(shù)據(jù),在Disruptor中用Event來定義數(shù)據(jù)汉匙,并不存在Event類拱烁,它只是一個定義;

EventProcessor:事件處理器噩翠,單獨在一個線程內(nèi)執(zhí)行戏自,判斷消費者的序列和生產(chǎn)者序列關(guān)系,決定是否調(diào)用我們自定義的事件處理器伤锚,也就是是否可以進行消費擅笔;

EventHandler:事件處理器,由用戶自定義實現(xiàn)屯援,也就是最終的事件消費者猛们,需要實現(xiàn)EventHandler接口;

Producer:事件生產(chǎn)者狞洋,也就是我們上面代碼中最后那部門的for循環(huán)弯淘;

1.5 Disruptor的生產(chǎn)和消費

上面我們通過代碼簡單的實現(xiàn)了Disruptor,闡述其中具體實現(xiàn)類的含義吉懊,接下來再用圖文的方式進一步介紹Disruptor的生產(chǎn)和消費庐橙;

暫時還是以單生產(chǎn)和單消費者舉例:

(1)當Disruptor框架啟動:


(2)此時,還沒有數(shù)據(jù)進行寫入


(3)準備寫入數(shù)據(jù)前的準備借嗽,獲取可以寫入數(shù)據(jù)的最大序列态鳖;


(4)寫入數(shù)據(jù)完成,更新生產(chǎn)者序列對象的值恶导;


以上浆竭,就是單生產(chǎn)者寫入數(shù)據(jù)的過程。要注意的是甲锡,無論是生產(chǎn)者還是消費者兆蕉,序列的初始值都是-1;

當引入消費者后缤沦,生產(chǎn)者在獲取可寫入的序列之前虎韵,都會判斷消費者所處的序列。

我們假設一種情況缸废,當在我們的消費者端使用Thread.sleep(巨大的值)的時候包蓝,消費者使用被等待驶社,無法進行消費。

那么此時测萎,生產(chǎn)者會一直對數(shù)組中的元素進行生產(chǎn)亡电,當生產(chǎn)到7準備生產(chǎn)序列8的時候,通過計算序列8對應的是index = 0的元素硅瞧,我們此時會判斷覆蓋點所對應的角標是否大于消費者的序列大小份乒,如果大于消費者序列,那么生產(chǎn)者不會進行生產(chǎn)腕唧,直到消費者消費了此角標下的元素或辖;

public long next(int n){
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;

    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue){
        cursor.setVolatile(nextValue);  // StoreLoad fence

        long minSequence;
        //此處進行判斷,如果覆蓋點的大小枣接,超過了消費者的序列颂暇,那么會一直while循環(huán)進行判斷
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }

        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}

單消費者,進行消費的邏輯但惶,與單生產(chǎn)者類似耳鸯,大家可以進行深入研究;

以上便是單消費者和單生產(chǎn)者的大體流程膀曾;

下一篇县爬,筆者將著重要介紹,Disruptor中使用的技術(shù)方案<酥0剖 苫纤!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末碉钠,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子卷拘,更是在濱河造成了極大的恐慌喊废,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件栗弟,死亡現(xiàn)場離奇詭異污筷,居然都是意外死亡,警方通過查閱死者的電腦和手機乍赫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門瓣蛀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人雷厂,你說我怎么就攤上這事惋增。” “怎么了改鲫?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵诈皿,是天一觀的道長林束。 經(jīng)常有香客問我,道長稽亏,這世上最難降的妖魔是什么壶冒? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮截歉,結(jié)果婚禮上胖腾,老公的妹妹穿的比我還像新娘。我一直安慰自己瘪松,他們只是感情好胸嘁,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著凉逛,像睡著了一般性宏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上状飞,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天毫胜,我揣著相機與錄音,去河邊找鬼诬辈。 笑死酵使,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的焙糟。 我是一名探鬼主播口渔,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼穿撮!你這毒婦竟也來了缺脉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤悦穿,失蹤者是張志新(化名)和其女友劉穎攻礼,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體栗柒,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡礁扮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了瞬沦。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片太伊。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖逛钻,靈堂內(nèi)的尸體忽然破棺而出僚焦,到底是詐尸還是另有隱情,我是刑警寧澤绣的,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布叠赐,位于F島的核電站欲账,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏芭概。R本人自食惡果不足惜赛不,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望罢洲。 院中可真熱鬧踢故,春花似錦、人聲如沸惹苗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽桩蓉。三九已至淋纲,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間院究,已是汗流浹背洽瞬。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留业汰,地道東北人伙窃。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像样漆,于是被迫代替她去往敵國和親为障。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理放祟,服務發(fā)現(xiàn)鳍怨,斷路器,智...
    卡卡羅2017閱讀 134,713評論 18 139
  • 本文是筆者在研究Disruptor過程中翻譯的Disruptor1.0論文精選舞竿,中間穿插了一些感想和說明京景,均以“譯...
    coder_jerry閱讀 5,185評論 3 52
  • 風把暮暮朝朝的情緒移到瀚海 梳理山巖的硬發(fā) 雕刻成滿灘的圓球并帶走砂礫 把湖水打散化霧 匯聚升騰凝結(jié)為寒冰 月亮路...
    風言無語閱讀 446評論 18 57
  • 在這個世界上芜赌,兩個人相遇的可能性是千萬分之一仰挣,成為朋友的可能性是兩億分之一,而成為終生伴侶的可能性只有五十億分之一...
    無名逍遙閱讀 4,618評論 0 0