【死磕Java并發(fā)】-----J.U.C之Condition

此篇博客所有源碼均來自JDK 1.8

在沒有Lock之前已慢,我們使用synchronized來控制同步,配合Object的wait()旭咽、notify()系列方法可以實(shí)現(xiàn)等待/通知模式请垛。在Java SE5后混稽,Java提供了Lock接口膳叨,相對(duì)于Synchronized而言龄坪,Lock提供了條件Condition,對(duì)線程的等待抄课、喚醒操作更加詳細(xì)和靈活。下圖是Condition與Object的監(jiān)視器方法的對(duì)比(摘自《Java并發(fā)編程的藝術(shù)》):

Condition

Condition提供了一系列的方法來對(duì)阻塞和喚醒線程:

  1. await() :造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)哎榴。
  2. **await(long time, TimeUnit unit) **:造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。
  3. **awaitNanos(long nanosTimeout) **:造成當(dāng)前線程在接到信號(hào)尚蝌、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)迎变。返回值表示剩余時(shí)間,如果在nanosTimesout之前喚醒飘言,那么返回值 = nanosTimeout - 消耗時(shí)間衣形,如果返回值 <= 0 ,則可以認(rèn)定它已經(jīng)超時(shí)了。
  4. **awaitUninterruptibly() **:造成當(dāng)前線程在接到信號(hào)之前一直處于等待狀態(tài)姿鸿∽晃猓【注意:該方法對(duì)中斷不敏感】。
  5. **awaitUntil(Date deadline) **:造成當(dāng)前線程在接到信號(hào)苛预、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)句狼。如果沒有到指定時(shí)間就被通知,則返回true热某,否則表示到了指定時(shí)間腻菇,返回返回false。
  6. signal():喚醒一個(gè)等待線程昔馋。該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖筹吐。
  7. signal()All:喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關(guān)的鎖秘遏。

Condition是一種廣義上的條件隊(duì)列骏令。他為線程提供了一種更為靈活的等待/通知模式,線程在調(diào)用await方法后執(zhí)行掛起操作垄提,直到線程等待的某個(gè)條件為真時(shí)才會(huì)被喚醒。Condition必須要配合鎖一起使用周拐,因?yàn)閷?duì)共享狀態(tài)變量的訪問發(fā)生在多線程環(huán)境下铡俐。一個(gè)Condition的實(shí)例必須與一個(gè)Lock綁定,因此Condition一般都是作為Lock的內(nèi)部實(shí)現(xiàn)妥粟。

Condtion的實(shí)現(xiàn)

獲取一個(gè)Condition必須要通過Lock的newCondition()方法审丘。該方法定義在接口Lock下面,返回的結(jié)果是綁定到此 Lock 實(shí)例的新 Condition 實(shí)例勾给。Condition為一個(gè)接口滩报,其下僅有一個(gè)實(shí)現(xiàn)類ConditionObject,由于Condition的操作需要獲取相關(guān)的鎖播急,而AQS則是同步鎖的實(shí)現(xiàn)基礎(chǔ)脓钾,所以ConditionObject則定義為AQS的內(nèi)部類。定義如下:

public class ConditionObject implements Condition, java.io.Serializable {
}

等待隊(duì)列

每個(gè)Condition對(duì)象都包含著一個(gè)FIFO隊(duì)列桩警,該隊(duì)列是Condition對(duì)象通知/等待功能的關(guān)鍵可训。在隊(duì)列中每一個(gè)節(jié)點(diǎn)都包含著一個(gè)線程引用,該線程就是在該Condition對(duì)象上等待的線程。我們看Condition的定義就明白了:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    
    //頭節(jié)點(diǎn)
    private transient Node firstWaiter;
    //尾節(jié)點(diǎn)
    private transient Node lastWaiter;

    public ConditionObject() {
    }
    
    /** 省略方法 **/
}

從上面代碼可以看出Condition擁有首節(jié)點(diǎn)(firstWaiter)握截,尾節(jié)點(diǎn)(lastWaiter)飞崖。當(dāng)前線程調(diào)用await()方法,將會(huì)以當(dāng)前線程構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)谨胞,并將節(jié)點(diǎn)加入到該隊(duì)列的尾部固歪。結(jié)構(gòu)如下:

Condition

Node里面包含了當(dāng)前線程的引用。Node定義與AQS的CLH同步隊(duì)列的節(jié)點(diǎn)使用的都是同一個(gè)類(AbstractQueuedSynchronized.Node靜態(tài)內(nèi)部類)胯努。

Condition的隊(duì)列結(jié)構(gòu)比CLH同步隊(duì)列的結(jié)構(gòu)簡單些牢裳,新增過程較為簡單只需要將原尾節(jié)點(diǎn)的nextWaiter指向新增節(jié)點(diǎn),然后更新lastWaiter即可康聂。

等待

