Disruptor核心源碼分析

Disruptor核心源碼分析

說來慚愧爆惧,Log4j2的異步日志已經用了將近2年時間了郎任。但是每次想看Disruptor源碼的時候械蹋,總是沒能堅持下去降传。這次通過一次生產環(huán)境的故障至非,堅定了看源碼的決心钠署。

從何說起

在閱讀這篇文章之前,需要你具備一些對Disruptor的基本了解荒椭。如果你對它
還一無所知谐鼎,希望你先通過下面的文章來入個門。

http://ifeve.com/dissecting-disruptor-whats-so-special/
http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/
http://ifeve.com/disruptor-writing-ringbuffer/

上面幾篇文章對Disruptor總體流程的講解還是比較清楚的趣惠,如果你看完仍然不是特別理解狸棍,沒關系,也可以繼續(xù)往下看味悄,畢竟talk is cheap草戈,下面會借助code把Disruptor的工作原理闡述清楚。本篇文章不會涉及”為什么Disruptor這么快“這個主題侍瑟,而把重點放在理解它的工作流程以及熟悉源代碼上唐片。

最簡單的Demo

我們先通過一個最簡單的Demo來感受一下Disruptor的工作流程

// 事件數據結構 StringEvent
@Data
@NoArgsConstructor
public class StringEvent {
    private String value;
}
    public static void main(String[] args) throws InterruptedException {
        Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
                DaemonThreadFactory.INSTANCE);

        disruptor.handleEventsWith(
                (EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
                        .println(event));

        disruptor.start();

        disruptor.publishEvent((event, sequence) -> event.setValue("changed"));
        
        // sleep一下 讓消費者可以執(zhí)行到 因為消費線程是守護線程
        Thread.sleep(1000);
    }

寥寥幾行代碼丙猬,就展示了一個完整的過程。我們來看看每一步具體做了什么:

第一步——創(chuàng)建Disruptor

創(chuàng)建Disruptor费韭,Demo中采用的是參數較少的構造方法茧球,實際上完整的參數列表還包括producerTypewaitStrategy

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }
    
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
  • eventFactory表示事件的構造器
  • ringBufferSize表示RingBuffer的長度(容量)
  • threadFactory表示消費線程的創(chuàng)建工廠
  • producerType表示是單生產者模式還是多生產者模式(默認是MULTI
  • waitStrategy表示當RingBuffer中沒有可消費的Event時消費者的等待策略(默認是BlockingWaitStrategy

可以看到,通過上面5個參數構造出了一個RingBuffer和一個Executor星持,而這兩個組件構成了一個Disruptor抢埋。這里的RingBuffer除了存儲事件的職能(DataProvider)還承擔著申請sequence和publish event的職能。Executor作為消費者線程池督暂,主要是運行消費邏輯的揪垄。因此可以說,Disruptor串聯起了生產者损痰、消費者以及RingBuffer

創(chuàng)建RingBuffer

RingBuffer是個重點福侈,因為它不止是存儲,還干了很多活卢未。所謂能者多勞肪凛,也更值得我們研究。我們先看下創(chuàng)建它的靜態(tài)方法:

    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

根據生產者類型的不同辽社,存在兩種類型的RingBuffer:單生產者類型和多生產者類型伟墙。為了更容易理解,我們這里先看Single類型的滴铅,也就是單生產者類型的RingBuffer

    // class RingBuffer
    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

可以看到戳葵,這里通過一個EventFactory和一個SingleProducerSequencer構造了一個RingBuffer。前者是用來創(chuàng)建事件對象的汉匙,而后者可以理解成RingBuffer的"幫手"歹茶,RingBuffer委托Sequencer來處理一些非存儲類的工作(比如申請sequence悠砚,維護sequence進度,發(fā)布事件等)。

我們接著跟進去看看RingBuffer的構造函數:

    // class RingBuffer
    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }
    
    // class RingBufferFields
    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        // “幫手”爆侣,主要用來處理sequence申請凉蜂、維護以及發(fā)布等工作
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        // indexMask主要是為了使用位運算取模的恨课,很多源碼里都能看到這類優(yōu)化
        this.indexMask = bufferSize - 1;
        // 可以看到這個數組除了正常的size之外還有填充的元素险胰,這個是為了解決false sharing的,本篇文章暫不展開
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // 預先填充數組元素屯援,這對垃圾回收很優(yōu)化猛们,后續(xù)發(fā)布事件等操作都不需要創(chuàng)建對象,而只需要即可
        fill(eventFactory);
    }
    
    // class RingBufferFields
    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

