25.Condition

1.與Lock的關系

Condition在同步鎖synchronized中用的比較多。
Condition本身也是一個接口,其功能和wait/notify類似逛裤。

public interface Condition {
    void await() throws InterruptedException;  //等待

    void awaitUninterruptibly(); 

    long awaitNanos(long var1) throws InterruptedException;

    boolean await(long var1, TimeUnit var3) throws InterruptedException;  //計時等待

    boolean awaitUntil(Date var1) throws InterruptedException;

    void signal();  //喚醒

    void signalAll();   //喚醒全部
}

wait()/notify()必須和synchronized一起使用,Condition也必須和Lock一起使用。因此绝葡,在Lock的接口中,有一個與Condition相關的接口:(所有的Condition都是從Lock中構造出來的)

public interface Lock {
    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;

    void unlock();
    // 所有的Condition都是從Lock中構造出來的
    Condition newCondition();
}

2.使用場景

以以ArrayBlockingQueue為例:為一個用數組實現的阻塞隊列腹鹉,執(zhí)行put(...)操作的時候藏畅,隊列滿了,生產者線程被阻塞功咒;執(zhí)行take()操作的時候愉阎,隊列為空,消費者線程被阻塞力奋。

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    // 一把鎖+兩個條件
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    transient ArrayBlockingQueue<E>.Itrs itrs;
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        } else {
            this.items = new Object[capacity];
            // 構造器中創(chuàng)建一把鎖加兩個條件
            this.lock = new ReentrantLock(fair);
            // 構造器中創(chuàng)建一把鎖加兩個條件
            this.notEmpty = this.lock.newCondition();
            // 構造器中創(chuàng)建一把鎖加兩個條件
            this.notFull = this.lock.newCondition();
        }
    }
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while(this.count == this.items.length) {
                // 非滿條件阻塞榜旦,隊列容量已滿
                this.notFull.await();
            }
            this.enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E e) {
        Object[] items = this.items;
        items[this.putIndex] = e;
        if (++this.putIndex == items.length) {
            this.putIndex = 0;
        }

        ++this.count;
        // put數據結束,通知消費者非空條件
        this.notEmpty.signal();
    }

    public E take() throws InterruptedException {
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        Object var2;
        try {
            while(this.count == 0) {
                // 阻塞于非空條件景殷,隊列元素個數為0溅呢,無法消費
                this.notEmpty.await();
            }
            var2 = this.dequeue();
        } finally {
            lock.unlock();
        }
        return var2;
    }

    private E dequeue() {
        Object[] items = this.items;
        E e = items[this.takeIndex];
        items[this.takeIndex] = null;
        if (++this.takeIndex == items.length) {
            this.takeIndex = 0;
        }

        --this.count;
        if (this.itrs != null) {
            this.itrs.elementDequeued();
        }
        // 消費成功,通知非滿條件滨彻,隊列中有空間藕届,可以生產元素了。
        this.notFull.signal();
        return e;
    }

3.實現原理

Condition的使用很方便亭饵,避免了wait/notify的喚醒操作不僅喚醒生產者也會喚醒消費者問題休偶。
由于Condition必須和Lock一起使用,所以Condition的實現也是Lock的一部分辜羊。首先查看互斥鎖和讀寫鎖中Condition的構造方法:

public class ReentrantLock implements Lock, java.io.Serializable {
    public Condition newCondition() {
        return this.sync.newCondition();
    }
}

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    public static class ReadLock implements Lock, Serializable {
        // 讀鎖不支持Condition
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }
    public static class WriteLock implements Lock, java.io.Serializable {
        public Condition newCondition() {
            return this.sync.newCondition();
        }
    }
}

首先踏兜,讀寫鎖中的 ReadLock 是不支持 Condition 的词顾,讀寫鎖的寫鎖和互斥鎖都支持Condition。雖然它們各自調用的是自己的內部類Sync碱妆,但內部類Sync都繼承自AQS肉盹。因此,上面的代碼sync.newCondition最終都調用了AQS中的newCondition疹尾。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    public class ConditionObject implements Condition, java.io.Serializable { 
        // Condition的所有實現上忍,都在ConditionObject類中 
    } 
}
abstract static class Sync extends AbstractQueuedSynchronizer { 
    final ConditionObject newCondition() { 
        return new ConditionObject(); 
    } 
}

每一個Condition對象上面,都阻塞了多個線程纳本。因此窍蓝,在ConditionObject內部也有一個雙向鏈表組成的隊列

public class ConditionObject implements Condition, java.io.Serializable { 
    private transient Node firstWaiter; 
    private transient Node lastWaiter; 
}
static final class Node { 
    volatile Node prev; 
    volatile Node next;
     volatile Thread thread; 
    Node nextWaiter; 
}

4.await()實現分析

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()實現分析

