LinkedTransferQueue源碼解析

之前寫了SynchronousQueue的源碼解析霎冯,其內(nèi)部實現(xiàn)有兩個數(shù)據(jù)結(jié)構(gòu):一個是棧,一個是FIFO隊列钞瀑,在之前的文章中主要分析了棧(非公平模式)的實現(xiàn)思路而沒有分析隊列的實現(xiàn)沈撞,其實是因為JDK里面本身有LinkedTransferQueue這個隊列的實現(xiàn),所以今天單獨來說下這個隊列的實現(xiàn)思路雕什,主要目的方便自己以后快速的回顧缠俺。
首先我們回顧一下SynchronousQueue里面的TransferStack的實現(xiàn)方式:
1.棧中節(jié)點通過0显晶,1,2來標(biāo)記節(jié)點是獲取數(shù)據(jù)還是生產(chǎn)數(shù)據(jù)壹士,或者是正在匹配磷雇。
2.棧中的節(jié)點稱為waiter(等待者),等待其它線程與其匹配躏救。
3.匹配過程是將匹配節(jié)點入棧唯笙,并設(shè)置節(jié)點操作模式為FULFILLING|mode讓后面的匹配節(jié)點和等待節(jié)點都不能入棧,即達(dá)到了匹配動作的線程安全盒使。
LinkedTransferQueue的實現(xiàn)在邏輯上與TransferStack大致相同崩掘,不同點在于隊列是FIFO的,所以匹配每次操作的都是從頭節(jié)點開始尋找忠怖,直到找到一個未匹配的節(jié)點呢堰,而入隊列的節(jié)點是從隊尾進(jìn)入抄瑟,與隊頭不沖突凡泣,所以在實現(xiàn)上,不會需要像棧一定皮假,將匹配中的節(jié)點壓入棧鞋拟。
LinkedTransferQueue的實現(xiàn)也沒有像SynchronousQueue里面的實現(xiàn)那樣,在transfer(xfer)方法的處理上也有一些不同惹资。在SynchronousQueue的transfer方法上只有3個參數(shù)贺纲,方法實現(xiàn)是通過是否傳入元素e來判斷本次操作是獲取還是生產(chǎn),而在LinkedTransferQueue的xfer方法的實現(xiàn)上褪测,還需要手動的傳入一個haveData參數(shù)來表名本次的操作猴誊。(xfer是jdk1.7寫的,transfer方法是1.8寫的侮措,可以看出作者后面也發(fā)現(xiàn)這個參數(shù)真沒必要)懈叹。
先說說總體首先思路吧:
1.一次操作,先從隊列頭開始檢查分扎,看節(jié)點是否已經(jīng)匹配澄成,并且模式是否與本次操作相匹配,如果模式匹配畏吓,則進(jìn)行配對CAS操作墨状。
2.如果頭節(jié)點已經(jīng)被匹配,則循環(huán)查找下一個節(jié)點
3.如果隊列為空菲饼,或者操作模式與隊列中的節(jié)點的模式一樣肾砂,則本次操作構(gòu)造為節(jié)點進(jìn)入隊列成為等待者
4.等待者在隊列中睡眠或者自旋,當(dāng)有調(diào)用者將其喚醒后匹配成功出隊列

接下來是對代碼的詳細(xì)說明:

private E xfer(E e, boolean putData, int how, long nanos) {
    //判斷參數(shù)操作模式和e是否匹配宏悦,如果是put通今,則e不為空
    if (putData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race

        //代碼1:從頭節(jié)點開始遍歷粥谬,直到隊列為空
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean nodeIsPutData = p.isData; //頭節(jié)點操作模式(false為消費,true為生產(chǎn))
            Object item = p.item; //頭節(jié)點數(shù)據(jù)
            
            //代碼2
            //代碼2-1:如果頭節(jié)點的數(shù)據(jù)不等于自己(節(jié)點未被匹配辫塌,見awaitMatch方法代碼1)
            //(item != null) == nodeIsPutData 表明節(jié)點的操作模式要與節(jié)點的數(shù)據(jù)對應(yīng)漏策,這樣節(jié)點才是正常的
            if (item != p && (item != null) == nodeIsPutData) { // unmatched //如果頭節(jié)點已經(jīng)被匹配,則去到代碼3臼氨,繼續(xù)循環(huán)下一個節(jié)點
                if (nodeIsPutData == putData)   // can't match //如果頭節(jié)點的模式和本次操作的模式一樣掺喻,則不能匹配,所以跳出循環(huán)來到代碼4(入隊列)
                    break;
                
                if (p.casItem(item, e)) { // match //匹配成功
                    //n1(head: h)->n2->n3->n4->n5(p)->n6->n7(tail)  假如找到n5才匹配上,則說明n1,n2,n3,n4都已經(jīng)
                    //
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        
                        //將頭節(jié)點指向p節(jié)點(p為尾節(jié)點)或者指向p.next储矩,從而p之前的那些節(jié)點出隊列
                        //這里為什么不是直接指向p.next感耙?為什么有可能指向p,p不是已經(jīng)被匹配了么 持隧?
                        //因為如果讓head指向null的話還需要cas改變tail即硼,兩個cas沒辦法保證原子性),否則指向q的下一個節(jié)點屡拨。這里在代碼3處得到補(bǔ)償
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext(); //將next指向自己只酥,這里印證了代碼3
                            break; //
                        }                 // advance and retry
                        
                        //重新判斷隊列里面是否至少還有2個節(jié)點,并且第二個是已經(jīng)被匹配了的呀狼,這種情況裂允,再次進(jìn)入循環(huán),嘗試將已經(jīng)匹配的節(jié)點出隊列哥艇。
                        //if((h = head) != null && (q = h.next) != null && q.isMatched()){
                        //  continue;
                        //}else break;
                        
                        if ((h = head)   == null ||
                            (q = h.next) == null || 
                            !q.isMatched())
                            break;        // unless slack < 2
                        
                    }
                    
                    //代碼2-2
                    //喚醒p節(jié)點绝编,節(jié)點從awaitMatch方法中醒來,然后返回配對的值
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            
            //代碼3貌踏,如果p!=p.next(p沒有出隊列)十饥,則取下一個節(jié)點繼續(xù)嘗試,否則從“頭”開始
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        //代碼4:如果入隊節(jié)點模式與等待者相同祖乳,或者隊列中已經(jīng)沒有待匹配的等待者
        //NOW代表不等待的poll操作逗堵,除此之外
        if (how != NOW) {                 // No matches available
            if (s == null)
                s = new Node(e, putData); //本次操作構(gòu)造節(jié)點
            
            Node pred = tryAppend(s, putData); //嘗試入隊列,詳情見tryAppend方法
            if (pred == null)
                continue retry;   //入隊失敗凡资,從新嘗試        // lost race vs opposite mode
            
            //ASYNC代表offer, put, add
            //如果入隊列成功砸捏,并且是take和timed poll方法,則進(jìn)入等待
            if (how != ASYNC) //SYNC / TIMED
                //讓當(dāng)前節(jié)點進(jìn)入睡眠狀態(tài)隙赁,等待配對者將其喚醒
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        
        return e; // not waiting
    }
}


/**
將本次操作封裝成隊列節(jié)點入隊
入隊列邏輯:
1.判斷隊列是否為空垦藏,為空則直接入
2.從隊尾入手,判斷隊尾節(jié)點的狀態(tài)是否與本次操作相同伞访,如果不同則不能入隊列
2.1.如果能入隊列掂骏,則判斷循環(huán)中的p節(jié)點是否還是尾節(jié)點,如果不是厚掷,則繼續(xù)往下找弟灼,直到尾節(jié)點
3.如果p是尾節(jié)點级解,則cas將s節(jié)點入隊列,如果cas失敗田绑,則p重新定位為next勤哗,又重新嘗試
4.如果入隊成功
**/
private Node tryAppend(Node s, boolean putData) {
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        //1.隊列為空,將當(dāng)前節(jié)點設(shè)置為頭節(jié)點
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                 // initialize
        }
        //2.來到這里說明隊列不為空掩驱,此時理論上p!=null
        //p節(jié)點未被匹配芒划,并且模式與putData相反,則說明s節(jié)點無法接入隊列
        else if (p.cannotPrecede(putData))
            return null;                  // lost race vs opposite mode
        //2.1 如果已經(jīng)有新節(jié)點入隊列了欧穴,或者隊尾出隊了(p.next == p)
        else if ((n = p.next) != null)    // not last; keep traversing
        
            // tail發(fā)生了變化民逼,則指向新的tail(u),否則看老的tail節(jié)點是否出隊列,如果未出涮帘,則讓p指向p.next,如果已出隊列拼苍,則說明隊列空了,則讓p指向null重新嘗試
            //讓t每次都指向最新的tail调缨,而如果p!=t疮鲫,則說明p不是尾節(jié)點, 如果t已經(jīng)等于了tail同蜻,則單獨設(shè)置p(如果p!=p.next,則p就指向next棚点,否則p指向null早处,重新從head開始)
            //如果t不等于tail湾蔓,則將t指向tail
            //這里為什么要這么復(fù)雜? 因為有代碼3.可知砌梆,節(jié)點入隊的保證是casNext, 如果保證線程安全的入隊默责,不能只拿到tail就append,因為有可能老的tail已經(jīng)不是隊尾了咸包,
            //所以必須一直遍歷到p.next等于null才能入隊列桃序,并且如果發(fā)現(xiàn)p已經(jīng)出隊列,則將p指向null,然后代碼來到1.處烂瘫,從“頭”開始往后遍歷媒熊。
            //為什么從頭開始能保證線程安全呢? 因為節(jié)點出隊列是casHead來保證的坟比。
            p = (p != t) && t != (u = tail) ? 
                        (t = u) :             
                        ((p != n) ? n : null;  )// restart if off list
                    
        //3.入隊列
        else if (!p.casNext(null, s))
            p = p.next;                   // re-read on CAS failure
        //4.如果入隊成功返回前節(jié)點
        else {
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) && 
                       (t = tail)   != null && 
                       (s = t.next) != null &&
                       (s = s.next) != null && 
                       s != t);                    
                // advance and retry       
            }
            return p;
        }
    }// for over
}

