Disruptor并發(fā)框架

概述
disruptor對(duì)于處理并發(fā)任務(wù)很擅長(zhǎng)抹恳,曾有人測(cè)過署驻,一個(gè)線程里1s內(nèi)可以處理六百萬個(gè)訂單健霹,性能相當(dāng)感人瓶蚂。

這個(gè)框架的結(jié)構(gòu)大概是:數(shù)據(jù)生產(chǎn)端 --> 緩存 --> 消費(fèi)端

緩存中的數(shù)據(jù)是主動(dòng)發(fā)給消費(fèi)端的,而不是像一般的生產(chǎn)者消費(fèi)者模式那樣瞳别,消費(fèi)端去緩存中取數(shù)據(jù)杭攻。

可以將disruptor理解為,基于事件驅(qū)動(dòng)的高效隊(duì)列馆铁、輕量級(jí)的JMS

disruptor學(xué)習(xí)網(wǎng)站:http://ifeve.com/disruptor-getting-started

開發(fā)流程

1.建Event類(數(shù)據(jù)對(duì)象)

2.建立一個(gè)生產(chǎn)數(shù)據(jù)的工廠類锅睛,EventFactory,用于生產(chǎn)數(shù)據(jù)现拒;

3.監(jiān)聽事件類(處理Event數(shù)據(jù))

4.實(shí)例化Disruptor具练,配置參數(shù),綁定事件扛点;

5.建存放數(shù)據(jù)的核心 RingBuffer,生產(chǎn)的數(shù)據(jù)放入 RungBuffer眠饮。

樣例
1.入口
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

public static void main(String[] args) throws Exception {
    //創(chuàng)建緩沖池
    ExecutorService  executor = Executors.newCachedThreadPool();
    //創(chuàng)建工廠
    LongEventFactory factory = new LongEventFactory();
    //創(chuàng)建bufferSize ,也就是RingBuffer大小铜邮,必須是2的N次方
    int ringBufferSize = 1024 * 1024; // 

    /**
    //BlockingWaitStrategy 是最低效的策略,但其對(duì)CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn)
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    //SleepingWaitStrategy 的性能表現(xiàn)跟BlockingWaitStrategy差不多扔茅,對(duì)CPU的消耗也類似秸苗,但其對(duì)生產(chǎn)者線程的影響最小,適合用于異步日志類似的場(chǎng)景
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    //YieldingWaitStrategy 的性能是最好的惊楼,適合用于低延遲的系統(tǒng)秸讹。在要求極高性能且事件處理線數(shù)小于CPU邏輯核心數(shù)的場(chǎng)景中璃诀,推薦使用此策略蔑匣;例如,CPU開啟超線程的特性
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    */
    
    //創(chuàng)建disruptor
    Disruptor<LongEvent> disruptor = 
            new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
    // 連接消費(fèi)事件方法
    disruptor.handleEventsWith(new LongEventHandler());
    
    // 啟動(dòng)
    disruptor.start();
    
    //Disruptor 的事件發(fā)布過程是一個(gè)兩階段提交的過程:
    //發(fā)布事件
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    
    LongEventProducer producer = new LongEventProducer(ringBuffer); 
    //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
    ByteBuffer byteBuffer = ByteBuffer.allocate(8);
    for(long l = 0; l<100; l++){
        byteBuffer.putLong(0, l);
        producer.onData(byteBuffer);
        //Thread.sleep(1000);
    }

    
    disruptor.shutdown();//關(guān)閉 disruptor氧秘,方法會(huì)堵塞,直至所有的事件都得到處理;
    executor.shutdown();//關(guān)閉 disruptor 使用的線程池搔确;如果需要的話,必須手動(dòng)關(guān)閉座硕, disruptor 在 shutdown 時(shí)不會(huì)自動(dòng)關(guān)閉涕蜂;        
    
    
}

}
2.數(shù)據(jù)對(duì)象:
public class LongEvent {
private long value;
public long getValue() {
return value;
}

public void setValue(long value) { 
    this.value = value; 
} 

}
3.Event工廠
import com.lmax.disruptor.EventFactory;
// 需要讓disruptor為我們創(chuàng)建事件,我們同時(shí)還聲明了一個(gè)EventFactory來實(shí)例化Event對(duì)象蜘拉。
public class LongEventFactory implements EventFactory {

@Override 
public Object newInstance() { 
    return new LongEvent(); 
} 

}
4.生產(chǎn)者
import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;
/**

  • 很明顯的是:當(dāng)用一個(gè)簡(jiǎn)單隊(duì)列來發(fā)布事件的時(shí)候會(huì)牽涉更多的細(xì)節(jié)有鹿,這是因?yàn)槭录?duì)象還需要預(yù)先創(chuàng)建。

  • 發(fā)布事件最少需要兩步:獲取下一個(gè)事件槽并發(fā)布事件(發(fā)布事件的時(shí)候要使用try/finnally保證事件一定會(huì)被發(fā)布)持寄。

  • 如果我們使用RingBuffer.next()獲取一個(gè)事件槽娱俺,那么一定要發(fā)布對(duì)應(yīng)的事件。

  • 如果不能發(fā)布事件荠卷,那么就會(huì)引起Disruptor狀態(tài)的混亂。

  • 尤其是在多個(gè)事件生產(chǎn)者的情況下會(huì)導(dǎo)致事件消費(fèi)者失速赖欣,從而不得不重啟應(yīng)用才能會(huì)恢復(fù)屑彻。
    */
    public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
    this.ringBuffer = ringBuffer;
    }

    /**

    • onData用來發(fā)布事件,每調(diào)用一次就發(fā)布一次事件
    • 它的參數(shù)會(huì)用過事件傳遞給消費(fèi)者
      */
      public void onData(ByteBuffer bb){
      //1.可以把ringBuffer看做一個(gè)事件隊(duì)列顶吮,那么next就是得到下面一個(gè)事件槽
      long sequence = ringBuffer.next();
      try {
      //2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號(hào)對(duì)應(yīng)的事件對(duì)象)
      LongEvent event = ringBuffer.get(sequence);
      //3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
      event.setValue(bb.getLong(0));
      } finally {
      //4.發(fā)布事件
      //注意社牲,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;如果某個(gè)請(qǐng)求的 sequence 未被提交悴了,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer搏恤。
      ringBuffer.publish(sequence);
      }
      }
      }
      5.消費(fèi)者

