21-阻塞隊(duì)列之SynchronousQueue

SynchronousQueue實(shí)現(xiàn)原理

Java 6的并發(fā)編程包中的SynchronousQueue是一個(gè)沒有數(shù)據(jù)緩沖的BlockingQueue跨算,生產(chǎn)者線程對其的插入操作put必須等待消費(fèi)者的移除操作take疏唾,反過來也一樣兄春。

不像ArrayBlockingQueue或LinkedListBlockingQueue基协,SynchronousQueue內(nèi)部并沒有數(shù)據(jù)緩存空間遏弱,你不能調(diào)用peek()方法來看隊(duì)列中是否有數(shù)據(jù)元素房匆,因?yàn)閿?shù)據(jù)元素只有當(dāng)你試著取走的時(shí)候才可能存在卦尊,不取走而只想偷窺一下是不行的叛拷,當(dāng)然遍歷這個(gè)隊(duì)列的操作也是不允許的。隊(duì)列頭元素是第一個(gè)排隊(duì)要插入數(shù)據(jù)的線程岂却,而不是要交換的數(shù)據(jù)忿薇。數(shù)據(jù)是在配對的生產(chǎn)者和消費(fèi)者線程之間直接傳遞的,并不會將數(shù)據(jù)緩沖到隊(duì)列中躏哩∈鸷疲可以這樣來理解:生產(chǎn)者和消費(fèi)者互相等待對方,握手扫尺,然后一起離開筋栋。

SynchronousQueue的一個(gè)使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue正驻,這個(gè)線程池根據(jù)需要(新任務(wù)到來時(shí))創(chuàng)建新的線程弊攘,如果有空閑線程則會重復(fù)使用,線程空閑了60秒后會被回收姑曙。

實(shí)現(xiàn)原理

SynchronousQueue是一種很特別的BlockingQueue襟交,任何一個(gè)添加元素的操作都必須等到另外一個(gè)線程拿走元素才會結(jié)束。也就是SynchronousQueue本身不會存儲任何元素伤靠,相當(dāng)于生產(chǎn)者和消費(fèi)者手遞手直接交易捣域。

SynchronousQueue有一個(gè)fair選項(xiàng),如果fair為true宴合,稱為fair模式焕梅,否則就是unfair模式。

在fair模式下卦洽,所有等待的生產(chǎn)者線程或者消費(fèi)者線程會按照開始等待時(shí)間依次排隊(duì)贞言,然后按照等待先后順序進(jìn)行匹配交易。這種情況用隊(duì)列實(shí)現(xiàn)阀蒂。

在unfair模式下蜗字,則剛好相反打肝,后來先匹配,這種情況用棧實(shí)現(xiàn)挪捕。

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}

因?yàn)樘砑釉睾湍米咴厥穷愃剖诌f手交易的粗梭,所以對于拿走元素和添加元素操作,SynchronousQueue調(diào)用的是Transferer同一個(gè)方法transfer级零。

當(dāng)object為null時(shí)表示是拿走元素断医,用于消費(fèi)者線程,否則則是添加元素奏纪,用于生產(chǎn)者線程鉴嗤。因此transfer方法是分析的重點(diǎn)。

abstract Object transfer(Object e, boolean timed, long nanos);

首先來看用于fair模式的TransferQueue的transfer方法:

看代碼之前序调,來理一下邏輯:

  1. 開始隊(duì)列肯定是空醉锅。

  2. 線程進(jìn)入隊(duì)列,如果隊(duì)列是空的发绢,那么就添加該線程進(jìn)入隊(duì)列硬耍,然后進(jìn)行等待(要么有匹配線程出現(xiàn),要么就是該請求超時(shí)取消)

  3. 第二個(gè)線程進(jìn)入边酒,如果前面一個(gè)線程跟它屬于不同類型经柴,也就是說兩者是可以匹配的,那么就從隊(duì)列刪除第一個(gè)線程墩朦。如果是相同的線程坯认,那么做法參照2。

理清了基本邏輯氓涣,也就是會有兩種情況:

  1. 隊(duì)列為空或者隊(duì)列中的等待線程是相同類型

  2. 隊(duì)列中的等待線程是匹配的類型

