解讀Disruptor系列--解讀源碼(1)之初始化

解讀Disruptor源碼系列文章將從一個(gè)demo入手,逐步探究Disruptor中的源碼實(shí)現(xiàn)庶橱。
對原理不熟悉的同學(xué)建議先看我之前的兩個(gè)翻譯和導(dǎo)讀文章。
對Disruptor源碼感興趣的同學(xué)厘擂,可以下載我注釋的Disruptor代碼程腹。

完整版Demo

package com.coderjerry.disruptor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;

/**
* Disruptor例子
* jerry li
*/
public class DisruptorDSLExample {

  /**
  * 用戶自定義事件
  */
  class ExampleEvent{
    Object data ;
    Object ext;
    @Override
    public String toString() {
      return "DisruptorDSLExample[data:"+this.data+",ext:"+ext+"]";
    }
  }

  /**
  * 用戶事件工廠,實(shí)現(xiàn)EventFactory接口焕数,用于初始化事件對象
  */
  class ExampleEventFactory implements EventFactory<ExampleEvent>{

    @Override
    public ExampleEvent newInstance() {
      return new ExampleEvent();
    }
  }

  /**
  * 生產(chǎn)者在發(fā)布事件時(shí)纱昧,使用翻譯器將原始對象設(shè)置到RingBuffer的對象中
  */
  static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{

    static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();

    @Override
    public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
      event.data = arg0 ;
      System.err.println("put data "+sequence+", "+event+", "+arg0);
    }
  }

  // 用于事件處理(EventProcessor)的線程工廠
  ThreadFactory threadFactory =
      new ThreadFactoryBuilder()
          .setNameFormat("disruptor-executor-%d")
          .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
              System.out.println("Thread " + t + "throw " + e);
              e.printStackTrace();
            }
          })

          .build();
  Disruptor disruptor = null;

  // 初始化Disruptor
  public void createDisruptor(final CountDownLatch latch){

    disruptor = new Disruptor<ExampleEvent>(
        new ExampleEventFactory(),  // 用于創(chuàng)建環(huán)形緩沖中對象的工廠
        8,  // 環(huán)形緩沖的大小
        threadFactory,  // 用于事件處理的線程工廠
        ProducerType.MULTI, // 生產(chǎn)者類型,單vs多生產(chǎn)者
        new BlockingWaitStrategy()); // 等待環(huán)形緩沖游標(biāo)的等待策略堡赔,這里使用阻塞模式识脆,也是Disruptor中唯一有鎖的地方

    // 消費(fèi)者模擬-日志處理
    EventHandler journalHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(8);
        System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
      }
    };

    // 消費(fèi)者模擬-復(fù)制處理
    EventHandler replicateHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
      }
    };

    // 消費(fèi)者模擬-解碼處理
    EventHandler unmarshallHandler = new EventHandler() { // 最慢
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(1*1000);
        if(event instanceof ExampleEvent){
          ((ExampleEvent)event).ext = "unmarshalled ";
        }
        System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);

      }
    };

    // 消費(fèi)者處理-結(jié)果上報(bào)碳锈,只有執(zhí)行完以上三種后才能執(zhí)行此消費(fèi)者
    EventHandler resultHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
        latch.countDown();
      }
    };
    // 定義消費(fèi)鏈抖剿,先并行處理日志、解碼和復(fù)制咏雌,再處理結(jié)果上報(bào)
    disruptor
        .handleEventsWith(
          new EventHandler[]{
              journalHandler,
              unmarshallHandler,
              replicateHandler
          }
        )
        .then(resultHandler);
    // 啟動Disruptor
    disruptor.start();

  }

  public void shutdown(){
    disruptor.shutdown();
  }

  public Disruptor getDisruptor(){
    return disruptor;
  }

  public static void main(String[] args) {
    final int events = 20; // 必須為偶數(shù)
    DisruptorDSLExample disruptorDSLExample = new DisruptorDSLExample();
    final CountDownLatch latch = new CountDownLatch(events);

    disruptorDSLExample.createDisruptor(latch);

    final Disruptor disruptor = disruptorDSLExample.getDisruptor();
    // 生產(chǎn)線程0
    Thread produceThread0 = new Thread(new Runnable() {
      @Override
      public void run() {
        int x = 0;
        while(x++ < events / 2){
          disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
        }
      }
    });
    // 生產(chǎn)線程1
    Thread produceThread1 = new Thread(new Runnable() {
      @Override
      public void run() {
        int x = 0;
        while(x++ < events / 2){
          disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);

        }
      }
    });

    produceThread0.start();
    produceThread1.start();

    try {
      latch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    disruptorDSLExample.shutdown();
  }

}

