Disruptor - 工作流程(2)

開篇

?整個博文希望能夠講清楚Disruptor的producer和consumer的處理過程以及兩者之間的消息通知機(jī)制级解。


工作過程

?Disruptor本質(zhì)上是一個內(nèi)存消息隊(duì)列悲靴,適合生產(chǎn)者消費(fèi)者模型饥脑,所以它的整個工作過程其實(shí)也就分三個大過程痕寓,分別是Disruptor本身啟動、Disruptor生產(chǎn)者工作過程、Disruptor消費(fèi)者工作過程购公。整個Disruptor工作流程圖如下圖:

Disruptor工作流程圖


Disruptor對象關(guān)系圖

Disruptor對象關(guān)系圖

說明:

  • Disruptor包含:RingBuffer對象、Executor對象雁歌。
  • RingBuffer包含:Sequencer sequencer對象宏浩、int bufferSize變量、Object[] entries變量靠瞎。
  • Sequencer類分為MultiProducerSequencer和SingleProducerSequencer兩類比庄。
  • Disruptor的整個啟動過程就是上面變量的初始化過程。


Disruptor啟動過程

Disruptor初始化過程

  • 1乏盐、創(chuàng)建Disruptor對象佳窑。
  • 2、創(chuàng)建RingBuffer對象父能。
  • 3神凑、Sequencer分為MultiProducerSequencer和SingleProducerSequencer兩類。
  • 4法竞、整個過程按照Disruptor->RingBuffer->Sequencer的順序創(chuàng)建核心對象耙厚。
 Disruptor<LongEvent> d = new Disruptor<LongEvent>(
            LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
            producerType, new SleepingWaitStrategy());


public class Disruptor<T>
{
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();

    @Deprecated
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
    {
        // RingBuffer.createMultiProducer創(chuàng)建RingBuffer對象
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
    }

    @Deprecated
    public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final Executor executor,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
    {
         // RingBuffer.create創(chuàng)建RingBuffer對象
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
    {
         // RingBuffer.createMultiProducer創(chuàng)建RingBuffer對象
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
    }

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        // RingBuffer.createMultiProducer創(chuàng)建RingBuffer對象
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }

    // 真正核心的構(gòu)造函數(shù)參數(shù)包括一個RingBuffer對象和Executor對象强挫,前者由于保存數(shù)據(jù)岔霸,后者用于執(zhí)行消費(fèi)者函數(shù)
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
}



創(chuàng)建Sequencer對象,根據(jù)單生產(chǎn)者還是多生產(chǎn)者分為SingleProducerSequencer和MultiProducerSequencer兩類俯渤。

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
    //用于填充的對象引用呆细,為什么填充不知道?
    private static final int BUFFER_PAD;
    //entry存儲位置相對與array起始位置的偏移量八匠,用于UNSAFE內(nèi)存操作時進(jìn)行尋址絮爷,注意這個偏移量加上了用于填充的BUFFER_PAD大小
    private static final long REF_ARRAY_BASE;
    //對應(yīng)對象引用占用內(nèi)存大小,計(jì)算出來的相對位移數(shù)梨树,比如對象引用大小是4byte坑夯,那么REF_ELEMENT_SHIFT=2,因?yàn)?的2次方=4抡四;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();

    static
    {
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
        else
        {
            throw new IllegalStateException("Unknown pointer size");
        }

        BUFFER_PAD = 128 / scale;
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }

    private final long indexMask;
    // RingBuffer存儲數(shù)據(jù)對象
    private final Object[] entries;
    // RingBuffer數(shù)組大小
    protected final int bufferSize;
    // Sequencer對象
    protected final Sequencer sequencer;

    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }


    public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        // 創(chuàng)建Sequencer對象
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }


    // 創(chuàng)建Sequencer對象的過程
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
    {
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }


    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        // 創(chuàng)建Sequencer對象
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }


    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
    {
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }


    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }
}



BasicExecutor只是簡單的實(shí)現(xiàn)了Executor接口柜蜈,用于解決沒有傳遞Executor對象的時候使用默認(rèn)的BasicExecutor即可,可以理解就是默認(rèn)提供的線程池對象指巡。

public class BasicExecutor implements Executor
{
    private final ThreadFactory factory;
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<Thread>();