Object transfer(Object e, boolean timed, long nanos) {
    
    QNode s = null;
    // e不是null表示是生成者線程牛哺,e就是產(chǎn)品,反之就是消費(fèi)者線程
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        // tail和head在隊(duì)列創(chuàng)建時(shí)會被初始化成一個(gè)虛擬節(jié)點(diǎn)
        // 因此發(fā)現(xiàn)沒有初始化劳吠,重新循環(huán)等待直到初始化完成
        if (t == null || h == null)
            continue;

        // 隊(duì)列為空或等待線程類型相同(不同類型才能匹配)
        // 這兩種情況都要把當(dāng)前線程加入到等待隊(duì)列中
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // tail對象已經(jīng)被更新引润,出現(xiàn)不一致讀的現(xiàn)象,重新循環(huán)
            if (t != tail)
                continue;
            // 添加線程到等待隊(duì)列時(shí)會先更新當(dāng)前tail的next赴背,然后
            // 更新tail本身椰拒,因此出現(xiàn)只有next被更新的情況晶渠,應(yīng)該
            // 更新tail凰荚,然后重新循環(huán)
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }
            // 設(shè)定了超時(shí),剩余等待時(shí)間耗盡的時(shí)候褒脯,就無需再等待
            if (timed && nanos <= 0)
                return null;
            // 首次使用s的時(shí)候便瑟,新建一個(gè)節(jié)點(diǎn)保存當(dāng)前線程和數(shù)據(jù)來初始化s
            if (s == null)
                s = new QNode(e, isData);
            // 嘗試更新tail的next,把新建節(jié)點(diǎn)添加到tail的后面番川,如果失敗了到涂,就重新循環(huán)
            if (!t.casNext(null, s))
                continue;
            // 把新建的節(jié)點(diǎn)設(shè)置為tail
            advanceTail(t, s);
            // 等待匹配線程脊框,成功匹配則返回的匹配的值
            // 否則返回當(dāng)前節(jié)點(diǎn),因此s和x相同表示請求被取消
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {
                clean(t, s);
                return null;
            }

            // 這個(gè)時(shí)候已經(jīng)匹配成功了践啄,s應(yīng)該是排在第一個(gè)的等待線程
            // 如果s依然在隊(duì)列中浇雹,那么需要更新head。
            // 更新head的方法是把s這個(gè)排在第一位的節(jié)點(diǎn)作為新的head
            // 因此需要重置一些屬性使它變成虛擬節(jié)點(diǎn)
            if (!s.isOffList()) {
                advanceHead(t, s);
                if (x != null)
                    s.item = s;
                s.waiter = null;
            }
            // x不為null表示拿到匹配線程的數(shù)據(jù)(消費(fèi)者拿到生產(chǎn)者的數(shù)據(jù))屿讽,
            // 因此返回該數(shù)據(jù)昭灵,否則返回本身的數(shù)據(jù)(生成者返回自己的數(shù)據(jù))
            return (x != null) ? x : e;

        } else { // 線程可以匹配
            // 因?yàn)槭顷?duì)列,因此匹配的是第一個(gè)節(jié)點(diǎn)
            QNode m = h.next;
            // 同樣需要檢查不一致讀的情況
            if (t != tail || m == null || h != head)
                continue;

            Object x = m.item;
            // 匹配失敗時(shí)伐谈,把m從隊(duì)列中移走烂完,重新循環(huán)
            if (isData == (x != null) ||    // m已經(jīng)被匹配了
                x == m ||                   // m已經(jīng)被取消了
                !m.casItem(x, e)) {         // 用CAS設(shè)置m的數(shù)據(jù)為null
                advanceHead(h, m);
                continue;
            }

            // 匹配成功,更新head
            advanceHead(h, m);
            // 解除m的線程等待狀態(tài)
            LockSupport.unpark(m.waiter);
            // 返回匹配的數(shù)據(jù)
            return (x != null) ? x : e;
        }
    }
}

接著來用于Unfair模式的TransferStack的transfer方法

大體邏輯應(yīng)該是一樣的诵棵,不同就是隊(duì)列的入隊(duì)和出隊(duì)操作對應(yīng)到棧時(shí)就是入棧和出棧的操作抠蚣。

