LMAX Disruptor簡介

LMAX是什么碳抄?

要說Disruptor需要先說下LMAX,LMAX是一個英國外匯黃金交易所免绿,它是第一家也是唯一一家采用多邊交易設(shè)施Multilateral Trading Facility(MTF)瘦穆,擁有交易所拍照和經(jīng)紀商拍照的歐洲頂級金融公司。而LMAX所用的Disruptor技術(shù)油宜,在一個線程每秒處理6百萬訂單沪饺。沒錯际跪,這個Disruptor就是我們這里的
Disruptor佩脊。
而Disruptor只是LMAX平臺一部分,LMAX是一個新型零售金融交易平臺垫卤,它能夠達到低延遲、高吞吐量(大量交易)出牧。這個系統(tǒng)建立在JVM平臺上穴肘,核心是一個邏輯處理器,每秒能夠處理600百萬訂單舔痕。業(yè)務(wù)邏輯處理器完全運行在內(nèi)存中(in-memory)评抚,使用事件源驅(qū)動方式(event sourcing)豹缀。而業(yè)務(wù)邏輯處理器核心是Disruptor,這是一個并發(fā)組件慨代,能夠在無鎖情況下實現(xiàn)網(wǎng)絡(luò)并發(fā)查詢操作邢笙。他們研究表明,現(xiàn)在所謂的高性能研究方向似乎和現(xiàn)在CPU設(shè)計是相左的侍匙。

什么是Disruptor

Disruptor

Disruptor實現(xiàn)了隊列的功能氮惯,而且是一個有界的隊列。所以應(yīng)用場景自然就是"生產(chǎn)者-消費者"模型了想暗「竞梗可以看下JDK中的BlockingQuery是一個FIFO隊列,生產(chǎn)者(Producer)發(fā)布(Publish)一項事件(Event说莫,消息)時杨箭,消費者(Consumer)能夠獲得通知;當隊列中沒有事件時储狭,消費者會被阻塞互婿,直到生產(chǎn)者發(fā)布了新的事件。而Disruptor不僅僅只是這些:

  • 同一個事件可以有多個消費者辽狈,消費者之間可以并行處理慈参,可以相互依賴處理
  • 預(yù)分配用于存儲事件的內(nèi)存
  • 針對極高的性能目標而實現(xiàn)極度優(yōu)化和無鎖設(shè)計

可能你對這種場景還不是很明白,簡單說就是當需要兩個獨立的處理過程(兩個線程)之間需要傳遞數(shù)據(jù)時稻艰,就可以使用Disruptor懂牧,當然可以使用隊列。

Disruptor中的核心概念

  • Ring Buffer
    環(huán)形緩沖區(qū)尊勿,曾經(jīng)是Disruptor中的核心對象僧凤,不過從3.0版本開始,只負責(zé)對通過Disruptor進行交換的數(shù)據(jù)(事件)進行存儲和更新元扔。在一些高級使用中躯保,Ring Buffer可以由用戶自定義的來代替睬关。

  • Sequence
    通過遞增的序號管理進行交換的數(shù)據(jù)(事件)专执,對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理的。一個Sequence用于跟蹤標識某個特定的事件處理者(RingBuffer/Consumer)的進度吭服。雖然可以使用AutomicLong標識進度擅羞,但定義Sequence另一個目的是防止CPU緩存?zhèn)喂蚕?Disruptor高性能關(guān)鍵點之一)

  • Sequencer
    Sequence是Disruptor的真正核心尸变。這個接口有兩個實現(xiàn)類SingleProducerSequence和MultiProducerSequence,它們定義生產(chǎn)者和消費者之間快速减俏、正確地傳遞數(shù)據(jù)的并發(fā)算法召烂。

  • Sequence Barrier
    保持RingBuffer的main published Sequence和Consumer依賴的其它Sequence的引用。Sequence Barrier還定義決定Consumer是否還有可處理的事件邏輯娃承。

  • Event
    在Disruptor中奏夫,在生產(chǎn)者和消費者之間進行交換的數(shù)據(jù)稱為事件(Event)怕篷。它不是Disruptor定義的類型,而是使用者自己定義并指定的(可以看作一個Bean)

  • EventHandler
    Disruptor定義的事件處理接口酗昼,由用戶實現(xiàn)廊谓,用于處理事件,是Consumer的真正實現(xiàn)

  • EventProcessor
    EventProcessor持有特定消費者(Consumer)的Sequence麻削,并提供用來調(diào)用事件處理實現(xiàn)事件循環(huán)(Event loop)

  • Wait Strategy
    定義Consumer如何等待下一個事件策略蒸痹。(Disruptor提供了多種策略)

  • Producer
    生產(chǎn)者,泛指Disruptor發(fā)布事件的代碼碟婆,Disruptor沒有定義特定的接口或類型电抚。

Models

簡單Demo

1.定義事件

