解讀Disruptor系列--解讀源碼(3)之消費(fèi)者

之前我們已經(jīng)熟悉了Disruptor的啟動(dòng)和事件生產(chǎn)操作锌俱,接下來(lái)我們一同探究Disruptor如何消費(fèi)事件圃阳。

0x00 概念回顧

我們先回顧下Disruptor消費(fèi)相關(guān)的名詞概念:
Event: Disruptor中傳輸?shù)氖录?br> RingBuffer: 存儲(chǔ)和更新事件的容器。
EventHandler: 用戶(hù)實(shí)現(xiàn)接口视卢,包含消費(fèi)處理邏輯纠炮,代表Disruptor一個(gè)消費(fèi)者。
EventProcessor: EventProcessor繼承了Runnable接口莉测,包含處理Disruptor事件的主循環(huán)颜骤。

多播事件: 隊(duì)列和Disruptor在表現(xiàn)行為上最大的區(qū)別唧喉。隊(duì)列中的一個(gè)事件只能被一個(gè)消費(fèi)者消費(fèi),而Disruptor中的事件會(huì)發(fā)布給所有消費(fèi)者忍抽。特別適合同一數(shù)據(jù)的獨(dú)立并行處理操作八孝。
消費(fèi)者依賴(lài)圖(消費(fèi)鏈):同一事件需要被多個(gè)消費(fèi)者消費(fèi)時(shí),消費(fèi)者之間可能有依賴(lài)關(guān)系鸠项,如消費(fèi)者A,B,C干跛,B和C依賴(lài)A先執(zhí)行,但是B和C可以并行消費(fèi)祟绊。

0x01 EventProcessor接口概覽

OK楼入,咱們正式開(kāi)始對(duì)Disruptor消費(fèi)者的源碼解讀。
Disruptor的消費(fèi)者依賴(lài)EventProcessor循環(huán)處理可用事件牧抽。EventProcessor顧名思義嘉熊,就是事件處理器(handle和process都可以翻譯為“處理”,但是process側(cè)重于機(jī)器的處理扬舒,而handle側(cè)重于有人工的處理阐肤,所以使用handle表示用戶(hù)邏輯的處理,使用process表示機(jī)器的處理)讲坎,這個(gè)接口有兩個(gè)實(shí)現(xiàn)類(lèi)孕惜,分別是WorkProcessor和BatchEventProcessor,它們對(duì)應(yīng)的邏輯處理消費(fèi)者分別是EventHandler和WorkHandler晨炕。下面是EventProcessor的UML類(lèi)圖及EventHandler和EventProcessor的接口定義衫画。

image.png
/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
* 處理事件的回調(diào)接口
*/
public interface EventHandler<T>
{
    /**
    * Called when a publisher has published an event to the {@link RingBuffer}
    *
    * @param event      published to the {@link RingBuffer}
    * @param sequence  of the event being processed
    * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
    * @throws Exception if the EventHandler would like the exception handled further up the chain.
    */
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
* <p>
* An EventProcessor will generally be associated with a Thread for execution.
* 事件執(zhí)行器,等待RingBuffer有可用消費(fèi)事件瓮栗。一個(gè)事件處理器關(guān)聯(lián)一個(gè)執(zhí)行線(xiàn)程
*/
public interface EventProcessor extends Runnable
{
    /**
    * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
    *
    * @return reference to the {@link Sequence} for this {@link EventProcessor}
    */
    Sequence getSequence();

    /**
    * Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
    * It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
    */
    void halt();

    boolean isRunning();
}

EventProcessor接口繼承了Runnable接口碧磅,主要有兩種實(shí)現(xiàn):?jiǎn)尉€(xiàn)程批量處理BatchEventProcessor和多線(xiàn)程處理WorkProcessor碘箍。
在使用Disruptor幫助類(lèi)構(gòu)建消費(fèi)者時(shí),使用handleEventsWith方法傳入多個(gè)EventHandler鲸郊,內(nèi)部使用多個(gè)BatchEventProcessor關(guān)聯(lián)多個(gè)線(xiàn)程執(zhí)行丰榴。這種情況類(lèi)似JMS中的發(fā)布訂閱模式,同一事件會(huì)被多個(gè)消費(fèi)者并行消費(fèi)秆撮。適用于同一事件觸發(fā)多種操作四濒。
而使用Disruptor的handleEventsWithWorkerPool傳入多個(gè)WorkHandler時(shí),內(nèi)部使用多個(gè)WorkProcessor關(guān)聯(lián)多個(gè)線(xiàn)程執(zhí)行职辨。這種情況類(lèi)似JMS的點(diǎn)對(duì)點(diǎn)模式盗蟆,同一事件會(huì)被一組消費(fèi)者其中之一消費(fèi)。適用于提升消費(fèi)者并行處理能力舒裤。