構(gòu)建Disruptor類

可以發(fā)現(xiàn)整個(gè)例子都是圍繞Disruptor這個(gè)類實(shí)現(xiàn)的换团,相關(guān)內(nèi)容可參見官方文檔Disruptor Wizard悉稠。
其實(shí)不使用Disruptor類也是完全可以的,直接操作RingBuffer更加靈活也更麻煩艘包。Disruptor類提供了操作RingBuffer和設(shè)置消費(fèi)依賴的便捷API的猛,如構(gòu)建Ringbuffer耀盗、設(shè)置消費(fèi)鏈、啟動關(guān)閉Disruptor卦尊、暫停消費(fèi)者叛拷、發(fā)布事件等。
接下來岂却,我們把示例拆開看忿薇。

disruptor = new Disruptor<ExampleEvent>(
    new ExampleEventFactory(),  // 用于創(chuàng)建環(huán)形緩沖中對象的工廠
    8,  // 環(huán)形緩沖的大小
    threadFactory,  // 用于事件處理的線程工廠
    ProducerType.MULTI, // 生產(chǎn)者類型,單vs多生產(chǎn)者
    new BlockingWaitStrategy()); // 等待環(huán)形緩沖游標(biāo)的等待策略躏哩,這里使用阻塞模式署浩,也是Disruptor中唯一有鎖的地方

這里調(diào)用構(gòu)造方法創(chuàng)建了一個(gè)Disruptor對象,實(shí)際上創(chuàng)建了一個(gè)RingBuffer對象和一個(gè)Executor扫尺,并將引入傳入私有化的構(gòu)造方法創(chuàng)建了Disruptor對象筋栋。

// Disruptor.java
public Disruptor(
        final EventFactory<T> eventFactory, // 用于創(chuàng)建環(huán)形緩沖中對象的工廠
        final int ringBufferSize, // 環(huán)形緩沖的大小
        final ThreadFactory threadFactory, // 用于事件處理的線程工廠
        final ProducerType producerType, // 生產(chǎn)者類型,單vs多生產(chǎn)者
        final WaitStrategy waitStrategy) // 等待環(huán)形緩沖游標(biāo)的等待策略
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

// RingBuffer.java
public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType) // 構(gòu)建RingBuffer時(shí)通過producerType來區(qū)分單生產(chǎn)者或多生產(chǎn)者
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

// 單生產(chǎn)者模式創(chuàng)建RingBuffer
public static <E> RingBuffer<E> createSingleProducer(
    EventFactory<E> factory,
    int bufferSize,
    WaitStrategy waitStrategy)
{
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

// 多生產(chǎn)者模式創(chuàng)建RingBuffer
public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

// RingBuffer構(gòu)造器
RingBuffer(
    EventFactory<E> eventFactory,
    Sequencer sequencer)
{
    super(eventFactory, sequencer);
}

這里注意下器联,在構(gòu)造RingBuffer時(shí)二汛,需要傳入用于創(chuàng)建事件對象的工廠eventFactory和記錄生產(chǎn)者序號的sequencer。根據(jù)生產(chǎn)者是否是多線程生產(chǎn)拨拓,Sequencer又分為單肴颊、多生產(chǎn)者模式,后續(xù)還會講到渣磷。
構(gòu)建Disruptor實(shí)例后婿着,需要設(shè)置Disruptor的消費(fèi)者。

設(shè)置消費(fèi)者

// 消費(fèi)者模擬-日志處理
EventHandler journalHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(8);
    System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
  }
};

// 消費(fèi)者模擬-復(fù)制處理
EventHandler replicateHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(10);
    System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
  }
};

// 消費(fèi)者模擬-解碼處理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(1*1000);
    if(event instanceof ExampleEvent){
      ((ExampleEvent)event).ext = "unmarshalled ";
    }
    System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);

  }
};

// 消費(fèi)者處理-結(jié)果上報(bào)醋界,只有執(zhí)行完以上三種后才能執(zhí)行此消費(fèi)者
EventHandler resultHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
    latch.countDown();
  }
};

這里使用了兩組消費(fèi)者竟宋,第一組包含三個(gè)消費(fèi)者,第二組包含一個(gè)消費(fèi)者形纺。當(dāng)事件可消費(fèi)后丘侠,只有當(dāng)?shù)谝唤M全部消費(fèi)者都處理完畢后,事件才能被第二組消費(fèi)者處理逐样。

