本篇主要介紹ReentrantLock 中 condition的await/signal方法的實(shí)現(xiàn)原理。
使用說明
public void foo() throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
reentrantLock.lock();
condition.await();
//....
condition.signal();
reentrantLock.unlock();
}
當(dāng)前線程在獲取到鎖后丈攒,通過await來讓自己進(jìn)入park阻塞狀態(tài)祷杈、加入等待隊(duì)列智袭,并釋放鎖献烦。
signal方法將其他在等待隊(duì)列中曙求,處于park狀態(tài)下的線程喚醒牌借,并嘗試競爭鎖度气。
源碼分析
await() #1
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 第一部分
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第二部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await方法的代碼比較多,可以拆分成兩部分膨报。第一部分如何讓當(dāng)前線程park磷籍。第二部分是線程被unpark后的實(shí)現(xiàn)。
第一部分:
addConditionWaiter()
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
主要目的是將線程構(gòu)建成Conditon模式下的Node现柠,加入到隊(duì)列中院领。
首先,隊(duì)列為空够吩,firstWaiter和lastWaiter都為null比然。當(dāng)?shù)谝粋€node創(chuàng)建成功后,firstWaiter和lastWaiter都指向這個node周循。后續(xù)再來節(jié)點(diǎn)强法,則讓node.next 指向新節(jié)點(diǎn),lastWaiter也指向新節(jié)點(diǎn)湾笛。如此構(gòu)建一個帶有頭尾指針的單向鏈表饮怯。
再看方法里第二行的if判斷,因?yàn)檫M(jìn)入到condition隊(duì)列的node一定都是condition(-2)狀態(tài)嚎研,如果不是蓖墅,則說明當(dāng)前node所屬線程已經(jīng)處理了其他的邏輯。一般是cancel狀態(tài)临扮。此時要從鏈表中去掉cancel態(tài)的節(jié)點(diǎn)论矾。
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
unlinkCancelledWaiters
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
總而言之做了一件事,將非condition狀態(tài)的node從鏈表中去掉杆勇。此時lastWaiter一定是condition狀態(tài)拇囊,賦值給t。
fullyRelease()
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
savedState表示的是重入的次數(shù)靶橱,可能1次,也可能多次,這里一次性全部釋放掉关霸,將全局的state=0传黄,exclusiveOwnerThread=null。并且通過unparkSuccessor獲取同步隊(duì)列中的下一個node队寇。具體過程已經(jīng)在ReentrantLock源碼分析中做了說明膘掰。
簡而言之就是當(dāng)前線程釋放鎖,讓同步隊(duì)列的下一個node開始搶占佳遣。
isOnSyncQueue()
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
此時判斷node是不是已經(jīng)在sync隊(duì)列中识埋,判斷的標(biāo)準(zhǔn)是waitStatus、prev和next零渐,以及從tail倒序查找窒舟。
這里關(guān)于倒序查詢有很大一段注釋,大意是說單純判斷node.prev是not null诵盼,并不能代表在node已經(jīng)在sync隊(duì)列中惠豺。需要從sync隊(duì)列中的tail倒序查詢,并且說明了node大概率在tail附近风宁,不會有太多性能損耗洁墙。
cas在替換prev時可能失敗,也就是我下面貼的入隊(duì)的方法實(shí)現(xiàn)戒财。因?yàn)閜rev是volatile的热监,會直接可見,但是compareAndSetTail可能會失敗饮寞,從而導(dǎo)致沒有成功入隊(duì)孝扛。
如果node并沒有在sync隊(duì)列中,則被park骂际。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
至此第一部分說明完成疗琉,當(dāng)前占有鎖的線程被添加到了condition queue中,釋放鎖被處于park狀態(tài)歉铝。
第二部分:
既然線程已經(jīng)被park了盈简,就先說明是如何被unpark的。一般來說我們都是配置signal(signalAll)一起使用太示。先分析下signal().
signal()
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
如果condition隊(duì)列里有node柠贤,則開始喚醒。
doSignal()
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
如果first.nextWaiter是null类缤,則說明condition隊(duì)列中只有這個node臼勉,firstWaiter、lastWaiter餐弱、nextWaiter都是null宴霸。
如果后續(xù)還有節(jié)點(diǎn)囱晴,將nextWaiter指向firstWaiter,并斷開first.nextWaiter瓢谢。
重點(diǎn)看下transferForSignal畸写。
transferForSignal()
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
顧名思義,這個方法的目的就是將node從condition隊(duì)列轉(zhuǎn)到sync隊(duì)列氓扛。
轉(zhuǎn)移前的狀態(tài)如果不是condition枯芬,說明是cancel,就不再執(zhí)行采郎。成功則繼續(xù)向后執(zhí)行千所,此時當(dāng)前node的waitState=0。
將node節(jié)點(diǎn)enq到sync隊(duì)列中蒜埋,返回前一個node淫痰。
如果前一個node已經(jīng)被取消,或者在cas成signal的過程中失斃砭ァ(也就是可能在設(shè)置過程中cancel)黑界,那就通過unpark將當(dāng)前節(jié)點(diǎn)喚醒(相當(dāng)于被提前喚醒)。
此時皂林,當(dāng)前線程完成signal方法的調(diào)用朗鸠,如果調(diào)用了unpark,則這個線程也被喚醒础倍。兩個線程同時在執(zhí)行烛占。
doSignalAll()
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
這個all表示把condition隊(duì)列中的所有node全部transfer到sync隊(duì)列。
至此沟启,signal(signalAll)執(zhí)行完成忆家,transfer或者unpark condition隊(duì)列中的node。
await() #2
無論以什么樣的方式喚醒德迹,await內(nèi)的park線程終究還是會被喚醒芽卿,繼續(xù)向后執(zhí)行。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 第一部分
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第二部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
先檢查在等待過程中是否中斷過胳搞,如果是卸例,看中斷時機(jī)。
在signal信號前被中斷返回THROW_IE肌毅,已經(jīng)在sync隊(duì)列中返回REINTERRUPT筷转。
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//如果收到了中斷信號,且當(dāng)前node還在condition隊(duì)列中悬而,則入隊(duì)到sync隊(duì)列呜舒。
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
acquireQueued()
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
中斷標(biāo)記記錄后,嘗試獲取鎖笨奠,如果沒有達(dá)到條件袭蝗,則再次進(jìn)入park狀態(tài)唤殴。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
再次被喚醒或者搶占到鎖后,清理一波cancel的condition隊(duì)列到腥。根據(jù)不同的中斷標(biāo)記向上拋出異痴0耍或者返回中斷標(biāo)記。
至此左电,await() 方法也執(zhí)行完成。
總結(jié)
condition的各種await页响、signal的處理結(jié)合了lock和unlock的狀態(tài)篓足。內(nèi)部的很多操作都是需要在獲得鎖的狀態(tài)下執(zhí)行。這也就是為什么await闰蚕、signal需要寫到lock和unlock塊中栈拖。
這四個方法需要整體看。
重點(diǎn)說明
ReentrantLock 內(nèi)部分為了兩個隊(duì)列(sync和condition), 兩種模式(EXCLUSIVE没陡、SHARED)涩哟,五種狀態(tài)(SINGAL, CONDITION, CANCELLED, PROPAGATE, 0)
sync 隊(duì)列是帶有頭尾指針的雙向鏈表,節(jié)點(diǎn)字段是
private transient volatile Node head;
private transient volatile Node tail;
volatile Node prev;
volatile Node next;
condition隊(duì)列是帶有頭尾指針的單鏈表盼玄,節(jié)點(diǎn)字段是
private transient Node firstWaiter;
private transient Node lastWaiter;
Node nextWaiter;
lock()方法本質(zhì)是將未獲得鎖的node加入到sync隊(duì)列
unlock方法本質(zhì)是將sync隊(duì)列的node依次喚醒執(zhí)行贴彼。
await()方法是將node加入到condition隊(duì)列中。
signal()方法是將condition隊(duì)列中的head node(signalAll是全部node)從condition轉(zhuǎn)到sync隊(duì)列埃儿。