JDK1.8 SynchronousQueue源碼解析

同步隊列:它繼承了一般的AbstractQueue和實現(xiàn)了BlockingQueue接口痘拆。它與其它的BlockingQueue最大的區(qū)別就在它不存儲任何數(shù)據(jù)狱杰,它的內(nèi)部是一個棧(非公平)或者一個隊列(公平策略)用來存儲訪問SynchronousQueue的線程憔披,而訪問它的線程有消費(fèi)者和生產(chǎn)者柱搜,對應(yīng)于方法put和take辩诞。當(dāng)一個生產(chǎn)者或者消費(fèi)者試圖訪問SynchronousQueue的時候课幕,如果找不到與之能夠配對的消費(fèi)者或者生產(chǎn)者锨并,則當(dāng)前線程會阻塞露该,直到對應(yīng)的線程將其喚醒,或者等待超時第煮,或者中斷解幼。
以下是它的put和take方法的實現(xiàn)抑党,不管是put還是take,其核心都是調(diào)用transferer對象的transfer方法撵摆,所以要弄明白SynchronousQueue底靠,就需要弄清楚Transferer。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

我們先從整體上來看一下SynchronousQueue的內(nèi)部結(jié)構(gòu):


類結(jié)構(gòu)圖

從類圖上可以看出特铝,SynchronousQueue內(nèi)部引用了一個Transfer暑中,而Transfer的實現(xiàn)有兩種,一個是TransferStack鲫剿,一個是TransferQueue.
今天我們分析的重點(diǎn)對象是TransferStack.
1.先來看下TransferStack是如何初始化的:

