一 簡介-API 文檔閱讀
提供一個基于 FIFO 等待隊列實現(xiàn)阻塞鎖和相關(guān)同步器的基礎(chǔ)框架。這個類被設(shè)計用于依靠一個原子類 int 值表示狀態(tài)并成為大多數(shù)同步器的基礎(chǔ)類磕仅。通過繼承這個類并實現(xiàn)必要的方法來使用它钠龙。子類通過維護內(nèi)部的 state 字段來獲取和更新線程狀態(tài)蝗拿。這一系列方法包含 getState/setState 等。API 文檔推薦我們在使用這個類的時候样屠,將其作為目標(biāo)類的一個內(nèi)部類來實現(xiàn)穿撮,并且也不對外公開。
這個類支持默認的獨占(exclusive)模式或者共享(share)模式痪欲,或者同時支持這兩種模式悦穿。在獨占模式下獲取到鎖后,其他嘗試獲取鎖的線程都會失敗业踢。在共享模式下多個線程嘗試獲取鎖或許會成功栗柒。
在共享模式下一個線程獲取鎖成功,后續(xù)存在的等待線程在獲取鎖的時候還是要重復(fù)判斷一下是否可以獲取鎖知举。在不同的模式下等待的線程共享了同一個 FIFO 隊列瞬沦。 一般來說,這個框架接口的實現(xiàn)類只會實現(xiàn)一種模式(共享或者獨占)雇锡。支持單一模式的類不需要實現(xiàn)另一個模式下需要實現(xiàn)的方法逛钻。
AQS 內(nèi)部包含一個實現(xiàn)了 Condition 接口的 ConditionObject 內(nèi)部類。該類內(nèi)部維護了一個 Condition 的 Node 隊列遮糖。包含 Node-firstWaiter 與 Node-lastWaiter绣的。一系列的內(nèi)部私有方法用于對這個隊列的維護和操作。
private Node addConditionWaiter()
添加一個 Node 節(jié)點到當(dāng)前 Condition 隊列的隊尾。
AQS 同步器是 java.util.concurrent 并發(fā)包下許多工具類的基礎(chǔ)框架屡江。包括 RetreensLock芭概。
public abstract class AbstractQueuedLongSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
AQS 類的編寫用到了 模板方法設(shè)計模式,由抽象類定義同步器需要實現(xiàn)的方法惩嘉,具體的鎖或者基于 AQS 的同步器在繼承了這個類之后要按要求實現(xiàn)指定的方法罢洲。
使用方式
如果想要以 AQS 類作為基類,需要重新定義如下的系列方法文黎,這系列方法用戶獲取/設(shè)置 state 的值
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
二 部分代碼閱讀
2.1 acquire(int arg)
在獨占模式下多個線程競爭同一個對象監(jiān)視器的時候惹苗,首先的第一步就是去獲取監(jiān)視器,這里會調(diào)用到這個方法(比如 ReentrantLock)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先從整體的邏輯說一下這個方法及其內(nèi)部調(diào)用的一系列方法的邏輯:
多個線程進入耸峭,爭用對象監(jiān)視器桩蓉。爭用成功即持有鎖,失敗的會先加入內(nèi)部維護的等待隊列劳闹,這個隊列是一個雙向的 Node 鏈表院究。
獨占模式下獲取對象監(jiān)視器的方法,首先執(zhí)行 tryAcquire()本涕,如果獲取失敗就將當(dāng)前線程加入隊列中业汰,加入隊列之后再次嘗試獲取鎖。
在 AQS 中沒有給出具體的 tryAcquire 方法實現(xiàn)邏輯菩颖,抽象類中的方法中僅僅以拋出異常來結(jié)束样漆。所以這個方法的具體實現(xiàn),要依賴使用 AQS 框架的人晦闰。這里僅僅是給出了方法范式放祟。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
依賴于 AQS 框架的實現(xiàn),就是在 tryAcquire 里面去獲取/修改 AQS 內(nèi)部維護的 state 狀態(tài)值鹅髓,基本上都是使用 cas 方式來更新鎖標(biāo)識字段舞竿。
在直接調(diào)用 tryAcquire 方法失敗后京景,就需要將當(dāng)前線程加入 FIFO 的等待隊列窿冯,這里調(diào)用了 addWaiter 方法
private Node addWaiter(Node mode) {
// 將當(dāng)前線程包裝構(gòu)造為一個 node 節(jié)點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure 先嘗試快速入隊,如果嘗試失敗确徙,還是會執(zhí)行 enq 方法將已經(jīng)包裝好的 node 節(jié)點入隊
// 獲取尾節(jié)點引用并檢查尾節(jié)點是否為 null醒串,如果是則執(zhí)行執(zhí)行 enq 方法入隊,如果不是則執(zhí)行前面注釋的 'fast path of enq'
Node pred = tail;
if (pred != null) {
// 獲取尾節(jié)點的前驅(qū)節(jié)點
node.prev = pred;
// 嘗試使用 cas 方式將包裝的節(jié)點以尾插法加入隊列
if (compareAndSetTail(pred, node)) {
// 入隊成功鄙皇,調(diào)整 sync 隊列尾節(jié)點位置并返回已經(jīng)包裝好的 node 節(jié)點
pred.next = node;
return node;
}
}
// 走到這里芜赌,則說明前面的快速入隊策略沒有成功,那么最終還是要執(zhí)行 enq 方法入隊
enq(node);
return node;
}
此處傳入的 mode 參數(shù)是 獨占模式伴逸,然后將當(dāng)前線程構(gòu)造成一個 node 節(jié)點缠沈。這里加入等待隊列有兩種策略,第一種當(dāng) tail 節(jié)點不為空的時候,直接嘗試使用 CAS 方式將新節(jié)點加入到隊尾洲愤;當(dāng) tail 為空颓芭,則調(diào)用了 enq 入隊方法,總結(jié)一下柬赐,進入到 enq 方法的場景列表:
- 尾節(jié)點為空亡问,即整個 sync 隊列還未初始化
- cas 尾插法將節(jié)點加入隊列失敗
private Node enq(final Node node) {
// 死循環(huán),即要求入隊操作一定要成功
for (;;) {
// 獲取尾節(jié)點
Node t = tail;
// 如果尾節(jié)點為空肛宋,則執(zhí)行 sync 隊列的初始化
if (t == null) { // Must
// 創(chuàng)建一個新的 node 節(jié)點作為初始化 sync 隊列的頭節(jié)點
if (compareAndSetHead(new Node()))
// 將 tail 與 head 指向這個頭節(jié)點州藕,然后又會進入下一輪 for 循環(huán)
tail = head;
} else {
// 尾節(jié)點不為空,可能是進入方法的時候就不為空了酝陈,也可能是進入方法后第一輪循環(huán)初始化創(chuàng)建了一個節(jié)點床玻,將要插入的 node 節(jié)點的前驅(qū)引用指向已經(jīng)存在的尾節(jié)點
node.prev = t;
// cas 方式尾插法加入節(jié)點,注意如果 cas 方式執(zhí)行失敗了沉帮,會再次進入 for 循環(huán)然后嘗試入隊笨枯,直到最后嘗試成功
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這里是 CAS+自旋 的方式構(gòu)建等待線程隊列。這種構(gòu)建方式因為使用了自旋遇西,所以會有不確定時長的自旋時間馅精,比起前面代碼自己注釋的 快速入隊 要稍慢。不過這種方式也比掛起線程更快粱檀,同時也減少了上下文切換洲敢,節(jié)約了資源。
需要注意的是茄蚯,node 隊列的頭節(jié)點是不包含任何線程的压彭,是一個虛擬節(jié)點。在隊列初始化的時候渗常,會由無參構(gòu)造創(chuàng)建一個 node 節(jié)點作為頭節(jié)點壮不。當(dāng)有后續(xù)節(jié)點成為頭節(jié)點的時候(成為頭節(jié)點,其內(nèi)的線程就已經(jīng)持有了鎖皱碘,然后會被 AQS 清除持有線程的引用)询一。
前面的邏輯已經(jīng)將當(dāng)前線程加入到 sync 隊列中,現(xiàn)在檢查線程的中斷狀態(tài)并再次嘗試獲取獨占鎖
// 傳入以當(dāng)前節(jié)點為參數(shù)構(gòu)造的 node 節(jié)點
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取 node 的前驅(qū)節(jié)點
final Node p = node.predecessor();
// 如果當(dāng)前 node 的前驅(qū)節(jié)點是頭節(jié)點并嘗試獲取(改變)狀態(tài)
if (p == head && tryAcquire(arg)) {
// tryAcquire 成功癌椿,表明當(dāng)前節(jié)點已經(jīng)成功獲取到鎖健蕊,更新頭節(jié)點關(guān)系
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 當(dāng)前節(jié)點的前驅(qū)節(jié)點不是頭節(jié)點或者 tryAcquire 失敗,掛起當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 方法最終退出前檢查踢俄,這是異常狀態(tài)下的進入缩功,取消獲取鎖的相關(guān)操作
if (failed)
cancelAcquire(node);
}
}
入隊后的線程節(jié)點進入方法 acquireQueued(Final Node node, int arg),在這個方法中依然存在一個死循環(huán)都办。
獲取當(dāng)前節(jié)點的前置節(jié)點嫡锌,檢查當(dāng)這個前置節(jié)點是不是頭結(jié)點(頭結(jié)點即意味著這個節(jié)點正持有鎖)虑稼,然后當(dāng)前 Node 節(jié)點嘗試獲取對象監(jiān)視器,如果獲取成功势木,表示當(dāng)前節(jié)點成功獲取到對象監(jiān)視器动雹,將當(dāng)前節(jié)點設(shè)置為新的頭結(jié)點。同時置空前置節(jié)點的后繼引用(方便 GC 回收)跟压。
方法 shouldParkAfterFailedAcquire 會根據(jù)情況阻塞線程胰蝠,這里又要回顧一下之前的 status 字段各個值的含義,方法內(nèi)判斷的依據(jù)就是根據(jù) status 字段震蒋。
SIGNAL = -1:當(dāng)前線程的后繼線程需要被unpark(喚醒);
/**
* 判斷前驅(qū)節(jié)點的 status 狀態(tài)值是否是 SIGNAL茸塞,如果是則返回 true,否則返回false查剖。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前驅(qū)節(jié)點線程的狀態(tài)標(biāo)識字段值
if (ws == Node.SIGNAL) // 當(dāng)前線程的后繼線程需要被unpark(喚醒)钾虐,直接返回 true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 當(dāng)前線程被取消(前驅(qū)節(jié)點所搭載的線程)
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do { // 回溯前驅(qū)節(jié)點的前驅(qū)節(jié)點,直到找到一個狀態(tài)值為 非取消 狀態(tài)的節(jié)點
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; // 找到一個 status 為 非取消 狀態(tài)的節(jié)點后笋庄,將這個節(jié)點的后繼節(jié)點指向當(dāng)前的節(jié)點
} else { // 其他情況就將傳入的前驅(qū)節(jié)點狀態(tài)置為 SIGNAL 狀態(tài)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
方法 parkAndCheckInterrupt 會阻塞當(dāng)前線程效扫,并且返回“線程被喚醒之后”的中斷狀態(tài)
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
注意, 這里使用的方法是
interrupted()
在調(diào)用這個方法的適合直砂,會清除線程的標(biāo)識位菌仁,這樣在下一次喚醒該線程的時候才會生效。
2.2 release(int arg)
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
// 獲取頭節(jié)點
Node h = head;
// 頭節(jié)點不為空且頭節(jié)點的節(jié)點狀態(tài)不為0(不為0表示這個節(jié)點不是初始化虛擬節(jié)點)
if (h != null && h.waitStatus != 0)
// 修改節(jié)點 status 字段并喚醒等待線程
unparkSuccessor(h);
return true;
}
// 釋放鎖失敗静暂,返回 false
return false;
}
和 acquire 一樣济丘,release 方法會先嘗試調(diào)用 tryRelease 方法來釋放鎖,但是這個方法的實現(xiàn)并不在 AQS 內(nèi)洽蛀,而是實現(xiàn)了 AQS 的類來填充這個方法摹迷。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 因為要釋放當(dāng)前節(jié)點,所以先更新節(jié)點 status 字段為初始化值 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 獲取一個可用的后繼節(jié)點(節(jié)點可用的條件是節(jié)點 status 字段不為 cancel 狀態(tài))郊供,注意這里是從尾節(jié)點從后向前嘗試喚醒峡碉。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}