解讀Disruptor源碼系列文章將從一個(gè)demo入手,逐步探究Disruptor中的源碼實(shí)現(xiàn)庶橱。
對原理不熟悉的同學(xué)建議先看我之前的兩個(gè)翻譯和導(dǎo)讀文章。
對Disruptor源碼感興趣的同學(xué)厘擂,可以下載我注釋的Disruptor代碼程腹。
完整版Demo
package com.coderjerry.disruptor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
/**
* Disruptor例子
* jerry li
*/
public class DisruptorDSLExample {
/**
* 用戶自定義事件
*/
class ExampleEvent{
Object data ;
Object ext;
@Override
public String toString() {
return "DisruptorDSLExample[data:"+this.data+",ext:"+ext+"]";
}
}
/**
* 用戶事件工廠,實(shí)現(xiàn)EventFactory接口焕数,用于初始化事件對象
*/
class ExampleEventFactory implements EventFactory<ExampleEvent>{
@Override
public ExampleEvent newInstance() {
return new ExampleEvent();
}
}
/**
* 生產(chǎn)者在發(fā)布事件時(shí)纱昧,使用翻譯器將原始對象設(shè)置到RingBuffer的對象中
*/
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
@Override
public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
event.data = arg0 ;
System.err.println("put data "+sequence+", "+event+", "+arg0);
}
}
// 用于事件處理(EventProcessor)的線程工廠
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("disruptor-executor-%d")
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("Thread " + t + "throw " + e);
e.printStackTrace();
}
})
.build();
Disruptor disruptor = null;
// 初始化Disruptor
public void createDisruptor(final CountDownLatch latch){
disruptor = new Disruptor<ExampleEvent>(
new ExampleEventFactory(), // 用于創(chuàng)建環(huán)形緩沖中對象的工廠
8, // 環(huán)形緩沖的大小
threadFactory, // 用于事件處理的線程工廠
ProducerType.MULTI, // 生產(chǎn)者類型,單vs多生產(chǎn)者
new BlockingWaitStrategy()); // 等待環(huán)形緩沖游標(biāo)的等待策略堡赔,這里使用阻塞模式识脆,也是Disruptor中唯一有鎖的地方
// 消費(fèi)者模擬-日志處理
EventHandler journalHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(8);
System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
}
};
// 消費(fèi)者模擬-復(fù)制處理
EventHandler replicateHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(10);
System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
}
};
// 消費(fèi)者模擬-解碼處理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(1*1000);
if(event instanceof ExampleEvent){
((ExampleEvent)event).ext = "unmarshalled ";
}
System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);
}
};
// 消費(fèi)者處理-結(jié)果上報(bào)碳锈,只有執(zhí)行完以上三種后才能執(zhí)行此消費(fèi)者
EventHandler resultHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
latch.countDown();
}
};
// 定義消費(fèi)鏈抖剿,先并行處理日志、解碼和復(fù)制咏雌,再處理結(jié)果上報(bào)
disruptor
.handleEventsWith(
new EventHandler[]{
journalHandler,
unmarshallHandler,
replicateHandler
}
)
.then(resultHandler);
// 啟動Disruptor
disruptor.start();
}
public void shutdown(){
disruptor.shutdown();
}
public Disruptor getDisruptor(){
return disruptor;
}
public static void main(String[] args) {
final int events = 20; // 必須為偶數(shù)
DisruptorDSLExample disruptorDSLExample = new DisruptorDSLExample();
final CountDownLatch latch = new CountDownLatch(events);
disruptorDSLExample.createDisruptor(latch);
final Disruptor disruptor = disruptorDSLExample.getDisruptor();
// 生產(chǎn)線程0
Thread produceThread0 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
// 生產(chǎn)線程1
Thread produceThread1 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
produceThread0.start();
produceThread1.start();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
disruptorDSLExample.shutdown();
}
}
構(gòu)建Disruptor類
可以發(fā)現(xiàn)整個(gè)例子都是圍繞Disruptor這個(gè)類實(shí)現(xiàn)的换团,相關(guān)內(nèi)容可參見官方文檔Disruptor Wizard悉稠。
其實(shí)不使用Disruptor類也是完全可以的,直接操作RingBuffer更加靈活也更麻煩艘包。Disruptor類提供了操作RingBuffer和設(shè)置消費(fèi)依賴的便捷API的猛,如構(gòu)建Ringbuffer耀盗、設(shè)置消費(fèi)鏈、啟動關(guān)閉Disruptor卦尊、暫停消費(fèi)者叛拷、發(fā)布事件等。
接下來岂却,我們把示例拆開看忿薇。
disruptor = new Disruptor<ExampleEvent>(
new ExampleEventFactory(), // 用于創(chuàng)建環(huán)形緩沖中對象的工廠
8, // 環(huán)形緩沖的大小
threadFactory, // 用于事件處理的線程工廠
ProducerType.MULTI, // 生產(chǎn)者類型,單vs多生產(chǎn)者
new BlockingWaitStrategy()); // 等待環(huán)形緩沖游標(biāo)的等待策略躏哩,這里使用阻塞模式署浩,也是Disruptor中唯一有鎖的地方
這里調(diào)用構(gòu)造方法創(chuàng)建了一個(gè)Disruptor對象,實(shí)際上創(chuàng)建了一個(gè)RingBuffer對象和一個(gè)Executor扫尺,并將引入傳入私有化的構(gòu)造方法創(chuàng)建了Disruptor對象筋栋。
// Disruptor.java
public Disruptor(
final EventFactory<T> eventFactory, // 用于創(chuàng)建環(huán)形緩沖中對象的工廠
final int ringBufferSize, // 環(huán)形緩沖的大小
final ThreadFactory threadFactory, // 用于事件處理的線程工廠
final ProducerType producerType, // 生產(chǎn)者類型,單vs多生產(chǎn)者
final WaitStrategy waitStrategy) // 等待環(huán)形緩沖游標(biāo)的等待策略
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
// RingBuffer.java
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType) // 構(gòu)建RingBuffer時(shí)通過producerType來區(qū)分單生產(chǎn)者或多生產(chǎn)者
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
// 單生產(chǎn)者模式創(chuàng)建RingBuffer
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 多生產(chǎn)者模式創(chuàng)建RingBuffer
public static <E> RingBuffer<E> createMultiProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// RingBuffer構(gòu)造器
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
這里注意下器联,在構(gòu)造RingBuffer時(shí)二汛,需要傳入用于創(chuàng)建事件對象的工廠eventFactory和記錄生產(chǎn)者序號的sequencer。根據(jù)生產(chǎn)者是否是多線程生產(chǎn)拨拓,Sequencer又分為單肴颊、多生產(chǎn)者模式,后續(xù)還會講到渣磷。
構(gòu)建Disruptor實(shí)例后婿着,需要設(shè)置Disruptor的消費(fèi)者。
設(shè)置消費(fèi)者
// 消費(fèi)者模擬-日志處理
EventHandler journalHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(8);
System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
}
};
// 消費(fèi)者模擬-復(fù)制處理
EventHandler replicateHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(10);
System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
}
};
// 消費(fèi)者模擬-解碼處理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(1*1000);
if(event instanceof ExampleEvent){
((ExampleEvent)event).ext = "unmarshalled ";
}
System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);
}
};
// 消費(fèi)者處理-結(jié)果上報(bào)醋界,只有執(zhí)行完以上三種后才能執(zhí)行此消費(fèi)者
EventHandler resultHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
latch.countDown();
}
};
這里使用了兩組消費(fèi)者竟宋,第一組包含三個(gè)消費(fèi)者,第二組包含一個(gè)消費(fèi)者形纺。當(dāng)事件可消費(fèi)后丘侠,只有當(dāng)?shù)谝唤M全部消費(fèi)者都處理完畢后,事件才能被第二組消費(fèi)者處理逐样。
// 定義消費(fèi)鏈蜗字,先并行處理日志、解碼和復(fù)制脂新,再處理結(jié)果上報(bào)
disruptor
.handleEventsWith(
new EventHandler[]{
journalHandler,
unmarshallHandler,
replicateHandler
}
)
.then(resultHandler);
啟動Disruptor
消費(fèi)者設(shè)置成功后挪捕,即可啟動Disruptor。
// 啟動Disruptor
disruptor.start();
// Disruptor.java
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
ConsumerRepository這個(gè)類實(shí)現(xiàn)了Iterable接口争便,iterator()方法返回ConsumerInfo集合的迭代器级零。ConsumerInfo是一個(gè)封裝類,對應(yīng)EventBatchProcessor和WorkProcessor有兩種實(shí)現(xiàn)滞乙。EventProcessorInfo對應(yīng)BatchEventProcessor奏纪,保存了與一個(gè)事件處理過程相關(guān)的EventProcessor鉴嗤、EventHandler、SequenceBarrier的引用序调。WorkerPoolInfo對應(yīng)WorkProcessor躬窜,保存了WorkerPool、SequenceBarrier的引用以及代表消費(fèi)者組是否為消費(fèi)者鏈尾的標(biāo)志endOfChain炕置。
如果看不懂,不要著急哈男韧,后續(xù)講到消費(fèi)者的時(shí)候就會明白了朴摊。
// ConsumerRepository.java
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>(); // hander引用為key
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<Sequence, ConsumerInfo>(); // 處理器的序列引用為key
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
// 省略代碼若干...
@Override
public Iterator<ConsumerInfo> iterator()
{
return consumerInfos.iterator();
}
}
調(diào)用ConsumerInfo.start()方法,其實(shí)就是啟動了消費(fèi)者線程:
// EventProcessorInfo.java
class EventProcessorInfo<T> implements ConsumerInfo
{
// 省略代碼若干...
@Override
public void start(final Executor executor)
{
executor.execute(eventprocessor);
}
}
// WorkerPoolInfo.java
class WorkerPoolInfo<T> implements ConsumerInfo
{
// 省略代碼若干...
@Override
public void start(final Executor executor)
{
workerPool.start(executor);
}
}
// WorkerPool.java
public final class WorkerPool<T>
{
// 省略代碼若干...
public RingBuffer<T> start(final Executor executor)
{
if (!started.compareAndSet(false, true))
{
throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
}
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors)
{
processor.getSequence().set(cursor);
executor.execute(processor);
}
return ringBuffer;
}
至此此虑,Disruptor的初始化和啟動就完成了甚纲。主要是完成了RingBuffer數(shù)據(jù)結(jié)構(gòu)的初始化、設(shè)置消費(fèi)者以及啟動朦前。
后續(xù)將繼續(xù)分享消費(fèi)者代碼介杆。