Object transfer(Object e, boolean timed, long nanos) {
    SNode s = null;
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        // 棧為空或者節(jié)點(diǎn)類型相同的情況
        if (h == null || h.mode == mode) {
            if (timed && nanos <= 0) {
                // 檢查棧頂節(jié)點(diǎn)是否已經(jīng)取消,如果已經(jīng)取消履澳,彈出節(jié)點(diǎn)
                // 重新循環(huán)嘶窄,接著檢查新的棧頂節(jié)點(diǎn)
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            // 新建節(jié)點(diǎn),并且嘗試把新節(jié)點(diǎn)入棧
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 等待匹配奇昙,如果發(fā)現(xiàn)是被取消的情況护侮,則釋放節(jié)點(diǎn),返回null
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {
                    clean(s);
                    return null;
                }
                // 如果匹配的成功兩個(gè)節(jié)點(diǎn)是棧頂?shù)膬蓚€(gè)節(jié)點(diǎn)
                // 把這兩個(gè)節(jié)點(diǎn)都彈出
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (mode == REQUEST) ? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // 棧頂節(jié)點(diǎn)沒有和其他線程在匹配储耐,可以匹配
            if (h.isCancelled())            // 棧頂節(jié)點(diǎn)的請求已經(jīng)被取消
                casHead(h, h.next);         // 移除棧頂元素重新循環(huán)
            // 嘗試把該節(jié)點(diǎn)也入棧羊初,該節(jié)點(diǎn)設(shè)置為正在匹配的狀態(tài)
            // 也就是isFulfilling返回true
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) {
                    // 棧頂節(jié)點(diǎn)(當(dāng)前線程的節(jié)點(diǎn))和它的下一個(gè)節(jié)點(diǎn)進(jìn)行匹配,m為null意味著
                    // 棧里沒有其他節(jié)點(diǎn)了什湘,因?yàn)榍懊嬖摴?jié)點(diǎn)入棧了长赞,需要彈出這個(gè)節(jié)點(diǎn)重新循環(huán)
                    SNode m = s.next;
                    if (m == null) {
                        casHead(s, null);
                        s = null;
                        break;
                    }

                    // 這個(gè)時(shí)候是有節(jié)點(diǎn)可以匹配的,嘗試為這兩個(gè)節(jié)點(diǎn)做匹配
                    SNode mn = m.next;
                    // m和s匹配成功闽撤,彈出這兩個(gè)節(jié)點(diǎn)得哆,返回?cái)?shù)據(jù);匹配失敗哟旗,把m移除
                    if (m.tryMatch(s)) {
                        casHead(s, mn);
                        return (mode == REQUEST) ? m.item : s.item;
                    } else
                        s.casNext(m, mn);
                }
            }
        // 棧頂正在匹配贩据,參見代碼:
        // else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
        // 做法基本類似,只是這里幫助其他線程匹配闸餐,無論成功與否
        // 都要重新循環(huán)
        } else {
            SNode m = h.next;               
            if (m == null)
                casHead(h, null);
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);
                else
                    h.casNext(m, mn);
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末饱亮,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子舍沙,更是在濱河造成了極大的恐慌近上,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拂铡,死亡現(xiàn)場離奇詭異壹无,居然都是意外死亡葱绒,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進(jìn)店門斗锭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來地淀,“玉大人,你說我怎么就攤上這事岖是∩兀” “怎么了?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵璧微,是天一觀的道長作箍。 經(jīng)常有香客問我,道長前硫,這世上最難降的妖魔是什么胞得? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮屹电,結(jié)果婚禮上阶剑,老公的妹妹穿的比我還像新娘。我一直安慰自己危号,他們只是感情好牧愁,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著外莲,像睡著了一般猪半。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上偷线,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天磨确,我揣著相機(jī)與錄音,去河邊找鬼声邦。 笑死乏奥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的亥曹。 我是一名探鬼主播邓了,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼媳瞪!你這毒婦竟也來了骗炉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤材失,失蹤者是張志新(化名)和其女友劉穎痕鳍,沒想到半個(gè)月后硫豆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體龙巨,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡笼呆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了旨别。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诗赌。...
    茶點(diǎn)故事閱讀 40,001評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖秸弛,靈堂內(nèi)的尸體忽然破棺而出铭若,到底是詐尸還是另有隱情,我是刑警寧澤递览,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布叼屠,位于F島的核電站,受9級特大地震影響绞铃,放射性物質(zhì)發(fā)生泄漏镜雨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一儿捧、第九天 我趴在偏房一處隱蔽的房頂上張望荚坞。 院中可真熱鬧,春花似錦菲盾、人聲如沸颓影。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诡挂。三九已至,卻和暖如春临谱,著一層夾襖步出監(jiān)牢的瞬間咆畏,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工吴裤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留旧找,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓麦牺,卻偏偏與公主長得像钮蛛,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子剖膳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Java中的阻塞隊(duì)列 什么是阻塞隊(duì)列 阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列魏颓。這兩個(gè)附...
    史路比閱讀 366評論 0 1
  • 相關(guān)文章Java并發(fā)編程(一)線程定義、狀態(tài)和屬性 Java并發(fā)編程(二)同步Java并發(fā)編程(三)volatil...
    劉望舒閱讀 5,235評論 1 31
  • 阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列吱晒。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí)甸饱,獲取元素的線...
    端木軒閱讀 1,005評論 0 2
  • 今天是第一篇文章 所有的事情都不會一蹴而就的 學(xué)習(xí)之路 不能回頭 一往無前
    杰哥寫文案閱讀 155評論 0 0
  • 沒有反思的人生不值得過! 1. 習(xí)慣+閱讀+學(xué)習(xí) 1)生活習(xí)慣:早5:00晚22:00。有一天晚上失眠叹话,周五半夜醒...
    Lena呂秋麗閱讀 129評論 0 0