大飛老師帶你看線程(并發(fā)容器-SynchronousQueue)上

本文作者:王一飛,叩丁狼高級講師纹份。原創(chuàng)文章文黎,轉(zhuǎn)載請注明出處侵俗。

概述

SynchronousQueue 是一個(gè)特殊的阻塞BlockingQueue隊(duì)列(實(shí)現(xiàn)類), 但是它跟BlockingQueue又有顯著不同:
1>SynchronousQueue沒有容量,算是一個(gè)不存儲元素的BlockingQueue背镇。每次put操作之后,當(dāng)前線程(生產(chǎn)者)會掛起,等待另外一個(gè)線程(消費(fèi)者)執(zhí)行take操作后,才會喚醒掛起線程(生產(chǎn)者)執(zhí)行,否則阻塞,不運(yùn)行添加元素.反之亦然泽裳。
2>SynchronousQueue 因?yàn)闆]有容量,所以遍歷,size, isEmpty 這些常規(guī)具有遍歷性質(zhì)的方法就沒意義啦
3>SynchronousQueue分公平隊(duì)列和非公平隊(duì)列瞒斩,默認(rèn)是false,非公平隊(duì)列. 當(dāng)然,我也也可以使用SynchronousQueue(boolean fair) 構(gòu)造器額外指定.
外部結(jié)構(gòu):


外部結(jié)構(gòu)

內(nèi)部結(jié)構(gòu):(有點(diǎn)復(fù)雜)
構(gòu)造器:

    public SynchronousQueue() {
        this(false);
    }
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

帶boolean類型參數(shù)的構(gòu)造器, fair 為true時(shí), 是公平策略, 使用TransferQueue 對象實(shí)現(xiàn), fairl為fasle時(shí), 非公平策略隊(duì)列所, 使用TransferStack 對象實(shí)現(xiàn).

//內(nèi)部api: TransferQueue  跟 TransferStack  類的父類
//目的:規(guī)范跟統(tǒng)一SynchronousQueue 隊(duì)列公平與非公平策略操作api
abstract static class Transferer<E> {
    //take 跟 put 操作
    abstract E transfer(E e, boolean timed, long nanos);
}

TransferStack: 非公平隊(duì)列

static final class TransferStack<E> extends Transferer<E> {
        static final int REQUEST    = 0;      //當(dāng)前線程是消費(fèi)者
        static final int DATA       = 1;           //當(dāng)前線程是生成者
        static final int FULFILLING = 2;     //其他情況

        //使用SNode對象輔助完成隊(duì)列實(shí)現(xiàn)
        static final class SNode {
            volatile SNode next;       
            volatile SNode match;     
            volatile Thread waiter;    
            Object item;              
            int mode;
           //.....................
        }
        E transfer(E e, boolean timed, long nanos) {
            //.....
        }
}

SynchronousQueue隊(duì)列實(shí)現(xiàn)非公平策略核心類:TransferStack, 內(nèi)部定義SNode作為容器內(nèi)容節(jié)點(diǎn).

TransferQueue: 公平隊(duì)列

static final class TransferQueue<E> extends Transferer<E> {
      static final class QNode {
            volatile QNode next;         
            volatile Object item;        
            volatile Thread waiter;       
            final boolean isData;    //標(biāo)記生產(chǎn)者還是消費(fèi)者
            //..................
      }

       E transfer(E e, boolean timed, long nanos) {
          //......................
       }
}

SynchronousQueue隊(duì)列實(shí)現(xiàn)公平策略核心類:TransferQueue, 內(nèi)部定義QNode 作為容器內(nèi)容節(jié)點(diǎn).

基本使用

1:take/put 操作線程阻塞,等待配對的put/take

public class App {

    public static void main(String[] args) throws InterruptedException {

        final  SynchronousQueue<String> queue = new SynchronousQueue<String>();
        queue.put("a");
        queue.take();
        System.out.println("end....");
    }
}

運(yùn)行后, 并控制臺中并沒有打印出"end.....", 前面提到了, SynchronousQueue 沒有容量的概念, 生產(chǎn)線程put元素之后,線程會自動掛起, 等待消費(fèi)線程take喚醒.執(zhí)行put方法之后, main線程會掛起, 所以執(zhí)行不了end....., 執(zhí)行take操作也一樣. 結(jié)論:SynchronousQueue take跟put 必須是配對的, 否則總一個(gè)線程被掛起.

2: 查詢性質(zhì)方法無意義, 必須等待take 與put 配對之后線程才可以繼續(xù)執(zhí)行

