LMAX是什么碳抄?
要說Disruptor需要先說下LMAX,LMAX是一個英國外匯黃金交易所免绿,它是第一家也是唯一一家采用多邊交易設(shè)施Multilateral Trading Facility(MTF)瘦穆,擁有交易所拍照和經(jīng)紀商拍照的歐洲頂級金融公司。而LMAX所用的Disruptor技術(shù)油宜,在一個線程每秒處理6百萬訂單沪饺。沒錯际跪,這個Disruptor就是我們這里的
Disruptor佩脊。
而Disruptor只是LMAX平臺一部分,LMAX是一個新型零售金融交易平臺垫卤,它能夠達到低延遲、高吞吐量(大量交易)出牧。這個系統(tǒng)建立在JVM平臺上穴肘,核心是一個邏輯處理器,每秒能夠處理600百萬訂單舔痕。業(yè)務(wù)邏輯處理器完全運行在內(nèi)存中(in-memory)评抚,使用事件源驅(qū)動方式(event sourcing)豹缀。而業(yè)務(wù)邏輯處理器核心是Disruptor,這是一個并發(fā)組件慨代,能夠在無鎖情況下實現(xiàn)網(wǎng)絡(luò)并發(fā)查詢操作邢笙。他們研究表明,現(xiàn)在所謂的高性能研究方向似乎和現(xiàn)在CPU設(shè)計是相左的侍匙。
什么是Disruptor
Disruptor實現(xiàn)了隊列的功能氮惯,而且是一個有界的隊列。所以應(yīng)用場景自然就是"生產(chǎn)者-消費者"模型了想暗「竞梗可以看下JDK中的BlockingQuery是一個FIFO隊列,生產(chǎn)者(Producer)發(fā)布(Publish)一項事件(Event说莫,消息)時杨箭,消費者(Consumer)能夠獲得通知;當隊列中沒有事件時储狭,消費者會被阻塞互婿,直到生產(chǎn)者發(fā)布了新的事件。而Disruptor不僅僅只是這些:
- 同一個事件可以有多個消費者辽狈,消費者之間可以并行處理慈参,可以相互依賴處理
- 預(yù)分配用于存儲事件的內(nèi)存
- 針對極高的性能目標而實現(xiàn)極度優(yōu)化和無鎖設(shè)計
可能你對這種場景還不是很明白,簡單說就是當需要兩個獨立的處理過程(兩個線程)之間需要傳遞數(shù)據(jù)時稻艰,就可以使用Disruptor懂牧,當然可以使用隊列。
Disruptor中的核心概念
Ring Buffer
環(huán)形緩沖區(qū)尊勿,曾經(jīng)是Disruptor中的核心對象僧凤,不過從3.0版本開始,只負責(zé)對通過Disruptor進行交換的數(shù)據(jù)(事件)進行存儲和更新元扔。在一些高級使用中躯保,Ring Buffer可以由用戶自定義的來代替睬关。Sequence
通過遞增的序號管理進行交換的數(shù)據(jù)(事件)专执,對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理的。一個Sequence用于跟蹤標識某個特定的事件處理者(RingBuffer/Consumer)的進度吭服。雖然可以使用AutomicLong標識進度擅羞,但定義Sequence另一個目的是防止CPU緩存?zhèn)喂蚕?Disruptor高性能關(guān)鍵點之一)Sequencer
Sequence是Disruptor的真正核心尸变。這個接口有兩個實現(xiàn)類SingleProducerSequence和MultiProducerSequence,它們定義生產(chǎn)者和消費者之間快速减俏、正確地傳遞數(shù)據(jù)的并發(fā)算法召烂。Sequence Barrier
保持RingBuffer的main published Sequence和Consumer依賴的其它Sequence的引用。Sequence Barrier還定義決定Consumer是否還有可處理的事件邏輯娃承。Event
在Disruptor中奏夫,在生產(chǎn)者和消費者之間進行交換的數(shù)據(jù)稱為事件(Event)怕篷。它不是Disruptor定義的類型,而是使用者自己定義并指定的(可以看作一個Bean)EventHandler
Disruptor定義的事件處理接口酗昼,由用戶實現(xiàn)廊谓,用于處理事件,是Consumer的真正實現(xiàn)EventProcessor
EventProcessor持有特定消費者(Consumer)的Sequence麻削,并提供用來調(diào)用事件處理實現(xiàn)事件循環(huán)(Event loop)Wait Strategy
定義Consumer如何等待下一個事件策略蒸痹。(Disruptor提供了多種策略)Producer
生產(chǎn)者,泛指Disruptor發(fā)布事件的代碼碟婆,Disruptor沒有定義特定的接口或類型电抚。
簡單Demo
1.定義事件
事件(Event)是Disruptor進行數(shù)據(jù)交換的數(shù)據(jù)類型
<pre>
public class PeopleEvent {
private String name;
private Integer age;
private Integer sex;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setAge(Integer age) {
this.age = age;
}
public void setSex(Integer sex) {
this.sex = sex;
}
public Integer getAge() {
return age;
}
public Integer getSex() {
return sex;
}
}
</pre>
2.定義事件工廠
事件工廠(Event Factory)用來實例化之前的事件(Event),需要實現(xiàn)接口com.lmax.disruptor.EventFactory<>竖共。Disruptor通過EventFactory在RingBuffer中創(chuàng)建Event的實例蝙叛。一個Event實例實際被用作一個"數(shù)據(jù)槽",發(fā)布者發(fā)布前公给,先從RingBuffer獲得一個Event的實例借帘,然后往Event中填充數(shù)據(jù),之后發(fā)布到RingBuffer中淌铐,之后由Consumer獲得該Event實例并從中取出數(shù)據(jù)肺然。
<pre>
public class PeopleEventFactory implements EventFactory<PeopleEvent> {
public PeopleEvent newInstance(){
return new PeopleEvent();
}
}
</pre>
3.定義事件處理的具體實現(xiàn)(業(yè)務(wù)邏輯核心)
需要實現(xiàn)com.lmax.disruptor.EventHandler<>接口,來定義事件處理的具體邏輯
<pre>
public class PeopleEventHandler implements EventHandler<PeopleEvent> {
public void onEvent(PeopleEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("name:" + event.getName()+",sex:" + event.getSex() + ",age:" + event.getAge());
}
}
</pre>
4.組合事件處理流程
<pre>
public class DisruptorDemo {
public static void main(String[] args){
//Executor將用來為消費者構(gòu)建線程
Executor executor = Executors.newCachedThreadPool();
//事件工廠用來創(chuàng)建事件
PeopleEventFactory peopleEventFactory = new PeopleEventFactory();
//指定Ring Buffer大小腿准,2的倍數(shù)
int buffSize = 1024;
/**
* 構(gòu)造Disruptor
* 并發(fā)系統(tǒng)提高性能之一就是單一寫者原則际起,如果代碼中僅有一個事件生產(chǎn)者,可以設(shè)置單一生產(chǎn)者模式來提高系統(tǒng)的性能吐葱。
* 通過ProduceType.SINGLE和ProduceType.MULTI進行控制街望。
*
* 等待策略
* Disruptor默認的等待策略是BlockingWaitStrategy,使用一個鎖和條件變量來控制執(zhí)行和等待弟跑,這是最慢的策略灾前,但也是CPU使用最低
* 和最穩(wěn)定的策略。
* SleepingWaitStrategy:也是CPU使用率低的策略孟辑,它使用循環(huán)等待并且循環(huán)間調(diào)用LockSupport.parkNanos(1)來睡眠哎甲。它的優(yōu)點在于
* 生產(chǎn)線程只需記數(shù),而不執(zhí)行任何命令饲嗽,并且沒有條件變量的消耗炭玫。但是對象從生產(chǎn)者到消費者傳遞延遲變大了,適用于不需要低延遲的場景貌虾,
* YieldingWaitStrategy:是可以被用作低延遲系統(tǒng)的兩個策略之一础嫡,這種策略在低延遲同時會增加CPU運算量。YieldingWaitStrategy
* 會循環(huán)等待sequence增加到合適值,循環(huán)調(diào)用Tread.yield()允許其它準備好的線程執(zhí)行榴鼎。如果高性能而且事件消費者線程比邏輯內(nèi)核少的
* 時候,推薦使用YieldingWaitStrategy策略晚唇。
* BusySpinWaitStrategy是性能最高的策略巫财,同時也是對部署環(huán)境要求最高的策略。這個策略最好用在時間處理線程比物理內(nèi)核數(shù)目還要少的時候哩陕。
*/
Disruptor<PeopleEvent> disruptor = new Disruptor<PeopleEvent>(peopleEventFactory,buffSize,executor,
ProducerType.SINGLE,new YieldingWaitStrategy());
//鏈接處理器
disruptor.handleEventsWith(new PeopleEventHandler());
//啟動Disruptor平项,啟動所有線程
disruptor.start();
//從Disruptor獲取RingBuffer,用來發(fā)布
RingBuffer<PeopleEvent> ringBuffer = disruptor.getRingBuffer();
PeopleEventProducer producer = new PeopleEventProducer(ringBuffer);
Map<String,Object> map = new HashMap<String, Object>();
map.put("name","yjz");
map.put("age",25);
map.put("sex",1);
producer.onData(map);
}
}
</pre>
5.事件發(fā)布
事件發(fā)布包括三個步驟:
- 從RingBuffer獲取一個可以寫入事件的序號
- 獲取對應(yīng)的事件對象悍及,將數(shù)據(jù)寫入事件對象
- 將事件提交到RingBuffer
事件在提交之后才會通知EventProcessor進行處理闽瓢。
<pre>
long sequence = ringBuffer.next();
try {
PeopleEvent event = ringBuffer.get(sequence);
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}finally {
ringBuffer.publish(sequence);
}
</code></pre>
RingBuffer.publish必須在finally來確保調(diào)用,如果某個sequence未被提交心赶,將會阻塞還需發(fā)布或其它的producer扣讼。
Disruptor提供了另一種方式簡化上述操作,來確保publish總被調(diào)用:
<pre>
private static final EventTranslatorOneArg<PeopleEvent,Map<String,Object>> tranlator = new EventTranslatorOneArg<PeopleEvent, Map<String, Object>>() {
public void translateTo(PeopleEvent event, long sequence, Map<String, Object> data) {
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}
};
/**
* onData用來發(fā)布事件缨叫,每調(diào)用一次就發(fā)布一次事件椭符,它的參數(shù)會通過事件傳遞給消費者
*/
public void onData(Map<String,Object> data){
ringBuffer.publishEvent(tranlator,data);
}
</code></pre>
事件發(fā)布完整代碼
<pre>
/**
Created by yangjianzhang on 17/2/4.
PeopleEventProducer是一個生成事件的源,在這里面通過讀取磁盤IO耻姥、數(shù)據(jù)庫销钝、network等。當事件源會在IO讀取一部分數(shù)據(jù)時候觸發(fā)事件(觸發(fā)事件
-
不是自動觸發(fā)的琐簇,需要在讀取到數(shù)據(jù)的時候自己觸發(fā)事件并發(fā)布)
*/
public class PeopleEventProducer {private final RingBuffer<PeopleEvent> ringBuffer;
public PeopleEventProducer(RingBuffer ringBuffer){
this.ringBuffer = ringBuffer;
}private static final EventTranslatorOneArg<PeopleEvent,Map<String,Object>> tranlator = new EventTranslatorOneArg<PeopleEvent, Map<String, Object>>() {
public void translateTo(PeopleEvent event, long sequence, Map<String, Object> data) {
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}
};/**
- onData用來發(fā)布事件蒸健,每調(diào)用一次就發(fā)布一次事件,它的參數(shù)會通過事件傳遞給消費者
*/
public void onData(Map<String,Object> data){
ringBuffer.publishEvent(tranlator,data);
}
}
</code></pre>
- onData用來發(fā)布事件蒸健,每調(diào)用一次就發(fā)布一次事件,它的參數(shù)會通過事件傳遞給消費者
6.關(guān)閉Disruptor
<pre>
disruptor.shutdown();
executor.shutdown();
</pre>
這里只是Disruptor的一個簡介婉商,深入了解后再繼續(xù)分享似忧。
關(guān)注我
歡迎關(guān)注我的公眾號,會定期推送優(yōu)質(zhì)技術(shù)文章据某,讓我們一起進步橡娄、一起成長!
公眾號搜索:data_tc
或直接掃碼:??