RingBuffer的構造函數還是比較清晰的狞洋,跟著上面的注釋應該就可以理解弯淘。除了Buffer_PAD,這個我們留到后續(xù)文章中再去詳細的講解吉懊。

創(chuàng)建BasicExecutor

BasicExecutor實現了java.util.concurrent.Executor接口耳胎,通過單參數ThreadFactory函數構造:

public class BasicExecutor implements Executor{

    public BasicExecutor(ThreadFactory factory)
    {
        this.factory = factory;
    }
}

具體在哪里使用惯吕,我們后面會看到

第二步——注冊事件處理邏輯

disruptor.handleEventsWith(
                (EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
                        .println(event));

代碼非常容易理解,注冊了一個事件處理的回調怕午,并且可以注冊多個废登,其中回調里有三個參數:

  • event表示消費到的本次事件的主體,在例子里也就是StringEvent
  • sequence表示消費到的本次事件對應的sequence
  • endOfBatch表示消費到的本次事件是否是這個批次中的最后一個

由于默認的消費處理器(BatchEventProcessor)是批量來處理事件的郁惜,所以會有批次的概念堡距。怎么樣算一個批次呢,這個后面講BatchEveentProessor的時候會講到兆蕉。DEMO里的消費邏輯很簡單羽戒,打印一下event就完事。下面看看handleEventsWith的源代碼:

    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        // 注意虎韵,第一個參數恒為一個空數組
        return createEventProcessors(new Sequence[0], handlers);
    }

createEventProcessors方法的第一個入參叫作barrierSequences易稠,是給存在依賴關系的消費者用的。由于走Disruptor實例調用handleEventsWith都是像上面一樣傳的是空數組包蓝,為了便于理解驶社,可以先將它當成恒為空數組。

    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        // 用來保存每個消費者的消費進度
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        // SequenceBarrier主要是用來設置消費依賴的[詳解1]
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            // 可以看到每個eventHandler會被封裝成BatchEventProcessor测萎,看名字就知道是批量處理的了吧
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

            // 設置異常處理器
            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            // 注冊到consumerRepository[詳解2]
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            // 每一個BatchEventProcessor的消費進度
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        // 更新一些重要的東西[詳解3]
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        // 返回一個EventHandlerGroup亡电,這個主要是為了DSL服務的,可以先不關心硅瞧,可以看到DEMO中我們也沒有用到這個返回值
        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }

上面采用了注釋來解釋代碼含義份乒,我覺得這種形式可能更有助于在看代碼的過程中理解。不過注釋也有局限性腕唧,比如上面寫了詳解的幾處或辖,這里我會詳細再跟進下源代碼:

詳解1——SequenceBarrier

SequenceBarrier主要是設置消費依賴的。比如某個消費者必須等它依賴的消費者消費完某個消息之后才可以消費該消息枣接。當然此處是從Disruptor上直接創(chuàng)建消費組孝凌,sequencesToTrack都為空數組,所以只依賴于RingBuffer上的cursorSequence(也就是只要RingBuffer上寫(publish)到哪了月腋,那么我就能消費到哪)

下面的代碼展示了通過RingBuffer創(chuàng)建SequenceBarrier的鏈路,發(fā)現最終創(chuàng)建的是ProcessingSequenceBarrier瓣赂。并且在這條鏈路上榆骚,我們前面假定的sequencesToTrack(也就是dependentSequences)為空數組。那么根據上面的構造函數得出dependentSequence = cursorSequence = cursor

    // class RingBuffer
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return sequencer.newBarrier(sequencesToTrack);
    }
    
    // class AbstractSequencer
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
    
    // class ProcessingSequenceBarrier
    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

這個cursor是什么呢煌集?首先它是Sequencer的成員變量妓肢。而Sequencer有兩種:SingleProducerSequencerMultiProducerSequencer。對于SingleProducerSequencer來說苫纤,cursor表示的是RingBuffer上當前已發(fā)布的最大sequence碉钠,而對于MultiProducerSequencer來說纲缓,cursor表示的是RingBuffer上當前已申請的最大sequence。此處先有個概念即可喊废,下面講完生產邏輯之后會詳細描述

詳解2——ConsumerRepository

    // class ConsumerRepository
    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }

無論從類名還是方法體祝高,都可以看出,這個對象主要是用來存儲消費者信息的污筷,有兩個維度的Map工闺。具體是哪里用,我們用到的時候再說好了~

詳解3——一些更新

    private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