public class App {
    public static void main(String[] args) throws InterruptedException {
        final  SynchronousQueue<String> queue = new SynchronousQueue<String>();
        queue.put("a");  //線程被掛起
        System.out.println("大小:" + queue.size());
        System.out.println("為空:" + queue.isEmpty());
    }
}

3: SynchronousQueue 執(zhí)行規(guī)則: 一個(gè)put 對應(yīng)一個(gè)take,反之亦然

public class App {

    public static void main(String[] args) throws InterruptedException {
        final  SynchronousQueue<String> queue = new SynchronousQueue<String>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t1 begin...");
                    queue.put("a");
                    System.out.println("t1 end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();

        Thread.sleep(1000);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t2 begin...");
                    System.out.println(queue.take());
                    System.out.println("t2 end....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();

    }
}

輸出結(jié)果:

t1 begin...
t2 begin...
a
t2 end....
t1 end...

多執(zhí)行幾次, 結(jié)果差不多, 但每次a的輸出必須是在t2 begin... 執(zhí)行之后, 原因: t1 線程添加a元素之后馬上掛起, 等t2線程執(zhí)行take消費(fèi)并喚醒之后, t1才有機(jī)會繼續(xù)執(zhí)行.

4:SynchronousQueue分公平隊(duì)列和非公平隊(duì)列
公平隊(duì)列: put線程入隊(duì)時(shí), 會依次掛起. 當(dāng)執(zhí)行take線程時(shí),掛起的put線程按FIFO原則,誰先掛起,誰先喚醒.

非公平隊(duì)列: put線程入隊(duì)時(shí), 會依次掛起. 當(dāng)執(zhí)行take線程時(shí), 隨機(jī)喚醒掛起的put線程.

public class App{

    public static void main(String[] args) throws InterruptedException {
        //非公平
        final  SynchronousQueue<String> queue = new SynchronousQueue<String>(false);
        //公平
        //final  SynchronousQueue<String> queue = new SynchronousQueue<String>(true);

        //創(chuàng)建5個(gè)偶數(shù)put線程
        for (int i = 0; i < 10; i+= 2){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " begin...");
                        //用于識別隊(duì)列中元素由哪個(gè)線程入列的
                        queue.put("put:" + Thread.currentThread().getName());
                        System.out.println(Thread.currentThread().getName() + " end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }, "t" + i).start();

        }
        //休眠4s保證 put線程全部掛起等待
        Thread.sleep(4000);

        //創(chuàng)建5個(gè)奇數(shù)take線程
        for (int i = 1; i < 10; i+=2){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " begin...");
                        //查看take 與 put 線程配對
                        System.out.println(queue.take());
                        System.out.println(Thread.currentThread().getName() + " end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            },"t" + i).start();
        }
    }
}

結(jié)果分析:

//公平
final  SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
-------------------------------------------------
t2 begin...
t6 begin...
t0 begin...
t4 begin...
t8 begin...
t1 begin...
put:t2
t1 end...
t3 begin...
put:t6
t3 end...
t9 begin...
put:t0
t9 end...
t0 end...
t7 begin...
put:t4
t7 end...
t5 begin...
t4 end...
t2 end...
t8 end...
t6 end...
put:t8
t5 end...

將SynchronousQueue 切換成公平鎖策略時(shí), put線程掛起順序依次是: t2 t6 t0 t4 t8, 4s之后, take線程執(zhí)行后的配對的順序是: put:t2 put:t6 put:t0 put:t4 put:t8 , put的順序跟take的順序完全一致.

//非公平
final  SynchronousQueue<String> queue = new SynchronousQueue<String>(false);
-------------------------------------------------
t2 begin...
t6 begin...
t0 begin...
t4 begin...
t8 begin...
t1 begin...
t3 begin...
t7 begin...
put:t0
t7 end...
t0 end...
put:t4
t1 end...
put:t8
t3 end...
t4 end...
t8 end...
t5 begin...
t9 begin...
put:t2
t9 end...
t6 end...
put:t6
t5 end...
t2 end...

SynchronousQueue 采用非公平鎖策略時(shí), put線程掛起順序依次是: t2 t6 t0 t4 t8, 4s之后, take線程執(zhí)行后的配對的順序是: put:t0 put:t4 put:t8 put:t2 put:t6 , 很明顯put的順序跟take的順序不一致.

源碼分析

1:公平鎖策略- put / take

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

put 跟 take 方法有都調(diào)用:
調(diào)用transfer 方法:
put : transferer.transfer(e, false, 0)
take: transferer.transfer(null, false, 0);
從前面內(nèi)部結(jié)構(gòu)看: SynchronousQueue 公平策略底層實(shí)際上委托給TransferQueue 鏈表隊(duì)列實(shí)現(xiàn), 而內(nèi)部節(jié)點(diǎn)QNode, 就是鏈表節(jié)點(diǎn)啦, 來看下節(jié)點(diǎn)源碼:

static final class QNode {
    volatile QNode next;   //下一個(gè)節(jié)點(diǎn)        
    volatile Object item;    //節(jié)點(diǎn)內(nèi)容     
    volatile Thread waiter; //線程掛起與喚醒控制: park /unpark      
    //模式控制變量:
    //true: 數(shù)據(jù)模式, put操作為true    
    //false: 請求模式, take操作時(shí)為false
    //隊(duì)列隊(duì)列的take操作與put操作, 使用同一個(gè)  transfer 方法 , 所以isData變量來區(qū)分是take操作還是put操作       
    final boolean isData; 
    //構(gòu)造器
    QNode(Object item, boolean isData) {...}
    //cas原子操作, 設(shè)置next節(jié)點(diǎn)
    boolean casNext(QNode cmp, QNode val) {...}
    //cas 原子操作, 設(shè)置及節(jié)點(diǎn)內(nèi)容
    boolean casItem(Object cmp, Object val) {...}
    //取消節(jié)點(diǎn), 將節(jié)點(diǎn)內(nèi)容設(shè)置為自己
    void tryCancel(Object cmp) {...}
    //判斷是否操作結(jié)束
    boolean isCancelled() {...}
}

此處研究的是公平鎖策略, 所以, 此時(shí)的transfer變量執(zhí)項(xiàng)的是: TransferQueue 類的實(shí)例

E transfer(E e, boolean timed, long nanos) {
            QNode s = null; 
            //e != null, isData 為true , 表示數(shù)據(jù)模式
            boolean isData = (e != null);
            for (;;) { //非鎖并發(fā)中, 自旋到滿足條件為止
                //TransferQueue 初始化時(shí), tail 跟 head 變量執(zhí)行同一個(gè)節(jié)點(diǎn): h
                //QNode tail, head  --> new QNode(null, false);
                QNode t = tail; //尾節(jié)點(diǎn)
                QNode h = head; //頭節(jié)點(diǎn)
                if (t == null || h == null)  //初始化出問題, 跳過       
                    continue;   
                //h==t 為true時(shí), 表示SynchronousQueue隊(duì)列此時(shí)沒數(shù)據(jù), 可以添加         
                //t.isData == isData  h != t時(shí), 隊(duì)列不為空,需要進(jìn)行判斷此時(shí)為take操作還是put操作
                if (h == t || t.isData == isData) {  
                    //進(jìn)入表示put操作 : 隊(duì)列此時(shí)可能為空, 也可能不為空
                    QNode tn = t.next;
                    //前面剛設(shè)置t=tail, 此時(shí)若不等,表示其他線程修改tail值, 放棄此次操作,進(jìn)入下次循環(huán)       
                    if (t != tail)          
                        continue;
                    if (tn != null) { 
                        // tn != null, 表示此時(shí)隊(duì)列已有值, 嘗試推進(jìn)節(jié)點(diǎn)后移
                       //advanceTail 方法內(nèi)部做了限制, 當(dāng) t == tail 才進(jìn)行
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0) //過期,沒必要操作      
                        return null;
                    if (s == null) //執(zhí)行到這, 表示所有操作合法
                        //創(chuàng)建節(jié)點(diǎn)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))  //在鏈表上掛載節(jié)點(diǎn), 若失敗,重來      
                        continue;
                    //推進(jìn)節(jié)點(diǎn)后移, 嘗試將s設(shè)置為尾節(jié)點(diǎn)
                    //注意:但且僅當(dāng)t == tail才執(zhí)行
                    //一旦執(zhí)行, 表示s節(jié)點(diǎn)入列
                    advanceTail(t, s);             
                    //advanceTail 操作成功之后, awaitFulfill
                    //awaitFulfill : 此用于自旋/阻塞節(jié)點(diǎn),直到節(jié)點(diǎn)被匹配返回或者取消涮总、中斷胸囱。
                    Object x = awaitFulfill(s, e, timed, nanos);
                    //awaitFulfill 方法只有一個(gè)出口:  s.item == e, 而出現(xiàn)這情況,   
                    //前提必須是s.tryCancel 方法 ,2種情形調(diào)用: 1>等待時(shí)間到 2>線程中斷
                    //執(zhí)行到這步, 有幾種可能: 1>線程被中斷  2>等待時(shí)間到 3>有消費(fèi)/生產(chǎn)線程配對了
                   //當(dāng)x == s 表示線程中斷或等待時(shí)間到
                    if (x == s) {                  
                        //清除s節(jié)點(diǎn)
                        clean(t, s);
                        return null;
                    }
                    //到這: 表示有消費(fèi)/生產(chǎn)線程配對了
                    //用于判斷節(jié)點(diǎn)是否已經(jīng)從隊(duì)列中離開了 
                    if (!s.isOffList()) {          
                        //嘗試將s節(jié)點(diǎn)設(shè)置為head,移出t
                        advanceHead(t, s);       
                        if (x != null)            
                            s.item = s;
                        // 釋放線程 
                        s.waiter = null;
                    }
                    //返回: 
                    return (x != null) ? (E)x : e;

                } else { 
                    //能進(jìn)入到這個(gè)分支, 表示等待隊(duì)列中有等待線程
                    //可能是take1 然后在等put1  也可能是put1在等take1                           
                    QNode m = h.next; 
                    //當(dāng)?shù)却?duì)列中有掛起線程, m != null
                    if (t != tail || m == null || h != head)
                        continue;                
                    Object x = m.item; //拿到第一個(gè)節(jié)點(diǎn)元素
                    if (isData == (x != null) ||   
                        x == m ||    //如果相等表示已經(jīng)有線程跟掛起的線程配置了               
                        !m.casItem(x, e)) {   //交換item的內(nèi)容      
                        advanceHead(h, m);  //推動頭節(jié)點(diǎn)移動         
                        continue;
                    }
                    //推動頭節(jié)點(diǎn)移動
                    //這里注意: advanceHead 有個(gè)動作 h.next = h
                    //節(jié)點(diǎn)被匹配之后, next節(jié)點(diǎn)指向自己, 這種節(jié)點(diǎn)會在clean 方法中被刪除
                    advanceHead(h, m);      
                    //喚醒等待隊(duì)列中被配對的線程       
                    LockSupport.unpark(m.waiter);
                    //返回
                    return (x != null) ? (E)x : e;
                }
            }
        }

公平策略源碼操作流程簡化:


put操作
take操作

總結(jié):
結(jié)合代碼: transferer執(zhí)行流程可歸納為以下:
1: transferer調(diào)用transfer方法實(shí)現(xiàn)SynchronousQueue 公平隊(duì)列的take跟put操作
2:為區(qū)分take與put操作瀑梗, 設(shè)計(jì)控制變量 isData區(qū)分 true:put操作烹笔, fasle:take操作
3:如果隊(duì)列為null或者isData一致(為true), 隊(duì)列嘗試將節(jié)點(diǎn)添加到等待隊(duì)列中, 知道被其他線程匹配, 超時(shí) 或者取消.
4:如果隊(duì)列不為null, 隊(duì)列嘗試配對, 一旦配對成功, 按順序喚醒掛起的線程, 調(diào)用clean方法清除配對節(jié)點(diǎn).

想獲取更多技術(shù)干貨,請前往叩丁狼官網(wǎng):http://www.wolfcode.cn/all_article.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末抛丽,一起剝皮案震驚了整個(gè)濱河市谤职,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌亿鲜,老刑警劉巖允蜈,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蒿柳,居然都是意外死亡饶套,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門垒探,熙熙樓的掌柜王于貴愁眉苦臉地迎上來妓蛮,“玉大人,你說我怎么就攤上這事圾叼「蚩耍” “怎么了捺癞?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長咖耘。 經(jīng)常有香客問我翘簇,道長,這世上最難降的妖魔是什么儿倒? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任版保,我火速辦了婚禮,結(jié)果婚禮上夫否,老公的妹妹穿的比我還像新娘彻犁。我一直安慰自己,他們只是感情好凰慈,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布汞幢。 她就那樣靜靜地躺著,像睡著了一般微谓。 火紅的嫁衣襯著肌膚如雪森篷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天豺型,我揣著相機(jī)與錄音仲智,去河邊找鬼。 笑死姻氨,一個(gè)胖子當(dāng)著我的面吹牛钓辆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播肴焊,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼前联,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了娶眷?” 一聲冷哼從身側(cè)響起似嗤,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎茂浮,沒想到半個(gè)月后双谆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡席揽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年顽馋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片幌羞。...
    茶點(diǎn)故事閱讀 38,163評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡寸谜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出属桦,到底是詐尸還是另有隱情熊痴,我是刑警寧澤他爸,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站果善,受9級特大地震影響诊笤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜巾陕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一讨跟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鄙煤,春花似錦晾匠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至亡资,卻和暖如春澜共,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背锥腻。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工咳胃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人旷太。 一個(gè)月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像销睁,于是被迫代替她去往敵國和親供璧。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容