Condition 是一個(gè)多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個(gè)條件(condition)红且,只有滿足條件時(shí)植袍,線程才會(huì)被喚醒拯欧,相當(dāng)于object的 wait 和 notify 的功能。
public class ConditionWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try {
lock.lock(); //競(jìng)爭(zhēng)鎖
try {
System.out.println("begin - ConditionWait");
condition.await();//阻塞(1. 釋放鎖, 2.阻塞當(dāng)前線程, FIFO(單向桨武、雙向))
System.out.println("end - ConditionWait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
lock.unlock();//釋放鎖
}
}
}
public class ConditionNotify implements Runnable{
private Lock lock;
private Condition condition;
public ConditionNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try{
lock.lock();//獲得了鎖.
System.out.println("begin - conditionNotify");
condition.signal();//喚醒阻塞狀態(tài)的線程
System.out.println("end - conditionNotify");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); //釋放鎖
}
}
}
Condition 源碼分析
condition.await
調(diào)用 Condition肋拔,需要獲得 Lock 鎖,所以意味著會(huì)存在一個(gè) AQS 同步隊(duì)列呀酸,在上面那個(gè)案例中凉蜂,假如兩個(gè)線程同時(shí)運(yùn)行的話,那么 AQS 的隊(duì)列可能是下面這種情況
那么這個(gè)時(shí)候 ThreadA 調(diào)用了 condition.await 方法性誉,它做了什么事情呢窿吩?
public final void await() throws InterruptedException {
if (Thread.interrupted()) //表示 await 允許被中斷
throw new InterruptedException();
Node node = addConditionWaiter(); //創(chuàng)建一個(gè)新的節(jié)點(diǎn),節(jié)點(diǎn)狀態(tài)為 condition艾栋,采用的數(shù)據(jù)結(jié)構(gòu)仍然是鏈表
int savedState = fullyRelease(node); //釋放當(dāng)前的鎖爆存,得到鎖的狀態(tài),并喚醒 AQS 隊(duì)列中的一個(gè)線程
int interruptMode = 0;
//如果當(dāng)前節(jié)點(diǎn)沒有在同步隊(duì)列上蝗砾,即還沒有被 signal,則將當(dāng)前線程阻塞
while (!isOnSyncQueue(node)) {//判斷這個(gè)節(jié)點(diǎn)是否在 AQS 隊(duì)列上携冤,第一次判斷的是 false悼粮,因?yàn)榍懊嬉呀?jīng)釋放鎖了
LockSupport.park(this); //通過 park 掛起當(dāng)前線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 當(dāng)這個(gè)線程醒來,會(huì)嘗試拿鎖, 當(dāng) acquireQueued 返回 false 就是拿到鎖了.
// interruptMode != THROW_IE -> 表示這個(gè)線程沒有成功將 node 入隊(duì),但 signal 執(zhí)行了 enq 方法讓其入隊(duì)了.
// 將這個(gè)變量設(shè)置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一個(gè)等待者不是 null, 則進(jìn)行清理,清理 Condition 隊(duì)列上的節(jié)點(diǎn).
// 如果是 null ,就沒有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果線程被中斷了,需要拋出異常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
此方法使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)曾棕。當(dāng)從 await()方法返回扣猫,當(dāng)前線程一定獲取了 Condition相關(guān)聯(lián)的鎖
查看Node node = addConditionWaiter(); 方法看看
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果 lastWaiter 不等于空并且 waitStatus 不等于 CONDITION 時(shí),把這個(gè)節(jié)點(diǎn)從鏈表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//構(gòu)建一個(gè) Node翘地,waitStatus=CONDITION申尤。 這里的鏈表是一個(gè)單向的,所以相比 AQS 來說會(huì)簡(jiǎn)單很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
這個(gè)方法的主要作用是把當(dāng)前線程封裝成 Node衙耕,添加到等待隊(duì)列昧穿。這里的隊(duì)列不再是雙向鏈表,而是單向鏈表
執(zhí)行完 addConditionWaiter 這個(gè)方法之后橙喘,就會(huì)產(chǎn)生一個(gè)這樣的 condition 隊(duì)
再看看fullyRelease(node); 方法
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); //獲得重入的次數(shù)
if (release(savedState)) {//釋放鎖并且喚醒下一個(gè)同步隊(duì)列中的線程
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
fullRelease时鸵,就是徹底的釋放鎖,什么叫徹底呢厅瞎,就是如果當(dāng)前鎖存在多次重入饰潜,那么在這個(gè)方法中只需要釋放一次就會(huì)把所有的重入次數(shù)歸零。
此時(shí)和簸,同步隊(duì)列會(huì)觸發(fā)鎖的釋放和重新競(jìng)爭(zhēng),ThreadB 獲得了鎖
接著看 while (!isOnSyncQueue(node)) 方法
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev
== null)
return false;
if (node.next != null) // If has successor, it must
be on queue
return true;
return findNodeFromTail(node);
}
此方法判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中彭雾,返回 false 表示不在,返回 true 表示在
如果不在 AQS 同步隊(duì)列锁保,說明當(dāng)前節(jié)點(diǎn)沒有喚醒去爭(zhēng)搶同步鎖薯酝,所以需要把當(dāng)前線程阻塞起
來南誊,直到其他的線程調(diào)用 signal 喚醒
如果在 AQS 同步隊(duì)列,意味著它需要去競(jìng)爭(zhēng)同步鎖去獲得執(zhí)行程序執(zhí)行權(quán)限
為什么要做這個(gè)判斷呢蜜托?原因是在 condition 隊(duì)列中的節(jié)點(diǎn)會(huì)重新加入到 AQS 隊(duì)列去競(jìng)爭(zhēng)鎖抄囚。也就是當(dāng)調(diào)用 signal 的時(shí)候,會(huì)把當(dāng)前節(jié)點(diǎn)從 condition 隊(duì)列轉(zhuǎn)移到 AQS 隊(duì)列
? 思考一下橄务,基于現(xiàn)在的邏輯結(jié)構(gòu)幔托。如何去判斷 ThreadA 這個(gè)節(jié)點(diǎn)是否存在于 AQS 隊(duì)列中呢?
- 如果 ThreadA 的 waitStatus 的狀態(tài)為 CONDITION蜂挪,說明它存在于 condition 隊(duì)列中重挑,不在 AQS 隊(duì)列。因?yàn)?AQS 隊(duì)列的狀態(tài)一定不可能有 CONDITION
- 如果 node.prev 為空棠涮,說明也不存在于 AQS 隊(duì)列谬哀,原因是 prev=null 在 AQS 隊(duì)列中只有一種可能性,就是它是 head 節(jié)點(diǎn)严肪,head 節(jié)點(diǎn)意味著它是獲得鎖的節(jié)點(diǎn)史煎。
- 如果 node.next 不等于空,說明一定存在于 AQS 隊(duì)列中驳糯,因?yàn)橹挥?AQS 隊(duì)列才會(huì)存在next 和 prev 的關(guān)系
- findNodeFromTail篇梭,表示從 tail 節(jié)點(diǎn)往前掃描 AQS 隊(duì)列,一旦發(fā)現(xiàn) AQS 隊(duì)列的節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)相等酝枢,說明節(jié)點(diǎn)一定存在于 AQS 隊(duì)列中
Condition.signal
await 方法會(huì)阻塞 ThreadA恬偷,然后 ThreadB 搶占到了鎖獲得了執(zhí)行權(quán)限,這個(gè)時(shí)候在 ThreadB中調(diào)用了 Condition 的 signal()方法帘睦,將會(huì)喚醒在等待隊(duì)列中節(jié)點(diǎn)
public final void signal() {
if (!isHeldExclusively()) //先判斷當(dāng)前線程是否獲得了鎖袍患,這個(gè)判斷比較簡(jiǎn)單,直接
用獲得鎖的線程和當(dāng)前線程相比即可
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 拿到 Condition 隊(duì)列上第一個(gè)節(jié)點(diǎn)
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//從 Condition 隊(duì)列中刪除 first 節(jié)點(diǎn)
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null; // 將 next 節(jié)點(diǎn)設(shè)置成 null
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
對(duì) condition 隊(duì)列中從首部開始的第一個(gè) condition 狀態(tài)的節(jié)點(diǎn)竣付,執(zhí)行 transferForSignal 操作诡延,將 node 從 condition 隊(duì)列中轉(zhuǎn)換到 AQS 隊(duì)列中,同時(shí)修改 AQS 隊(duì)列中原先尾節(jié)點(diǎn)的狀態(tài)
接著看transferForSignal方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//更新節(jié)點(diǎn)的狀態(tài)為0卑笨,如果更新失敗孕暇,只有一種可能就是節(jié)點(diǎn)被 CANCELLED 了
return false;
Node p = enq(node);//調(diào)用 enq,把當(dāng)前節(jié)點(diǎn)添加到 AQS 隊(duì)列赤兴。并且返回返回按當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)妖滔,也就是原 tail 節(jié)點(diǎn)
int ws = p.waitStatus;
// 如果上一個(gè)節(jié)點(diǎn)的狀態(tài)被取消了, 或者嘗試設(shè)置上一個(gè)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 失敗了(SIGNAL 表示它的 next 節(jié)點(diǎn)需要停止阻塞),
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 喚醒節(jié)點(diǎn)上的線程.
return true; //如果 node 的 prev 節(jié)點(diǎn)已經(jīng)是 signal 狀態(tài),那么被阻塞的 ThreadA 的喚醒工作由 AQS 隊(duì)列來完成
}
該方法先是 CAS 修改了節(jié)點(diǎn)狀態(tài)桶良,如果成功座舍,就將這個(gè)節(jié)點(diǎn)放到 AQS 隊(duì)列中,然后喚醒這個(gè)節(jié)點(diǎn)上的線程陨帆。此時(shí)曲秉,那個(gè)節(jié)點(diǎn)就會(huì)在 await 方法中蘇醒
執(zhí)行完 doSignal 以后采蚀,會(huì)把 condition 隊(duì)列中的節(jié)點(diǎn)轉(zhuǎn)移到 aqs 隊(duì)列上,邏輯結(jié)構(gòu)圖如下,這個(gè)時(shí)候會(huì)判斷 ThreadA 的 prev 節(jié)點(diǎn)也就是 head 節(jié)點(diǎn)的 waitStatus承二,如果大于 0 或者設(shè)置 SIGNAL 失敗榆鼠,表示節(jié)點(diǎn)被設(shè)置成了 CANCELLED 狀態(tài)。這個(gè)時(shí)候會(huì)喚醒 ThreadA 這個(gè)線程亥鸠。否則就基于 AQS 隊(duì)列的機(jī)制來喚醒妆够,也就是等到 ThreadB 釋放鎖之后來喚醒ThreadA
被阻塞的線程喚醒后的邏輯
前面在分析 await 方法時(shí),線程會(huì)被阻塞负蚊。而通過 signal 被喚醒之后又繼續(xù)回到上次執(zhí)行的
checkInterruptWhileWaiting方法那里神妹。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//喚醒之后接著往下走
if ((interruptMode =
checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) &&
interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if
cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
checkInterruptWhileWaiting 這個(gè)方法是干嘛呢?其實(shí)從名字就可以看出來家妆,就是 ThreadA在 condition 隊(duì)列被阻塞的過程中鸵荠,有沒有被其他線程觸發(fā)過中斷請(qǐng)求
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE :
REINTERRUPT) : 0;
}
final boolean transferAfterCancelledWait(Node node) {
//使用 cas 修改節(jié)點(diǎn)狀態(tài),如果還能修改成功伤极,說明線程被中斷時(shí)蛹找, signal 還沒有被調(diào)用泻轰。
// 這里有一個(gè)知識(shí)點(diǎn)瞧哟,就是線程被喚醒,并不一定是在 java 層面執(zhí)行了
//locksupport.unpark兼贸,也可能是調(diào)用了線程的 interrupt()方法齿税,這
//個(gè)方法會(huì)更新一個(gè)中斷標(biāo)識(shí),并且會(huì)喚醒處于阻塞狀態(tài)下的線程炊豪。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); //如果 cas 成功凌箕,則把 node 添加到 AQS 隊(duì)列
return true;
}
//如果 cas 失敗,則判斷當(dāng)前 node 是否已經(jīng)在 AQS 隊(duì)列上词渤,如果不在牵舱,則讓給其他線程執(zhí)行
//當(dāng) node 被觸發(fā)了 signal 方法時(shí),node 就會(huì)被加到 aqs 隊(duì)列上
while (!isOnSyncQueue(node))//循環(huán)檢測(cè) node 是否已經(jīng)成功添加到 AQS 隊(duì)列中缺虐。如果沒有芜壁,則通過 yield,
Thread.yield();
return false;
}
如果當(dāng)前線程被中斷高氮,則調(diào)用 transferAfterCancelledWait 方法判斷后續(xù)的處理應(yīng)該是拋出
InterruptedException 還是重新中斷慧妄。
這里需要注意的地方是,如果第一次 CAS 失敗了剪芍,則不能判斷當(dāng)前線程是先進(jìn)行了中斷還是先進(jìn)行了 signal 方法的調(diào)用塞淹,可能是先執(zhí)行了 signal 然后中斷,也可能是先執(zhí)行了中斷罪裹,后執(zhí)行了 signal饱普,當(dāng)然运挫,這兩個(gè)操作肯定是發(fā)生在 CAS 之前。這時(shí)需要做的就是等待當(dāng)前線程的 node 被添加到 AQS 隊(duì)列后套耕,也就是 enq 方法返回后谁帕,返回 false 告訴checkInterruptWhileWaiting 方法返回 REINTERRUPT(1),后續(xù)進(jìn)行重新中斷冯袍。簡(jiǎn)單來說匈挖,該方法的返回值代表當(dāng)前線程是否在 park 的時(shí)候被中斷喚醒,如果為 true 表示中斷在 signal 調(diào)用之前颠猴,signal 還未執(zhí)行关划,那么這個(gè)時(shí)候會(huì)根據(jù) await 的語義,在 await 時(shí)遇到中斷需要拋出 interruptedException翘瓮,返回 true 就是告訴checkInterruptWhileWaiting返回 THROW_IE(-1)贮折。如果返回 false,否則表示 signal 已經(jīng)執(zhí)行過了资盅,只需要重新響應(yīng)中斷即可
再繼續(xù)走acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//獲取當(dāng)前節(jié)點(diǎn)的 prev 節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {//如果是 head 節(jié)點(diǎn)调榄,說明有資格去爭(zhēng)搶鎖
setHead(node);//獲取鎖成功,也就是ThreadA 已經(jīng)釋放了鎖呵扛,然后設(shè)置 head 為 ThreadB 獲得執(zhí)行權(quán)限
p.next = null; //把原 head 節(jié)點(diǎn)從鏈表中移除
failed = false;
return interrupted;
}
//ThreadA 可能還沒釋放鎖每庆,使得 ThreadB 在執(zhí)行 tryAcquire 時(shí)會(huì)返回 false
if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
interrupted = true; //并且返回當(dāng)前線程在等待過程中有沒有中斷過。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個(gè)方法在講上一篇 ReentrantLock 分析過今穿,是當(dāng)前被喚醒的節(jié)點(diǎn) ThreadA 去搶占同步鎖缤灵。并且要恢復(fù)到原本的重入次數(shù)狀態(tài)。調(diào)用完這個(gè)方法之后蓝晒,AQS 隊(duì)列的狀態(tài)如下腮出,將 head 節(jié)點(diǎn)的 waitStatus 設(shè)置為-1,Signal 狀態(tài)芝薇。
最后看看最下面的reportInterruptAfterWait方法
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
根據(jù) checkInterruptWhileWaiting 方法返回的中斷標(biāo)識(shí)來進(jìn)行中斷上報(bào)胚嘲。
如果是 THROW_IE,則拋出中斷異常
如果是 REINTERRUPT洛二,則重新響應(yīng)中斷
await 和 signal 的總結(jié)
線程 awaitThread 先通過lock.lock()方法獲取鎖成功后調(diào)用了 condition.await 方法進(jìn)入等待隊(duì)列馋劈,而另一個(gè)線程signalThread 通過 lock.lock()方法獲取鎖成功后調(diào)用了 condition.signal 或者 signalAll 方法,使得線程 awaitThread 能夠有機(jī)會(huì)移入到同步隊(duì)列中晾嘶,當(dāng)其他線程釋放 lock 后使得線程 awaitThread 能夠有機(jī)會(huì)獲取 lock妓雾,從而使得線程 awaitThread 能夠從 await 方法中退出執(zhí)行后續(xù)操作。如果 awaitThread 獲取 lock 失敗會(huì)直接進(jìn)入到同步隊(duì)列变擒。
阻塞:await()方法中君珠,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在 AQS 等待隊(duì)列娇斑,則阻塞當(dāng)前線程策添,如果在等待隊(duì)列材部,則自旋等待嘗試獲取鎖
釋放:signal()后,節(jié)點(diǎn)會(huì)從 condition 隊(duì)列移動(dòng)到 AQS 等待隊(duì)列唯竹,則進(jìn)入正常鎖的獲取流程
——學(xué)自咕泡學(xué)院