Condition是java.util.concurrent.locks包下的類,提供了對線程鎖的更精細(xì)的控制方法,下面我們就來看一下Java多線程編程中使用Condition類操作鎖的方法詳解
引用網(wǎng)上的話:
Condition的作用是對鎖進(jìn)行更精確的控制希坚。Condition中的await()方法相當(dāng)于Object的wait()方法哀蘑,Condition中的signal()方法相當(dāng)于Object的notify()方法悬嗓,Condition中的signalAll()相當(dāng)于Object的notifyAll()方法。不同的是读串,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關(guān)鍵字)捆綁使用的;而Condition是需要與"互斥鎖"/"共享鎖"捆綁使用的。
先來復(fù)習(xí)下上面的幾個知識點(diǎn),知道的同學(xué)可以字節(jié)略過念赶。
wait,notify恰力,notifyAll 是定義在Object類的實(shí)例方法叉谜,用于控制線程狀態(tài)。
三個方法都必須在synchronized 同步關(guān)鍵字所限定的作用域中調(diào)用踩萎,否則會報(bào)錯java.lang.IllegalMonitorStateException 停局,意思是因?yàn)闆]有同步,所以線程對對象鎖的狀態(tài)是不確定的香府,不能調(diào)用這些方法董栽。
- wait 表示持有對象鎖的線程A準(zhǔn)備釋放對象鎖權(quán)限,釋放cpu資源并進(jìn)入等待企孩。
- notify 表示持有對象鎖的線程A準(zhǔn)備釋放對象鎖權(quán)限锭碳,通知jvm喚醒某個競爭該對象鎖的線程X。線程A synchronized 代碼作用域結(jié)束后勿璃,線程X直接獲得對象鎖權(quán)限擒抛,其他競爭線程繼續(xù)等待(即使線程X同步完畢,釋放對象鎖补疑,其他競爭線程仍然等待歧沪,直至有新的notify ,notifyAll被調(diào)用)。
- notifyAll 表示持有對象鎖的線程A準(zhǔn)備釋放對象鎖權(quán)限莲组,通知jvm喚醒所有競爭該對象鎖的線程诊胞,線程A synchronized 代碼作用域結(jié)束后,jvm通過算法將對象鎖權(quán)限指派給某個線程X胁编,所有被喚醒的線程不再等待厢钧。線程X synchronized 代碼作用域結(jié)束后,之前所有被喚醒的線程都有可能獲得該對象鎖權(quán)限嬉橙,這個由JVM算法決定早直。
說到這里,順便說一下在網(wǎng)上遇到的 問題,為什么wait
市框、notify
霞扬、nofityAll
是在Object
中而不是Thread
中,看了些網(wǎng)上說的,感覺說的有道理,但是又聽不懂,網(wǎng)上的回答如下:
一種回答是:
這個涉及到wait什么的問題。
等的是某個對象上的鎖喻圃,大家(多個線程)競爭這個鎖萤彩,是吧,
wait和notify方法都放到目標(biāo)對象上斧拍,那這個對象上可以維護(hù)線程的隊(duì)列雀扶,可以對相關(guān)線程進(jìn)行調(diào)度。
(方法和方法所操縱的數(shù)據(jù)要在一起)
如果將wait方法和線程隊(duì)列都放到Thread中肆汹,那么就必然要求某個Thread知道其他所有Thread的信息愚墓,(大家都要相互知道),這合理嗎昂勉?很容易出問題的浪册。……
另一種回答是:
簡單說:因?yàn)閟ynchronized中的這把鎖可以是任意對象岗照,所以任意對象都可以調(diào)用wait()和notify()村象;所以wait和notify屬于Object。
專業(yè)說:因?yàn)檫@些方法在操作同步線程時(shí)攒至,都必須要標(biāo)識它們操作線程的鎖厚者,只有同一個鎖上的被等待線程,可以被同一個鎖上的notify喚醒嗓袱,不可以對不同鎖中的線程進(jìn)行喚醒籍救。
也就是說,等待和喚醒必須是同一個鎖渠抹。而鎖可以是任意對象蝙昙,所以可以被任意對象調(diào)用的方法是定義在object類中。
在jdk1.5以后梧却,將同步synchronized替換成了Lock奇颠,將同步鎖對象換成了Condition對象,并且Condition對象可以有多個放航,這樣可以解決一個問題烈拒。
我結(jié)合上面的回答,說一下我自己的理解:
`notify`和`notifyAll`都是基于`wait`的,所以只分析wiat即可
首先,wait是在同步塊中的,同步塊中可能會鎖住一個方法或者一個對象,
正常情況不調(diào)用wait時(shí),會等到同步塊中的代碼執(zhí)行完畢之后會釋放鎖,
同步方法這里不多說,主要是同步鎖對象,如果鎖住一個對象,
那其他線程在獲取這個對象的時(shí)候就會等待阻塞,
可能是因?yàn)閟ynchronized中有特殊的處理,
如果在同步代碼塊代碼未執(zhí)行完畢就要釋放鎖的話,就會調(diào)用wait,
但是為什么會是object.wait呢,首先如果是Thread.wait,
我們知道wait的作用是使當(dāng)前持有該鎖的線程釋放鎖,
并使其他等待該鎖的線程獲取鎖,怎么通知其他等待該鎖的線程呢,就是通過對象本身
下面我們來看Condition
Condition
是一個接口 里面的方法比較少 我們先來簡單做個分析
atait() 使當(dāng)前線程釋放鎖,并處于等待狀態(tài)
aiait(long time,TimeUnit unit) 使當(dāng)前線程等待指定的時(shí)間,第二個參數(shù)為時(shí)間單位
awaitNanos() 同上
awaitUninterruptibly() 使當(dāng)前線程處于等待狀態(tài)
awaitUntil(Data deadline) 造成當(dāng)前線程在接到信號广鳍、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài),參數(shù)為最后的時(shí)間期限
signal() 隨機(jī)喚醒一個等待線程
signalAll() 喚醒所有的等待線程
在AbstractQueuedSynchronizer
中的內(nèi)部類ConditionObject
實(shí)現(xiàn)了這個接口
await()
AbstractQueuedSynchronizer#ConditionObject#await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
首先調(diào)用了addConditionWaiter()
AbstractQueuedSynchronizer#ConditionObject#addConditionWaiter()
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
lastWaiter
是Condition
中的屬性,Condition
也維護(hù)了一個隊(duì)列,所有調(diào)用await
的線程都會進(jìn)入這個隊(duì)列,
首先獲取到隊(duì)列的最后一個節(jié)點(diǎn),然后根據(jù)此節(jié)點(diǎn)的狀態(tài)判讀對應(yīng)的線程有沒有被取消荆几,如果沒有則會進(jìn)入unlinkCancelledWaiters()
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
這個方法的作用是重新梳理隊(duì)列,剔除隊(duì)列中不是處于等待狀態(tài)的節(jié)點(diǎn),回到addConditionWaiter()
,那么最后t得到的是一個處于等待狀態(tài)的尾節(jié)點(diǎn),然后創(chuàng)建一個阻塞狀態(tài)的空節(jié)點(diǎn),加入隊(duì)列,并返回這個節(jié)點(diǎn),所以addConditionWaiter()
的作用是創(chuàng)建一個空的阻塞節(jié)點(diǎn)加入阻塞隊(duì)列,然后將新建的這個Node
傳入到fullyRelease()
AbstractQueuedSynchronizer#ConditionObject#fullyRelease()
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
首先會獲取當(dāng)前線程持有的鎖的數(shù)量state
,下面的release
方法會調(diào)用到ReentrantLock#Sync#tryRelease()
(這里以ReentrantLock為例),而tryRelease()
則是釋放鎖的一個方法,是持有的鎖數(shù)量減一,成功的條件則是,當(dāng)前線程持有的鎖數(shù)量只有一個.否則拋出異常,這里我們可以猜到,通過awit()
來掛起線程的條件是當(dāng)前線程持有的鎖的數(shù)量只能為1,否則的話當(dāng)前線程的掛起可能會影響其他地方的異常,因?yàn)槠渌胤娇赡懿恍枰蛘呤遣荒軐?dāng)前線程掛起,
所以最后得到的saveState = 0
, 接著看下面的循環(huán),循環(huán)體內(nèi)是將當(dāng)前線程喚醒,而當(dāng)前方法await
則是將當(dāng)前線程掛起,顯然是沖突,所以這里的isOnSyncQueue
只能返回true,才能達(dá)到掛起線程的作用,
AbstractQueuedSynchronizer#isOnSyncQueue():
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
第一個條件是返回false,內(nèi)容是尾節(jié)點(diǎn)的狀態(tài)為取消狀態(tài),也就是前面要被掛起的線程持有的鎖的數(shù)量不為1,所以即使這個線程被掛起來,也會被喚醒赊时;要么就是尾節(jié)點(diǎn)的前任節(jié)點(diǎn)等于null,我們知道這個尾節(jié)點(diǎn)是我們自己加入的,并不包含任何的線程,所以也就是說,隊(duì)列中只有我們要掛起的線程這一個線程,所以掛起沒有意義,也就返回了false
第二個條件返回了true,也就是尾節(jié)點(diǎn)有后繼節(jié)點(diǎn),所以這時(shí)候會去嘗試喚醒后繼節(jié)點(diǎn),返回了true
最后則是當(dāng)前尾節(jié)點(diǎn)沒有后繼節(jié)點(diǎn),但是有前任節(jié)點(diǎn)就會調(diào)用findNodeFromTail()
private boolean findNodeFromTail(Node node) {
// We check for node first, since it's likely to be at or near tail.
// tail is known to be non-null, so we could re-order to "save"
// one null check, but we leave it this way to help the VM.
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}
首先要聲明一點(diǎn)的是,lastwaiter firstWaiter
是Condition
中屬性,tail head
是Node
中的屬性吨铸,即tail
是AbstractQueuedSynchronizer
中等待線程隊(duì)列的尾節(jié)點(diǎn),lastWaiter
是Condition
中掛起線程的隊(duì)列的尾節(jié)點(diǎn).
從等待隊(duì)列的尾節(jié)點(diǎn)開始遍歷,如果尾節(jié)點(diǎn)等于掛起線程的隊(duì)列的尾節(jié)點(diǎn)返回true,否則返回false
現(xiàn)在可以大致的理解下isOnSyncQueue
的作用就是,判斷要被掛起的線程能否被真正的掛起,如果要被掛起的線程是頭結(jié)點(diǎn)或者等待隊(duì)列中沒有其他線程則無法被掛起,接著往下看判斷體內(nèi)的代碼
首先是將當(dāng)前線程掛起,此時(shí)當(dāng)前線程也就是執(zhí)行到了這里不再執(zhí)行了,因?yàn)榫€程掛起了,只有等到線程被喚醒的時(shí)候,才會從park
方法中返回
AbstractQueuedSynchronizer#checkInterruptWhileWaiting:
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
首先看transferAfterCancelledWait
,當(dāng)初我們再將此線程加入隊(duì)列時(shí),設(shè)置節(jié)點(diǎn)的狀態(tài)默認(rèn)為CONDITION
祖秒,所以這個判斷是成立的,然后進(jìn)入enq
,將喚醒的此線程重新加入到AbstractQueuedSynchronizer
的等待隊(duì)列中.
AbstrQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
之后進(jìn)入正常的流程中,先判斷被喚醒的此線程的前任線程是否是頭結(jié)點(diǎn),并且此線程可以獲取到鎖,那么講此線程設(shè)置為頭結(jié)點(diǎn),如果前任節(jié)點(diǎn)不是頭結(jié)點(diǎn),或者不能獲取到鎖,那么會再次將此線程掛起,等待阻塞隊(duì)列的輪詢喚醒. 然后會重新整理Condition
中的等待隊(duì)列,將狀態(tài)不為CONDITION
的節(jié)點(diǎn)剔除出去.
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
首先獲取到Condition
中阻塞隊(duì)列的頭結(jié)點(diǎn),
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
doSignal
的循環(huán)體內(nèi)是將頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)設(shè)置為空,如果后繼節(jié)點(diǎn)本來就為空,則設(shè)置尾節(jié)點(diǎn)為空,
transferForSignal
方法中首先將等待隊(duì)列中的頭結(jié)點(diǎn)的狀態(tài)設(shè)置為0,然后將此節(jié)點(diǎn)加入到阻塞隊(duì)列中,我們知道調(diào)用await
的線程的狀態(tài)是CONDITION
,狀態(tài)大于0的情況是線程取消,后面的!p.compareAndSetWaitStatus(ws, Node.SIGNAL)
始終為true,