Disruptor

一、簡介

LMAX是一家外匯黃金交易所丹莲,Disruptor是由LMAX公司開發(fā)的可信消息傳遞架構(gòu)的一部分
以便用非常快速的方法來在多組件之間傳遞數(shù)據(jù)尸诽。
核心思想是理解并適應(yīng)硬件工作方式來達到最優(yōu)的效果甥材。
github地址:https://github.com/LMAX-Exchange/disruptor
LMAX架構(gòu):https://martinfowler.com/articles/lmax.html

二、架構(gòu)圖

結(jié)構(gòu)圖

三性含、成員

sequencer:序列號分配
sequence:序號洲赵,自增不減
MultiProducerSequencer 多生產(chǎn)者序列分配器
SingleProducerSequencer 單生產(chǎn)者序列分配器
ProcessingSequenceBarrier 管理消費者和生產(chǎn)者的依賴關(guān)系,
Ring Buffer:負責存儲和更新事件的數(shù)據(jù)
Sequence Barrier:由Sequencer生成商蕴,它包含此Sequencer發(fā)布的Sequence指針以及依賴的其它消費者的Sequence叠萍。 它包含為消費者檢查是否有可用的事件的代碼邏輯绪商。
Wait Strategy: 消費者等待事件的策略, 這些事件由生產(chǎn)者放入部宿。
Event:傳遞的事件,完全有用戶定義
EventProcessor:處理事件的主要循環(huán),包含一個Sequence绵患。有一個具體的實現(xiàn)類BatchEventProcessor.
EventHandler: 用戶實現(xiàn)的接口,代表一個消費者
Producer:生產(chǎn)者落蝙,先獲得占位,然后提交事件移迫。

四管行、示例

maven依賴

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.6</version>
        </dependency>

消息體:

import lombok.Data;
/**
 * Created by yangzaining on 2020-04-06.
 */
@Data
public class LongEvent {
    private Long id;
}

消息工廠

import com.lmax.disruptor.EventFactory;
import lombok.Data;

/**
 * Created by yangzaining on 2020-04-06.
 */
@Data
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

消費者

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Created by yangzaining on 2020-04-06.
 */
@Slf4j
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("event = {},sequence = {}, endOfBatch = {}", event.getId(), sequence, endOfBatch);
    }
}

生產(chǎn)者

package demo;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

/**
* Created by yangzaining on 2020-04-06.
*/
public class LongEventPusher {

   private RingBuffer<LongEvent> ringBuffer;

   LongEventPusher(RingBuffer<LongEvent> ringBuffer) {
       this.ringBuffer = ringBuffer;
   }

   public void push(Long id) {
       long sequence = ringBuffer.next();
       try {
           LongEvent event = ringBuffer.get(sequence);
           event.setId(id);
       } finally {
           ringBuffer.publish(sequence);
       }
   }
}


    public static void main(String[] args) throws InterruptedException {
        LongEventFactory factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith(new LongEventHandler(), new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventPusher pusher = new LongEventPusher(ringBuffer);//java 8可以用pushEvent
        for (long l = 0; l < 100; l++) {
//            long finalL = l;
//            disruptor.publishEvent((eventWrapper, sequence) -> eventWrapper.setId(finalL));
            pusher.push(l);
        }
        Thread.sleep(10000);
    }

producer.png
    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint > gatingSequence)
                {
                    waitStrategy.signalAllWhenBlocking();
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }
consumer.png
           while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
消費圖解.png

注:以上全憑自己對Disruptor的理解雨效,有不對的地方歡迎指正

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末徽龟,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子据悔,更是在濱河造成了極大的恐慌,老刑警劉巖屠尊,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耕拷,死亡現(xiàn)場離奇詭異,居然都是意外死亡浸赫,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門既峡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來碧查,“玉大人,你說我怎么就攤上這事忠售。” “怎么了稻扬?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長盼砍。 經(jīng)常有香客問我逝她,道長,這世上最難降的妖魔是什么黔宛? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮跌宛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘疆拘。我一直安慰自己,他們只是感情好哎迄,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著翔烁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪旨涝。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天慨默,我揣著相機與錄音弧腥,去河邊找鬼。 笑死管搪,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的更鲁。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼朋沮,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起纠亚,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蒂胞,沒想到半個月后图呢,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡赴叹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年指蚜,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片摊鸡。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡免猾,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出猎提,到底是詐尸還是另有隱情,我是刑警寧澤锨苏,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站蚓炬,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏肯夏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一烁兰、第九天 我趴在偏房一處隱蔽的房頂上張望徊都。 院中可真熱鬧,春花似錦暇矫、人聲如沸主之。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽所森。三九已至,卻和暖如春焕济,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背晴弃。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工问欠, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人顺献。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像注整,于是被迫代替她去往敵國和親能曾。 傳聞我的和親對象是個殘疾皇子肿轨,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容