0x02 消費(fèi)技術(shù)實(shí)現(xiàn)

我們先回顧下Disruptor消費(fèi)者的兩個(gè)特點(diǎn):消費(fèi)者依賴(lài)圖(即下文所謂的“消費(fèi)鏈”)和事件多播喳资。
假設(shè)現(xiàn)在有A,B,C,D四個(gè)消費(fèi)者,它們都能組成什么樣的形式呢腾供?從眾多的排列組合中仆邓,我挑了4組比較有代表性的消費(fèi)鏈形式。

image.png
  • 第1組中伴鳖,消費(fèi)者A消費(fèi)按成后节值,B、C榜聂、D可同時(shí)消費(fèi)搞疗;
  • 第2組中,消費(fèi)者A须肆、B匿乃、C、D順序消費(fèi)豌汇;
  • 第3組中幢炸,消費(fèi)者A、B順序消費(fèi)后瘤礁,C阳懂、D同時(shí)消費(fèi);
  • 第4組中柜思,消費(fèi)者A在消費(fèi)完成后岩调,B和C可以同時(shí)消費(fèi),但是必須在都消費(fèi)完成后赡盘,D才能消費(fèi)号枕。

標(biāo)號(hào)為1、3陨享、4的消費(fèi)鏈都使用了事件多播葱淳,可見(jiàn)事件多播屬于消費(fèi)鏈的一種組合形式钝腺。注意,在上面4種組合中赞厕,每個(gè)組合的每一水平行艳狐,都屬于一個(gè)消費(fèi)者組。
這些還只是較為簡(jiǎn)單的消費(fèi)鏈組成皿桑,實(shí)際中消費(fèi)鏈可能會(huì)更復(fù)雜毫目。
那么在Disruptor內(nèi)部是怎么實(shí)現(xiàn)消費(fèi)鏈的呢?
我們可以先思考下诲侮。如果想把獨(dú)立的消費(fèi)者組成消費(fèi)鏈镀虐,那么后方的消費(fèi)者(組)必然要知道在它前方的消費(fèi)者(組)的處理情況,否則就做不到順序消費(fèi)沟绪。同時(shí)刮便,消費(fèi)者也要了解生產(chǎn)者的位置,來(lái)判斷是否有可用事件绽慈。之前我們分析生產(chǎn)者代碼的時(shí)候恨旱,已經(jīng)講過(guò),生產(chǎn)者為了不覆蓋沒(méi)有消費(fèi)完全的事件久信,必須知道最慢消費(fèi)者的處理情況窖杀。
做到了這些才會(huì)有能力去控制消費(fèi)者組成消費(fèi)鏈漓摩。下面讓我們具體看Disruptor中的實(shí)現(xiàn)裙士。

0x02.1 使用BatchEventProcessor單線(xiàn)程批處理事件

在使用BatchEventProcessor時(shí),通過(guò)Disruptor#handleEventsWith方法可以獲取一個(gè)EventHandlerGroup管毙,再通過(guò)EventHandlerGroup的and和then方法可以構(gòu)建一個(gè)復(fù)雜的消費(fèi)者鏈腿椎。EventHandlerGroup表示一組事件消費(fèi)者,內(nèi)部持有了Disruptor類(lèi)實(shí)例disruptor夭咬,其大部分功能都是通過(guò)調(diào)用disruptor實(shí)現(xiàn)啃炸,其實(shí)可以算作是Disruptor這個(gè)輔助類(lèi)的一部分。

// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
    return handleEventsWith(handlers);
}

public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return disruptor.createEventProcessors(sequences, handlers);
}
// Disruptor.java
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return createEventProcessors(new Sequence[0], handlers);
}

