Disruptor深度解析-RingBuffer

前言

RingBuffer是Disruptor框架負(fù)責(zé)數(shù)據(jù)存儲(chǔ)的模塊,大部分文章也將其稱之為環(huán)形緩存區(qū)廉涕,本文將對其實(shí)現(xiàn)原理進(jìn)行深度探究。

本文依賴Disruptor版本為3.4.3

繼承體系

RingBuffer繼承體系類圖

實(shí)現(xiàn)接口說明

  • DataProvider:獲取指定位置的數(shù)據(jù);
  • EventSink:發(fā)布事件到RingBuffer的接口定義,提供了各種事件發(fā)布的操作方法同眯;
  • Cursored:獲取當(dāng)前游標(biāo)位置;在Disruptor中都是依賴于Sequence的value唯鸭;
  • Sequenced:提供了RingBuffer相關(guān)的sequence判斷操作方法须蜗,包括獲取大小、占用slot、發(fā)布等方法明肮;

繼承類說明

  • RingBufferFields:實(shí)現(xiàn)RingBuffer內(nèi)存預(yù)分配以及數(shù)據(jù)定位等功能菱农,此類非常關(guān)鍵,后續(xù)進(jìn)行詳細(xì)介紹辰斋;
  • RingBufferPad:解決緩存行偽共享的填充類滤否;

RingBuffer初始化流程

//  RingBuffer的構(gòu)造方法
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        // 調(diào)用父類即RingBufferFields的構(gòu)造方法
        super(eventFactory, sequencer);
}

可以看出RingBuffer的初始化流程邏輯均在其父類中泼返,我們詳細(xì)分析RingBufferFields類的實(shí)現(xiàn)邏輯;

abstract class RingBufferFields<E> extends RingBufferPad {
    // 用來預(yù)留填充的單側(cè)slot個(gè)數(shù)的妖,加速計(jì)算
    private static final int    BUFFER_PAD;
    // RingBuffer中首個(gè)slot的偏移位置
    private static final long   REF_ARRAY_BASE;
    // log2(scale),該值與BUFFER_PAD作用相似,主要用來加速計(jì)算
    private static final int    REF_ELEMENT_SHIFT;
    // 反射拿到UNSAFE對象
    private static final Unsafe UNSAFE = Util.getUnsafe();

    static {
        // 返回Object數(shù)組中一個(gè)元素指針占用的內(nèi)存大小
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        // 指針占4個(gè)字節(jié)
        if (4 == scale) {
            REF_ELEMENT_SHIFT = 2;
            // 指針占8個(gè)字節(jié)
        } else if (8 == scale) {
            REF_ELEMENT_SHIFT = 3;
        } else {
            throw new IllegalStateException("Unknown pointer size");
        }
        // BUFFER_PAD的取值為16/32足陨,后續(xù)計(jì)算使用
        BUFFER_PAD = 128 / scale;
        // 計(jì)算數(shù)組首元素的偏移大小嫂粟,往后偏移了128個(gè)字節(jié),128/8=16
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
    }

    // 數(shù)組的最大下標(biāo)墨缘,即bufferSize - 1
    private final   long      indexMask;
    // 對象數(shù)組星虹,RingBuffer中實(shí)際數(shù)據(jù)載體
    private final   Object[]  entries;
    // RingBuffer的長度
    protected final int       bufferSize;
    // 序列器,分為單生產(chǎn)者和多生產(chǎn)者镊讼,用來在生產(chǎn)者和消費(fèi)者之間傳遞數(shù)據(jù)宽涌,此處暫時(shí)略過
    protected final Sequencer sequencer;

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        // bufferSize必須為2^N,否則進(jìn)行報(bào)錯(cuò)
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // 最大下標(biāo)
        this.indexMask = bufferSize - 1;
        // 預(yù)留32/64個(gè)空槽位
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        /**
         * 一次性填充滿整個(gè)數(shù)組狠毯,從BUFFER_PAD的下一位開始护糖;
         */
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory) {
        // 預(yù)留BUFFER_PAD個(gè)空slot
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence) {
        // 元素內(nèi)存偏移量=base + index * scale 
        // REF_ARRAY_BASE= UNSAFE.arrayBaseOffset(Object[].class) + 128;
        // 因?yàn)镽ingBuffer的長度為2^N,所以(sequence & indexMask) = sequence % bufferSize = index
        // index<<REF_ELEMENT_SHIFT = index << (2 ^ (log2(scale))) = index * scale
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}

