通過這篇文章來記錄和分享對(duì)Disruptor的初步了解認(rèn)識(shí)Disruptor框架巡社。
Disruptor是什么寂拆?
Disruptor是一個(gè)高性能的異步處理框架,或者可以認(rèn)為是線程間通信的高效低延時(shí)的內(nèi)存消息組件,它最大特點(diǎn)是高性能蔬崩,其LMAX架構(gòu)可以獲得每秒6百萬訂單蚁署,用1微秒的延遲獲得吞吐量為100K+便脊。
它是如何實(shí)現(xiàn)高性能的呢?它由于JDK內(nèi)置的隊(duì)列有什么區(qū)別呢光戈?
JDK內(nèi)置內(nèi)存隊(duì)列哪痰?
我們知道,Java內(nèi)置了幾種內(nèi)存消息隊(duì)列久妆,如下所示:
隊(duì)列 | 加鎖方式 | 是否有界 | 數(shù)據(jù)結(jié)構(gòu) |
---|---|---|---|
ArrayBlockingQueue | 加鎖 | 有界 | ArrayList |
LinkedBlockingQueue | 加鎖 | 無界 | LinkedList |
ConcurrentLinkedQueue | CAS | 無界 | LinkedList |
LinkedTransferQueue | CAS | 無界 | LinkedList |
我們知道CAS算法比通過加鎖實(shí)現(xiàn)同步性能高很多晌杰,而上表可以看出基于CAS實(shí)現(xiàn)的隊(duì)列都是無界的,而有界隊(duì)列是通過同步實(shí)現(xiàn)的筷弦。在系統(tǒng)穩(wěn)定性要求比較高的場(chǎng)景下肋演,為了防止生產(chǎn)者速度過快抑诸,如果采用無界隊(duì)列會(huì)最終導(dǎo)致內(nèi)存溢出,只能選擇有界隊(duì)列爹殊。
而有界隊(duì)列只有ArrayBlockingQueue
蜕乡,該隊(duì)列是通過加鎖實(shí)現(xiàn)的,在請(qǐng)求鎖和釋放鎖時(shí)對(duì)性能開銷很大梗夸,這時(shí)候基于有界隊(duì)列的高性能的Disruptor就應(yīng)運(yùn)而生层玲。
Disruptor如何實(shí)現(xiàn)高性能?
Disruptor實(shí)現(xiàn)高性能主要體現(xiàn)了去掉了鎖反症,采用CAS算法辛块,同時(shí)內(nèi)部通過環(huán)形隊(duì)列實(shí)現(xiàn)有界隊(duì)列。
- 環(huán)形數(shù)據(jù)結(jié)構(gòu)
為了避免垃圾回收铅碍,采用數(shù)組而非鏈表憨降。同時(shí),數(shù)組對(duì)處理器的緩存機(jī)制更加友好该酗。 - 元素位置定位
數(shù)組長(zhǎng)度2^n授药,通過位運(yùn)算,加快定位的速度呜魄。下標(biāo)采取遞增的形式悔叽。不用擔(dān)心index溢出的問題。index是long類型爵嗅,即使100萬QPS的處理速度娇澎,也需要30萬年才能用完。 - 無鎖設(shè)計(jì)
每個(gè)生產(chǎn)者或者消費(fèi)者線程睹晒,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置趟庄,申請(qǐng)到之后,直接在該位置寫入或者讀取數(shù)據(jù)伪很。整個(gè)過程通過原子變量CAS戚啥,保證操作的線程安全。
Disruptor可以用來做什么锉试?
當(dāng)前業(yè)界開源組件使用Disruptor的包括Log4j2猫十、Apache Storm等,它可以用來作為高性能的有界內(nèi)存隊(duì)列呆盖,基于生產(chǎn)者消費(fèi)者模式拖云,實(shí)現(xiàn)一個(gè)/多個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者。它也可以認(rèn)為是觀察者模式的一種實(shí)現(xiàn)应又,或者發(fā)布訂閱模式宙项。
同時(shí),Disruptor還允許開發(fā)者使用多線程技術(shù)去創(chuàng)建基于任務(wù)的工作流株扛。Disruptor能用來并行創(chuàng)建任務(wù)尤筐,同時(shí)保證多個(gè)處理過程的有序性邑贴,并且它是沒有鎖的。
為什么要使用Disruptor叔磷?
使用Disruptor拢驾,主要用于對(duì)性能要求高、延遲低的場(chǎng)景改基,它通過“榨干”機(jī)器的性能來換取處理的高性能繁疤。如果你的項(xiàng)目有對(duì)性能要求高,對(duì)延遲要求低的需求秕狰,并且需要一個(gè)無鎖的有界隊(duì)列稠腊,來實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者模式,那么Disruptor是你的不二選擇鸣哀。
怎么用Disruptor架忌?
要學(xué)會(huì)基于Disruptor進(jìn)行編程,我們先了解下大概流程示意圖我衬,其中綠色部分是表示我們需要編寫和實(shí)現(xiàn)的類叹放。
下面我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的用例,生產(chǎn)者負(fù)責(zé)將輸入的字符串輸出到隊(duì)列挠羔,消費(fèi)者負(fù)責(zé)打印出來井仰。
public class DisruptorTest {
/**
* 消息事件類
*/
public static class MessageEvent{
/**
* 原始消息
*/
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
/**
* 消息事件工廠類
*/
public static class MessageEventFactory implements EventFactory<MessageEvent>{
@Override
public MessageEvent newInstance() {
return new MessageEvent();
}
}
/**
* 消息轉(zhuǎn)換類,負(fù)責(zé)將消息轉(zhuǎn)換為事件
*/
public static class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
@Override
public void translateTo(MessageEvent messageEvent, long l, String s) {
messageEvent.setMessage(s);
}
}
/**
* 消費(fèi)者線程工廠類
*/
public static class MessageThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Simple Disruptor Test Thread");
}
}
/**
* 消息事件處理類破加,這里只打印消息
*/
public static class MessageEventHandler implements EventHandler<MessageEvent>{
@Override
public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
System.out.println(messageEvent.getMessage());
}
}
/**
* 異常處理類
*/
public static class MessageExceptionHandler implements ExceptionHandler<MessageEvent>{
@Override
public void handleEventException(Throwable ex, long sequence, MessageEvent event) {
ex.printStackTrace();
}
@Override
public void handleOnStartException(Throwable ex) {
ex.printStackTrace();
}
@Override
public void handleOnShutdownException(Throwable ex) {
ex.printStackTrace();
}
}
/**
* 消息生產(chǎn)者類
*/
public static class MessageEventProducer{
private RingBuffer<MessageEvent> ringBuffer;
public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 將接收到的消息輸出到ringBuffer
* @param message
*/
public void onData(String message){
EventTranslatorOneArg<MessageEvent,String> translator = new MessageEventTranslator();
ringBuffer.publishEvent(translator,message);
}
}
public static void main(String[] args) {
String message = "Hello Disruptor!";
int ringBufferSize = 1024;//必須是2的N次方
Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());
disruptor.handleEventsWith(new MessageEventHandler());
disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
RingBuffer<MessageEvent> ringBuffer = disruptor.start();
MessageEventProducer producer = new MessageEventProducer(ringBuffer);
producer.onData(message);
}
}