// 由EventHandlerGroup調(diào)用時(shí)卓舵,barrierSequences是EventHandlerGroup實(shí)例的序列南用,也就是上一個(gè)事件處理者組的序列,作為當(dāng)前事件處理的門(mén)控掏湾,防止后邊的消費(fèi)鏈超前
// 如果是第一次調(diào)用handleEventsWith裹虫,則barrierSequences是一個(gè)空數(shù)組
EventHandlerGroup<T> **createEventProcessors**(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();
    // 對(duì)應(yīng)此事件處理器組的序列組
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        // 批量處理事件的循環(huán)
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
    // 每次添加完事件處理器后,更新門(mén)控序列融击,以便后續(xù)調(diào)用鏈的添加筑公。(所謂門(mén)控,是指后續(xù)消費(fèi)鏈的消費(fèi)尊浪,不能超過(guò)前邊匣屡。)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}

// 為消費(fèi)鏈下一組消費(fèi)者封救,更新門(mén)控序列
// barrierSequences是上一組事件處理器組的序列(如果本次是第一次,則為空數(shù)組)捣作,本組不能超過(guò)上組序列值
// processorSequences是本次要設(shè)置的事件處理器組的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        ringBuffer.addGatingSequences(processorSequences); // 將本組序列添加到Sequencer中的gatingSequences中
        for (final Sequence barrierSequence : barrierSequences) // 將上組序列從Sequencer中的gatingSequences中誉结,gatingSequences一直保存消費(fèi)鏈末端消費(fèi)者的序列組
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); // 取消標(biāo)記上一組消費(fèi)者為消費(fèi)鏈末端
    }
}

可以看到,使用BatchEventProcessor構(gòu)建消費(fèi)者鏈時(shí)的邏輯都在createEventProcessors這個(gè)方法中券躁。
先簡(jiǎn)單說(shuō)下ConsumerRepository搓彻,這個(gè)類(lèi)主要保存消費(fèi)者的各種關(guān)系,如通過(guò)EventHandler引用獲取EventProcessorInfo信息嘱朽,通過(guò)Sequence獲取ConsumerInfo信息等旭贬。因?yàn)橐褂靡米鰇ey,所以數(shù)據(jù)結(jié)構(gòu)使用IdentityHashMap搪泳。IdentityHashMap
和HashMap最大的不同稀轨,就是使用==而不是equals比較key
這個(gè)createEventProcessors方法接收兩個(gè)參數(shù)岸军,barrierSequences表示當(dāng)前消費(fèi)者組的屏障序列數(shù)組奋刽,如果當(dāng)前消費(fèi)者組是第一組,則取一個(gè)空的序列數(shù)組艰赞;否則佣谐,barrierSequences就是上一組消費(fèi)者組的序列數(shù)組。createEventProcessors方法的另一個(gè)參數(shù)eventHandlers方妖,這個(gè)參數(shù)是代表事件消費(fèi)邏輯的EventHandler數(shù)組狭魂。
Disruptor為每個(gè)EventHandler實(shí)現(xiàn)類(lèi)都創(chuàng)建了一個(gè)對(duì)應(yīng)的BatchEventProcessor。
在構(gòu)建BatchEventProcessor時(shí)需要以下傳入三個(gè)構(gòu)造參數(shù):dataProvider是數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)如RingBuffer党觅;sequenceBarrier用于跟蹤生產(chǎn)者游標(biāo)雌澄,協(xié)調(diào)數(shù)據(jù)處理;eventHandler是用戶(hù)實(shí)現(xiàn)的事件處理器杯瞻,也就是實(shí)際的消費(fèi)者镐牺。
注意,Disruptor并非為每個(gè)BatchEventProcessor都創(chuàng)建一個(gè)新的SequenceBarrier魁莉,而是每個(gè)消費(fèi)者組共用一個(gè)SequenceBarrier睬涧。
BatchEventProcessor定義如下。至于為什么要叫做BatchEventProcessor旗唁,可以看看在run()方法里每次waitFor獲取的availableSequence是當(dāng)前能夠使用的最大值畦浓,然后再循環(huán)處理這些數(shù)據(jù)。這樣當(dāng)消費(fèi)者有瞬時(shí)抖動(dòng)逆皮,導(dǎo)致暫時(shí)落后生產(chǎn)者時(shí)宅粥,可在下一次循環(huán)中,批量處理所有落后的事件电谣。