    public BasicExecutor(ThreadFactory factory)
    {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command)
    {
        final Thread thread = factory.newThread(command);
        if (null == thread)
        {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }
}


綁定消費(fèi)者接口

?整個綁定消費(fèi)者接口主要完成了以下幾個步驟:

  • 1淑履、根據(jù)消費(fèi)者接口創(chuàng)建BatchEventProcessor對象
  • 2、將BatchEventProcessor添加到consumerRepository當(dāng)中
  • 3藻雪、updateGatingSequencesForNextInChain內(nèi)通過ringBuffer.addGatingSequences()方法添加消費(fèi)者的Sequence到gatingSequences秘噪,完成了消費(fèi)者和生產(chǎn)者之間的關(guān)聯(lián)
// 創(chuàng)建Disruptor對象并通過handleEventsWith綁定消費(fèi)者Handler對象
public void shouldBatch() throws Exception
{
    Disruptor<LongEvent> d = new Disruptor<LongEvent>(
            LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
            producerType, new SleepingWaitStrategy());

    ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
    ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);
    
    d.handleEventsWith(handler1, handler2);
    RingBuffer<LongEvent> buffer = d.start();
}


// handleEventsWith用于綁定消費(fèi)者對象
public class Disruptor<T>
{
    // step1 根據(jù)消費(fèi)者接口創(chuàng)建消費(fèi)者對象createEventProcessors
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }

    public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
    {
        final Sequence[] barrierSequences = new Sequence[0];
        return createEventProcessors(barrierSequences, eventProcessorFactories);
    }



    // 根據(jù)消費(fèi)者接口創(chuàng)建批量處理對象BatchEventProcessor
    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        // 每個消費(fèi)者配備一個Sequence對象,應(yīng)該需要申請一個Sequence數(shù)組
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        // 所有消費(fèi)者公用一個SequenceBarrier對象,這個SequenceBarrier會傳遞到消費(fèi)者當(dāng)中
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        // 根據(jù)eventHandlers的消費(fèi)接口的個數(shù)勉耀,為每個消費(fèi)接口創(chuàng)建一個BatchEventProcessor對象
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            
            // 增加消費(fèi)者對象到消費(fèi)者數(shù)組consumerRepository當(dāng)中
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            // processorSequences保存每個消費(fèi)者的Sequence對象指煎,每個消費(fèi)者配備一個Sequence對象
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        // 內(nèi)部主要是關(guān)聯(lián)所有consumer的Sequence到RingBuffer的gatingSequences數(shù)組當(dāng)中
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }



    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            // 增加消費(fèi)者關(guān)聯(lián)的Sequence到RingBuffer當(dāng)中
            ringBuffer.addGatingSequences(processorSequences);

            // 這里不是特別懂蹋偏,綁定接口的時候barrierSequences的數(shù)組為空數(shù)組
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    // RingBuffer->Sequencer
    public void addGatingSequences(Sequence... gatingSequences)
    {
        // RingBuffer當(dāng)中的sequencer為RingBuffer中唯一的值
        // 通過addGatingSequences將所有消費(fèi)者的Sequenece和生產(chǎn)者關(guān)聯(lián)起來,
        // 這樣生產(chǎn)者可以檢測消費(fèi)者進(jìn)度避免覆蓋未消費(fèi)數(shù)據(jù)
        sequencer.addGatingSequences(gatingSequences);
    }
}


// 保存消費(fèi)者信息
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
}



? AbstractSequencer的addGatingSequences()方法負(fù)責(zé)把消費(fèi)者的Sequence添加到RingBuffer的Sequencer當(dāng)中g(shù)atingSequences數(shù)組當(dāng)中至壤。內(nèi)部通過調(diào)用SequenceGroups.addSequences()實(shí)現(xiàn)暖侨。

