之前我們已經(jīng)熟悉了Disruptor的啟動(dòng)和事件生產(chǎn)操作锌俱,接下來(lái)我們一同探究Disruptor如何消費(fèi)事件圃阳。
0x00 概念回顧
我們先回顧下Disruptor消費(fèi)相關(guān)的名詞概念:
Event: Disruptor中傳輸?shù)氖录?br>
RingBuffer: 存儲(chǔ)和更新事件的容器。
EventHandler: 用戶(hù)實(shí)現(xiàn)接口视卢,包含消費(fèi)處理邏輯纠炮,代表Disruptor一個(gè)消費(fèi)者。
EventProcessor: EventProcessor繼承了Runnable接口莉测,包含處理Disruptor事件的主循環(huán)颜骤。
多播事件: 隊(duì)列和Disruptor在表現(xiàn)行為上最大的區(qū)別唧喉。隊(duì)列中的一個(gè)事件只能被一個(gè)消費(fèi)者消費(fèi),而Disruptor中的事件會(huì)發(fā)布給所有消費(fèi)者忍抽。特別適合同一數(shù)據(jù)的獨(dú)立并行處理操作八孝。
消費(fèi)者依賴(lài)圖(消費(fèi)鏈):同一事件需要被多個(gè)消費(fèi)者消費(fèi)時(shí),消費(fèi)者之間可能有依賴(lài)關(guān)系鸠项,如消費(fèi)者A,B,C干跛,B和C依賴(lài)A先執(zhí)行,但是B和C可以并行消費(fèi)祟绊。
0x01 EventProcessor接口概覽
OK楼入,咱們正式開(kāi)始對(duì)Disruptor消費(fèi)者的源碼解讀。
Disruptor的消費(fèi)者依賴(lài)EventProcessor循環(huán)處理可用事件牧抽。EventProcessor顧名思義嘉熊,就是事件處理器(handle和process都可以翻譯為“處理”,但是process側(cè)重于機(jī)器的處理扬舒,而handle側(cè)重于有人工的處理阐肤,所以使用handle表示用戶(hù)邏輯的處理,使用process表示機(jī)器的處理)讲坎,這個(gè)接口有兩個(gè)實(shí)現(xiàn)類(lèi)孕惜,分別是WorkProcessor和BatchEventProcessor,它們對(duì)應(yīng)的邏輯處理消費(fèi)者分別是EventHandler和WorkHandler晨炕。下面是EventProcessor的UML類(lèi)圖及EventHandler和EventProcessor的接口定義衫画。
/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
* 處理事件的回調(diào)接口
*/
public interface EventHandler<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
* <p>
* An EventProcessor will generally be associated with a Thread for execution.
* 事件執(zhí)行器,等待RingBuffer有可用消費(fèi)事件瓮栗。一個(gè)事件處理器關(guān)聯(lián)一個(gè)執(zhí)行線(xiàn)程
*/
public interface EventProcessor extends Runnable
{
/**
* Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
*
* @return reference to the {@link Sequence} for this {@link EventProcessor}
*/
Sequence getSequence();
/**
* Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
* It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
*/
void halt();
boolean isRunning();
}
EventProcessor接口繼承了Runnable接口碧磅,主要有兩種實(shí)現(xiàn):?jiǎn)尉€(xiàn)程批量處理BatchEventProcessor和多線(xiàn)程處理WorkProcessor碘箍。
在使用Disruptor幫助類(lèi)構(gòu)建消費(fèi)者時(shí),使用handleEventsWith方法傳入多個(gè)EventHandler鲸郊,內(nèi)部使用多個(gè)BatchEventProcessor關(guān)聯(lián)多個(gè)線(xiàn)程執(zhí)行丰榴。這種情況類(lèi)似JMS中的發(fā)布訂閱模式,同一事件會(huì)被多個(gè)消費(fèi)者并行消費(fèi)秆撮。適用于同一事件觸發(fā)多種操作四濒。
而使用Disruptor的handleEventsWithWorkerPool傳入多個(gè)WorkHandler時(shí),內(nèi)部使用多個(gè)WorkProcessor關(guān)聯(lián)多個(gè)線(xiàn)程執(zhí)行职辨。這種情況類(lèi)似JMS的點(diǎn)對(duì)點(diǎn)模式盗蟆,同一事件會(huì)被一組消費(fèi)者其中之一消費(fèi)。適用于提升消費(fèi)者并行處理能力舒裤。
0x02 消費(fèi)技術(shù)實(shí)現(xiàn)
我們先回顧下Disruptor消費(fèi)者的兩個(gè)特點(diǎn):消費(fèi)者依賴(lài)圖(即下文所謂的“消費(fèi)鏈”)和事件多播喳资。
假設(shè)現(xiàn)在有A,B,C,D四個(gè)消費(fèi)者,它們都能組成什么樣的形式呢腾供?從眾多的排列組合中仆邓,我挑了4組比較有代表性的消費(fèi)鏈形式。
- 第1組中伴鳖,消費(fèi)者A消費(fèi)按成后节值,B、C榜聂、D可同時(shí)消費(fèi)搞疗;
- 第2組中,消費(fèi)者A须肆、B匿乃、C、D順序消費(fèi)豌汇;
- 第3組中幢炸,消費(fèi)者A、B順序消費(fèi)后瘤礁,C阳懂、D同時(shí)消費(fèi);
- 第4組中柜思,消費(fèi)者A在消費(fèi)完成后岩调,B和C可以同時(shí)消費(fèi),但是必須在都消費(fèi)完成后赡盘,D才能消費(fèi)号枕。
標(biāo)號(hào)為1、3陨享、4的消費(fèi)鏈都使用了事件多播葱淳,可見(jiàn)事件多播屬于消費(fèi)鏈的一種組合形式钝腺。注意,在上面4種組合中赞厕,每個(gè)組合的每一水平行艳狐,都屬于一個(gè)消費(fèi)者組。
這些還只是較為簡(jiǎn)單的消費(fèi)鏈組成皿桑,實(shí)際中消費(fèi)鏈可能會(huì)更復(fù)雜毫目。
那么在Disruptor內(nèi)部是怎么實(shí)現(xiàn)消費(fèi)鏈的呢?
我們可以先思考下诲侮。如果想把獨(dú)立的消費(fèi)者組成消費(fèi)鏈镀虐,那么后方的消費(fèi)者(組)必然要知道在它前方的消費(fèi)者(組)的處理情況,否則就做不到順序消費(fèi)沟绪。同時(shí)刮便,消費(fèi)者也要了解生產(chǎn)者的位置,來(lái)判斷是否有可用事件绽慈。之前我們分析生產(chǎn)者代碼的時(shí)候恨旱,已經(jīng)講過(guò),生產(chǎn)者為了不覆蓋沒(méi)有消費(fèi)完全的事件久信,必須知道最慢消費(fèi)者的處理情況窖杀。
做到了這些才會(huì)有能力去控制消費(fèi)者組成消費(fèi)鏈漓摩。下面讓我們具體看Disruptor中的實(shí)現(xiàn)裙士。
0x02.1 使用BatchEventProcessor單線(xiàn)程批處理事件
在使用BatchEventProcessor時(shí),通過(guò)Disruptor#handleEventsWith方法可以獲取一個(gè)EventHandlerGroup管毙,再通過(guò)EventHandlerGroup的and和then方法可以構(gòu)建一個(gè)復(fù)雜的消費(fèi)者鏈腿椎。EventHandlerGroup表示一組事件消費(fèi)者,內(nèi)部持有了Disruptor類(lèi)實(shí)例disruptor夭咬,其大部分功能都是通過(guò)調(diào)用disruptor實(shí)現(xiàn)啃炸,其實(shí)可以算作是Disruptor這個(gè)輔助類(lèi)的一部分。
// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
return handleEventsWith(handlers);
}
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return disruptor.createEventProcessors(sequences, handlers);
}
// Disruptor.java
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
// 由EventHandlerGroup調(diào)用時(shí)卓舵,barrierSequences是EventHandlerGroup實(shí)例的序列南用,也就是上一個(gè)事件處理者組的序列,作為當(dāng)前事件處理的門(mén)控掏湾,防止后邊的消費(fèi)鏈超前
// 如果是第一次調(diào)用handleEventsWith裹虫,則barrierSequences是一個(gè)空數(shù)組
EventHandlerGroup<T> **createEventProcessors**(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 對(duì)應(yīng)此事件處理器組的序列組
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 批量處理事件的循環(huán)
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
// 每次添加完事件處理器后,更新門(mén)控序列融击,以便后續(xù)調(diào)用鏈的添加筑公。(所謂門(mén)控,是指后續(xù)消費(fèi)鏈的消費(fèi)尊浪,不能超過(guò)前邊匣屡。)
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
// 為消費(fèi)鏈下一組消費(fèi)者封救,更新門(mén)控序列
// barrierSequences是上一組事件處理器組的序列(如果本次是第一次,則為空數(shù)組)捣作,本組不能超過(guò)上組序列值
// processorSequences是本次要設(shè)置的事件處理器組的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
ringBuffer.addGatingSequences(processorSequences); // 將本組序列添加到Sequencer中的gatingSequences中
for (final Sequence barrierSequence : barrierSequences) // 將上組序列從Sequencer中的gatingSequences中誉结,gatingSequences一直保存消費(fèi)鏈末端消費(fèi)者的序列組
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); // 取消標(biāo)記上一組消費(fèi)者為消費(fèi)鏈末端
}
}
可以看到,使用BatchEventProcessor構(gòu)建消費(fèi)者鏈時(shí)的邏輯都在createEventProcessors這個(gè)方法中券躁。
先簡(jiǎn)單說(shuō)下ConsumerRepository搓彻,這個(gè)類(lèi)主要保存消費(fèi)者的各種關(guān)系,如通過(guò)EventHandler引用獲取EventProcessorInfo信息嘱朽,通過(guò)Sequence獲取ConsumerInfo信息等旭贬。因?yàn)橐褂靡米鰇ey,所以數(shù)據(jù)結(jié)構(gòu)使用IdentityHashMap搪泳。IdentityHashMap
和HashMap最大的不同稀轨,就是使用==而不是equals比較key。
這個(gè)createEventProcessors方法接收兩個(gè)參數(shù)岸军,barrierSequences表示當(dāng)前消費(fèi)者組的屏障序列數(shù)組奋刽,如果當(dāng)前消費(fèi)者組是第一組,則取一個(gè)空的序列數(shù)組艰赞;否則佣谐,barrierSequences就是上一組消費(fèi)者組的序列數(shù)組。createEventProcessors方法的另一個(gè)參數(shù)eventHandlers方妖,這個(gè)參數(shù)是代表事件消費(fèi)邏輯的EventHandler數(shù)組狭魂。
Disruptor為每個(gè)EventHandler實(shí)現(xiàn)類(lèi)都創(chuàng)建了一個(gè)對(duì)應(yīng)的BatchEventProcessor。
在構(gòu)建BatchEventProcessor時(shí)需要以下傳入三個(gè)構(gòu)造參數(shù):dataProvider是數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)如RingBuffer党觅;sequenceBarrier用于跟蹤生產(chǎn)者游標(biāo)雌澄,協(xié)調(diào)數(shù)據(jù)處理;eventHandler是用戶(hù)實(shí)現(xiàn)的事件處理器杯瞻,也就是實(shí)際的消費(fèi)者镐牺。
注意,Disruptor并非為每個(gè)BatchEventProcessor都創(chuàng)建一個(gè)新的SequenceBarrier魁莉,而是每個(gè)消費(fèi)者組共用一個(gè)SequenceBarrier睬涧。
BatchEventProcessor定義如下。至于為什么要叫做BatchEventProcessor旗唁,可以看看在run()方法里每次waitFor獲取的availableSequence是當(dāng)前能夠使用的最大值畦浓,然后再循環(huán)處理這些數(shù)據(jù)。這樣當(dāng)消費(fèi)者有瞬時(shí)抖動(dòng)逆皮,導(dǎo)致暫時(shí)落后生產(chǎn)者時(shí)宅粥,可在下一次循環(huán)中,批量處理所有落后的事件电谣。
/**
* Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
* and delegating the available events to an {@link EventHandler}.
* <p>
* If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
* is started and just before the thread is shutdown.
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*
* 每個(gè)EventHandler對(duì)應(yīng)一個(gè)EventProcessor執(zhí)行者秽梅,BatchEventProcessor每次大循環(huán)可以獲取最高可用序號(hào)抹蚀,并循環(huán)調(diào)用EventHandler
*/
public final class BatchEventProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider; // 數(shù)據(jù)提供者,默認(rèn)是RingBuffer企垦,也可替換為自己的數(shù)據(jù)結(jié)構(gòu)
private final SequenceBarrier sequenceBarrier; // 默認(rèn)為ProcessingSequenceBarrier
private final EventHandler<? super T> eventHandler; // 此EventProcessor對(duì)應(yīng)的用戶(hù)自定義的EventHandler實(shí)現(xiàn)
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 當(dāng)前執(zhí)行位置
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware; // 每次循環(huán)取得一批可用事件后环壤,在實(shí)際處理前調(diào)用
/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
// ... 省略部分代碼
/**
* It is ok to have another thread rerun this method after a halt().
*
* @throws IllegalStateException if this object instance is already running in a thread
*/
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{ // availableSequence返回的是可用的最大值
final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 使用給定的等待策略去等待下一個(gè)序列可用
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 批處理在此處得以體現(xiàn)
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// eventHandler處理完畢后,更新當(dāng)前序號(hào)
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
}
0x02.2 消費(fèi)者可用序列屏障-SequenceBarrier
我們重點(diǎn)看一下SequenceBarrier钞诡,可直譯為“序列屏障”郑现。SequenceBarrier的主要作用是協(xié)調(diào)獲取消費(fèi)者可處理到的最大序號(hào),內(nèi)部持有著生產(chǎn)者和其依賴(lài)的消費(fèi)者序列荧降。它的接口定義如下接箫。
public interface SequenceBarrier
{
/**
* Wait for the given sequence to be available for consumption.<br>
* 等待指定序列可用
* @param sequence to wait for
* @return the sequence up to which is available
* @throws AlertException if a status change has occurred for the Disruptor
* @throws InterruptedException if the thread needs awaking on a condition variable.
* @throws TimeoutException
*
*/
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
/**
* Get the current cursor value that can be read.<br>
* 獲取當(dāng)前可讀游標(biāo)值
*
* @return value of the cursor for entries that have been published.
*
*/
long getCursor();
/**
* The current alert status for the barrier.<br>
* 當(dāng)前的alert狀態(tài)
*
* @return true if in alert otherwise false.
*/
boolean isAlerted();
/**
* Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.<br>
*
* 通知消費(fèi)者狀態(tài)變化。當(dāng)調(diào)用EventProcessor#halt()將調(diào)用此方法朵诫。
*/
void alert();
/**
* Clear the current alert status.<br>
* 清楚alert狀態(tài)
*/
void clearAlert();
/**
* Check if an alert has been raised and throw an {@link AlertException} if it has.
* 檢查是否發(fā)生alert辛友,發(fā)生將拋出異常
* @throws AlertException if alert has been raised.
*/
void checkAlert() throws AlertException;
}
SequenceBarrier實(shí)例引用被EventProcessor持有,用于等待并獲取可用的消費(fèi)事件剪返,主要體現(xiàn)在waitFor這個(gè)方法废累。
要實(shí)現(xiàn)這個(gè)功能,需要3點(diǎn)條件:
- 知道生產(chǎn)者的位置脱盲。
- 因?yàn)镈isruptor支持消費(fèi)者鏈邑滨,在不同的消費(fèi)者組之間,要保證后邊的消 費(fèi)者組只有在前消費(fèi)者組中的消費(fèi)者都處理完畢后钱反,才能進(jìn)行處理掖看。
- 暫時(shí)沒(méi)有事件可消費(fèi),在等待可用消費(fèi)時(shí)诈铛,還需要使用某種等待策略進(jìn)行等待乙各。
看下SequenceBarrier實(shí)現(xiàn)類(lèi)ProcessingSequenceBarrier的代碼是如何實(shí)現(xiàn)waitFor方法墨礁。
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy; // 等待可用消費(fèi)時(shí)幢竹,指定的等待策略
private final Sequence dependentSequence; // 依賴(lài)的上組消費(fèi)者的序號(hào),如果當(dāng)前為第一組則為cursorSequence(即生產(chǎn)者發(fā)布游標(biāo)序列)恩静,否則使用FixedSequenceGroup封裝上組消費(fèi)者序列
private volatile boolean alerted = false; // 當(dāng)觸發(fā)halt時(shí)焕毫,將標(biāo)記alerted為true
private final Sequence cursorSequence; // AbstractSequencer中的cursor引用,記錄當(dāng)前發(fā)布者發(fā)布的最新位置
private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) // 依賴(lài)的上一組序列長(zhǎng)度驶乾,第一次是0
{
dependentSequence = cursorSequence;
}
else // 將上一組序列數(shù)組復(fù)制成新數(shù)組保存邑飒,引用不變
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
// 檢查是否停止服務(wù)
checkAlert();
// 獲取最大可用序號(hào) sequence為給定序號(hào),一般為當(dāng)前序號(hào)+1级乐,cursorSequence記錄生產(chǎn)者最新位置疙咸,
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 返回已發(fā)布最高的序列值,將對(duì)每個(gè)序號(hào)進(jìn)行校驗(yàn)
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
// ...
}
0x02.3 該用什么姿勢(shì)等待可用事件-WaitStrategy
看來(lái)實(shí)際的等待操作還是在WaitStrategy#waitFor完成的风科。
// WaitStrategy.java
/**
* Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}. <br>
* 消費(fèi)者等待可用事件的策略
*/
public interface WaitStrategy
{
/**
* Wait for the given sequence to be available. It is possible for this method to return a value
* less than the sequence number supplied depending on the implementation of the WaitStrategy. A common
* use for this is to signal a timeout. Any EventProcessor that is using a WaitStrategy to get notifications
* about message becoming available should remember to handle this case. The {@link BatchEventProcessor} explicitly
* handles this case and will signal a timeout if required.
*
* @param sequence to be waited on. 給定序號(hào)
* @param cursor the main sequence from ringbuffer. Wait/notify strategies will
* need this as it's the only sequence that is also notified upon update. 生產(chǎn)者游標(biāo)
* @param dependentSequence on which to wait. 依賴(lài)的序列撒轮,一般是上一個(gè)消費(fèi)者組序列的FixedSequenceGroup封裝乞旦。如果消費(fèi)者是第一組,則為cursor题山。
* @param barrier the processor is waiting on. 在等待時(shí)需要判斷是否對(duì)消費(fèi)者有alert操作
* @return the sequence that is available which may be greater than the requested sequence.
* @throws AlertException if the status of the Disruptor has changed.
* @throws InterruptedException if the thread is interrupted.
* @throws TimeoutException
*/
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
/**
* Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced. <br>
* 當(dāng)生產(chǎn)者發(fā)布新事件后兰粉,將通知等待的EventProcessor。當(dāng)用鎖機(jī)制時(shí)才會(huì)包含相應(yīng)邏輯顶瞳。
*/
void signalAllWhenBlocking();
}
在各種等待策略中玖姑,我們選取阻塞策略研究。
public final class BlockingWaitStrategy implements WaitStrategy
{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if (cursorSequence.get() < sequence) // 當(dāng)前游標(biāo)小于給定序號(hào)慨菱,也就是無(wú)可用事件
{
lock.lock();
try
{
while (cursorSequence.get() < sequence) // 當(dāng)給定的序號(hào)大于生產(chǎn)者游標(biāo)序號(hào)時(shí)焰络,進(jìn)行等待
{
barrier.checkAlert();
// 循環(huán)等待,在Sequencer中publish進(jìn)行喚醒;等待消費(fèi)時(shí)也會(huì)在循環(huán)中定時(shí)喚醒早处。
// 循環(huán)等待的原因扛或,是要檢查alert狀態(tài)。如果不檢查將導(dǎo)致不能關(guān)閉Disruptor备蚓。
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 給定序號(hào)大于上一個(gè)消費(fèi)者組最慢消費(fèi)者(如當(dāng)前消費(fèi)者為第一組則和生產(chǎn)者游標(biāo)序號(hào)比較)序號(hào)時(shí),需要等待囱稽。不能超前消費(fèi)上一個(gè)消費(fèi)者組未消費(fèi)完畢的事件郊尝。
// 那么為什么這里沒(méi)有鎖呢?可以想一下此時(shí)的場(chǎng)景战惊,代碼運(yùn)行至此流昏,已能保證生產(chǎn)者有新事件,如果進(jìn)入循環(huán)吞获,說(shuō)明上一組消費(fèi)者還未消費(fèi)完畢况凉。
// 而通常我們的消費(fèi)者都是較快完成任務(wù)的,所以這里才會(huì)考慮使用Busy Spin的方式等待上一組消費(fèi)者完成消費(fèi)各拷。
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
@Override
public String toString()
{
return "BlockingWaitStrategy{" +
"processorNotifyCondition=" + processorNotifyCondition +
'}';
}
}
阻塞等待策略使用Lock+Condition的方式等待生產(chǎn)者生產(chǎn)可用事件刁绒,而使用Busy Spin的方式等待可能出現(xiàn)的上一個(gè)消費(fèi)者組未消費(fèi)完成的情況。
這里給我們一個(gè)提示烤黍,在構(gòu)建低延遲系統(tǒng)時(shí)知市,因?yàn)殒i的性能消耗,盡量不要使用鎖速蕊。如果必須要用鎖嫂丙,也要把鎖粒度調(diào)到最小。
另外规哲,消費(fèi)者在等待可用消費(fèi)事件時(shí)跟啤,會(huì)循環(huán)調(diào)用barrier.checkAlert(),再去調(diào)用鎖的條件等待,等待可用消費(fèi)事件隅肥。
有三個(gè)地方可以喚醒等待中的消費(fèi)線(xiàn)程关顷。兩種是在Sequencer實(shí)現(xiàn)類(lèi)中,一是有可用事件發(fā)布武福,通知消費(fèi)線(xiàn)程繼續(xù)消費(fèi)议双;二是在調(diào)用next()獲取可用的RingBuffer槽位時(shí),發(fā)現(xiàn)RingBuffer滿(mǎn)了(生產(chǎn)者速度大于消費(fèi)者捉片,導(dǎo)致生產(chǎn)者沒(méi)有可用位置發(fā)布事件)平痰,將喚醒消費(fèi)者線(xiàn)程,此功能在3.3.5版本新增(Resignal any waiting threads when trying to publish to a full ring buffer )伍纫。開(kāi)始我百思不得宗雇,為什么要在buffer滿(mǎn)了的時(shí)候不斷喚醒消費(fèi)者線(xiàn)程,直到看到這個(gè)issue才明白莹规。大意是在log4j2中使用Disruptor時(shí)發(fā)生了死鎖赔蒲,為了避免在發(fā)布事件時(shí),由于某種原因?qū)е聸](méi)有通知到消費(fèi)者良漱,在生產(chǎn)者嘗試往一個(gè)已滿(mǎn)的buffer發(fā)布數(shù)據(jù)時(shí)舞虱,就會(huì)再通知消費(fèi)者進(jìn)行消費(fèi)。而這個(gè)bug最終也被Log4j認(rèn)領(lǐng)母市,與Disruptor無(wú)關(guān)矾兜。Disruptor這里的再次通知也是為了更加保險(xiǎn)。
//*ProducerSequencer.java
// next(n)中的代碼
// 由于慢消費(fèi)者患久,無(wú)可用坑位椅寺,只有當(dāng)消費(fèi)者消費(fèi),向前移動(dòng)后蒋失,才能跳出循環(huán)
// 由于外層判斷使用的是緩存的消費(fèi)者序列最小值返帕,這里使用真實(shí)的消費(fèi)者序列進(jìn)行判斷,并將最新結(jié)果在跳出while循環(huán)之后進(jìn)行緩存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{ // 喚醒等待的消費(fèi)者篙挽,正常情況下并無(wú)意義荆萤,只是為了避免極少數(shù)情況下未知原因?qū)е碌陌l(fā)布時(shí)鎖機(jī)制出現(xiàn)異常,未通知到消費(fèi)者
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
還有一種喚醒就是關(guān)閉Disruptor時(shí)嫉髓,消費(fèi)者關(guān)閉前將會(huì)處理完當(dāng)前批次數(shù)據(jù)(并非RingBuffer的所有數(shù)據(jù)观腊,而是此次循環(huán)取出的最大可用序號(hào)以下的所有未處理數(shù)據(jù)),如果消費(fèi)者線(xiàn)程當(dāng)前在等待狀態(tài)算行,將被喚醒并終結(jié)。
BatchEventProcessor就講到這苫耸。
0x02.4 使用WorkProcessor多線(xiàn)程處理事件
下面說(shuō)一說(shuō)WorkHandler+WorkProcessor州邢。
上面講過(guò),使用EventHandler+BatchEventProcessor這種方式類(lèi)似JMS的發(fā)布訂閱,同一個(gè)事件會(huì)被不同線(xiàn)程的EventHandler并行消費(fèi)量淌。那么骗村,如果單線(xiàn)程處理能力不足,想多線(xiàn)程處理同一主題下的不同事件該怎么辦呢呀枢?這種方式就類(lèi)似JMS的點(diǎn)到點(diǎn)模式胚股,多個(gè)消費(fèi)者可以監(jiān)聽(tīng)同一個(gè)隊(duì)列,誰(shuí)先拿到就歸誰(shuí)處理裙秋。
在Disruptor中使用WorkHandler+WorkProcessor實(shí)現(xiàn)以上功能琅拌。當(dāng)需要使用這種模式,可在設(shè)置Disruptor消費(fèi)者時(shí)摘刑,通過(guò)使用handleEventsWithWorkerPool和thenHandleEventsWithWorkerPool設(shè)置消費(fèi)鏈进宝。
disruptor
.handleEventsWithWorkerPool(
new WorkHandler[]{
journalHandler,
journalHandler,
journalHandler
}
)
.thenHandleEventsWithWorkerPool(resultHandler);
先看下相關(guān)的源碼。
// Disruptor
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
// WorkerPool.java WorkerPool構(gòu)造方法
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandler<? super T> exceptionHandler,
final WorkHandler<? super T>... workHandlers)
{
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor<T>( // 為每個(gè)WorkHandler新建一個(gè)WorkProcessor
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
workSequence);
}
}
在使用線(xiàn)程池處理事件時(shí)枷恕,與單線(xiàn)程處理相比党晋,最大的不同在于新增了一個(gè)WorkerPool。WorkerPool用于管理一組WorkProcessor徐块,它的屬性未玻、方法如下。
WorkProcessor的原理和BatchEventProcessor類(lèi)似胡控,只是多了workSequence用來(lái)保存同組共用的處理序列深胳。在更新workSequence時(shí),涉及多線(xiàn)程操作铜犬,所以使用CAS進(jìn)行更新舞终。
WorkProcessor的run()方法如下。
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
{
try
{
// if previous sequence was processed - fetch the next sequence and set
// that we have successfully processed the previous sequence
// typically, this will be true
// this prevents the sequence getting too far forward if an exception
// is thrown from the WorkHandler
if (processedSequence) // 表示nextSequence序號(hào)的處理情況(不區(qū)分正逞⒒或是異常處理)敛劝。只有處理過(guò),才能申請(qǐng)下一個(gè)序號(hào)纷宇。
{
processedSequence = false;
do
{
// 同組中多個(gè)消費(fèi)線(xiàn)程有可能會(huì)爭(zhēng)搶一個(gè)序號(hào)夸盟,使用CAS避免使用鎖。
// 同一組使用一個(gè)workSequence像捶,WorkProcessor不斷申請(qǐng)下一個(gè)可用序號(hào)上陕,對(duì)workSequence設(shè)置成功才會(huì)實(shí)際消費(fèi)。
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// 緩存的可用序號(hào)比要處理的序號(hào)大拓春,才能進(jìn)行處理
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
else // 更新緩存的可用序列释簿。這個(gè)cachedAvailableSequence只用在WorkProcessor實(shí)例內(nèi),不同實(shí)例的緩存可能是不一樣的
{ // 和單線(xiàn)程模式類(lèi)似硼莽,返回的也是最大可用序號(hào)
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// handle, mark as processed, unless the exception handler threw an exception
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
代碼邏輯和BatchEventProcessor類(lèi)似庶溶,就不再贅述啦。
還有一點(diǎn)需要留意,Disruptor通過(guò)EventHandlerGroup代表一個(gè)消費(fèi)者組偏螺,就表示之前那四張圖中一個(gè)水平線(xiàn)上的消費(fèi)者組行疏。這樣不同的消費(fèi)者組之間不必關(guān)心各自的實(shí)現(xiàn),從而可以實(shí)現(xiàn)更加復(fù)雜和靈活的消費(fèi)鏈套像,即依賴(lài)圖表酿联。
0x03 消費(fèi)者小結(jié)
從小語(yǔ)文老師就教育我們寫(xiě)作文要總結(jié),好習(xí)慣不能忘~
本文主要探討了Disruptor消費(fèi)者內(nèi)部概要實(shí)現(xiàn)夺巩,重點(diǎn)闡述了BatchEventProcessor贞让、WorkProcess的消費(fèi)代碼原理。同時(shí)省略了超時(shí)通知劲够、開(kāi)始和結(jié)束通知震桶、異常控制等內(nèi)容征绎,并非不重要蹲姐,而只是盡量言簡(jiǎn)意賅,達(dá)到拋磚引玉的目的人柿。
BatchEventProcessor主要用于處理單線(xiàn)程并行任務(wù)柴墩,同一消費(fèi)者組的不同消費(fèi)者會(huì)接收相同的事件,并在所有事件處理完畢后進(jìn)入下一消費(fèi)者組進(jìn)行處理(是不是類(lèi)似JUC里的Phaser凫岖、CyclicBarrier或CountDownLatch呢)江咳。WorkProcessor通過(guò)WorkerPool管理多個(gè)WorkProcessor,達(dá)到多線(xiàn)程處理事件的目的哥放,同一消費(fèi)者組的多個(gè)WorkProcessor不會(huì)處理同一個(gè)事件歼指。通過(guò)選擇不同的WaitStragegy實(shí)現(xiàn),可以控制消費(fèi)者在沒(méi)有可用事件處理時(shí)的等待策略甥雕。
好啦踩身,有關(guān)Disruptor消費(fèi)者的分享就到這。
歡迎大家留言討論社露,一同探討挟阻,一同進(jìn)步。