Java并發(fā)系列7-Disruptor無鎖緩存框架

聲明:原創(chuàng)文章蜒程,轉載請注明出處绅你。http://www.reibang.com/u/e02df63eaa87

1、從生產(chǎn)者消費者說起

在傳統(tǒng)的生產(chǎn)者消費者模型中昭躺,通常是采用BlockingQueue實現(xiàn)忌锯。其中生產(chǎn)者線程負責提交需求,消費者線程負責處理任務领炫,二者之間通過共享內(nèi)存緩沖區(qū)進行通信偶垮。由于內(nèi)存緩沖區(qū)的存在,允許生產(chǎn)者和消費者之間速度的差異,確保系統(tǒng)正常運行似舵。

下圖展示一個簡單的生產(chǎn)者消費者模型脚猾,生產(chǎn)者從文件中讀取數(shù)據(jù),將數(shù)據(jù)內(nèi)容寫入到阻塞隊列中砚哗,消費者從隊列的另一邊獲取數(shù)據(jù)龙助,進行計算并將結果輸出。其中Main負責創(chuàng)建兩類線程并初始化隊列蛛芥。

生產(chǎn)者-消費者

Main:

public class Main {
    public static void main(String[] args) {
        // 初始化阻塞隊列
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000);
        // 創(chuàng)建生產(chǎn)者線程
        Thread producer = new Thread(new Producer(blockingQueue, "temp.dat"));
        producer.start();
        // 創(chuàng)建消費者線程
        Thread consumer = new Thread(new Consumer(blockingQueue));
        consumer.start();
    }
}

生產(chǎn)者:

public class Producer implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private String fileName;
    private static final String FINIDHED = "EOF";

    public Producer(BlockingQueue<String> blockingQueue, String fileName)  {
        this.blockingQueue = blockingQueue;
        this.fileName = fileName;
    }

    @Override
    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
            String line;
            while ((line = reader.readLine()) != null) {
                blockingQueue.put(line);
            }
            // 結束標志
            blockingQueue.put(FINIDHED);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消費者:

