概述
- RingBuffer#next()這個(gè)方法干的事情是,生產(chǎn)者問(wèn)RingBuffer要一個(gè)能“下蛋”的位置乌奇,具體怎么給生產(chǎn)者給出這個(gè)位置没讲,是由Sequencer的實(shí)現(xiàn)類完成的;
- Disruptor這個(gè)無(wú)鎖并行框架中的“無(wú)鎖”礁苗,在這個(gè)方法中也體現(xiàn)出來(lái)了爬凑;
0)客戶端代碼
- 從這一句開始:long sequence = ringBuffer.next();
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1 在生產(chǎn)者發(fā)送消息的時(shí)候, 首先 需要從我們的ringBuffer里面 獲取一個(gè)可用的序號(hào)
long sequence = ringBuffer.next(); //0
try {
//2 根據(jù)這個(gè)序號(hào), 找到具體的 "OrderEvent" 元素 注意:此時(shí)獲取的OrderEvent對(duì)象是一個(gè)沒(méi)有被賦值的"空對(duì)象"
OrderEvent event = ringBuffer.get(sequence);
//3 進(jìn)行實(shí)際的賦值處理
event.setValue(data.getLong(0));
} finally {
//4 提交發(fā)布操作
ringBuffer.publish(sequence);
}
}
}
1)RingBuffer#next()
- sequencer是一個(gè)接口,所以要看其實(shí)現(xiàn)類试伙,這里是單消費(fèi)者模式嘁信,所以去SingleProducerSequencer中看next();
@Override
public long next()
{
return sequencer.next();
}
2)SingleProducerSequencer#next()
@Override
public long next()
{
return next(1);
}
3)SingleProducerSequencer#next(int)
其作為服務(wù)提供者迁霎,向RingBuffer提供服務(wù)吱抚,RingBuffer中的相關(guān)信息它都是有的百宇,比如:bufferSize考廉,并且維護(hù)了一些輔助信息,比如:nextValue携御,cachedValue昌粤;
this.nextValue
- 表示生產(chǎn)者最后一次投遞的位置,在方法結(jié)束的時(shí)候啄刹,會(huì)向前走一位涮坐,追上nextSequence;
nextSequence
- 表示生產(chǎn)者這次要投遞的位置誓军;
wrapPoint
- 表示包裹點(diǎn)袱讹,當(dāng)生產(chǎn)者還沒(méi)有生產(chǎn)完一圈的時(shí)候,其值為負(fù),沒(méi)有和生產(chǎn)者中最慢的位置比較的意義捷雕,因?yàn)榫退阆M(fèi)者一個(gè)都沒(méi)消費(fèi)椒丧,其最慢的消費(fèi)位置為-1;就算wrapPoint為-1了救巷,表示生產(chǎn)者要投遞第一圈的最后一個(gè)位置了壶熏,也還沒(méi)追上消費(fèi)者,無(wú)需阻塞浦译;
this.cachedValue
- 表示上一次投遞棒假,消費(fèi)者最慢的位置,這個(gè)位置在最新一次投遞時(shí)精盅,有可能已經(jīng)落后了帽哑,不過(guò)沒(méi)關(guān)系,其作為第一道門檻還是有意義的:如果生產(chǎn)者這次的投遞位置還沒(méi)到上次最慢的消費(fèi)者的位置叹俏,那么無(wú)需擔(dān)心生產(chǎn)者阻塞的問(wèn)題祝拯,直接投遞,不會(huì)覆蓋尚未消費(fèi)的Event的她肯;
minSequence
- 表示最慢的消費(fèi)者的消費(fèi)位置佳头,如果消費(fèi)者一個(gè)都沒(méi)消費(fèi),其值為-1晴氨;minSequence和wrapPoint的關(guān)系是:如果生產(chǎn)者這次要投遞的位置已經(jīng)超過(guò)最慢的消費(fèi)者的位置康嘉,那么生產(chǎn)者這次要投遞的位置上有待消費(fèi)的Event,生產(chǎn)者會(huì)阻塞 1 納秒籽前,直到生產(chǎn)者這次要投遞的位置的Event被消費(fèi)掉亭珍;
@Override
public long next(int n)
{
if (n < 1) // n = 1
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue; // -1 0
/**
* nextValue 是生產(chǎn)者當(dāng)前最后投遞成功的位置;
* nextSequence 是生產(chǎn)者的下一個(gè)投遞序號(hào)枝哄;
*/
long nextSequence = nextValue + n; // -1 + 1 = 0, 0+1=1
/**
* 如果生產(chǎn)者還沒(méi)有投夠一圈肄梨,wrapPoint < 0
* wrapPoint > 0,才有和 cachedGatingSequence 比較的意義挠锥;
* wrapPoint 理解成包裹點(diǎn)众羡,生產(chǎn)者投了一圈了才能把entries包裹上;
* wrapPoint 也是生產(chǎn)者的投遞sequence
*/
long wrapPoint = nextSequence - bufferSize; // 0 - 10 = -10, 1-10=-9
/**
* cachedValue 應(yīng)該是上一次生產(chǎn)者獲取sequence的時(shí)候蓖租,最慢的消費(fèi)者的sequence
*/
long cachedGatingSequence = this.cachedValue; // -1
/**
* cachedGatingSequence > nextValue
* 上次生產(chǎn)者投遞的時(shí)候粱侣,最慢的消費(fèi)者的位置都比生產(chǎn)者最后投遞成功的位置大,消費(fèi)者領(lǐng)先于生產(chǎn)者蓖宦;
* wrapPoint > cachedGatingSequence
* 生產(chǎn)者已經(jīng)追上上次最慢的額消費(fèi)者齐婴;
*/
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
/**
* 當(dāng)前消費(fèi)者中最慢的sequence;
*/
long minSequence;
/**
* 生產(chǎn)者的速度太快了稠茂,追上消費(fèi)者了
*/
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
/**
* 生產(chǎn)者阻塞一丟丟柠偶,讓消費(fèi)者趕緊接著消費(fèi);
*/
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
/**
* 生產(chǎn)者的速度沒(méi)那么快,記錄下這次最慢的消費(fèi)者诱担;
* cachedValue 存的應(yīng)該是消費(fèi)者中最慢的
*/
this.cachedValue = minSequence;
}
/**
* nextValue:已經(jīng)成功投遞的最遠(yuǎn)的位置鲫售;
* nextSequence:本次要投遞的位置;
* this.nextValue = nextSequence; 可以理解成投遞成功该肴;雖然新的Event還沒(méi)有更新到entries中情竹,但意思上已經(jīng)投遞成功了;
*/
this.nextValue = nextSequence; // 0, 1
/**
* 給生產(chǎn)者這次投遞的位置匀哄;
*/
return nextSequence;
}
總結(jié)
- 生產(chǎn)者生產(chǎn)的時(shí)候秦效,按順序生產(chǎn),能生產(chǎn)就生產(chǎn)涎嚼,不能生產(chǎn)就等著阱州;
- 生產(chǎn)者和消費(fèi)者都沒(méi)有對(duì)Event加鎖;