原文鏈接:深入剖析基于并發(fā)AQS的(獨占鎖)重入鎖(ReetrantLock)及其Condition實現(xiàn)原理 - CSDN博客
神奇的Condition
關(guān)于Condition接口
在并發(fā)編程中式撼,每個Java對象都存在一組監(jiān)視器方法艇劫,如wait()停团、notify()以及notifyAll()方法跛梗,通過這些方法,我們可以實現(xiàn)線程間通信與協(xié)作(也稱為等待喚醒機制)个唧,如生產(chǎn)者-消費者模式惕虑,而且這些方法必須配合著synchronized關(guān)鍵字使用义屏,關(guān)于這點,如果想有更深入的理解舰褪,可觀看博主另外一篇博文【?深入理解Java并發(fā)之synchronized實現(xiàn)原理】皆疹,與synchronized的等待喚醒機制相比Condition具有更多的靈活性以及精確性,這是因為notify()在喚醒線程時是隨機(同一個鎖)占拍,而Condition則可通過多個Condition實例對象建立更加精細的線程控制略就,也就帶來了更多靈活性了,我們可以簡單理解為以下兩點:
1. 通過Condition能夠精細的控制多線程的休眠與喚醒刷喜。
2. 對于一個鎖残制,我們可以為多個線程間建立不同的Condition。
Condition是一個接口類掖疮,其主要方法如下:
public interfaceCondition{
?/**
? * 使當前線程進入等待狀態(tài)直到被通知(signal)或中斷
? * 當其他線程調(diào)用singal()或singalAll()方法時初茶,該線程將被喚醒
? * 當其他線程調(diào)用interrupt()方法中斷當前線程
? * await()相當于synchronized等待喚醒機制中的wait()方法
? */
?void await() throws InterruptedException;
//當前線程進入等待狀態(tài),直到被喚醒,該方法不響應(yīng)中斷要求
?void awaitUninterruptibly();
//調(diào)用該方法恼布,當前線程進入等待狀態(tài)螺戳,直到被喚醒或被中斷或超時
?//其中nanosTimeout指的等待超時時間,單位納秒
?long awaitNanos(long nanosTimeout) throws InterruptedException;
? //同awaitNanos折汞,但可以指明時間單位
? boolean await(long time, TimeUnit unit) throws InterruptedException;
//調(diào)用該方法當前線程進入等待狀態(tài)倔幼,直到被喚醒、中斷或到達某個時
?//間期限(deadline),如果沒到指定時間就被喚醒爽待,返回true损同,其他情況返回false
? boolean awaitUntil(Date deadline) throws InterruptedException;
//喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須
?//獲取與Condition相關(guān)聯(lián)的鎖鸟款,功能與notify()相同
? void signal();
//喚醒所有等待在Condition上的線程膏燃,該線程從等待方法返回前必須
?//獲取與Condition相關(guān)聯(lián)的鎖,功能與notifyAll()相同
? void signalAll();
}
關(guān)于Condition的實現(xiàn)類是AQS的內(nèi)部類ConditionObject何什,關(guān)于這點我們稍后分析组哩,這里先來看一個Condition的使用案例,即經(jīng)典消費者生產(chǎn)者模式处渣。
Condition的使用案例-生產(chǎn)者消費者模式
這里我們通過一個賣烤鴨的案例來演示多生產(chǎn)多消費者的案例伶贰,該場景中存在兩條生產(chǎn)線程t1和t2,用于生產(chǎn)烤鴨罐栈,也存在兩條消費線程t3黍衙,t4用于消費烤鴨,4條線程同時執(zhí)行悠瞬,需要保證只有在生產(chǎn)線程產(chǎn)生烤鴨后们豌,消費線程才能消費,否則只能等待浅妆,直到生產(chǎn)線程產(chǎn)生烤鴨后喚醒消費線程望迎,注意烤鴨不能重復(fù)消費。ResourceByCondition類中定義product()和consume()兩個方法凌外,分別用于生產(chǎn)烤鴨和消費烤鴨辩尊,并且定義ReentrantLock鎖,用于控制product()和consume()的并發(fā)康辑,由于必須在烤鴨生成完成后消費線程才能消費烤鴨摄欲,否則只能等待,因此這里定義兩組Condition對象疮薇,分別是producer_con和consumer_con胸墙,前者擁有控制生產(chǎn)線程,后者擁有控制消費線程按咒,這里我們使用一個標志flag來控制是否有烤鴨迟隅,當flag為true時,代表烤鴨生成完畢,生產(chǎn)線程必須進入等待狀態(tài)同時喚醒消費線程進行消費智袭,消費線程消費完畢后將flag設(shè)置為false奔缠,代表烤鴨消費完成,進入等待狀態(tài)吼野,同時喚醒生產(chǎn)線程生產(chǎn)烤鴨校哎,具體代碼如下:
package com.zejian.concurrencys;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by zejian on 2017/7/22.
* Blog : http://blog.csdn.net/javazejian [原文地址,請尊重原創(chuàng)]
*/
public class ResourceByCondition{
? ? private String name;
? ? private int count = 1;
? ? private boolean flag = false;
? ? //創(chuàng)建一個鎖對象。
? ? Lock lock = new ReentrantLock();
? ? //通過已有的鎖獲取兩組監(jiān)視器瞳步,一組監(jiān)視生產(chǎn)者闷哆,一組監(jiān)視消費者。??
? ? Condition producer_con = lock.newCondition();
? ? Condition consumer_con = lock.newCondition();
? ? /**
? ? * 生產(chǎn)
? ? * @paramname
? ? */
? ? public? void product(String name)
? ? {
? ? ? ? lock.lock();
? ? ? ? try? ? ? ? {
? ? ? ? ? ? while(flag){
? ? ? ? ? ? ? ? try{
????????????????????producer_con.await();
????????????????}catch(InterruptedException e){}
? ? ? ? ? ? }
? ? ? ? ? ? this.name = name + count;
? ? ? ? ? ? count++;
? ? ? ? ? ? System.out.println(Thread.currentThread().getName()+"...生產(chǎn)者5.0..."+this.name);
? ? ? ? ? ? flag = true;
? ? ? ? ? ? consumer_con.signal();//直接喚醒消費線程
? ? ? ? }
? ? ? ? finally? ? ? ? {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
? ? /**
? ? * 消費
? ? */
? ? public? void consume()
? ? {
? ? ? ? lock.lock();
? ? ? ? try? ? ? ? {
? ? ? ? ? ? while(!flag){
? ? ? ? ? ? ? ? try{
????????????????????consumer_con.await();
????????????????}catch(InterruptedException e){}
? ? ? ? ? ? }
? ? ? ? ? ? System.out.println(Thread.currentThread().getName()+"...消費者.5.0......."+this.name);//消費烤鴨1? ? ? ? ? ??
????????????flag = false;
? ? ? ? ? ? producer_con.signal();//直接喚醒生產(chǎn)線程
? ? ? ? }
? ? ? ? finally
? ? ? ? {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
}
執(zhí)行代碼
package com.zejian.concurrencys;
/**
* Created by zejian on 2017/7/22.
* Blog : http://blog.csdn.net/javazejian [原文地址,請尊重原創(chuàng)]
*/
public classMutil_Producer_ConsumerByCondition{
? ? public static void main(String[] args) {
? ? ? ? ResourceByCondition r = new ResourceByCondition();
? ? ? ? Mutil_Producer pro = new Mutil_Producer(r);
? ? ? ? Mutil_Consumer con = new Mutil_Consumer(r);
? ? ? ? //生產(chǎn)者線程
? ? ? ? Thread t0 = new Thread(pro);
? ? ? ? Thread t1 = new Thread(pro);
? ? ? ? //消費者線程
? ? ? ? Thread t2 = new Thread(con);
? ? ? ? Thread t3 = new Thread(con);
? ? ? ? //啟動線程
? ? ? ? t0.start();
? ? ? ? t1.start();
? ? ? ? t2.start();
? ? ? ? t3.start();
? ? }
}
/** * @decrition生產(chǎn)者線程 */
class Mutil_Producer implements Runnable {
? ? private ResourceByCondition r;
? ? Mutil_Producer(ResourceByCondition r) {
? ? ? ? this.r = r;
? ? }
? ? public void run() {
? ? ? ? while (true) {
? ? ? ? ? ? r.product("北京烤鴨");
? ? ? ? }
? ? }
}
/** * @decrition消費者線程 */
class Mutil_Consumer implements Runnable {
? ? private ResourceByCondition r;
? ? Mutil_Consumer(ResourceByCondition r) {
? ? ? ? this.r = r;
? ? }
? ? public void run() {
? ? ? ? while (true) {
? ? ? ? ? ? r.consume();
? ? ? ? }
? ? }
}
正如代碼所示谚攒,我們通過兩者Condition對象單獨控制消費線程與生產(chǎn)消費阳准,這樣可以避免消費線程在喚醒線程時喚醒的還是消費線程,如果是通過synchronized的等待喚醒機制實現(xiàn)的話馏臭,就可能無法避免這種情況,畢竟同一個鎖讼稚,對于synchronized關(guān)鍵字來說只能有一組等待喚醒隊列括儒,而不能像Condition一樣,同一個鎖擁有多個等待隊列锐想。synchronized的實現(xiàn)方案如下帮寻,
public classKaoYaResource{
?private String name;
? ? private int count = 1;//烤鴨的初始數(shù)量?
?? private boolean flag = false;//判斷是否有需要線程等待的標志
? ? /**
? ? * 生產(chǎn)烤鴨
? ? */
? ? public synchronized void product(String name){
? ? ? ? while(flag){
? ? ? ? ? ? //此時有烤鴨,等待
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? this.wait();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? this.name=name+count;//設(shè)置烤鴨的名稱
? ? ? ? count++;
? ? ? ? System.out.println(Thread.currentThread().getName()+"...生產(chǎn)者..."+this.name);
? ? ? ? flag=true;//有烤鴨后改變標志
? ? ? ? notifyAll();//通知消費線程可以消費了
? ? }
? ? /**
? ? * 消費烤鴨
? ? */
? ? public synchronized void consume(){
? ? ? ? while(!flag){//如果沒有烤鴨就等待
? ? ? ? ? ? try{this.wait();}catch(InterruptedException e){}
? ? ? ? }
? ? ? ? System.out.println(Thread.currentThread().getName()+"...消費者........"+this.name);//消費烤鴨1? ? ? ? flag = false;
? ? ? ? notifyAll();//通知生產(chǎn)者生產(chǎn)烤鴨
? ? }
}
如上代碼赠摇,在調(diào)用notify()或者 notifyAll()方法時固逗,由于等待隊列中同時存在生產(chǎn)者線程和消費者線程,所以我們并不能保證被喚醒的到底是消費者線程還是生產(chǎn)者線程藕帜,而Codition則可以避免這種情況烫罩。嗯,了解完Condition的使用方式后洽故,下面我們將進一步探討Condition背后的實現(xiàn)機制贝攒。
Condition的實現(xiàn)原理
Condition的具體實現(xiàn)類是AQS的內(nèi)部類ConditionObject,前面我們分析過AQS中存在兩種隊列时甚,一種是同步隊列隘弊,一種是等待隊列,而等待隊列就相對于Condition而言的荒适。注意在使用Condition前必須獲得鎖梨熙,同時在Condition的等待隊列上的結(jié)點與前面同步隊列的結(jié)點是同一個類即Node,其結(jié)點的waitStatus的值為CONDITION刀诬。在實現(xiàn)類ConditionObject中有兩個結(jié)點分別是firstWaiter和lastWaiter咽扇,firstWaiter代表等待隊列第一個等待結(jié)點,lastWaiter代表等待隊列最后一個等待結(jié)點,如下:
public class ConditionObject implements Condition,java.io.Serializable{
?//等待隊列第一個等待結(jié)點
?private transient Node firstWaiter;
? ? //等待隊列最后一個等待結(jié)點
? ? private transient Node lastWaiter;
? ? //省略其他代碼.......
}
每個Condition都對應(yīng)著一個等待隊列肌割,也就是說如果一個鎖上創(chuàng)建了多個Condition對象卧蜓,那么也就存在多個等待隊列。等待隊列是一個FIFO的隊列把敞,在隊列中每一個節(jié)點都包含了一個線程的引用弥奸,而該線程就是Condition對象上等待的線程。當一個線程調(diào)用了await()相關(guān)的方法奋早,那么該線程將會釋放鎖盛霎,并構(gòu)建一個Node節(jié)點封裝當前線程的相關(guān)信息加入到等待隊列中進行等待,直到被喚醒耽装、中斷愤炸、超時才從隊列中移出。Condition中的等待隊列模型如下:
正如圖所示掉奄,Node節(jié)點的數(shù)據(jù)結(jié)構(gòu)规个,在等待隊列中使用的變量與同步隊列是不同的,Condtion中等待隊列的結(jié)點只有直接指向的后繼結(jié)點并沒有指明前驅(qū)結(jié)點姓建,而且使用的變量是nextWaiter而不是next诞仓,這點我們在前面分析結(jié)點Node的數(shù)據(jù)結(jié)構(gòu)時講過。firstWaiter指向等待隊列的頭結(jié)點速兔,lastWaiter指向等待隊列的尾結(jié)點墅拭,等待隊列中結(jié)點的狀態(tài)只有兩種即CANCELLED和CONDITION,前者表示線程已結(jié)束需要從等待隊列中移除涣狗,后者表示條件結(jié)點等待被喚醒谍婉。再次強調(diào)每個Codition對象對于一個等待隊列,也就是說AQS中只能存在一個同步隊列镀钓,但可擁有多個等待隊列穗熬。下面從代碼層面看看被調(diào)用await()方法(其他await()實現(xiàn)原理類似)的線程是如何加入等待隊列的,而又是如何從等待隊列中被喚醒的:
public final void await() throws InterruptedException {
? ? ? //判斷線程是否被中斷
? ? ? if (Thread.interrupted())
? ? ? ? ? throw new InterruptedException();
? ? ? //創(chuàng)建新結(jié)點加入等待隊列并返回
? ? ? Node node = addConditionWaiter();
? ? ? //釋放當前線程鎖即釋放同步狀態(tài)
? ? ? int savedState = fullyRelease(node);
? ? ? int interruptMode = 0;
? ? ? //判斷結(jié)點是否同步隊列(SyncQueue)中,即是否被喚醒
? ? ? while (!isOnSyncQueue(node)) {
? ? ? ? ? //掛起線程
? ? ? ? ? LockSupport.park(this);
? ? ? ? ? //判斷是否被中斷喚醒掸宛,如果是退出循環(huán)死陆。
? ? ? ? ? if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
? ? ? ? ? ? ? break;
? ? ? }
? ? ? //被喚醒后執(zhí)行自旋操作爭取獲得鎖,同時判斷線程是否被中斷
? ? ? if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
? ? ? ? ? interruptMode = REINTERRUPT;
? ? ? // clean up if cancelled
? ? ? if (node.nextWaiter != null)
? ? ? ? ? //清理等待隊列中不為CONDITION狀態(tài)的結(jié)點
? ? ? ? ? unlinkCancelledWaiters();
? ? ? if (interruptMode != 0)
? ? ? ? ? reportInterruptAfterWait(interruptMode);
? }
執(zhí)行addConditionWaiter()添加到等待隊列唧瘾。
private Node addConditionWaiter() {
? ? Node t = lastWaiter;
? ? ? // 判斷是否為結(jié)束狀態(tài)的結(jié)點并移除
? ? ? if (t != null && t.waitStatus != Node.CONDITION) {
? ? ? ? ? unlinkCancelledWaiters();
? ? ? ? ? t = lastWaiter;
? ? ? }
? ? ? //創(chuàng)建新結(jié)點狀態(tài)為CONDITION
? ? ? Node node = new Node(Thread.currentThread(), Node.CONDITION);
? ? ? //加入等待隊列
? ? ? if (t == null)
? ? ? ? ? firstWaiter = node;
? ? ? else
? ? ? ? ? t.nextWaiter = node;
? ? ? lastWaiter = node;
? ? ? return node;
}
await()方法主要做了3件事措译,一是調(diào)用addConditionWaiter()方法將當前線程封裝成node結(jié)點加入等待隊列,二是調(diào)用fullyRelease(node)方法釋放同步狀態(tài)并喚醒后繼結(jié)點的線程饰序。三是調(diào)用isOnSyncQueue(node)方法判斷結(jié)點是否在同步隊列中领虹,注意是個while循環(huán),如果同步隊列中沒有該結(jié)點就直接掛起該線程求豫,需要明白的是如果線程被喚醒后就調(diào)用acquireQueued(node, savedState)執(zhí)行自旋操作爭取鎖塌衰,即當前線程結(jié)點從等待隊列轉(zhuǎn)移到同步隊列并開始努力獲取鎖诉稍。
接著看看喚醒操作singal()方法:
public final void signal() {
? ? //判斷是否持有獨占鎖,如果不是拋出異常
? if (!isHeldExclusively())
? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? Node first = firstWaiter;
? ? ? //喚醒等待隊列第一個結(jié)點的線程
? ? ? if (first != null)
? ? ? ? ? doSignal(first);
}
這里signal()方法做了兩件事最疆,一是判斷當前線程是否持有獨占鎖杯巨,沒有就拋出異常,從這點也可以看出只有獨占模式先采用等待隊列努酸,而共享模式下是沒有等待隊列的服爷,也就沒法使用Condition。二是喚醒等待隊列的第一個結(jié)點获诈,即執(zhí)行doSignal(first):
private void doSignal(Node first) {
? ? do {
? ? ? ? ? ? //移除條件等待隊列中的第一個結(jié)點仍源,
? ? ? ? ? ? //如果后繼結(jié)點為null,那么說沒有其他結(jié)點將尾結(jié)點也設(shè)置為null
? ? ? ? ? ? if ( (firstWaiter = first.nextWaiter) == null)
? ? ? ? ? ? ? ? lastWaiter = null;
? ? ? ? ? ? first.nextWaiter = null;
? ? ? ? ? //如果被通知節(jié)點沒有進入到同步隊列并且條件等待隊列還有不為空的節(jié)點舔涎,則繼續(xù)循環(huán)通知后續(xù)結(jié)點
? ? ? ? } while (!transferForSignal(first) &&
? ? ? ? ? ? ? ? ? (first = firstWaiter) != null);
? ? ? ? }
//transferForSignal方法
final boolean transferForSignal(Node node) {
? ? //嘗試設(shè)置喚醒結(jié)點的waitStatus為0笼踩,即初始化狀態(tài)
? ? //如果設(shè)置失敗,說明當期結(jié)點node的waitStatus已不為
? ? //CONDITION狀態(tài)亡嫌,那么只能是結(jié)束狀態(tài)了嚎于,因此返回false
? ? //返回doSignal()方法中繼續(xù)喚醒其他結(jié)點的線程,注意這里并
? ? //不涉及并發(fā)問題昼伴,所以CAS操作失敗只可能是預(yù)期值不為CONDITION匾旭,
? ? //而不是多線程設(shè)置導致預(yù)期值變化,畢竟操作該方法的線程是持有鎖的圃郊。
? ? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
? ? ? ? return false;
? ? ? ? //加入同步隊列并返回前驅(qū)結(jié)點p?
?? ? ? Node p = enq(node);
? ? ? ? int ws = p.waitStatus;
? ? ? ? //判斷前驅(qū)結(jié)點是否為結(jié)束結(jié)點(CANCELLED=1)或者在設(shè)置
? ? ? ? //前驅(qū)節(jié)點狀態(tài)為Node.SIGNAL狀態(tài)失敗時,喚醒被通知節(jié)點代表的線程
? ? ? ? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
? ? ? ? ? ? //喚醒node結(jié)點的線程
? ? ? ? ? ? LockSupport.unpark(node.thread);
? ? ? ? return true;
? ? }
注釋說得很明白了女蜈,這里我們簡單整體說明一下持舆,doSignal(first)方法中做了兩件事,從條件等待隊列移除被喚醒的節(jié)點伪窖,然后重新維護條件等待隊列的firstWaiter和lastWaiter的指向逸寓。二是將從等待隊列移除的結(jié)點加入同步隊列(在transferForSignal()方法中完成的),如果進入到同步隊列失敗并且條件等待隊列還有不為空的節(jié)點覆山,則繼續(xù)循環(huán)喚醒后續(xù)其他結(jié)點的線程竹伸。到此整個signal()的喚醒過程就很清晰了,即signal()被調(diào)用后簇宽,先判斷當前線程是否持有獨占鎖勋篓,如果有,那么喚醒當前Condition對象中等待隊列的第一個結(jié)點的線程魏割,并從等待隊列中移除該結(jié)點譬嚣,移動到同步隊列中,如果加入同步隊列失敗钞它,那么繼續(xù)循環(huán)喚醒等待隊列中的其他結(jié)點的線程拜银,如果成功加入同步隊列殊鞭,那么如果其前驅(qū)結(jié)點是否已結(jié)束或者設(shè)置前驅(qū)節(jié)點狀態(tài)為Node.SIGNAL狀態(tài)失敗,則通過LockSupport.unpark()喚醒被通知節(jié)點代表的線程尼桶,到此signal()任務(wù)完成操灿,注意被喚醒后的線程,將從前面的await()方法中的while循環(huán)中退出泵督,因為此時該線程的結(jié)點已在同步隊列中趾盐,那么while (!isOnSyncQueue(node))將不在符合循環(huán)條件,進而調(diào)用AQS的acquireQueued()方法加入獲取同步狀態(tài)的競爭中幌蚊,這就是等待喚醒機制的整個流程實現(xiàn)原理谤碳,流程如下圖所示(注意無論是同步隊列還是等待隊列使用的Node數(shù)據(jù)結(jié)構(gòu)都是同一個,不過是使用的內(nèi)部變量不同罷了)溢豆。
ok~蜒简,本篇先到這,關(guān)于AQS中的另一種模式即共享模式漩仙,下篇再詳聊搓茬,歡迎繼續(xù)關(guān)注。