Java集合之Disruptor 介紹

1 Disruptor

1.1 簡(jiǎn)介

1.1.1 定義

Disruptor 是一個(gè)開(kāi)源的高性能內(nèi)存隊(duì)列檐涝,由英國(guó)外匯交易公司 LMAX 開(kāi)發(fā)的虽界,獲得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 選擇大獎(jiǎng))挽唉。

Disruptor 提供的功能類似于 Kafka晕拆、RocketMQ 這類分布式隊(duì)列淤堵,不過(guò)捶枢,其作為范圍是 JVM(內(nèi)存),Disruptor 解決了 JDK 內(nèi)置線程安全隊(duì)列的性能和內(nèi)存安全問(wèn)題火邓,Disruptor 有個(gè)最大的優(yōu)點(diǎn)就是快

Disruptor被設(shè)計(jì)用于在生產(chǎn)者消費(fèi)者producer-consumer problem丹弱,簡(jiǎn)稱PCP)問(wèn)題上獲得盡量高的吞吐量(TPS)和盡量低的延遲
Disruptor是LMAX在線交易平臺(tái)的關(guān)鍵組成部分,LMAX平臺(tái)使用該框架對(duì)訂單處理速度能達(dá)到600萬(wàn)TPS铲咨,除金融領(lǐng)域之外躲胳,其他一般的應(yīng)用中都可以用到 Disruptor,它可以帶來(lái)顯著的性能提升纤勒。其實(shí) Disruptor 與其說(shuō)是一個(gè)框架坯苹,不如說(shuō)是一種設(shè)計(jì)思路,這個(gè)設(shè)計(jì)思路對(duì)于存在并發(fā)摇天、緩沖區(qū)粹湃、生產(chǎn)者—消費(fèi)者模型、事務(wù)處理這些元素的程序來(lái)說(shuō)泉坐,Disruptor 提出了一種大幅提升性能(TPS)的方案为鳄。

github 地址

Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

1.1.2 Java中線程安全隊(duì)列

JDK 中常見(jiàn)的線程安全的隊(duì)列如下:

隊(duì)列名字 是否有界
ArrayBlockingQueue 加鎖(ReentrantLock) 有界
LinkedBlockingQueue 加鎖(ReentrantLock) 有界
LinkedTransferQueue 無(wú)鎖(CAS) 無(wú)界
ConcurrentLinkedQueue 無(wú)鎖(CAS) 無(wú)界

從上表中可以看出:這些隊(duì)列要不就是加鎖有界,要不就是無(wú)鎖無(wú)界腕让。而加鎖的的隊(duì)列勢(shì)必會(huì)影響性能孤钦,無(wú)界的隊(duì)列又存在內(nèi)存溢出的風(fēng)險(xiǎn)。
因此纯丸,一般情況下偏形,我們都是不建議使用 JDK 內(nèi)置線程安全隊(duì)列。
Disruptor 就不一樣了觉鼻!它在無(wú)鎖的情況下還能保證隊(duì)列有界俊扭,并且還是線程安全的。

1.1.3 Disruptor 核心概念

