【netty學(xué)習(xí)筆記十七】Mpsc高性能無(wú)鎖隊(duì)列

Mpsc(Multi producer single consumer)即多生產(chǎn)者單消費(fèi)者隊(duì)列合住,是Jctools中的高性能隊(duì)列绰精,也是netty經(jīng)常的隊(duì)列撒璧,如EventLoop中的事件隊(duì)列就用Mpsc而不是jdk自帶的隊(duì)列。
本文主要介紹二類Mpsc隊(duì)列:MpscArrayQueue笨使、MpscChunkedArrayQueue

MpscArrayQueue

MpscArrayQueue是定長(zhǎng)隊(duì)列卿樱,底層用環(huán)形數(shù)組實(shí)現(xiàn)。

// 計(jì)算下標(biāo)輔助值硫椰,初始為容量-1繁调,這樣可以用&運(yùn)算
protected final long mask;
// 存放數(shù)據(jù)的數(shù)組
protected final E[] buffer;
// 生產(chǎn)者的索引
private volatile long producerIndex;
// 生產(chǎn)者的下標(biāo)限制值,用來(lái)判斷隊(duì)列是否已滿
private volatile long producerLimit;
// 消費(fèi)者的索引
protected long consumerIndex;

接下來(lái)看offer方法:

public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        // use a cached view on consumer index (potentially updated in loop)
        final long mask = this.mask;
        //lvProducerLimit直接返回生產(chǎn)者索引最大限制
        long producerLimit = lvProducerLimit();
        long pIndex;
        do
        {
            //獲取生產(chǎn)者索引
            pIndex = lvProducerIndex();
            if (pIndex >= producerLimit)
            {
                final long cIndex = lvConsumerIndex();
                //重新計(jì)算生產(chǎn)者索引最大限制值靶草,producerLimit=mask+1=容量蹄胰,cIndex是消費(fèi)者索引,cIndex不等于0說(shuō)明有消費(fèi)奕翔,那么producerLimit也要相應(yīng)的增加
                producerLimit = cIndex + mask + 1;

                if (pIndex >= producerLimit)
                {
                    return false; // FULL :(
                }
                else
                {
                    // update producer limit to the next index that we must recheck the consumer index
                    // this is racy, but the race is benign
                    soProducerLimit(producerLimit);
                }
            }
        }
        //死循環(huán)CAS設(shè)置pIndex下標(biāo)實(shí)際內(nèi)存偏移地址
        while (!casProducerIndex(pIndex, pIndex + 1));
        
        //計(jì)算pIndex下標(biāo)實(shí)際內(nèi)存偏移地址
        final long offset = calcCircularRefElementOffset(pIndex, mask);
        //將pIndex下標(biāo)實(shí)際內(nèi)存偏移地址設(shè)置為要插入的值
        soRefElement(buffer, offset, e);
        return true; // AWESOME :)
    }

final boolean casProducerIndex(long expect, long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }

public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }

這里計(jì)算pIndex下標(biāo)實(shí)際內(nèi)存偏移地址方法要注意下:

final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
       
REF_ARRAY_BASE=UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)

其中REF_ARRAY_BASE是數(shù)組內(nèi)存初始偏移量裕寨,scale是每個(gè)數(shù)組每個(gè)元素的內(nèi)存增量REF_ELEMENT_SHIFT是轉(zhuǎn)成2的n次方,好利用位運(yùn)算來(lái)計(jì)算糠悯。

最后看下poll方法:

public E poll()
    {
        final long cIndex = lpConsumerIndex();
        //根據(jù)cIndex計(jì)算實(shí)際的內(nèi)存偏移值
        final long offset = calcCircularRefElementOffset(cIndex, mask);
        // Copy field to avoid re-reading after volatile load
        final E[] buffer = this.buffer;

        // If we can't see the next available element we can't poll
        //獲取當(dāng)前可消費(fèi)的元素
        E e = lvRefElement(buffer, offset);
        //e=null則一直循環(huán)獲取
        if (null == e)
        {
            
            if (cIndex != lvProducerIndex())
            {
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                return null;
            }
        }
        //消費(fèi)成功帮坚,將元素設(shè)為null
        soRefElement(buffer, offset, null);
        //消費(fèi)者索引+1
        soConsumerIndex(cIndex + 1);
        return e;
    }
public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }
public static <E> E lvRefElement(E[] buffer, long offset)
    {
        return (E) UNSAFE.getObjectVolatile(buffer, offset);
    }
