J.U.C 阻塞隊列源碼剖析系列(四)之 SynchronousQueue

上一篇文章剖析了 LinkedBlockingQueue 的相關(guān)源碼掸绞,那這篇文章接著看另外一個常見的阻塞隊列 —— SynchronousQueue

簡介

SynchronousQueue 是一個比較特殊的阻塞隊列類,為什么這樣說呢?我們不妨從官方的類注釋說起...

根據(jù)類注釋可大概得出以下幾點:

  • 每一個插入操作都必須等待另一個線程完成刪除操作
  • 隊列沒有內(nèi)部容量哭当,所以不能迭代數(shù)據(jù)
  • 可以選擇公平策略。公平策略是使用隊列先入先出生均,非公平策略是使用堆棧先入后出

咦矢否?SynchronousQueue 對象沒有容量,那這個阻塞隊列的使用場景是什么呢眼虱?
其實線程池的其中一種實現(xiàn)——Executors.newCachedThreadPool就使用了SynchronousQueue作為阻塞隊列

那先從一個demo開始揭開 SynchronousQueue 的廬山真面目吧或舞!

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(() -> {
            try {
                synchronousQueue.put("Hello World!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "線程一").start();
        new Thread(() -> {
            try {
                System.out.println(synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "線程二").start();
    }
}

示例中為什么在main方法里使用兩個線程分別執(zhí)行put和take操作呢?因為在同一線程中蒙幻,有可能存在先執(zhí)行take操作映凳,當程序執(zhí)行take方法的時候發(fā)現(xiàn)隊列為空就會阻塞當前線程,那么之后的put方法就不會執(zhí)行邮破,線程將會一直等待诈豌。

源碼剖析

成員變量

    // SynchronousQueue 定義的抽象類,由 TransferStack 和 TransferQueue 實現(xiàn)
    private transient volatile Transferer<E> transferer;

    // CPU 數(shù)量
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    // 自旋次數(shù)抒和,如果transfer指定了timeout時間矫渔,則使用maxTimeSpins,如果CPU數(shù)量小于2則自旋次數(shù)為0,否則為32摧莽。不會隨CPU數(shù)量增加而變化
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    // 自旋次數(shù)庙洼,如果沒有指定時間設(shè)置,則使用maxUntimedSpins。如果NCPUS數(shù)量大于等于2則設(shè)定為為32*16油够,否則為0
    static final int maxUntimedSpins = maxTimedSpins * 16;

    // 為了防止自定義的時間限過長蚁袭,為了優(yōu)化而設(shè)置,如果自定義時間長于這個值則取默認的 spinForTimeoutThreshold 石咬,單位為納秒揩悄。
    static final long spinForTimeoutThreshold = 1000L;

構(gòu)造函數(shù)

    // 默認使用非公平策略
    public SynchronousQueue() {
        this(false);
    }

    // fair為false是非公平策略,使用的數(shù)據(jù)結(jié)構(gòu)是棧鬼悠;fair為true是公平策略删性,使用的數(shù)據(jù)結(jié)構(gòu)是隊列
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

先列出相關(guān)方法的源碼,但并沒有加上注釋焕窝,因為核心方法都在 Transferer 對象中聲明5磐Α(值得注意的是,SynchronousQueue類并沒有實現(xiàn)remove它掂、removeAll汗侵、peek、clear等方法群发,都是使用默認值)

添加(add晰韵、offer、put)熟妓、刪除雪猪、查找元素

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    
    public E poll() {
        return transferer.transfer(null, true, 0);
    }
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

相對于 ArrayBlockingQueue 和 LinkedBlockingQueue,可以發(fā)現(xiàn)類似poll起愈、take等相關(guān)方法都被抽象成統(tǒng)一方法來進行操作只恨,通過抽象出內(nèi)部類 Transferer 實現(xiàn)不同的操作。接下來抬虽,咱們重點看看公平模式與非公平模式下的源碼官觅。

1.SynchronousQueue 的非公平模式(TransferStack)

眾所周知,堆棧是FILO(First in last out)的方式阐污,所以也可以理解為非公平模式為什么使用棧這種數(shù)據(jù)結(jié)構(gòu)休涤,如果排隊的時候第一個進來,最后一個才能走笛辟,這很不公平嘛功氨!

在 TransferStack 內(nèi)部有 REQUEST、DATA手幢、FULFILLING 這三個狀態(tài)捷凄。

REQUEST 表示請求從棧獲取數(shù)據(jù)操作的消費者,如:take 方法围来;
DATA 表示往棧內(nèi)部放數(shù)據(jù)的生產(chǎn)者跺涤,如:put 方法匈睁;
FULFILLING 表示正在交易的生產(chǎn)者或消費者

REQUEST 和 DATA 這兩種狀態(tài)理解起來還不難,但或許 FULFILLING 還是不太清楚有什么用桶错,先帶著疑問往下去看航唆,現(xiàn)在只需要簡單的理解為:不同狀態(tài) REQUEST 和 DATA 可以相互匹配的,當與棧頂匹配后就會將他們狀態(tài)轉(zhuǎn)換為 FULFILLING牛曹,當匹配成功后就會將棧頂和匹配的元素一同出棧佛点。

成員變量

   //表示一個未填充的消費者
   static final int REQUEST = 0;
   //表示一個未填充的生產(chǎn)者
   static final int DATA = 1;
   // 表示生產(chǎn)者正在給等待資源的消費者補給資源醇滥,或生產(chǎn)者在等待消費者消費資源
   static final int FULFILLING = 2;
   //棧的頭結(jié)點
   volatile SNode head;

棧節(jié)點

    // 棧節(jié)點
    static final class SNode {
        // 節(jié)點的后繼
        volatile SNode next;
        // 相匹配的節(jié)點
        volatile SNode match;
        // 等待的線程
        volatile Thread waiter;

        // item和mode不需要可見黎比,由于他們總是在其他可見/原子操作寫之前,讀之后
        Object item;// 數(shù)據(jù)
        int mode;//節(jié)點模式

        SNode(Object item) {
            this.item = item;
        }

        // cas保證線程安全設(shè)置節(jié)點后繼節(jié)點
        boolean casNext(SNode cmp, SNode val) {
            return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        //嘗試匹配目標節(jié)點與本節(jié)點鸳玩,如果匹配阅虫,可以喚醒線程。補給者調(diào)用tryMatch方法不跟,確定它們的等待線程颓帝。等待線程阻塞到它們自己被匹配。如果匹配返回true
        boolean tryMatch(SNode s) {
            // 設(shè)置本節(jié)點的匹配為s節(jié)點
            if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        // 節(jié)點嘗試取消等待窝革,match 從原來的 null 變?yōu)閠his
        void tryCancel() {
            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }

        // match 指向自己购城,則取消等待
        boolean isCancelled() {
            return match == this;
        }
    }

核心方法

  • isFulfilling:判斷指定類型是否是互補模式
  • casHead(SNode h, SNode nh):替換當前頭結(jié)點
  • SNode snode(SNode s, Object e, SNode next, int mode):生成SNode節(jié)點對象
  • transfer(E e, boolean timed, long nanos): 主要處理邏輯
  • awaitFulfill(SNode s, boolean timed, long nanos): 等待fulfill操作
  • shouldSpin(SNode s):判斷節(jié)點s是頭結(jié)點或是fulfill節(jié)點則返回true
  • clean(SNode s):將head節(jié)點到S節(jié)點之間所有已經(jīng)取消的節(jié)點全部移出
    // 如果m是一個填充為單元,則返回true
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
        
    // 比較head是否為h虐译,并且CAS操作nh為當前head
    boolean casHead(SNode h, SNode nh) {
        return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    //創(chuàng)建或重新設(shè)置節(jié)點的變量瘪板。在節(jié)點入棧時創(chuàng)建,在當可能需要保證減少intervals(間隔)讀和head的CAS操或避免由于競爭CAS操作節(jié)點入棧引起的垃圾時漆诽,此方法會被transfer調(diào)用
    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }

    E transfer(E e, boolean timed, long nanos) {
        // 1.如果隊列為空或已經(jīng)包含相同模式的節(jié)點侮攀,則嘗試節(jié)點入棧,等待匹配返回厢拭,如果取消返回null兰英。
        // 2.如果包含一個互補模式的節(jié)點(take(REQUEST)->put(DATA);put(DATA)->take(REQUEST))供鸠,則嘗試一個FULFILLING節(jié)點入棧畦贸,同時匹配等待的協(xié)同節(jié)點,兩個節(jié)點同時出棧楞捂,返回匹配的元素家制。由于其他線程執(zhí)行步驟3,實際匹配和解除鏈接指針動作不會發(fā)生泡一。
        // 3.如果棧頂存在另外一個FULFILLING的節(jié)點颤殴,則匹配節(jié)點,并出棧鼻忠。這段的代碼與fulfilling相同涵但,除非沒有元素返回
        SNode s = null;
        // 根據(jù)元素判斷節(jié)點模式杈绸,元素不為null,則為DATA矮瘟,否則為REQUEST
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            //剛開始頭節(jié)點為null瞳脓,第一個進來的節(jié)點就是頭節(jié)點。
            SNode h = head;
            if (h == null || h.mode == mode) {// 如果是空隊列澈侠,或棧頭節(jié)點的模式與要放入的節(jié)點模式相同
                if (timed && nanos <= 0) {
                    //如果超時劫侧,則取消等待,出棧哨啃,設(shè)置棧頭為其后繼
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);
                    else
                        return null;
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    //如果非超時烧栋,則將創(chuàng)建的新節(jié)點入棧成功,即放在棧頭拳球,自旋等待匹配節(jié)點(timed決定是否超時)
                    SNode m = awaitFulfill(s, timed, nanos);
                    // 返回的m == s 表示該節(jié)點被取消了或者超時审姓、中斷了
                    if (m == s) {
                        // 如果返回的是自己,節(jié)點取消等待祝峻,從棧中移除魔吐,并遍歷棧移除取消等待的節(jié)點
                        clean(s);
                        return null;
                    }
                    if ((h = head) != null && h.next == s)
                        //s節(jié)點匹配成功,則設(shè)置棧頭為s的后繼
                        casHead(h, s.next);
                    // 匹配成功莱找,REQUEST模式返回酬姆,匹配到的節(jié)點元素(DATA),DATA模式返回當前節(jié)點元素
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            } else if (!isFulfilling(h.mode)) { // 如果棧頭節(jié)點模式不為Fulfilling奥溺,判斷是否取消等待辞色,是則出棧
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //非取消等待,則是節(jié)點入棧
                    for (;;) { // 自旋直到節(jié)點匹配或者等待節(jié)點都沒有
                        SNode m = s.next;
                        //后繼節(jié)點為null谚赎,則出棧
                        if (m == null) {        // 堆棧中沒有等待節(jié)點
                            casHead(s, null);   // 將棧頭節(jié)點位置設(shè)置為null
                            s = null;           // 棧頭節(jié)點設(shè)置為null淫僻,便于GC
                            break;              // 跳出當前循環(huán),重新執(zhí)行主循環(huán)
                        }
                        SNode mn = m.next;
                        // 嘗試匹配 s 節(jié)點
                        if (m.tryMatch(s)) {
                            //匹配成功兩個節(jié)點則出棧
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else
                            // 如果沒有匹配成功壶唤,則說明已經(jīng)有其它線程與 m 節(jié)點匹配了雳灵,將 mn 作為 s 的后繼節(jié)點
                            s.casNext(m, mn);   // help unlink
                    }
                }
            } else {
                //如果棧頭節(jié)點模式為Fulfilling,則代表棧頭節(jié)點正在與其它節(jié)點匹配闸盔,可以理解為協(xié)助棧頭節(jié)點匹配成功
                SNode m = h.next;               // m is h's match
                if (m == null)
                    //如果無后繼節(jié)點悯辙,則棧頭出棧
                    casHead(h, null);           // pop fulfilling node
                else {
                    //嘗試匹配,如果匹配成功迎吵,棧頭和匹配節(jié)點出棧躲撰,否則跳過后繼節(jié)點
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }

    // 自旋或阻塞,直到節(jié)點被一個fulfill操作匹配
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        //獲取自旋的次數(shù)
        int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // 如果線程被中斷击费,則取消等待
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            // 如果節(jié)點的匹配節(jié)點不為null拢蛋,則返回匹配節(jié)點
            if (m != null)
                return m;
            if (timed) {
                nanos = deadline - System.nanoTime();
                //如果超時,則取消等待
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            // 如果自旋次數(shù)大于零蔫巩,且可以自旋谆棱,則自旋次數(shù)減1
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            else if (s.waiter == null)
                //如果節(jié)點S的等待線程為空快压,則設(shè)置當前節(jié)點為S節(jié)點的等待線程,以便可以park后繼節(jié)點垃瞧。
                s.waiter = w; // establish waiter so can park next iter
            else if (!timed)
                //非超時等在者蔫劣,park當前線程
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                //如果超時時間大于,最大自旋閾值个从,則超時park當前線程
                LockSupport.parkNanos(this, nanos);
        }
    }

    // 如果節(jié)點在棧頭或棧頭為FULFILLING的節(jié)點脉幢,則返回true
    boolean shouldSpin(SNode s) {
        //因為很可能立刻就會有新的線程到來,那么就會立刻進行交易而不需要進行阻塞嗦锐,然后被喚醒嫌松,這是需要過程的,所以這樣的自旋等待是值得的意推。
        SNode h = head;
        return (h == s || h == null || isFulfilling(h.mode));
    }

    // 將head節(jié)點到S節(jié)點之間所有已經(jīng)取消的節(jié)點全部移出豆瘫。
    void clean(SNode s) {
        s.item = null;   // forget item
        s.waiter = null; // forget thread

        SNode past = s.next;
        if (past != null && past.isCancelled())
            past = past.next;

        // 如果取消的是頭節(jié)點則運行下面的清理操作珊蟀,操作邏輯很簡單就是判斷頭結(jié)點是不是取消節(jié)點菊值,如果是則將節(jié)點一定到下一個節(jié)點
        SNode p;
        while ((p = head) != null && p != past && p.isCancelled())
            //p是從頭節(jié)點開始第一個不移除的節(jié)點
            casHead(p, p.next);

        // 取消不是頭結(jié)點的嵌套節(jié)點
        while (p != null && p != past) {
            SNode n = p.next;
            if (n != null && n.isCancelled())
                //移除節(jié)點n
                p.casNext(n, n.next);
            else
                p = n;
        }
    }

因為堆棧的出棧和入棧操作都在 transfer 方法里面,所以不容易理解育灸。建議讀者多看幾遍腻窒,結(jié)合下面的圖和我個人分析的思路,一步一步的debug磅崭,這樣理解起來應(yīng)該就不難啦~

1.根據(jù)節(jié)點模式判斷是入棧(put)還是出棧(take)操作
2.判斷棧頭是否為空或棧頭節(jié)點操作是否和本次一樣儿子,是的話執(zhí)行第3步,否則執(zhí)行第6步
3.判斷是否是超時操作砸喻,如果是超時操作的話則執(zhí)行第4步柔逼,否則執(zhí)行第5步
4.判斷棧頭是否非空并且是否可以取消,是的話將棧頭后繼節(jié)點cas操作成為棧頭節(jié)點后執(zhí)行第1步割岛,否則返回null
5.cas操作創(chuàng)建節(jié)點并將該節(jié)點入棧愉适,自旋等待匹配節(jié)點
6.判斷棧頭節(jié)點模式是否為Fulfilling,如果不是的話執(zhí)行第7步癣漆,否則執(zhí)行第10步
7.判斷棧頭節(jié)點是否需要取消等待维咸,需要取消等待的話將棧頭節(jié)點的后繼節(jié)點cas操作成為頭節(jié)點后重新執(zhí)行第1步,否則執(zhí)行第8步
8.棧頭節(jié)點不需取消等待惠爽,將當前(take or put or poll)操作封裝為一個節(jié)點入棧后自旋堆棧癌蓖,直到棧頭節(jié)點與棧中其它節(jié)點匹配后兩個節(jié)點都出棧返回節(jié)點信息或者所有的等待節(jié)點都沒有后跳出子循環(huán)重新執(zhí)行第1步
9.棧頭節(jié)點模式為Fulfilling,如果棧頭節(jié)點的后繼節(jié)點為null婚肆,cas設(shè)置棧頭節(jié)點為null并執(zhí)行第1步租副,否則繼續(xù)嘗試匹配棧中其它節(jié)點

2.SynchronousQueue 的公平模式(TransferQueue)

公平模式下使用的數(shù)據(jù)結(jié)構(gòu)是隊列,其方式是先進先出(FIFO:First In First Out)较性。就比如說咱們在結(jié)賬排隊的時候用僧,肯定是先排隊的人先結(jié)賬呀讨越,這樣才公平!

隊列節(jié)點

// 隊列節(jié)點
static final class QNode {
    // 下一個節(jié)點
    volatile QNode next;
    // 元素信息
    volatile Object item;
    // 當前等待的線程
    volatile Thread waiter;
    // 是否是數(shù)據(jù)(put的時候是true永毅,take的時候是false)
    final boolean isData;

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    // 替換當前節(jié)點的next節(jié)點
    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // 替換當前節(jié)點的item數(shù)據(jù)
    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // 取消當前操作把跨,將當前item賦值為this(當前QNode節(jié)點)
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    // 如果item是this(當前QNode節(jié)點)的話就返回true,反之返回false
    boolean isCancelled() {
        return item == this;
    }

    // 如果已知此節(jié)點離隊列沼死,判斷next節(jié)點是不是為this着逐,則返回true
    boolean isOffList() {
        return next == this;
    }
}

成員變量

// 隊列頭節(jié)點
transient volatile QNode head;
// 隊列尾節(jié)點
transient volatile QNode tail;
// 節(jié)點被取消但沒有從隊列中移除
transient volatile QNode cleanMe;

核心方法

  • advanceHead(QNode h, QNode nh):更新頭節(jié)點
  • advanceTail(QNode t, QNode nt):更新尾節(jié)點
  • casCleanMe(QNode cmp, QNode val):更新 cleanMe 節(jié)點
  • awaitFulfill(QNode s, E e, boolean timed, long nanos):等待fulfill操作
  • clean(QNode pred, QNode s):清空cleanMe節(jié)點
  • transfer(E e, boolean timed, long nanos): 主要處理邏輯
    void advanceHead(QNode h, QNode nh) {
        if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
            h.next = h; // forget old next
    }
    
    void advanceTail(QNode t, QNode nt) {
        if (tail == t)
            UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    }
    
    boolean casCleanMe(QNode cmp, QNode val) {
        return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    }
    
    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        // 和 TransferStack.awaitFulfill 方法的邏輯一樣,因此就不顯示方法的邏輯啦
    }
    
    void clean(QNode pred, QNode s) {
        s.waiter = null; // forget thread
        while (pred.next == s) { // Return early if already unlinked
            QNode h = head;
            QNode hn = h.next;   // Absorb cancelled first node as head
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }
            QNode t = tail;      // Ensure consistent read for tail
            if (t == h)
                return;
            QNode tn = t.next;
            // 判斷現(xiàn)在的t是不是末尾節(jié)點意蛀,可能其他線程插入了內(nèi)容導(dǎo)致不是最后的節(jié)點耸别。
            if (t != tail)
                continue;
            // 如果不是最后節(jié)點的話將其現(xiàn)在t.next節(jié)點作為tail尾節(jié)點。
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }
            // 如果當前節(jié)點不是尾節(jié)點進入到這里面县钥。
            if (s != t) {        // If not tail, try to unsplice
                // 獲取當前節(jié)點(被取消的節(jié)點)的下一個節(jié)點秀姐。
                QNode sn = s.next;
                // 修改上一個節(jié)點的next(下一個)元素為下下個節(jié)點。
                if (sn == s || pred.casNext(s, sn))
                    return;
            }
            QNode dp = cleanMe;
            // 嘗試清除上一個標記為清除的節(jié)點
            if (dp != null) {    // Try unlinking previous cancelled node
                //1.獲取要被清除的節(jié)點
                QNode d = dp.next;
                QNode dn;
                if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                                (dn = d.next) != null &&  //   has successor
                                dn != d &&                //   that is on list
                                dp.casNext(d, dn)))       // d unspliced
                    casCleanMe(dp, null);
                if (dp == pred)
                    return;      // s is already saved node
            } else if (casCleanMe(null, pred))
                return;          // Postpone cleaning s
        }
    }   
    
    E transfer(E e, boolean timed, long nanos) {
        QNode s = null; // constructed/reused as needed
        // 標識此次操作是存數(shù)據(jù)(put)還是取數(shù)據(jù)(take)
        boolean isData = (e != null);

        // 自旋匹配節(jié)點
        for (;;) {
            QNode t = tail;
            QNode h = head;
            // 如果頭節(jié)點或尾節(jié)點為空繼續(xù)自旋(在TransferQueue初始化的時候已經(jīng)賦值頭尾結(jié)點)
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin

            // h == t 說明頭尾結(jié)點相同若贮,是空隊列
            // t.isData == isData 說明尾節(jié)點與當前操作一樣
            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                // 如果臨時變量 t 不等于尾節(jié)點省有,說明有其它線程改變了尾節(jié)點,則重新自旋匹配節(jié)點
                if (t != tail)                  // inconsistent read
                    continue;
                // 如果尾節(jié)點之后的節(jié)點值不為空谴麦,說明也是有其它線程改變了尾節(jié)點蠢沿,將tn節(jié)點賦值給尾節(jié)點
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                //超時直接返回 null
                if (timed && nanos <= 0)
                    return null;
                // 創(chuàng)建node節(jié)點
                if (s == null)
                    s = new QNode(e, isData);
                // 將新創(chuàng)建的node節(jié)點添加到隊列尾部,如果失敗則重新自旋
                if (!t.casNext(null, s))
                    continue;
                
                // 更新新創(chuàng)建節(jié)點為尾節(jié)點
                advanceTail(t, s);            
                // 調(diào)用 awaitFulfill 方法自旋匹配等待節(jié)點
                Object x = awaitFulfill(s, e, timed, nanos);
                // 如果返回當前節(jié)點匾效,則說明節(jié)點由于被取消舷蟀、超時、中斷導(dǎo)致匹配失敗
                if (x == s) {                   // wait was cancelled
                    // 清除當前等待匹配節(jié)點
                    clean(t, s);
                    return null;
                }

                // 判斷節(jié)點是否已從隊列離開
                if (!s.isOffList()) {           // not already unlinked
                    // 嘗試將s節(jié)點設(shè)置為head面哼,移出t
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    // 釋放 s 節(jié)點當前的等待線程
                    s.waiter = null;
                }
                // 返回節(jié)點值(put返回put操作的值野宜,take返回匹配到的節(jié)點值)
                return (x != null) ? (E)x : e;
            } else {// 隊列不為空,并且當前操作與尾節(jié)點的操作不一致魔策。所以當前操作與尾節(jié)點的操作是互相匹配的
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read

                Object x = m.item;
                // isData == (x != null):判斷isData與x的模式是否相同匈子,相同表示已經(jīng)完成匹配,繼續(xù)自旋
                // x == m :m節(jié)點被取消了
                // !m.casItem(x, e):如果嘗試將數(shù)據(jù)e設(shè)置到m上失敗
                if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                    // 將m設(shè)置為頭結(jié)點代乃,h出列旬牲,然后重試
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }

                // 成功匹配了,節(jié)點m 設(shè)置為頭結(jié)點搁吓,h出列
                advanceHead(h, m);              // successfully fulfilled
                // 喚醒節(jié)點 m 的等待線程
                LockSupport.unpark(m.waiter);
                // 返回節(jié)點值(put返回put操作的值原茅,take返回匹配到的節(jié)點值)
                return (x != null) ? (E)x : e;
            }
        }
    }        

OMG!源碼又是那么長堕仔。但其實也不是很難擂橘,知道一個規(guī)律就行:由于是使用隊列作為公平策略,所以在put的時候會在隊列尾部添加數(shù)據(jù)摩骨,而take的時候會從隊列尾部向隊列頭部方向?qū)ふ业谝粋€被阻塞的線程通贞,這樣就可以保證公平的朗若、按順序的釋放被阻塞的線程。