/**
* Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
* and delegating the available events to an {@link EventHandler}.
* <p>
* If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
* is started and just before the thread is shutdown.
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*
* 每個(gè)EventHandler對(duì)應(yīng)一個(gè)EventProcessor執(zhí)行者秽梅,BatchEventProcessor每次大循環(huán)可以獲取最高可用序號(hào)抹蚀,并循環(huán)調(diào)用EventHandler
*/
public final class BatchEventProcessor<T>
    implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider; // 數(shù)據(jù)提供者,默認(rèn)是RingBuffer企垦,也可替換為自己的數(shù)據(jù)結(jié)構(gòu)
    private final SequenceBarrier sequenceBarrier; // 默認(rèn)為ProcessingSequenceBarrier
    private final EventHandler<? super T> eventHandler; // 此EventProcessor對(duì)應(yīng)的用戶(hù)自定義的EventHandler實(shí)現(xiàn)
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 當(dāng)前執(zhí)行位置
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware; // 每次循環(huán)取得一批可用事件后环壤,在實(shí)際處理前調(diào)用

    /**
    * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
    * the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
    *
    * @param dataProvider    to which events are published.
    * @param sequenceBarrier on which it is waiting.
    * @param eventHandler    is the delegate to which events are dispatched.
    */
    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        batchStartAware =
                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }

    // ... 省略部分代碼

    /**
    * It is ok to have another thread rerun this method after a halt().
    *
    * @throws IllegalStateException if this object instance is already running in a thread
    */
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {  // availableSequence返回的是可用的最大值
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 使用給定的等待策略去等待下一個(gè)序列可用
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    // 批處理在此處得以體現(xiàn)
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    // eventHandler處理完畢后,更新當(dāng)前序號(hào)
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }

}

0x02.2 消費(fèi)者可用序列屏障-SequenceBarrier

我們重點(diǎn)看一下SequenceBarrier钞诡,可直譯為“序列屏障”郑现。SequenceBarrier的主要作用是協(xié)調(diào)獲取消費(fèi)者可處理到的最大序號(hào),內(nèi)部持有著生產(chǎn)者和其依賴(lài)的消費(fèi)者序列荧降。它的接口定義如下接箫。

public interface SequenceBarrier
{
    /**
    * Wait for the given sequence to be available for consumption.<br>
    * 等待指定序列可用
    * @param sequence to wait for
    * @return the sequence up to which is available
    * @throws AlertException      if a status change has occurred for the Disruptor
    * @throws InterruptedException if the thread needs awaking on a condition variable.
    * @throws TimeoutException
    *
    */
    long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;

    /**
    * Get the current cursor value that can be read.<br>
    * 獲取當(dāng)前可讀游標(biāo)值
    *
    * @return value of the cursor for entries that have been published.
    *
    */
    long getCursor();

    /**
    * The current alert status for the barrier.<br>
    * 當(dāng)前的alert狀態(tài)
    *
    * @return true if in alert otherwise false.
    */
    boolean isAlerted();

    /**
    * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.<br>
    *
    * 通知消費(fèi)者狀態(tài)變化。當(dāng)調(diào)用EventProcessor#halt()將調(diào)用此方法朵诫。
    */
    void alert();

    /**
    * Clear the current alert status.<br>
    * 清楚alert狀態(tài)
    */
    void clearAlert();

    /**
    * Check if an alert has been raised and throw an {@link AlertException} if it has.
    * 檢查是否發(fā)生alert辛友,發(fā)生將拋出異常
    * @throws AlertException if alert has been raised.
    */
    void checkAlert() throws AlertException;
}

SequenceBarrier實(shí)例引用被EventProcessor持有,用于等待并獲取可用的消費(fèi)事件剪返,主要體現(xiàn)在waitFor這個(gè)方法废累。
要實(shí)現(xiàn)這個(gè)功能,需要3點(diǎn)條件:

  1. 知道生產(chǎn)者的位置脱盲。
  2. 因?yàn)镈isruptor支持消費(fèi)者鏈邑滨,在不同的消費(fèi)者組之間,要保證后邊的消 費(fèi)者組只有在前消費(fèi)者組中的消費(fèi)者都處理完畢后钱反,才能進(jìn)行處理掖看。
  3. 暫時(shí)沒(méi)有事件可消費(fèi),在等待可用消費(fèi)時(shí)诈铛,還需要使用某種等待策略進(jìn)行等待乙各。

看下SequenceBarrier實(shí)現(xiàn)類(lèi)ProcessingSequenceBarrier的代碼是如何實(shí)現(xiàn)waitFor方法墨礁。