上述代碼分析中嚼松,有幾處關(guān)鍵點(diǎn)如下:

  • 所謂的環(huán)形緩沖區(qū)實(shí)質(zhì)上就是一個(gè)連續(xù)數(shù)組嫡良,只是包裝出來了環(huán)形的概念;
  • RingBuffer預(yù)分配內(nèi)存時(shí)在數(shù)組的左右倆側(cè)各預(yù)留了BUFFER_PAD個(gè)slot献酗;
  • 內(nèi)存預(yù)分配時(shí)從BUFFER_PAD處開始寝受,在64位JDK中,假設(shè)開啟指針壓縮罕偎,則相當(dāng)于左側(cè)預(yù)留32個(gè)slot很澄,右側(cè)預(yù)留32個(gè)slot;
  • 在RingBuffer長度為2^N時(shí)颜及,通過sequence & (bufferSize - 1)加速定位元素實(shí)際下標(biāo)索引甩苛,通過結(jié)合<<操作實(shí)現(xiàn)乘法;
  • 首個(gè)元素的內(nèi)存偏移位置是固定的俏站,即128位讯蒲,因?yàn)锽UFFER_PAD * scale=128;

在MultiProducerSequencer中后續(xù)通過>>>實(shí)現(xiàn)除法肄扎,作用類似墨林;

為何預(yù)留slot赁酝?

在現(xiàn)代CPU中,緩存行偽共享是一個(gè)經(jīng)典問題旭等,即CPU一次load的L1緩存行大小是固定的酌呆,如果兩個(gè)不同的元素被load到同一緩存行中,那么任何一個(gè)元素的修改都會(huì)導(dǎo)致另一元素在其它CPU核對應(yīng)的緩存失效搔耕,這個(gè)問題在很多博文中都有專門講解隙袁,此處不再贅述。引用結(jié)論就是單個(gè)頻繁修改元素盡量不和其它元素在同一緩存行中度迂。
對于entries對象藤乙,左右兩側(cè)各預(yù)留了128 bytes,這就保證了緩存行在<=128bytes時(shí)惭墓,entries對象一定不會(huì)和其它對象共享緩存行坛梁。

Sequence是什么?

Sequence是針對RingBuffer中slot位置的一個(gè)描述腊凶,與AtomicLong的功能幾乎一致划咐,其在AtomicLong的基礎(chǔ)上解決了緩存行偽共享的問題,其繼承關(guān)系如下:



其核心元素為value钧萍,根據(jù)Java的內(nèi)存分配褐缠,value兩次各8個(gè)long對象,即大小為8 * 8=64 bytes风瘦,那么在緩存行<=64 bytes時(shí)队魏,可以避免緩存行偽共享問題。
下面對其關(guān)鍵的兩個(gè)方法做下簡單分析:

/**
     * Perform an ordered write of this sequence.  The intent is a Store/Store barrier between this write and any previous store.
     *
     * putOrderedLong在執(zhí)行時(shí)使用store/store內(nèi)存屏障万搔,因此其在可見性上會(huì)存在一定的延時(shí)胡桨,但是由于其未使用store/load屏障,因此其寫入是無阻塞的
     */
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /**
     * Performs a volatile write of this sequence.  The intent is a Store/Store barrier between this write and any previous write and a
     * Store/Load barrier between this write and any subsequent volatile read.
     *
     * 本方法支持volatile語義瞬雹,即在寫入前插入store/store內(nèi)存屏障昧谊,寫入后插入store/load內(nèi)存屏障,因此值的修改在其它線程是立即可見的
     */
    public void setVolatile(final long value) {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

Sequencer是什么酗捌?

前面講完了Sequence呢诬,是時(shí)候推出Sequencer了,Sequencer是Disruptor中非常核心的一個(gè)概念胖缤,其作為生產(chǎn)者和消費(fèi)者的橋梁尚镰,通過維護(hù)兩者的sequence來實(shí)現(xiàn)數(shù)據(jù)的扭轉(zhuǎn);


sequencer.img

從繼承體系上來看哪廓,sequencer主要分為單生產(chǎn)者和多生產(chǎn)者兩類狗唉,這兩者實(shí)現(xiàn)了各種free-lock的算法,此處以SingleProducerSequencer為例分析撩独。

AbstractSequencer

該類主要屬性說明如下:

// JDK提供的基于反射機(jī)制的volatile原子更新工具類敞曹,本更新器用來更新gatingSequences屬性
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

// RingBuffer中環(huán)形緩沖區(qū)的大小
protected final    int          bufferSize;
// 消費(fèi)者等待策略
protected final    WaitStrategy waitStrategy;
// 生產(chǎn)者sequence,初始位置為-1
protected final    Sequence     cursor          = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 消費(fèi)者sequence集合
protected volatile Sequence[]   gatingSequences = new Sequence[0];

該類主要方法說明如下:

    /**
     * 增加消費(fèi)者sequence列表综膀,該方法功能由SequenceGroups代理澳迫,本質(zhì)上還是AtomicReferenceFieldUpdater進(jìn)行更新
     */
    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    /**
     * 移除消費(fèi)者sequence列表,該方法功能由SequenceGroups代理剧劝,本質(zhì)上還是AtomicReferenceFieldUpdater進(jìn)行更新
     */
    @Override
    public boolean removeGatingSequence(Sequence sequence) {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }

    /**
     * 獲取消費(fèi)者sequence列表中最小的索引下標(biāo)值橄登,cursor.get()為生產(chǎn)者下標(biāo)值
     */
    @Override
    public long getMinimumSequence() {
        return Util.getMinimumSequence(gatingSequences, cursor.get());
    }

SingleProducerSequencer

SingleProducerSequencer中最主要的屬性為nextValue和cachedValue,其中nextValue表示獲取下一個(gè)可用slot的索引下標(biāo)讥此,而cachedValue則是當(dāng)前消費(fèi)者消費(fèi)最慢slot的索引下標(biāo)拢锹,下面以next(int n)方法為例分析生產(chǎn)者獲取可用sequence的邏輯。

/**
 * 申請N個(gè)可用slot萄喳,用來存放生產(chǎn)出來的數(shù)據(jù)
 * @param n 可用sequence的個(gè)數(shù)
 */
public long next(int n) {
        // 申請個(gè)數(shù)不能<1卒稳,不能>bufferSize
        if (n < 1 || n > bufferSize) {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }

        // 當(dāng)前slot下標(biāo),初始時(shí)為-1
        long nextValue = this.nextValue;

        // 申請可用槽位的最大下標(biāo)他巨,默認(rèn)一次申請1個(gè)充坑,所以初始時(shí)位0
        long nextSequence = nextValue + n;
        // 倒轉(zhuǎn)1輪,與消費(fèi)者進(jìn)行比較
        long wrapPoint = nextSequence - bufferSize;

        // 消費(fèi)者中記錄的最小slot下標(biāo)染突,默認(rèn)為-1
        long cachedGatingSequence = this.cachedValue;

        // wrapPoint > cachedGatingSequence捻爷,主要用來判斷是否出現(xiàn)環(huán)路,即生產(chǎn)者生產(chǎn)速度過快份企,追上了消費(fèi)者也榄;
        // cachedGatingSequence > nextValue,主要用來判斷是否消費(fèi)者消費(fèi)過快司志,追上了生產(chǎn)者甜紫;
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            // 設(shè)置生產(chǎn)者要使用的slot下標(biāo),底層為volatile語義
            cursor.setVolatile(nextValue);  // StoreLoad fence
            
            long minSequence;
            // 重新最小slot值俐芯,此處分兩種情況討論:
            // 如果nextValue<gatingSequences最小值棵介,說明消費(fèi)者消費(fèi)過快,那么此時(shí)wrapPoint=nextValue + n-bufferSize<nextValue吧史,直接跳出循環(huán)邮辽,消費(fèi)者處直接等待;
            // 如果nextValue>gatingSequences最小值贸营,那么判斷是否出現(xiàn)環(huán)路吨述,如果不出現(xiàn),那么直接分配钞脂,否則揣云,生產(chǎn)者等待;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                // 生產(chǎn)者等待
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }
        // 更新生產(chǎn)者已使用的slot下標(biāo)
        this.nextValue = nextSequence;

        return nextSequence;
}