import com.lmax.disruptor.EventHandler;

//我們還需要一個(gè)事件消費(fèi)者,也就是一個(gè)事件處理器湃交。這個(gè)事件處理器簡(jiǎn)單地把事件中存儲(chǔ)的數(shù)據(jù)打印到終端:
public class LongEventHandler implements EventHandler<LongEvent> {

@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
    System.out.println(longEvent.getValue());         
}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末熟空,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子搞莺,更是在濱河造成了極大的恐慌,老刑警劉巖才沧,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件迈喉,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡温圆,警方通過查閱死者的電腦和手機(jī)挨摸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來岁歉,“玉大人得运,你說我怎么就攤上這事」疲” “怎么了熔掺?”我有些...
    開封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)帆啃。 經(jīng)常有香客問我瞬女,道長(zhǎng),這世上最難降的妖魔是什么努潘? 我笑而不...
    開封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任诽偷,我火速辦了婚禮,結(jié)果婚禮上疯坤,老公的妹妹穿的比我還像新娘报慕。我一直安慰自己,他們只是感情好压怠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開白布眠冈。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蜗顽。 梳的紋絲不亂的頭發(fā)上布卡,一...
    開封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音雇盖,去河邊找鬼忿等。 笑死,一個(gè)胖子當(dāng)著我的面吹牛崔挖,可吹牛的內(nèi)容都是我干的贸街。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼狸相,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼薛匪!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起脓鹃,我...
    開封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤逸尖,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后瘸右,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冷溶,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年尊浓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纯衍。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡栋齿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出襟诸,到底是詐尸還是另有隱情瓦堵,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布歌亲,位于F島的核電站菇用,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏陷揪。R本人自食惡果不足惜惋鸥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望悍缠。 院中可真熱鬧卦绣,春花似錦、人聲如沸飞蚓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)趴拧。三九已至溅漾,卻和暖如春山叮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背添履。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來泰國(guó)打工屁倔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人缝龄。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓汰现,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親叔壤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瞎饲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)炼绘,斷路器嗅战,智...
    卡卡羅2017閱讀 134,638評(píng)論 18 139
  • 本文是筆者在研究Disruptor過程中對(duì)Disruptor官方介紹與入門指南的翻譯,有些部分做了適當(dāng)編輯和增減俺亮。...
    coder_jerry閱讀 8,489評(píng)論 2 57
  • LMAX是什么驮捍? 要說Disruptor需要先說下LMAX,LMAX是一個(gè)英國(guó)外匯黃金交易所,它是第一家也是唯一一...
    零度沸騰_yjz閱讀 13,168評(píng)論 0 12
  • LMAX是一種新型零售金融交易平臺(tái)脚曾,它能夠以很低的延遲(latency)產(chǎn)生大量交易(吞吐量). 這個(gè)系統(tǒng)是建立在...
    舉頭明鑒閱讀 6,270評(píng)論 2 10
  • Disruptor提供了一種線程之間信息交換的方式东且。 鎖的缺點(diǎn) 并發(fā)的問題 想象有兩個(gè)線程嘗試修改同一個(gè)變量val...
    jiangmo閱讀 1,681評(píng)論 0 2