首先瓣蛀,我們要搞清楚這兩個入參的意義:

  • barrierSequences:依賴的消費進度
  • processorSequences:新進消費者的進度

其次陆蟆,還要弄明白一個問題:在向RingBuffer寫入數據的時候,如何判定RingBuffer已滿(這個應該在前面入門那幾篇文章里要掌握的)惋增?通過看最慢的消費者的消費進度是不是已經被生產者拉了一圈了(類似1000米跑步的套圈)叠殷。

理解了上面兩個問題之后,再來看代碼總共做的三個事情:

  1. 把新進消費者的消費進度加入到【所有消費者的消費進度數組】中
  2. 如果說這個新進消費者是依賴了其他的消費者的诈皿,那么把其他的消費者從【所有消費者的消費進度數組】中移除林束。這里為什么要移除呢?因為【所有消費者的消費進度數組】主要是用來獲取最慢的進度的纫塌。那么被依賴的可以不用考慮诊县,因為它不可能比依賴它的慢。并且讓這個數組足夠小措左,可以提升計算最慢進度的性能依痊。
  3. 把被依賴的消費者的endOfChain屬性設置成false。這個endOfChain是用來干嘛的呢怎披?其實主要是Disruptor在shutdown的時候需要判定是否所有消費者都已經消費完了(如果依賴了別人的消費者都消費完了胸嘁,那么整條鏈路上一定都消費完了)。

第三步——啟動Disruptor

    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

這個consumerRepository是不是很熟悉凉逛?這是第二步詳解二里ConsumerInfo注冊的地方性宏。可以看到啟動Disruptor其實就是在啟動消費線程:

    // class EventProcessorInfo
    public void start(final Executor executor)
    {
        // 這里對應的是BatchEventProcessor
        executor.execute(eventprocessor);
    }
    
    // class BasicExecutor
    public void execute(Runnable command)
    {
        final Thread thread = factory.newThread(command);
        if (null == thread)
        {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }

那么消費線程的具體邏輯是状飞?看看BatchEventProcessorrun()方法:

    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        // 成員變量sequence維護該Processor的消費進度
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    // 以nextSequence作為底線毫胜,去獲取最大的可用sequence(也就是已經被publish的sequence)
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    // 如果獲取到的sequence大于等于nextSequence,說明有可以消費的event诬辈,從nextSequence(包含)到availableSequence(包含)這一段的事件就作為同一個批次
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        // 調用了前面注冊的回調函數
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    // 消費完一批之后 一次性更新消費進度
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    // waitFor超時的場景
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                
                    // 消費過程中如果拋出異常酵使,表面上看會更新消費進度,也就是說沒有補償機制焙糟。但實際上默認的策略是會拋異常的口渔,消費線程會直接結束掉
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }

sequenceBarrier.waitFor里的邏輯值得好好看一看,不過我覺得等看完發(fā)布事件的流程之后會更容易理解穿撮。

第四步——發(fā)布事件