final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy; // 等待可用消費(fèi)時(shí)幢竹,指定的等待策略
    private final Sequence dependentSequence; // 依賴(lài)的上組消費(fèi)者的序號(hào),如果當(dāng)前為第一組則為cursorSequence(即生產(chǎn)者發(fā)布游標(biāo)序列)恩静,否則使用FixedSequenceGroup封裝上組消費(fèi)者序列
    private volatile boolean alerted = false; // 當(dāng)觸發(fā)halt時(shí)焕毫,將標(biāo)記alerted為true
    private final Sequence cursorSequence; // AbstractSequencer中的cursor引用,記錄當(dāng)前發(fā)布者發(fā)布的最新位置
    private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length) // 依賴(lài)的上一組序列長(zhǎng)度驶乾,第一次是0
        {
            dependentSequence = cursorSequence;
        }
        else // 將上一組序列數(shù)組復(fù)制成新數(shù)組保存邑飒,引用不變
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        // 檢查是否停止服務(wù)
        checkAlert();
        // 獲取最大可用序號(hào) sequence為給定序號(hào),一般為當(dāng)前序號(hào)+1级乐,cursorSequence記錄生產(chǎn)者最新位置疙咸,
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // 返回已發(fā)布最高的序列值,將對(duì)每個(gè)序號(hào)進(jìn)行校驗(yàn)
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    // ... 
}

0x02.3 該用什么姿勢(shì)等待可用事件-WaitStrategy

看來(lái)實(shí)際的等待操作還是在WaitStrategy#waitFor完成的风科。

// WaitStrategy.java
/**
* Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}. <br>
* 消費(fèi)者等待可用事件的策略
*/
public interface WaitStrategy
{
    /**
    * Wait for the given sequence to be available.  It is possible for this method to return a value
    * less than the sequence number supplied depending on the implementation of the WaitStrategy.  A common
    * use for this is to signal a timeout.  Any EventProcessor that is using a WaitStrategy to get notifications
    * about message becoming available should remember to handle this case.  The {@link BatchEventProcessor} explicitly
    * handles this case and will signal a timeout if required.
    *
    * @param sequence          to be waited on. 給定序號(hào)
    * @param cursor            the main sequence from ringbuffer. Wait/notify strategies will
    *                          need this as it's the only sequence that is also notified upon update. 生產(chǎn)者游標(biāo)
    * @param dependentSequence on which to wait. 依賴(lài)的序列撒轮,一般是上一個(gè)消費(fèi)者組序列的FixedSequenceGroup封裝乞旦。如果消費(fèi)者是第一組,則為cursor题山。
    * @param barrier          the processor is waiting on. 在等待時(shí)需要判斷是否對(duì)消費(fèi)者有alert操作
    * @return the sequence that is available which may be greater than the requested sequence.
    * @throws AlertException      if the status of the Disruptor has changed.
    * @throws InterruptedException if the thread is interrupted.
    * @throws TimeoutException
    */
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;

    /**
    * Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced. <br>
    * 當(dāng)生產(chǎn)者發(fā)布新事件后兰粉,將通知等待的EventProcessor。當(dāng)用鎖機(jī)制時(shí)才會(huì)包含相應(yīng)邏輯顶瞳。
    */
    void signalAllWhenBlocking();
}

在各種等待策略中玖姑,我們選取阻塞策略研究。
public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence) // 當(dāng)前游標(biāo)小于給定序號(hào)慨菱,也就是無(wú)可用事件
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence) // 當(dāng)給定的序號(hào)大于生產(chǎn)者游標(biāo)序號(hào)時(shí)焰络,進(jìn)行等待
                {
                    barrier.checkAlert();
                    // 循環(huán)等待,在Sequencer中publish進(jìn)行喚醒;等待消費(fèi)時(shí)也會(huì)在循環(huán)中定時(shí)喚醒早处。
                    // 循環(huán)等待的原因扛或,是要檢查alert狀態(tài)。如果不檢查將導(dǎo)致不能關(guān)閉Disruptor备蚓。
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }
// 給定序號(hào)大于上一個(gè)消費(fèi)者組最慢消費(fèi)者(如當(dāng)前消費(fèi)者為第一組則和生產(chǎn)者游標(biāo)序號(hào)比較)序號(hào)時(shí),需要等待囱稽。不能超前消費(fèi)上一個(gè)消費(fèi)者組未消費(fèi)完畢的事件郊尝。
// 那么為什么這里沒(méi)有鎖呢?可以想一下此時(shí)的場(chǎng)景战惊,代碼運(yùn)行至此流昏,已能保證生產(chǎn)者有新事件,如果進(jìn)入循環(huán)吞获,說(shuō)明上一組消費(fèi)者還未消費(fèi)完畢况凉。
// 而通常我們的消費(fèi)者都是較快完成任務(wù)的,所以這里才會(huì)考慮使用Busy Spin的方式等待上一組消費(fèi)者完成消費(fèi)各拷。
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }

    @Override
    public String toString()
    {
        return "BlockingWaitStrategy{" +
            "processorNotifyCondition=" + processorNotifyCondition +
            '}';
    }
}

