Disruptor真香

Disruptor

What? 是什么惜互?
高性能的無鎖隊列本昏。

大學(xué)在學(xué)習(xí)到隊列的時候值朋,老師是不是讓我們課下自己去實現(xiàn)阻塞隊列垛叨,大家還有印象么衡怀?沒有印象建議讀一讀java.util.concurrent.ArrayBlockingQueue
Why颇象? 為什么這么快
偽共享問題處理,緩沖行填充你虹,大量CAS操作绘搞,去鎖
無鎖環(huán)形隊列設(shè)計,位運算鎖定坑位等

CPU緩存認(rèn)知

個人圖解disruptor
ringbuffer.jpg
so 到目前為止傅物,如果讓你自己實現(xiàn)一個Disruptor 你會怎么做呢夯辖?
Disruptor HelloWorld
知其然,再知其所以然董饰。so 先知其然

(```)

//環(huán)形數(shù)組 位置的數(shù)據(jù)對象
public final class ValueEvent {
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * 數(shù)據(jù)對象數(shù)據(jù)工廠
     */
    public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        @Override
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };


}

//demo main
public static void main(String[] args) throws InterruptedException {

    //環(huán)形大小
    int ringBufferSize = 2 << 2;

    //disruptor
    Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, ringBufferSize,
            Executors.defaultThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());

    //消費端事件處理器
    EventHandler<ValueEvent> eventHandler = new EventHandler<ValueEvent>() {

        @Override
        public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
            System.out.println("Sequence: " + sequence);
            System.out.println("ValueEvent: " + event.getValue());
        }
    };

    disruptor.handleEventsWith(eventHandler);

    //初始化disruptor
    RingBuffer<ValueEvent> ringBuffer = disruptor.start();

    Thread.sleep(3000L);
    //生產(chǎn)端事件
    for (long i = 0; i < 11; i++) {
        String uuid = UUID.randomUUID().toString();
        // Two phase commit. Grab one of the 1024 slots
        long seq = ringBuffer.next();
        ValueEvent valueEvent = ringBuffer.get(seq);
        valueEvent.setValue(uuid);
        ringBuffer.publish(seq);
    }


    disruptor.shutdown();

}

(```)

關(guān)鍵類個人理解
Disruptor:包裝了ringbuffer,消費組

RingBuffer:
數(shù)據(jù)結(jié)構(gòu):環(huán)形數(shù)組 2的n次階
核心屬性:sequencer發(fā)布坑位神器
next()方法:獲取下一個可以發(fā)布的坑位
publish(index)方法:發(fā)布坑位蒿褂,告訴消費者圆米,這個坑位可以消費了

Sequence:可以理解為一個增強的AtomicLong,解決了緩存行失效的問題啄栓,性能更好娄帖,代碼就不分析了,大家有興趣自己讀汝汲(主要是解決了偽共享的問題)

Sequencer:維護了當(dāng)前發(fā)布的坑位(單生成器近速,多生成器)

SequenceBarrier:跟蹤 生產(chǎn)者當(dāng)前發(fā)布坑位,消費者消費當(dāng)前坑位堪旧,依賴坑位等

WaitStrategy:等待策略確定消費者如何等待生產(chǎn)者將事件放入Disruptor

Event:從生產(chǎn)者傳遞給消費者的數(shù)據(jù)單位削葱。事件沒有特定的代碼表示,因為它完全由用戶定義淳梦。

EventProcessor:
用于處理來自Disruptor的事件的主事件循環(huán)析砸,消費坑位。
單線程:BatchEventProcessor
多線程  WorkProcessor

EventHandler:用戶自己實現(xiàn)谭跨,拿到數(shù)據(jù)了如何消費

Producer:用戶自己實現(xiàn)
核心類解讀
師傅領(lǐng)進門干厚,修行靠自己李滴。

建議看完上面的圖螃宙,跑跑helloworld
生產(chǎn)者#SingleProducerSequencer