public static <E> void soRefElement(E[] buffer, long offset, E e)
    {
        UNSAFE.putOrderedObject(buffer, offset, e);
    }
final void soConsumerIndex(long newValue)
    {
        UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
    }

MpscChunkedArrayQueue

MpscChunkedArrayQueue是一個(gè)非定長(zhǎng)的隊(duì)列,適合無(wú)法預(yù)測(cè)隊(duì)列長(zhǎng)度的場(chǎng)景互艾∈院停基于數(shù)組+鏈表的結(jié)構(gòu),不會(huì)像鏈表那樣分配過(guò)多的Node纫普,吞吐量比傳統(tǒng)的鏈表高阅悍。

屬性:

//消費(fèi)者輔助計(jì)算值=(容量-1)/2
protected long consumerMask;
//和生產(chǎn)者一樣,指向數(shù)組引用
protected E[] consumerBuffer;
//消費(fèi)者索引
protected long consumerIndex;

protected long producerMask;
protected long producerIndex;
private volatile long producerLimit;
protected E[] producerBuffer;
//最大容量昨稼,默認(rèn)為Pow2.roundToPowerOfTwo(maxCapacity)) << 1
protected final long maxQueueCapacity;

構(gòu)造方法:

public BaseMpscLinkedArrayQueue(final int initialCapacity)
    {
        RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
        //比initialCapacity大的最近的2^n值
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // leave lower bit of mask clear
        long mask = (p2capacity - 1) << 1;
        // need extra element to point at next array
        //初始數(shù)組大小=p2capacity + 1
        E[] buffer = allocateRefArray(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        soProducerLimit(mask); // we know it's all empty to start with
    }

先看poll方法:

public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer;
        long pIndex;

        while (true)
        {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            //和MpscArrayQueue不一樣节视,pIndex每次會(huì)加2,低位是識(shí)別擴(kuò)容用的假栓,如果是擴(kuò)容寻行,則等待擴(kuò)容完畢(擴(kuò)容完會(huì)設(shè)置pIndex為2的倍數(shù))
            if ((pIndex & 1) == 1)
            {
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)


            mask = this.producerMask;
            buffer = this.producerBuffer;
            //當(dāng)閾值小于生產(chǎn)者索引時(shí),需要擴(kuò)容匾荆,否則pIndex+2
            if (producerLimit <= pIndex)
            {
                //返回狀態(tài)值拌蜘,根據(jù)狀態(tài)值處理新元素
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result)
                {
                    case CONTINUE_TO_P_INDEX_CAS: //繼續(xù)嘗試CAS設(shè)置pIndex+2
                        break;
                    case RETRY: //可能CAS并發(fā)失敗,繼續(xù)
                        continue;
                    case QUEUE_FULL: //隊(duì)列滿了
                        return false;
                    case QUEUE_RESIZE: //隊(duì)列需要擴(kuò)容
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
            //CAS設(shè)置pIndex+2
            if (casProducerIndex(pIndex, pIndex + 2))
            {
                break;
            }
        }
        // 獲取pIndex實(shí)際內(nèi)存偏移值并設(shè)置牙丽,和MSPCArrayQueue一樣
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

繼續(xù)看offerSlowPath方法:

private int offerSlowPath(long mask, long pIndex, long producerLimit)
    {
        final long cIndex = lvConsumerIndex(); //計(jì)算消費(fèi)者索引
        long bufferCapacity = getCurrentBufferCapacity(mask); //獲取buffer容量
        //如果消費(fèi)者索引+buff容量>生產(chǎn)者索引简卧,說(shuō)明當(dāng)前容量不夠用了
        if (cIndex + bufferCapacity > pIndex)
        {
            //嘗試CAS設(shè)置producerLimit=cIndex + bufferCapacity,成功返回繼續(xù)烤芦,失敗返回重試
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity))
            {
                // retry from top
                return RETRY;
            }
            else
            {
                // continue to pIndex CAS
                return CONTINUE_TO_P_INDEX_CAS;
            }
        }
        //超過(guò)最大容量举娩,滿了
        // full and cannot grow
        else if (availableInQueue(pIndex, cIndex) <= 0)
        {
            // offer should return false;
            return QUEUE_FULL;
        }
        // grab index for resize -> set lower bit
        //設(shè)置pIndex+1,成功的話返回?cái)U(kuò)容,注意這個(gè)+1操作铜涉,前面就有pIndex&1操作來(lái)判斷是否擴(kuò)容
        else if (casProducerIndex(pIndex, pIndex + 1))
        {
            // trigger a resize
            return QUEUE_RESIZE;
        }
        else
        {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

protected long availableInQueue(long pIndex, long cIndex)
    {
        return maxQueueCapacity - (pIndex - cIndex);
    }

繼續(xù)看擴(kuò)容方法resize

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
    {
        assert (e != null && s == null) || (e == null || s != null);
        //獲取oldBuffer長(zhǎng)度值
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try
        {
            newBuffer = allocateRefArray(newBufferLength);
        }
        catch (OutOfMemoryError oom)
        {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }

        producerBuffer = newBuffer;
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;
        //分別根據(jù)oldMask智玻、newMask獲取偏移位置值
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
        //將元素e設(shè)置到新的緩沖區(qū)newBuffer的offsetInNew位置處
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
        // 將oldBuffer中最后一個(gè)元素的位置指向新的緩沖區(qū)newBuffer
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

        // ASSERT code
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");

        // 重新計(jì)算閾值
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

        // make resize visible to the other producers
        soProducerIndex(pIndex + 2);

        // INDEX visible before ELEMENT, consistent with consumer expectation

        // make resize visible to consumer
        //用一個(gè)空對(duì)象JUMP來(lái)連接新老緩沖區(qū),消費(fèi)遇到JUMP就要獲取新數(shù)組地址了
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

繼續(xù)看poll方法:

public E poll()
    {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        //獲取消費(fèi)者索引實(shí)際內(nèi)存偏移值
        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null)
        {
            if (index != lvProducerIndex())
            {
                // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
                // check the producer index. If the queue is indeed not empty we spin until element is
                // visible.
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                return null;
            }
        }
        //如果e為JUMP骄噪,說(shuō)明擴(kuò)容過(guò)尚困,要找下一個(gè)數(shù)組
        if (e == JUMP)
        {
            final E[] nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        //設(shè)置元素為null并更新消費(fèi)者索引
        soRefElement(buffer, offset, null); // release element null
        soConsumerIndex(index + 2); // release cIndex
        return (E) e;
    }

private E newBufferPoll(E[] nextBuffer, long index)
    {
        final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (n == null)
        {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        soRefElement(nextBuffer, offset, null);
        soConsumerIndex(index + 2);
        return n;
    }

總結(jié)下優(yōu)化點(diǎn):

  1. 大量的位運(yùn)算
  2. 使用Unsafe.putOrderedXXX(以前是putXXXVolitaile),Volitaile語(yǔ)義會(huì)讓其他線程立刻看到值链蕊,使用的是store-load屏障事甜,性能差些,在Mpsc場(chǎng)景沒(méi)有使用的必要
  3. 無(wú)鎖化
  4. 偽共享(本文沒(méi)展現(xiàn)去掉了填充代碼)
  5. 底層結(jié)構(gòu)主要使用數(shù)組滔韵,性能更好
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末逻谦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子陪蜻,更是在濱河造成了極大的恐慌邦马,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宴卖,死亡現(xiàn)場(chǎng)離奇詭異滋将,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)症昏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)随闽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人肝谭,你說(shuō)我怎么就攤上這事掘宪。” “怎么了攘烛?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵魏滚,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我坟漱,道長(zhǎng)鼠次,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任芋齿,我火速辦了婚禮腥寇,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沟突。我一直安慰自己,他們只是感情好捕传,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布惠拭。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪职辅。 梳的紋絲不亂的頭發(fā)上棒呛,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音域携,去河邊找鬼簇秒。 笑死,一個(gè)胖子當(dāng)著我的面吹牛秀鞭,可吹牛的內(nèi)容都是我干的趋观。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼锋边,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼皱坛!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起豆巨,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤剩辟,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后往扔,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體贩猎,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年萍膛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了吭服。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡卦羡,死狀恐怖噪馏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情绿饵,我是刑警寧澤欠肾,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站拟赊,受9級(jí)特大地震影響刺桃,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜吸祟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一瑟慈、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧屋匕,春花似錦葛碧、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蔗衡。三九已至,卻和暖如春乳绕,著一層夾襖步出監(jiān)牢的瞬間绞惦,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工洋措, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留济蝉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓菠发,卻偏偏與公主長(zhǎng)得像王滤,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子雷酪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348