Disruptor 核心概念:

  • Event:可以把 Event 理解為存放在隊(duì)列中等待消費(fèi)的消息對(duì)象坠陈。
    Disruptor 的語(yǔ)義中统扳,生產(chǎn)者和消費(fèi)者之間進(jìn)行交換的數(shù)據(jù)被稱為事件(Event)喘帚。它不是一個(gè)被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定咒钟。
  • EventFactory:事件工廠用于生產(chǎn)事件吹由,我們?cè)诔跏蓟?Disruptor 類的時(shí)候需要用到。
  • EventHandlerEvent 在對(duì)應(yīng)的 Handler 中被處理朱嘴,你可以將其理解為生產(chǎn)消費(fèi)者模型中的消費(fèi)者倾鲫。
    Disruptor 定義的事件處理接口,由用戶實(shí)現(xiàn)萍嬉,用于處理事件乌昔,是 Consumer 的真正實(shí)現(xiàn)
  • EventProcessorEventProcessor 持有特定消費(fèi)者(Consumer)的 Sequence,并提供用于調(diào)用事件處理實(shí)現(xiàn)的事件循環(huán)(Event Loop)
  • Disruptor:事件的生產(chǎn)和消費(fèi)需要用到 Disruptor 對(duì)象壤追。
  • RingBufferRingBuffer(環(huán)形數(shù)組)用于保存事件磕道。
    如其名,環(huán)形的緩沖區(qū)行冰。曾經(jīng) RingBufferDisruptor 中的最主要的對(duì)象溺蕉,但從3.0版本開(kāi)始,其職責(zé)被簡(jiǎn)化為僅僅負(fù)責(zé)對(duì)通過(guò) Disruptor 進(jìn)行交換的數(shù)據(jù)(事件)進(jìn)行存儲(chǔ)和更新悼做。在一些更高級(jí)的應(yīng)用場(chǎng)景中疯特,Ring Buffer 可以由用戶的自定義實(shí)現(xiàn)來(lái)完全替代。
  • WaitStrategy:等待策略肛走。決定了沒(méi)有事件可以消費(fèi)的時(shí)候漓雅,事件消費(fèi)者如何等待新事件的到來(lái)。定義 Consumer 如何進(jìn)行等待下一個(gè)事件的策略朽色。(注:Disruptor 定義了多種不同的策略邻吞,針對(duì)不同的場(chǎng)景,提供了不一樣的性能表現(xiàn))
  • Producer:生產(chǎn)者葫男,只是泛指調(diào)用 Disruptor 發(fā)布事件的用戶代碼吃衅,Disruptor 沒(méi)有定義特定接口或類型
  • ProducerType:指定是單個(gè)事件發(fā)布者模式還是多個(gè)事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似)。
  • SequencerSequencerDisruptor 的真正核心腾誉。此接口有兩個(gè)實(shí)現(xiàn)類 - SingleProducerSequencerMultiProducerSequencer 峻呕,它們定義在生產(chǎn)者和消費(fèi)者之間快速利职、正確地傳遞數(shù)據(jù)的并發(fā)算法。
  • Sequence Disruptor:通過(guò)順序遞增的序號(hào)來(lái)編號(hào)管理通過(guò)其進(jìn)行交換的數(shù)據(jù)(事件)瘦癌,對(duì)數(shù)據(jù)(事件)的處理過(guò)程總是沿著序號(hào)逐個(gè)遞增處理猪贪。一個(gè) Sequence 用于跟蹤標(biāo)識(shí)某個(gè)特定的事件處理者( RingBuffer/Consumer )的處理進(jìn)度。
    雖然一個(gè) AtomicLong 也可以用于標(biāo)識(shí)進(jìn)度讯私,但定義 Sequence 來(lái)負(fù)責(zé)該問(wèn)題還有另一個(gè)目的热押,那就是防止不同的 Sequence 之間的 CPU 緩存?zhèn)喂蚕?Flase Sharing)問(wèn)題西傀。(注:這是 Disruptor 實(shí)現(xiàn)高性能的關(guān)鍵點(diǎn)之一)
  • Sequence Barrier:用于保持對(duì) RingBuffermain published SequenceConsumer 依賴的其它 ConsumerSequence 的引用。Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯桶癣。
image.png

1.2 操作

1.2.1 坐標(biāo)依賴

pom.xml

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Gradle:

implementation 'com.lmax:disruptor:3.4.4'

1.2.2 創(chuàng)建事件

我們先來(lái)定義一個(gè)代表日志事件的類:LogEvent 拥褂。

事件中包含了一些和事件相關(guān)的屬性,比如我們這里定義的 LogEvent 對(duì)象中就有一個(gè)用來(lái)表示日志消息內(nèi)容的屬性:message牙寞。

@Data
public class LogEvent {
    private String message;
}

我們這里只是為了演示饺鹃,實(shí)際項(xiàng)目中,一個(gè)標(biāo)準(zhǔn)日志事件對(duì)象所包含的屬性肯定不是只有一個(gè) message

1.2.3 創(chuàng)建事件工廠

創(chuàng)建一個(gè)工廠類 LogEventFactory 用來(lái)創(chuàng)建 LogEvent 對(duì)象间雀。
LogEventFactory 繼承 EventFactory 接口并實(shí)現(xiàn)了 newInstance() 方法 悔详。

public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

1.2.4 創(chuàng)建處理事件Handler--消費(fèi)者

創(chuàng)建一個(gè)用于處理后續(xù)發(fā)布的事件的類:LogEventHandler 。
LogEventHandler 繼承 EventHandler 接口并實(shí)現(xiàn)了 onEvent() 方法 惹挟。

public class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(logEvent.getMessage());
    }
}

