Disruptor是什么
線程間通信的消息組件,類似java里的阻塞隊(duì)列(BlockingQueue),與BlockingQueue的異同:
- 同:目的相同,都是為了在同一進(jìn)程的線程間傳輸數(shù)據(jù)荷逞。
- 異:對消費(fèi)者多播事件坟冲;預(yù)分配事件內(nèi)存撤嫩;可選無鎖
使用場景
- 你是否有這樣的應(yīng)用場景,需要高性能的線程間通信的隊(duì)列?
- MPSC 多生產(chǎn)單消費(fèi)
- SPSC 單生產(chǎn)單消費(fèi)
- SPMC 單生產(chǎn) 多消費(fèi)
Disruptor的核心組成
- 消息
- 存放消息的容器
- 消息的生產(chǎn)者
- 消息的消費(fèi)者
Disruptor的核心流程
- 構(gòu)建Disruptor
1.1 指定ringBuffer的大小,隊(duì)列的容量是多大
1.2 指定EventFactory - Disruptor中添加消費(fèi)者
- 啟動Disruptor
- producer 往Disruptor中投遞消息.
- Disruptor中有消費(fèi)后,消費(fèi)者開始消費(fèi)消息
Disruptor的入門使用
事件類:
public class LongEvent {
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
事件工廠類:
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
事件消費(fèi)者
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println("----------------"+ longEvent.toString());
}
}
事件轉(zhuǎn)換器
public class LongEventTranslator implements EventTranslatorOneArg<LongEvent,Long> {
@Override
public void translateTo(LongEvent longEvent, long l, Long aLong) {
longEvent.set(aLong);
}
}
事件生產(chǎn)者
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publishData(Long aLong) {
EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
ringBuffer.publishEvent(translator, aLong);
}
}
主流程
public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// 事件工廠,用于創(chuàng)建event
LongEventFactory factory = new LongEventFactory();
// 指定ringbuf的大小,必須是2的整數(shù)倍
int bufferSize = 1024;
// 構(gòu)建一個 Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);
// 給disruptor中添加消費(fèi)者
disruptor.handleEventsWith(new LongEventHandler());
// 啟動disruptor
disruptor.start();
//-----------萬事俱備,只欠消息(消息的生產(chǎn)者投遞消息)
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 跟blockqueue 比對一下
for (long l = 0; l<100_0000; l++)
{
long startAt = System.currentTimeMillis();
producer.publishData(l);
long endAt = System.currentTimeMillis();
System.out.println(endAt-startAt);
//Thread.sleep(1000);
}
}