Disruptor核心源碼分析
說來慚愧爆惧,Log4j2
的異步日志已經用了將近2年時間了郎任。但是每次想看Disruptor
源碼的時候械蹋,總是沒能堅持下去降传。這次通過一次生產環(huán)境的故障至非,堅定了看源碼的決心钠署。
從何說起
在閱讀這篇文章之前,需要你具備一些對Disruptor
的基本了解荒椭。如果你對它
還一無所知谐鼎,希望你先通過下面的文章來入個門。
http://ifeve.com/dissecting-disruptor-whats-so-special/
http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/
http://ifeve.com/disruptor-writing-ringbuffer/
上面幾篇文章對Disruptor
總體流程的講解還是比較清楚的趣惠,如果你看完仍然不是特別理解狸棍,沒關系,也可以繼續(xù)往下看味悄,畢竟talk is cheap草戈,下面會借助code把Disruptor
的工作原理闡述清楚。本篇文章不會涉及”為什么Disruptor這么快“這個主題侍瑟,而把重點放在理解它的工作流程以及熟悉源代碼上唐片。
最簡單的Demo
我們先通過一個最簡單的Demo來感受一下Disruptor
的工作流程
// 事件數據結構 StringEvent
@Data
@NoArgsConstructor
public class StringEvent {
private String value;
}
public static void main(String[] args) throws InterruptedException {
Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(
(EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
.println(event));
disruptor.start();
disruptor.publishEvent((event, sequence) -> event.setValue("changed"));
// sleep一下 讓消費者可以執(zhí)行到 因為消費線程是守護線程
Thread.sleep(1000);
}
寥寥幾行代碼丙猬,就展示了一個完整的過程。我們來看看每一步具體做了什么:
第一步——創(chuàng)建Disruptor
創(chuàng)建Disruptor
费韭,Demo中采用的是參數較少的構造方法茧球,實際上完整的參數列表還包括producerType
和waitStrategy
:
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
-
eventFactory
表示事件的構造器 -
ringBufferSize
表示RingBuffer的長度(容量) -
threadFactory
表示消費線程的創(chuàng)建工廠 -
producerType
表示是單生產者模式還是多生產者模式(默認是MULTI
) -
waitStrategy
表示當RingBuffer
中沒有可消費的Event時消費者的等待策略(默認是BlockingWaitStrategy
)
可以看到,通過上面5個參數構造出了一個RingBuffer
和一個Executor
星持,而這兩個組件構成了一個Disruptor
抢埋。這里的RingBuffer
除了存儲事件的職能(DataProvider
)還承擔著申請sequence和publish event的職能。Executor
作為消費者線程池督暂,主要是運行消費邏輯的揪垄。因此可以說,Disruptor
串聯起了生產者损痰、消費者以及RingBuffer
創(chuàng)建RingBuffer
RingBuffer
是個重點福侈,因為它不止是存儲,還干了很多活卢未。所謂能者多勞肪凛,也更值得我們研究。我們先看下創(chuàng)建它的靜態(tài)方法:
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
根據生產者類型的不同辽社,存在兩種類型的RingBuffer
:單生產者類型和多生產者類型伟墙。為了更容易理解,我們這里先看Single
類型的滴铅,也就是單生產者類型的RingBuffer
:
// class RingBuffer
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
可以看到戳葵,這里通過一個EventFactory
和一個SingleProducerSequencer
構造了一個RingBuffer
。前者是用來創(chuàng)建事件對象的汉匙,而后者可以理解成RingBuffer
的"幫手"歹茶,RingBuffer
委托Sequencer
來處理一些非存儲類的工作(比如申請sequence悠砚,維護sequence進度,發(fā)布事件等)。
我們接著跟進去看看RingBuffer
的構造函數:
// class RingBuffer
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
// class RingBufferFields
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
// “幫手”爆侣,主要用來處理sequence申請凉蜂、維護以及發(fā)布等工作
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
// indexMask主要是為了使用位運算取模的恨课,很多源碼里都能看到這類優(yōu)化
this.indexMask = bufferSize - 1;
// 可以看到這個數組除了正常的size之外還有填充的元素险胰,這個是為了解決false sharing的,本篇文章暫不展開
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
// 預先填充數組元素屯援,這對垃圾回收很優(yōu)化猛们,后續(xù)發(fā)布事件等操作都不需要創(chuàng)建對象,而只需要即可
fill(eventFactory);
}
// class RingBufferFields
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
RingBuffer
的構造函數還是比較清晰的狞洋,跟著上面的注釋應該就可以理解弯淘。除了Buffer_PAD,這個我們留到后續(xù)文章中再去詳細的講解吉懊。
創(chuàng)建BasicExecutor
BasicExecutor實現了java.util.concurrent.Executor
接口耳胎,通過單參數ThreadFactory
函數構造:
public class BasicExecutor implements Executor{
public BasicExecutor(ThreadFactory factory)
{
this.factory = factory;
}
}
具體在哪里使用惯吕,我們后面會看到
第二步——注冊事件處理邏輯
disruptor.handleEventsWith(
(EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
.println(event));
代碼非常容易理解,注冊了一個事件處理的回調怕午,并且可以注冊多個废登,其中回調里有三個參數:
-
event
表示消費到的本次事件的主體,在例子里也就是StringEvent -
sequence
表示消費到的本次事件對應的sequence -
endOfBatch
表示消費到的本次事件是否是這個批次中的最后一個
由于默認的消費處理器(BatchEventProcessor
)是批量來處理事件的郁惜,所以會有批次的概念堡距。怎么樣算一個批次呢,這個后面講BatchEveentProessor
的時候會講到兆蕉。DEMO里的消費邏輯很簡單羽戒,打印一下event就完事。下面看看handleEventsWith
的源代碼:
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
// 注意虎韵,第一個參數恒為一個空數組
return createEventProcessors(new Sequence[0], handlers);
}
createEventProcessors
方法的第一個入參叫作barrierSequences
易稠,是給存在依賴關系的消費者用的。由于走Disruptor
實例調用handleEventsWith
都是像上面一樣傳的是空數組包蓝,為了便于理解驶社,可以先將它當成恒為空數組。
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 用來保存每個消費者的消費進度
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// SequenceBarrier主要是用來設置消費依賴的[詳解1]
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 可以看到每個eventHandler會被封裝成BatchEventProcessor测萎,看名字就知道是批量處理的了吧
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
// 設置異常處理器
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
// 注冊到consumerRepository[詳解2]
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
// 每一個BatchEventProcessor的消費進度
processorSequences[i] = batchEventProcessor.getSequence();
}
// 更新一些重要的東西[詳解3]
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
// 返回一個EventHandlerGroup亡电,這個主要是為了DSL服務的,可以先不關心硅瞧,可以看到DEMO中我們也沒有用到這個返回值
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
上面采用了注釋來解釋代碼含義份乒,我覺得這種形式可能更有助于在看代碼的過程中理解。不過注釋也有局限性腕唧,比如上面寫了詳解的幾處或辖,這里我會詳細再跟進下源代碼:
詳解1——SequenceBarrier
SequenceBarrier
主要是設置消費依賴的。比如某個消費者必須等它依賴的消費者消費完某個消息之后才可以消費該消息枣接。當然此處是從Disruptor
上直接創(chuàng)建消費組孝凌,sequencesToTrack
都為空數組,所以只依賴于RingBuffer上的cursorSequence
(也就是只要RingBuffer上寫(publish)到哪了月腋,那么我就能消費到哪)
下面的代碼展示了通過RingBuffer
創(chuàng)建SequenceBarrier
的鏈路,發(fā)現最終創(chuàng)建的是ProcessingSequenceBarrier
瓣赂。并且在這條鏈路上榆骚,我們前面假定的sequencesToTrack
(也就是dependentSequences
)為空數組。那么根據上面的構造函數得出dependentSequence
= cursorSequence
= cursor
// class RingBuffer
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return sequencer.newBarrier(sequencesToTrack);
}
// class AbstractSequencer
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
// class ProcessingSequenceBarrier
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
這個cursor
是什么呢煌集?首先它是Sequencer
的成員變量妓肢。而Sequencer
有兩種:SingleProducerSequencer
和MultiProducerSequencer
。對于SingleProducerSequencer
來說苫纤,cursor
表示的是RingBuffer
上當前已發(fā)布的最大sequence碉钠,而對于MultiProducerSequencer
來說纲缓,cursor
表示的是RingBuffer
上當前已申請的最大sequence。此處先有個概念即可喊废,下面講完生產邏輯之后會詳細描述
詳解2——ConsumerRepository
// class ConsumerRepository
public void add(
final EventProcessor eventprocessor,
final EventHandler<? super T> handler,
final SequenceBarrier barrier)
{
final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
eventProcessorInfoByEventHandler.put(handler, consumerInfo);
eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
}
無論從類名還是方法體祝高,都可以看出,這個對象主要是用來存儲消費者信息的污筷,有兩個維度的Map工闺。具體是哪里用,我們用到的時候再說好了~
詳解3——一些更新
private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
首先瓣蛀,我們要搞清楚這兩個入參的意義:
- barrierSequences:依賴的消費進度
- processorSequences:新進消費者的進度
其次陆蟆,還要弄明白一個問題:在向RingBuffer
寫入數據的時候,如何判定RingBuffer已滿(這個應該在前面入門那幾篇文章里要掌握的)惋增?通過看最慢的消費者的消費進度是不是已經被生產者拉了一圈了(類似1000米跑步的套圈)叠殷。
理解了上面兩個問題之后,再來看代碼總共做的三個事情:
- 把新進消費者的消費進度加入到【所有消費者的消費進度數組】中
- 如果說這個新進消費者是依賴了其他的消費者的诈皿,那么把其他的消費者從【所有消費者的消費進度數組】中移除林束。這里為什么要移除呢?因為【所有消費者的消費進度數組】主要是用來獲取最慢的進度的纫塌。那么被依賴的可以不用考慮诊县,因為它不可能比依賴它的慢。并且讓這個數組足夠小措左,可以提升計算最慢進度的性能依痊。
- 把被依賴的消費者的endOfChain屬性設置成false。這個endOfChain是用來干嘛的呢怎披?其實主要是Disruptor在shutdown的時候需要判定是否所有消費者都已經消費完了(如果依賴了別人的消費者都消費完了胸嘁,那么整條鏈路上一定都消費完了)。
第三步——啟動Disruptor
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
這個consumerRepository
是不是很熟悉凉逛?這是第二步詳解二里ConsumerInfo
注冊的地方性宏。可以看到啟動Disruptor
其實就是在啟動消費線程:
// class EventProcessorInfo
public void start(final Executor executor)
{
// 這里對應的是BatchEventProcessor
executor.execute(eventprocessor);
}
// class BasicExecutor
public void execute(Runnable command)
{
final Thread thread = factory.newThread(command);
if (null == thread)
{
throw new RuntimeException("Failed to create thread to run: " + command);
}
thread.start();
threads.add(thread);
}
那么消費線程的具體邏輯是状飞?看看BatchEventProcessor
的run()
方法:
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
// 成員變量sequence維護該Processor的消費進度
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
// 以nextSequence作為底線毫胜,去獲取最大的可用sequence(也就是已經被publish的sequence)
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 如果獲取到的sequence大于等于nextSequence,說明有可以消費的event诬辈,從nextSequence(包含)到availableSequence(包含)這一段的事件就作為同一個批次
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
// 調用了前面注冊的回調函數
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 消費完一批之后 一次性更新消費進度
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
// waitFor超時的場景
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// 消費過程中如果拋出異常酵使,表面上看會更新消費進度,也就是說沒有補償機制焙糟。但實際上默認的策略是會拋異常的口渔,消費線程會直接結束掉
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
sequenceBarrier.waitFor
里的邏輯值得好好看一看,不過我覺得等看完發(fā)布事件的流程之后會更容易理解穿撮。
第四步——發(fā)布事件
這里使用的是EventTranslator
的方式來發(fā)布事件的:
// class Disruptor
public void publishEvent(final EventTranslator<T> eventTranslator)
{
ringBuffer.publishEvent(eventTranslator);
}
// class RingBuffer
public void publishEvent(EventTranslator<E> translator)
{
final long sequence = sequencer.next();
translateAndPublish(translator, sequence);
}
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
try
{
translator.translateTo(get(sequence), sequence);
}
finally
{
sequencer.publish(sequence);
}
}
從上面的代碼結構可以看出來缺脉,發(fā)布事件總共分為三個步驟:
- 申請sequence
- 填充事件內容
- 提交發(fā)布
有點類似數據庫事務的味道痪欲。為了便于理解,我們還是以SingleProducerSequencer
來分析下上面三個步驟:
申請sequence
// class SingleProducerSequencer
public long next()
{
return next(1);
}
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
// nextValue這個變量名有點詭異攻礼,實際上表示已經申請到的那個sequence
long nextValue = this.nextValue;
// nextSequence表示本次需要申請的最大sequence
long nextSequence = nextValue + n;
// 計算出nextSequence在上一圈的點位
long wrapPoint = nextSequence - bufferSize;
// 最慢消費進度的緩存
long cachedGatingSequence = this.cachedValue;
// 下面這個條件表達式以及其代碼塊解釋起來可能需要比較大的篇幅业踢,所以在下面[核心代碼詳解]里說明
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
// 通知下消費者
waitStrategy.signalAllWhenBlocking();
// 生產者如果沒有空間寫數據了,只能無限park
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
核心代碼詳解
首先秘蛔,我們看下if表達式里的兩個條件:
wrapPoint > cachedGatingSequence
陨亡,結合我們對Disruptor
的了解,此處判斷的應該是此次要申請的sequence是否已經領先最慢消費進度一圈了(類似1000米跑步的套圈)-
cachedGatingSequence > nextValue
判斷的是最慢消費進度超過了我們即將要申請的sequence深员,乍一看這應該是不可能的吧负蠕,都還沒申請到該sequence怎么可能消費到呢?找了些資料倦畅,發(fā)現確實是存在該場景的:RingBuffer
提供了一個叫resetTo
的方法遮糖,可以重置當前已申請sequence為一個指定值并publish出去:@Deprecated public void resetTo(long sequence) { sequencer.claim(sequence); sequencer.publish(sequence); }
具體資料可參考:
- https://github.com/LMAX-Exchange/disruptor/issues/280
- https://github.com/LMAX-Exchange/disruptor/issues/76
不過該代碼已經標注為@Deprecated,按照作者的意思叠赐,后續(xù)是要刪掉的欲账。那么在此處分析的時候,我們就將當它恒為false芭概。
對于第一個條件表達式赛不,也有個值得注意的地方:因為里面的【最慢消費進度】取的是緩存值(cached)。而這個緩存值是什么時候更新的呢罢洲?答案是只有在“套圈”了以后才會更新踢故。這個邏輯你品,你細品惹苗,那么你會發(fā)現殿较,每申請RingBuffer.size()
個sequence之后都會走進上面的“套圈”邏輯來更新cachedGatingSequence
。這樣就極大的減少了Util.getMinimumSequence(gatingSequences, nextValue)
的運算量
再來看看“套圈”時需要執(zhí)行的邏輯:
- 插入一個StoreLoad屏障桩蓉,防止是因為內存可見性導致的消費者消費不了數據(應該極少存在這樣的情況吧)
- 實時計算一下最慢消費進度
Util.getMinimumSequence(gatingSequences, nextValue)
- 如果真的套圈了淋纲,那么就一直死循環(huán)直到RingBuffer上有空間可以申請
- 更新【最慢消費進度緩存】
注意,當消費者消費過慢時院究,可能會導致生產者無限park洽瞬,這個在編程的時候要特別留意。
提交發(fā)布
填充事件內容沒什么好說的业汰,無非就是設值的過程伙窃。設值完成之后,就可以發(fā)布了:
public void publish(long sequence)
{
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
這里會把cursor
的值設置為當前申請的sequence蔬胯,代表序號為sequence的事件發(fā)布成功。這里的cursor
表示已經publish的最大事件序號(在多生產者模式中并不是)位他,所以我們在使用過程中需要依次申請氛濒,依次發(fā)布产场,不能直接上來就publish(100),這樣會導致消費者會認為100以前的序號也都就緒了舞竿。另外京景,由于我們現在看的是單生產者模式,也不需要考慮并發(fā)場景骗奖。
sequenceBarrier.waitFor
看完了生產者的流程确徙,我們來回顧下在消費者里這一句關鍵的代碼。前面有提到過ProcessingSequenceBarrier
执桌,帶著這一絲絲的印象我們來看看下面這段獲取availableSequence的邏輯:
// class ProcessingSequenceBarrier
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// waitStrategy派上用場了鄙皇,這是我們在構造Disruptor的時候的入參(也是構造RingBuffer的入參)
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
// 理論上沒有可能為true,因為當前每種waitStrategy內都保證了availableSequence一定大于等于sequence
if (availableSequence < sequence)
{
return availableSequence;
}
// 返回最大的已發(fā)布的sequence仰挣,在單生產者模式下這個函數返回值就等于availableSequence
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
跟著上面的注釋伴逸,相信應該沒有什么理解上的難點,上面的代碼核心就兩步:
- 通過
WaitStrategy.waitFor()
獲取availableSequence
膘壶,下面會分析具體的邏輯 - 通過
sequencer
來得到最大的已發(fā)布的sequence(HighestPublishedSequence
)
WaitStrategy.waitFor
先看看第一步中的WaitStrategy.waitFor()
方法错蝴,這里以BlockingWaitStrategy
為例:
// class BlockingWaitStrategy
// 這里的四個入參我們捋一捋
// sequence:消費者想要消費的最小sequence(底線)
// cursorSequence:Sequencer的cursor,也就是當前RingBuffer上已經被申請的最大sequence(在講生產者邏輯的時候提到了)
// dependentSequence:在我們當前鏈路為cursorSequence颓芭,不存在消費依賴(如果存在依賴的話顷锰,則為依賴消費者消費進度)
// barrier:這個主要是用了其中一些中斷方法,不用太care
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 看到了吧亡问,這里已經保證了availableSequence必然大于等于sequence
// 并且在存在依賴的場景中官紫,被依賴消費者存在慢消費的話,會直接導致下游進入死循環(huán)
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
從上面的代碼能看到玛界,WaitStrategy.waitFor()
獲取的是依賴消費者的消費進度sequence(默認依賴RingBuffer上已申請進度的sequence)万矾。需要注意的一點是,當消費者獲取可消費事件的過程中慎框,存在兩種場景需要等待:
- RingBuffer上沒有事件可以消費
- RingBuffer上有可消費事件良狈,但是依賴的消費者還未消費完該事件
如果是第一種場景,那么消費者會采用WaitStrategy
的策略進行等待笨枯。而如果是第二種場景的話薪丁,只能如上所示一樣進入死循環(huán)(此時可能造成cpu升高)。
sequencer.getHighestPublishedSequence
從WaitStrategy.waitFor()
返回后馅精,得到的是RingBuffer
上已申請進度sequence或者是依賴消費者消費進度sequence(當然如果把cursorSequence
也看成一種依賴的話严嗜,理解起來就統一了)。注意一個形容詞——“已申請”洲敢,而不是“已發(fā)布”漫玄,“已申請”意味著還不一定“已發(fā)布”,也就是還不能消費。所以睦优,SequenceBarrier.waitFor
最后還有一步sequencer.getHighestPublishedSequence(sequence, availableSequence)
渗常。
當然如果你很仔細的看到這里并且對于前面的內容都理解了,你可能會產生疑問:對于單生產者來說汗盘,本來就是在publish的時候才更新cursor的爸宓狻?那上一步從WaitStrategy.waitFor()
獲取到的不就是“已發(fā)布”的進度sequence嗎隐孽?是的癌椿,你說得很正確。對于單生產者確實如此菱阵,所以但生產者對應的實現為:
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
return availableSequence;
}
而對于多生產者的話踢俄,邏輯就會相對復雜一點,這個我們下一篇文章再分析
總結
到這里送粱,Disruptor核心的邏輯我們基本上看完了褪贵。我們介紹了Disruptor中的單生產者模式的生產邏輯以及默認的單線程批量消費邏輯。當然這只是最基本的模式抗俄,為了讓我們對Disruptor的邏輯和源代碼有一個整體的了解脆丁。后面的文章我們會涉及更多的場景,比如
- 多生產者模式是如何工作的
- 如何實現消費依賴
- Log4j2是如何使用Disruptor的
- Disruptor性能高的原因以及使用過程中的一些心得等
如文中有描述錯誤动雹,還望指出槽卫,以便改正,多謝~