EventHandler 接口的 onEvent() 方法共有 3 個(gè)參數(shù):

  • event:待消費(fèi)/處理的事件
  • sequence:正在處理的事件在環(huán)形數(shù)組(RingBuffer)中的位置
  • endOfBatch:表示這是否是來(lái)自環(huán)形數(shù)組(RingBuffer)中一個(gè)批次的最后一個(gè)事件(批量處理事件)

1.2.5 初始化 Disruptor

1.2.5.1 靜態(tài)類

我們這里定義一個(gè)方法用于獲取 Disruptor 對(duì)象

private static Disruptor<LogEvent> getLogEventDisruptor() {
    // 創(chuàng)建 LogEvent 的工廠
    LogEventFactory logEventFactory = new LogEventFactory();
    // Disruptor 的 RingBuffer 緩存大小
    int bufferSize = 1024 * 1024;
    // 生產(chǎn)者的線程工廠
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
    //實(shí)例化 Disruptor
    return new Disruptor<>(
            logEventFactory,
            bufferSize,
            threadFactory,
            // 單生產(chǎn)者
            ProducerType.SINGLE,
            // 阻塞等待策略
            new BlockingWaitStrategy());
}

1.2.5.2 配置類

使用配置類的方式

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<LogEvent> messageModelRingBuffer() {
        //定義用于事件處理的線程池茄螃, Disruptor通過(guò)java.util.concurrent.ExecutorSerivce提供的線程來(lái)觸發(fā)consumer的事件處理
        // 生產(chǎn)者的線程工廠
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };

        //指定事件工廠
        LogEventFactory factory = new LogEventFactory();

        //指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運(yùn)算轉(zhuǎn)為位運(yùn)算提高效率)连锯,否則將影響效率
        int bufferSize = 1024 * 256;

        //單線程模式归苍,獲取額外的性能
        Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
             bufferSize, 
             threadFactory,
             ProducerType.SINGLE, 
             new BlockingWaitStrategy());

        //設(shè)置事件業(yè)務(wù)處理器---消費(fèi)者
        //Disruptor 的 handleEventsWith 方法來(lái)綁定處理事件的 Handler 對(duì)象。
       
        disruptor.handleEventsWith(new LogEventHandler ());
      // Disruptor 可以設(shè)置多個(gè)處理事件的 Handler萎庭,并且可以靈活的設(shè)置消費(fèi)者的處理順序霜医,串行,并行都是可以的驳规。
       //就比如下面的代碼表示 Handler1 和 Handler2 是并行執(zhí)行肴敛,最后再執(zhí)行 Handler3 。
       //disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
       
        // 啟動(dòng)disruptor線程
        disruptor.start();

        //獲取ringbuffer環(huán)吗购,用于接取生產(chǎn)者生產(chǎn)的事件
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

1.2.5.3 Disruptor 構(gòu)造函數(shù)講解

Disruptor 的推薦使用的構(gòu)造函數(shù)如下:

public class Disruptor<T> {
  public Disruptor(
          final EventFactory<T> eventFactory,
          final int ringBufferSize,
          final ThreadFactory threadFactory,
          final ProducerType producerType,
          final WaitStrategy waitStrategy)
  {
      this(
          RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
          new BasicExecutor(threadFactory));
  }

......
}

我們需要傳遞 5 個(gè)參數(shù):

  • eventFactory:我們自定義的事件工廠医男。
  • ringBufferSize:指定 RingBuffer 的容量大小。
  • threadFactory:自定義的線程工廠捻勉。Disruptor 的默認(rèn)線程池是自定義的镀梭,我們只需要傳入線程工廠即可。
  • producerType:指定是單個(gè)事件發(fā)布者模式還是多個(gè)事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似踱启,我個(gè)人比較喜歡用發(fā)布者)报账。
  • waitStrategy:等待策略,決定了沒(méi)有事件可以消費(fèi)的時(shí)候埠偿,事件消費(fèi)者如何等待新事件的到來(lái)透罢。

ProducerType 的源碼如下,它是一個(gè)包含兩個(gè)變量的枚舉類型

  • SINGLE:?jiǎn)蝹€(gè)事件發(fā)布者模式冠蒋,不需要保證線程安全羽圃。
  • MULTI:多個(gè)事件發(fā)布者模式,基于 CAS 來(lái)保證線程安全抖剿。