public abstract class AbstractSequencer implements Sequencer
{
    // SEQUENCE_UPDATER指向gatingSequences變量的內(nèi)存地址
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    /**
     * 當(dāng)前RingBuffer對應(yīng)的油表位置
     */
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    /**
     * 各個消費(fèi)者持有的取數(shù)sequence數(shù)組
     */
    protected volatile Sequence[] gatingSequences = new Sequence[0];

    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }

    @Override
    public final void addGatingSequences(Sequence... gatingSequences)
    {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    @Override
    public boolean removeGatingSequence(Sequence sequence)
    {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }



?SequenceGroups的addSequences()方法內(nèi)部通過把新增的sequencesToAdd添加到Sequencer當(dāng)中的gatingSequences數(shù)組中。

class SequenceGroups
{
    static <T> void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
    {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;

        do
        {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();

            int index = currentSequences.length;
            for (Sequence sequence : sequencesToAdd)
            {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        }
        while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
        }
    }
}



?SequenceBarrier的創(chuàng)建過程,所有消費(fèi)者依靠SequenceBarrier的cursor進(jìn)行消費(fèi)崇渗。

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        // protected final Sequencer sequencer
        // sequencer這里指SingleProducerSequencer或MultiProducerSequencer
        return sequencer.newBarrier(sequencesToTrack);
    }
}


public abstract class AbstractSequencer implements Sequencer
{
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        // 以SingleProducerSequencer為例子字逗,這里的curosr是Sequencer的curosr對象。
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
}


final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    // 記錄生產(chǎn)者現(xiàn)在生產(chǎn)的數(shù)據(jù)到哪個下標(biāo)了,用于生產(chǎn)者和消費(fèi)者之間的消息同步
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        // 這里創(chuàng)建ProcessingSequenceBarrier對象的時候傳入的cursorSequence
       //
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }
}


Disruptor啟動

?Disruptor啟動的內(nèi)容主要是通過線程池對象運(yùn)行消費(fèi)者任務(wù)宅广,消費(fèi)者任務(wù)進(jìn)入While循環(huán)進(jìn)行循環(huán)消費(fèi)葫掉。整個啟動過程如下:

  • 1、遍歷所有消費(fèi)者由consumerRepository保存依次啟動跟狱,ConsumerRepository的iterator()方法會遍歷Collection<ConsumerInfo> consumerInfos俭厚。
  • 2、ConsumerInfo中保存的是EventProcessorInfo對象驶臊,EventProcessorInfo的start()方法執(zhí)行 executor.execute(eventprocessor)提交任務(wù)到線程池當(dāng)中挪挤。
  • 3、EventProcessorInfo中的EventProcessor eventprocessor對象是我們綁定消費(fèi)函數(shù)指定的BatchEventProcessor對象关翎。
  • 4扛门、BatchEventProcessor的內(nèi)部方法run()方法由executor負(fù)責(zé)執(zhí)行并啟動,從而讓消費(fèi)者進(jìn)入了消費(fèi)的循環(huán)階段纵寝。
public class Disruptor<T>
{
    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        // 負(fù)責(zé)啟動所有的消費(fèi)者對象论寨,ConsumerInfo對象是EventProcessorInfo
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            // 通過executor啟動consumer對象,
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }
}



class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }

    @Override
    public Iterator<ConsumerInfo> iterator()
    {
        return consumerInfos.iterator();
    }
}

class EventProcessorInfo<T> implements ConsumerInfo
{
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(
        final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
    {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }

    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);
    }
}


public final class BatchEventProcessor<T>
    implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    // 保存sequenceBarrier對象爽茴,批量增加的消費(fèi)者會公用一個sequenceBarrier對象葬凳。
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;

    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        batchStartAware =
                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }



    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }

                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }
}


producer工作過程

?producer工作過程如下:

  • 獲取下一個可用的序號用于生成數(shù)據(jù) ringBuffer.next()
  • 生成數(shù)據(jù)并保存到循環(huán)數(shù)組當(dāng)中 ringBuffer.get(sequence)
  • 更新RingBuffer當(dāng)中待消費(fèi)數(shù)據(jù)的位移 ringBuffer.publish(sequence)
  • SingleProducerSequencer的next()方法邏輯都在注釋代碼當(dāng)中,自行閱讀室奏。
  • ringBuffer.publish(sequence)會喚醒消費(fèi)者如果消費(fèi)者在等待狀態(tài)
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//請求下一個事件序號火焰;
    
try {
    LongEvent event = ringBuffer.get(sequence);//獲取該序號對應(yīng)的事件對象;
    long data = getEventData();//獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)胧沫;
    event.set(data);
} finally{
    ringBuffer.publish(sequence);//發(fā)布事件昌简;
}