阻塞等待策略使用Lock+Condition的方式等待生產(chǎn)者生產(chǎn)可用事件刁绒,而使用Busy Spin的方式等待可能出現(xiàn)的上一個(gè)消費(fèi)者組未消費(fèi)完成的情況。
這里給我們一個(gè)提示烤黍,在構(gòu)建低延遲系統(tǒng)時(shí)知市,因?yàn)殒i的性能消耗,盡量不要使用鎖速蕊。如果必須要用鎖嫂丙,也要把鎖粒度調(diào)到最小。
另外规哲,消費(fèi)者在等待可用消費(fèi)事件時(shí)跟啤,會(huì)循環(huán)調(diào)用barrier.checkAlert(),再去調(diào)用鎖的條件等待,等待可用消費(fèi)事件隅肥。
有三個(gè)地方可以喚醒等待中的消費(fèi)線(xiàn)程关顷。兩種是在Sequencer實(shí)現(xiàn)類(lèi)中,一是有可用事件發(fā)布武福,通知消費(fèi)線(xiàn)程繼續(xù)消費(fèi)议双;二是在調(diào)用next()獲取可用的RingBuffer槽位時(shí),發(fā)現(xiàn)RingBuffer滿(mǎn)了(生產(chǎn)者速度大于消費(fèi)者捉片,導(dǎo)致生產(chǎn)者沒(méi)有可用位置發(fā)布事件)平痰,將喚醒消費(fèi)者線(xiàn)程,此功能在3.3.5版本新增(Resignal any waiting threads when trying to publish to a full ring buffer )伍纫。開(kāi)始我百思不得宗雇,為什么要在buffer滿(mǎn)了的時(shí)候不斷喚醒消費(fèi)者線(xiàn)程,直到看到這個(gè)issue才明白莹规。大意是在log4j2中使用Disruptor時(shí)發(fā)生了死鎖赔蒲,為了避免在發(fā)布事件時(shí),由于某種原因?qū)е聸](méi)有通知到消費(fèi)者良漱,在生產(chǎn)者嘗試往一個(gè)已滿(mǎn)的buffer發(fā)布數(shù)據(jù)時(shí)舞虱,就會(huì)再通知消費(fèi)者進(jìn)行消費(fèi)。而這個(gè)bug最終也被Log4j認(rèn)領(lǐng)母市,與Disruptor無(wú)關(guān)矾兜。Disruptor這里的再次通知也是為了更加保險(xiǎn)。

//*ProducerSequencer.java
// next(n)中的代碼
// 由于慢消費(fèi)者患久,無(wú)可用坑位椅寺,只有當(dāng)消費(fèi)者消費(fèi),向前移動(dòng)后蒋失,才能跳出循環(huán)
// 由于外層判斷使用的是緩存的消費(fèi)者序列最小值返帕,這里使用真實(shí)的消費(fèi)者序列進(jìn)行判斷,并將最新結(jié)果在跳出while循環(huán)之后進(jìn)行緩存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{  // 喚醒等待的消費(fèi)者篙挽,正常情況下并無(wú)意義荆萤,只是為了避免極少數(shù)情況下未知原因?qū)е碌陌l(fā)布時(shí)鎖機(jī)制出現(xiàn)異常,未通知到消費(fèi)者
    waitStrategy.signalAllWhenBlocking();
    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}

還有一種喚醒就是關(guān)閉Disruptor時(shí)嫉髓,消費(fèi)者關(guān)閉前將會(huì)處理完當(dāng)前批次數(shù)據(jù)(并非RingBuffer的所有數(shù)據(jù)观腊,而是此次循環(huán)取出的最大可用序號(hào)以下的所有未處理數(shù)據(jù)),如果消費(fèi)者線(xiàn)程當(dāng)前在等待狀態(tài)算行,將被喚醒并終結(jié)。
BatchEventProcessor就講到這苫耸。