WaitStrategy (等待策略)接口的實(shí)現(xiàn)類中只有兩個(gè)方法:

  • waitFor():等待新事件的到來(lái)朽寞。
  • signalAllWhenBlocking():?jiǎn)拘阉械却南M(fèi)者识窿。
public interface WaitStrategy
{
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    void signalAllWhenBlocking();
}

WaitStrategy 的實(shí)現(xiàn)類共有 8 個(gè),也就是說(shuō)共有 8 種等待策略可供選擇脑融。


image.png

除了上面介紹的這個(gè)構(gòu)造函數(shù)之外喻频,Disruptor 還有一個(gè)只有 3 個(gè)參數(shù)構(gòu)造函數(shù)。

使用這個(gè)構(gòu)造函數(shù)創(chuàng)建的 Disruptor 對(duì)象會(huì)默認(rèn)使用 ProducerType.MULTI(多個(gè)事件發(fā)布者模式)和 BlockingWaitStrategy(阻塞等待策略) 吨掌。

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

1.2.6 發(fā)布事件

1.2.6.1 main方法測(cè)試

//獲取 Disruptor 對(duì)象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//綁定處理事件的Handler對(duì)象
disruptor.handleEventsWith(new LogEventHandler());
//啟動(dòng) Disruptor
disruptor.start();
//獲取保存事件的環(huán)形數(shù)組(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//發(fā)布 10w 個(gè)事件
for (int i = 1; i <= 100000; i++) {
    // 通過(guò)調(diào)用 RingBuffer 的 next() 方法獲取下一個(gè)空閑事件槽的序號(hào)
    long sequence = ringBuffer.next();
    try {
        LogEvent logEvent = ringBuffer.get(sequence);
        // 初始化 Event半抱,對(duì)其賦值
        logEvent.setMessage("這是第%d條日志消息".formatted(i));
    } finally {
        // 發(fā)布事件
        ringBuffer.publish(sequence);
    }
}
// 關(guān)閉 Disruptor
disruptor.shutdown();

1.2.6.2 使用配置方式

public interface DisruptorMqService {

    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<LogEvent> messageModelRingBuffer;

    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //獲取下一個(gè)Event槽的下標(biāo)
        long sequence = messageModelRingBuffer.next();
        try {
            //給Event填充數(shù)據(jù)
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息隊(duì)列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //發(fā)布Event,激活觀察者去消費(fèi)膜宋,將sequence傳遞給改消費(fèi)者
            //注意最后的publish方法必須放在finally中以確保必須得到調(diào)用窿侈;如果某個(gè)請(qǐng)求的sequence未被提交將會(huì)堵塞后續(xù)的發(fā)布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市秋茫,隨后出現(xiàn)的幾起案子史简,更是在濱河造成了極大的恐慌,老刑警劉巖肛著,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件圆兵,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡枢贿,警方通過(guò)查閱死者的電腦和手機(jī)殉农,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)局荚,“玉大人超凳,你說(shuō)我怎么就攤上這事∫” “怎么了轮傍?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)首装。 經(jīng)常有香客問(wèn)我创夜,道長(zhǎng),這世上最難降的妖魔是什么仙逻? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任驰吓,我火速辦了婚禮,結(jié)果婚禮上系奉,老公的妹妹穿的比我還像新娘檬贰。我一直安慰自己,他們只是感情好喜最,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著庄蹋,像睡著了一般瞬内。 火紅的嫁衣襯著肌膚如雪迷雪。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,441評(píng)論 1 310
  • 那天虫蝶,我揣著相機(jī)與錄音章咧,去河邊找鬼。 笑死能真,一個(gè)胖子當(dāng)著我的面吹牛赁严,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播粉铐,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼疼约,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蝙泼?” 一聲冷哼從身側(cè)響起程剥,我...
    開(kāi)封第一講書(shū)人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎汤踏,沒(méi)想到半個(gè)月后织鲸,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡溪胶,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年搂擦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哗脖。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡瀑踢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出懒熙,到底是詐尸還是另有隱情丘损,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布工扎,位于F島的核電站徘钥,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏肢娘。R本人自食惡果不足惜呈础,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望橱健。 院中可真熱鬧而钞,春花似錦、人聲如沸拘荡。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至网缝,卻和暖如春巨税,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背粉臊。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工草添, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扼仲。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓远寸,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親屠凶。 傳聞我的和親對(duì)象是個(gè)殘疾皇子驰后,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容