public class Consumer implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private static final String FINIDHED = "EOF";

    public Consumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        String line;
        String[] arrStr;
        int ret;
        try {
            while (!(line = blockingQueue.take()).equals(FINIDHED)) {
                // 消費
                arrStr = line.split("\t");
                if (arrStr.length != 2) {
                    continue;
                }
                ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
                System.out.println(ret);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

生產(chǎn)者-消費者模型可以很容易地將生產(chǎn)和消費進行解耦提鸟,優(yōu)化系統(tǒng)整體結構,并且由于存在緩沖區(qū)仅淑,可以緩解兩端性能不匹配的問題称勋。

2、BlockingQueue的不足

上述使用了ArrayBlockingQueue漓糙,通過查看其實現(xiàn)铣缠,完全是使用鎖和阻塞等待實現(xiàn)線程同步。在高并發(fā)場景下昆禽,性能不是很優(yōu)越。

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

但是蝇庭,ConcurrentLinkedQueue卻是一個高性能隊列醉鳖,這是因為其實現(xiàn)使用了無鎖的CAS操作。

3哮内、Disruptor初體驗

Disruptor是由LMAX公司開發(fā)的一款高效無鎖內(nèi)存隊列盗棵。使用無鎖方式實現(xiàn)了一個環(huán)形隊列代替線性隊列。相對于普通的線性隊列酝锅,環(huán)形隊列不需要維護頭尾兩個指針刚操,只需維護一個當前位置就可以完成出入隊操作壕曼。受限于環(huán)形結構,隊列的大小只能初始化時指定瞭恰,不能動態(tài)擴展。

如下圖所示狱庇,Disruptor的實現(xiàn)為一個循環(huán)隊列惊畏,ringbuffer擁有一個序號(Seq),這個序號指向數(shù)組中下一個可用的元素密任。

Disruptor循環(huán)隊列

隨著不停地填充這個buffer(可能也會有相應的讀妊掌簟),這個序號會一直增長浪讳,直到超過這個環(huán)缰盏。


Disruptor循環(huán)隊列

Disruptor要求數(shù)組大小設置為2的N次方。這樣可以通過Seq & (QueueSize - 1) 直接獲取,其效率要比取目诓拢快得多形葬。這是因為(Queue - 1)的二進制為全1等形式。例如暮的,上圖中QueueSize大小為8笙以,Seq為10,則只需要計算二進制1010 & 0111 = 2冻辩,可直接得到index=2位置的元素猖腕。

在RingBuffer中,生產(chǎn)者向數(shù)組中寫入數(shù)據(jù)恨闪,生產(chǎn)者寫入數(shù)據(jù)時倘感,使用CAS操作。消費者從中讀取數(shù)據(jù)時咙咽,為防止多個消費者同時處理一個數(shù)據(jù)老玛,也使用CAS操作進行數(shù)據(jù)保護。
這種固定大小的RingBuffer還有一個好處是钧敞,可以內(nèi)存復用蜡豹。不會有新空間需要分配或者舊的空間回收,當數(shù)組填充滿后溉苛,再寫入數(shù)據(jù)會將數(shù)據(jù)覆蓋镜廉。

4、Disruptor小試牛刀

同樣地愚战,使用Disruptor處理第一節(jié)中的生產(chǎn)者消費者的案例娇唯。

4.1 添加Maven依賴
<dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.2</version>
</dependency>
4.2 定義事件對象

由于我們只需要將文件中的數(shù)據(jù)行讀出,然后進行計算寂玲。因此塔插,定義FileData.class來保存文件行。

public class FileData {
    private String line;

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}
4.3 定義工廠類

用于產(chǎn)生FileData的工廠類拓哟,會在Disruptor系統(tǒng)初始化時想许,構造所有的緩沖區(qū)中的對象實例。

public class DisruptorFactory implements EventFactory<FileData> {

    public FileData newInstance() {
        return new FileData();
    }
}
4.4 定義消費者

消費者的作用是讀取數(shù)據(jù)并進行處理彰檬。數(shù)據(jù)的讀取已經(jīng)由Disruptor封裝伸刃,onEvent()方法為Disruptor框架的回調(diào)方法。只需要進行簡單的數(shù)據(jù)處理即可逢倍。

public class DisruptorConsumer implements WorkHandler<FileData> {
    private static final String FINIDHED = "EOF";

    @Override
    public void onEvent(FileData event) throws Exception {
       String line = event.getLine();
        if (line.equals(FINIDHED)) {
            return;
        }
        // 消費
        String[] arrStr = line.split("\t");
        if (arrStr.length != 2) {
            return;
        }
        int ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
        System.out.println(ret);
    }
}
4.5 定義生產(chǎn)者

生產(chǎn)者需要一個Ringbuffer的引用捧颅。其中pushData()方法是將生產(chǎn)的數(shù)據(jù)寫入到RingBuffer中。具體的過程是较雕,首先通過next()方法得到下一個可用的序列號碉哑;取得下一個可用的FileData挚币,并設置該對象的值;最后扣典,進行數(shù)據(jù)發(fā)布妆毕,這個FileData對象會傳遞給消費者。

public class DisruptorProducer {
    private static final String FINIDHED = "EOF";
    private final RingBuffer<FileData> ringBuffer;

    public DisruptorProducer(RingBuffer<FileData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(String line) {
        long seq = ringBuffer.next();
        try {
            FileData event = ringBuffer.get(seq);   // 獲取可用位置
            event.setLine(line);                    // 填充可用位置
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ringBuffer.publish(seq);        // 通知消費者
        }
    }

    public void read(String fileName) {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
            String line;
            while ((line = reader.readLine()) != null) {
                // 生產(chǎn)數(shù)據(jù)
                pushData(line);
            }
            // 結束標志
            pushData(FINIDHED);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.6 定義Main函數(shù)

最后需要一個DisruptorMain()將上述的數(shù)據(jù)贮尖、生產(chǎn)者和消費者進行整合笛粘。

public class DisruptorMain {
    public static void main(String[] args) {
        DisruptorFactory factory = new DisruptorFactory();          // 工廠
        ExecutorService executor = Executors.newCachedThreadPool(); // 線程池
        int BUFFER_SIZE = 16;   // 必須為2的冪指數(shù)

        // 初始化Disruptor
        Disruptor<FileData> disruptor = new Disruptor<>(factory,
                BUFFER_SIZE,
                executor,
                ProducerType.MULTI,         // Create a RingBuffer supporting multiple event publishers to the one RingBuffer
                new BlockingWaitStrategy()  // 默認阻塞策略
                );
        // 啟動消費者
        disruptor.handleEventsWithWorkerPool(new DisruptorConsumer(),
                new DisruptorConsumer()
        );
        disruptor.start();
        // 啟動生產(chǎn)者
        RingBuffer<FileData> ringBuffer = disruptor.getRingBuffer();
        DisruptorProducer producer = new DisruptorProducer(ringBuffer);
        producer.read("temp.dat");

        // 關閉
        disruptor.shutdown();
        executor.shutdown();
    }
}

5、Disruptor策略

Disruptor生產(chǎn)者和消費者之間是通過什么策略進行同步呢湿硝?Disruptor提供了如下幾種策略:

  • BlockingWaitStrategy:默認等待策略薪前。和BlockingQueue的實現(xiàn)很類似,通過使用鎖和條件(Condition)進行線程同步和喚醒关斜。此策略對于線程切換來說示括,最節(jié)約CPU資源,但在高并發(fā)場景下性能有限痢畜。
  • SleepingWaitStrategy:CPU友好型策略垛膝。會在循環(huán)中不斷等待數(shù)據(jù)。首先進行自旋等待丁稀,若不成功吼拥,則使用Thread.yield()讓出CPU,并使用LockSupport.parkNanos(1)進行線程睡眠二驰。所以扔罪,此策略數(shù)據(jù)處理數(shù)據(jù)可能會有較高的延遲,適合用于對延遲不敏感的場景桶雀。優(yōu)點是對生產(chǎn)者線程影響小,典型應用場景是異步日志唬复。
  • YieldingWaitStrategy:低延時策略矗积。消費者線程會不斷循環(huán)監(jiān)控RingBuffer的變化,在循環(huán)內(nèi)部使用Thread.yield()讓出CPU給其他線程敞咧。
  • BusySpinWaitStrategy:死循環(huán)策略棘捣。消費者線程會盡最大可能監(jiān)控緩沖區(qū)的變化,會占用所有CPU資源休建。

6乍恐、Disruptor解決CPU Cache偽共享問題

為了解決CPU和內(nèi)存速度不匹配的問題,CPU中有多個高速緩存Cache测砂。在Cache中茵烈,讀寫數(shù)據(jù)的基本單位是緩存行,緩存行是內(nèi)存復制到緩存的最小單位砌些。

偽共享問題

若兩個變量放在同一個Cache Line中呜投,在多線程情況下加匈,可能會相互影響彼此的性能。如上圖所示仑荐,CPU1上的線程更新了變量X雕拼,則CPU上的緩存行會失效,同一行的Y即使沒有更新也會失效粘招,導致Cache無法命中啥寇。
同樣地,若CPU2上的線程更新了Y洒扎,則導致CPU1上的緩存行又失效辑甜。如果CPU經(jīng)常不能命中緩存,則系統(tǒng)的吞吐量則會下降逊笆。這就是偽共享問題栈戳。

解決偽共享問題

解決偽共享問題,可以在變量的前后都占據(jù)一定的填充位置难裆,盡量讓變量占用一個完整的緩存行子檀。如上圖中,CPU1上的線程更新了X乃戈,則CPU2上的Y則不會失效褂痰。同樣地,CPU2上的線程更新了Y症虑,則CPU1的不會失效缩歪。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * <p>Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors.  Support a number
 * of concurrent operations including CAS and order writes.
 *
 * <p>Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 */
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }
... ...
}

Sequence的實現(xiàn)中,主要使用的是Value谍憔,但通過LhsPaddingRhsPadding在Value的前后填充了一些空間匪蝙,使Value無沖突的存在于緩存行中。

參考
http://ifeve.com/dissecting-disruptor-whats-so-special/

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末习贫,一起剝皮案震驚了整個濱河市逛球,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌苫昌,老刑警劉巖颤绕,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異祟身,居然都是意外死亡奥务,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門袜硫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來氯葬,“玉大人,你說我怎么就攤上這事父款∫绨” “怎么了瞻凤?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長世杀。 經(jīng)常有香客問我阀参,道長,這世上最難降的妖魔是什么瞻坝? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任蛛壳,我火速辦了婚禮,結果婚禮上所刀,老公的妹妹穿的比我還像新娘衙荐。我一直安慰自己,他們只是感情好浮创,可當我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布忧吟。 她就那樣靜靜地躺著,像睡著了一般斩披。 火紅的嫁衣襯著肌膚如雪溜族。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天垦沉,我揣著相機與錄音煌抒,去河邊找鬼。 笑死厕倍,一個胖子當著我的面吹牛寡壮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播讹弯,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼况既,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了组民?” 一聲冷哼從身側響起坏挠,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎邪乍,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體对竣,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡庇楞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了否纬。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吕晌。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖临燃,靈堂內(nèi)的尸體忽然破棺而出睛驳,到底是詐尸還是另有隱情烙心,我是刑警寧澤,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布乏沸,位于F島的核電站淫茵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏蹬跃。R本人自食惡果不足惜匙瘪,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蝶缀。 院中可真熱鬧丹喻,春花似錦、人聲如沸翁都。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柄慰。三九已至鳍悠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間先煎,已是汗流浹背贼涩。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留薯蝎,地道東北人遥倦。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像占锯,于是被迫代替她去往敵國和親袒哥。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內(nèi)容

  • 本文是筆者在研究Disruptor過程中翻譯的Disruptor1.0論文精選消略,中間穿插了一些感想和說明堡称,均以“譯...
    coder_jerry閱讀 5,160評論 3 52
  • Java-Review-Note——4.多線程 標簽: JavaStudy PS:本來是分開三篇的,后來想想還是整...
    coder_pig閱讀 1,641評論 2 17
  • 距離是什么艺演? 你在的地方下雪了却紧, 而我還在過夏天。 幅員遼闊的中國胎撤, 終究隔開了你我晓殊。 渠藝 2016.10.26
    渠六億閱讀 195評論 0 3
  • 首先登陸Navicat官網(wǎng)下載Linux版本: https://www.navicat.com.cn/downlo...
    呂志豪閱讀 902評論 0 0
  • 我寫的東西巫俺,你最好別看。因為我知道肿男,我寫得不好介汹∪次耍可是我需要,我需要表達嘹承。負能量退散窗价,正能量快來。寫得不好是赶撰,因為太...
    夜深月明閱讀 150評論 0 0