概述
disruptor對(duì)于處理并發(fā)任務(wù)很擅長(zhǎng)抹恳,曾有人測(cè)過署驻,一個(gè)線程里1s內(nèi)可以處理六百萬個(gè)訂單健霹,性能相當(dāng)感人瓶蚂。
這個(gè)框架的結(jié)構(gòu)大概是:數(shù)據(jù)生產(chǎn)端 --> 緩存 --> 消費(fèi)端
緩存中的數(shù)據(jù)是主動(dòng)發(fā)給消費(fèi)端的,而不是像一般的生產(chǎn)者消費(fèi)者模式那樣瞳别,消費(fèi)端去緩存中取數(shù)據(jù)杭攻。
可以將disruptor理解為,基于事件驅(qū)動(dòng)的高效隊(duì)列馆铁、輕量級(jí)的JMS
disruptor學(xué)習(xí)網(wǎng)站:http://ifeve.com/disruptor-getting-started
開發(fā)流程
1.建Event類(數(shù)據(jù)對(duì)象)
2.建立一個(gè)生產(chǎn)數(shù)據(jù)的工廠類锅睛,EventFactory,用于生產(chǎn)數(shù)據(jù)现拒;
3.監(jiān)聽事件類(處理Event數(shù)據(jù))
4.實(shí)例化Disruptor具练,配置參數(shù),綁定事件扛点;
5.建存放數(shù)據(jù)的核心 RingBuffer,生產(chǎn)的數(shù)據(jù)放入 RungBuffer眠饮。
樣例
1.入口
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class LongEventMain {
public static void main(String[] args) throws Exception {
//創(chuàng)建緩沖池
ExecutorService executor = Executors.newCachedThreadPool();
//創(chuàng)建工廠
LongEventFactory factory = new LongEventFactory();
//創(chuàng)建bufferSize ,也就是RingBuffer大小铜邮,必須是2的N次方
int ringBufferSize = 1024 * 1024; //
/**
//BlockingWaitStrategy 是最低效的策略,但其對(duì)CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn)
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
//SleepingWaitStrategy 的性能表現(xiàn)跟BlockingWaitStrategy差不多扔茅,對(duì)CPU的消耗也類似秸苗,但其對(duì)生產(chǎn)者線程的影響最小,適合用于異步日志類似的場(chǎng)景
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
//YieldingWaitStrategy 的性能是最好的惊楼,適合用于低延遲的系統(tǒng)秸讹。在要求極高性能且事件處理線數(shù)小于CPU邏輯核心數(shù)的場(chǎng)景中璃诀,推薦使用此策略蔑匣;例如,CPU開啟超線程的特性
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
*/
//創(chuàng)建disruptor
Disruptor<LongEvent> disruptor =
new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 連接消費(fèi)事件方法
disruptor.handleEventsWith(new LongEventHandler());
// 啟動(dòng)
disruptor.start();
//Disruptor 的事件發(fā)布過程是一個(gè)兩階段提交的過程:
//發(fā)布事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for(long l = 0; l<100; l++){
byteBuffer.putLong(0, l);
producer.onData(byteBuffer);
//Thread.sleep(1000);
}
disruptor.shutdown();//關(guān)閉 disruptor氧秘,方法會(huì)堵塞,直至所有的事件都得到處理;
executor.shutdown();//關(guān)閉 disruptor 使用的線程池搔确;如果需要的話,必須手動(dòng)關(guān)閉座硕, disruptor 在 shutdown 時(shí)不會(huì)自動(dòng)關(guān)閉涕蜂;
}
}
2.數(shù)據(jù)對(duì)象:
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
3.Event工廠
import com.lmax.disruptor.EventFactory;
// 需要讓disruptor為我們創(chuàng)建事件,我們同時(shí)還聲明了一個(gè)EventFactory來實(shí)例化Event對(duì)象蜘拉。
public class LongEventFactory implements EventFactory {
@Override
public Object newInstance() {
return new LongEvent();
}
}
4.生產(chǎn)者
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
/**
很明顯的是:當(dāng)用一個(gè)簡(jiǎn)單隊(duì)列來發(fā)布事件的時(shí)候會(huì)牽涉更多的細(xì)節(jié)有鹿,這是因?yàn)槭录?duì)象還需要預(yù)先創(chuàng)建。
發(fā)布事件最少需要兩步:獲取下一個(gè)事件槽并發(fā)布事件(發(fā)布事件的時(shí)候要使用try/finnally保證事件一定會(huì)被發(fā)布)持寄。
如果我們使用RingBuffer.next()獲取一個(gè)事件槽娱俺,那么一定要發(fā)布對(duì)應(yīng)的事件。
如果不能發(fā)布事件荠卷,那么就會(huì)引起Disruptor狀態(tài)的混亂。
-
尤其是在多個(gè)事件生產(chǎn)者的情況下會(huì)導(dǎo)致事件消費(fèi)者失速赖欣,從而不得不重啟應(yīng)用才能會(huì)恢復(fù)屑彻。
*/
public class LongEventProducer {private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}/**
- onData用來發(fā)布事件,每調(diào)用一次就發(fā)布一次事件
- 它的參數(shù)會(huì)用過事件傳遞給消費(fèi)者
*/
public void onData(ByteBuffer bb){
//1.可以把ringBuffer看做一個(gè)事件隊(duì)列顶吮,那么next就是得到下面一個(gè)事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號(hào)對(duì)應(yīng)的事件對(duì)象)
LongEvent event = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
event.setValue(bb.getLong(0));
} finally {
//4.發(fā)布事件
//注意社牲,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;如果某個(gè)請(qǐng)求的 sequence 未被提交悴了,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer搏恤。
ringBuffer.publish(sequence);
}
}
}
5.消費(fèi)者
import com.lmax.disruptor.EventHandler;
//我們還需要一個(gè)事件消費(fèi)者,也就是一個(gè)事件處理器湃交。這個(gè)事件處理器簡(jiǎn)單地把事件中存儲(chǔ)的數(shù)據(jù)打印到終端:
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println(longEvent.getValue());
}
}