【死磕Java并發(fā)】-----J.U.C之阻塞隊(duì)列:SynchronousQueue

原文出處http://cmsblogs.com/chenssy

【注】:SynchronousQueue實(shí)現(xiàn)算法看的暈乎乎的亭姥,寫(xiě)了好久才寫(xiě)完莺禁,如果當(dāng)中有什么錯(cuò)誤之處,忘各位指正

作為BlockingQueue中的一員只酥,SynchronousQueue與其他BlockingQueue有著不同特性:

  1. SynchronousQueue沒(méi)有容量哩罪。與其他BlockingQueue不同,SynchronousQueue是一個(gè)不存儲(chǔ)元素的BlockingQueue锁摔。每一個(gè)put操作必須要等待一個(gè)take操作廓旬,否則不能繼續(xù)添加元素,反之亦然谐腰。
  2. 因?yàn)闆](méi)有容量孕豹,所以對(duì)應(yīng) peek, contains, clear, isEmpty ... 等方法其實(shí)是無(wú)效的。例如clear是不執(zhí)行任何操作的怔蚌,contains始終返回false,peek始終返回null巩步。
  3. SynchronousQueue分為公平和非公平,默認(rèn)情況下采用非公平性訪問(wèn)策略桦踊,當(dāng)然也可以通過(guò)構(gòu)造函數(shù)來(lái)設(shè)置為公平性訪問(wèn)策略(為true即可)椅野。
  4. 若使用 TransferQueue, 則隊(duì)列中永遠(yuǎn)會(huì)存在一個(gè) dummy node(這點(diǎn)后面詳細(xì)闡述)。

SynchronousQueue非常適合做交換工作,生產(chǎn)者的線程和消費(fèi)者的線程同步以傳遞某些信息竟闪、事件或者任務(wù)离福。

SynchronousQueue

與其他BlockingQueue一樣,SynchronousQueue同樣繼承AbstractQueue和實(shí)現(xiàn)BlockingQueue接口:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue提供了兩個(gè)構(gòu)造函數(shù):

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // 通過(guò) fair 值來(lái)決定公平性和非公平性
        // 公平性使用TransferQueue炼蛤,非公平性采用TransferStack
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

TransferQueue妖爷、TransferStack繼承Transferer,Transferer為SynchronousQueue的內(nèi)部類理朋,它提供了一個(gè)方法transfer()絮识,該方法定義了轉(zhuǎn)移數(shù)據(jù)的規(guī)范,如下:

    abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }

transfer()方法主要用來(lái)完成轉(zhuǎn)移數(shù)據(jù)的嗽上,如果e != null次舌,相當(dāng)于將一個(gè)數(shù)據(jù)交給消費(fèi)者,如果e == null兽愤,則相當(dāng)于從一個(gè)生產(chǎn)者接收一個(gè)消費(fèi)者交出的數(shù)據(jù)彼念。

SynchronousQueue采用隊(duì)列TransferQueue來(lái)實(shí)現(xiàn)公平性策略,采用堆棧TransferStack來(lái)實(shí)現(xiàn)非公平性策略浅萧,他們兩種都是通過(guò)鏈表實(shí)現(xiàn)的逐沙,其節(jié)點(diǎn)分別為QNode,SNode洼畅。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用吩案,SynchronousQueue的put、take操作都是委托這兩個(gè)類來(lái)實(shí)現(xiàn)的土思。

TransferQueue

TransferQueue是實(shí)現(xiàn)公平性策略的核心類务热,其節(jié)點(diǎn)為QNode,其定義如下:

    static final class TransferQueue<E> extends Transferer<E> {
        /** 頭節(jié)點(diǎn) */
        transient volatile QNode head;
        /** 尾節(jié)點(diǎn) */
        transient volatile QNode tail;
        // 指向一個(gè)取消的結(jié)點(diǎn)
        //當(dāng)一個(gè)節(jié)點(diǎn)中最后一個(gè)插入時(shí)己儒,它被取消了但是可能還沒(méi)有離開(kāi)隊(duì)列
        transient volatile QNode cleanMe;

        /**
         * 省略很多代碼O(∩_∩)O
         */
    }

