什么是AQS?
AQS是AbstractQueuedSynchronizer的縮寫(xiě)还蹲。
AQS是一個(gè)用來(lái)構(gòu)建鎖和同步器的框架,使用AQS能簡(jiǎn)單且高效地構(gòu)造出應(yīng)用廣泛的大量的同步器存淫,比如我們提到的ReentrantLock,Semaphore沼填,其他的諸如ReentrantReadWriteLock桅咆,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的倾哺。當(dāng)然轧邪,我們自己也能利用AQS非常輕松容易地構(gòu)造出符合我們自己需求的同步器刽脖。
AQS 核心思想
AQS核心思想是羞海,如果被請(qǐng)求的共享資源空閑,則將當(dāng)前請(qǐng)求資源的線(xiàn)程設(shè)置為有效的工作線(xiàn)程曲管,并且將共享資源設(shè)置為鎖定狀態(tài)却邓。如果被請(qǐng)求的共享資源被占用,那么就需要一套線(xiàn)程阻塞等待以及被喚醒時(shí)鎖分配的機(jī)制院水,這個(gè)機(jī)制AQS是用CLH隊(duì)列鎖實(shí)現(xiàn)的腊徙,即將暫時(shí)獲取不到鎖的線(xiàn)程加入到隊(duì)列中。
CLH(Craig,Landin,and Hagersten)隊(duì)列是一個(gè)虛擬的雙向隊(duì)列(虛擬的雙向隊(duì)列即不存在隊(duì)列實(shí)例檬某,僅存在結(jié)點(diǎn)之間的關(guān)聯(lián)關(guān)系)撬腾。AQS是將每條請(qǐng)求共享資源的線(xiàn)程封裝成一個(gè)CLH鎖隊(duì)列的一個(gè)結(jié)點(diǎn)(Node)來(lái)實(shí)現(xiàn)鎖的分配。
AQS使用一個(gè)int成員變量來(lái)表示同步狀態(tài)恢恼,通過(guò)內(nèi)置的FIFO隊(duì)列來(lái)完成獲取資源線(xiàn)程的排隊(duì)工作民傻。AQS使用CAS對(duì)該同步狀態(tài)進(jìn)行原子操作實(shí)現(xiàn)對(duì)其值的修改。
/**
* The synchronization state.
*/
private volatile int state;
通過(guò)getState,setState,compareAndSetState方法對(duì)state進(jìn)行操作。
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 對(duì)資源的共享方式
AQS定義兩種資源共享方式
- Exclusive(獨(dú)占):只有一個(gè)線(xiàn)程能執(zhí)行漓踢,如ReentrantLock牵署。又可分為公平鎖和非公平鎖:
- 公平鎖:按照線(xiàn)程在隊(duì)列中的排隊(duì)順序,先到者先拿到鎖
- 非公平鎖:當(dāng)線(xiàn)程要獲取鎖時(shí)喧半,無(wú)視隊(duì)列順序直接去搶鎖奴迅,誰(shuí)搶到就是誰(shuí)的
- Share(共享):多個(gè)線(xiàn)程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch挺据。Semaphore取具、CountDownLatCh、 CyclicBarrier扁耐、ReadWriteLock 我們都會(huì)在后面講到者填。
ReentrantReadWriteLock 可以看成是組合式,因?yàn)镽eentrantReadWriteLock也就是讀寫(xiě)鎖允許多個(gè)線(xiàn)程同時(shí)對(duì)某一資源進(jìn)行讀做葵。
不同的自定義同步器爭(zhēng)用共享資源的方式也不同占哟。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源 state 的獲取與釋放方式即可,至于具體線(xiàn)程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等)酿矢,AQS已經(jīng)在上層已經(jīng)幫我們實(shí)現(xiàn)好了榨乎。
AbstractQueuedSynchronizer數(shù)據(jù)結(jié)構(gòu)
AbstractQueuedSynchronizer類(lèi)底層的數(shù)據(jù)結(jié)構(gòu)是使用CLH(Craig,Landin,and Hagersten)隊(duì)列
是一個(gè)虛擬的雙向隊(duì)列(虛擬的雙向隊(duì)列即不存在隊(duì)列實(shí)例,僅存在結(jié)點(diǎn)之間的關(guān)聯(lián)關(guān)系)瘫筐。AQS是將每條請(qǐng)求共享資源的線(xiàn)程封裝成一個(gè)CLH鎖隊(duì)列的一個(gè)結(jié)點(diǎn)(Node)來(lái)實(shí)現(xiàn)鎖的分配蜜暑。其中Sync queue,即同步隊(duì)列策肝,是雙向鏈表肛捍,包括head結(jié)點(diǎn)和tail結(jié)點(diǎn),head結(jié)點(diǎn)主要用作后續(xù)的調(diào)度之众。而Condition queue不是必須的拙毫,其是一個(gè)單向鏈表,只有當(dāng)使用Condition時(shí)棺禾,才會(huì)存在此單向鏈表缀蹄。并且可能會(huì)有多個(gè)Condition queue。
為什么要使用雙向隊(duì)列膘婶?
假如隊(duì)列是單向的如:Head -> N1 -> N2 -> Tail缺前。出隊(duì)的時(shí)候你要獲取N1很簡(jiǎn)單,Head.next就行了悬襟,入隊(duì)就麻煩了衅码,要遍歷整個(gè)鏈表到N2,然后N2.next = N3;N3.next = Tail脊岳。入隊(duì)的復(fù)雜度就是O(n),而且Tail也失去他的意義逝段。相反雙向鏈表出隊(duì)和入隊(duì)都是O(1)時(shí)間復(fù)雜度筛璧。說(shuō)白了空間換時(shí)間。假設(shè)有10個(gè)Node惹恃,入隊(duì)1次要遍歷10個(gè)節(jié)點(diǎn)夭谤,但是在高并發(fā)場(chǎng)景下,要不停的入隊(duì)出隊(duì)無(wú)數(shù)次巫糙,這樣在性能上的消耗就可觀(guān)了朗儒,如果用雙向隊(duì)列,空間浪費(fèi)始終只有10個(gè)引用所占的空間啊参淹。
AQS的內(nèi)部類(lèi) - Node類(lèi)
static final class Node {
// 模式醉锄,分為共享與獨(dú)占
// 共享模式
static final Node SHARED = new Node();
// 獨(dú)占模式
static final Node EXCLUSIVE = null;
// 結(jié)點(diǎn)狀態(tài)
// CANCELLED,值為1浙值,表示當(dāng)前的線(xiàn)程被取消
// SIGNAL恳不,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線(xiàn)程需要運(yùn)行开呐,也就是unpark
// CONDITION烟勋,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition筐付,也就是在condition隊(duì)列中
// PROPAGATE卵惦,值為-3,表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行
// 值為0瓦戚,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中沮尿,等待著獲取鎖
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 結(jié)點(diǎn)狀態(tài)
volatile int waitStatus;
// 前驅(qū)結(jié)點(diǎn)
volatile Node prev;
// 后繼結(jié)點(diǎn)
volatile Node next;
// 結(jié)點(diǎn)所對(duì)應(yīng)的線(xiàn)程
volatile Thread thread;
// 下一個(gè)等待者
Node nextWaiter;
// 結(jié)點(diǎn)是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅(qū)結(jié)點(diǎn),若前驅(qū)結(jié)點(diǎn)為空较解,拋出異常
final Node predecessor() throws NullPointerException {
// 保存前驅(qū)結(jié)點(diǎn)
Node p = prev;
if (p == null) // 前驅(qū)結(jié)點(diǎn)為空畜疾,拋出異常
throw new NullPointerException();
else // 前驅(qū)結(jié)點(diǎn)不為空,返回
return p;
}
// 無(wú)參構(gòu)造方法
Node() { // Used to establish initial head or SHARED marker
}
// 構(gòu)造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 構(gòu)造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS的內(nèi)部類(lèi) - ConditionObject類(lèi)
// 內(nèi)部類(lèi)
public class ConditionObject implements Condition, java.io.Serializable {
// 版本號(hào)
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
// condition隊(duì)列的頭節(jié)點(diǎn)
private transient Node firstWaiter;
/** Last node of condition queue. */
// condition隊(duì)列的尾結(jié)點(diǎn)
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
// 構(gòu)造方法
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
// 添加新的waiter到wait隊(duì)列
private Node addConditionWaiter() {
// 保存尾結(jié)點(diǎn)
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) { // 尾結(jié)點(diǎn)不為空印衔,并且尾結(jié)點(diǎn)的狀態(tài)不為CONDITION
// 清除狀態(tài)為CONDITION的結(jié)點(diǎn)
unlinkCancelledWaiters();
// 將最后一個(gè)結(jié)點(diǎn)重新賦值給t
t = lastWaiter;
}
// 新建一個(gè)結(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 尾結(jié)點(diǎn)為空
// 設(shè)置condition隊(duì)列的頭節(jié)點(diǎn)
firstWaiter = node;
else // 尾結(jié)點(diǎn)不為空
// 設(shè)置為節(jié)點(diǎn)的nextWaiter域?yàn)閚ode結(jié)點(diǎn)
t.nextWaiter = node;
// 更新condition隊(duì)列的尾結(jié)點(diǎn)
lastWaiter = node;
return node;
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
// 循環(huán)
do {
if ( (firstWaiter = first.nextWaiter) == null) // 該節(jié)點(diǎn)的nextWaiter為空
// 設(shè)置尾結(jié)點(diǎn)為空
lastWaiter = null;
// 設(shè)置first結(jié)點(diǎn)的nextWaiter域
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 將結(jié)點(diǎn)從condition隊(duì)列轉(zhuǎn)移到sync隊(duì)列失敗并且condition隊(duì)列中的頭節(jié)點(diǎn)不為空啡捶,一直循環(huán)
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
// condition隊(duì)列的頭節(jié)點(diǎn)尾結(jié)點(diǎn)都設(shè)置為空
lastWaiter = firstWaiter = null;
// 循環(huán)
do {
// 獲取first結(jié)點(diǎn)的nextWaiter域結(jié)點(diǎn)
Node next = first.nextWaiter;
// 設(shè)置first結(jié)點(diǎn)的nextWaiter域?yàn)榭? first.nextWaiter = null;
// 將first結(jié)點(diǎn)從condition隊(duì)列轉(zhuǎn)移到sync隊(duì)列
transferForSignal(first);
// 重新設(shè)置first
first = next;
} while (first != null);
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
// 從condition隊(duì)列中清除狀態(tài)為CANCEL的結(jié)點(diǎn)
private void unlinkCancelledWaiters() {
// 保存condition隊(duì)列頭節(jié)點(diǎn)
Node t = firstWaiter;
Node trail = null;
while (t != null) { // t不為空
// 下一個(gè)結(jié)點(diǎn)
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) { // t結(jié)點(diǎn)的狀態(tài)不為CONDTION狀態(tài)
// 設(shè)置t節(jié)點(diǎn)的額nextWaiter域?yàn)榭? t.nextWaiter = null;
if (trail == null) // trail為空
// 重新設(shè)置condition隊(duì)列的頭節(jié)點(diǎn)
firstWaiter = next;
else // trail不為空
// 設(shè)置trail結(jié)點(diǎn)的nextWaiter域?yàn)閚ext結(jié)點(diǎn)
trail.nextWaiter = next;
if (next == null) // next結(jié)點(diǎn)為空
// 設(shè)置condition隊(duì)列的尾結(jié)點(diǎn)
lastWaiter = trail;
}
else // t結(jié)點(diǎn)的狀態(tài)為CONDTION狀態(tài)
// 設(shè)置trail結(jié)點(diǎn)
trail = t;
// 設(shè)置t結(jié)點(diǎn)
t = next;
}
}
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 喚醒一個(gè)等待線(xiàn)程。如果所有的線(xiàn)程都在等待此條件当编,則選擇其中的一個(gè)喚醒届慈。在從 await 返回之前徒溪,該線(xiàn)程必須重新獲取鎖忿偷。
public final void signal() {
if (!isHeldExclusively()) // 不被當(dāng)前線(xiàn)程獨(dú)占,拋出異常
throw new IllegalMonitorStateException();
// 保存condition隊(duì)列頭節(jié)點(diǎn)
Node first = firstWaiter;
if (first != null) // 頭節(jié)點(diǎn)不為空
// 喚醒一個(gè)等待線(xiàn)程
doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 喚醒所有等待線(xiàn)程臊泌。如果所有的線(xiàn)程都在等待此條件鲤桥,則喚醒所有線(xiàn)程。在從 await 返回之前渠概,每個(gè)線(xiàn)程都必須重新獲取鎖茶凳。
public final void signalAll() {
if (!isHeldExclusively()) // 不被當(dāng)前線(xiàn)程獨(dú)占嫂拴,拋出異常
throw new IllegalMonitorStateException();
// 保存condition隊(duì)列頭節(jié)點(diǎn)
Node first = firstWaiter;
if (first != null) // 頭節(jié)點(diǎn)不為空
// 喚醒所有等待線(xiàn)程
doSignalAll(first);
}
/**
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
// 等待,當(dāng)前線(xiàn)程在接到信號(hào)之前一直處于等待狀態(tài)贮喧,不響應(yīng)中斷
public final void awaitUninterruptibly() {
// 添加一個(gè)結(jié)點(diǎn)到等待隊(duì)列
Node node = addConditionWaiter();
// 獲取釋放的狀態(tài)
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) { //
// 阻塞當(dāng)前線(xiàn)程
LockSupport.park(this);
if (Thread.interrupted()) // 當(dāng)前線(xiàn)程被中斷
// 設(shè)置interrupted狀態(tài)
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}
/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
// // 等待筒狠,當(dāng)前線(xiàn)程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 當(dāng)前線(xiàn)程被中斷,拋出異常
throw new InterruptedException();
// 在wait隊(duì)列上添加一個(gè)結(jié)點(diǎn)
Node node = addConditionWaiter();
//
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞當(dāng)前線(xiàn)程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 檢查結(jié)點(diǎn)等待時(shí)的中斷類(lèi)型
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
// 等待箱沦,當(dāng)前線(xiàn)程在接到信號(hào)辩恼、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
// 等待,當(dāng)前線(xiàn)程在接到信號(hào)谓形、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
// 等待灶伊,當(dāng)前線(xiàn)程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)寒跳。此方法在行為上等效于: awaitNanos(unit.toNanos(time)) > 0
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 查詢(xún)是否有正在等待此條件的任何線(xiàn)程
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 返回正在等待此條件的線(xiàn)程數(shù)估計(jì)值
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 返回包含那些可能正在等待此條件的線(xiàn)程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
未完待續(xù)...