前言
RingBuffer是Disruptor框架負(fù)責(zé)數(shù)據(jù)存儲(chǔ)的模塊,大部分文章也將其稱之為環(huán)形緩存區(qū)廉涕,本文將對其實(shí)現(xiàn)原理進(jìn)行深度探究。
本文依賴Disruptor版本為3.4.3
繼承體系
實(shí)現(xiàn)接口說明
- DataProvider:獲取指定位置的數(shù)據(jù);
- EventSink:發(fā)布事件到RingBuffer的接口定義,提供了各種事件發(fā)布的操作方法同眯;
- Cursored:獲取當(dāng)前游標(biāo)位置;在Disruptor中都是依賴于Sequence的value唯鸭;
- Sequenced:提供了RingBuffer相關(guān)的sequence判斷操作方法须蜗,包括獲取大小、占用slot、發(fā)布等方法明肮;
繼承類說明
- RingBufferFields:實(shí)現(xiàn)RingBuffer內(nèi)存預(yù)分配以及數(shù)據(jù)定位等功能菱农,此類非常關(guān)鍵,后續(xù)進(jìn)行詳細(xì)介紹辰斋;
- RingBufferPad:解決緩存行偽共享的填充類滤否;
RingBuffer初始化流程
// RingBuffer的構(gòu)造方法
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
// 調(diào)用父類即RingBufferFields的構(gòu)造方法
super(eventFactory, sequencer);
}
可以看出RingBuffer的初始化流程邏輯均在其父類中泼返,我們詳細(xì)分析RingBufferFields類的實(shí)現(xiàn)邏輯;
abstract class RingBufferFields<E> extends RingBufferPad {
// 用來預(yù)留填充的單側(cè)slot個(gè)數(shù)的妖,加速計(jì)算
private static final int BUFFER_PAD;
// RingBuffer中首個(gè)slot的偏移位置
private static final long REF_ARRAY_BASE;
// log2(scale),該值與BUFFER_PAD作用相似,主要用來加速計(jì)算
private static final int REF_ELEMENT_SHIFT;
// 反射拿到UNSAFE對象
private static final Unsafe UNSAFE = Util.getUnsafe();
static {
// 返回Object數(shù)組中一個(gè)元素指針占用的內(nèi)存大小
final int scale = UNSAFE.arrayIndexScale(Object[].class);
// 指針占4個(gè)字節(jié)
if (4 == scale) {
REF_ELEMENT_SHIFT = 2;
// 指針占8個(gè)字節(jié)
} else if (8 == scale) {
REF_ELEMENT_SHIFT = 3;
} else {
throw new IllegalStateException("Unknown pointer size");
}
// BUFFER_PAD的取值為16/32足陨,后續(xù)計(jì)算使用
BUFFER_PAD = 128 / scale;
// 計(jì)算數(shù)組首元素的偏移大小嫂粟,往后偏移了128個(gè)字節(jié),128/8=16
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
}
// 數(shù)組的最大下標(biāo)墨缘,即bufferSize - 1
private final long indexMask;
// 對象數(shù)組星虹,RingBuffer中實(shí)際數(shù)據(jù)載體
private final Object[] entries;
// RingBuffer的長度
protected final int bufferSize;
// 序列器,分為單生產(chǎn)者和多生產(chǎn)者镊讼,用來在生產(chǎn)者和消費(fèi)者之間傳遞數(shù)據(jù)宽涌,此處暫時(shí)略過
protected final Sequencer sequencer;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
// bufferSize必須為2^N,否則進(jìn)行報(bào)錯(cuò)
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
// 最大下標(biāo)
this.indexMask = bufferSize - 1;
// 預(yù)留32/64個(gè)空槽位
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
/**
* 一次性填充滿整個(gè)數(shù)組狠毯,從BUFFER_PAD的下一位開始护糖;
*/
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
// 預(yù)留BUFFER_PAD個(gè)空slot
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence) {
// 元素內(nèi)存偏移量=base + index * scale
// REF_ARRAY_BASE= UNSAFE.arrayBaseOffset(Object[].class) + 128;
// 因?yàn)镽ingBuffer的長度為2^N,所以(sequence & indexMask) = sequence % bufferSize = index
// index<<REF_ELEMENT_SHIFT = index << (2 ^ (log2(scale))) = index * scale
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
上述代碼分析中嚼松,有幾處關(guān)鍵點(diǎn)如下:
- 所謂的環(huán)形緩沖區(qū)實(shí)質(zhì)上就是一個(gè)連續(xù)數(shù)組嫡良,只是包裝出來了環(huán)形的概念;
- RingBuffer預(yù)分配內(nèi)存時(shí)在數(shù)組的左右倆側(cè)各預(yù)留了BUFFER_PAD個(gè)slot献酗;
- 內(nèi)存預(yù)分配時(shí)從BUFFER_PAD處開始寝受,在64位JDK中,假設(shè)開啟指針壓縮罕偎,則相當(dāng)于左側(cè)預(yù)留32個(gè)slot很澄,右側(cè)預(yù)留32個(gè)slot;
- 在RingBuffer長度為2^N時(shí)颜及,通過sequence & (bufferSize - 1)加速定位元素實(shí)際下標(biāo)索引甩苛,通過結(jié)合<<操作實(shí)現(xiàn)乘法;
- 首個(gè)元素的內(nèi)存偏移位置是固定的俏站,即128位讯蒲,因?yàn)锽UFFER_PAD * scale=128;
在MultiProducerSequencer中后續(xù)通過>>>實(shí)現(xiàn)除法肄扎,作用類似墨林;
為何預(yù)留slot赁酝?
在現(xiàn)代CPU中,緩存行偽共享是一個(gè)經(jīng)典問題旭等,即CPU一次load的L1緩存行大小是固定的酌呆,如果兩個(gè)不同的元素被load到同一緩存行中,那么任何一個(gè)元素的修改都會(huì)導(dǎo)致另一元素在其它CPU核對應(yīng)的緩存失效搔耕,這個(gè)問題在很多博文中都有專門講解隙袁,此處不再贅述。引用結(jié)論就是單個(gè)頻繁修改元素盡量不和其它元素在同一緩存行中度迂。
對于entries對象藤乙,左右兩側(cè)各預(yù)留了128 bytes,這就保證了緩存行在<=128bytes時(shí)惭墓,entries對象一定不會(huì)和其它對象共享緩存行坛梁。
Sequence是什么?
Sequence是針對RingBuffer中slot位置的一個(gè)描述腊凶,與AtomicLong的功能幾乎一致划咐,其在AtomicLong的基礎(chǔ)上解決了緩存行偽共享的問題,其繼承關(guān)系如下:
其核心元素為value钧萍,根據(jù)Java的內(nèi)存分配褐缠,value兩次各8個(gè)long對象,即大小為8 * 8=64 bytes风瘦,那么在緩存行<=64 bytes時(shí)队魏,可以避免緩存行偽共享問題。
下面對其關(guān)鍵的兩個(gè)方法做下簡單分析:
/**
* Perform an ordered write of this sequence. The intent is a Store/Store barrier between this write and any previous store.
*
* putOrderedLong在執(zhí)行時(shí)使用store/store內(nèi)存屏障万搔,因此其在可見性上會(huì)存在一定的延時(shí)胡桨,但是由于其未使用store/load屏障,因此其寫入是無阻塞的
*/
public void set(final long value) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Performs a volatile write of this sequence. The intent is a Store/Store barrier between this write and any previous write and a
* Store/Load barrier between this write and any subsequent volatile read.
*
* 本方法支持volatile語義瞬雹,即在寫入前插入store/store內(nèi)存屏障昧谊,寫入后插入store/load內(nèi)存屏障,因此值的修改在其它線程是立即可見的
*/
public void setVolatile(final long value) {
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
Sequencer是什么酗捌?
前面講完了Sequence呢诬,是時(shí)候推出Sequencer了,Sequencer是Disruptor中非常核心的一個(gè)概念胖缤,其作為生產(chǎn)者和消費(fèi)者的橋梁尚镰,通過維護(hù)兩者的sequence來實(shí)現(xiàn)數(shù)據(jù)的扭轉(zhuǎn);
從繼承體系上來看哪廓,sequencer主要分為單生產(chǎn)者和多生產(chǎn)者兩類狗唉,這兩者實(shí)現(xiàn)了各種free-lock的算法,此處以SingleProducerSequencer為例分析撩独。
AbstractSequencer
該類主要屬性說明如下:
// JDK提供的基于反射機(jī)制的volatile原子更新工具類敞曹,本更新器用來更新gatingSequences屬性
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
// RingBuffer中環(huán)形緩沖區(qū)的大小
protected final int bufferSize;
// 消費(fèi)者等待策略
protected final WaitStrategy waitStrategy;
// 生產(chǎn)者sequence,初始位置為-1
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 消費(fèi)者sequence集合
protected volatile Sequence[] gatingSequences = new Sequence[0];
該類主要方法說明如下:
/**
* 增加消費(fèi)者sequence列表综膀,該方法功能由SequenceGroups代理澳迫,本質(zhì)上還是AtomicReferenceFieldUpdater進(jìn)行更新
*/
@Override
public final void addGatingSequences(Sequence... gatingSequences) {
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
/**
* 移除消費(fèi)者sequence列表,該方法功能由SequenceGroups代理剧劝,本質(zhì)上還是AtomicReferenceFieldUpdater進(jìn)行更新
*/
@Override
public boolean removeGatingSequence(Sequence sequence) {
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
/**
* 獲取消費(fèi)者sequence列表中最小的索引下標(biāo)值橄登,cursor.get()為生產(chǎn)者下標(biāo)值
*/
@Override
public long getMinimumSequence() {
return Util.getMinimumSequence(gatingSequences, cursor.get());
}
SingleProducerSequencer
SingleProducerSequencer中最主要的屬性為nextValue和cachedValue,其中nextValue表示獲取下一個(gè)可用slot的索引下標(biāo)讥此,而cachedValue則是當(dāng)前消費(fèi)者消費(fèi)最慢slot的索引下標(biāo)拢锹,下面以next(int n)方法為例分析生產(chǎn)者獲取可用sequence的邏輯。
/**
* 申請N個(gè)可用slot萄喳,用來存放生產(chǎn)出來的數(shù)據(jù)
* @param n 可用sequence的個(gè)數(shù)
*/
public long next(int n) {
// 申請個(gè)數(shù)不能<1卒稳,不能>bufferSize
if (n < 1 || n > bufferSize) {
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// 當(dāng)前slot下標(biāo),初始時(shí)為-1
long nextValue = this.nextValue;
// 申請可用槽位的最大下標(biāo)他巨,默認(rèn)一次申請1個(gè)充坑,所以初始時(shí)位0
long nextSequence = nextValue + n;
// 倒轉(zhuǎn)1輪,與消費(fèi)者進(jìn)行比較
long wrapPoint = nextSequence - bufferSize;
// 消費(fèi)者中記錄的最小slot下標(biāo)染突,默認(rèn)為-1
long cachedGatingSequence = this.cachedValue;
// wrapPoint > cachedGatingSequence捻爷,主要用來判斷是否出現(xiàn)環(huán)路,即生產(chǎn)者生產(chǎn)速度過快份企,追上了消費(fèi)者也榄;
// cachedGatingSequence > nextValue,主要用來判斷是否消費(fèi)者消費(fèi)過快司志,追上了生產(chǎn)者甜紫;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
// 設(shè)置生產(chǎn)者要使用的slot下標(biāo),底層為volatile語義
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 重新最小slot值俐芯,此處分兩種情況討論:
// 如果nextValue<gatingSequences最小值棵介,說明消費(fèi)者消費(fèi)過快,那么此時(shí)wrapPoint=nextValue + n-bufferSize<nextValue吧史,直接跳出循環(huán)邮辽,消費(fèi)者處直接等待;
// 如果nextValue>gatingSequences最小值贸营,那么判斷是否出現(xiàn)環(huán)路吨述,如果不出現(xiàn),那么直接分配钞脂,否則揣云,生產(chǎn)者等待;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
// 生產(chǎn)者等待
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
// 更新生產(chǎn)者已使用的slot下標(biāo)
this.nextValue = nextSequence;
return nextSequence;
}
從上述的代碼分析來看冰啃,申請可用slot主要通過生產(chǎn)者和消費(fèi)者的sequence進(jìn)行比較邓夕,單生產(chǎn)者與多生產(chǎn)者的差別僅在于多生產(chǎn)者對sequence的更新可能產(chǎn)生爭用刘莹,因此使用CAS機(jī)制進(jìn)行更新;
publishEvent
事件發(fā)布也是RingBuffer提供的一個(gè)重要功能焚刚,其相當(dāng)于為生產(chǎn)者提供了數(shù)據(jù)操作的入口点弯,通過前面的繼承體系可以看出,事件相關(guān)操作是由EventSink接口提供的矿咕,下面對其先進(jìn)行一個(gè)簡單的分析抢肛。
public void publishEvent(EventTranslator<E> translator) {
// 占用slot位置
final long sequence = sequencer.next();
// 發(fā)布事件
translateAndPublish(translator, sequence);
}
......
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
try {
// 事件轉(zhuǎn)換
translator.translateTo(get(sequence), sequence);
} finally {
// 更新生產(chǎn)者sequence的值
sequencer.publish(sequence);
}
}
......
public void publish(long sequence) {
// 更新生產(chǎn)者的value
cursor.set(sequence);
// 喚醒消費(fèi)者線程,這個(gè)與等待策略有關(guān)碳柱,此處先不具體討論
waitStrategy.signalAllWhenBlocking();
}
結(jié)合上面next()的實(shí)現(xiàn)源碼捡絮,我們大致可以得出如下結(jié)論,以單事件發(fā)布為例莲镣,其是一個(gè)兩階段操作
- 預(yù)占用slot福稳,更新sequencer的nextValue,這樣下次分配時(shí)從新的nextValue往下分配
- 對slot的具體數(shù)據(jù)進(jìn)行更新瑞侮,同時(shí)更新生產(chǎn)者的sequence
由于是單生產(chǎn)者灵寺,這種操作方式不會(huì)產(chǎn)生并發(fā)問題。但是在多生產(chǎn)者中区岗,邏輯會(huì)有問題略板,比如如果在第二步中更新sequence,在一定程度上會(huì)存在順序錯(cuò)亂的問題慈缔,因此在預(yù)申請的時(shí)候就要更新生產(chǎn)者sequence叮称,同時(shí)如何判斷數(shù)據(jù)發(fā)布成功了呢?我們后面揭曉藐鹤。
MultiProducerSequencer
// UNSAFE對象實(shí)例
private static final Unsafe UNSAFE = Util.getUnsafe();
// 獲取int數(shù)組的首個(gè)元素偏移位置
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
// 返回一個(gè)int數(shù)組中對象指針占用的內(nèi)存大小
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
// 緩存的消費(fèi)者最慢slot的下標(biāo)
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer;
// ringbuffer最大下標(biāo)
private final int indexMask;
// log2(bufferSize)
private final int indexShift;
通過這些屬性定義瓤檐,我們似乎有一點(diǎn)熟悉,比如BASE娱节、SCALE挠蛉、indexMask、indexShift這些概念肄满,實(shí)際上這些值主要也是為了加速計(jì)算谴古,其中比較讓人懵圈的是int數(shù)組availableBuffer,所以可以想象BASE這些主要是為了對其進(jìn)行內(nèi)存操作稠歉;
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
// 初始化availableBuffer大小為RingBuffer的大小
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
private void initialiseAvailableBuffer() {
// 這個(gè)寫法有點(diǎn)奇怪掰担,提前分配內(nèi)存,值為-1
for (int i = availableBuffer.length - 1; i != 0; i--) {
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
OK怒炸,分析到這里带饱,我們大致了解了多生產(chǎn)者的一些關(guān)鍵信息,下面回到在單生產(chǎn)者發(fā)布事件的問題上來繼續(xù)探究阅羹,我們知道無論是哪種生產(chǎn)者模式勺疼,第一步都是申請可用的slot教寂,那下面直接上多生產(chǎn)者申請slot的代碼。
public long next(int n) {
// 基礎(chǔ)的邏輯判斷
if (n < 1 || n > bufferSize) {
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// 生產(chǎn)者實(shí)際的sequence值
long current;
// 申請可用slot的最大值
long next;
do {
// 實(shí)際sequence执庐,初始時(shí)為-1
current = cursor.get();
// 申請可用slot的最大值孝宗,初始時(shí)為0
next = current + n;
// 下面與單生產(chǎn)者類似
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
// 這里補(bǔ)充一點(diǎn),cachedGatingSequence是緩存的上一次可能出現(xiàn)環(huán)路或消費(fèi)過快時(shí)的消費(fèi)者最慢slot的下標(biāo)耕肩,正常申請時(shí)可能存在多輪未更新
// 因此理論上cachedGatingSequence可能并不是最新的值,且其值相比實(shí)際值較形侍丁猿诸;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
// 獲取最小的sequence值
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 如果出現(xiàn)環(huán)路,等待然后繼續(xù)循環(huán)
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// 未出現(xiàn)環(huán)路狡忙,更新消費(fèi)者的sequence值
gatingSequenceCache.set(gatingSequence);
// CAS更新生產(chǎn)者的sequence梳虽,實(shí)現(xiàn)free-lock
} else if (cursor.compareAndSet(current, next)) {
// 更新成功跳出
break;
}
} while (true);
return next;
}
從上述可以看出,next方法在多生產(chǎn)者中永遠(yuǎn)都依賴生產(chǎn)者的實(shí)際sequence值進(jìn)行計(jì)算灾茁,且占用成功后實(shí)時(shí)更新窜觉;
這里是與單生產(chǎn)者模式的第一個(gè)差別,單生產(chǎn)者中計(jì)算依賴的是其自身的nextValue值北专,其通過publish才會(huì)實(shí)際更新生產(chǎn)者游標(biāo)禀挫,所以單生產(chǎn)者是一個(gè)典型的二階段提交;
對于多生產(chǎn)者來說拓颓,由于生產(chǎn)者游標(biāo)被實(shí)時(shí)更新掉了语婴,那消費(fèi)者依據(jù)什么來判斷數(shù)據(jù)已經(jīng)替換(發(fā)布)成功了呢?查看MultiProducerSequencer的publish方法驶睦,代碼如下:
public void publish(final long sequence) {
// 關(guān)鍵代碼
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
繼續(xù)往下翻看setAvailable的代碼:
private void setAvailable(final long sequence) {
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
...
private int calculateAvailabilityFlag(final long sequence) {
// 計(jì)算值=sequence / bufferSize砰左,即該sequence屬于哪一圈
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence) {
// 計(jì)算值=sequence % bufferSize,即該sequence的實(shí)際數(shù)組下標(biāo)
return ((int) sequence) & indexMask;
}
// 由上可知场航,availableBuffer中記錄了對應(yīng)sequence的實(shí)際圈數(shù)(round)
private void setAvailableBufferValue(int index, int flag) {
// 計(jì)算偏移量
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
由上我們可知缠导,availableBuffer中記錄了最新sequence對應(yīng)的圈數(shù),這里我們先臨時(shí)插入一個(gè)消費(fèi)者的代碼片段溉痢,在Disruptor中僻造,消費(fèi)者的接口主要為EventProcessor,這里以WorkProcessor為例孩饼,代碼如下:
if (cachedAvailableSequence >= nextSequence) {
// 獲取下一個(gè)工作序列
event = ringBuffer.get(nextSequence);
// 處理序列
workHandler.onEvent(event);
// 更新標(biāo)記為true
processedSequence = true;
} else {
// 獲取可處理的最大sequence序列
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
...
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
checkAlert();
// 獲取可用的序列
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence) {
return availableSequence;
}
// 關(guān)鍵在這里 !import
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
可知消費(fèi)者消費(fèi)數(shù)據(jù)時(shí)強(qiáng)依賴sequencer.getHighestPublishedSequence的方法嫡意,多生產(chǎn)者的代碼實(shí)現(xiàn)如下:
public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
// 關(guān)鍵代碼 !import
if (!isAvailable(sequence)) {
return sequence - 1;
}
}
return availableSequence;
}
...
public boolean isAvailable(long sequence) {
// %捣辆,計(jì)算實(shí)際的下標(biāo)
int index = calculateIndex(sequence);
// 計(jì)算其round
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
// 判斷其是否與標(biāo)識位相同蔬螟,如果不相同,則代碼數(shù)據(jù)未publish完成
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
由上整個(gè)分析基本結(jié)束汽畴,即消費(fèi)者依靠availableBuffer的值來判斷該數(shù)據(jù)是否已經(jīng)發(fā)布完畢旧巾;
總結(jié)
本文主要對RingBuffer的數(shù)據(jù)結(jié)構(gòu)耸序、生產(chǎn)者以及數(shù)據(jù)發(fā)布流程進(jìn)行了一個(gè)大致的分析,但其中有很多的細(xì)節(jié)還未來得及分析鲁猩,比如生產(chǎn)者和消費(fèi)者之間是如何進(jìn)行數(shù)據(jù)聯(lián)動(dòng)的坎怪,Barrier在其中扮演的角色又是什么,這些留到下一篇文章進(jìn)行深入探討廓握。