并發(fā)編程之 SynchronousQueue 核心源碼分析

前言

SynchronousQueue 是一個普通用戶不怎么常用的隊列,通常在創(chuàng)建無界線程池(Executors.newCachedThreadPool())的時候使用哪替,也就是那個非常危險的線程池 ^_^庸娱。

它是一個非常特殊的阻塞隊列,他的模式是:在 offer的時候,如果沒有另一個線程在 take 或者 poll 的話,就會失敗边篮,反之,如果在 take或者 poll的時候奏甫,沒有線程在offer 苟耻,則也會失敗,而這種特性扶檐,則非常適合用來做高響應并且線程不固定的線程池的Queue。所以胁艰,在很多高性能服務器中款筑,如果并發(fā)很高,這時候腾么,普通的 LinkedQueue就會成為瓶頸奈梳,性能就會出現(xiàn)毛刺,當換上 SynchronousQueue后解虱,性能就會好很多攘须。

今天就看看這個特殊的 Queue 是怎么實現(xiàn)的。友情提示:代碼有些小復雜殴泰。于宙。浮驳。請做好心理準備。

源碼實現(xiàn)

SynchronousQueue 內(nèi)部分為公平(隊列)和非公平(棧)捞魁,隊列的性能相對而言會好點至会。構造方法中,就看出來了谱俭。默認是非公平的奉件,通常非公平(棧 FIFO)的性能會高那么一點點。

構造方法

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

offer 方法

該方法我們通常建議使用帶有超時機制的 offer方法昆著。

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

從上面的代碼中县貌,可以看到核心方法就是 transfer方法。如果該方法返回 true凑懂,表示煤痕,插入成功,如果失敗征候,就返回 false杭攻。

poll 方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

同樣的該方法也是調用了 transfer 方法。結果返回得到的值或者null疤坝。區(qū)別在于兆解,offer方法的 e 參數(shù)是實體的。而 poll 方法 e 參數(shù)是 null跑揉,我們猜測锅睛,方法內(nèi)部肯定根據(jù)這個做了判斷。所以历谍,重點在于transfer方法的實現(xiàn)现拒。

而 transferer 有 2 種,隊列和棧望侈,我們就研究一種印蔬,知曉其原理,另一種有時間在看脱衙。

TransferQueue 源碼實現(xiàn)

構造方法:

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

構造一個 Node 節(jié)點侥猬,注釋說這是一個加的 node。并賦值給 head 和 tail 節(jié)點捐韩。形成一個初始化的鏈表退唠。

看看這個 node:

/** Node class for TransferQueue. */
static final class QNode {
    volatile QNode next;          // next node in queue
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // to control park/unpark
    final boolean isData;
}

node 持有隊列中下一個 node,node 對應的值 value荤胁,持有該 node 的線程瞧预,擁有 park 或者 unpark,這里用的是 JUC 的工具類 LockSupport,還有一個布爾類型垢油,isData盆驹,這個非常重要,需要好好理解秸苗,到后面我們會好好講解召娜。

我們更關注的是這個類的 transfer 方法,該方法是 SynchronousQueue 的核心惊楼。

該方法接口定義如下:

/**
 * Performs a put or take. put 或者 take
 *
 * @param e if non-null, the item to be handed to a consumer;
 *          if null, requests that transfer return an item
 *          offered by producer. 
 * @param timed if this operation should timeout
 * @param nanos the timeout, in nanoseconds
 * @return if non-null, the item provided or received; if null,
 *         the operation failed due to timeout or interrupt --
 *         the caller can distinguish which of these occurred
 *         by checking Thread.interrupted.
 */
abstract E transfer(E e, boolean timed, long nanos);

注釋說道 e 參數(shù)的作用:

如果 e 不是 null(說明是生產(chǎn)者調用) 玖瘸,將 item 交給消費者,并返回 e檀咙;反之雅倒,如果是 null(說明是消費者調用),將生產(chǎn)者提供的 item 返回給消費者弧可。

看看 TransferQueue 類的 transfer 方法實現(xiàn)蔑匣,樓主寫了很多的注釋嘗試解讀:

QNode s = null; // constructed/reused as needed
boolean isData = (e != null);// 當輸入的是數(shù)據(jù)時,isData 就是 ture棕诵,表明這個操作是一個輸入數(shù)據(jù)的操作裁良;同理,當調用者輸入的是 null校套,則是在消費數(shù)據(jù)价脾。

