1 什么是條件鎖?
條件鎖希太,是指在獲取鎖之后發(fā)現(xiàn)當(dāng)前業(yè)務(wù)場(chǎng)景自己無(wú)法處理克饶,而需要等待某個(gè)條件的出現(xiàn)才可以繼續(xù)處理時(shí)使用的一種鎖。
這里的條件誊辉,必須是在獲取鎖之后去等待矾湃,對(duì)應(yīng)到ReentrantLock的條件鎖,就是獲取鎖之后才能調(diào)用condition.await()方法堕澄。
在java中邀跃,條件鎖的實(shí)現(xiàn)都在AQS的ConditionObject類中,ConditionObject實(shí)現(xiàn)了Condition接口奈偏,下面我們通過(guò)一個(gè)例子來(lái)進(jìn)入到條件鎖的學(xué)習(xí)中坞嘀。
2 源碼分析
2.1 ConditionObject的主要屬性
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
可以看見(jiàn)條件鎖中也維護(hù)了一個(gè)隊(duì)列躯护,為了和AQS的隊(duì)列區(qū)分惊来,這里稱為條件隊(duì)列,firstWaiter是隊(duì)列的頭節(jié)點(diǎn)棺滞,lastWaiter是隊(duì)列的尾節(jié)點(diǎn)
2.2 新建條件鎖方法 lock.newCondition()
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }
新建一個(gè)條件鎖最后就是調(diào)用的AQS中的ConditionObject類來(lái)實(shí)例化條件鎖裁蚁。
2.3 condition.await()方法
condition.await()方法,表明現(xiàn)在要等待條件的出現(xiàn)继准。
AbstractQueuedSynchronizer.ConditionObject.await()方法
public final void await() throws InterruptedException {
// 如果線程中斷了,拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 添加節(jié)點(diǎn)到Condition的條件隊(duì)列中枉证,并返回該節(jié)點(diǎn)
Node node = addConditionWaiter();
// 完全釋放當(dāng)前線程獲取的鎖
// 因?yàn)殒i是可重入的,所以這里要把獲取的鎖全部釋放
int savedState = fullyRelease(node);
int interruptMode = 0;
// 是否在同步隊(duì)列中
while (!isOnSyncQueue(node)) {
// 阻塞當(dāng)前線程移必,掛起線程
LockSupport.park(this);
// 阻塞等待條件的出現(xiàn)室谚,條件出現(xiàn)跳出循環(huán)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 嘗試獲取鎖,如果沒(méi)獲取到會(huì)再次阻塞
// 被喚醒后 自旋操作爭(zhēng)取到鎖崔泵,同時(shí)判斷線程是否被中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除取消的節(jié)點(diǎn)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 線程中斷相關(guān)
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
添加節(jié)點(diǎn)到Condition的條件等待隊(duì)列中秒赤,addConditionWaiter()方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果條件隊(duì)列的尾節(jié)點(diǎn)已取消,從頭節(jié)點(diǎn)開(kāi)始清除所有已取消的節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
// 重新獲取尾節(jié)點(diǎn)
t = lastWaiter;
}
// 新建一個(gè)節(jié)點(diǎn)憎瘸,它的等待狀態(tài)是condition
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果尾節(jié)點(diǎn)為空入篮,則把新節(jié)點(diǎn)賦值給頭節(jié)點(diǎn)(相當(dāng)于初始化隊(duì)列)
// 否則把新節(jié)點(diǎn)賦值給尾節(jié)點(diǎn)的nextWaiter指針
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 尾節(jié)點(diǎn)指向新節(jié)點(diǎn)
lastWaiter = node;
return node;
}
釋放全部鎖fullyRelease()方法,返回獲得鎖的次數(shù)
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取狀態(tài)變量的值幌甘,該值也代表獲取鎖的次數(shù)
int savedState = getState();
// 一次性釋放所有獲得的鎖
if (release(savedState)) {
failed = false;
// 返回獲得鎖的次數(shù)
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
判斷節(jié)點(diǎn)是否在同步隊(duì)列中潮售,AbstractQueuedSynchronizer.isOnSyncQueue中
final boolean isOnSyncQueue(Node node) {
// 如果等待狀態(tài)是Condition,或者前一個(gè)指針為空锅风,返回false
// 說(shuō)明還沒(méi)有移到AQS的隊(duì)列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果next指針有值酥诽,說(shuō)明已經(jīng)移到AQS的隊(duì)列中了
if (node.next != null) // If has successor, it must be on queue
return true;
// 從AQS的尾節(jié)點(diǎn)開(kāi)始往前尋找看是否可以找到當(dāng)前節(jié)點(diǎn),找到了也說(shuō)明已經(jīng)在AQS的隊(duì)列中了
return findNodeFromTail(node);
}
注意:
<1> Condition的隊(duì)列和AQS的隊(duì)列不完全一樣皱埠;
AQS的頭節(jié)點(diǎn)不存在任何值的肮帐,是一個(gè)虛節(jié)點(diǎn);
Condition的隊(duì)列頭節(jié)點(diǎn)是存儲(chǔ)實(shí)實(shí)在在的元素值的漱逸,是真實(shí)節(jié)點(diǎn)
<2> 各種等待狀態(tài)(waitStatus)的變化
首先泪姨,在條件隊(duì)列中游沿,新建節(jié)點(diǎn)的初始等待狀態(tài)是CONDITION(-2);
其次,移到AQS的隊(duì)列中等待狀態(tài)就會(huì)變?yōu)?肮砾,(AQS隊(duì)列節(jié)點(diǎn)的初始等待狀態(tài)為0)
然后诀黍,在AQS的隊(duì)列中如果需要阻塞,會(huì)把它上一個(gè)節(jié)點(diǎn)的等待狀態(tài)設(shè)置為SIGNAL(-1)
最后仗处,不管在Condition隊(duì)列還是AQS隊(duì)列中眯勾,已取消的節(jié)點(diǎn)的等待狀態(tài)都設(shè)置為CANCELED(1)
<3> 相似的名稱
AQS中下一個(gè)節(jié)點(diǎn)是next,上一個(gè)節(jié)點(diǎn)是prev婆誓;
Condition中下一個(gè)節(jié)點(diǎn)是nextWaiter 吃环,沒(méi)有上一個(gè)節(jié)點(diǎn)。
另外:所以AQS中存在兩種隊(duì)列洋幻,一種是同步隊(duì)列郁轻,一種是條件隊(duì)列。
每個(gè)Condition都對(duì)應(yīng)著一個(gè)條件隊(duì)列文留;一個(gè)鎖上可以創(chuàng)建多個(gè)Condition對(duì)象好唯,那么也就存在多個(gè)條件隊(duì)列。條件隊(duì)列同樣是一個(gè)FIFO的隊(duì)列燥翅,在隊(duì)列中每一個(gè)節(jié)點(diǎn)都包含了一個(gè)線程的引用骑篙,而該線程就是Condition對(duì)象上等待的線程。
當(dāng)一個(gè)線程調(diào)用了await()相關(guān)的方法森书,那么該線程將會(huì)釋放鎖靶端,并構(gòu)建一個(gè)Node節(jié)點(diǎn)封裝當(dāng)前線程的相關(guān)信息加入到條件隊(duì)列中進(jìn)行等待,直到被喚醒凛膏、中斷杨名、超時(shí)才從隊(duì)列中移出。Condition中的等待隊(duì)列模型如下
條件隊(duì)列中節(jié)點(diǎn)的狀態(tài)只有兩種译柏,即CANCELLED和CONDITION(前者表示線程已結(jié)束需要從等待隊(duì)列中移除镣煮,后者表示條件節(jié)點(diǎn)等到被喚醒)
await()主要做了三件事:
一是調(diào)用addConditionWaiter()方法將當(dāng)前線程封裝成node結(jié)點(diǎn)加入等待隊(duì)列。
二是調(diào)用fullyRelease(node)方法釋放同步狀態(tài)并喚醒后繼結(jié)點(diǎn)的線程鄙麦。
三是調(diào)用isOnSyncQueue(node)方法判斷結(jié)點(diǎn)是否在同步隊(duì)列中典唇,這里是個(gè)while循環(huán),如果同步隊(duì)列中沒(méi)有該結(jié)點(diǎn)就直接掛起該線程胯府,需要明白的是如果線程被喚醒后就調(diào)用acquireQueued(node, savedState)執(zhí)行自旋操作爭(zhēng)取鎖介衔,即當(dāng)前線程結(jié)點(diǎn)從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列并開(kāi)始努力獲取鎖。
2.4 condition.signal()方法
condition.signal()方法通知條件已經(jīng)出現(xiàn)骂因。
AbstractQueuedSynchronizer.ConditionObject.signal方法
public final void signal() {
// 如果不是當(dāng)前線程占有著鎖炎咖,調(diào)用這個(gè)方法拋出異常
// 說(shuō)明signal()方法也要在獲取鎖之后執(zhí)行
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 條件隊(duì)列的頭節(jié)點(diǎn)
Node first = firstWaiter;
// 如果有等待條件的節(jié)點(diǎn),則通知條件成立
if (first != null)
doSignal(first);
}
AbstractQueuedSynchronizer.ConditionObject.doSignal方法
while條件,如果被通知節(jié)點(diǎn)沒(méi)有進(jìn)入到同步隊(duì)列并且條件等待隊(duì)列還有不為空的節(jié)點(diǎn)乘盼,則繼續(xù)循環(huán)通知后續(xù)結(jié)點(diǎn)
private void doSignal(Node first) {
do {
// 移除條件等待隊(duì)列中的第一個(gè)節(jié)點(diǎn)
// 如果后繼節(jié)點(diǎn)為null升熊,那么說(shuō)明沒(méi)有其他節(jié)點(diǎn),將尾節(jié)點(diǎn)也設(shè)置為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 相當(dāng)于將頭節(jié)點(diǎn)從隊(duì)列中出隊(duì)
first.nextWaiter = null;
// 轉(zhuǎn)移節(jié)點(diǎn)到AQS隊(duì)列中去
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
轉(zhuǎn)移節(jié)點(diǎn)到AQS隊(duì)列中
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 把節(jié)點(diǎn)的狀態(tài)更改為0绸栅,也就是即將移到AQS隊(duì)列中
// 如果失敗了级野,說(shuō)明節(jié)點(diǎn)已經(jīng)改成取消狀態(tài)了
// 返回false,通過(guò)上面的循環(huán)可知會(huì)尋找下一個(gè)可用節(jié)點(diǎn)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 調(diào)用AQS的入隊(duì)方法將節(jié)點(diǎn)移到AQS隊(duì)列中
// 這里的enq的返回值是node的上一個(gè)節(jié)點(diǎn)粹胯,就是舊尾節(jié)點(diǎn)
Node p = enq(node);
// 上一個(gè)節(jié)點(diǎn)的等待狀態(tài)
int ws = p.waitStatus;
// 如果上一個(gè)節(jié)點(diǎn)已經(jīng)取消了蓖柔,或者更新?tīng)顟B(tài)為signal失敗
// 則直接喚醒當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
如果更新上一個(gè)節(jié)點(diǎn)的等待狀態(tài)為signal成功了,則返回true风纠,這個(gè)時(shí)候况鸣,上面的循環(huán)就不成立了,退出循環(huán)竹观,也就是通知了一個(gè)節(jié)點(diǎn)镐捧。
此時(shí),當(dāng)前節(jié)點(diǎn)還是阻塞狀態(tài)栈幸,也就是說(shuō)調(diào)用signal()的時(shí)候并不會(huì)真正喚醒一個(gè)節(jié)點(diǎn)愤估,只是把節(jié)點(diǎn)從條件隊(duì)列移到AQS隊(duì)列中
總結(jié):
signal()方法:
(1)從條件隊(duì)列的頭節(jié)點(diǎn)開(kāi)始尋找一個(gè)非取消狀態(tài)的節(jié)點(diǎn);
(2)把它從條件隊(duì)列移到AQS隊(duì)列速址;
(3)且只移動(dòng)一個(gè)節(jié)點(diǎn);