Mpsc(Multi producer single consumer)即多生產(chǎn)者單消費(fèi)者隊(duì)列合住,是Jctools中的高性能隊(duì)列绰精,也是netty經(jīng)常的隊(duì)列撒璧,如EventLoop中的事件隊(duì)列就用Mpsc而不是jdk自帶的隊(duì)列。
本文主要介紹二類Mpsc隊(duì)列:MpscArrayQueue笨使、MpscChunkedArrayQueue
MpscArrayQueue
MpscArrayQueue是定長(zhǎng)隊(duì)列卿樱,底層用環(huán)形數(shù)組實(shí)現(xiàn)。
// 計(jì)算下標(biāo)輔助值硫椰,初始為容量-1繁调,這樣可以用&運(yùn)算
protected final long mask;
// 存放數(shù)據(jù)的數(shù)組
protected final E[] buffer;
// 生產(chǎn)者的索引
private volatile long producerIndex;
// 生產(chǎn)者的下標(biāo)限制值,用來(lái)判斷隊(duì)列是否已滿
private volatile long producerLimit;
// 消費(fèi)者的索引
protected long consumerIndex;
接下來(lái)看offer方法:
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
// use a cached view on consumer index (potentially updated in loop)
final long mask = this.mask;
//lvProducerLimit直接返回生產(chǎn)者索引最大限制
long producerLimit = lvProducerLimit();
long pIndex;
do
{
//獲取生產(chǎn)者索引
pIndex = lvProducerIndex();
if (pIndex >= producerLimit)
{
final long cIndex = lvConsumerIndex();
//重新計(jì)算生產(chǎn)者索引最大限制值靶草,producerLimit=mask+1=容量蹄胰,cIndex是消費(fèi)者索引,cIndex不等于0說(shuō)明有消費(fèi)奕翔,那么producerLimit也要相應(yīng)的增加
producerLimit = cIndex + mask + 1;
if (pIndex >= producerLimit)
{
return false; // FULL :(
}
else
{
// update producer limit to the next index that we must recheck the consumer index
// this is racy, but the race is benign
soProducerLimit(producerLimit);
}
}
}
//死循環(huán)CAS設(shè)置pIndex下標(biāo)實(shí)際內(nèi)存偏移地址
while (!casProducerIndex(pIndex, pIndex + 1));
//計(jì)算pIndex下標(biāo)實(shí)際內(nèi)存偏移地址
final long offset = calcCircularRefElementOffset(pIndex, mask);
//將pIndex下標(biāo)實(shí)際內(nèi)存偏移地址設(shè)置為要插入的值
soRefElement(buffer, offset, e);
return true; // AWESOME :)
}
final boolean casProducerIndex(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
public static long calcCircularRefElementOffset(long index, long mask)
{
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
這里計(jì)算pIndex下標(biāo)實(shí)際內(nèi)存偏移地址方法要注意下:
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
REF_ARRAY_BASE=UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
其中REF_ARRAY_BASE是數(shù)組內(nèi)存初始偏移量裕寨,scale是每個(gè)數(shù)組每個(gè)元素的內(nèi)存增量REF_ELEMENT_SHIFT是轉(zhuǎn)成2的n次方,好利用位運(yùn)算來(lái)計(jì)算糠悯。
最后看下poll方法:
public E poll()
{
final long cIndex = lpConsumerIndex();
//根據(jù)cIndex計(jì)算實(shí)際的內(nèi)存偏移值
final long offset = calcCircularRefElementOffset(cIndex, mask);
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;
// If we can't see the next available element we can't poll
//獲取當(dāng)前可消費(fèi)的元素
E e = lvRefElement(buffer, offset);
//e=null則一直循環(huán)獲取
if (null == e)
{
if (cIndex != lvProducerIndex())
{
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}
else
{
return null;
}
}
//消費(fèi)成功帮坚,將元素設(shè)為null
soRefElement(buffer, offset, null);
//消費(fèi)者索引+1
soConsumerIndex(cIndex + 1);
return e;
}
public static long calcCircularRefElementOffset(long index, long mask)
{
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
public static <E> E lvRefElement(E[] buffer, long offset)
{
return (E) UNSAFE.getObjectVolatile(buffer, offset);
}
public static <E> void soRefElement(E[] buffer, long offset, E e)
{
UNSAFE.putOrderedObject(buffer, offset, e);
}
final void soConsumerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}
MpscChunkedArrayQueue
MpscChunkedArrayQueue是一個(gè)非定長(zhǎng)的隊(duì)列,適合無(wú)法預(yù)測(cè)隊(duì)列長(zhǎng)度的場(chǎng)景互艾∈院停基于數(shù)組+鏈表的結(jié)構(gòu),不會(huì)像鏈表那樣分配過(guò)多的Node纫普,吞吐量比傳統(tǒng)的鏈表高阅悍。
屬性:
//消費(fèi)者輔助計(jì)算值=(容量-1)/2
protected long consumerMask;
//和生產(chǎn)者一樣,指向數(shù)組引用
protected E[] consumerBuffer;
//消費(fèi)者索引
protected long consumerIndex;
protected long producerMask;
protected long producerIndex;
private volatile long producerLimit;
protected E[] producerBuffer;
//最大容量昨稼,默認(rèn)為Pow2.roundToPowerOfTwo(maxCapacity)) << 1
protected final long maxQueueCapacity;
構(gòu)造方法:
public BaseMpscLinkedArrayQueue(final int initialCapacity)
{
RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
//比initialCapacity大的最近的2^n值
int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
// leave lower bit of mask clear
long mask = (p2capacity - 1) << 1;
// need extra element to point at next array
//初始數(shù)組大小=p2capacity + 1
E[] buffer = allocateRefArray(p2capacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
consumerMask = mask;
soProducerLimit(mask); // we know it's all empty to start with
}
先看poll方法:
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
long mask;
E[] buffer;
long pIndex;
while (true)
{
long producerLimit = lvProducerLimit();
pIndex = lvProducerIndex();
// lower bit is indicative of resize, if we see it we spin until it's cleared
//和MpscArrayQueue不一樣节视,pIndex每次會(huì)加2,低位是識(shí)別擴(kuò)容用的假栓,如果是擴(kuò)容寻行,則等待擴(kuò)容完畢(擴(kuò)容完會(huì)設(shè)置pIndex為2的倍數(shù))
if ((pIndex & 1) == 1)
{
continue;
}
// pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
mask = this.producerMask;
buffer = this.producerBuffer;
//當(dāng)閾值小于生產(chǎn)者索引時(shí),需要擴(kuò)容匾荆,否則pIndex+2
if (producerLimit <= pIndex)
{
//返回狀態(tài)值拌蜘,根據(jù)狀態(tài)值處理新元素
int result = offerSlowPath(mask, pIndex, producerLimit);
switch (result)
{
case CONTINUE_TO_P_INDEX_CAS: //繼續(xù)嘗試CAS設(shè)置pIndex+2
break;
case RETRY: //可能CAS并發(fā)失敗,繼續(xù)
continue;
case QUEUE_FULL: //隊(duì)列滿了
return false;
case QUEUE_RESIZE: //隊(duì)列需要擴(kuò)容
resize(mask, buffer, pIndex, e, null);
return true;
}
}
//CAS設(shè)置pIndex+2
if (casProducerIndex(pIndex, pIndex + 2))
{
break;
}
}
// 獲取pIndex實(shí)際內(nèi)存偏移值并設(shè)置牙丽,和MSPCArrayQueue一樣
final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
soRefElement(buffer, offset, e); // release element e
return true;
}
繼續(xù)看offerSlowPath方法:
private int offerSlowPath(long mask, long pIndex, long producerLimit)
{
final long cIndex = lvConsumerIndex(); //計(jì)算消費(fèi)者索引
long bufferCapacity = getCurrentBufferCapacity(mask); //獲取buffer容量
//如果消費(fèi)者索引+buff容量>生產(chǎn)者索引简卧,說(shuō)明當(dāng)前容量不夠用了
if (cIndex + bufferCapacity > pIndex)
{
//嘗試CAS設(shè)置producerLimit=cIndex + bufferCapacity,成功返回繼續(xù)烤芦,失敗返回重試
if (!casProducerLimit(producerLimit, cIndex + bufferCapacity))
{
// retry from top
return RETRY;
}
else
{
// continue to pIndex CAS
return CONTINUE_TO_P_INDEX_CAS;
}
}
//超過(guò)最大容量举娩,滿了
// full and cannot grow
else if (availableInQueue(pIndex, cIndex) <= 0)
{
// offer should return false;
return QUEUE_FULL;
}
// grab index for resize -> set lower bit
//設(shè)置pIndex+1,成功的話返回?cái)U(kuò)容,注意這個(gè)+1操作铜涉,前面就有pIndex&1操作來(lái)判斷是否擴(kuò)容
else if (casProducerIndex(pIndex, pIndex + 1))
{
// trigger a resize
return QUEUE_RESIZE;
}
else
{
// failed resize attempt, retry from top
return RETRY;
}
}
protected long availableInQueue(long pIndex, long cIndex)
{
return maxQueueCapacity - (pIndex - cIndex);
}
繼續(xù)看擴(kuò)容方法resize
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
{
assert (e != null && s == null) || (e == null || s != null);
//獲取oldBuffer長(zhǎng)度值
int newBufferLength = getNextBufferSize(oldBuffer);
final E[] newBuffer;
try
{
newBuffer = allocateRefArray(newBufferLength);
}
catch (OutOfMemoryError oom)
{
assert lvProducerIndex() == pIndex + 1;
soProducerIndex(pIndex);
throw oom;
}
producerBuffer = newBuffer;
final int newMask = (newBufferLength - 2) << 1;
producerMask = newMask;
//分別根據(jù)oldMask智玻、newMask獲取偏移位置值
final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
//將元素e設(shè)置到新的緩沖區(qū)newBuffer的offsetInNew位置處
soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
// 將oldBuffer中最后一個(gè)元素的位置指向新的緩沖區(qū)newBuffer
soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
// ASSERT code
final long cIndex = lvConsumerIndex();
final long availableInQueue = availableInQueue(pIndex, cIndex);
RangeUtil.checkPositive(availableInQueue, "availableInQueue");
// 重新計(jì)算閾值
soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
// make resize visible to the other producers
soProducerIndex(pIndex + 2);
// INDEX visible before ELEMENT, consistent with consumer expectation
// make resize visible to consumer
//用一個(gè)空對(duì)象JUMP來(lái)連接新老緩沖區(qū),消費(fèi)遇到JUMP就要獲取新數(shù)組地址了
soRefElement(oldBuffer, offsetInOld, JUMP);
}
繼續(xù)看poll方法:
public E poll()
{
final E[] buffer = consumerBuffer;
final long index = lpConsumerIndex();
final long mask = consumerMask;
//獲取消費(fèi)者索引實(shí)際內(nèi)存偏移值
final long offset = modifiedCalcCircularRefElementOffset(index, mask);
Object e = lvRefElement(buffer, offset);
if (e == null)
{
if (index != lvProducerIndex())
{
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
// check the producer index. If the queue is indeed not empty we spin until element is
// visible.
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}
else
{
return null;
}
}
//如果e為JUMP骄噪,說(shuō)明擴(kuò)容過(guò)尚困,要找下一個(gè)數(shù)組
if (e == JUMP)
{
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
//設(shè)置元素為null并更新消費(fèi)者索引
soRefElement(buffer, offset, null); // release element null
soConsumerIndex(index + 2); // release cIndex
return (E) e;
}
private E newBufferPoll(E[] nextBuffer, long index)
{
final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
if (n == null)
{
throw new IllegalStateException("new buffer must have at least one element");
}
soRefElement(nextBuffer, offset, null);
soConsumerIndex(index + 2);
return n;
}
總結(jié)下優(yōu)化點(diǎn):
- 大量的位運(yùn)算
- 使用Unsafe.putOrderedXXX(以前是putXXXVolitaile),Volitaile語(yǔ)義會(huì)讓其他線程立刻看到值链蕊,使用的是store-load屏障事甜,性能差些,在Mpsc場(chǎng)景沒(méi)有使用的必要
- 無(wú)鎖化
- 偽共享(本文沒(méi)展現(xiàn)去掉了填充代碼)
- 底層結(jié)構(gòu)主要使用數(shù)組滔韵,性能更好