從上述的代碼分析來看冰啃,申請可用slot主要通過生產(chǎn)者和消費(fèi)者的sequence進(jìn)行比較邓夕,單生產(chǎn)者與多生產(chǎn)者的差別僅在于多生產(chǎn)者對sequence的更新可能產(chǎn)生爭用刘莹,因此使用CAS機(jī)制進(jìn)行更新;

publishEvent

事件發(fā)布也是RingBuffer提供的一個(gè)重要功能焚刚,其相當(dāng)于為生產(chǎn)者提供了數(shù)據(jù)操作的入口点弯,通過前面的繼承體系可以看出,事件相關(guān)操作是由EventSink接口提供的矿咕,下面對其先進(jìn)行一個(gè)簡單的分析抢肛。

public void publishEvent(EventTranslator<E> translator) {
        // 占用slot位置
        final long sequence = sequencer.next();
        // 發(fā)布事件
        translateAndPublish(translator, sequence);
}
......
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
        try {
            // 事件轉(zhuǎn)換
            translator.translateTo(get(sequence), sequence);
        } finally {
            // 更新生產(chǎn)者sequence的值
            sequencer.publish(sequence);
        }
}
......
public void publish(long sequence) {
        // 更新生產(chǎn)者的value
        cursor.set(sequence);
        // 喚醒消費(fèi)者線程,這個(gè)與等待策略有關(guān)碳柱,此處先不具體討論
        waitStrategy.signalAllWhenBlocking();
}

結(jié)合上面next()的實(shí)現(xiàn)源碼捡絮,我們大致可以得出如下結(jié)論,以單事件發(fā)布為例莲镣,其是一個(gè)兩階段操作

  • 預(yù)占用slot福稳,更新sequencer的nextValue,這樣下次分配時(shí)從新的nextValue往下分配
  • 對slot的具體數(shù)據(jù)進(jìn)行更新瑞侮,同時(shí)更新生產(chǎn)者的sequence

由于是單生產(chǎn)者灵寺,這種操作方式不會(huì)產(chǎn)生并發(fā)問題。但是在多生產(chǎn)者中区岗,邏輯會(huì)有問題略板,比如如果在第二步中更新sequence,在一定程度上會(huì)存在順序錯(cuò)亂的問題慈缔,因此在預(yù)申請的時(shí)候就要更新生產(chǎn)者sequence叮称,同時(shí)如何判斷數(shù)據(jù)發(fā)布成功了呢?我們后面揭曉藐鹤。

MultiProducerSequencer

    // UNSAFE對象實(shí)例
    private static final Unsafe UNSAFE = Util.getUnsafe();
    // 獲取int數(shù)組的首個(gè)元素偏移位置
    private static final long   BASE   = UNSAFE.arrayBaseOffset(int[].class);
    // 返回一個(gè)int數(shù)組中對象指針占用的內(nèi)存大小
    private static final long   SCALE  = UNSAFE.arrayIndexScale(int[].class);

    // 緩存的消費(fèi)者最慢slot的下標(biāo)
    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    // availableBuffer tracks the state of each ringbuffer slot
    // see below for more details on the approach
    private final int[] availableBuffer;

    // ringbuffer最大下標(biāo)
    private final int indexMask;

    // log2(bufferSize)
    private final int indexShift;