//返回true芦鳍,說明調(diào)用者節(jié)點不能作為前繼節(jié)點
final boolean cannotPrecede(boolean haveData) {
    boolean d = isData; //
    Object x;
    //如果調(diào)用者操作模式和本次操作模式不相同
    //并且調(diào)用者沒有被匹配 x = item) != this && (x != null) == d
    return d != haveData && (x = item) != this && (x != null) == d;
}



private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; //讓出CPU的次數(shù) // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    //自旋方法:睡眠節(jié)點等待超時,被中斷葛账,或者匹配成功才會返回柠衅。
    for (;;) {
        Object item = s.item;
        //代碼1:當(dāng)節(jié)點數(shù)據(jù)放生變化時,說明節(jié)點已經(jīng)被匹配籍琳,返回匹配值
        if (item != e) {                  // matched
            // assert item != s;
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        //如果當(dāng)前線程被中斷菲宴,或者線程等待超時(timed && nanos <= 0)并且將自己的item指向自己(在代碼2-1處可以看到贷祈,判斷節(jié)點是否被匹配,是通過判斷item是否等于自己)
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            unsplice(pred, s);
            return e;
        }

        //初始化spins==-1,當(dāng)其小于0時喝峦,
        if (spins < 0) {                  // establish spins at/near front
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        //自旋次數(shù)大于0势誊,繼續(xù)
        else if (spins > 0) {             // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();   //隨機(jī)讓出CPU時間       // occasionally yield
        }
        //自旋完成,準(zhǔn)備進(jìn)入睡眠谣蠢,先設(shè)置等待線程
        else if (s.waiter == null) {
            s.waiter = w;                 // request unpark then recheck
        }
        //如果有時間等待限制键科,判斷是否超過等待時間,如果沒有超過等待時間漩怎,則讓節(jié)點睡眠(一定時間勋颖,節(jié)點超時或者被其它節(jié)點喚醒)
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else { //如果沒有時間限制,則直接將線程睡眠
            LockSupport.park(this);
        }
    }
}

