開篇
?整個博文希望能夠講清楚Disruptor的producer和consumer的處理過程以及兩者之間的消息通知機(jī)制级解。
工作過程
?Disruptor本質(zhì)上是一個內(nèi)存消息隊(duì)列悲靴,適合生產(chǎn)者消費(fèi)者模型饥脑,所以它的整個工作過程其實(shí)也就分三個大過程痕寓,分別是Disruptor本身啟動、Disruptor生產(chǎn)者工作過程、Disruptor消費(fèi)者工作過程购公。整個Disruptor工作流程圖如下圖:
Disruptor對象關(guān)系圖
說明:
- Disruptor包含:RingBuffer對象、Executor對象雁歌。
- RingBuffer包含:Sequencer sequencer對象宏浩、int bufferSize變量、Object[] entries變量靠瞎。
- Sequencer類分為MultiProducerSequencer和SingleProducerSequencer兩類比庄。
- Disruptor的整個啟動過程就是上面變量的初始化過程。
Disruptor啟動過程
Disruptor初始化過程
- 1乏盐、創(chuàng)建Disruptor對象佳窑。
- 2、創(chuàng)建RingBuffer對象父能。
- 3神凑、Sequencer分為MultiProducerSequencer和SingleProducerSequencer兩類。
- 4法竞、整個過程按照Disruptor->RingBuffer->Sequencer的順序創(chuàng)建核心對象耙厚。
Disruptor<LongEvent> d = new Disruptor<LongEvent>(
LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
producerType, new SleepingWaitStrategy());
public class Disruptor<T>
{
private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
@Deprecated
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
{
// RingBuffer.createMultiProducer創(chuàng)建RingBuffer對象
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
}
@Deprecated
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
// RingBuffer.create創(chuàng)建RingBuffer對象
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
// RingBuffer.createMultiProducer創(chuàng)建RingBuffer對象
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
// RingBuffer.createMultiProducer創(chuàng)建RingBuffer對象
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
// 真正核心的構(gòu)造函數(shù)參數(shù)包括一個RingBuffer對象和Executor對象强挫,前者由于保存數(shù)據(jù)岔霸,后者用于執(zhí)行消費(fèi)者函數(shù)
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
}
創(chuàng)建Sequencer對象,根據(jù)單生產(chǎn)者還是多生產(chǎn)者分為SingleProducerSequencer和MultiProducerSequencer兩類俯渤。
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad
{
//用于填充的對象引用呆细,為什么填充不知道?
private static final int BUFFER_PAD;
//entry存儲位置相對與array起始位置的偏移量八匠,用于UNSAFE內(nèi)存操作時進(jìn)行尋址絮爷,注意這個偏移量加上了用于填充的BUFFER_PAD大小
private static final long REF_ARRAY_BASE;
//對應(yīng)對象引用占用內(nèi)存大小,計(jì)算出來的相對位移數(shù)梨树,比如對象引用大小是4byte坑夯,那么REF_ELEMENT_SHIFT=2,因?yàn)?的2次方=4抡四;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
private final long indexMask;
// RingBuffer存儲數(shù)據(jù)對象
private final Object[] entries;
// RingBuffer數(shù)組大小
protected final int bufferSize;
// Sequencer對象
protected final Sequencer sequencer;
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
public static <E> RingBuffer<E> createMultiProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
// 創(chuàng)建Sequencer對象
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 創(chuàng)建Sequencer對象的過程
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
// 創(chuàng)建Sequencer對象
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
{
return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
}
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
}
BasicExecutor只是簡單的實(shí)現(xiàn)了Executor接口柜蜈,用于解決沒有傳遞Executor對象的時候使用默認(rèn)的BasicExecutor即可,可以理解就是默認(rèn)提供的線程池對象指巡。
public class BasicExecutor implements Executor
{
private final ThreadFactory factory;
private final Queue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
public BasicExecutor(ThreadFactory factory)
{
this.factory = factory;
}
@Override
public void execute(Runnable command)
{
final Thread thread = factory.newThread(command);
if (null == thread)
{
throw new RuntimeException("Failed to create thread to run: " + command);
}
thread.start();
threads.add(thread);
}
}
綁定消費(fèi)者接口
?整個綁定消費(fèi)者接口主要完成了以下幾個步驟:
- 1淑履、根據(jù)消費(fèi)者接口創(chuàng)建BatchEventProcessor對象
- 2、將BatchEventProcessor添加到consumerRepository當(dāng)中
- 3藻雪、updateGatingSequencesForNextInChain內(nèi)通過ringBuffer.addGatingSequences()方法添加消費(fèi)者的Sequence到gatingSequences秘噪,完成了消費(fèi)者和生產(chǎn)者之間的關(guān)聯(lián)
// 創(chuàng)建Disruptor對象并通過handleEventsWith綁定消費(fèi)者Handler對象
public void shouldBatch() throws Exception
{
Disruptor<LongEvent> d = new Disruptor<LongEvent>(
LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
producerType, new SleepingWaitStrategy());
ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);
d.handleEventsWith(handler1, handler2);
RingBuffer<LongEvent> buffer = d.start();
}
// handleEventsWith用于綁定消費(fèi)者對象
public class Disruptor<T>
{
// step1 根據(jù)消費(fèi)者接口創(chuàng)建消費(fèi)者對象createEventProcessors
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
{
final Sequence[] barrierSequences = new Sequence[0];
return createEventProcessors(barrierSequences, eventProcessorFactories);
}
// 根據(jù)消費(fèi)者接口創(chuàng)建批量處理對象BatchEventProcessor
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 每個消費(fèi)者配備一個Sequence對象,應(yīng)該需要申請一個Sequence數(shù)組
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// 所有消費(fèi)者公用一個SequenceBarrier對象,這個SequenceBarrier會傳遞到消費(fèi)者當(dāng)中
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
// 根據(jù)eventHandlers的消費(fèi)接口的個數(shù)勉耀,為每個消費(fèi)接口創(chuàng)建一個BatchEventProcessor對象
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
// 增加消費(fèi)者對象到消費(fèi)者數(shù)組consumerRepository當(dāng)中
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
// processorSequences保存每個消費(fèi)者的Sequence對象指煎,每個消費(fèi)者配備一個Sequence對象
processorSequences[i] = batchEventProcessor.getSequence();
}
// 內(nèi)部主要是關(guān)聯(lián)所有consumer的Sequence到RingBuffer的gatingSequences數(shù)組當(dāng)中
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
// 增加消費(fèi)者關(guān)聯(lián)的Sequence到RingBuffer當(dāng)中
ringBuffer.addGatingSequences(processorSequences);
// 這里不是特別懂蹋偏,綁定接口的時候barrierSequences的數(shù)組為空數(shù)組
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
// RingBuffer->Sequencer
public void addGatingSequences(Sequence... gatingSequences)
{
// RingBuffer當(dāng)中的sequencer為RingBuffer中唯一的值
// 通過addGatingSequences將所有消費(fèi)者的Sequenece和生產(chǎn)者關(guān)聯(lián)起來,
// 這樣生產(chǎn)者可以檢測消費(fèi)者進(jìn)度避免覆蓋未消費(fèi)數(shù)據(jù)
sequencer.addGatingSequences(gatingSequences);
}
}
// 保存消費(fèi)者信息
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<Sequence, ConsumerInfo>();
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
public void add(
final EventProcessor eventprocessor,
final EventHandler<? super T> handler,
final SequenceBarrier barrier)
{
final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
eventProcessorInfoByEventHandler.put(handler, consumerInfo);
eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
}
}
? AbstractSequencer的addGatingSequences()方法負(fù)責(zé)把消費(fèi)者的Sequence添加到RingBuffer的Sequencer當(dāng)中g(shù)atingSequences數(shù)組當(dāng)中至壤。內(nèi)部通過調(diào)用SequenceGroups.addSequences()實(shí)現(xiàn)暖侨。
public abstract class AbstractSequencer implements Sequencer
{
// SEQUENCE_UPDATER指向gatingSequences變量的內(nèi)存地址
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
protected final int bufferSize;
protected final WaitStrategy waitStrategy;
/**
* 當(dāng)前RingBuffer對應(yīng)的油表位置
*/
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
/**
* 各個消費(fèi)者持有的取數(shù)sequence數(shù)組
*/
protected volatile Sequence[] gatingSequences = new Sequence[0];
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
@Override
public boolean removeGatingSequence(Sequence sequence)
{
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
?SequenceGroups的addSequences()方法內(nèi)部通過把新增的sequencesToAdd添加到Sequencer當(dāng)中的gatingSequences數(shù)組中。
class SequenceGroups
{
static <T> void addSequences(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
final Cursored cursor,
final Sequence... sequencesToAdd)
{
long cursorSequence;
Sequence[] updatedSequences;
Sequence[] currentSequences;
do
{
currentSequences = updater.get(holder);
updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
cursorSequence = cursor.getCursor();
int index = currentSequences.length;
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
updatedSequences[index++] = sequence;
}
}
while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
cursorSequence = cursor.getCursor();
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
}
}
}
?SequenceBarrier的創(chuàng)建過程,所有消費(fèi)者依靠SequenceBarrier的cursor進(jìn)行消費(fèi)崇渗。
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
// protected final Sequencer sequencer
// sequencer這里指SingleProducerSequencer或MultiProducerSequencer
return sequencer.newBarrier(sequencesToTrack);
}
}
public abstract class AbstractSequencer implements Sequencer
{
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
// 以SingleProducerSequencer為例子字逗,這里的curosr是Sequencer的curosr對象。
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
}
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
// 記錄生產(chǎn)者現(xiàn)在生產(chǎn)的數(shù)據(jù)到哪個下標(biāo)了,用于生產(chǎn)者和消費(fèi)者之間的消息同步
private final Sequence cursorSequence;
private final Sequencer sequencer;
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
// 這里創(chuàng)建ProcessingSequenceBarrier對象的時候傳入的cursorSequence
//
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
}
Disruptor啟動
?Disruptor啟動的內(nèi)容主要是通過線程池對象運(yùn)行消費(fèi)者任務(wù)宅广,消費(fèi)者任務(wù)進(jìn)入While循環(huán)進(jìn)行循環(huán)消費(fèi)葫掉。整個啟動過程如下:
- 1、遍歷所有消費(fèi)者由consumerRepository保存依次啟動跟狱,ConsumerRepository的iterator()方法會遍歷Collection<ConsumerInfo> consumerInfos俭厚。
- 2、ConsumerInfo中保存的是EventProcessorInfo對象驶臊,EventProcessorInfo的start()方法執(zhí)行 executor.execute(eventprocessor)提交任務(wù)到線程池當(dāng)中挪挤。
- 3、EventProcessorInfo中的EventProcessor eventprocessor對象是我們綁定消費(fèi)函數(shù)指定的BatchEventProcessor對象关翎。
- 4扛门、BatchEventProcessor的內(nèi)部方法run()方法由executor負(fù)責(zé)執(zhí)行并啟動,從而讓消費(fèi)者進(jìn)入了消費(fèi)的循環(huán)階段纵寝。
public class Disruptor<T>
{
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
// 負(fù)責(zé)啟動所有的消費(fèi)者對象论寨,ConsumerInfo對象是EventProcessorInfo
for (final ConsumerInfo consumerInfo : consumerRepository)
{
// 通過executor啟動consumer對象,
consumerInfo.start(executor);
}
return ringBuffer;
}
}
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<Sequence, ConsumerInfo>();
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
public void add(
final EventProcessor eventprocessor,
final EventHandler<? super T> handler,
final SequenceBarrier barrier)
{
final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
eventProcessorInfoByEventHandler.put(handler, consumerInfo);
eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
}
@Override
public Iterator<ConsumerInfo> iterator()
{
return consumerInfos.iterator();
}
}
class EventProcessorInfo<T> implements ConsumerInfo
{
private final EventProcessor eventprocessor;
private final EventHandler<? super T> handler;
private final SequenceBarrier barrier;
private boolean endOfChain = true;
EventProcessorInfo(
final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
{
this.eventprocessor = eventprocessor;
this.handler = handler;
this.barrier = barrier;
}
@Override
public void start(final Executor executor)
{
executor.execute(eventprocessor);
}
}
public final class BatchEventProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
// 保存sequenceBarrier對象爽茴,批量增加的消費(fèi)者會公用一個sequenceBarrier對象葬凳。
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
}
producer工作過程
?producer工作過程如下:
- 獲取下一個可用的序號用于生成數(shù)據(jù) ringBuffer.next()
- 生成數(shù)據(jù)并保存到循環(huán)數(shù)組當(dāng)中 ringBuffer.get(sequence)
- 更新RingBuffer當(dāng)中待消費(fèi)數(shù)據(jù)的位移 ringBuffer.publish(sequence)
- SingleProducerSequencer的next()方法邏輯都在注釋代碼當(dāng)中,自行閱讀室奏。
- ringBuffer.publish(sequence)會喚醒消費(fèi)者如果消費(fèi)者在等待狀態(tài)
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//請求下一個事件序號火焰;
try {
LongEvent event = ringBuffer.get(sequence);//獲取該序號對應(yīng)的事件對象;
long data = getEventData();//獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)胧沫;
event.set(data);
} finally{
ringBuffer.publish(sequence);//發(fā)布事件昌简;
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
// 獲取下一個可用的位置
public long next()
{
return sequencer.next();
}
// 數(shù)據(jù)放置到環(huán)形數(shù)組當(dāng)中
public void publish(long sequence)
{
sequencer.publish(sequence);
}
}
?next()方法獲取下一個可用的可寫的位置,publish()方法用于喚醒消費(fèi)者
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
public long next()
{
return next(1);
}
public long next(int n)
{
if (n < 1) //n表示此次生產(chǎn)者期望獲取多少個序號琳袄,通常是1
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
// 生產(chǎn)者當(dāng)前序號值+期望獲取的序號數(shù)量后達(dá)到的序號值
long nextSequence = nextValue + n;
// 減掉RingBuffer的總的buffer值江场,用于判斷是否出現(xiàn)‘覆蓋’
// 因?yàn)槭黔h(huán)形數(shù)組的設(shè)計(jì),所以生產(chǎn)者最多比消費(fèi)者快一圈
// 所以減去bufferSize后比較看是否追上
long wrapPoint = nextSequence - bufferSize;
// 從后面代碼分析可得:cachedValue就是緩存的消費(fèi)者中最小序號值窖逗,他不是當(dāng)前‘消費(fèi)者中最小序號值’址否,
// 而是上次程序進(jìn)入到下面的if判定代碼段是,被賦值的當(dāng)時的‘消費(fèi)者中最小序號值’
// 這樣做的好處在于:在判定是否出現(xiàn)覆蓋的時候,不用每次都調(diào)用getMininumSequence計(jì)算‘消費(fèi)者中的最小序號值’佑附,從而節(jié)約開銷樊诺。
// 只要確保當(dāng)生產(chǎn)者的節(jié)奏大于了緩存的cachedGateingSequence一個bufferSize時,從新獲取一下 getMinimumSequence()即可音同。
long cachedGatingSequence = this.cachedValue;
// (wrapPoint > cachedGatingSequence) :
// 當(dāng)生產(chǎn)者已經(jīng)超過上一次緩存的‘消費(fèi)者中最小序號值’(cachedGatingSequence)一個‘Ring’大写逝馈(bufferSize),
// 需要重新獲取cachedGatingSequence权均,避免當(dāng)生產(chǎn)者一直在生產(chǎn)顿膨,但是消費(fèi)者不再消費(fèi)的情況下,出現(xiàn)‘覆蓋’
// (cachedGatingSequence > nextValue) : 生產(chǎn)者和消費(fèi)者均為順序遞增的叽赊,且生產(chǎn)者的seq“先于”消費(fèi)者的seq恋沃,注意是‘先于’而不是‘大于’。
// 當(dāng)nextValue>Long.MAXVALUE時必指,nextValue+1就會變成負(fù)數(shù)囊咏,wrapPoint也會變成負(fù)數(shù),這時候必然會是:cachedGatingSequence > nextValue
// 這個變化的過程會持續(xù)bufferSize個序號塔橡,這個區(qū)間梅割,由于getMinimumSequence()得到的雖然是名義上的‘消費(fèi)者中最小序號值’,但是不代表是走在‘最后面’的消費(fèi)者
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
// 將生產(chǎn)者的cursor值更新到主存葛家,以便對所有的消費(fèi)者線程可見户辞。cursor表示現(xiàn)在生產(chǎn)那個節(jié)點(diǎn)了
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 生產(chǎn)者停下來,等待消費(fèi)者消費(fèi)惦银,直到‘覆蓋’現(xiàn)象清除咆课。
// 這里會獲取所有消費(fèi)者消費(fèi)最慢的消費(fèi)者的消費(fèi)位移保證不覆蓋。
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
// 通知消費(fèi)者進(jìn)行消費(fèi)扯俱,然后自旋
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
public void publish(long sequence)
{
// 設(shè)置當(dāng)前生產(chǎn)者的位置便于消費(fèi)者消費(fèi)
cursor.set(sequence);
// 通知消費(fèi)者進(jìn)行消費(fèi)
waitStrategy.signalAllWhenBlocking();
}
consumer工作過程
?consumer在綁定消費(fèi)者的過程中創(chuàng)建的BatchEventProcessor對象,核心邏輯在于run()方法:
- 獲取待消費(fèi)的數(shù)據(jù)的位置 sequenceBarrier.waitFor(nextSequence)
- 遍歷待消費(fèi)的數(shù)據(jù)進(jìn)行消費(fèi)eventHandler.onEvent()
- 設(shè)置該消費(fèi)者的消費(fèi)的位移sequence.set(availableSequence)
public final class BatchEventProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
// 保存sequenceBarrier對象喇澡,這個sequenceBarrier在創(chuàng)建Consumer對象傳遞
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
// 消費(fèi)者進(jìn)行消費(fèi)迅栅,會在while循環(huán)當(dāng)中一直消費(fèi)
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
// 等待可消費(fèi)的Sequence,在sequenceBarrier.waitFor()內(nèi)部會執(zhí)行waitStrategy.waitFor()方法實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者之間的通信晴玖。
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 進(jìn)行數(shù)據(jù)消費(fèi)
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 設(shè)置消費(fèi)者當(dāng)前的消費(fèi)進(jìn)度至sequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
}
?ProcessingSequenceBarrier的waitFor()方法內(nèi)部調(diào)用waitStrategy.waitFor()獲取當(dāng)前可消費(fèi)的數(shù)據(jù)的位置读存,不同的waitStrategy由不同的策略,在內(nèi)部通過信號量進(jìn)行通知呕屎。
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
// 記錄生產(chǎn)者現(xiàn)在生產(chǎn)的數(shù)據(jù)到哪個下標(biāo)了,用于生產(chǎn)者和消費(fèi)者之間的消息同步
private final Sequence cursorSequence;
private final Sequencer sequencer;
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// Wait for the given sequence to be available让簿。
// 這里并不保證返回值availableSequence一定等于 given sequence,他們的大小關(guān)系取決于采用的WaitStrategy秀睛。
// 1尔当、YieldingWaitStrategy在自旋100次嘗試后,會直接返回dependentSequence的最小seq蹂安,這時并不保證返回值>=given sequence
// 2椭迎、BlockingWaitStrategy則會阻塞等待given sequence可用為止锐帜,可用并不是說availableSequence == given sequence,而應(yīng)當(dāng)是指 >=
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 獲取消費(fèi)者可以消費(fèi)的最大的可用序號畜号,支持批處理效應(yīng)缴阎,提升處理效率。
// 當(dāng)availableSequence > sequence時简软,需要遍歷 sequence --> availableSequence蛮拔,找到最前一個準(zhǔn)備就緒,可以被消費(fèi)的event對應(yīng)的seq痹升。
// 最小值為:sequence-1
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
}