事件(Event)是Disruptor進行數(shù)據(jù)交換的數(shù)據(jù)類型

<pre>
public class PeopleEvent {
private String name;
private Integer age;
private Integer sex;

public void setName(String name) {
    this.name = name;
}

public String getName() {
    return name;
}

public void setAge(Integer age) {
    this.age = age;
}

public void setSex(Integer sex) {
    this.sex = sex;
}

public Integer getAge() {
    return age;
}

public Integer getSex() {
    return sex;
}

}
</pre>

2.定義事件工廠

事件工廠(Event Factory)用來實例化之前的事件(Event),需要實現(xiàn)接口com.lmax.disruptor.EventFactory<>竖共。Disruptor通過EventFactory在RingBuffer中創(chuàng)建Event的實例蝙叛。一個Event實例實際被用作一個"數(shù)據(jù)槽",發(fā)布者發(fā)布前公给,先從RingBuffer獲得一個Event的實例借帘,然后往Event中填充數(shù)據(jù),之后發(fā)布到RingBuffer中淌铐,之后由Consumer獲得該Event實例并從中取出數(shù)據(jù)肺然。

<pre>
public class PeopleEventFactory implements EventFactory<PeopleEvent> {
public PeopleEvent newInstance(){
return new PeopleEvent();
}
}
</pre>

3.定義事件處理的具體實現(xiàn)(業(yè)務(wù)邏輯核心)
需要實現(xiàn)com.lmax.disruptor.EventHandler<>接口,來定義事件處理的具體邏輯

<pre>
public class PeopleEventHandler implements EventHandler<PeopleEvent> {

public void onEvent(PeopleEvent event, long sequence, boolean endOfBatch) throws Exception {
    System.out.println("name:" + event.getName()+",sex:" + event.getSex() + ",age:" + event.getAge());
}

}
</pre>

4.組合事件處理流程

<pre>
public class DisruptorDemo {

public static void main(String[] args){

    //Executor將用來為消費者構(gòu)建線程
    Executor executor = Executors.newCachedThreadPool();

    //事件工廠用來創(chuàng)建事件
    PeopleEventFactory peopleEventFactory = new PeopleEventFactory();

    //指定Ring Buffer大小腿准,2的倍數(shù)
    int buffSize = 1024;

    /**
     * 構(gòu)造Disruptor
     * 并發(fā)系統(tǒng)提高性能之一就是單一寫者原則际起,如果代碼中僅有一個事件生產(chǎn)者,可以設(shè)置單一生產(chǎn)者模式來提高系統(tǒng)的性能吐葱。
     * 通過ProduceType.SINGLE和ProduceType.MULTI進行控制街望。
     *
     * 等待策略
     * Disruptor默認的等待策略是BlockingWaitStrategy,使用一個鎖和條件變量來控制執(zhí)行和等待弟跑,這是最慢的策略灾前,但也是CPU使用最低
     * 和最穩(wěn)定的策略。
     * SleepingWaitStrategy:也是CPU使用率低的策略孟辑,它使用循環(huán)等待并且循環(huán)間調(diào)用LockSupport.parkNanos(1)來睡眠哎甲。它的優(yōu)點在于
     * 生產(chǎn)線程只需記數(shù),而不執(zhí)行任何命令饲嗽,并且沒有條件變量的消耗炭玫。但是對象從生產(chǎn)者到消費者傳遞延遲變大了,適用于不需要低延遲的場景貌虾,
     * YieldingWaitStrategy:是可以被用作低延遲系統(tǒng)的兩個策略之一础嫡,這種策略在低延遲同時會增加CPU運算量。YieldingWaitStrategy
     * 會循環(huán)等待sequence增加到合適值,循環(huán)調(diào)用Tread.yield()允許其它準備好的線程執(zhí)行榴鼎。如果高性能而且事件消費者線程比邏輯內(nèi)核少的
     * 時候,推薦使用YieldingWaitStrategy策略晚唇。
     * BusySpinWaitStrategy是性能最高的策略巫财,同時也是對部署環(huán)境要求最高的策略。這個策略最好用在時間處理線程比物理內(nèi)核數(shù)目還要少的時候哩陕。
     */
    Disruptor<PeopleEvent> disruptor = new Disruptor<PeopleEvent>(peopleEventFactory,buffSize,executor,
            ProducerType.SINGLE,new YieldingWaitStrategy());

    //鏈接處理器
    disruptor.handleEventsWith(new PeopleEventHandler());

    //啟動Disruptor平项,啟動所有線程
    disruptor.start();

    //從Disruptor獲取RingBuffer,用來發(fā)布
    RingBuffer<PeopleEvent> ringBuffer = disruptor.getRingBuffer();

    PeopleEventProducer producer = new PeopleEventProducer(ringBuffer);

    Map<String,Object> map = new HashMap<String, Object>();
    map.put("name","yjz");
    map.put("age",25);
    map.put("sex",1);

    producer.onData(map);

}

}