(```)

//核心屬性 維護了生產(chǎn)者當(dāng)前生產(chǎn)的下標(biāo) 和 緩存門衛(wèi)下標(biāo)
long nextValue = -1; 
long cachedValue = -1;


public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    //初始值為-1,最后申請的生產(chǎn)者坑位
    long nextValue = this.nextValue;

    //本次申請的坑位所坯,單個發(fā)生n=1 【nextSequence&(bufferSize-1)即為在數(shù)組中的具體位置】
    long nextSequence = nextValue + n;

    //本次申請的坑位減一圈谆扎? 讓一圈還比你跑的快的意思
    long wrapPoint = nextSequence - bufferSize;

    //初始值為-1 最小的消費者坑位
    long cachedGatingSequence = this.cachedValue;

    /*
     * 生產(chǎn)者追尾 消費者 ||  消費者追尾生產(chǎn)者?芹助? 這種情況什么時候會發(fā)生
     */
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        cursor.setVolatile(nextValue);

        long minSequence;

        //當(dāng)發(fā)生生產(chǎn)者 追尾消費者的時候
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            //生產(chǎn)者等1ns 讓消費者先泡一會
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }

        /*
         * 消費端讓出坑位堂湖,記下消費者占用的最小的那個坑位
         */
        this.cachedValue = minSequence;
    }

    //成功申請到坑位
    this.nextValue = nextSequence;

    return nextSequence;
}



public void publish(long sequence)
{
    //設(shè)置當(dāng)前生產(chǎn)的下標(biāo)位置
    cursor.set(sequence);
    //喚醒消費線程->獲取可消費的下標(biāo)
    waitStrategy.signalAllWhenBlocking();
}

(```)

生產(chǎn)者#MultiProducerSequencer

(```)

public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do
    {
        //當(dāng)前坑位
        current = cursor.get();
        //下一個坑位
        next = current + n;

        //讓一圈后的坑位
        long wrapPoint = next - bufferSize;
        //守門員坑位  最小消費者坑位
        long cachedGatingSequence = gatingSequenceCache.get();

        //不能超車,等待后 自旋重試
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            if (wrapPoint > gatingSequence)
            {
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }

            gatingSequenceCache.set(gatingSequence);
        }
        //多生產(chǎn)者 用cas的方式獲取坑位
        else if (cursor.compareAndSet(current, next))
        {
            break;
        }
    }
    while (true);

    return next;
}

public void publish(final long sequence)
{
    //當(dāng)前坑位   圈數(shù)量状土?
    setAvailable(sequence);
    //通知阻塞的消費者消費
    waitStrategy.signalAllWhenBlocking();
}

(```)

環(huán)形數(shù)組#RingBuffer

(```)

//環(huán)形最大的index
private final long indexMask;

/**
 * 環(huán)形數(shù)組元素
 *null,null,null,null [ 中間這一塊是環(huán)形數(shù)組,兩邊對稱的數(shù)組位置暫時沒有放任何東西无蜂,做填充] null,null,null,null
 */
private final Object[] entries;
protected final int bufferSize;

/**
 * 生產(chǎn)者對象的引用
 * @see MultiProducerSequencer
 * @see SingleProducerSequencer
 */
protected final Sequencer sequencer;

RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer) {
    //生產(chǎn)者對象
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    //indexMask = 2^n次-1  用來做與運算 相當(dāng)快,計算機非常喜歡010101
    this.indexMask = bufferSize - 1;

    /**
     * 其實是多創(chuàng)建了一些數(shù)組蒙谓,填充了一些數(shù)組斥季,保證ringbuffer的數(shù)組不和別人共享緩存行,
     * 因為ringbuffer的數(shù)組對象一開始就創(chuàng)建好了累驮,保持具體的應(yīng)用就好了酣倾,不會變,提高了性能
     */
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        //初始化坑位
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

@SuppressWarnings("unchecked")
protected final E elementAt(long sequence) {
    //根據(jù)下標(biāo)獲取entry
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

(```)

Disruptor

(```)

EventHandlerGroup<T> createEventProcessors(
        /*  new Sequence[0]  參數(shù)傳進
        來的*/
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    /*
     * 有幾個消費者 就有幾個sequence
     */
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];


    /**
     * 追蹤 生產(chǎn)者的生產(chǎn)下標(biāo) 和 依賴消費端的的Sequence
     */
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        /**
         * 批次事件處理器
         */
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        /**
         * 消費端倉庫 加入事件處理器
         */
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);

        /**
         * 每個hanlder都有一個sequence
         */
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    /**
     * 消費端的Sequence 需要加到sequencer中谤专。
     */
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

(```)