public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    // 獲取下一個可用的位置
    public long next()
    {
        return sequencer.next();
    }

    // 數(shù)據(jù)放置到環(huán)形數(shù)組當(dāng)中
    public void publish(long sequence)
    {
        sequencer.publish(sequence);
    }
}

?next()方法獲取下一個可用的可寫的位置,publish()方法用于喚醒消費(fèi)者

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }

    public long next()
    {
        return next(1);
    }

    public long next(int n) 
    {
        if (n < 1) //n表示此次生產(chǎn)者期望獲取多少個序號琳袄,通常是1
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;
        
        // 生產(chǎn)者當(dāng)前序號值+期望獲取的序號數(shù)量后達(dá)到的序號值
        long nextSequence = nextValue + n;  
        // 減掉RingBuffer的總的buffer值江场,用于判斷是否出現(xiàn)‘覆蓋’
        // 因?yàn)槭黔h(huán)形數(shù)組的設(shè)計(jì),所以生產(chǎn)者最多比消費(fèi)者快一圈
        // 所以減去bufferSize后比較看是否追上
        long wrapPoint = nextSequence - bufferSize; 

        // 從后面代碼分析可得:cachedValue就是緩存的消費(fèi)者中最小序號值窖逗,他不是當(dāng)前‘消費(fèi)者中最小序號值’址否,
        // 而是上次程序進(jìn)入到下面的if判定代碼段是,被賦值的當(dāng)時的‘消費(fèi)者中最小序號值’
        // 這樣做的好處在于:在判定是否出現(xiàn)覆蓋的時候,不用每次都調(diào)用getMininumSequence計(jì)算‘消費(fèi)者中的最小序號值’佑附,從而節(jié)約開銷樊诺。
        // 只要確保當(dāng)生產(chǎn)者的節(jié)奏大于了緩存的cachedGateingSequence一個bufferSize時,從新獲取一下 getMinimumSequence()即可音同。
        long cachedGatingSequence = this.cachedValue;  

        // (wrapPoint > cachedGatingSequence) : 
        // 當(dāng)生產(chǎn)者已經(jīng)超過上一次緩存的‘消費(fèi)者中最小序號值’(cachedGatingSequence)一個‘Ring’大写逝馈(bufferSize),
        // 需要重新獲取cachedGatingSequence权均,避免當(dāng)生產(chǎn)者一直在生產(chǎn)顿膨,但是消費(fèi)者不再消費(fèi)的情況下,出現(xiàn)‘覆蓋’

        // (cachedGatingSequence > nextValue) : 生產(chǎn)者和消費(fèi)者均為順序遞增的叽赊,且生產(chǎn)者的seq“先于”消費(fèi)者的seq恋沃,注意是‘先于’而不是‘大于’。
        // 當(dāng)nextValue>Long.MAXVALUE時必指,nextValue+1就會變成負(fù)數(shù)囊咏,wrapPoint也會變成負(fù)數(shù),這時候必然會是:cachedGatingSequence > nextValue
        // 這個變化的過程會持續(xù)bufferSize個序號塔橡,這個區(qū)間梅割,由于getMinimumSequence()得到的雖然是名義上的‘消費(fèi)者中最小序號值’,但是不代表是走在‘最后面’的消費(fèi)者
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) 
        {
            // 將生產(chǎn)者的cursor值更新到主存葛家,以便對所有的消費(fèi)者線程可見户辞。cursor表示現(xiàn)在生產(chǎn)那個節(jié)點(diǎn)了
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            // 生產(chǎn)者停下來,等待消費(fèi)者消費(fèi)惦银,直到‘覆蓋’現(xiàn)象清除咆课。
            // 這里會獲取所有消費(fèi)者消費(fèi)最慢的消費(fèi)者的消費(fèi)位移保證不覆蓋。
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                // 通知消費(fèi)者進(jìn)行消費(fèi)扯俱,然后自旋
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

    public void publish(long sequence)
    {
        // 設(shè)置當(dāng)前生產(chǎn)者的位置便于消費(fèi)者消費(fèi)
        cursor.set(sequence);
        // 通知消費(fèi)者進(jìn)行消費(fèi)
        waitStrategy.signalAllWhenBlocking();
    }


consumer工作過程

?consumer在綁定消費(fèi)者的過程中創(chuàng)建的BatchEventProcessor對象,核心邏輯在于run()方法:

  • 獲取待消費(fèi)的數(shù)據(jù)的位置 sequenceBarrier.waitFor(nextSequence)
  • 遍歷待消費(fèi)的數(shù)據(jù)進(jìn)行消費(fèi)eventHandler.onEvent()
  • 設(shè)置該消費(fèi)者的消費(fèi)的位移sequence.set(availableSequence)
public final class BatchEventProcessor<T>
    implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    // 保存sequenceBarrier對象喇澡,這個sequenceBarrier在創(chuàng)建Consumer對象傳遞
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;

    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        batchStartAware =
                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }


    // 消費(fèi)者進(jìn)行消費(fèi)迅栅,會在while循環(huán)當(dāng)中一直消費(fèi)
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    // 等待可消費(fèi)的Sequence,在sequenceBarrier.waitFor()內(nèi)部會執(zhí)行waitStrategy.waitFor()方法實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者之間的通信晴玖。
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    
                    // 進(jìn)行數(shù)據(jù)消費(fèi)
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    // 設(shè)置消費(fèi)者當(dāng)前的消費(fèi)進(jìn)度至sequence
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }
}



