1 Disruptor
1.1 簡(jiǎn)介
1.1.1 定義
Disruptor
是一個(gè)開(kāi)源的高性能內(nèi)存隊(duì)列檐涝,由英國(guó)外匯交易公司 LMAX 開(kāi)發(fā)的虽界,獲得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 選擇大獎(jiǎng))挽唉。
Disruptor
提供的功能類似于 Kafka
晕拆、RocketMQ
這類分布式隊(duì)列淤堵,不過(guò)捶枢,其作為范圍是 JVM
(內(nèi)存),Disruptor
解決了 JDK
內(nèi)置線程安全隊(duì)列的性能和內(nèi)存安全問(wèn)題火邓,Disruptor
有個(gè)最大的優(yōu)點(diǎn)就是快
Disruptor
被設(shè)計(jì)用于在生產(chǎn)者
—消費(fèi)者
(producer-consumer problem
丹弱,簡(jiǎn)稱PCP
)問(wèn)題上獲得盡量高的吞吐量(TPS)和盡量低的延遲
Disruptor
是LMAX在線交易平臺(tái)的關(guān)鍵組成部分,LMAX平臺(tái)使用該框架對(duì)訂單處理速度能達(dá)到600萬(wàn)TPS
铲咨,除金融領(lǐng)域之外躲胳,其他一般的應(yīng)用中都可以用到 Disruptor
,它可以帶來(lái)顯著的性能提升纤勒。其實(shí) Disruptor
與其說(shuō)是一個(gè)框架坯苹,不如說(shuō)是一種設(shè)計(jì)思路,這個(gè)設(shè)計(jì)思路對(duì)于存在并發(fā)摇天、緩沖區(qū)粹湃、生產(chǎn)者—消費(fèi)者模型、事務(wù)處理
這些元素的程序來(lái)說(shuō)泉坐,Disruptor
提出了一種大幅提升性能(TPS)的方案为鳄。
github 地址
Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
1.1.2 Java中線程安全隊(duì)列
JDK 中常見(jiàn)的線程安全的隊(duì)列如下:
隊(duì)列名字 | 鎖 | 是否有界 |
---|---|---|
ArrayBlockingQueue | 加鎖(ReentrantLock) | 有界 |
LinkedBlockingQueue | 加鎖(ReentrantLock) | 有界 |
LinkedTransferQueue | 無(wú)鎖(CAS) | 無(wú)界 |
ConcurrentLinkedQueue | 無(wú)鎖(CAS) | 無(wú)界 |
從上表中可以看出:這些隊(duì)列要不就是加鎖有界,要不就是無(wú)鎖無(wú)界腕让。而加鎖的的隊(duì)列勢(shì)必會(huì)影響性能孤钦,無(wú)界的隊(duì)列又存在內(nèi)存溢出的風(fēng)險(xiǎn)。
因此纯丸,一般情況下偏形,我們都是不建議使用 JDK 內(nèi)置線程安全隊(duì)列。
Disruptor
就不一樣了觉鼻!它在無(wú)鎖的情況下還能保證隊(duì)列有界俊扭,并且還是線程安全的。
1.1.3 Disruptor 核心概念
Disruptor
核心概念:
-
Event
:可以把Event
理解為存放在隊(duì)列中等待消費(fèi)的消息對(duì)象坠陈。
在Disruptor
的語(yǔ)義中统扳,生產(chǎn)者和消費(fèi)者之間進(jìn)行交換的數(shù)據(jù)被稱為事件(Event
)喘帚。它不是一個(gè)被Disruptor
定義的特定類型,而是由Disruptor
的使用者定義并指定咒钟。 -
EventFactory
:事件工廠用于生產(chǎn)事件吹由,我們?cè)诔跏蓟?Disruptor
類的時(shí)候需要用到。 -
EventHandler
:Event
在對(duì)應(yīng)的Handler
中被處理朱嘴,你可以將其理解為生產(chǎn)消費(fèi)者模型中的消費(fèi)者倾鲫。
Disruptor
定義的事件處理接口,由用戶實(shí)現(xiàn)萍嬉,用于處理事件乌昔,是Consumer
的真正實(shí)現(xiàn) -
EventProcessor
:EventProcessor
持有特定消費(fèi)者(Consumer
)的Sequence
,并提供用于調(diào)用事件處理實(shí)現(xiàn)的事件循環(huán)(Event Loop
) -
Disruptor
:事件的生產(chǎn)和消費(fèi)需要用到Disruptor
對(duì)象壤追。 -
RingBuffer
:RingBuffer
(環(huán)形數(shù)組)用于保存事件
磕道。
如其名,環(huán)形的緩沖區(qū)行冰。曾經(jīng)RingBuffer
是Disruptor
中的最主要的對(duì)象溺蕉,但從3.0版本開(kāi)始,其職責(zé)被簡(jiǎn)化為僅僅負(fù)責(zé)對(duì)通過(guò)Disruptor
進(jìn)行交換的數(shù)據(jù)(事件)進(jìn)行存儲(chǔ)和更新悼做。在一些更高級(jí)的應(yīng)用場(chǎng)景中疯特,Ring Buffer
可以由用戶的自定義實(shí)現(xiàn)來(lái)完全替代。 -
WaitStrategy
:等待策略肛走。決定了沒(méi)有事件可以消費(fèi)的時(shí)候漓雅,事件消費(fèi)者如何等待新事件的到來(lái)。定義Consumer
如何進(jìn)行等待下一個(gè)事件的策略朽色。(注:Disruptor
定義了多種不同的策略邻吞,針對(duì)不同的場(chǎng)景,提供了不一樣的性能表現(xiàn)) -
Producer
:生產(chǎn)者葫男,只是泛指調(diào)用Disruptor
發(fā)布事件的用戶代碼吃衅,Disruptor
沒(méi)有定義特定接口或類型 -
ProducerType
:指定是單個(gè)事件發(fā)布者模式
還是多個(gè)事件發(fā)布者模式
(發(fā)布者和生產(chǎn)者的意思類似)。 -
Sequencer
:Sequencer
是Disruptor
的真正核心腾誉。此接口有兩個(gè)實(shí)現(xiàn)類 -SingleProducerSequencer
、MultiProducerSequencer
峻呕,它們定義在生產(chǎn)者和消費(fèi)者之間快速利职、正確地傳遞數(shù)據(jù)的并發(fā)算法。 -
Sequence Disruptor
:通過(guò)順序遞增的序號(hào)來(lái)編號(hào)管理通過(guò)其進(jìn)行交換的數(shù)據(jù)(事件)瘦癌,對(duì)數(shù)據(jù)(事件)的處理過(guò)程總是沿著序號(hào)逐個(gè)遞增處理猪贪。一個(gè)Sequence
用于跟蹤標(biāo)識(shí)某個(gè)特定的事件處理者(RingBuffer/Consumer
)的處理進(jìn)度。
雖然一個(gè)AtomicLong
也可以用于標(biāo)識(shí)進(jìn)度讯私,但定義Sequence
來(lái)負(fù)責(zé)該問(wèn)題還有另一個(gè)目的热押,那就是防止不同的Sequence
之間的CPU
緩存?zhèn)喂蚕?Flase Sharing
)問(wèn)題西傀。(注:這是Disruptor
實(shí)現(xiàn)高性能的關(guān)鍵點(diǎn)之一) -
Sequence Barrier
:用于保持對(duì)RingBuffer
的main published Sequence
和Consumer
依賴的其它Consumer
的Sequence
的引用。Sequence Barrier
還定義了決定Consumer
是否還有可處理的事件的邏輯桶癣。
1.2 操作
1.2.1 坐標(biāo)依賴
pom.xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
Gradle:
implementation 'com.lmax:disruptor:3.4.4'
1.2.2 創(chuàng)建事件
我們先來(lái)定義一個(gè)代表日志事件的類:LogEvent 拥褂。
事件中包含了一些和事件相關(guān)的屬性,比如我們這里定義的 LogEvent 對(duì)象中就有一個(gè)用來(lái)表示日志消息內(nèi)容的屬性:message牙寞。
@Data
public class LogEvent {
private String message;
}
我們這里只是為了演示饺鹃,實(shí)際項(xiàng)目中,一個(gè)標(biāo)準(zhǔn)日志事件對(duì)象所包含的屬性肯定不是只有一個(gè) message
1.2.3 創(chuàng)建事件工廠
創(chuàng)建一個(gè)工廠類 LogEventFactory 用來(lái)創(chuàng)建 LogEvent 對(duì)象间雀。
LogEventFactory 繼承 EventFactory
接口并實(shí)現(xiàn)了 newInstance()
方法 悔详。
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}
1.2.4 創(chuàng)建處理事件Handler--消費(fèi)者
創(chuàng)建一個(gè)用于處理后續(xù)發(fā)布的事件的類:LogEventHandler 。
LogEventHandler 繼承 EventHandler
接口并實(shí)現(xiàn)了 onEvent()
方法 惹挟。
public class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
System.out.println(logEvent.getMessage());
}
}
EventHandler
接口的 onEvent()
方法共有 3 個(gè)參數(shù):
-
event
:待消費(fèi)/處理
的事件 -
sequence
:正在處理的事件在環(huán)形數(shù)組(RingBuffer)中的位置 -
endOfBatch
:表示這是否是來(lái)自環(huán)形數(shù)組(RingBuffer)中一個(gè)批次的最后一個(gè)事件(批量處理事件)
1.2.5 初始化 Disruptor
1.2.5.1 靜態(tài)類
我們這里定義一個(gè)方法用于獲取 Disruptor 對(duì)象
private static Disruptor<LogEvent> getLogEventDisruptor() {
// 創(chuàng)建 LogEvent 的工廠
LogEventFactory logEventFactory = new LogEventFactory();
// Disruptor 的 RingBuffer 緩存大小
int bufferSize = 1024 * 1024;
// 生產(chǎn)者的線程工廠
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//實(shí)例化 Disruptor
return new Disruptor<>(
logEventFactory,
bufferSize,
threadFactory,
// 單生產(chǎn)者
ProducerType.SINGLE,
// 阻塞等待策略
new BlockingWaitStrategy());
}
1.2.5.2 配置類
使用配置類的方式
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<LogEvent> messageModelRingBuffer() {
//定義用于事件處理的線程池茄螃, Disruptor通過(guò)java.util.concurrent.ExecutorSerivce提供的線程來(lái)觸發(fā)consumer的事件處理
// 生產(chǎn)者的線程工廠
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//指定事件工廠
LogEventFactory factory = new LogEventFactory();
//指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運(yùn)算轉(zhuǎn)為位運(yùn)算提高效率)连锯,否則將影響效率
int bufferSize = 1024 * 256;
//單線程模式归苍,獲取額外的性能
Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
bufferSize,
threadFactory,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//設(shè)置事件業(yè)務(wù)處理器---消費(fèi)者
//Disruptor 的 handleEventsWith 方法來(lái)綁定處理事件的 Handler 對(duì)象。
disruptor.handleEventsWith(new LogEventHandler ());
// Disruptor 可以設(shè)置多個(gè)處理事件的 Handler萎庭,并且可以靈活的設(shè)置消費(fèi)者的處理順序霜医,串行,并行都是可以的驳规。
//就比如下面的代碼表示 Handler1 和 Handler2 是并行執(zhí)行肴敛,最后再執(zhí)行 Handler3 。
//disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
// 啟動(dòng)disruptor線程
disruptor.start();
//獲取ringbuffer環(huán)吗购,用于接取生產(chǎn)者生產(chǎn)的事件
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
1.2.5.3 Disruptor 構(gòu)造函數(shù)講解
Disruptor 的推薦使用的構(gòu)造函數(shù)如下:
public class Disruptor<T> {
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
......
}
我們需要傳遞 5 個(gè)參數(shù):
-
eventFactory
:我們自定義的事件工廠医男。 -
ringBufferSize
:指定RingBuffer
的容量大小。 -
threadFactory
:自定義的線程工廠捻勉。Disruptor 的默認(rèn)線程池是自定義的镀梭,我們只需要傳入線程工廠即可。 -
producerType
:指定是單個(gè)事件發(fā)布者模式還是多個(gè)事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似踱启,我個(gè)人比較喜歡用發(fā)布者)报账。 -
waitStrategy
:等待策略,決定了沒(méi)有事件可以消費(fèi)的時(shí)候埠偿,事件消費(fèi)者如何等待新事件的到來(lái)透罢。
ProducerType
的源碼如下,它是一個(gè)包含兩個(gè)變量的枚舉類型
-
SINGLE
:?jiǎn)蝹€(gè)事件發(fā)布者模式冠蒋,不需要保證線程安全羽圃。 -
MULTI
:多個(gè)事件發(fā)布者模式,基于 CAS 來(lái)保證線程安全抖剿。
WaitStrategy
(等待策略)接口的實(shí)現(xiàn)類中只有兩個(gè)方法:
-
waitFor()
:等待新事件的到來(lái)朽寞。 -
signalAllWhenBlocking()
:?jiǎn)拘阉械却南M(fèi)者识窿。
public interface WaitStrategy
{
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
void signalAllWhenBlocking();
}
WaitStrategy 的實(shí)現(xiàn)類共有 8 個(gè),也就是說(shuō)共有 8 種等待策略可供選擇脑融。
除了上面介紹的這個(gè)構(gòu)造函數(shù)之外喻频,Disruptor 還有一個(gè)只有 3 個(gè)參數(shù)構(gòu)造函數(shù)。
使用這個(gè)構(gòu)造函數(shù)創(chuàng)建的 Disruptor
對(duì)象會(huì)默認(rèn)使用 ProducerType.MULTI
(多個(gè)事件發(fā)布者模式)和 BlockingWaitStrategy
(阻塞等待策略) 吨掌。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
1.2.6 發(fā)布事件
1.2.6.1 main方法測(cè)試
//獲取 Disruptor 對(duì)象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//綁定處理事件的Handler對(duì)象
disruptor.handleEventsWith(new LogEventHandler());
//啟動(dòng) Disruptor
disruptor.start();
//獲取保存事件的環(huán)形數(shù)組(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//發(fā)布 10w 個(gè)事件
for (int i = 1; i <= 100000; i++) {
// 通過(guò)調(diào)用 RingBuffer 的 next() 方法獲取下一個(gè)空閑事件槽的序號(hào)
long sequence = ringBuffer.next();
try {
LogEvent logEvent = ringBuffer.get(sequence);
// 初始化 Event半抱,對(duì)其賦值
logEvent.setMessage("這是第%d條日志消息".formatted(i));
} finally {
// 發(fā)布事件
ringBuffer.publish(sequence);
}
}
// 關(guān)閉 Disruptor
disruptor.shutdown();
1.2.6.2 使用配置方式
public interface DisruptorMqService {
/**
* 消息
* @param message
*/
void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<LogEvent> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//獲取下一個(gè)Event槽的下標(biāo)
long sequence = messageModelRingBuffer.next();
try {
//給Event填充數(shù)據(jù)
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息隊(duì)列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//發(fā)布Event,激活觀察者去消費(fèi)膜宋,將sequence傳遞給改消費(fèi)者
//注意最后的publish方法必須放在finally中以確保必須得到調(diào)用窿侈;如果某個(gè)請(qǐng)求的sequence未被提交將會(huì)堵塞后續(xù)的發(fā)布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}