0x02.4 使用WorkProcessor多線(xiàn)程處理事件

下面說(shuō)一說(shuō)WorkHandler+WorkProcessor州邢。
上面講過(guò),使用EventHandler+BatchEventProcessor這種方式類(lèi)似JMS的發(fā)布訂閱,同一個(gè)事件會(huì)被不同線(xiàn)程的EventHandler并行消費(fèi)量淌。那么骗村,如果單線(xiàn)程處理能力不足,想多線(xiàn)程處理同一主題下的不同事件該怎么辦呢呀枢?這種方式就類(lèi)似JMS的點(diǎn)到點(diǎn)模式胚股,多個(gè)消費(fèi)者可以監(jiān)聽(tīng)同一個(gè)隊(duì)列,誰(shuí)先拿到就歸誰(shuí)處理裙秋。
在Disruptor中使用WorkHandler+WorkProcessor實(shí)現(xiàn)以上功能琅拌。當(dāng)需要使用這種模式,可在設(shè)置Disruptor消費(fèi)者時(shí)摘刑,通過(guò)使用handleEventsWithWorkerPool和thenHandleEventsWithWorkerPool設(shè)置消費(fèi)鏈进宝。

disruptor
    .handleEventsWithWorkerPool(
      new WorkHandler[]{
          journalHandler,
          journalHandler,
          journalHandler
      }
    )
    .thenHandleEventsWithWorkerPool(resultHandler);

先看下相關(guān)的源碼。

// Disruptor
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}

EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(workerPool, sequenceBarrier);

    final Sequence[] workerSequences = workerPool.getWorkerSequences();

    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}

// WorkerPool.java WorkerPool構(gòu)造方法
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers)
{
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    for (int i = 0; i < numWorkers; i++)
    {
        workProcessors[i] = new WorkProcessor<T>( // 為每個(gè)WorkHandler新建一個(gè)WorkProcessor
            ringBuffer,
            sequenceBarrier,
            workHandlers[i],
            exceptionHandler,
            workSequence);
    }
}

在使用線(xiàn)程池處理事件時(shí)枷恕,與單線(xiàn)程處理相比党晋,最大的不同在于新增了一個(gè)WorkerPool。WorkerPool用于管理一組WorkProcessor徐块,它的屬性未玻、方法如下。

image.png

WorkProcessor的原理和BatchEventProcessor類(lèi)似胡控,只是多了workSequence用來(lái)保存同組共用的處理序列深胳。在更新workSequence時(shí),涉及多線(xiàn)程操作铜犬,所以使用CAS進(jìn)行更新舞终。
WorkProcessor的run()方法如下。

@Override
public void run()
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();

    notifyStart();

    boolean processedSequence = true;
    long cachedAvailableSequence = Long.MIN_VALUE;
    long nextSequence = sequence.get();
    T event = null;
    while (true)
    {
        try
        {
            // if previous sequence was processed - fetch the next sequence and set
            // that we have successfully processed the previous sequence
            // typically, this will be true
            // this prevents the sequence getting too far forward if an exception
            // is thrown from the WorkHandler
            if (processedSequence) // 表示nextSequence序號(hào)的處理情況(不區(qū)分正逞⒒或是異常處理)敛劝。只有處理過(guò),才能申請(qǐng)下一個(gè)序號(hào)纷宇。
            {
                processedSequence = false;
                do
                {
                    // 同組中多個(gè)消費(fèi)線(xiàn)程有可能會(huì)爭(zhēng)搶一個(gè)序號(hào)夸盟,使用CAS避免使用鎖。
                    // 同一組使用一個(gè)workSequence像捶,WorkProcessor不斷申請(qǐng)下一個(gè)可用序號(hào)上陕,對(duì)workSequence設(shè)置成功才會(huì)實(shí)際消費(fèi)。
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);
                }
                while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
            }
            // 緩存的可用序號(hào)比要處理的序號(hào)大拓春,才能進(jìn)行處理
            if (cachedAvailableSequence >= nextSequence)
            {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);
                processedSequence = true;
            }
            else // 更新緩存的可用序列释簿。這個(gè)cachedAvailableSequence只用在WorkProcessor實(shí)例內(nèi),不同實(shí)例的緩存可能是不一樣的
            {     // 和單線(xiàn)程模式類(lèi)似硼莽,返回的也是最大可用序號(hào)
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
        }
        catch (final TimeoutException e)
        {
            notifyTimeout(sequence.get());
        }
        catch (final AlertException ex)
        {
            if (!running.get())
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            // handle, mark as processed, unless the exception handler threw an exception
            exceptionHandler.handleEventException(ex, nextSequence, event);
            processedSequence = true;
        }
    }

    notifyShutdown();

    running.set(false);
}