在TransferQueue中除了頭、尾節(jié)點(diǎn)外還存在一個(gè)cleanMe節(jié)點(diǎn)捆毫。該節(jié)點(diǎn)主要用于標(biāo)記闪湾,當(dāng)刪除的節(jié)點(diǎn)是尾節(jié)點(diǎn)時(shí)則需要使用該節(jié)點(diǎn)。

同時(shí)绩卤,對(duì)于TransferQueue需要注意的是途样,其隊(duì)列永遠(yuǎn)都存在一個(gè)dummy node,在構(gòu)造時(shí)創(chuàng)建:

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

在TransferQueue中定義了QNode類來(lái)表示隊(duì)列中的節(jié)點(diǎn)濒憋,QNode節(jié)點(diǎn)定義如下:

    static final class QNode {
        // next 域
        volatile QNode next;
        // item數(shù)據(jù)項(xiàng)
        volatile Object item;
        //  等待線程何暇,用于park/unpark
        volatile Thread waiter;       // to control park/unpark
        //模式,表示當(dāng)前是數(shù)據(jù)還是請(qǐng)求凛驮,只有當(dāng)匹配的模式相匹配時(shí)才會(huì)交換
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        /**
         * CAS next域裆站,在TransferQueue中用于向next推進(jìn)
         */
        boolean casNext(QNode cmp, QNode val) {
            return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * CAS itme數(shù)據(jù)項(xiàng)
         */
        boolean casItem(Object cmp, Object val) {
            return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         *  取消本結(jié)點(diǎn),將item域設(shè)置為自身
         */
        void tryCancel(Object cmp) {
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
        }

        /**
         * 是否被取消
         * 與tryCancel相照應(yīng)只需要判斷item釋放等于自身即可
         */
        boolean isCancelled() {
            return item == this;
        }
        
        
        boolean isOffList() {
            return next == this;
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = QNode.class;
                itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

上面代碼沒(méi)啥好看的,需要注意的一點(diǎn)就是isData宏胯,該屬性在進(jìn)行數(shù)據(jù)交換起到關(guān)鍵性作用羽嫡,兩個(gè)線程進(jìn)行數(shù)據(jù)交換的時(shí)候,必須要兩者的模式保持一致肩袍。

TransferStack

TransferStack用于實(shí)現(xiàn)非公平性,定義如下:

    static final class TransferStack<E> extends Transferer<E> {

        static final int REQUEST    = 0;

        static final int DATA       = 1;

        static final int FULFILLING = 2;

        volatile SNode head;

        /**
         * 省略一堆代碼  O(∩_∩)O~
         */

    }

TransferStack中定義了三個(gè)狀態(tài):REQUEST表示消費(fèi)數(shù)據(jù)的消費(fèi)者,DATA表示生產(chǎn)數(shù)據(jù)的生產(chǎn)者皮仁,F(xiàn)ULFILLING剃根,表示匹配另一個(gè)生產(chǎn)者或消費(fèi)者。任何線程對(duì)TransferStack的操作都屬于上述3種狀態(tài)中的一種(對(duì)應(yīng)著SNode節(jié)點(diǎn)的mode)艰管。同時(shí)還包含一個(gè)head域甫窟,表示頭結(jié)點(diǎn)。

內(nèi)部節(jié)點(diǎn)SNode定義如下:

    static final class SNode {
        // next 域
        volatile SNode next;
        // 相匹配的節(jié)點(diǎn)
        volatile SNode match;
        // 等待的線程
        volatile Thread waiter;
        // item 域
        Object item;                // data; or null for REQUESTs

        // 模型
        int mode;

        /**
         * item域和mode域不需要使用volatile修飾蛙婴,因?yàn)樗鼈冊(cè)趘olatile/atomic操作之前寫(xiě)粗井,之后讀
         */
       
        SNode(Object item) {
            this.item = item;
        }

        boolean casNext(SNode cmp, SNode val) {
            return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * 將s結(jié)點(diǎn)與本結(jié)點(diǎn)進(jìn)行匹配,匹配成功街图,則unpark等待線程
         */
        boolean tryMatch(SNode s) {
            if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        void tryCancel() {
            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }

        boolean isCancelled() {
            return match == this;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long matchOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = SNode.class;
                matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

上面簡(jiǎn)單介紹了TransferQueue浇衬、TransferStack,由于SynchronousQueue的put餐济、take操作都是調(diào)用Transfer的transfer()方法耘擂,只不過(guò)是傳遞的參數(shù)不同而已,put傳遞的是e參數(shù)絮姆,所以模式為數(shù)據(jù)(公平isData = true醉冤,非公平mode= DATA),而take操作傳遞的是null篙悯,所以模式為請(qǐng)求(公平isData = false蚁阳,非公平mode = REQUEST),如下:

    // put操作
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    // take操作
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

公平模式

公平性調(diào)用TransferQueue的transfer方法:

    E transfer(E e, boolean timed, long nanos) {
        QNode s = null;
        // 當(dāng)前節(jié)點(diǎn)模式
        boolean isData = (e != null);

        for (;;) {
            QNode t = tail;
            QNode h = head;
            // 頭鸽照、尾節(jié)點(diǎn) 為null螺捐,沒(méi)有初始化
            if (t == null || h == null)
                continue;

            // 頭尾節(jié)點(diǎn)相等(隊(duì)列為null) 或者當(dāng)前節(jié)點(diǎn)和隊(duì)列節(jié)點(diǎn)模式一樣
            if (h == t || t.isData == isData) {
                // tn = t.next
                QNode tn = t.next;
                // t != tail表示已有其他線程操作了,修改了tail矮燎,重新再來(lái)
                if (t != tail)
                    continue;
                // tn != null定血,表示已經(jīng)有其他線程添加了節(jié)點(diǎn),tn 推進(jìn)诞外,重新處理
                if (tn != null) {
                    // 當(dāng)前線程幫忙推進(jìn)尾節(jié)點(diǎn)澜沟,就是嘗試將tn設(shè)置為尾節(jié)點(diǎn)
                    advanceTail(t, tn);
                    continue;
                }
                //  調(diào)用的方法的 wait 類型的, 并且 超時(shí)了, 直接返回 null
                // timed 在take操作闡述
                if (timed && nanos <= 0)
                    return null;

                // s == null,構(gòu)建一個(gè)新節(jié)點(diǎn)Node
                if (s == null)
                    s = new QNode(e, isData);

                // 將新建的節(jié)點(diǎn)加入到隊(duì)列中峡谊,如果不成功茫虽,繼續(xù)處理
                if (!t.casNext(null, s))
                    continue;

                // 替換尾節(jié)點(diǎn)
                advanceTail(t, s);

                // 調(diào)用awaitFulfill, 若節(jié)點(diǎn)是 head.next, 則進(jìn)行自旋
                // 若不是的話, 直接 block, 直到有其他線程 與之匹配, 或它自己進(jìn)行線程的中斷
                Object x = awaitFulfill(s, e, timed, nanos);

                // 若返回的x == s表示刊苍,當(dāng)前線程已經(jīng)超時(shí)或者中斷,不然的話s == null或者是匹配的節(jié)點(diǎn)
                if (x == s) {
                    // 清理節(jié)點(diǎn)S
                    clean(t, s);
                    return null;
                }

                // isOffList:用于判斷節(jié)點(diǎn)是否已經(jīng)從隊(duì)列中離開(kāi)了
                if (!s.isOffList()) {
                    // 嘗試將S節(jié)點(diǎn)設(shè)置為head席噩,移出t
                    advanceHead(t, s);
                    if (x != null)
                        s.item = s;
                    // 釋放線程 ref
                    s.waiter = null;
                }

                // 返回
                return (x != null) ? (E)x : e;

            }

            // 這里是從head.next開(kāi)始班缰,因?yàn)門(mén)ransferQueue總是會(huì)存在一個(gè)dummy node節(jié)點(diǎn)
            else {
                // 節(jié)點(diǎn)
                QNode m = h.next;

                // 不一致讀,重新開(kāi)始
                // 有其他線程更改了線程結(jié)構(gòu)
                if (t != tail || m == null || h != head)
                    continue;

                /**
                 * 生產(chǎn)者producer和消費(fèi)者consumer匹配操作
                 */
                Object x = m.item;
                // isData == (x != null):判斷isData與x的模式是否相同悼枢,相同表示已經(jīng)匹配了
                // x == m :m節(jié)點(diǎn)被取消了
                // !m.casItem(x, e):如果嘗試將數(shù)據(jù)e設(shè)置到m上失敗
                if (isData == (x != null) ||  x  == m || !m.casItem(x, e)) {
                    // 將m設(shè)置為頭結(jié)點(diǎn)埠忘,h出列,然后重試
                    advanceHead(h, m);
                    continue;
                }

                // 成功匹配了馒索,m設(shè)置為頭結(jié)點(diǎn)h出列莹妒,向前推進(jìn)
                advanceHead(h, m);
                // 喚醒m上的等待線程
                LockSupport.unpark(m.waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }

整個(gè)transfer的算法如下:

  1. 如果隊(duì)列為null或者尾節(jié)點(diǎn)模式與當(dāng)前節(jié)點(diǎn)模式一致,則嘗試將節(jié)點(diǎn)加入到等待隊(duì)列中(采用自旋的方式)绰上,直到被匹配或旨怠、超時(shí)或者取消。匹配成功的話要么返回null(producer返回的)要么返回真正傳遞的值(consumer返回的)蜈块,如果返回的是node節(jié)點(diǎn)本身則表示當(dāng)前線程超時(shí)或者取消了鉴腻。
  2. 如果隊(duì)列不為null,且隊(duì)列的節(jié)點(diǎn)是當(dāng)前節(jié)點(diǎn)匹配的節(jié)點(diǎn)百揭,則進(jìn)行數(shù)據(jù)的傳遞匹配并返回匹配節(jié)點(diǎn)的數(shù)據(jù)
  3. 在整個(gè)過(guò)程中都會(huì)檢測(cè)并幫助其他線程推進(jìn)

當(dāng)隊(duì)列為空時(shí)爽哎,節(jié)點(diǎn)入列然后通過(guò)調(diào)用awaitFulfill()方法自旋,該方法主要用于自旋/阻塞節(jié)點(diǎn)器一,直到節(jié)點(diǎn)被匹配返回或者取消课锌、中斷。

    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {

        // 超時(shí)控制
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        // 自旋次數(shù)
        // 如果節(jié)點(diǎn)Node恰好是head節(jié)點(diǎn)祈秕,則自旋一段時(shí)間渺贤,這里主要是為了效率問(wèn)題,如果里面阻塞请毛,會(huì)存在喚醒志鞍、線程上下文切換的問(wèn)題
        // 如果生產(chǎn)者、消費(fèi)者者里面到來(lái)的話获印,就避免了這個(gè)阻塞的過(guò)程
        int spins = ((head.next == s) ?
                (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        // 自旋
        for (;;) {
            // 線程中斷了述雾,剔除當(dāng)前節(jié)點(diǎn)
            if (w.isInterrupted())
                s.tryCancel(e);

            // 如果線程進(jìn)行了阻塞 -> 喚醒或者中斷了,那么x != e 肯定成立兼丰,直接返回當(dāng)前節(jié)點(diǎn)即可
            Object x = s.item;
            if (x != e)
                return x;
            // 超時(shí)判斷
            if (timed) {
                nanos = deadline - System.nanoTime();
                // 如果超時(shí)了,取消節(jié)點(diǎn),continue唆缴,在if(x != e)肯定會(huì)成立鳍征,直接返回x
                if (nanos <= 0L) {
                    s.tryCancel(e);
                    continue;
                }
            }

            // 自旋- 1
            if (spins > 0)
                --spins;

            // 等待線程
            else if (s.waiter == null)
                s.waiter = w;

            // 進(jìn)行沒(méi)有超時(shí)的 park
            else if (!timed)
                LockSupport.park(this);

            // 自旋次數(shù)過(guò)了, 直接 + timeout 方式 park
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

在自旋/阻塞過(guò)程中做了一點(diǎn)優(yōu)化,就是判斷當(dāng)前節(jié)點(diǎn)是否為對(duì)頭元素面徽,如果是的則先自旋艳丛,如果自旋次數(shù)過(guò)了匣掸,則才阻塞,這樣做的主要目的就在如果生產(chǎn)者氮双、消費(fèi)者立馬來(lái)匹配了則不需要阻塞碰酝,因?yàn)樽枞拘褧?huì)消耗資源戴差。在整個(gè)自旋的過(guò)程中會(huì)不斷判斷是否超時(shí)或者中斷了送爸,如果中斷或者超時(shí)了則調(diào)用tryCancel()取消該節(jié)點(diǎn)。

tryCancel

            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }

取消過(guò)程就是將節(jié)點(diǎn)的item設(shè)置為自身(itemOffset是item的偏移量)暖释。所以在調(diào)用awaitFulfill()方法時(shí)袭厂,如果當(dāng)前線程被取消、中斷球匕、超時(shí)了那么返回的值肯定時(shí)S纹磺,否則返回的則是匹配的節(jié)點(diǎn)。如果返回值是節(jié)點(diǎn)S亮曹,那么if(x == s)必定成立橄杨,如下:

                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

如果返回的x == s成立,則調(diào)用clean()方法清理節(jié)點(diǎn)S:

    void clean(QNode pred, QNode s) {
        //
        s.waiter = null;

        while (pred.next == s) {
            QNode h = head;
            QNode hn = h.next;
            // hn節(jié)點(diǎn)被取消了照卦,向前推進(jìn)
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }

            // 隊(duì)列為空式矫,直接return null
            QNode t = tail;
            if (t == h)
                return;

            QNode tn = t.next;
            // 不一致,說(shuō)明有其他線程改變了tail節(jié)點(diǎn)窄瘟,重新開(kāi)始
            if (t != tail)
                continue;

            // tn != null 推進(jìn)tail節(jié)點(diǎn)衷佃,重新開(kāi)始
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }

            // s 不是尾節(jié)點(diǎn) 移出
            if (s != t) {
                QNode sn = s.next;
                // 如果s已經(jīng)被移除退出循環(huán),否則嘗試斷開(kāi)s
                if (sn == s || pred.casNext(s, sn))
                    return;
            }

            // s是尾節(jié)點(diǎn)蹄葱,則有可能會(huì)有其他線程在添加新節(jié)點(diǎn)氏义,則cleanMe出場(chǎng)
            QNode dp = cleanMe;
            // 如果dp不為null,說(shuō)明是前一個(gè)被取消節(jié)點(diǎn)图云,將其移除
            if (dp != null) {
                QNode d = dp.next;
                QNode dn;
                if (d == null ||               // 節(jié)點(diǎn)d已經(jīng)刪除
                        d == dp ||                 // 原來(lái)的節(jié)點(diǎn) cleanMe 已經(jīng)通過(guò) advanceHead 進(jìn)行刪除
                        !d.isCancelled() ||        // 原來(lái)的節(jié)點(diǎn) s已經(jīng)刪除
                        (d != t &&                 // d 不是tail節(jié)點(diǎn)
                                (dn = d.next) != null &&  //
                                dn != d &&                //   that is on list
                                dp.casNext(d, dn)))       // d unspliced
                    // 清除 cleanMe 節(jié)點(diǎn), 這里的 dp == pred 若成立, 說(shuō)明清除節(jié)點(diǎn)s惯悠,成功, 直接return, 不然的話要再次循環(huán)
                    casCleanMe(dp, null);
                if (dp == pred)
                    return;
            } else if (casCleanMe(null, pred))  // 原來(lái)的 cleanMe 是 null, 則將 pred 標(biāo)記為 cleamMe 為下次 清除 s 節(jié)點(diǎn)做標(biāo)識(shí)
                return;
        }
    }

這個(gè)clean()方法感覺(jué)有點(diǎn)兒難度,我也看得不是很懂竣况。這里是引用http://www.reibang.com/p/95cb570c8187

  1. 刪除的節(jié)點(diǎn)不是queue尾節(jié)點(diǎn), 這時(shí) 直接 pred.casNext(s, s.next) 方式來(lái)進(jìn)行刪除(和ConcurrentLikedQueue中差不多)

  2. 刪除的節(jié)點(diǎn)是隊(duì)尾節(jié)點(diǎn)

  • 此時(shí) cleanMe == null, 則 前繼節(jié)點(diǎn)pred標(biāo)記為 cleanMe, 為下次刪除做準(zhǔn)備
  • 此時(shí) cleanMe != null, 先刪除上次需要?jiǎng)h除的節(jié)點(diǎn), 然后將 cleanMe至null, 讓后再將 pred 賦值給 cleanMe

非公平模式

非公平模式transfer方法如下:

    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; // constructed/reused as needed
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            SNode h = head;
            // 棧為空或者當(dāng)前節(jié)點(diǎn)模式與頭節(jié)點(diǎn)模式一樣克婶,將節(jié)點(diǎn)壓入棧內(nèi),等待匹配
            if (h == null || h.mode == mode) {
                // 超時(shí)
                if (timed && nanos <= 0) {
                    // 節(jié)點(diǎn)被取消了丹泉,向前推進(jìn)
                    if (h != null && h.isCancelled())
                        //  重新設(shè)置頭結(jié)點(diǎn)(彈出之前的頭結(jié)點(diǎn))
                        casHead(h, h.next);
                    else
                        return null;
                }
                // 不超時(shí)
                // 生成一個(gè)SNode節(jié)點(diǎn)情萤,并嘗試替換掉頭節(jié)點(diǎn)head (head -> s)
                else if (casHead(h, s = snode(s, e, h, mode))) {
                    // 自旋,等待線程匹配
                    SNode m = awaitFulfill(s, timed, nanos);
                    // 返回的m == s 表示該節(jié)點(diǎn)被取消了或者超時(shí)摹恨、中斷了
                    if (m == s) {
                        // 清理節(jié)點(diǎn)S筋岛,return null
                        clean(s);
                        return null;
                    }

                    // 因?yàn)橥ㄟ^(guò)前面一步將S替換成了head,如果h.next == s 晒哄,則表示有其他節(jié)點(diǎn)插入到S前面了,變成了head
                    // 且該節(jié)點(diǎn)就是與節(jié)點(diǎn)S匹配的節(jié)點(diǎn)
                    if ((h = head) != null && h.next == s)
                        // 將s.next節(jié)點(diǎn)設(shè)置為head睁宰,相當(dāng)于取消節(jié)點(diǎn)h肪获、s
                        casHead(h, s.next);

                    // 如果是請(qǐng)求則返回匹配的域,否則返回節(jié)點(diǎn)S的域
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            }

            // 如果棧不為null柒傻,且兩者模式不匹配(h != null && h.mode != mode)
            // 說(shuō)明他們是一隊(duì)對(duì)等匹配的節(jié)點(diǎn)孝赫,嘗試用當(dāng)前節(jié)點(diǎn)s來(lái)滿足h節(jié)點(diǎn)
            else if (!isFulfilling(h.mode)) {
                // head 節(jié)點(diǎn)已經(jīng)取消了,向前推進(jìn)
                if (h.isCancelled())
                    casHead(h, h.next);

                // 嘗試將當(dāng)前節(jié)點(diǎn)打上"正在匹配"的標(biāo)記红符,并設(shè)置為head
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    // 循環(huán)loop
                    for (;;) {
                        // s為當(dāng)前節(jié)點(diǎn)青柄,m是s的next節(jié)點(diǎn),
                        // m節(jié)點(diǎn)是s節(jié)點(diǎn)的匹配節(jié)點(diǎn)
                        SNode m = s.next;
                        // m == null违孝,其他節(jié)點(diǎn)把m節(jié)點(diǎn)匹配走了
                        if (m == null) {
                            // 將s彈出
                            casHead(s, null);
                            // 將s置空刹前,下輪循環(huán)的時(shí)候還會(huì)新建
                            s = null;
                            // 退出該循環(huán),繼續(xù)主循環(huán)
                            break;
                        }
                        // 獲取m的next節(jié)點(diǎn)
                        SNode mn = m.next;
                        // 嘗試匹配
                        if (m.tryMatch(s)) {
                            // 匹配成功雌桑,將s 喇喉、 m彈出
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else
                            // 如果沒(méi)有匹配成功,說(shuō)明有其他線程已經(jīng)匹配了校坑,把m移出
                            s.casNext(m, mn);
                    }
                }
            }
            // 到這最后一步說(shuō)明節(jié)點(diǎn)正在匹配階段
            else {
                // head 的next的節(jié)點(diǎn)拣技,是正在匹配的節(jié)點(diǎn),m 和 h配對(duì)
                SNode m = h.next;

                // m == null 其他線程把m節(jié)點(diǎn)搶走了耍目,彈出h節(jié)點(diǎn)
                if (m == null)
                    casHead(h, null);
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))
                        casHead(h, mn);
                    else 
                        h.casNext(m, mn);
                }
            }
        }
    }

整個(gè)處理過(guò)程分為三種情況膏斤,具體如下:

  1. 如果當(dāng)前棧為空獲取節(jié)點(diǎn)模式與棧頂模式一樣,則嘗試將節(jié)點(diǎn)加入棧內(nèi)邪驮,同時(shí)通過(guò)自旋方式等待節(jié)點(diǎn)匹配莫辨,最后返回匹配的節(jié)點(diǎn)或者null(被取消)
  2. 如果棧不為空且節(jié)點(diǎn)的模式與首節(jié)點(diǎn)模式匹配,則嘗試將該節(jié)點(diǎn)打上FULFILLING標(biāo)記毅访,然后加入棧中沮榜,與相應(yīng)的節(jié)點(diǎn)匹配,成功后將這兩個(gè)節(jié)點(diǎn)彈出棧并返回匹配節(jié)點(diǎn)的數(shù)據(jù)
  3. 如果有節(jié)點(diǎn)在匹配喻粹,那么幫助這個(gè)節(jié)點(diǎn)完成匹配和出棧操作蟆融,然后在主循環(huán)中繼續(xù)執(zhí)行

當(dāng)節(jié)點(diǎn)加入棧內(nèi)后,通過(guò)調(diào)用awaitFulfill()方法自旋等待節(jié)點(diǎn)匹配:

    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        // 超時(shí)
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 當(dāng)前線程
        Thread w = Thread.currentThread();

        // 自旋次數(shù)
        // shouldSpin 用于檢測(cè)當(dāng)前節(jié)點(diǎn)是否需要自旋
        // 如果棧為空守呜、該節(jié)點(diǎn)是首節(jié)點(diǎn)或者該節(jié)點(diǎn)是匹配節(jié)點(diǎn)型酥,則先采用自旋,否則阻塞
        int spins = (shouldSpin(s) ?
                (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // 線程中斷了查乒,取消該節(jié)點(diǎn)
            if (w.isInterrupted())
                s.tryCancel();

            // 匹配節(jié)點(diǎn)
            SNode m = s.match;

            // 如果匹配節(jié)點(diǎn)m不為空弥喉,則表示匹配成功,直接返回
            if (m != null)
                return m;
            // 超時(shí)
            if (timed) {
                nanos = deadline - System.nanoTime();
                // 節(jié)點(diǎn)超時(shí)玛迄,取消
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }

            // 自旋;每次自旋的時(shí)候都需要檢查自身是否滿足自旋條件档桃,滿足就 - 1,否則為0
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;

            // 第一次阻塞時(shí)憔晒,會(huì)將當(dāng)前線程設(shè)置到s上
            else if (s.waiter == null)
                s.waiter = w;

            // 阻塞 當(dāng)前線程
            else if (!timed)
                LockSupport.park(this);
            // 超時(shí)
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

awaitFulfill()方法會(huì)一直自旋/阻塞直到匹配節(jié)點(diǎn)藻肄。在S節(jié)點(diǎn)阻塞之前會(huì)先調(diào)用shouldSpin()方法判斷是否采用自旋方式,為的就是如果有生產(chǎn)者或者消費(fèi)者馬上到來(lái)拒担,就不需要阻塞了嘹屯,在多核條件下這種優(yōu)化是有必要的。同時(shí)在調(diào)用park()阻塞之前會(huì)將當(dāng)前線程設(shè)置到S節(jié)點(diǎn)的waiter上从撼。匹配成功州弟,返回匹配節(jié)點(diǎn)m。

shouldSpin()方法如下:

        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

同時(shí)在阻塞過(guò)程中會(huì)一直檢測(cè)當(dāng)前線程是否中斷了低零,如果中斷了婆翔,則調(diào)用tryCancel()方法取消該節(jié)點(diǎn),取消過(guò)程就是將當(dāng)前節(jié)點(diǎn)的math設(shè)置為當(dāng)前節(jié)點(diǎn)掏婶。所以如果線程中斷了啃奴,那么在返回m時(shí)一定是S節(jié)點(diǎn)自身。

            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }

awaitFullfill()方法如果返回的m == s雄妥,則表示當(dāng)前節(jié)點(diǎn)已經(jīng)中斷取消了最蕾,則需要調(diào)用clean()方法,清理節(jié)點(diǎn)S:

    void clean(SNode s) {

        // 清理item域
        s.item = null;
        // 清理waiter域
        s.waiter = null;

        // past節(jié)點(diǎn)
        SNode past = s.next;
        if (past != null && past.isCancelled())
            past = past.next;

        // 從棧頂head節(jié)點(diǎn)老厌,取消從棧頂head到past節(jié)點(diǎn)之間所有已經(jīng)取消的節(jié)點(diǎn)
        // 注意:這里如果遇到一個(gè)節(jié)點(diǎn)沒(méi)有取消瘟则,則會(huì)退出while
        SNode p;
        while ((p = head) != null && p != past && p.isCancelled())
            casHead(p, p.next);     // 如果p節(jié)點(diǎn)已經(jīng)取消了,則剔除該節(jié)點(diǎn)

        // 如果經(jīng)歷上面while p節(jié)點(diǎn)還沒(méi)有取消枝秤,則再次循環(huán)取消掉所有p 到past之間的取消節(jié)點(diǎn)
        while (p != null && p != past) {
            SNode n = p.next;
            if (n != null && n.isCancelled())
                p.casNext(n, n.next);
            else
                p = n;
        }
    }

clean()方法就是將head節(jié)點(diǎn)到S節(jié)點(diǎn)之間所有已經(jīng)取消的節(jié)點(diǎn)全部移出醋拧。【不清楚為何要用兩個(gè)while淀弹,一個(gè)不行么】

至此丹壕,SynchronousQueue的源碼分析完成了,說(shuō)下我個(gè)人感覺(jué)吧:個(gè)人感覺(jué)SynchronousQueue實(shí)現(xiàn)好復(fù)雜(可能是自己智商不夠吧~~~(>_<)~~~)垦页,源碼看了好久雀费,這篇博客寫(xiě)了將近一個(gè)星期,如果有什么錯(cuò)誤之處痊焊,煩請(qǐng)各位指正U蛋馈!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末薄啥,一起剝皮案震驚了整個(gè)濱河市辕羽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌垄惧,老刑警劉巖刁愿,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異到逊,居然都是意外死亡铣口,警方通過(guò)查閱死者的電腦和手機(jī)滤钱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)脑题,“玉大人件缸,你說(shuō)我怎么就攤上這事∈逅欤” “怎么了他炊?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)已艰。 經(jīng)常有香客問(wèn)我痊末,道長(zhǎng),這世上最難降的妖魔是什么哩掺? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任凿叠,我火速辦了婚禮,結(jié)果婚禮上疮丛,老公的妹妹穿的比我還像新娘幔嫂。我一直安慰自己,他們只是感情好誊薄,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布履恩。 她就那樣靜靜地躺著,像睡著了一般呢蔫。 火紅的嫁衣襯著肌膚如雪切心。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,698評(píng)論 1 305
  • 那天片吊,我揣著相機(jī)與錄音绽昏,去河邊找鬼。 笑死俏脊,一個(gè)胖子當(dāng)著我的面吹牛全谤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播爷贫,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼认然,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了漫萄?” 一聲冷哼從身側(cè)響起卷员,我...
    開(kāi)封第一講書(shū)人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎腾务,沒(méi)想到半個(gè)月后毕骡,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年未巫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了窿撬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡橱赠,死狀恐怖尤仍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情狭姨,我是刑警寧澤,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布苏遥,位于F島的核電站饼拍,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏田炭。R本人自食惡果不足惜师抄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望教硫。 院中可真熱鬧叨吮,春花似錦、人聲如沸瞬矩。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)景用。三九已至涵叮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間伞插,已是汗流浹背割粮。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留媚污,地道東北人舀瓢。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像耗美,于是被迫代替她去往敵國(guó)和親京髓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容