for (;;) {
    QNode t = tail;
    QNode h = head;
    if (t == null || h == null)         // 如果并發(fā)導致未"來得及"初始化
        continue;                       // 自旋重來

    // 以下分成兩個部分進行

    // 1. 如果當前操作和 tail 節(jié)點的操作是一樣的;或者頭尾相同(表明隊列中啥都沒有)笛匙。
    if (h == t || t.isData == isData) { 
        QNode tn = t.next;
        if (t != tail)                  // 如果 t 和 tail 不一樣侨把,說明,tail 被其他的線程改了妹孙,重來
            continue;
        if (tn != null) {               // 如果 tail 的 next 不是空秋柄。就需要將 next 追加到 tail 后面了。
            advanceTail(t, tn); // 使用 CAS 將 tail.next 變成 tail,        
            continue;
        }
        if (timed && nanos <= 0)        // 時間到了蠢正,不等待骇笔,返回 null,插入失敗嚣崭,獲取也是失敗的蜘拉。
            return null;
        if (s == null) // 如果能走到這里,說明 tail 的 next 是 null有鹿,這里的判斷是避免重復創(chuàng)建 Qnode 對象。
            s = new QNode(e, isData);// 創(chuàng)建一個新的節(jié)點谎脯。
        if (!t.casNext(null, s))        // 嘗試 CAS 將這個剛剛創(chuàng)建的節(jié)點追加到 tail 的 next 節(jié)點上.
            continue;// 如果失敗葱跋,則重來

        advanceTail(t, s); // 當新的節(jié)點成功追加到 tail 節(jié)點的 next 上了, 就嘗試將 tail.next 節(jié)點覆蓋 tail 節(jié)點,稱之為推進。
        // s == 新節(jié)點娱俺,“可能”是新的 tail稍味;e 是實際數(shù)據(jù)。
        Object x = awaitFulfill(s, e, timed, nanos);// 該方法作用就是荠卷,讓當前線程等待模庐。排除意外情況和超時的話,就是等待其他線程拿走數(shù)據(jù)并替換成 isData 不同的數(shù)據(jù)油宜。
        if (x == s) { // x == s 是什么意思呢掂碱? 表明在 awaitFulfill 方法中,這個數(shù)據(jù)被取消了慎冤,tryCancel 方法就是將 item 覆蓋了 QNode疼燥。說明這次操作失敗了。
            clean(t, s);// 操作失敗則需要清理數(shù)據(jù)蚁堤,并返回 null醉者。
            return null;
        }

        // 如果一切順利,確實被其他線程喚醒了披诗,其他線程也交換了數(shù)據(jù)撬即。
        // 這個判斷:next != this,說明了什么呈队?當這個 tail 節(jié)點的 next 不再指向自己剥槐,說明了
        if (!s.isOffList()) {           // not already unlinked
            // 這一步是將 S 節(jié)點設置為 Head,并且將新 Head 的 next 指向自己掂咒,讓 Head 和之前的 next 斷開才沧。
            advanceHead(t, s);          // unlink if head     
            // 當 x 不是 null,表明對方線程是存放數(shù)據(jù)的绍刮。     
            if (x != null)              // and forget fields
                // 這一步操作將自己的 item 設置成自己温圆。
                s.item = s;
            // 將 S 節(jié)點的持有線程變成 null。
            s.waiter = null;
        }
        // x 不是 null 表明孩革,對方線程是生產(chǎn)者岁歉,返回他生產(chǎn)的數(shù)據(jù);如果是 null膝蜈,說明對方線程是消費者锅移,那他自己就是生產(chǎn)者,返回自己的數(shù)據(jù)饱搏,表示成功非剃。
        return (x != null) ? (E)x : e;

    } 
    // 2. 如果當前的操作類型和 tail 的操作不一樣。稱之為互補推沸。
    else {                            // complementary-mode
        QNode m = h.next;               // node to fulfill
        // 如果下方這些判斷沒過备绽,說明并發(fā)修改了券坞,自旋重來。 
        if (t != tail || m == null || h != head)
            continue;                   // inconsistent read

        Object x = m.item;
        // 如果 head 節(jié)點的 isData 和當前操作相同肺素,
        // 如果 操作不同恨锚,但 head 的 item 就是自身,也就是發(fā)生了取消操作倍靡,tryCancel 方法會做這件事情猴伶。
        // 如果上面2個都不滿足,嘗試使用 CAS 將 e 覆蓋 item塌西。 
        if (isData == (x != null) ||    // m already fulfilled
            x == m ||                   // m cancelled
            !m.casItem(x, e)) {         // lost CAS
            // CAS 失敗了他挎,Head 的操作類型和當前類型相同,item 被取消了雨让,都會走這里雇盖。
            // 將 h.next 覆蓋 head。重來栖忠。
            advanceHead(h, m);          // dequeue and retry
            continue;
        }
        // 這里也是將 h.next 覆蓋 head崔挖。能夠走到這里,說明庵寞,上面的 CAS 操作成功了狸相,當前線程已經(jīng)將 e 覆蓋了 next 的 item 。
        advanceHead(h, m);              // successfully fulfilled
        // 喚醒 next 的 線程捐川。提醒他可以取出數(shù)據(jù)脓鹃,或者“我”已經(jīng)拿到數(shù)據(jù)了。
        LockSupport.unpark(m.waiter);
        // 如果 x 不是 null古沥,表明這是一次消費數(shù)據(jù)的操作瘸右,反之,這是一次生產(chǎn)數(shù)據(jù)的操作岩齿。
        return (x != null) ? (E)x : e;
    }
}