這里使用的是EventTranslator的方式來發(fā)布事件的:

    // class Disruptor
    public void publishEvent(final EventTranslator<T> eventTranslator)
    {
        ringBuffer.publishEvent(eventTranslator);
    }
    
    // class RingBuffer
    public void publishEvent(EventTranslator<E> translator)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
    
    private void translateAndPublish(EventTranslator<E> translator, long sequence)
    {
        try
        {
            translator.translateTo(get(sequence), sequence);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

從上面的代碼結構可以看出來缺脉,發(fā)布事件總共分為三個步驟:

  1. 申請sequence
  2. 填充事件內容
  3. 提交發(fā)布

有點類似數據庫事務的味道痪欲。為了便于理解,我們還是以SingleProducerSequencer來分析下上面三個步驟:

申請sequence

    // class SingleProducerSequencer
    public long next()
    {
        return next(1);
    }
    
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        // nextValue這個變量名有點詭異攻礼,實際上表示已經申請到的那個sequence
        long nextValue = this.nextValue;

        // nextSequence表示本次需要申請的最大sequence
        long nextSequence = nextValue + n;
        // 計算出nextSequence在上一圈的點位
        long wrapPoint = nextSequence - bufferSize;
        // 最慢消費進度的緩存
        long cachedGatingSequence = this.cachedValue;
        // 下面這個條件表達式以及其代碼塊解釋起來可能需要比較大的篇幅业踢,所以在下面[核心代碼詳解]里說明
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                // 通知下消費者
                waitStrategy.signalAllWhenBlocking();
                // 生產者如果沒有空間寫數據了,只能無限park
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            
            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }
核心代碼詳解

首先秘蛔,我們看下if表達式里的兩個條件:

  1. wrapPoint > cachedGatingSequence陨亡,結合我們對Disruptor的了解,此處判斷的應該是此次要申請的sequence是否已經領先最慢消費進度一圈了(類似1000米跑步的套圈)

  2. cachedGatingSequence > nextValue判斷的是最慢消費進度超過了我們即將要申請的sequence深员,乍一看這應該是不可能的吧负蠕,都還沒申請到該sequence怎么可能消費到呢?找了些資料倦畅,發(fā)現確實是存在該場景的:RingBuffer提供了一個叫resetTo的方法遮糖,可以重置當前已申請sequence為一個指定值并publish出去:

    @Deprecated
    public void resetTo(long sequence)
    {
        sequencer.claim(sequence);
        sequencer.publish(sequence);
    }
    

    具體資料可參考:

    不過該代碼已經標注為@Deprecated,按照作者的意思叠赐,后續(xù)是要刪掉的欲账。那么在此處分析的時候,我們就將當它恒為false芭概。

對于第一個條件表達式赛不,也有個值得注意的地方:因為里面的【最慢消費進度】取的是緩存值(cached)。而這個緩存值是什么時候更新的呢罢洲?答案是只有在“套圈”了以后才會更新踢故。這個邏輯你品,你細品惹苗,那么你會發(fā)現殿较,每申請RingBuffer.size()個sequence之后都會走進上面的“套圈”邏輯來更新cachedGatingSequence。這樣就極大的減少了Util.getMinimumSequence(gatingSequences, nextValue)的運算量

再來看看“套圈”時需要執(zhí)行的邏輯:

  1. 插入一個StoreLoad屏障桩蓉,防止是因為內存可見性導致的消費者消費不了數據(應該極少存在這樣的情況吧)
  2. 實時計算一下最慢消費進度Util.getMinimumSequence(gatingSequences, nextValue)
  3. 如果真的套圈了淋纲,那么就一直死循環(huán)直到RingBuffer上有空間可以申請
  4. 更新【最慢消費進度緩存】

注意,當消費者消費過慢時院究,可能會導致生產者無限park洽瞬,這個在編程的時候要特別留意。

提交發(fā)布

填充事件內容沒什么好說的业汰,無非就是設值的過程伙窃。設值完成之后,就可以發(fā)布了:

    public void publish(long sequence)
    {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

這里會把cursor的值設置為當前申請的sequence蔬胯,代表序號為sequence的事件發(fā)布成功。這里的cursor表示已經publish的最大事件序號(在多生產者模式中并不是)位他,所以我們在使用過程中需要依次申請氛濒,依次發(fā)布产场,不能直接上來就publish(100),這樣會導致消費者會認為100以前的序號也都就緒了舞竿。另外京景,由于我們現在看的是單生產者模式,也不需要考慮并發(fā)場景骗奖。

sequenceBarrier.waitFor

看完了生產者的流程确徙,我們來回顧下在消費者里這一句關鍵的代碼。前面有提到過ProcessingSequenceBarrier执桌,帶著這一絲絲的印象我們來看看下面這段獲取availableSequence的邏輯:

    // class ProcessingSequenceBarrier
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        // waitStrategy派上用場了鄙皇,這是我們在構造Disruptor的時候的入參(也是構造RingBuffer的入參)
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        // 理論上沒有可能為true,因為當前每種waitStrategy內都保證了availableSequence一定大于等于sequence
        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        // 返回最大的已發(fā)布的sequence仰挣,在單生產者模式下這個函數返回值就等于availableSequence
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

跟著上面的注釋伴逸,相信應該沒有什么理解上的難點,上面的代碼核心就兩步:

  1. 通過WaitStrategy.waitFor()獲取availableSequence膘壶,下面會分析具體的邏輯
  2. 通過sequencer來得到最大的已發(fā)布的sequence(HighestPublishedSequence)

WaitStrategy.waitFor

先看看第一步中的WaitStrategy.waitFor()方法错蝴,這里以BlockingWaitStrategy為例:

    // class BlockingWaitStrategy
    // 這里的四個入參我們捋一捋
    // sequence:消費者想要消費的最小sequence(底線)
    // cursorSequence:Sequencer的cursor,也就是當前RingBuffer上已經被申請的最大sequence(在講生產者邏輯的時候提到了)
    // dependentSequence:在我們當前鏈路為cursorSequence颓芭,不存在消費依賴(如果存在依賴的話顷锰,則為依賴消費者消費進度)
    // barrier:這個主要是用了其中一些中斷方法,不用太care
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        // 看到了吧亡问,這里已經保證了availableSequence必然大于等于sequence
        // 并且在存在依賴的場景中官紫,被依賴消費者存在慢消費的話,會直接導致下游進入死循環(huán)
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

從上面的代碼能看到玛界,WaitStrategy.waitFor()獲取的是依賴消費者的消費進度sequence(默認依賴RingBuffer上已申請進度的sequence)万矾。需要注意的一點是,當消費者獲取可消費事件的過程中慎框,存在兩種場景需要等待:

  1. RingBuffer上沒有事件可以消費
  2. RingBuffer上有可消費事件良狈,但是依賴的消費者還未消費完該事件

如果是第一種場景,那么消費者會采用WaitStrategy的策略進行等待笨枯。而如果是第二種場景的話薪丁,只能如上所示一樣進入死循環(huán)(此時可能造成cpu升高)。

sequencer.getHighestPublishedSequence

WaitStrategy.waitFor()返回后馅精,得到的是RingBuffer上已申請進度sequence或者是依賴消費者消費進度sequence(當然如果把cursorSequence也看成一種依賴的話严嗜,理解起來就統一了)。注意一個形容詞——“已申請”洲敢,而不是“已發(fā)布”漫玄,“已申請”意味著還不一定“已發(fā)布”,也就是還不能消費。所以睦优,SequenceBarrier.waitFor最后還有一步sequencer.getHighestPublishedSequence(sequence, availableSequence)渗常。

當然如果你很仔細的看到這里并且對于前面的內容都理解了,你可能會產生疑問:對于單生產者來說汗盘,本來就是在publish的時候才更新cursor的爸宓狻?那上一步從WaitStrategy.waitFor()獲取到的不就是“已發(fā)布”的進度sequence嗎隐孽?是的癌椿,你說得很正確。對于單生產者確實如此菱阵,所以但生產者對應的實現為:

    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        return availableSequence;
    }

而對于多生產者的話踢俄,邏輯就會相對復雜一點,這個我們下一篇文章再分析

總結

到這里送粱,Disruptor核心的邏輯我們基本上看完了褪贵。我們介紹了Disruptor中的單生產者模式的生產邏輯以及默認的單線程批量消費邏輯。當然這只是最基本的模式抗俄,為了讓我們對Disruptor的邏輯和源代碼有一個整體的了解脆丁。后面的文章我們會涉及更多的場景,比如

  • 多生產者模式是如何工作的
  • 如何實現消費依賴
  • Log4j2是如何使用Disruptor的
  • Disruptor性能高的原因以及使用過程中的一些心得等

如文中有描述錯誤动雹,還望指出槽卫,以便改正,多謝~

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末胰蝠,一起剝皮案震驚了整個濱河市歼培,隨后出現的幾起案子,更是在濱河造成了極大的恐慌茸塞,老刑警劉巖躲庄,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異钾虐,居然都是意外死亡噪窘,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門效扫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來倔监,“玉大人,你說我怎么就攤上這事菌仁『葡埃” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵济丘,是天一觀的道長谱秽。 經常有香客問我,道長,這世上最難降的妖魔是什么疟赊? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任辱士,我火速辦了婚禮,結果婚禮上听绳,老公的妹妹穿的比我還像新娘。我一直安慰自己异赫,他們只是感情好椅挣,可當我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著塔拳,像睡著了一般鼠证。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上靠抑,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天量九,我揣著相機與錄音,去河邊找鬼颂碧。 笑死荠列,一個胖子當著我的面吹牛,可吹牛的內容都是我干的载城。 我是一名探鬼主播肌似,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诉瓦!你這毒婦竟也來了川队?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤睬澡,失蹤者是張志新(化名)和其女友劉穎固额,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體煞聪,經...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡斗躏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了米绕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瑟捣。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖栅干,靈堂內的尸體忽然破棺而出迈套,到底是詐尸還是另有隱情,我是刑警寧澤碱鳞,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布桑李,位于F島的核電站,受9級特大地震影響,放射性物質發(fā)生泄漏贵白。R本人自食惡果不足惜率拒,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望禁荒。 院中可真熱鬧猬膨,春花似錦、人聲如沸呛伴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽热康。三九已至沛申,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間姐军,已是汗流浹背铁材。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留奕锌,地道東北人著觉。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像惊暴,于是被迫代替她去往敵國和親固惯。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內容