先簡單的總結(jié)一下核心流程(TransferQueue.trasnfer方法):
1.獲取當前操作是存數(shù)據(jù)還是取數(shù)據(jù)
2.自旋尋找匹配的節(jié)點(put操作匹配take操作昌罩、take操作匹配put操作)
3.如果頭節(jié)點或尾節(jié)點為空哭懈,則繼續(xù)執(zhí)行第2步
4.如果是空隊列或尾節(jié)點和當前操作一樣,執(zhí)行第5步茎用,否則執(zhí)行第13步
5.如果尾節(jié)點被其它線程更改遣总,重新執(zhí)行第2步,否則執(zhí)行第6步
6.如果尾節(jié)點的后繼節(jié)點不為空轨功,說明有其它線程更改旭斥,則設(shè)置后繼節(jié)點為尾節(jié)點,并重新執(zhí)行第2步古涧;否則執(zhí)行第7步
7.如果超時直接返回null垂券,否則往下執(zhí)行第8步
8.為當前操作創(chuàng)建節(jié)點并添加到隊列尾部,如果添加成功往下執(zhí)行第9步羡滑,否則執(zhí)行第2步
9.自旋匹配等待節(jié)點菇爪,當返回節(jié)點與當前節(jié)點不一樣,說明節(jié)點匹配成功啄栓,執(zhí)行第10步娄帖;否則說明由于被取消也祠、超時昙楚、中斷導(dǎo)致匹配失敗,則清除當前節(jié)點并返回null
10.如果節(jié)點已從隊列離開诈嘿,執(zhí)行第11步堪旧,否則執(zhí)行第12步
11.返回節(jié)點值(put返回put操作的值,take返回匹配到的節(jié)點值)
12.將節(jié)點從隊列中移除奖亚,并重新設(shè)置隊列頭節(jié)點后執(zhí)行第11步
13.執(zhí)行到這一步淳梦,說明當前操作與尾節(jié)點的操作是互相匹配;那么如果隊列頭節(jié)點是否匹配完成或隊列頭節(jié)點被取消昔字,又或者cas更新頭節(jié)點操作失敗爆袍,則執(zhí)行第14步,否則執(zhí)行第15步
14.重新設(shè)置頭節(jié)點并執(zhí)行第2步
15.執(zhí)行到這一步代表匹配成功作郭,重新設(shè)置隊列的頭節(jié)點陨囊,并喚醒頭節(jié)點的等待線程,最后返回節(jié)點值(put返回put操作的值夹攒,take返回匹配到的節(jié)點值)