事件處理器 BatchEventProcessor

(```)

private void processEvents()
{

    /**
     * 每個消費事件是一個線程躁锡。
     *
     * sequence 最開始是-1
     *
     * 從0開始消費
     */

    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true)
    {
        try
        {

            /**
             * 檢查生產(chǎn)者生產(chǎn)的位置,消費端需要消費置侍,消費者追尾生產(chǎn)者的時候映之,阻塞在這里
             */
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);


            /**
             * 消費端拿到可用的消費下標(biāo)
             */
            while (nextSequence <= availableSequence)
            {
                //獲取數(shù)據(jù)
                event = dataProvider.get(nextSequence);
                //事件處理
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);

                //消費坑位+1
                nextSequence++;
            }

            //消費坑位為availableSequence
            sequence.set(availableSequence);
        }
        catch (final TimeoutException e)
        {
            notifyTimeout(sequence.get());
        }
        catch (final AlertException ex)
        {
            if (running.get() != RUNNING)
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}

(```)

disruptor 一個實現(xiàn)處理器 多線程消費 disruptor.handleEventsWithWorkerPool
    換湯不換藥的存在拦焚,多線程用cas拿到當(dāng)前坑位消費。留給大家自行解讀惕医。
思考:
    1耕漱、必須A處理器處理完 才能處理B和C處理器的場景,disrupptor如何實現(xiàn)的抬伺?
    SequenceBarrier的實現(xiàn)類ProcessingSequenceBarrier構(gòu)造器public ProcessingSequenceBarrier(
    final Sequencer sequencer,
    final WaitStrategy waitStrategy,
    final Sequence cursorSequence,
    final Sequence[] dependentSequences /***哇哦 這是什么東西螟够?**/)
    
    創(chuàng)建來源
    Disruptor.createEventProcessors(
    final Sequence[] barrierSequences/**哇哦,這個參數(shù)原來是要傳依賴的那個消費組的下標(biāo)峡钓,一開始我還不知道有什么用呢 哈哈哈 讀源碼真實件有趣的事情**/,
    final EventHandler<? super T>[] eventHandlers)
    
    
    SequenceBarrier維護了一個依賴的坑位序列組妓笙。具體代碼也留給大家自行解讀
    
    2、為什么Disruptor快能岩?寞宫?
      現(xiàn)在你有答案了么
參考

disruptor一些不錯的文章

Disruptor github

about ME

雨人

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市拉鹃,隨后出現(xiàn)的幾起案子辈赋,更是在濱河造成了極大的恐慌,老刑警劉巖膏燕,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件钥屈,死亡現(xiàn)場離奇詭異,居然都是意外死亡坝辫,警方通過查閱死者的電腦和手機篷就,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來近忙,“玉大人竭业,你說我怎么就攤上這事〖吧幔” “怎么了未辆?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锯玛。 經(jīng)常有香客問我咐柜,道長,這世上最難降的妖魔是什么更振? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任炕桨,我火速辦了婚禮,結(jié)果婚禮上肯腕,老公的妹妹穿的比我還像新娘献宫。我一直安慰自己,他們只是感情好实撒,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布姊途。 她就那樣靜靜地躺著涉瘾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪捷兰。 梳的紋絲不亂的頭發(fā)上立叛,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機與錄音贡茅,去河邊找鬼秘蛇。 笑死,一個胖子當(dāng)著我的面吹牛顶考,可吹牛的內(nèi)容都是我干的赁还。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼驹沿,長吁一口氣:“原來是場噩夢啊……” “哼艘策!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起渊季,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤朋蔫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后却汉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體驯妄,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年病涨,在試婚紗的時候發(fā)現(xiàn)自己被綠了富玷。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片璧坟。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡既穆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出雀鹃,到底是詐尸還是另有隱情幻工,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布黎茎,位于F島的核電站囊颅,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏傅瞻。R本人自食惡果不足惜踢代,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望嗅骄。 院中可真熱鬧胳挎,春花似錦、人聲如沸溺森。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至医窿,卻和暖如春磅甩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背姥卢。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工卷要, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人独榴。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓却妨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親括眠。 傳聞我的和親對象是個殘疾皇子彪标,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容