通過這些屬性定義瓤檐,我們似乎有一點(diǎn)熟悉,比如BASE娱节、SCALE挠蛉、indexMask、indexShift這些概念肄满,實(shí)際上這些值主要也是為了加速計(jì)算谴古,其中比較讓人懵圈的是int數(shù)組availableBuffer,所以可以想象BASE這些主要是為了對其進(jìn)行內(nèi)存操作稠歉;

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
        // 初始化availableBuffer大小為RingBuffer的大小
        availableBuffer = new int[bufferSize];
        indexMask = bufferSize - 1;
        indexShift = Util.log2(bufferSize);
        initialiseAvailableBuffer();
}

private void initialiseAvailableBuffer() {
         // 這個(gè)寫法有點(diǎn)奇怪掰担,提前分配內(nèi)存,值為-1
        for (int i = availableBuffer.length - 1; i != 0; i--) {
            setAvailableBufferValue(i, -1);
        }

        setAvailableBufferValue(0, -1);
}

OK怒炸,分析到這里带饱,我們大致了解了多生產(chǎn)者的一些關(guān)鍵信息,下面回到在單生產(chǎn)者發(fā)布事件的問題上來繼續(xù)探究阅羹,我們知道無論是哪種生產(chǎn)者模式勺疼,第一步都是申請可用的slot教寂,那下面直接上多生產(chǎn)者申請slot的代碼。

public long next(int n) {
        // 基礎(chǔ)的邏輯判斷
        if (n < 1 || n > bufferSize) {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }
        // 生產(chǎn)者實(shí)際的sequence值 
        long current;
        // 申請可用slot的最大值
        long next;

        do {
            // 實(shí)際sequence执庐,初始時(shí)為-1
            current = cursor.get();
            // 申請可用slot的最大值孝宗,初始時(shí)為0
            next = current + n;
            
            // 下面與單生產(chǎn)者類似
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();
            // 這里補(bǔ)充一點(diǎn),cachedGatingSequence是緩存的上一次可能出現(xiàn)環(huán)路或消費(fèi)過快時(shí)的消費(fèi)者最慢slot的下標(biāo)耕肩,正常申請時(shí)可能存在多輪未更新
            // 因此理論上cachedGatingSequence可能并不是最新的值,且其值相比實(shí)際值較形侍丁猿诸;
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                // 獲取最小的sequence值
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                // 如果出現(xiàn)環(huán)路,等待然后繼續(xù)循環(huán)
                if (wrapPoint > gatingSequence) {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
                // 未出現(xiàn)環(huán)路狡忙,更新消費(fèi)者的sequence值
                gatingSequenceCache.set(gatingSequence);
            // CAS更新生產(chǎn)者的sequence梳虽,實(shí)現(xiàn)free-lock
            } else if (cursor.compareAndSet(current, next)) {
                // 更新成功跳出
                break;
            }
        } while (true);
      
        return next;
}

從上述可以看出,next方法在多生產(chǎn)者中永遠(yuǎn)都依賴生產(chǎn)者的實(shí)際sequence值進(jìn)行計(jì)算灾茁,且占用成功后實(shí)時(shí)更新窜觉;

這里是與單生產(chǎn)者模式的第一個(gè)差別,單生產(chǎn)者中計(jì)算依賴的是其自身的nextValue值北专,其通過publish才會(huì)實(shí)際更新生產(chǎn)者游標(biāo)禀挫,所以單生產(chǎn)者是一個(gè)典型的二階段提交;

對于多生產(chǎn)者來說拓颓,由于生產(chǎn)者游標(biāo)被實(shí)時(shí)更新掉了语婴,那消費(fèi)者依據(jù)什么來判斷數(shù)據(jù)已經(jīng)替換(發(fā)布)成功了呢?查看MultiProducerSequencer的publish方法驶睦,代碼如下:

public void publish(final long sequence) {
        // 關(guān)鍵代碼
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
}

繼續(xù)往下翻看setAvailable的代碼:

private void setAvailable(final long sequence) {
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
...
private int calculateAvailabilityFlag(final long sequence) {
        // 計(jì)算值=sequence / bufferSize砰左,即該sequence屬于哪一圈
        return (int) (sequence >>> indexShift);
}

private int calculateIndex(final long sequence) {
        // 計(jì)算值=sequence % bufferSize,即該sequence的實(shí)際數(shù)組下標(biāo)
        return ((int) sequence) & indexMask;
}
// 由上可知场航,availableBuffer中記錄了對應(yīng)sequence的實(shí)際圈數(shù)(round)
private void setAvailableBufferValue(int index, int flag) {
        // 計(jì)算偏移量
        long bufferAddress = (index * SCALE) + BASE;
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

由上我們可知缠导,availableBuffer中記錄了最新sequence對應(yīng)的圈數(shù),這里我們先臨時(shí)插入一個(gè)消費(fèi)者的代碼片段溉痢,在Disruptor中僻造,消費(fèi)者的接口主要為EventProcessor,這里以WorkProcessor為例孩饼,代碼如下:

if (cachedAvailableSequence >= nextSequence) {
  // 獲取下一個(gè)工作序列
  event = ringBuffer.get(nextSequence);
  // 處理序列
  workHandler.onEvent(event);
  // 更新標(biāo)記為true
  processedSequence = true;
} else {
  // 獲取可處理的最大sequence序列
  cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
...
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
  checkAlert();
  // 獲取可用的序列
  long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

  if (availableSequence < sequence) {
    return availableSequence;
  }
  // 關(guān)鍵在這里 !import
 return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

可知消費(fèi)者消費(fèi)數(shù)據(jù)時(shí)強(qiáng)依賴sequencer.getHighestPublishedSequence的方法嫡意,多生產(chǎn)者的代碼實(shí)現(xiàn)如下:

public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            // 關(guān)鍵代碼 !import
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
}
...
public boolean isAvailable(long sequence) {
        // %捣辆,計(jì)算實(shí)際的下標(biāo)
        int index = calculateIndex(sequence);
        // 計(jì)算其round
        int flag = calculateAvailabilityFlag(sequence);
        long bufferAddress = (index * SCALE) + BASE;
        // 判斷其是否與標(biāo)識位相同蔬螟,如果不相同,則代碼數(shù)據(jù)未publish完成
        return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}

由上整個(gè)分析基本結(jié)束汽畴,即消費(fèi)者依靠availableBuffer的值來判斷該數(shù)據(jù)是否已經(jīng)發(fā)布完畢旧巾;

總結(jié)

本文主要對RingBuffer的數(shù)據(jù)結(jié)構(gòu)耸序、生產(chǎn)者以及數(shù)據(jù)發(fā)布流程進(jìn)行了一個(gè)大致的分析,但其中有很多的細(xì)節(jié)還未來得及分析鲁猩,比如生產(chǎn)者和消費(fèi)者之間是如何進(jìn)行數(shù)據(jù)聯(lián)動(dòng)的坎怪,Barrier在其中扮演的角色又是什么,這些留到下一篇文章進(jìn)行深入探討廓握。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末搅窿,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子隙券,更是在濱河造成了極大的恐慌男应,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件娱仔,死亡現(xiàn)場離奇詭異沐飘,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)牲迫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門耐朴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人盹憎,你說我怎么就攤上這事筛峭。” “怎么了陪每?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵蜒滩,是天一觀的道長。 經(jīng)常有香客問我奶稠,道長俯艰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任锌订,我火速辦了婚禮竹握,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘辆飘。我一直安慰自己啦辐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布蜈项。 她就那樣靜靜地躺著芹关,像睡著了一般。 火紅的嫁衣襯著肌膚如雪紧卒。 梳的紋絲不亂的頭發(fā)上侥衬,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼轴总。 笑死直颅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的怀樟。 我是一名探鬼主播功偿,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼往堡!你這毒婦竟也來了械荷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤虑灰,失蹤者是張志新(化名)和其女友劉穎吨瞎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瘩缆,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年佃蚜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了庸娱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡谐算,死狀恐怖熟尉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情洲脂,我是刑警寧澤斤儿,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站恐锦,受9級特大地震影響往果,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜一铅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一陕贮、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧潘飘,春花似錦肮之、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至艰毒,卻和暖如春筐高,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工凯傲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留犬辰,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓冰单,卻偏偏與公主長得像幌缝,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子诫欠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容