/**
通過前節(jié)點和當(dāng)前節(jié)點的操作模式來判斷自旋次數(shù)
**/
private static int spinsFor(Node pred, boolean haveData) {
    //如果是多核心CPU勋锤,并且前繼節(jié)點存在饭玲,則根據(jù)前繼節(jié)點的情況設(shè)置自旋次數(shù)。
    if (MP && pred != null) {
        if (pred.isData != haveData)      // phase change //沒看懂為什么前節(jié)點的模式和本次操作模式會不相等(因為節(jié)點都已入隊列叁执,說明前節(jié)點和本節(jié)點類型應(yīng)該一定相等才對)
            return FRONT_SPINS + CHAINED_SPINS;
        if (pred.isMatched())             // probably at front  //如果前節(jié)點以及被匹配茄厘,那么自己就是最前面的節(jié)點了,那就使用前節(jié)前的自旋次數(shù)定義
            return FRONT_SPINS;
        if (pred.waiter == null)          // pred apparently spinning //如果前節(jié)點的waiter等于空谈宛,說明前節(jié)點還在自旋次哈,則使用較小的CHAINED_SPINS作為自旋次數(shù)
            return CHAINED_SPINS;
    }
    
    return 0;
}

最后再總結(jié)一下:
1.隊列中的節(jié)點稱為等待者,等待者首先會自旋一會兒來判斷自己是否被匹配上吆录,自旋完了還沒匹配上就線程睡眠窑滞。
2.判斷一個節(jié)點是否被匹配,直接判斷item跟isData模式上是否能匹配恢筝,但是代碼里面多加了一個item是否指向自己的判斷(我覺得沒必要)哀卫,因為匹配成功的條件就是p.casItem(item, e),見代碼2-1
3.判斷節(jié)點是否出隊列撬槽,需要判斷節(jié)點以及節(jié)點的next是否指向自己此改。
4.等待著發(fā)現(xiàn)自己被匹配上了,會將自己節(jié)點的內(nèi)容清空:即通過forgetContents方法將item指向自己侄柔,將線程指向null.

由于個人能力有限共啃,代碼的理解上可能會存在一定的錯誤,還請各位指正暂题,謝謝移剪。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市敢靡,隨后出現(xiàn)的幾起案子挂滓,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赶站,死亡現(xiàn)場離奇詭異幔虏,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)贝椿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門想括,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人烙博,你說我怎么就攤上這事瑟蜈。” “怎么了渣窜?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵铺根,是天一觀的道長。 經(jīng)常有香客問我乔宿,道長位迂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任详瑞,我火速辦了婚禮掂林,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘坝橡。我一直安慰自己泻帮,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布计寇。 她就那樣靜靜地躺著锣杂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪饲常。 梳的紋絲不亂的頭發(fā)上蹲堂,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天狼讨,我揣著相機(jī)與錄音贝淤,去河邊找鬼。 笑死政供,一個胖子當(dāng)著我的面吹牛播聪,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播布隔,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼离陶,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了衅檀?” 一聲冷哼從身側(cè)響起招刨,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎哀军,沒想到半個月后沉眶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體打却,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年谎倔,在試婚紗的時候發(fā)現(xiàn)自己被綠了柳击。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡片习,死狀恐怖捌肴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情藕咏,我是刑警寧澤状知,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站孽查,受9級特大地震影響试幽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜卦碾,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一铺坞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧洲胖,春花似錦济榨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至叉弦,卻和暖如春丐一,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背淹冰。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工库车, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人樱拴。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓柠衍,卻偏偏與公主長得像,于是被迫代替她去往敵國和親晶乔。 傳聞我的和親對象是個殘疾皇子珍坊,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,843評論 2 354