還是依照個人習(xí)慣蜘醋,喜歡通過畫圖分析一下源碼流程!


總結(jié):

  • SynchronousQueue 是一個沒有隊列大小的概念咏尝,所有的操作都必須與其匹配的節(jié)點共同入隊出隊(公平模式)或入棧出棧(非公平模式)
  • SynchronousQueue 是輕量級的阻塞隊列压语。因為SynchronousQueue是沒有使用到鎖啸罢,都是通過CAS方法保證線程安全

其實也不難發(fā)現(xiàn),SynchronousQueue 的缺點也是十分明顯胎食。如果同一個模式的節(jié)點多的話扰才,就會一直阻塞,這是會損耗性能厕怜,所以需要根據(jù)實際業(yè)務(wù)場景使用训桶。

最后希望讀者們看源碼的時候,親自debug酣倾,這樣才會加深源碼的理解舵揭,讀任何文章都只是輔助,自己真正理解才是學(xué)會東西躁锡。

如果覺得源碼剖析不錯的話午绳,麻煩點個贊哈!對于文章有哪里不清楚或者有誤的地方映之,歡迎在評論區(qū)留言~

參考資料:

https://www.cnblogs.com/dwlsxj/p/Thread.html
慕課網(wǎng):面試官系統(tǒng)精講Java源碼及大廠真題: https://www.imooc.com/read/47/article/862

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拦焚,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子杠输,更是在濱河造成了極大的恐慌赎败,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蠢甲,死亡現(xiàn)場離奇詭異僵刮,居然都是意外死亡,警方通過查閱死者的電腦和手機鹦牛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進店門搞糕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人曼追,你說我怎么就攤上這事窍仰±河洌” “怎么了阱飘?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長贡定。 經(jīng)常有香客問我晶伦,道長碟狞,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任坝辫,我火速辦了婚禮篷就,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己竭业,他們只是感情好智润,可當我...
    茶點故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著未辆,像睡著了一般窟绷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上咐柜,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天兼蜈,我揣著相機與錄音,去河邊找鬼拙友。 笑死为狸,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的遗契。 我是一名探鬼主播辐棒,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼牍蜂!你這毒婦竟也來了漾根?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤鲫竞,失蹤者是張志新(化名)和其女友劉穎辐怕,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體从绘,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡寄疏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了顶考。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赁还。...
    茶點故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖驹沿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蹈胡,我是刑警寧澤渊季,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站罚渐,受9級特大地震影響却汉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜荷并,卻給世界環(huán)境...
    茶點故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一合砂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧源织,春花似錦翩伪、人聲如沸微猖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凛剥。三九已至,卻和暖如春轻姿,著一層夾襖步出監(jiān)牢的瞬間犁珠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工互亮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留犁享,地道東北人。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓豹休,卻偏偏與公主長得像饼疙,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子慕爬,可洞房花燭夜當晚...
    茶點故事閱讀 44,974評論 2 355

推薦閱讀更多精彩內(nèi)容