public final void await() throws InterruptedException { 
    // 剛要執(zhí)行await()操作,收到中斷信號繁成,拋異常 
    if (Thread.interrupted()) throw new InterruptedException(); 
    // 加入Condition的等待隊列 
    Node node = addConditionWaiter(); 
    // 阻塞在Condition之前必須先釋放鎖吓笙,否則會死鎖 
    int savedState = fullyRelease(node); 
    int interruptMode = 0; 
    while (!isOnSyncQueue(node)) { 
        // 阻塞當前線程 LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; 
    }
    // 重新獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
        interruptMode = REINTERRUPT;
    .....

關于await,有幾個關鍵點要說明:

  1. 線程調用 await()的時候巾腕,肯定已經先拿到了鎖面睛。所以,在 addConditionWaiter()內部尊搬,對這個雙向鏈表的操作不需要執(zhí)行CAS操作叁鉴,線程天生是安全的,代碼如下:
private Node addConditionWaiter() { 
    // ... 
    Node t = lastWaiter; 
    Node node = new Node(Thread.currentThread(), Node.CONDITION); 
    if (t == null) 
        firstWaiter = node; 
    else
        t.nextWaiter = node; 
    lastWaiter = node; 
    return node; 
}
  1. 在線程執(zhí)行wait操作之前毁嗦,必須先釋放鎖亲茅。也就是fullyRelease(node),否則會發(fā)生死鎖狗准。這個和wait/notify與synchronized的配合機制一樣克锣。
  2. 線程從await中被喚醒后,必須用acquireQueued(node, savedState)方法重新拿鎖腔长。
  3. checkInterruptWhileWaiting(node)代碼在park(this)代碼之后袭祟,是為了檢測在park期間是否收到過中斷信號。當線程從park中醒來時捞附,有兩種可能:一種是其他線程調用了unpark巾乳,另一種是收到中斷信號。這里的await()方法是可以響應中斷的鸟召,所以當發(fā)現自己是被中斷喚醒的胆绊,而不是被unpark喚醒的時,會直接退出while循環(huán)欧募,await()方法也會返回压状。
  4. isOnSyncQueue(node)用于判斷該Node是否在AQS的同步隊列里面。初始的時候,Node只 在Condition的隊列里种冬,而不在AQS的隊列里镣丑。但執(zhí)行notity操作的時候,會放進AQS的同步隊列娱两。

5.signal實現

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal()

        public final void signal() {
            //// 只有持有鎖的線程莺匠,才有資格調用signal()方法
            if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            } else {
                AbstractQueuedSynchronizer.Node first = this.firstWaiter;
                if (first != null) {
                    // 發(fā)起通知
                    this.doSignal(first);
                }
            }
        }

        // 喚醒隊列中的第1個線程
        private void doSignal(AbstractQueuedSynchronizer.Node first) {
            do {
                if ((this.firstWaiter = first.nextWaiter) == null) {
                    this.lastWaiter = null;
                }
                first.nextWaiter = null;
            } while(!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);

        }

        final boolean transferForSignal(Node node) {
            if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) 
                return false; 
            // 先把Node放入互斥鎖的同步隊列中,再調用unpark方法 
            Node p = enq(node); 
            int ws = p.waitStatus; 
            if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) 
                LockSupport.unpark(node.thread); 
            return true; 
        }

同 await()一樣十兢,在調用 signal()的時候趣竣,必須先拿到鎖(否則就會拋出上面的異常),是因為前面執(zhí)行await()的時候纪挎,把鎖釋放了期贫。
然后,從隊列中取出firstWaiter异袄,喚醒它。在通過調用unpark喚醒它之前玛臂,先用enq(node)方法把這個Node從condition中移除并放入AQS的鎖對應的阻塞隊列中烤蜕。也正因為如此,才有了await()方法里面的判斷條件:

  • while( ! isOnSyncQueue(node))
  • 這個判斷條件滿足迹冤,說明await線程不是被中斷讽营,而是被unpark喚醒的。
  • notifyAll()與此類似泡徙。

簡述:
阻塞隊列使用condition實現橱鹏,condition必須和Lock一起使用,一般使用ReentrantLock來創(chuàng)建condition阻塞條件堪藐,并用ReentrantLock的鎖來加鎖控制并發(fā)莉兰。解決了wait/notify同時喚醒生產者和消費者的問題。讀寫鎖中的 ReadLock 是不支持 Condition 的礁竞,讀寫鎖的寫鎖和互斥鎖都支持Condition糖荒。他們最終都調用了AQS中的newCondition,ConditionObject是condition的具體實現模捂,內部也有一個雙向鏈表組成的隊列來存儲阻塞的隊列捶朵。一開始調用await會放入ConditionObject的隊列,然后調用最終是調用park原語阻塞狂男,最后調用signal的時候把線程在ConditionObject中移除综看,放入阻塞隊列本身,并調用unpark岖食。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末红碑,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子县耽,更是在濱河造成了極大的恐慌句喷,老刑警劉巖镣典,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異唾琼,居然都是意外死亡兄春,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門锡溯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赶舆,“玉大人,你說我怎么就攤上這事祭饭∥咭穑” “怎么了?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵倡蝙,是天一觀的道長九串。 經常有香客問我,道長寺鸥,這世上最難降的妖魔是什么猪钮? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮胆建,結果婚禮上烤低,老公的妹妹穿的比我還像新娘。我一直安慰自己笆载,他們只是感情好扑馁,可當我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著凉驻,像睡著了一般腻要。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沿侈,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天闯第,我揣著相機與錄音,去河邊找鬼缀拭。 笑死咳短,一個胖子當著我的面吹牛,可吹牛的內容都是我干的蛛淋。 我是一名探鬼主播咙好,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼褐荷!你這毒婦竟也來了勾效?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎层宫,沒想到半個月后杨伙,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡萌腿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年限匣,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毁菱。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡米死,死狀恐怖,靈堂內的尸體忽然破棺而出贮庞,到底是詐尸還是另有隱情峦筒,我是刑警寧澤,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布窗慎,位于F島的核電站物喷,受9級特大地震影響,放射性物質發(fā)生泄漏捉邢。R本人自食惡果不足惜脯丝,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望伏伐。 院中可真熱鬧,春花似錦晕拆、人聲如沸藐翎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吝镣。三九已至,卻和暖如春昆庇,著一層夾襖步出監(jiān)牢的瞬間末贾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工整吆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拱撵,地道東北人。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓表蝙,卻偏偏與公主長得像拴测,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子府蛇,可洞房花燭夜當晚...
    茶點故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內容