?ProcessingSequenceBarrier的waitFor()方法內(nèi)部調(diào)用waitStrategy.waitFor()獲取當(dāng)前可消費(fèi)的數(shù)據(jù)的位置读存,不同的waitStrategy由不同的策略,在內(nèi)部通過信號量進(jìn)行通知呕屎。

final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    // 記錄生產(chǎn)者現(xiàn)在生產(chǎn)的數(shù)據(jù)到哪個下標(biāo)了,用于生產(chǎn)者和消費(fèi)者之間的消息同步
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }


    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        // Wait for the given sequence to be available让簿。
        // 這里并不保證返回值availableSequence一定等于 given sequence,他們的大小關(guān)系取決于采用的WaitStrategy秀睛。
        // 1尔当、YieldingWaitStrategy在自旋100次嘗試后,會直接返回dependentSequence的最小seq蹂安,這時并不保證返回值>=given sequence
        // 2椭迎、BlockingWaitStrategy則會阻塞等待given sequence可用為止锐帜,可用并不是說availableSequence == given sequence,而應(yīng)當(dāng)是指 >=
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        // 獲取消費(fèi)者可以消費(fèi)的最大的可用序號畜号,支持批處理效應(yīng)缴阎,提升處理效率。
        // 當(dāng)availableSequence > sequence時简软,需要遍歷 sequence --> availableSequence蛮拔,找到最前一個準(zhǔn)備就緒,可以被消費(fèi)的event對應(yīng)的seq痹升。
        // 最小值為:sequence-1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
}


參考文章

Disruptor3.0的實(shí)現(xiàn)細(xì)節(jié)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末语泽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子视卢,更是在濱河造成了極大的恐慌踱卵,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件据过,死亡現(xiàn)場離奇詭異惋砂,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)绳锅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門西饵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人鳞芙,你說我怎么就攤上這事眷柔。” “怎么了原朝?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵驯嘱,是天一觀的道長。 經(jīng)常有香客問我喳坠,道長鞠评,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任壕鹉,我火速辦了婚禮剃幌,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘晾浴。我一直安慰自己负乡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布脊凰。 她就那樣靜靜地躺著抖棘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上钉答,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天础芍,我揣著相機(jī)與錄音,去河邊找鬼数尿。 笑死仑性,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的右蹦。 我是一名探鬼主播诊杆,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼何陆!你這毒婦竟也來了晨汹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤贷盲,失蹤者是張志新(化名)和其女友劉穎淘这,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巩剖,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡铝穷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了佳魔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片曙聂。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖鞠鲜,靈堂內(nèi)的尸體忽然破棺而出宁脊,到底是詐尸還是另有隱情,我是刑警寧澤贤姆,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布榆苞,位于F島的核電站,受9級特大地震影響庐氮,放射性物質(zhì)發(fā)生泄漏语稠。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一弄砍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧输涕,春花似錦音婶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春碴卧,著一層夾襖步出監(jiān)牢的瞬間弱卡,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工住册, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留婶博,地道東北人。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓荧飞,卻偏偏與公主長得像凡人,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子叹阔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容