</pre>

5.事件發(fā)布

事件發(fā)布包括三個步驟:

  • 從RingBuffer獲取一個可以寫入事件的序號
  • 獲取對應(yīng)的事件對象悍及,將數(shù)據(jù)寫入事件對象
  • 將事件提交到RingBuffer

事件在提交之后才會通知EventProcessor進行處理闽瓢。

<pre>
long sequence = ringBuffer.next();
try {
PeopleEvent event = ringBuffer.get(sequence);
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}finally {
ringBuffer.publish(sequence);
}
</code></pre>

RingBuffer.publish必須在finally來確保調(diào)用,如果某個sequence未被提交心赶,將會阻塞還需發(fā)布或其它的producer扣讼。
Disruptor提供了另一種方式簡化上述操作,來確保publish總被調(diào)用:

<pre>
private static final EventTranslatorOneArg<PeopleEvent,Map<String,Object>> tranlator = new EventTranslatorOneArg<PeopleEvent, Map<String, Object>>() {
public void translateTo(PeopleEvent event, long sequence, Map<String, Object> data) {
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}
};

/**
 * onData用來發(fā)布事件缨叫,每調(diào)用一次就發(fā)布一次事件椭符,它的參數(shù)會通過事件傳遞給消費者
 */
public void onData(Map<String,Object> data){
    ringBuffer.publishEvent(tranlator,data);
}  

</code></pre>

事件發(fā)布完整代碼

<pre>
/**

  • Created by yangjianzhang on 17/2/4.

  • PeopleEventProducer是一個生成事件的源,在這里面通過讀取磁盤IO耻姥、數(shù)據(jù)庫销钝、network等。當事件源會在IO讀取一部分數(shù)據(jù)時候觸發(fā)事件(觸發(fā)事件

  • 不是自動觸發(fā)的琐簇,需要在讀取到數(shù)據(jù)的時候自己觸發(fā)事件并發(fā)布)
    */
    public class PeopleEventProducer {

    private final RingBuffer<PeopleEvent> ringBuffer;

    public PeopleEventProducer(RingBuffer ringBuffer){
    this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<PeopleEvent,Map<String,Object>> tranlator = new EventTranslatorOneArg<PeopleEvent, Map<String, Object>>() {
    public void translateTo(PeopleEvent event, long sequence, Map<String, Object> data) {
    event.setName(data.get("name").toString());
    event.setAge((Integer) data.get("age"));
    event.setSex((Integer) data.get("sex"));
    }
    };

    /**

    • onData用來發(fā)布事件蒸健,每調(diào)用一次就發(fā)布一次事件,它的參數(shù)會通過事件傳遞給消費者
      */
      public void onData(Map<String,Object> data){
      ringBuffer.publishEvent(tranlator,data);
      }
      }
      </code></pre>

6.關(guān)閉Disruptor

<pre>
disruptor.shutdown();
executor.shutdown();
</pre>

這里只是Disruptor的一個簡介婉商,深入了解后再繼續(xù)分享似忧。

關(guān)注我

歡迎關(guān)注我的公眾號,會定期推送優(yōu)質(zhì)技術(shù)文章据某,讓我們一起進步橡娄、一起成長!
公眾號搜索:data_tc
或直接掃碼:??


歡迎關(guān)注我
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末癣籽,一起剝皮案震驚了整個濱河市挽唉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌筷狼,老刑警劉巖瓶籽,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異埂材,居然都是意外死亡塑顺,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來严拒,“玉大人扬绪,你說我怎么就攤上這事】氵耄” “怎么了挤牛?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長种蘸。 經(jīng)常有香客問我墓赴,道長,這世上最難降的妖魔是什么航瞭? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任诫硕,我火速辦了婚禮,結(jié)果婚禮上刊侯,老公的妹妹穿的比我還像新娘章办。我一直安慰自己,他們只是感情好滔吠,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布纲菌。 她就那樣靜靜地躺著,像睡著了一般疮绷。 火紅的嫁衣襯著肌膚如雪翰舌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天冬骚,我揣著相機與錄音椅贱,去河邊找鬼。 笑死只冻,一個胖子當著我的面吹牛庇麦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播喜德,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼山橄,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了舍悯?” 一聲冷哼從身側(cè)響起航棱,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎萌衬,沒想到半個月后饮醇,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡秕豫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年朴艰,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡祠墅,死狀恐怖侮穿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情饵隙,我是刑警寧澤撮珠,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站金矛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏勺届。R本人自食惡果不足惜驶俊,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望免姿。 院中可真熱鬧饼酿,春花似錦、人聲如沸胚膊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽紊婉。三九已至药版,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間喻犁,已是汗流浹背槽片。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肢础,地道東北人还栓。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像传轰,于是被迫代替她去往敵國和親剩盒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容