原創(chuàng)文章&經(jīng)驗總結&從校招到A廠一路陽光一路滄桑
詳情請戳www.codercc.com
1.Condition簡介
任何一個java對象都天然繼承于Object類王暗,在線程間實現(xiàn)通信的往往會應用到Object的幾個方法俗壹,比如wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法實現(xiàn)等待/通知機制绷雏,同樣的, 在java Lock體系下依然會有同樣的方法實現(xiàn)等待/通知機制拙毫。從整體上來看Object的wait和notify/notify是與對象監(jiān)視器配合完成線程間的等待/通知機制棺禾,而Condition與Lock配合完成等待通知機制膘婶,前者是java底層級別的悬襟,后者是語言級別的拯刁,具有更高的可控制性和擴展性垛玻。兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:
- Condition能夠支持不響應中斷亿驾,而通過使用Object方式不支持莫瞬;
- Condition能夠支持多個等待隊列(new 多個Condition對象)郭蕉,而Object方式只能支持一個召锈;
- Condition能夠支持超時時間的設置,而Object不支持
參照Object的wait和notify/notifyAll方法规求,Condition也提供了同樣的方法:
針對Object的wait方法
- void await() throws InterruptedException:當前線程進入等待狀態(tài)阻肿,如果其他線程調(diào)用condition的signal或者signalAll方法并且當前線程獲取Lock從await方法返回丛塌,如果在等待狀態(tài)中被中斷會拋出被中斷異常较解;
- long awaitNanos(long nanosTimeout):當前線程進入等待狀態(tài)直到被通知印衔,中斷或者超時奸焙;
- boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種与帆,支持自定義時間單位
- boolean awaitUntil(Date deadline) throws InterruptedException:當前線程進入等待狀態(tài)直到被通知墨榄,中斷或者到了某個時間
針對Object的notify/notifyAll方法
- void signal():喚醒一個等待在condition上的線程袄秩,將該線程從等待隊列中轉(zhuǎn)移到同步隊列中,如果在同步隊列中能夠競爭到Lock則可以從等待方法中返回郭卫。
- void signalAll():與1的區(qū)別在于能夠喚醒所有等待在condition上的線程
2.Condition實現(xiàn)原理分析
2.1 等待隊列
要想能夠深入的掌握condition還是應該知道它的實現(xiàn)原理箱沦,現(xiàn)在我們一起來看看condiiton的源碼谓形。創(chuàng)建一個condition對象是通過lock.newCondition()
,而這個方法實際上是會new出一個ConditionObject對象疆前,該類是AQS(AQS的實現(xiàn)原理的文章)的一個內(nèi)部類竹椒,有興趣可以去看看。前面我們說過书释,condition是要和lock配合使用的也就是condition和Lock是綁定在一起的爆惧,而lock的實現(xiàn)原理又依賴于AQS锨能,自然而然ConditionObject作為AQS的一個內(nèi)部類無可厚非。我們知道在鎖機制的實現(xiàn)上熄阻,AQS內(nèi)部維護了一個同步隊列,如果是獨占式鎖的話坝初,所有獲取鎖失敗的線程的尾插入到同步隊列脖卖,同樣的,condition內(nèi)部也是使用同樣的方式袖扛,內(nèi)部維護了一個 等待隊列,所有調(diào)用condition.await方法的線程會加入到等待隊列中唇礁,并且線程狀態(tài)轉(zhuǎn)換為等待狀態(tài)盏筐。另外注意到ConditionObject中有兩個成員變量:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
這樣我們就可以看出來ConditionObject通過持有等待隊列的頭尾指針來管理等待隊列。主要注意的是Node類復用了在AQS中的Node類琢融,其節(jié)點狀態(tài)和相關屬性可以去看AQS的實現(xiàn)原理的文章漾抬,如果您仔細看完這篇文章對condition的理解易如反掌常遂,對lock體系的實現(xiàn)也會有一個質(zhì)的提升克胳。Node類有這樣一個屬性:
//后繼節(jié)點
Node nextWaiter;
進一步說明,等待隊列是一個單向隊列捏雌,而在之前說AQS時知道同步隊列是一個雙向隊列腹忽。接下來我們用一個demo,通過debug進去看是不是符合我們的猜想:
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
});
thread.start();
}
}
這段代碼沒有任何實際意義嘹锁,甚至很臭领猾,只是想說明下我們剛才所想的骇扇。新建了10個線程少孝,沒有線程先獲取鎖,然后調(diào)用condition.await方法釋放鎖將當前線程加入到等待隊列中袁翁,通過debug控制當走到第10個線程的時候查看firstWaiter
即等待隊列中的頭結點粱胜,debug模式下情景圖如下:
從這個圖我們可以很清楚的看到這樣幾點:1. 調(diào)用condition.await方法后線程依次尾插入到等待隊列中焙压,如圖隊列中的線程引用依次為Thread-0,Thread-1,Thread-2....Thread-8抑钟;2. 等待隊列是一個單向隊列。通過我們的猜想然后進行實驗驗證掀抹,我們可以得出等待隊列的示意圖如下圖所示:
同時還有一點需要注意的是:我們可以多次調(diào)用lock.newCondition()方法創(chuàng)建多個condition對象傲武,也就是一個lock可以持有多個等待隊列揪利。而在之前利用Object的方式實際上是指在對象Object對象監(jiān)視器上只能擁有一個同步隊列和一個等待隊列疟位,而并發(fā)包中的Lock擁有一個同步隊列和多個等待隊列喘垂。示意圖如下:
如圖所示,ConditionObject是AQS的內(nèi)部類得院,因此每個ConditionObject能夠訪問到AQS提供的方法祥绞,相當于每個Condition都擁有所屬同步器的引用。
2.2 await實現(xiàn)原理
當調(diào)用condition.await()方法后會使得當前獲取lock的線程進入到等待隊列两踏,如果該線程能夠從await()方法返回的話一定是該線程獲取了與condition相關聯(lián)的lock梦染。接下來弓坞,我們還是從源碼的角度去看车荔,只有熟悉了源碼的邏輯我們的理解才是最深的忧便。await()方法源碼為:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 將當前線程包裝成Node珠增,尾插入到等待隊列中
Node node = addConditionWaiter();
// 2. 釋放當前線程所占用的lock蒂教,在釋放的過程中會喚醒同步隊列中的下一個節(jié)點
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 當前線程進入到等待狀態(tài)
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待獲取到同步狀態(tài)(即獲取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 處理被中斷的情況
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
代碼的主要邏輯請看注釋凝垛,我們都知道當當前線程調(diào)用condition.await()方法后蜓谋,會使得當前線程釋放lock然后加入到等待隊列中桃焕,直至被signal/signalAll后會使得當前線程從等待隊列中移至到同步隊列中去,直到獲得了lock后才會從await方法返回让网,或者在等待時被中斷會做中斷處理。那么關于這個實現(xiàn)過程我們會有這樣幾個問題:1. 是怎樣將當前線程添加到等待隊列中去的而账?2.釋放鎖的過程福扬?3.怎樣才能從await方法退出惜犀?而這段代碼的邏輯就是告訴我們這三個問題的答案。具體請看注釋汽烦,在第1步中調(diào)用addConditionWaiter將當前線程添加到等待隊列中撇吞,該方法源碼為:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//將當前線程包裝成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//尾插入
t.nextWaiter = node;
//更新lastWaiter
lastWaiter = node;
return node;
}
這段代碼就很容易理解了牍颈,將當前節(jié)點包裝成Node琅关,如果等待隊列的firstWaiter為null的話(等待隊列為空隊列),則將firstWaiter指向當前的Node,否則画机,更新lastWaiter(尾節(jié)點)即可步氏。就是通過尾插入的方式將當前線程封裝的Node插入到等待隊列中即可徒爹,同時可以看出等待隊列是一個不帶頭結點的鏈式隊列隆嗅,之前我們學習AQS時知道同步隊列是一個帶頭結點的鏈式隊列,這是兩者的一個區(qū)別铺董。將當前節(jié)點插入到等待對列之后精续,會使當前線程釋放lock,由fullyRelease方法實現(xiàn)顷级,fullyRelease源碼為:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功釋放同步狀態(tài)
failed = false;
return savedState;
} else {
//不成功釋放同步狀態(tài)拋出異常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
這段代碼就很容易理解了弓颈,調(diào)用AQS的模板方法release方法釋放AQS的同步狀態(tài)并且喚醒在同步隊列中頭結點的后繼節(jié)點引用的線程翔冀,如果釋放成功則正常返回纤子,若失敗的話就拋出異常款票。到目前為止,這兩段代碼已經(jīng)解決了前面的兩個問題的答案了卡乾,還剩下第三個問題幔妨,怎樣從await方法退出?現(xiàn)在回過頭再來看await方法有這樣一段邏輯:
while (!isOnSyncQueue(node)) {
// 3. 當前線程進入到等待狀態(tài)
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
很顯然钙姊,當線程第一次調(diào)用condition.await()方法時煞额,會進入到這個while()循環(huán)中,然后通過LockSupport.park(this)方法使得當前線程進入等待狀態(tài)胀莹,那么要想退出這個await方法第一個前提條件自然而然的是要先退出這個while循環(huán)描焰,出口就只剩下兩個地方:1. 邏輯走到break退出while循環(huán);2. while循環(huán)中的邏輯判斷為false篱竭。再看代碼出現(xiàn)第1種情況的條件是當前等待的線程被中斷后代碼會走到break退出掺逼,第二種情況是當前節(jié)點被移動到了同步隊列中(即另外線程調(diào)用的condition的signal或者signalAll方法)瓤介,while中邏輯判斷為false后結束while循環(huán)刑桑。總結下病梢,就是當前線程被中斷或者調(diào)用condition.signal/condition.signalAll方法當前節(jié)點移動到了同步隊列后 蜓陌,這是當前線程退出await方法的前提條件钮热。當退出while循環(huán)后就會調(diào)用acquireQueued(node, savedState)
,這個方法在介紹AQS的底層實現(xiàn)時說過了隧期,若感興趣的話可以去看這篇文章仆潮,該方法的作用是在自旋過程中線程不斷嘗試獲取同步狀態(tài)遣臼,直至成功(線程獲取到lock)揍堰。這樣也說明了退出await方法必須是已經(jīng)獲得了condition引用(關聯(lián))的lock。到目前為止隐砸,開頭的三個問題我們通過閱讀源碼的方式已經(jīng)完全找到了答案季希,也對await方法的理解加深。await方法示意圖如下圖:
如圖武通,調(diào)用condition.await方法的線程必須是已經(jīng)獲得了lock冶忱,也就是當前線程是同步隊列中的頭結點囚枪。調(diào)用該方法后會使得當前線程所封裝的Node尾插入到等待隊列中劳淆。
超時機制的支持
condition還額外支持了超時機制,使用者可調(diào)用方法awaitNanos,awaitUtil括勺。這兩個方法的實現(xiàn)原理疾捍,基本上與AQS中的tryAcquire方法如出一轍栏妖,關于tryAcquire可以仔細閱讀這篇文章的第3.4部分。
不響應中斷的支持
要想不響應中斷可以調(diào)用condition.awaitUninterruptibly()方法宛裕,該方法的源碼為:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
這段方法與上面的await方法基本一致揩尸,只不過減少了對中斷的處理屁奏,并省略了reportInterruptAfterWait方法拋被中斷的異常了袁。
2.3 signal/signalAll實現(xiàn)原理
調(diào)用condition的signal或者signalAll方法可以將等待隊列中等待時間最長的節(jié)點移動到同步隊列中湿颅,使得該節(jié)點能夠有機會獲得lock。按照等待隊列是先進先出(FIFO)的崭庸,所以等待隊列的頭節(jié)點必然會是等待時間最長的節(jié)點怕享,也就是每次調(diào)用condition的signal方法是將頭節(jié)點移動到同步隊列中。我們來通過看源碼的方式來看這樣的猜想是不是對的沙合,signal方法源碼為:
public final void signal() {
//1. 先檢測當前線程是否已經(jīng)獲取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 獲取等待隊列中第一個節(jié)點首懈,之后的操作都是針對這個節(jié)點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal方法首先會檢測當前線程是否已經(jīng)獲取lock谨敛,如果沒有獲取lock會直接拋出異常脸狸,如果獲取的話再得到等待隊列的頭指針引用的節(jié)點,之后的操作的doSignal方法也是基于該節(jié)點泥彤。下面我們來看看doSignal方法做了些什么事情全景,doSignal方法源碼為:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//1. 將頭結點從等待隊列中移除
first.nextWaiter = null;
//2. while中transferForSignal方法對頭結點做真正的處理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
具體邏輯請看注釋爸黄,真正對頭節(jié)點做處理的邏輯在transferForSignal放,該方法源碼為:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//1. 更新狀態(tài)為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//2.將該節(jié)點移入到同步隊列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
關鍵邏輯請看注釋,這段代碼主要做了兩件事情1.將頭結點的狀態(tài)更改為CONDITION野崇;2.調(diào)用enq方法乓梨,將該節(jié)點尾插入到同步隊列中,關于enq方法請看AQS的底層實現(xiàn)這篇文章≡搪拢現(xiàn)在我們可以得出結論:調(diào)用condition的signal的前提條件是當前線程已經(jīng)獲取了lock臭觉,該方法會使得等待隊列中的頭節(jié)點即等待時間最長的那個節(jié)點移入到同步隊列,而移入到同步隊列后才有機會使得等待線程被喚醒狞膘,即從await方法中的LockSupport.park(this)方法中返回挽封,從而才有機會使得調(diào)用await方法的線程成功退出。signal執(zhí)行示意圖如下圖:
signalAll
sigllAll與sigal方法的區(qū)別體現(xiàn)在doSignalAll方法上和悦,前面我們已經(jīng)知道doSignal方法只會對等待隊列的頭節(jié)點進行操作鸽素,亦鳞,而doSignalAll的源碼為:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
該方法只不過時間等待隊列中的每一個節(jié)點都移入到同步隊列中,即“通知”當前調(diào)用condition.await()方法的每一個線程遭笋。
3. await與signal/signalAll的結合思考
文章開篇提到等待/通知機制徒探,通過使用condition提供的await和signal/signalAll方法就可以實現(xiàn)這種機制测暗,而這種機制能夠解決最經(jīng)典的問題就是“生產(chǎn)者與消費者問題”碗啄,關于“生產(chǎn)者消費者問題”之后會用單獨的一篇文章進行講解,這也是面試的高頻考點饲宿。await和signal和signalAll方法就像一個開關控制著線程A(等待方)和線程B(通知方)瘫想。它們之間的關系可以用下面一個圖來表現(xiàn)得更加貼切:
如圖国夜,線程awaitThread先通過lock.lock()方法獲取鎖成功后調(diào)用了condition.await方法進入等待隊列剧蚣,而另一個線程signalThread通過lock.lock()方法獲取鎖成功后調(diào)用了condition.signal或者signalAll方法鸠按,使得線程awaitThread能夠有機會移入到同步隊列中目尖,當其他線程釋放lock后使得線程awaitThread能夠有機會獲取lock,從而使得線程awaitThread能夠從await方法中退出執(zhí)行后續(xù)操作饮戳。如果awaitThread獲取lock失敗會直接進入到同步隊列扯罐。
3. 一個例子
我們用一個很簡單的例子說說condition的用法:
public class AwaitSignal {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
public static void main(String[] args) {
Thread waiter = new Thread(new waiter());
waiter.start();
Thread signaler = new Thread(new signaler());
signaler.start();
}
static class waiter implements Runnable {
@Override
public void run() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + "當前條件不滿足等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "接收到通知條件滿足");
} finally {
lock.unlock();
}
}
}
static class signaler implements Runnable {
@Override
public void run() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}
輸出結果為:
Thread-0當前條件不滿足等待
Thread-0接收到通知歹河,條件滿足
開啟了兩個線程waiter和signaler秸歧,waiter線程開始執(zhí)行的時候由于條件不滿足键菱,執(zhí)行condition.await方法使該線程進入等待狀態(tài)同時釋放鎖,signaler線程獲取到鎖之后更改條件经备,并通知所有的等待線程后釋放鎖弄喘。這時甩牺,waiter線程獲取到鎖,并由于signaler線程更改了條件此時相對于waiter來說條件滿足急但,繼續(xù)執(zhí)行波桩。
參考文獻
《java并發(fā)編程的藝術》