// 定義消費(fèi)鏈蜗字,先并行處理日志、解碼和復(fù)制脂新,再處理結(jié)果上報(bào)
disruptor
    .handleEventsWith(
      new EventHandler[]{
          journalHandler,
          unmarshallHandler,
          replicateHandler
      }
    )
    .then(resultHandler);

啟動Disruptor

消費(fèi)者設(shè)置成功后挪捕,即可啟動Disruptor。

// 啟動Disruptor
disruptor.start();
// Disruptor.java
public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

ConsumerRepository這個(gè)類實(shí)現(xiàn)了Iterable接口争便,iterator()方法返回ConsumerInfo集合的迭代器级零。ConsumerInfo是一個(gè)封裝類,對應(yīng)EventBatchProcessor和WorkProcessor有兩種實(shí)現(xiàn)滞乙。EventProcessorInfo對應(yīng)BatchEventProcessor奏纪,保存了與一個(gè)事件處理過程相關(guān)的EventProcessor鉴嗤、EventHandler、SequenceBarrier的引用序调。WorkerPoolInfo對應(yīng)WorkProcessor躬窜,保存了WorkerPool、SequenceBarrier的引用以及代表消費(fèi)者組是否為消費(fèi)者鏈尾的標(biāo)志endOfChain炕置。
如果看不懂,不要著急哈男韧,后續(xù)講到消費(fèi)者的時(shí)候就會明白了朴摊。

// ConsumerRepository.java
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>(); // hander引用為key
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>(); // 處理器的序列引用為key
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    // 省略代碼若干... 

    @Override
    public Iterator<ConsumerInfo> iterator()
    {
        return consumerInfos.iterator();
    }

}

調(diào)用ConsumerInfo.start()方法,其實(shí)就是啟動了消費(fèi)者線程:

// EventProcessorInfo.java
class EventProcessorInfo<T> implements ConsumerInfo
{

    // 省略代碼若干...
    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);

    }
}

// WorkerPoolInfo.java
class WorkerPoolInfo<T> implements ConsumerInfo
{
     // 省略代碼若干...
    @Override
    public void start(final Executor executor)

    {
        workerPool.start(executor);
    }
}

// WorkerPool.java
public final class WorkerPool<T>
{
     // 省略代碼若干...
     public RingBuffer<T> start(final Executor executor)
     {
    if (!started.compareAndSet(false, true))
    {
        throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
    }

    final long cursor = ringBuffer.getCursor();
    workSequence.set(cursor);

    for (WorkProcessor<?> processor : workProcessors)
    {
        processor.getSequence().set(cursor);
        executor.execute(processor);
    }

    return ringBuffer;    
}

至此此虑,Disruptor的初始化和啟動就完成了甚纲。主要是完成了RingBuffer數(shù)據(jù)結(jié)構(gòu)的初始化、設(shè)置消費(fèi)者以及啟動朦前。
后續(xù)將繼續(xù)分享消費(fèi)者代碼介杆。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市韭寸,隨后出現(xiàn)的幾起案子春哨,更是在濱河造成了極大的恐慌,老刑警劉巖恩伺,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赴背,死亡現(xiàn)場離奇詭異,居然都是意外死亡晶渠,警方通過查閱死者的電腦和手機(jī)凰荚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來褒脯,“玉大人便瑟,你說我怎么就攤上這事》ǎ” “怎么了到涂?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長爽彤。 經(jīng)常有香客問我养盗,道長,這世上最難降的妖魔是什么适篙? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任往核,我火速辦了婚禮,結(jié)果婚禮上嚷节,老公的妹妹穿的比我還像新娘聂儒。我一直安慰自己虎锚,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布衩婚。 她就那樣靜靜地躺著窜护,像睡著了一般。 火紅的嫁衣襯著肌膚如雪非春。 梳的紋絲不亂的頭發(fā)上柱徙,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天,我揣著相機(jī)與錄音奇昙,去河邊找鬼护侮。 笑死,一個(gè)胖子當(dāng)著我的面吹牛储耐,可吹牛的內(nèi)容都是我干的羊初。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼什湘,長吁一口氣:“原來是場噩夢啊……” “哼长赞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起闽撤,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤得哆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后腹尖,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柳恐,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年热幔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了乐设。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡绎巨,死狀恐怖近尚,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情场勤,我是刑警寧澤戈锻,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站和媳,受9級特大地震影響格遭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜留瞳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一拒迅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦璧微、人聲如沸作箍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽胞得。三九已至,卻和暖如春屹电,著一層夾襖步出監(jiān)牢的瞬間阶剑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工危号, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留个扰,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓葱色,卻偏偏與公主長得像,于是被迫代替她去往敵國和親娘香。 傳聞我的和親對象是個(gè)殘疾皇子苍狰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容