- AQS(上)-獨占模式
- AQS(中)-共享模式
-
AQS(下)-Condition
在之前的文章中已經(jīng)介紹了獨占模式和共享模式獲取資源以及使用資源的分析了,現(xiàn)在開始介紹Condition
.Condition
是用來替換監(jiān)視鎖的wait/notify
,但是它比wait/notify
這種機制更加靈活.在我之前寫的ReentrantLock中介紹過如何使用.如果還不知道它是如何使用的可以先參照該文章中對Condition
使用的介紹再來看本文.本文主要是將如何在AQS中是如何來實現(xiàn)的.
Condition Queue
在前面的文章中說過AQS中存在一個變種CLH實現(xiàn)的隊列,而AQS中的Condition
其實也就是一個隊列,該隊列是一個單向鏈表實現(xiàn)的隊列,在這里我就叫他Condition Queue(條件隊列)
了.在AQS中有一個ConditionObject
內(nèi)部類,該類就實現(xiàn)了Condition
接口.在該類中有幾個比較重要的屬性.
-
firstWaiter
:頭節(jié)點,用來指示隊列的頭部. -
lastWaiter
:尾節(jié)點,用來指示隊列的尾部.
鏈表中的節(jié)點就是就是我們之前介紹的Node
.節(jié)點中有一個屬性nextWaiter
在我們之前的介紹中它是用來標(biāo)記節(jié)點是SHARED
還是EXCLUSIVE
.但是在同步隊列中它的作用是用來指示下一個節(jié)點,這樣就形成了一個單向鏈表了.簡單的數(shù)據(jù)結(jié)構(gòu)就如下圖所示:
上圖中:
-
thread
:用來保存正在等待條件的線程. -
waitStatus
:用來保存節(jié)點的狀態(tài). -
nextWaiter
:用來指示下一個節(jié)點.
而構(gòu)建同步隊列中關(guān)鍵屬性prev
和next
在同步隊列中并任何作用,我們暫時不需要關(guān)注他們,在這里我們需要關(guān)系的就是上面幾個屬性了.我們沒創(chuàng)建一個Condition
其實就是創(chuàng)建了一個新的條件隊列.
await
通過上面對Condition Queue
的大致了解我們現(xiàn)在開始從await
方法開始分析.
public final void await() throws InterruptedException {
//判斷線程是否已中斷
if (Thread.interrupted())
throw new InterruptedException();
//將當(dāng)前線程封裝成Node添加到條件隊列并返回Node
Node node = addConditionWaiter();
//完全釋放當(dāng)前Node所攜帶的資源
int savedState = fullyRelease(node);
//中斷類型
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//掛起線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
上面是調(diào)用await
方法時的內(nèi)容,我們先分析到LockSupport.park(this);
,即當(dāng)前線程掛起這個地方,后續(xù)內(nèi)容稍后分析.首先它先判斷調(diào)用await
方法的線程是否中斷,如果中斷了直接拋出中斷異常.接著它調(diào)用addConditionWaiter()
方法將當(dāng)前線程添加到同步隊列,我們來看這個方法內(nèi)部如何實現(xiàn).
private Node addConditionWaiter() {
//使用臨時變量t保存舊的尾節(jié)點
Node t = lastWaiter;
//如果尾節(jié)點不為空且尾節(jié)點的waitStatus不是CONDITION,則刪除無效節(jié)點.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//刪除無效節(jié)點后再次使用t保存尾節(jié)點
t = lastWaiter;
}
//將當(dāng)前線程封裝成Node,狀態(tài)為:CONDITION(-2)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
//如果沒有尾節(jié)點,說明隊列是空的,初始化頭節(jié)點
firstWaiter = node;
else
//隊列已經(jīng)初始化過了,將舊的尾節(jié)點的下一個節(jié)點屬性指向當(dāng)前創(chuàng)建的節(jié)點(其實就是將當(dāng)前節(jié)點添加到尾部)
t.nextWaiter = node;
//設(shè)置當(dāng)前節(jié)點為新的尾節(jié)點
lastWaiter = node;
//返回新創(chuàng)建的節(jié)點
return node;
}
上面這個方法內(nèi)容注釋已經(jīng)講的很清楚了.主要任務(wù)就是將當(dāng)前線程封裝成Node
,然后將這個封裝好的Node
添加到隊列的尾部.需要注意的是,在將當(dāng)前節(jié)點添加到尾節(jié)點前,需要找到有效的尾節(jié)點,并將無效的節(jié)點從條件隊列中刪除.
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
上面的代碼其實就是刪除鏈表中無用節(jié)點的實現(xiàn),該方法是從鏈表的頭部開始往后開始刪除無效節(jié)點,主要就是解除節(jié)點nextWaiter
的引用和設(shè)置firstWaiter
和lastWaiter
引用來實現(xiàn).這就是一個簡單單向鏈表的刪除操作在這里不多說了.
這是后調(diào)用await()
方法已經(jīng)走到了int savedState = fullyRelease(node);
這里了.這個方法主要就是用來完全釋放共享資源的
.完全釋放的意思就是一次性釋放所有的共享資源.因為對于可重入鎖而言,它是可以多次獲取同步資源的.我們來看看這個方法如何實現(xiàn):
final int fullyRelease(Node node) {
//用來標(biāo)記中間是否出現(xiàn)異常
boolean failed = true;
try {
//獲取當(dāng)前的共享資源數(shù)
int savedState = getState();
//釋放指定數(shù)目共享資源
if (release(savedState)) {
//釋放成功,將失敗標(biāo)記標(biāo)記成false,并返回釋放資源的個數(shù)
failed = false;
return savedState;
} else {
//釋放資源失敗
throw new IllegalMonitorStateException();
}
} finally {
//如果失敗需要將當(dāng)前節(jié)點設(shè)置成CANCELLED便于清除無效節(jié)點
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
代碼中的注釋已經(jīng)展示了代碼的執(zhí)行邏輯,關(guān)于release(savedState)
方法在之前的獨占模式釋放鎖中已做解釋,這里不再做講解了.如果釋放鎖失敗,則拋出IllegalMonitorStateException
異常.這里正好驗證了我們說的在調(diào)用await()方法前必須先獲取鎖,否則會報異常
的說明.如果節(jié)點釋放鎖失敗我們需要將該節(jié)點狀態(tài)設(shè)置為CANCELLED
,便于我們后面將它刪除.這也就是我們之前在調(diào)用addConditionWaiter
時為什么需要檢查尾節(jié)點是否已經(jīng)取消了的操作.
接下來通過isOnSyncQueue(node)
即當(dāng)前節(jié)點是不是在同步隊列中判斷是不是要掛起線程.可以看見它是一個while循環(huán),第一次isOnSyncQueue(node)
返回的則是false
,則會調(diào)用LockSupport.park(this)
將當(dāng)前線程掛起.我們來看isOnSyncQueue
方法內(nèi)部如何實現(xiàn).
//判斷當(dāng)前節(jié)點是不是在同步隊列
final boolean isOnSyncQueue(Node node) {
//通過CONDITION或者prev判斷
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//通過后繼節(jié)點判斷
if (node.next != null)
return true;
return findNodeFromTail(node);
}
//從尾節(jié)點往前開始找該節(jié)點是不是在同步隊列
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
//存在
return true;
if (t == null)
//已到遍歷完了還是沒找到
return false;
//設(shè)置為前置節(jié)點繼續(xù)找(從尾到頭)
t = t.prev;
}
}
為什么要判斷當(dāng)前節(jié)點是不是在同步隊列呢?在后面的signal
分析中會講為什么.現(xiàn)在你只需要知道調(diào)用signal
之后位于條件隊列中的節(jié)點會被轉(zhuǎn)移到同步隊列.對于上面兩個判斷條件我這里做一下解釋:
node.waitStatus == Node.CONDITION || node.prev == null
:因為位于同步隊列中節(jié)點它的狀態(tài)不可能為CONDITION
,而節(jié)點添加到同步隊列的方式都是從尾節(jié)點開始插入的,而且插入都是先設(shè)置pre節(jié)點,然后再通過CAS的方式將自己設(shè)置成尾節(jié)點.如果還是不明白請看同步隊列節(jié)點入隊相關(guān)代碼.node.next != null
:在條件隊列中,next
是沒有用到的,所以它的默認值應(yīng)該為null
.
經(jīng)過上面分析,線程調(diào)用await
之后會在方法內(nèi)部中調(diào)用LockSupport.park(this)
使線程掛起.后面的代碼分析我們在講完signal
之后繼續(xù).
signal
public final void signal() {
//判斷當(dāng)前持有鎖的線程是不是當(dāng)前線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//如果頭節(jié)點不是null,則調(diào)用doSignal
if (first != null)
doSignal(first);
}
isHeldExclusively()
這個方法在AQS中并沒有實現(xiàn),它是用來判斷當(dāng)前持有鎖的線程是不是當(dāng)前線程.這個方法需要同步器自己去實現(xiàn).如果當(dāng)前線程沒有持有鎖,則同樣拋出IllegalMonitorStateException
異常.這個里面最重要的方法就是doSignal(first)
,我們來看這個方法如何實現(xiàn).
private void doSignal(Node first) {
do {
//將頭節(jié)點指向當(dāng)前節(jié)點的下一個節(jié)點,然后判斷新頭節(jié)點是不是空
if ( (firstWaiter = first.nextWaiter) == null)
//如果新頭結(jié)點是空說明條件隊列中已經(jīng)沒有數(shù)據(jù)了,將lastWaiter設(shè)置為null
lastWaiter = null;
//將當(dāng)前節(jié)點的下一個節(jié)點設(shè)置成null,相當(dāng)于當(dāng)前節(jié)點是一個單獨節(jié)點,從隊列中刪除了
first.nextWaiter = null;
//將first節(jié)點移到同步隊列
//如果轉(zhuǎn)移失敗則重復(fù)上面的操作
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//將節(jié)點從條件隊列轉(zhuǎn)移到同步隊列
final boolean transferForSignal(Node node) {
//如果節(jié)點在同步隊列狀態(tài)無效,說明該節(jié)點被取消了直接返回fasle
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將該節(jié)點添加到同步隊列并返回節(jié)點的前驅(qū)節(jié)點(此處是以獨占模式入隊的)
Node p = enq(node);
int ws = p.waitStatus;
//如果當(dāng)前node中的線程無法通過AQS喚醒則直接在這里喚醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
上面的doSignal(Node first)
主要的操作就是將條件隊列中第一個未被cancalled
的節(jié)點"喚醒".首先它通過firstWaiter = first.nextWaiter
和first.nextWaiter = null
將頭節(jié)點移出,然后使用transferForSignal(first)
將它轉(zhuǎn)移到同步隊列.如果失敗則一直重復(fù)上述操作,直到轉(zhuǎn)移成功或者條件隊列為空(first = firstWaiter) == null
.
transferForSignal
方法將節(jié)點移動到同步隊列首先判斷節(jié)點是不是有效,無效則直接返回失敗.如果節(jié)點有效就將節(jié)點添加到同步隊列(Node p = enq(node)
)并返回節(jié)點的前置節(jié)點.接著判斷前置節(jié)點是不是有效(ws > 0
)或者設(shè)置前節(jié)點狀態(tài)為SIGNAL
是否成功來決定是不是要喚醒當(dāng)前節(jié)點.如果ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
成立說明當(dāng)前node指向的節(jié)點無法通過AQS的隊列喚醒,所以在這里才會直接喚醒.而源碼中也解釋了in which case the waitStatus can be transiently and harmlessly wrong
.因為即使喚醒了節(jié)點并不意味著它就獲取到了鎖,它還是需要再去搶鎖,沒有搶到也沒有什么問題.
signalAll
通過對上面signal
的了解后,再看signalAll
就很輕松了.與signal
不同的是signal
一次只會將一個節(jié)點添加到同步隊列,而signalAll
會清空條件隊列,然后將隊列中的節(jié)點一個一個添加到同步隊列.
private void doSignalAll(Node first) {
//清空隊列
lastWaiter = firstWaiter = null;
//一個個將節(jié)點添加到同步隊列,直到所有節(jié)點都添加完了
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
await()中LockSupport.park(this)后分析
public final void await() throws InterruptedException {
//判斷線程是否已中斷
if (Thread.interrupted())
throw new InterruptedException();
//將當(dāng)前線程封裝成Node添加到條件隊列并返回Node
Node node = addConditionWaiter();
//完全釋放當(dāng)前Node所攜帶的資源
int savedState = fullyRelease(node);
//中斷類型
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//掛起線程
LockSupport.park(this);
//判斷中途是否有中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
接著分析之前await()
部分分析,線程在LockSupport.park(this)
后繼續(xù)執(zhí)行.而線程之所以被喚醒執(zhí)行只可能是當(dāng)前線程被中斷
或者其他線程調(diào)用了unpack
方法,不管是哪種情況導(dǎo)致的喚醒,最后該節(jié)點都將離開條件隊列
進入同步隊列
,然后使用acquireQueued
方式獲取到鎖最后才能退出await
方法.我們先知道是這樣的,然后通過中斷時間點作為分析起點開始分析.
從源碼可以知道,當(dāng)線程被喚醒后就開始調(diào)用checkInterruptWhileWaiting
來判斷線程是否被中斷了.這個方法不僅能分析出線程是否被中斷,還指示了再await()方法退出前需要做什么:
-
0
:代表整個過程沒有中斷發(fā)生. -
REINTERRUPT
:代表在退出await()方法前再次自我中斷即可. -
THROW_IE
:代表在退出await()方法前需要拋出中斷異常.
但是看完你肯定在想為什么要這么做呢?
首先你要清楚的一件事是acquireQueued
去獲取鎖這種方式并不會響應(yīng)中斷,這里所指的是在獲取鎖的過程中中斷和不中斷并不會有什么區(qū)別,唯一有區(qū)別的是線程的中斷狀態(tài)不一樣,而在該方法內(nèi)部并有根據(jù)中斷狀態(tài)做什么特殊操作,僅僅只是修改了返回值的中斷狀態(tài).關(guān)于acquireQueued
方法的內(nèi)部詳情,請參照之前寫的獨占模式獲取鎖
相關(guān)內(nèi)容.
正是因為acquireQueued
不會對中斷做特殊處理,我們需要判斷中斷發(fā)生的時間點.我們根據(jù)當(dāng)前節(jié)點所在隊列可以分為兩種情況:
-
在條件隊列
:說明在中斷發(fā)生時,它還沒有被signal屬于正常的等待狀態(tài)中,此時被中斷將導(dǎo)致正常的線程等待狀態(tài)被中斷,進入到同步隊列搶鎖.因此我們在退出await前需要拋出中斷異常用來代表是因為中斷而導(dǎo)致線程醒來.而這種情況剛好對應(yīng)的就是THROW_IE
. -
在同步隊列
:說明在中斷發(fā)生時,它已經(jīng)被signal,這個時候線程已經(jīng)位于同步隊列了.所以這個時候即便是發(fā)生了中斷,我們都將忽略它,所以僅僅只是在await退出前再次自我中斷即可.而這種情況對應(yīng)的就是THROW_IE
.
根據(jù)上面的分析,我們就按照線程中斷的情況做分析:
在條件隊列中被中斷
//檢查是否中斷
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
在當(dāng)前情況下Thread.interrupted()
的值為true
,接下來進步一判斷中斷調(diào)用transferAfterCancelledWait
.而當(dāng)前節(jié)點在是不是在條件隊列.因為如果當(dāng)前節(jié)點在同步隊列它的狀態(tài)一定是CONDITION
.所以compareAndSetWaitStatus(node, Node.CONDITION, 0)
將會成功.接下來則是調(diào)用enq(node)
方法將當(dāng)前節(jié)點放入到同步隊列,需要注意的是這個時候該節(jié)點并沒有從等待隊列刪除.
接下來回到await()
方法內(nèi)部,因為(interruptMode = checkInterruptWhileWaiting(node)) != 0
條件不成立,退出while循環(huán).接下來執(zhí)行的代碼如下:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
上面的代碼就是我們之前說的線程被喚醒后會acquireQueued
方法去搶鎖,而這里的savedState
就是我們之前釋放資源的個數(shù),關(guān)于該方法在之前的文章中有介紹過這里就不再做解釋了,該方法最后的返回值是線程是否中斷.在我們當(dāng)前分析的情況下,interruptMode != THROW_IE
不會成立,程序繼續(xù)向下執(zhí)行.
我們之前說過,在transferAfterCancelledWait
中只是將當(dāng)前節(jié)點加入到同步隊列,但是并沒有對條件隊列中的節(jié)點進行刪除,所以接下來node.nextWaiter != null
成立后要做的事情就是將條件隊列中的節(jié)點刪除掉.這個方法在上面也介紹過這里不做過多解釋.
最后如果線程被中斷過,則進行reportInterruptAfterWait(interruptMode)
處理.
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
經(jīng)過上面整個過程分析,當(dāng)線程還在條件隊列中就被刪除了(即還沒有被signal),會做下面幾件事:
- 將當(dāng)前節(jié)點添加到同步隊列
- 通過
acquireQueued
獲取鎖,沒有獲取到再次掛起 - 獲取到鎖之后,將之前的節(jié)點從條件隊列刪除
- 根據(jù)中斷類型做中斷異常處理.
在同步隊列中斷
與在條件隊列中斷
不同的是,在同步隊列中斷
這種情況要簡單一些,主要差別就在于中間省略了enq
(即從條件隊列同步轉(zhuǎn)移到同步隊列)的操作.
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
該情況中compareAndSetWaitStatus(node, Node.CONDITION, 0)
不成立,即意味著節(jié)點已經(jīng)添加到了同步隊列.其實這個時候并不能保證一定添加到同步隊列了,因為可能正在添加到同步隊列.
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
在transferForSignal
方法中,可能因為其compareAndSetWaitStatus(node, Node.CONDITION, 0)
執(zhí)行成功,導(dǎo)致transferAfterCancelledWait
中的compareAndSetWaitStatus(node, Node.CONDITION, 0)
執(zhí)行失敗,這就是我們說的可能正在同步隊列.這個時候處理也很簡單,就是自旋判斷節(jié)點是不是成功添加到同步隊列即可.
后面的操作與之前的并沒有太大區(qū)別.最后就是在reportInterruptAfterWait
根據(jù)中斷情況做一個自我中斷的操作.
小結(jié)
通過對Condition
的分析,最后重點也就是下面幾個點:
-
Condition
其實就是一個單向鏈表構(gòu)成的隊列. - 線程調(diào)用
await
就將自己添加到條件隊列,然后釋放自己帶的鎖,如果沒有獲取到鎖就拋出異常. - 線程調(diào)用
signal
就是從條件隊列中獲取一個節(jié)點,然后將該節(jié)點添加到同步隊列讓它搶鎖.