//SynchronousQueue的構(gòu)造方法
public SynchronousQueue(boolean fair) {
      //如果初始化為非公平策略鳄逾,則使用TransferStack
      transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

2.仔細(xì)看一下TransferStack的實現(xiàn):

static final class TransferStack<E> extends Transferer<E> {
        //代表本次transfer的操作是獲取數(shù)據(jù)
        static final int REQUEST    = 0;
       //代筆本次transfer的操作是放入數(shù)據(jù)
        static final int DATA       = 1;
       //代表節(jié)點(diǎn)正在配對
        static final int FULFILLING = 2;
       //判斷節(jié)點(diǎn)目前是否在匹配中,3&2 == 2(匹配中)  
       //2&2 == 2(匹配中)    1&2==0 灵莲, 0&2==0 (等待匹配)
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

        /** 內(nèi)部類雕凹,棧節(jié)點(diǎn)*/
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // 配對的節(jié)點(diǎn)泳炉,如果未配對則未空
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;

            SNode(Object item) {
                this.item = item;
            }

            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

           /**
           嘗試配對
           s:被配對的節(jié)點(diǎn)
           配對的邏輯為:通過CAS設(shè)置節(jié)點(diǎn)的match屬性紊服,如果能設(shè)置成功,則說明配對成功造烁,配對成功后明场,再通過LockSupport.unpark(w);
將其對應(yīng)的等待線程喚醒汽摹。
          這個方法比較簡答,就不再累述
           **/
            boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             如果一個節(jié)點(diǎn)等待超時榕堰,或者線程被中斷竖慧,則需要取消節(jié)點(diǎn)的等待,而取消等待的標(biāo)志就是將match指向自己
             */
            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
            boolean isCancelled() {
                return match == this;
            }

            // Unsafe mechanics
            //Unsafe機(jī)制逆屡, 不懂的就不要看juc包了圾旨,先搞懂Unsafe再看
            private static final sun.misc.Unsafe UNSAFE;
            private static final long matchOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = SNode.class;
                    matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

        /** The head (top) of the stack */
        //棧頂節(jié)點(diǎn)
        volatile SNode head;
       //重新CAS棧頂節(jié)點(diǎn),一般都用在棧頂元素出棧的時候
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }

        //構(gòu)造棧節(jié)點(diǎn)
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        //關(guān)鍵方法transfer魏蔗, 通過e是否為空來判斷本次操作是獲取元素還是放入元素
        E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;  //e為空砍的, 表示該次請求為獲取元素,e不為空則是生產(chǎn)放入元素
            for (;;) {
                SNode h = head;
                //①當(dāng)頭節(jié)點(diǎn)為空莺治,則新加入節(jié)點(diǎn)直接入棧廓鞠,然后進(jìn)入等待,當(dāng)頭節(jié)點(diǎn)不為空谣旁,并且本次操作的模式和頭節(jié)點(diǎn)模式一樣床佳,則繼續(xù)進(jìn)入棧等待,否則去到代碼②
                if (h == null || h.mode == mode) {  // empty or same-mode       
                    if (timed && nanos <= 0) {      // can't wait //不需要等待
                        if (h != null && h.isCancelled()) //頭節(jié)點(diǎn)已經(jīng)中斷或者超時榄审,重新設(shè)置頭節(jié)點(diǎn)
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null; //由于不能等待砌们,而頭節(jié)點(diǎn)與本次入隊列的節(jié)點(diǎn)操作模式相同,所以直接返回空(如果可以等待的話會進(jìn)入自旋)
                    } else if (casHead(h, s = snode(s, e, h, mode))) { //新節(jié)點(diǎn)入棧
                        SNode m = awaitFulfill(s, timed, nanos); //等待該節(jié)點(diǎn)被配對或者超時中斷,詳情見下面的方法具體解釋浪感。
                        if (m == s) {               // wait was cancelled
                            clean(s);  //清理從head節(jié)點(diǎn)到s.next節(jié)點(diǎn)之間的“取消”節(jié)點(diǎn)
                            return null;  //返回null昔头,線程被中斷或者節(jié)點(diǎn)等待超時,調(diào)用者(上層應(yīng)用)能處理線程中斷邏輯影兽。
                        }
                
                         //s節(jié)點(diǎn)從awaitFulfill中出來揭斧,說明是配對成功節(jié)點(diǎn)將其喚醒,而喚醒的節(jié)點(diǎn)一定是棧頂節(jié)點(diǎn)(見代碼②邏輯峻堰,因為每次只允許有一個節(jié)點(diǎn)進(jìn)入配對狀態(tài)讹开,
                         //進(jìn)入配對狀態(tài)后,其它節(jié)點(diǎn)無法加入棧捐名,只有等到配對成功萧吠,重新casHead后其它節(jié)點(diǎn)才能入棧,所以這里直接判斷頭節(jié)點(diǎn))
                        if ((h = head) != null && h.next == s) //因為匹配線程那邊也可能在調(diào)用casHead,所以這里h.next == s判斷一下head是否已經(jīng)被重置  
                            casHead(h, s.next);     // help s's fulfiller   
                        
                        return (E) ((mode == REQUEST) ? m.item : s.item); //如果是消費(fèi)者則返回匹配上的m.item, 如果是生產(chǎn)者桐筏,則返回生產(chǎn)的節(jié)點(diǎn)的item
                     }
                }
                //②     
                //本次操作模式和頭節(jié)點(diǎn)模式不相等,先檢查頭節(jié)點(diǎn)模式是否為匹配中拇砰。
                //頭節(jié)點(diǎn)模式為 3或者2時為匹配中梅忌,這時任何新增的節(jié)點(diǎn)都無法入棧,因為任何新增操作的模式都不等于3或者2除破,所以在這次匹配完成之前牧氮,head節(jié)點(diǎn)是不會變化的。
                else if (!isFulfilling(h.mode)) { 
                    //頭節(jié)點(diǎn)模式為:0或者1
                    //head節(jié)點(diǎn)已經(jīng)取消瑰枫,所以將head指向下一個節(jié)點(diǎn)踱葛,head出棧
                    if (h.isCancelled())            
                         casHead(h, h.next);         // pop and retry 
            
                    //②-1
                    //如果頭節(jié)點(diǎn)模式為0或1,則本次操作的節(jié)點(diǎn)入棧光坝,入棧的模式為:2或者3尸诽, ps(這個時候頭節(jié)點(diǎn)判斷isFullfilling則返回true),
                    //所以一旦有節(jié)點(diǎn)在匹配中,則其它新增節(jié)點(diǎn)都會直接去到代碼③
                    //FULFILLING=2,  2|0 == 2, 2|1==3
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { 
            
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            //如果新的頭節(jié)點(diǎn)后面沒有等待節(jié)點(diǎn)了(等待節(jié)點(diǎn)有可能被中斷或者超時出棧了)盯另,則清空棧性含,自旋重新開始
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            //②-2如果下一個節(jié)點(diǎn)不為空,則嘗試匹配
                            SNode mn = m.next;
                            if (m.tryMatch(s)) { 
                                casHead(s, mn);     // pop both s and m //匹配成功鸳惯,則彈出頭節(jié)點(diǎn)和等待節(jié)點(diǎn)
                                //如果本次操作是獲取數(shù)據(jù)商蕴,則返回匹配上的生產(chǎn)數(shù)據(jù)的item,相反芝发, 如果本次操作是生產(chǎn)數(shù)據(jù)绪商,則直接返回自己生產(chǎn)的數(shù)據(jù)。
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                //如果匹配失敻ňā(理論上一次只有一個節(jié)點(diǎn)能夠進(jìn)行匹配格郁,見②-1,所以理論上不應(yīng)該會匹配失敗,但是為什么會失敗呢理张?
                                //因為匹配的m還有可能超時或者被中斷)赫蛇,所以如果匹配失敗,彈出m節(jié)點(diǎn)雾叭。
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } 
                //③如果頭節(jié)點(diǎn)不為空悟耘,并且本次操作的模式和頭節(jié)點(diǎn)的模式不相同,并且頭節(jié)點(diǎn)是正在匹配
                else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    //當(dāng)②中新的匹配節(jié)點(diǎn)進(jìn)入后织狐,有可能等待節(jié)點(diǎn)超時或者被中斷了暂幼,即waiter is gone
                    if (m == null)                  // waiter is gone
                    //刪除匹配節(jié)點(diǎn),棧清空重新開始移迫,因為最外層有一個自旋旺嬉,所以又會重新安置這個節(jié)點(diǎn)。
                    casHead(h, null);           // pop fulfilling node
                    else {
                        //這一段代碼邏輯與②-2相同
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        //讓節(jié)點(diǎn)進(jìn)入等待(通常發(fā)生在節(jié)點(diǎn)入棧時發(fā)現(xiàn)沒有對應(yīng)的匹配者)
        //根據(jù)設(shè)置的是否等待超時條件厨埋,讓當(dāng)前線程進(jìn)入等待配對的自旋邏輯中邪媳,只有當(dāng)配對成功,或者線程中斷才會退出荡陷。
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())  //當(dāng)前線程已經(jīng)中斷雨效,則取消當(dāng)前節(jié)點(diǎn)的配對等待
                    s.tryCancel();
                SNode m = s.match;
                if (m != null) //當(dāng)前節(jié)點(diǎn)已經(jīng)配對成功,或者取消(m==s)
                    return m;
                if (timed) { //有超時設(shè)置
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) { //已經(jīng)超時
                        s.tryCancel(); //取消s節(jié)點(diǎn)的等待
                        continue;
                    }
                }
                //不超時(一直等待)废赞,或者設(shè)置超時時間后還沒有超時徽龟。
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0; //減少循環(huán)次數(shù)
                else if (s.waiter == null) //不再自旋,設(shè)置線程
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed) //如果設(shè)置的是一直等待唉地,則睡眠當(dāng)前線程
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold) //如果是有時間的等待据悔,但是如果還剩下小于1秒的時間,則不再park耘沼,直接繼續(xù)自旋
                    LockSupport.parkNanos(this, nanos);
            }
        }


        //判斷是否還需要繼續(xù)自旋還是睡眠
        //這里要注意的是該方法只在awaitFulfill方法被調(diào)用极颓,而awaitFulfill只在入棧元素和棧頂元素操作模式一致時(者說明棧頂元素不是匹配中狀態(tài),見代碼②處的說明)
        boolean shouldSpin(SNode s) {
            SNode h = head;
            //這個邏輯尤其是h==null確實沒看明白什么時候會出現(xiàn)h!=s然后h == null,因為s入棧時head會指向s耕拷,就算s在自旋過程中有新的head節(jié)點(diǎn)加入讼昆,那h也不會為null。 先打骚烧?浸赫??
           //但是如果我來寫這個方法的話赃绊, 我我會寫一個shouldNotSpin,如下
            return (h == s || h == null || isFulfilling(h.mode));
        }
       //s不在棧頂既峡,并且棧頂元素不在匹配中,則棧頂以后的元素可以暫時不用自旋碧查,這樣邏輯就很容易說得通
       //因為棧頂元素還沒有進(jìn)來匹配者运敢,說明棧頂元素和s一樣屬于等待者(前者都還沒被匹配校仑,s自然不會被匹配,所以先睡一會兒)
        boolean shouldNotSpin(Snode s){
            Snode h = head;
            return (h!=null && h!=s && !isFulfilling(h.mode));
        }

        //清理節(jié)點(diǎn)出棧
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread
            //這里為什么要嘗試下一個传惠,而不到下下一個迄沫??卦方?羊瘩?
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;
        
            // Absorb cancelled nodes at head
            //清理頭節(jié)點(diǎn)到past節(jié)點(diǎn)之間,從頭節(jié)點(diǎn)開始連續(xù)的“取消”狀態(tài)的節(jié)點(diǎn)(主要是清理頭節(jié)點(diǎn))
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);
        
            // Unsplice embedded nodes
            //清理頭節(jié)點(diǎn)到past節(jié)點(diǎn)之間盼砍,跳躍的被“取消”的節(jié)點(diǎn)(主要是清理頭節(jié)點(diǎn)與past中間的節(jié)點(diǎn))
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferStack.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

