Disruptor源碼(三)生產(chǎn)者怎么知道在哪里“下蛋”披蕉?

概述

  • RingBuffer#next()這個(gè)方法干的事情是,生產(chǎn)者問(wèn)RingBuffer要一個(gè)能“下蛋”的位置乌奇,具體怎么給生產(chǎn)者給出這個(gè)位置没讲,是由Sequencer的實(shí)現(xiàn)類完成的;
  • Disruptor這個(gè)無(wú)鎖并行框架中的“無(wú)鎖”礁苗,在這個(gè)方法中也體現(xiàn)出來(lái)了爬凑;

0)客戶端代碼

  • 從這一句開始:long sequence = ringBuffer.next();
import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {

    private RingBuffer<OrderEvent> ringBuffer;
    
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    public void sendData(ByteBuffer data) {
        //1 在生產(chǎn)者發(fā)送消息的時(shí)候, 首先 需要從我們的ringBuffer里面 獲取一個(gè)可用的序號(hào)
        long sequence = ringBuffer.next();  //0 
        try {
            //2 根據(jù)這個(gè)序號(hào), 找到具體的 "OrderEvent" 元素 注意:此時(shí)獲取的OrderEvent對(duì)象是一個(gè)沒(méi)有被賦值的"空對(duì)象"
            OrderEvent event = ringBuffer.get(sequence);
            //3 進(jìn)行實(shí)際的賦值處理
            event.setValue(data.getLong(0));            
        } finally {
            //4 提交發(fā)布操作
            ringBuffer.publish(sequence);           
        }
    }
    
}

1)RingBuffer#next()

  • sequencer是一個(gè)接口,所以要看其實(shí)現(xiàn)類试伙,這里是單消費(fèi)者模式嘁信,所以去SingleProducerSequencer中看next();
@Override
public long next()
{
    return sequencer.next();
}

2)SingleProducerSequencer#next()

@Override
public long next()
{
    return next(1);
}

3)SingleProducerSequencer#next(int)

其作為服務(wù)提供者迁霎,向RingBuffer提供服務(wù)吱抚,RingBuffer中的相關(guān)信息它都是有的百宇,比如:bufferSize考廉,并且維護(hù)了一些輔助信息,比如:nextValue携御,cachedValue昌粤;

this.nextValue
  • 表示生產(chǎn)者最后一次投遞的位置,在方法結(jié)束的時(shí)候啄刹,會(huì)向前走一位涮坐,追上nextSequence;
nextSequence
  • 表示生產(chǎn)者這次要投遞的位置誓军;
wrapPoint
  • 表示包裹點(diǎn)袱讹,當(dāng)生產(chǎn)者還沒(méi)有生產(chǎn)完一圈的時(shí)候,其值為負(fù),沒(méi)有和生產(chǎn)者中最慢的位置比較的意義捷雕,因?yàn)榫退阆M(fèi)者一個(gè)都沒(méi)消費(fèi)椒丧,其最慢的消費(fèi)位置為-1;就算wrapPoint為-1了救巷,表示生產(chǎn)者要投遞第一圈的最后一個(gè)位置了壶熏,也還沒(méi)追上消費(fèi)者,無(wú)需阻塞浦译;
this.cachedValue
  • 表示上一次投遞棒假,消費(fèi)者最慢的位置,這個(gè)位置在最新一次投遞時(shí)精盅,有可能已經(jīng)落后了帽哑,不過(guò)沒(méi)關(guān)系,其作為第一道門檻還是有意義的:如果生產(chǎn)者這次的投遞位置還沒(méi)到上次最慢的消費(fèi)者的位置叹俏,那么無(wú)需擔(dān)心生產(chǎn)者阻塞的問(wèn)題祝拯,直接投遞,不會(huì)覆蓋尚未消費(fèi)的Event的她肯;
minSequence
  • 表示最慢的消費(fèi)者的消費(fèi)位置佳头,如果消費(fèi)者一個(gè)都沒(méi)消費(fèi),其值為-1晴氨;minSequence和wrapPoint的關(guān)系是:如果生產(chǎn)者這次要投遞的位置已經(jīng)超過(guò)最慢的消費(fèi)者的位置康嘉,那么生產(chǎn)者這次要投遞的位置上有待消費(fèi)的Event,生產(chǎn)者會(huì)阻塞 1 納秒籽前,直到生產(chǎn)者這次要投遞的位置的Event被消費(fèi)掉亭珍;
