一、什么是 Disruptor
從功能上來看肺蔚,Disruptor 是實(shí)現(xiàn)了“隊(duì)列”的功能煌妈,而且是一個(gè)有界隊(duì)列。那么它的應(yīng)用場景自然就是“生產(chǎn)者-消費(fèi)者”模型的應(yīng)用場合了宣羊。
可以拿 JDK 的 BlockingQueue 做一個(gè)簡單對比璧诵,以便更好地認(rèn)識(shí) Disruptor 是什么。
我們知道 BlockingQueue 是一個(gè) FIFO 隊(duì)列仇冯,生產(chǎn)者(Producer)往隊(duì)列里發(fā)布(publish)一項(xiàng)事件(或稱之為“消息”也可以)時(shí)之宿,消費(fèi)者(Consumer)能獲得通知;如果沒有事件時(shí)苛坚,消費(fèi)者被堵塞比被,直到生產(chǎn)者發(fā)布了新的事件。
這些都是 Disruptor 能做到的泼舱,與之不同的是等缀,Disruptor 能做更多:
- 同一個(gè)“事件”可以有多個(gè)消費(fèi)者,消費(fèi)者之間既可以并行處理娇昙,也可以相互依賴形成處理的先后次序(形成一個(gè)依賴圖)尺迂;
- 預(yù)分配用于存儲(chǔ)事件內(nèi)容的內(nèi)存空間;
- 針對極高的性能目標(biāo)而實(shí)現(xiàn)的極度優(yōu)化和無鎖的設(shè)計(jì)冒掌;
以上的描述雖然簡單地指出了 Disruptor 是什么噪裕,但對于它“能做什么”還不是那么直截了當(dāng)。一般性地來說宋渔,當(dāng)你需要在兩個(gè)獨(dú)立的處理過程(兩個(gè)線程)之間交換數(shù)據(jù)時(shí)州疾,就可以使用 Disruptor 辜限。當(dāng)然使用隊(duì)列(如上面提到的 BlockingQueue)也可以皇拣,只不過 Disruptor 做得更好。
拿隊(duì)列來作比較的做法弱化了對 Disruptor 有多強(qiáng)大的認(rèn)識(shí)薄嫡,如果想要對此有更多的了解氧急,可以仔細(xì)看看 Disruptor 在其東家 LMAX 交易平臺(tái)(也是實(shí)現(xiàn)者) 是如何作為核心架構(gòu)來使用的,這方面就不做詳述了毫深,問度娘或谷哥都能找到吩坝。
二、Disruptor 的核心概念
先從了解 Disruptor 的核心概念開始哑蔫,來了解它是如何運(yùn)作的钉寝。下面介紹的概念模型弧呐,既是領(lǐng)域?qū)ο螅彩怯成涞酱a實(shí)現(xiàn)上的核心對象嵌纲。
- Ring Buffer
如其名俘枫,環(huán)形的緩沖區(qū)。曾經(jīng) RingBuffer 是 Disruptor 中的最主要的對象逮走,但從3.0版本開始鸠蚪,其職責(zé)被簡化為僅僅負(fù)責(zé)對通過 Disruptor 進(jìn)行交換的數(shù)據(jù)(事件)進(jìn)行存儲(chǔ)和更新。在一些更高級(jí)的應(yīng)用場景中师溅,Ring Buffer 可以由用戶的自定義實(shí)現(xiàn)來完全替代茅信。
- Sequence Disruptor
通過順序遞增的序號(hào)來編號(hào)管理通過其進(jìn)行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號(hào)逐個(gè)遞增處理墓臭。一個(gè) Sequence 用于跟蹤標(biāo)識(shí)某個(gè)特定的事件處理者( RingBuffer/Consumer )的處理進(jìn)度蘸鲸。雖然一個(gè) AtomicLong 也可以用于標(biāo)識(shí)進(jìn)度,但定義 Sequence 來負(fù)責(zé)該問題還有另一個(gè)目的窿锉,那就是防止不同的 Sequence 之間的CPU緩存?zhèn)喂蚕?Flase Sharing)問題棚贾。
(注:這是 Disruptor 實(shí)現(xiàn)高性能的關(guān)鍵點(diǎn)之一,網(wǎng)上關(guān)于偽共享問題的介紹已經(jīng)汗牛充棟榆综,在此不再贅述)妙痹。
- Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有兩個(gè)實(shí)現(xiàn)類 SingleProducerSequencer鼻疮、MultiProducerSequencer 怯伊,它們定義在生產(chǎn)者和消費(fèi)者之間快速、正確地傳遞數(shù)據(jù)的并發(fā)算法判沟。
- Sequence Barrier
用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用耿芹。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
- Wait Strategy
定義 Consumer 如何進(jìn)行等待下一個(gè)事件的策略挪哄。 (注:Disruptor 定義了多種不同的策略吧秕,針對不同的場景,提供了不一樣的性能表現(xiàn))
- Event
在 Disruptor 的語義中迹炼,生產(chǎn)者和消費(fèi)者之間進(jìn)行交換的數(shù)據(jù)被稱為事件(Event)砸彬。它不是一個(gè)被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定斯入。
- EventProcessor
EventProcessor 持有特定消費(fèi)者(Consumer)的 Sequence砂碉,并提供用于調(diào)用事件處理實(shí)現(xiàn)的事件循環(huán)(Event Loop)。
- EventHandler
Disruptor 定義的事件處理接口刻两,由用戶實(shí)現(xiàn)增蹭,用于處理事件,是 Consumer 的真正實(shí)現(xiàn)磅摹。
- Producer
即生產(chǎn)者滋迈,只是泛指調(diào)用 Disruptor 發(fā)布事件的用戶代碼霎奢,Disruptor 沒有定義特定接口或類型。
三饼灿、如何使用 Disruptor
Disruptor 的 API 十分簡單椰憋,主要有以下幾個(gè)步驟:
1.定義事件
事件(Event)就是通過 Disruptor 進(jìn)行交換的數(shù)據(jù)類型。
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
2.定義事件工廠
事件工廠(Event Factory)定義了如何實(shí)例化前面第1步中定義的事件(Event)赔退,需要實(shí)現(xiàn)接口 com.lmax.disruptor.EventFactory<T>橙依。
Disruptor 通過 EventFactory 在 RingBuffer 中預(yù)創(chuàng)建 Event 的實(shí)例。
一個(gè) Event 實(shí)例實(shí)際上被用作一個(gè)“數(shù)據(jù)槽”硕旗,發(fā)布者發(fā)布前窗骑,先從 RingBuffer 獲得一個(gè) Event 的實(shí)例,然后往 Event 實(shí)例中填充數(shù)據(jù)漆枚,之后再發(fā)布到 RingBuffer 中创译,之后由 Consumer 獲得該 Event 實(shí)例并從中讀取數(shù)據(jù)。
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
3.定義事件處理的具體實(shí)現(xiàn)
通過實(shí)現(xiàn)接口 com.lmax.disruptor.EventHandler<T> 定義事件處理的具體實(shí)現(xiàn)墙基。
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
4.定義用于事件處理的線程池
Disruptor 通過 java.util.concurrent.ExecutorService 提供的線程來觸發(fā) Consumer 的事件處理软族。例如:
ExecutorService executor = Executors.newCachedThreadPool();
5.指定等待策略
Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,這是策略模式的應(yīng)用残制。
Disruptor 提供了多個(gè) WaitStrategy 的實(shí)現(xiàn)立砸,每種策略都具有不同性能和優(yōu)缺點(diǎn),根據(jù)實(shí)際運(yùn)行環(huán)境的 CPU 的硬件特點(diǎn)選擇恰當(dāng)?shù)牟呗猿醪瑁⑴浜咸囟ǖ?JVM 的配置參數(shù)颗祝,能夠?qū)崿F(xiàn)不同的性能提升。
例如恼布,BlockingWaitStrategy螺戳、SleepingWaitStrategy、YieldingWaitStrategy 等折汞,其中倔幼,
- BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn)爽待;
- SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多损同,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小堕伪,適合用于異步日志類似的場景揖庄;
- YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統(tǒng)欠雌。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略疙筹;例如富俄,CPU開啟超線程的特性禁炒。
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
6.啟動(dòng) Disruptor
EventFactory<LongEvent> eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方霍比;
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
7.發(fā)布事件
Disruptor 的事件發(fā)布過程是一個(gè)兩階段提交的過程:
- 第一步:先從 RingBuffer 獲取下一個(gè)可以寫入的事件的序號(hào)幕袱;
- 第二步:獲取對應(yīng)的事件對象,將數(shù)據(jù)寫入事件對象悠瞬;
- 第三步:將事件提交到 RingBuffer;
事件只有在提交之后才會(huì)通知 EventProcessor 進(jìn)行處理们豌;
// 發(fā)布事件;
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//請求下一個(gè)事件序號(hào)浅妆;
try {
LongEvent event = ringBuffer.get(sequence);//獲取該序號(hào)對應(yīng)的事件對象望迎;
long data = getEventData();//獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù);
event.set(data);
} finally{
ringBuffer.publish(sequence);//發(fā)布事件凌外;
}
注意辩尊,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;如果某個(gè)請求的 sequence 未被提交康辑,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer摄欲。
Disruptor 還提供另外一種形式的調(diào)用來簡化以上操作,并確保 publish 總是得到調(diào)用疮薇。
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
@Override
public void translateTo(LongEvent event, long sequence, Long data) {
event.set(data);
}
}
public static Translator TRANSLATOR = new Translator();
public static void publishEvent2(Disruptor<LongEvent> disruptor) {
// 發(fā)布事件胸墙;
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long data = getEventData();//獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù);
ringBuffer.publishEvent(TRANSLATOR, data);
}
此外按咒,Disruptor 要求 RingBuffer.publish 必須得到調(diào)用的潛臺(tái)詞就是劳秋,如果發(fā)生異常也一樣要調(diào)用 publish ,那么胖齐,很顯然這個(gè)時(shí)候需要調(diào)用者在事件處理的實(shí)現(xiàn)上來判斷事件攜帶的數(shù)據(jù)是否是正確的或者完整的玻淑,這是實(shí)現(xiàn)者應(yīng)該要注意的事情。
8.關(guān)閉 Disruptor
disruptor.shutdown();//關(guān)閉 disruptor呀伙,方法會(huì)堵塞补履,直至所有的事件都得到處理;
executor.shutdown();//關(guān)閉 disruptor 使用的線程池剿另;如果需要的話箫锤,必須手動(dòng)關(guān)閉, disruptor 在 shutdown 時(shí)不會(huì)自動(dòng)關(guān)閉雨女;