說實話太颤,代碼還是比較復雜的。JDK 中注釋是這么說的:

基本算法是死循環(huán)采取 2 種方式中的其中一種盹沈。
1 如果隊列是空的龄章,或者持有相同的模式節(jié)點(isData 相同),就嘗試添加節(jié)點到隊列中乞封,并讓當前線程等待做裙。
2 如果隊列中有線程在等待,那么就使用一種互補的方式肃晚,使用 CAS 和等待者交換數(shù)據(jù)锚贱。并返回。

什么意思呢关串?

首先明確一點拧廊,隊列中杂穷,數(shù)據(jù)有 2 種情況(但同時只存在一種),要么QNode 中有實際數(shù)據(jù)(offer 的時候卦绣,是有數(shù)據(jù)的,但沒有“人”來确沈尽)滤港,要么沒有實際數(shù)據(jù)(poll 的時候,隊列中沒有數(shù)據(jù)趴拧,線程只好等待)溅漾。隊列在哪一種狀態(tài)取決于他為空后,第一個插入的是什么類型的數(shù)據(jù)著榴。

樓主畫了點圖來表示:

  1. 隊列初始化的時候添履,只有一個空的Node
image.png
  1. 此時脑又,一個線程嘗試 offer 或者 poll數(shù)據(jù)暮胧,都會插入一個 Node 插入到節(jié)點中。
image.png
  1. 假設剛剛發(fā)生的是 offer 操作问麸,這個時候往衷,另一個線程也來 offer,這時就會有 2 個節(jié)點严卖。
image.png
  1. 這個時候席舍,隊列中有 2 個有真實數(shù)據(jù)(offer 操作)的節(jié)點了,注意哮笆,這個時候来颤,那 2 個線程都是 wait的,因為沒有人接受他們的數(shù)據(jù)稠肘。此時福铅,又來一個線程,做 poll 操作启具。
image.png

從上圖可以看出本讥,poll 線程從head 開始取數(shù)據(jù),因為它的 isDatatail 節(jié)點的 isData 不同鲁冯,那么就會從 head 開始找節(jié)點拷沸,并嘗試將自己的 null 值和節(jié)點中的真實數(shù)據(jù)進行交換。并喚醒等待中的線程薯演。

這 4 幅圖就是 SynchronousQueue的精華撞芍。

既然叫做同步隊列,一定是 A 線程生產(chǎn)數(shù)據(jù)的時候跨扮,有 B 線程在消費序无,否則 A 線程就需要等待验毡,反之,如果 A 線程準備消費數(shù)據(jù)帝嗡,但隊列中沒有數(shù)據(jù)晶通,線程也會等待,直到有 B 線程存放數(shù)據(jù)哟玷。

而 JDK 的實現(xiàn)原理則是:使用一個隊列狮辽,隊列中的用一個 isData 來區(qū)分生產(chǎn)還是消費,所有新操作都根據(jù) tail 節(jié)點的模式來決定到底是追加到 tail節(jié)點還是和 tail節(jié)點(從 head 開始)交換數(shù)據(jù)巢寡。

而所謂的交換是從head開始喉脖,取出節(jié)點的實際數(shù)據(jù),然后使用 CAS 和匹配到的節(jié)點進行交換抑月。從而完成兩個線程直接交換數(shù)據(jù)的操作树叽。

為什么他在某些情況下,比LinkedBlockingQueue性能高呢谦絮?其中有個原因就是沒有使用鎖题诵,減少了線程上下文切換。第二則是線程之間交換數(shù)據(jù)的方式更加的高效挨稿。

好仇轻,重點部分講完了,再看看其中線程是如何等待的奶甘。邏輯在 awaitFulfill 方法中:

// 自旋或者等待篷店,直到填充完畢
// 這里的策略是什么呢?如果自旋次數(shù)不夠了臭家,通常是 16 次疲陕,但還有超過 1 秒的時間,就阻塞等待被喚醒钉赁。
// 如果時間到了蹄殃,就取消這次的入隊行為。
// 返回的是 Node 本身
// s.item 就是 e 
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?// 如果成功將 tail.next 覆蓋了 tail你踩,如果有超時機制诅岩,則自旋 32 次,如果沒有超時機制带膜,則自旋 32 *16 = 512次
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())// 當前線程被中斷
            s.tryCancel(e);// 嘗試取消這個 item
        Object x = s.item;// 獲取到這個 tail 的 item
        if (x != e) // 如果不相等吩谦,說明 node 中的 item 取消了,返回這個 item膝藕。
            // 這里是唯一停止循環(huán)的地方式廷。當 s.item 已經(jīng)不是當初的哪個 e 了,說明要么是時間到了被取消了芭挽,要么是線程中斷被取消了滑废。
            // 當然蝗肪,不僅僅只有這2種 “意外” 情況,還有一種情況是:當另一個線程拿走了這個數(shù)據(jù)蠕趁,并修改了 item薛闪,也會通過這個判斷,返回被“修改”過的 item俺陋。
            return x;
        if (timed) {// 如果有時間限制
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {// 如果時間到了
                s.tryCancel(e);// 嘗試取消 item逛绵,供上面的 x != e 判斷
                continue;// 重來
            }
        }
        if (spins > 0)// 如果還有自旋次數(shù)
            --spins;// 減一
        else if (s.waiter == null)// 如果自旋不夠,且 tail 的等待線程還沒有賦值
            s.waiter = w;// 當前線程賦值給 tail 的等待線程
        else if (!timed)// 如果自旋不夠倔韭,且如果線程賦值過了,且沒有限制時間瓢对,則 wait寿酌,(危險操作)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)// 如果自旋不夠,且如果限制了時間硕蛹,且時間還剩余超過 1 秒醇疼,則 wait 剩余時間。
            // 主要目的就是等待法焰,等待其他線程喚醒這個節(jié)點所在的線程秧荆。
            LockSupport.parkNanos(this, nanos);
    }
}

該方法邏輯如下:

  1. 默認自旋 32 次,如果沒有超時機制埃仪,則 512 次乙濒。
  2. 如果時間到了,或者線程被中斷卵蛉,則取消這次的操作颁股,將item設置成自己。供后面判斷傻丝。
  3. 如果自旋結束甘有,且剩余時間還超過 1 秒窒盐,則阻塞等待至剩余時間秆吵。
  4. 當線程被其他的線程喚醒,說明數(shù)據(jù)被交換了渤弛。則 return泛释,返回的是交換后的數(shù)據(jù)滤愕。

總結

好了,關于 SynchronousQueue的核心源碼分析就到這里了胁澳,樓主沒有分析這個類的所有源碼该互,只研究了核心部分代碼,這足夠我們理解這個 Queue 的內(nèi)部實現(xiàn)了韭畸。

總結下來就是:

JDK 使用了隊列或者棧來實現(xiàn)公平或非公平模型宇智。其中蔓搞,isData 屬性極為重要,標識這這個線程的這次操作随橘,決定了他到底應該是追加到隊列中喂分,還是從隊列中交換數(shù)據(jù)。

每個線程在沒有遇到自己的另一半時机蔗,要么快速失敗蒲祈,要么進行阻塞,阻塞等待自己的另一半來萝嘁,至于對方是給數(shù)據(jù)還是取數(shù)據(jù)梆掸,取決于她自己,如果她是消費者牙言,那么他就是生產(chǎn)者酸钦。

good luck!T弁鳌1傲颉!

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蚕断,一起剝皮案震驚了整個濱河市欢伏,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌亿乳,老刑警劉巖硝拧,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異葛假,居然都是意外死亡河爹,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門桐款,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咸这,“玉大人,你說我怎么就攤上這事魔眨∠蔽” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵遏暴,是天一觀的道長侄刽。 經(jīng)常有香客問我,道長朋凉,這世上最難降的妖魔是什么州丹? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮,結果婚禮上墓毒,老公的妹妹穿的比我還像新娘吓揪。我一直安慰自己,他們只是感情好所计,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布柠辞。 她就那樣靜靜地躺著,像睡著了一般主胧。 火紅的嫁衣襯著肌膚如雪叭首。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天踪栋,我揣著相機與錄音焙格,去河邊找鬼。 笑死夷都,一個胖子當著我的面吹牛间螟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播损肛,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼荣瑟!你這毒婦竟也來了治拿?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤笆焰,失蹤者是張志新(化名)和其女友劉穎劫谅,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嚷掠,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡捏检,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了不皆。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贯城。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖霹娄,靈堂內(nèi)的尸體忽然破棺而出能犯,到底是詐尸還是另有隱情,我是刑警寧澤犬耻,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布踩晶,位于F島的核電站,受9級特大地震影響枕磁,放射性物質發(fā)生泄漏渡蜻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望茸苇。 院中可真熱鬧排苍,春花似錦、人聲如沸税弃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽则果。三九已至幔翰,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間西壮,已是汗流浹背遗增。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留款青,地道東北人做修。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像抡草,于是被迫代替她去往敵國和親饰及。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容