@Override
public long next(int n)
{
    if (n < 1) // n = 1
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue; // -1 0

    /**
     * nextValue 是生產(chǎn)者當(dāng)前最后投遞成功的位置;
     * nextSequence 是生產(chǎn)者的下一個(gè)投遞序號(hào)枝哄;
     */
    long nextSequence = nextValue + n; // -1 + 1 = 0, 0+1=1
    /**
     * 如果生產(chǎn)者還沒(méi)有投夠一圈肄梨,wrapPoint < 0
     * wrapPoint > 0,才有和 cachedGatingSequence 比較的意義挠锥;
     * wrapPoint 理解成包裹點(diǎn)众羡,生產(chǎn)者投了一圈了才能把entries包裹上;
     * wrapPoint 也是生產(chǎn)者的投遞sequence
     */
    long wrapPoint = nextSequence - bufferSize; // 0 - 10 = -10, 1-10=-9
    /**
     * cachedValue 應(yīng)該是上一次生產(chǎn)者獲取sequence的時(shí)候蓖租,最慢的消費(fèi)者的sequence
     */
    long cachedGatingSequence = this.cachedValue; // -1

    /**
     * cachedGatingSequence > nextValue 
     * 上次生產(chǎn)者投遞的時(shí)候粱侣,最慢的消費(fèi)者的位置都比生產(chǎn)者最后投遞成功的位置大,消費(fèi)者領(lǐng)先于生產(chǎn)者蓖宦;
     * wrapPoint > cachedGatingSequence
     * 生產(chǎn)者已經(jīng)追上上次最慢的額消費(fèi)者齐婴;
     */
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        /**
         * 當(dāng)前消費(fèi)者中最慢的sequence;
         */
        long minSequence;
        /**
         * 生產(chǎn)者的速度太快了稠茂,追上消費(fèi)者了
         */
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            /**
             * 生產(chǎn)者阻塞一丟丟柠偶,讓消費(fèi)者趕緊接著消費(fèi);
             */
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        /**
         * 生產(chǎn)者的速度沒(méi)那么快,記錄下這次最慢的消費(fèi)者诱担;
         * cachedValue 存的應(yīng)該是消費(fèi)者中最慢的
         */
        this.cachedValue = minSequence;
    }

    /**
     * nextValue:已經(jīng)成功投遞的最遠(yuǎn)的位置鲫售;
     * nextSequence:本次要投遞的位置;
     * this.nextValue = nextSequence; 可以理解成投遞成功该肴;雖然新的Event還沒(méi)有更新到entries中情竹,但意思上已經(jīng)投遞成功了;
     */
    this.nextValue = nextSequence; // 0, 1

    /**
     * 給生產(chǎn)者這次投遞的位置匀哄;
     */
    return nextSequence;
}

總結(jié)

  • 生產(chǎn)者生產(chǎn)的時(shí)候秦效,按順序生產(chǎn),能生產(chǎn)就生產(chǎn)涎嚼,不能生產(chǎn)就等著阱州;
  • 生產(chǎn)者和消費(fèi)者都沒(méi)有對(duì)Event加鎖;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末法梯,一起剝皮案震驚了整個(gè)濱河市苔货,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌立哑,老刑警劉巖夜惭,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異铛绰,居然都是意外死亡诈茧,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門捂掰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)敢会,“玉大人,你說(shuō)我怎么就攤上這事这嚣∨富瑁” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵姐帚,是天一觀的道長(zhǎng)吏垮。 經(jīng)常有香客問(wèn)我,道長(zhǎng)卧土,這世上最難降的妖魔是什么惫皱? 我笑而不...
    開封第一講書人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任像樊,我火速辦了婚禮尤莺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘生棍。我一直安慰自己颤霎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著友酱,像睡著了一般晴音。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上缔杉,一...
    開封第一講書人閱讀 51,443評(píng)論 1 302
  • 那天锤躁,我揣著相機(jī)與錄音,去河邊找鬼或详。 笑死系羞,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的霸琴。 我是一名探鬼主播椒振,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼梧乘!你這毒婦竟也來(lái)了澎迎?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤选调,失蹤者是張志新(化名)和其女友劉穎夹供,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仁堪,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡罩引,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了枝笨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袁铐。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖横浑,靈堂內(nèi)的尸體忽然破棺而出剔桨,到底是詐尸還是另有隱情,我是刑警寧澤徙融,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布洒缀,位于F島的核電站,受9級(jí)特大地震影響欺冀,放射性物質(zhì)發(fā)生泄漏树绩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一隐轩、第九天 我趴在偏房一處隱蔽的房頂上張望饺饭。 院中可真熱鬧,春花似錦职车、人聲如沸瘫俊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)扛芽。三九已至骂蓖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間川尖,已是汗流浹背登下。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叮喳,地道東北人庐船。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像嘲更,于是被迫代替她去往敵國(guó)和親筐钟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容