核心邏輯就是以上的代碼尘吗,只要記住:SynchronousQueue不存儲實際的數(shù)據(jù)浇坐,它的棽谴罚或者隊列中存儲的是生產(chǎn)者和消費(fèi)者節(jié)點(diǎn),最開始進(jìn)入棧的節(jié)點(diǎn)就成為了等待者(這里不管是生產(chǎn)還是消費(fèi))近刘,而后面進(jìn)入的節(jié)點(diǎn)都需要根據(jù)操作的mode來判斷是繼續(xù)入棧等待還是入棧后立即進(jìn)行匹配擒贸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市觉渴,隨后出現(xiàn)的幾起案子酗宋,更是在濱河造成了極大的恐慌,老刑警劉巖疆拘,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異寂曹,居然都是意外死亡哎迄,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門隆圆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來漱挚,“玉大人,你說我怎么就攤上這事渺氧≈祭裕” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵侣背,是天一觀的道長白华。 經(jīng)常有香客問我,道長贩耐,這世上最難降的妖魔是什么弧腥? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮潮太,結(jié)果婚禮上管搪,老公的妹妹穿的比我還像新娘虾攻。我一直安慰自己,他們只是感情好更鲁,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布霎箍。 她就那樣靜靜地躺著,像睡著了一般澡为。 火紅的嫁衣襯著肌膚如雪漂坏。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天缀壤,我揣著相機(jī)與錄音樊拓,去河邊找鬼。 笑死塘慕,一個胖子當(dāng)著我的面吹牛筋夏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播图呢,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼条篷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蛤织?” 一聲冷哼從身側(cè)響起赴叹,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎指蚜,沒想到半個月后乞巧,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡摊鸡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年绽媒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片免猾。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡是辕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出猎提,到底是詐尸還是另有隱情获三,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布锨苏,位于F島的核電站疙教,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏伞租。R本人自食惡果不足惜松逊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望肯夏。 院中可真熱鬧经宏,春花似錦犀暑、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至沪斟,卻和暖如春广辰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背主之。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工择吊, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人槽奕。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓几睛,卻偏偏與公主長得像,于是被迫代替她去往敵國和親粤攒。 傳聞我的和親對象是個殘疾皇子所森,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容