模式
1.發(fā)布訂閱模式返十,同一事件會(huì)被多個(gè)消費(fèi)者并行消費(fèi)
2.點(diǎn)對(duì)點(diǎn)模式彭谁,同一事件會(huì)被一組消費(fèi)者其中之一消費(fèi)
3.順序消費(fèi);
使用場(chǎng)景
低延遲共虑,高吞吐量愧怜,有界的緩存隊(duì)列
提高吞吐量,減少并發(fā)執(zhí)行上下文之間的延遲并確笨囱粒可預(yù)測(cè)延遲
為什么RingBuffer這么快叫搁?
1.首先是CPU false sharing的解決,Disruptor通過將基本對(duì)象填充冗余基本類型變量來(lái)填充滿整個(gè)緩存行供炎,減少false sharing的概率,這部分沒怎么看懂疾党,Disruptor通過填充失效這個(gè)效果音诫。
(就是一個(gè)緩存行8個(gè)變量,預(yù)設(shè)7個(gè)變量雪位,然后再保存一個(gè)唯一變量竭钝,這樣就不會(huì)出現(xiàn)相同的變量)
2.無(wú)鎖隊(duì)列的實(shí)現(xiàn),對(duì)于傳統(tǒng)并發(fā)隊(duì)列雹洗,至少要維護(hù)兩個(gè)指針香罐,一個(gè)頭指針和一個(gè)尾指針。在并發(fā)訪問修改時(shí)时肿,頭指針和尾指針的維護(hù)不可避免的應(yīng)用了鎖庇茫。Disruptor由于是環(huán)狀隊(duì)列,對(duì)于Producer而言只有頭指針而且鎖是樂觀鎖螃成,在標(biāo)準(zhǔn)Disruptor應(yīng)用中旦签,只有一個(gè)生產(chǎn)者,避免了頭指針鎖的爭(zhēng)用寸宏。所以我們可以理解Disruptor為無(wú)鎖隊(duì)列宁炫。
為什么要用Disruptor?
鎖的成本: 傳統(tǒng)阻塞隊(duì)列使用鎖保證線程安全氮凝。而鎖通過操作系統(tǒng)內(nèi)核的上下文切換實(shí)現(xiàn)羔巢,會(huì)暫停線程去等待鎖直到釋放。執(zhí)行這樣的上下文切換,會(huì)丟失之前保存的數(shù)據(jù)和指令竿秆。由于消費(fèi)者和生產(chǎn)者之間的速度差異炭臭,隊(duì)列總是接近滿或者空的狀態(tài)。這種狀態(tài)會(huì)導(dǎo)致高水平的寫入爭(zhēng)用袍辞。
偽共享問題導(dǎo)致的性能低下鞋仍。
隊(duì)列是垃圾的重要來(lái)源,隊(duì)列中的元素和用于存儲(chǔ)元素的節(jié)點(diǎn)對(duì)象需要進(jìn)行頻繁的重新分配搅吁。
代碼demo
public class MessageEvent<T> {
private T message;
public T getMessage() {
return message;
}
public void setMessage(T message) {
this.message = message;
}
}
public class MessageEventFactory implements EventFactory<MessageEvent> {
@Override
public MessageEvent newInstance() {
return new MessageEvent();
}
}
public class MessageEvenHandler3 implements EventHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
System.out.println("----------------"+messageEvent.getMessage());
}
}
public class MessageEventProducer {
private RingBuffer<MessageEvent> ringBuffer;
public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String message) {
EventTranslatorOneArg<MessageEvent, String> translator = new MessageEventTranslator();
ringBuffer.publishEvent(translator, message);
}
}
public class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
@Override
public void translateTo(MessageEvent messageEvent, long l, String o2) {
messageEvent.setMessage(o2);
}
}
public class MessageExceptionHandler implements ExceptionHandler {
@Override
public void handleEventException(Throwable throwable, long l, Object o) {
throwable.printStackTrace();
}
@Override
public void handleOnStartException(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void handleOnShutdownException(Throwable throwable) {
throwable.printStackTrace();
}
}
public class MessageThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Simple Disruptor Test Thread");
}
}
public class MessageConsumer {
public static void main(String[] args) {
String message = "Hello Disruptor!";
int ringBufferSize = 1024;//必須是2的N次方
Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
//這里用的是單一生成者威创,如果是多生成者的話是另一種模式,自己的類實(shí)現(xiàn)WorkHandler接口谎懦,
//然后這邊調(diào)用 disruptor.handleEventsWithWorkerPool(new MessageEventHandler());
disruptor.handleEventsWith(new MessageEvenHandler3());
disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
RingBuffer<MessageEvent> ringBuffer = disruptor.start();
MessageEventProducer producer = new MessageEventProducer(ringBuffer);
IntStream.range(0,20).forEach(x->{
producer.onData(x+message);
});
}
}
下面是實(shí)現(xiàn)WorkHandler接口的類
public class MessageEventHandler implements WorkHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent messageEvent) throws Exception {
System.out.println(System.currentTimeMillis()+"------我是1號(hào)消費(fèi)者----------"+messageEvent.getMessage());
}
}