調(diào)用Condition的await()方法會(huì)使當(dāng)前線程進(jìn)入等待狀態(tài)贰健,同時(shí)會(huì)加入到Condition等待隊(duì)列同時(shí)釋放鎖。當(dāng)從await()方法返回時(shí)恬汁,當(dāng)前線程一定是獲取了Condition相關(guān)連的鎖伶椿。

    public final void await() throws InterruptedException {
        // 當(dāng)前線程中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        //當(dāng)前線程加入等待隊(duì)列
        Node node = addConditionWaiter();
        //釋放鎖
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        /**
         * 檢測此節(jié)點(diǎn)的線程是否在同步隊(duì)上,如果不在氓侧,則說明該線程還不具備競爭鎖的資格脊另,則繼續(xù)等待
         * 直到檢測到此節(jié)點(diǎn)在同步隊(duì)列上
         */
        while (!isOnSyncQueue(node)) {
            //線程掛起
            LockSupport.park(this);
            //如果已經(jīng)中斷了,則退出
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //競爭同步狀態(tài)
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //清理下條件隊(duì)列中的不是在等待條件的節(jié)點(diǎn)
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

此段代碼的邏輯是:首先將當(dāng)前線程新建一個(gè)節(jié)點(diǎn)同時(shí)加入到條件隊(duì)列中约巷,然后釋放當(dāng)前線程持有的同步狀態(tài)偎痛。然后則是不斷檢測該節(jié)點(diǎn)代表的線程釋放出現(xiàn)在CLH同步隊(duì)列中(收到signal信號(hào)之后就會(huì)在AQS隊(duì)列中檢測到),如果不存在則一直掛起独郎,否則參與競爭同步狀態(tài)踩麦。

加入條件隊(duì)列(addConditionWaiter())源碼如下:

    private Node addConditionWaiter() {
        Node t = lastWaiter;    //尾節(jié)點(diǎn)
        //Node的節(jié)點(diǎn)狀態(tài)如果不為CONDITION,則表示該節(jié)點(diǎn)不處于等待狀態(tài)氓癌,需要清除節(jié)點(diǎn)
        if (t != null && t.waitStatus != Node.CONDITION) {
            //清除條件隊(duì)列中所有狀態(tài)不為Condition的節(jié)點(diǎn)
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //當(dāng)前線程新建節(jié)點(diǎn)谓谦,狀態(tài)CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        /**
         * 將該節(jié)點(diǎn)加入到條件隊(duì)列中最后一個(gè)位置
         */
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

該方法主要是將當(dāng)前線程加入到Condition條件隊(duì)列中。當(dāng)然在加入到尾節(jié)點(diǎn)之前會(huì)清楚所有狀態(tài)不為Condition的節(jié)點(diǎn)贪婉。

fullyRelease(Node node)反粥,負(fù)責(zé)釋放該線程持有的鎖。

    final long fullyRelease(Node node) {
        boolean failed = true;
        try {
            //節(jié)點(diǎn)狀態(tài)--其實(shí)就是持有鎖的數(shù)量
            long savedState = getState();
            //釋放鎖
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

isOnSyncQueue(Node node):如果一個(gè)節(jié)點(diǎn)剛開始在條件隊(duì)列上疲迂,現(xiàn)在在同步隊(duì)列上獲取鎖則返回true

    final boolean isOnSyncQueue(Node node) {
        //狀態(tài)為Condition才顿,獲取前驅(qū)節(jié)點(diǎn)為null,返回false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //后繼節(jié)點(diǎn)不為null尤蒿,肯定在CLH同步隊(duì)列中
        if (node.next != null)
            return true;

        return findNodeFromTail(node);
    }

unlinkCancelledWaiters():負(fù)責(zé)將條件隊(duì)列中狀態(tài)不為Condition的節(jié)點(diǎn)刪除

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

通知

調(diào)用Condition的signal()方法郑气,將會(huì)喚醒在等待隊(duì)列中等待最長時(shí)間的節(jié)點(diǎn)(條件隊(duì)列里的首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)前优质,會(huì)將節(jié)點(diǎn)移到CLH同步隊(duì)列中竣贪。

    public final void signal() {
        //檢測當(dāng)前線程是否為擁有鎖的獨(dú)
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //頭節(jié)點(diǎn)军洼,喚醒條件隊(duì)列中的第一個(gè)節(jié)點(diǎn)
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);    //喚醒
    }

該方法首先會(huì)判斷當(dāng)前線程是否已經(jīng)獲得了鎖,這是前置條件演怎。然后喚醒條件隊(duì)列中的頭節(jié)點(diǎn)匕争。

doSignal(Node first):喚醒頭節(jié)點(diǎn)

    private void doSignal(Node first) {
        do {
            //修改頭結(jié)點(diǎn),完成舊頭結(jié)點(diǎn)的移出工作
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

doSignal(Node first)主要是做兩件事:1.修改頭節(jié)點(diǎn)爷耀,2.調(diào)用transferForSignal(Node first) 方法將節(jié)點(diǎn)移動(dòng)到CLH同步隊(duì)列中甘桑。transferForSignal(Node first)源碼如下:

     final boolean transferForSignal(Node node) {
        //將該節(jié)點(diǎn)從狀態(tài)CONDITION改變?yōu)槌跏紶顟B(tài)0,
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //將節(jié)點(diǎn)加入到syn隊(duì)列中去,返回的是syn隊(duì)列中node節(jié)點(diǎn)前面的一個(gè)節(jié)點(diǎn)
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果結(jié)點(diǎn)p的狀態(tài)為cancel 或者修改waitStatus失敗歹叮,則直接喚醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

整個(gè)通知的流程如下:

  1. 判斷當(dāng)前線程是否已經(jīng)獲取了鎖跑杭,如果沒有獲取則直接拋出異常,因?yàn)楂@取鎖為通知的前置條件咆耿。
  2. 如果線程已經(jīng)獲取了鎖德谅,則將喚醒條件隊(duì)列的首節(jié)點(diǎn)
  3. 喚醒首節(jié)點(diǎn)是先將條件隊(duì)列中的頭節(jié)點(diǎn)移出,然后調(diào)用AQS的enq(Node node)方法將其安全地移到CLH同步隊(duì)列中
  4. 最后判斷如果該節(jié)點(diǎn)的同步狀態(tài)是否為Cancel萨螺,或者修改狀態(tài)為Signal失敗時(shí)窄做,則直接調(diào)用LockSupport喚醒該節(jié)點(diǎn)的線程。

總結(jié)

一個(gè)線程獲取鎖后慰技,通過調(diào)用Condition的await()方法椭盏,會(huì)將當(dāng)前線程先加入到條件隊(duì)列中,然后釋放鎖吻商,最后通過isOnSyncQueue(Node node)方法不斷自檢看節(jié)點(diǎn)是否已經(jīng)在CLH同步隊(duì)列了掏颊,如果是則嘗試獲取鎖,否則一直掛起艾帐。當(dāng)線程調(diào)用signal()方法后乌叶,程序首先檢查當(dāng)前線程是否獲取了鎖,然后通過doSignal(Node first)方法喚醒CLH同步隊(duì)列的首節(jié)點(diǎn)柒爸。被喚醒的線程枉昏,將從await()方法中的while循環(huán)中退出來,然后調(diào)用acquireQueued()方法競爭同步狀態(tài)揍鸟。

Condition的應(yīng)用

只知道原理,如果不知道使用那就坑爹了句旱,下面是用Condition實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者問題:

public class ConditionTest {
    private LinkedList<String> buffer;    //容器
    private int maxSize ;           //容器最大
    private Lock lock;
    private Condition fullCondition;
    private Condition notFullCondition;

    ConditionTest(int maxSize){
        this.maxSize = maxSize;
        buffer = new LinkedList<String>();
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        notFullCondition = lock.newCondition();
    }

    public void set(String string) throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            while (maxSize == buffer.size()){
                notFullCondition.await();       //滿了阳藻,添加的線程進(jìn)入等待狀態(tài)
            }

            buffer.add(string);
            fullCondition.signal();
        } finally {
            lock.unlock();      //記得釋放鎖
        }
    }

    public String get() throws InterruptedException {
        String string;
        lock.lock();
        try {
            while (buffer.size() == 0){
                fullCondition.await();
            }
            string = buffer.poll();
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return string;
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市谈撒,隨后出現(xiàn)的幾起案子腥泥,更是在濱河造成了極大的恐慌,老刑警劉巖啃匿,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蛔外,死亡現(xiàn)場離奇詭異蛆楞,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)夹厌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門豹爹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人矛纹,你說我怎么就攤上這事臂聋。” “怎么了或南?”我有些...
    開封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵孩等,是天一觀的道長。 經(jīng)常有香客問我采够,道長肄方,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任蹬癌,我火速辦了婚禮权她,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘冀瓦。我一直安慰自己伴奥,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開白布翼闽。 她就那樣靜靜地躺著拾徙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪感局。 梳的紋絲不亂的頭發(fā)上尼啡,一...
    開封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音询微,去河邊找鬼崖瞭。 笑死,一個(gè)胖子當(dāng)著我的面吹牛撑毛,可吹牛的內(nèi)容都是我干的书聚。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼藻雌,長吁一口氣:“原來是場噩夢啊……” “哼雌续!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起胯杭,我...
    開封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤驯杜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后做个,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鸽心,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滚局,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了顽频。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片藤肢。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖冲九,靈堂內(nèi)的尸體忽然破棺而出谤草,到底是詐尸還是另有隱情,我是刑警寧澤莺奸,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布丑孩,位于F島的核電站,受9級(jí)特大地震影響灭贷,放射性物質(zhì)發(fā)生泄漏温学。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一甚疟、第九天 我趴在偏房一處隱蔽的房頂上張望仗岖。 院中可真熱鬧,春花似錦览妖、人聲如沸轧拄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽檩电。三九已至,卻和暖如春府树,著一層夾襖步出監(jiān)牢的瞬間俐末,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來泰國打工奄侠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留卓箫,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓垄潮,卻偏偏與公主長得像烹卒,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子弯洗,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容