代碼邏輯和BatchEventProcessor類(lèi)似庶溶,就不再贅述啦。
還有一點(diǎn)需要留意,Disruptor通過(guò)EventHandlerGroup代表一個(gè)消費(fèi)者組偏螺,就表示之前那四張圖中一個(gè)水平線(xiàn)上的消費(fèi)者組行疏。這樣不同的消費(fèi)者組之間不必關(guān)心各自的實(shí)現(xiàn),從而可以實(shí)現(xiàn)更加復(fù)雜和靈活的消費(fèi)鏈套像,即依賴(lài)圖表酿联。

0x03 消費(fèi)者小結(jié)

從小語(yǔ)文老師就教育我們寫(xiě)作文要總結(jié),好習(xí)慣不能忘~
本文主要探討了Disruptor消費(fèi)者內(nèi)部概要實(shí)現(xiàn)夺巩,重點(diǎn)闡述了BatchEventProcessor贞让、WorkProcess的消費(fèi)代碼原理。同時(shí)省略了超時(shí)通知劲够、開(kāi)始和結(jié)束通知震桶、異常控制等內(nèi)容征绎,并非不重要蹲姐,而只是盡量言簡(jiǎn)意賅,達(dá)到拋磚引玉的目的人柿。
BatchEventProcessor主要用于處理單線(xiàn)程并行任務(wù)柴墩,同一消費(fèi)者組的不同消費(fèi)者會(huì)接收相同的事件,并在所有事件處理完畢后進(jìn)入下一消費(fèi)者組進(jìn)行處理(是不是類(lèi)似JUC里的Phaser凫岖、CyclicBarrier或CountDownLatch呢)江咳。WorkProcessor通過(guò)WorkerPool管理多個(gè)WorkProcessor,達(dá)到多線(xiàn)程處理事件的目的哥放,同一消費(fèi)者組的多個(gè)WorkProcessor不會(huì)處理同一個(gè)事件歼指。通過(guò)選擇不同的WaitStragegy實(shí)現(xiàn),可以控制消費(fèi)者在沒(méi)有可用事件處理時(shí)的等待策略甥雕。
好啦踩身,有關(guān)Disruptor消費(fèi)者的分享就到這。
歡迎大家留言討論社露,一同探討挟阻,一同進(jìn)步。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末峭弟,一起剝皮案震驚了整個(gè)濱河市附鸽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瞒瘸,老刑警劉巖坷备,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異挨务,居然都是意外死亡击你,警方通過(guò)查閱死者的電腦和手機(jī)玉组,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)谎柄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)丁侄,“玉大人,你說(shuō)我怎么就攤上這事朝巫『枰。” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵劈猿,是天一觀的道長(zhǎng)拙吉。 經(jīng)常有香客問(wèn)我,道長(zhǎng)揪荣,這世上最難降的妖魔是什么筷黔? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮仗颈,結(jié)果婚禮上佛舱,老公的妹妹穿的比我還像新娘。我一直安慰自己挨决,他們只是感情好请祖,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著脖祈,像睡著了一般肆捕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上盖高,一...
    開(kāi)封第一講書(shū)人閱讀 52,268評(píng)論 1 309
  • 那天慎陵,我揣著相機(jī)與錄音,去河邊找鬼喻奥。 笑死席纽,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的映凳。 我是一名探鬼主播胆筒,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼诈豌!你這毒婦竟也來(lái)了仆救?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤矫渔,失蹤者是張志新(化名)和其女友劉穎彤蔽,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體庙洼,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡顿痪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年镊辕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蚁袭。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡征懈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出揩悄,到底是詐尸還是另有隱情卖哎,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布删性,位于F島的核電站亏娜,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蹬挺。R本人自食惡果不足惜维贺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望巴帮。 院中可真熱鬧溯泣,春花似錦、人聲如沸晰韵。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)雪猪。三九已至栏尚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間只恨,已是汗流浹背译仗。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留官觅,地道東北人纵菌。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像休涤,于是被迫代替她去往敵國(guó)和親咱圆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容