AQS Condition的實現(xiàn)
AQS中的ConditionObject和node
static final class Node {
/**
* 同步隊列的頭 初始化 或者setHead方法可修改
*/
static final Node SHARED = new Node();
/**
* 標識這個節(jié)點用于 獨占模式(排它 反正一個意思)
*/
static final Node EXCLUSIVE = null;
/** 下面是 waitStatus 的幾個常量值 */
/**
* 表明等待線程已經(jīng)取消
*/
static final int CANCELLED = 1;
/**
* 表述如果當前節(jié)點的前一個節(jié)點狀態(tài)是 SIGNAL 那么就可以阻塞當前自己的線程 不用去爭搶資源了 沒用 不然會一直嘗試去獲取資源
*/
static final int SIGNAL = -1;
/**
* 線程在條件隊列中等待
*/
static final int CONDITION = -2;
/**
* 共享模式下 無條件傳播 該狀態(tài)的進程處于可運行狀態(tài)
*/
static final int PROPAGATE = -3;
/**
* 當前node 狀態(tài)
*/
volatile int waitStatus;
/**
* 同步隊列的前置節(jié)點
*/
volatile Node prev;
/**
* 同步隊列的后置節(jié)點
*/
volatile Node next;
/**
* 當前節(jié)點所屬的線程
*/
volatile Thread thread;
/**
* 用于條件隊列 是條件隊列的下一個節(jié)點
*/
Node nextWaiter;
/**
* 是否是共享模式 這個方法只會在同步隊列中使用 nextWaiter在同步隊列中復用了
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 獲取當前節(jié)點的前置節(jié)點 沒有就拋出異常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
final class ConditionObject implements Condition {
/**
* 條件隊列的頭節(jié)點
*/
private transient Node firstWaiter;
/**
* 條件隊列的尾節(jié)點
*/
private transient Node lastWaiter;
/**
* ConditionObject 默認的構(gòu)造函數(shù)
*/
public ConditionObject() {
}
}
第一篇文章的時候 我和大家也描述過 Condition Queue 實際上是一個單向鏈表 在分析Node節(jié)點的時候 我描述過prev和next都是給Sync Queue使用的 實際上對于Condition Queue node節(jié)點 有效的字段 就是 nextWaiter 泵督,waitStatus和thread字段
條件隊列-await源碼分析
0-await方法
public void await() throws InterruptedException {
if (Thread.interrupted())//判斷當前線程 是否被中斷了 如果中斷了 拋出中斷異常
throw new InterruptedException();
Node node = addConditionWaiter();//新增一個新的等待節(jié)點到條件隊列中
int savedState = fullyRelease(node);//釋放當前節(jié)點占用的資源 并返回線程持有的狀態(tài)值
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//判斷當線程節(jié)點是否在同步隊列中
LockSupport.park(this);//如果不在同步隊列中 那就阻塞當前線程 等待喚醒
/*
* 能執(zhí)行到下面的代碼 說明線程從阻塞狀態(tài)中喚醒了 喚醒可能有2種情況
* 1:是線程發(fā)生了中斷
* 2:是線程接受到signal信號 從阻塞狀態(tài)中被喚醒
* checkInterruptWhileWaiting 返回值有3個
* 0表示:線程沒有被中斷
* 1 REINTERRUPT表示:中斷在signal之后發(fā)生的
* -1 THROW_IE表示:中斷在signal之前發(fā)生的
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//檢查是否發(fā)生過線程中斷 0表示沒有發(fā)生
break;// 如果線程沒有中斷 說明被signal喚醒 那就繼續(xù)判斷是否喚醒了當前線程 如果是當前線程 會進入到同步隊列中
}
/*
* 這邊的代碼 就是當前的node已經(jīng)在Sync Queue 中了
* acquireQueued 我們在之前獨占鎖加鎖的時候 也分析過 就是去獲取資源 獲取不到的話 就排隊等待繼續(xù)阻塞
* acquireQueued返回true 說明在進入Sync隊列中 等待的過程中鎖的過程中也發(fā)生了中斷
*acquireQueued返回true 返回false 說明沒有發(fā)送過中斷 那下面的賦值就不會走到
*如果acquireQueued返回true 而且interruptMode是非THROW_IE 那個整個方法就是REINTERRUPT的結(jié)果 因為都不需要拋出異常
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;//記錄線程中斷的表示位
/*
*這邊的意思就是如果當前節(jié)點nextWaiter不是等于null的說明 node節(jié)點還是和Condition
* queue 關(guān)聯(lián)著的 那就執(zhí)行一下清理操作 吧condition queue里面的非等待節(jié)點剔除
* 那種情況下會走到這步呢 那就是當前的interruptMode是THROW_IE的時候
* 為什么呢 因為THROW_IE的意思 是中斷發(fā)送在signal之前 signal
* 因為如果是signal的話 當前節(jié)點的nextWaiter為被置為null的 可以回看下代碼
*/
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)//這邊0 說明一直沒發(fā)生過中斷
reportInterruptAfterWait(interruptMode);
}
1-addConditionWaiter
/**
* 新增一個新的等待節(jié)點到等待的條件隊列中
*
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;//等待條件的隊列的最后一個
//如果最后的lastwaiter 節(jié)點狀態(tài)是非Condition 說明已經(jīng)取消 就清理ConditionQueue的方法
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;//unlinkCancelledWaiters方法里面lastWaiter可能又重寫賦值了
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);//當前線程包裝成node節(jié)點
/*
* t是null 說明尾節(jié)點為null 說明條件隊列中沒有值 所以node 成了firstWaiter
* t不為null 那就加入到隊尾
* */
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;//lastWaiter 重寫賦值 因為node是最后加入的 node就是lastWaiter
return node;
}
/**
* 條件隊列從頭部開始 移除非CONDITION節(jié)點
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;//頭節(jié)點賦值給t
Node trail = null;//trail是t的next節(jié)點的上一個為CONDITION的節(jié)點
//這個循環(huán)做的是從頭節(jié)點開始移除不是CONDITION的節(jié)點
while (t != null) {
Node next = t.nextWaiter;//next為t的下一個節(jié)點
if (t.waitStatus != Node.CONDITION) {//如果t的狀態(tài)不是CONDITION 說明不應該在條件隊列中 取消了 要移除
t.nextWaiter = null;//把t的下一個節(jié)點設置為null 這樣讓t 和整個條件隊列鏈表斷開 也方便GC
/**
*trail 是null 說明是第一次進來吧 但是第一次的t是firstWaiter 這個時候firstWaiter的節(jié)點為CONDITION
* 所以下面有個賦值把 firstWaiter的下一個節(jié)點 賦值給firstWaiter 意思就是說 讓下個節(jié)點成為頭節(jié)點
* 如果trail不是為null 那就把當?shù)毓?jié)點的next賦值給trail的下個節(jié)點 因為當前節(jié)點t 不可用了 所以要將t的
* 下個節(jié)點 重新和鏈表關(guān)聯(lián)起來 也就是說重新指向上一個節(jié)點 而trail其所就是t的上一個有效的節(jié)點
* 所以有了這個賦值
* */
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
/*
* 如果next 等于null 了 說明t沒有下個節(jié)點了 這個時候trail 應該就是有效的最后一個節(jié)點
* */
if (next == null)
lastWaiter = trail;
} else
trail = t;//trail相當于一個臨時的變量 這邊的賦值就是我上面說的 trail是next的上一個有效的節(jié)點值
t = next;//next賦值給t 準備下一次的循環(huán)
}
}
上面的整個代碼 是我注釋了addConditionWaiter方法 大家應該能看明白,這個方法主要做的就是包裝當前線程為node 然后加入的Condition Queue的隊尾 這其中還做了一個條件隊列元素清理的工作,清除一些非Condition狀態(tài)的節(jié)點
2-fullyRelease
下面我們來看下第二個方法 fullyRelease 看名字 我們應該也能猜出就是釋放當前線程占用的資源,而且是完全釋放,為什么是fully呢愧旦,那是比如重入鎖,可以重入析既,每次lock的時候同步器的狀態(tài)State都會+1,可以去看下第一篇的文章吼鱼,應該有描述過蓬豁,而且fullyRelease方法是有返回值的 返回的savedState就是當前線程持有的狀態(tài)值,為什么要記錄下來呢菇肃,那是后面我們再次爭取鎖資源的時候 需要用到這個savedState
/**
* 釋放當前節(jié)點持有的所有資源地粪,并且喚醒同步隊列中的head節(jié)點去獲取資源
*/
final int fullyRelease(Node node) {
boolean failed = true;//表示 是否釋放失敗
try {
int savedState = getState();//獲取同步器的狀態(tài)值state
if (release(savedState)) {//就是釋放資源 喚醒等待的線程去獲取資源 之前已經(jīng)描述過 不清楚的 看下第二篇文章
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();//釋放失敗 拋出異常
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;//如果釋放失敗 就把當前節(jié)點設置去取消 著就解釋了 為什么之前加入節(jié)點的時候回去做檢查
// 丟棄非Condition的節(jié)點
}
}
3-isOnSyncQueue
isOnSyncQueue 方法 就是判斷當前節(jié)點是否在同步隊列SyncQueue中,如果是的話 就跳出while循環(huán)執(zhí)行后面的方法,如果不在的話 那就要進入while循環(huán)體呢 做線程等待了,至于為什么要這樣判斷琐谤,那時因為node 節(jié)點加入到ConditionQueue 中蟆技,如果執(zhí)行Signal方法,被喚醒的線程節(jié)點斗忌,會轉(zhuǎn)移到SyncQueue中质礼,這個具體后面的Signal方法里面 我們具體再說。
看下代碼:
/**
* 判斷當前node 是否在同步隊列中
*/
final boolean isOnSyncQueue(Node node) {
/*
*節(jié)點的狀態(tài)是condition一定不再同步隊列中
*如果節(jié)點加入到同步隊列中 使用enq方法 那么當前節(jié)點的pre 一定是非空的
*那么如果當前pre是為null
*那就不在Sync queue 中
*/
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // 如果當前節(jié)點有后繼節(jié)點 必然是在同步隊列中的 因為next是同步隊列中的node 才會存在這一的情況
return true;
return findNodeFromTail(node);//去同步隊列中匹配node 節(jié)點
}
/**
* 從尾部節(jié)點開始搜索 看是否能找到當前的node節(jié)點
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;//同步隊列的尾部節(jié)點
for (; ; ) {
if (t == node)// t ==node 說明在同步隊列能找到 返回true
return true;
/*
* t==null 第一次循環(huán)說明tail節(jié)點不存在 說明同步隊列就是不存在的 那node更不可能存在于同步隊列中返回false
* 后面的循環(huán)t 就是之前的節(jié)點的前pre節(jié)點 如果為null 說明已經(jīng)找到了頭部節(jié)點了 都沒有匹配到node 也返回false
*/
if (t == null)
return false;
t = t.prev;
}
}
每行代碼的 具體語義 我都在注釋里面了 不清楚的 結(jié)和整個方法理解一下
4-while方法體內(nèi)部
當執(zhí)行到while 內(nèi)部的時候织阳,剛才我也分析過眶蕉,執(zhí)行到while里面說明 當前的node節(jié)點不在SyncQueue中,說明就在ConditionQueue中陈哑,首先看到 有個阻塞線程的操作妻坝,這個和獨占鎖 阻塞當前線程是一個道理伸眶,這邊等待是Signal喚醒當前線程,然后繼續(xù)往下執(zhí)行
后面有一個方法checkInterruptWhileWaiting 這個方法其實是要關(guān)注一下的刽宪,
先看下代碼:
/**
* Mode meaning to reinterrupt on exit from wait
*/
private static final int REINTERRUPT = 1;
/**
* Mode meaning to throw InterruptedException on exit from wait
*/
private static final int THROW_IE = -1;
/**
* 檢查是否發(fā)生過線程中斷
* 返回0表示:線程沒有被中斷
* 1表示:中斷在signal之后發(fā)生的
* -1表示:中斷在signal之前發(fā)生的
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*/
final boolean transferAfterCancelledWait(Node node) {
/*
* 這個地方給大家特別說明下:
* 剛才上面我提到過 被喚醒有2中方式 可能是被signalled 或者被interrupted
* 下面的有個CAS的操作 就是 將當前節(jié)點的狀態(tài)更新成0
* 如果更新成功說明了 當前節(jié)點的狀態(tài)依舊是CONDITION 也就是說還在條件隊列中 那就說明了不是被signal喚醒的 那就是被中斷了
* 同理 如果更新失敗 則說明當前節(jié)點的狀態(tài) 已經(jīng)被修改了 那說明就是被signalled了的 因為被signal 會將當前節(jié)點狀態(tài)修改 轉(zhuǎn)移到Sync queue中
*/
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);//這邊更新成功 說明當前線程發(fā)生了中斷 而且中斷在signal之前 這邊做一個補償操作 把節(jié)點放入到Sync 隊列中
return true;
}
/*
* 這邊又判斷了下 當前節(jié)點是否在同步隊列中 為什么還要判斷呢 是因為雖然發(fā)生了signal
* 但是 我們看下transferForSignal的方法能知道 是先執(zhí)行修改節(jié)點狀態(tài)的CAS操作 然后再執(zhí)行enq的入隊操作
* 所以這邊雖然狀態(tài)已經(jīng)修改 但是可能線程正在執(zhí)行enq 方法 所以這邊判斷了下 如果沒有在Sync隊列中
* 那當前線程就坐下yield 就是線程執(zhí)行讓出一下 意思就是稍等會兒
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
線程在while內(nèi)部被阻塞 然后被喚醒 只有2中方式:1是線程發(fā)生了中斷厘贼,二是
checkInterruptWhileWaiting 方法返回值有3個 一個是0說明線程從等待到喚醒沒有發(fā)生過中斷
第二個返回值是THROW_IE,它的值是-1圣拄,從命名上面我們能知道 這個是要拋出中斷異常嘴秸,它的執(zhí)行結(jié)果其實就是線程的中斷在Signal之前發(fā)生了
第三個返回結(jié)果是REINTERRUPT 它的值是1 意思就是重新做下線程中斷,這個是由于中斷在Signal之后發(fā)生的
==這邊有個條件就是 如果返回0的話 循環(huán)是繼續(xù)的 不會break 我在網(wǎng)上查詢說 這邊可能存在“假喚醒”的問題 因為返回0 線程一定是沒有中斷庇谆,那就是被喚醒了岳掐,但是被喚醒的node 會進入到SyncQueue中的呀,為什么這邊不跳過循環(huán)饭耳,反而是繼續(xù)循環(huán)判斷串述?這邊沒搞明白,有知道的小伙伴 可以告知一下寞肖!==
5-while之后的方法
while 之后的方法 說明當前線程已經(jīng)在SyncQueue 那就執(zhí)行和獨占鎖的獲取方法一樣的acquireQueued方法纲酗,不清楚的這個方法怎么運行的小伙伴,可以回看下第一篇文章新蟆,acquireQueued主要做的就是去嘗試獲取鎖資源觅赊,如果獲取不到線程還是阻塞等待的,直到被喚醒琼稻。該方法是有返回值的 如果返回ture 說明在等待過程中發(fā)生了中斷吮螺,如果是false 說明沒有。如果返回true 而且上面的interruptMode是非Throw-IE的 那interruptMode值就是ReInterrupt
后面的nextWaiter!=null帕翻,說明當前節(jié)點還沒有和ConditionQueue斷開鸠补,這邊執(zhí)行下ConditionQueue的清理操作,把非Condition狀態(tài)的節(jié)點從條件隊列中剔除出去熊咽。最后如果interruptMode非0就執(zhí)行下對于的狀態(tài)操作reportInterruptAfterWait
具體代碼也很簡單:
/**
* 這邊就是根據(jù) 剛才interruptMode 不同的值 做出不同的回應
* THROW_IE 意思就是拋出異常
* REINTERRUPT 意思就是做出線程重寫中斷的操作 可以讓上層去檢測處理
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
6-await 總結(jié)
await還有幾個重構(gòu)的方法莫鸭,里面的核心方法上面我都講了,剩下的有興趣的可以自己嘗試去理解看看横殴,具體方法有什么區(qū)別 我在上篇分析Condition接口的時候 接口方法上面都注釋了!
第一步 執(zhí)行await方法執(zhí)行的時候當前線程一定是獲得了鎖的被因,不然執(zhí)行這個方法的時候回報錯的,有興趣的可以自己寫下Demo,自己看下在哪一步報錯衫仑,偷偷告訴你下在釋放tryRelease的時候梨与!
第二步 就是將當前線程封裝成node節(jié)點 放入ConditionQueue的尾部
第三步 釋放當前線程持有的所有同步器State
第四步 判斷當前節(jié)點是否在SyncQueue中 如果是 就第五步 如果不是 就線程阻塞 等待Signal信號 喚醒
第五步 執(zhí)行acquireQueued方法 去重新獲取鎖資源
最后一步 獲取到鎖后 根據(jù)前面的中斷狀態(tài) 做出對應的處理 方法返回
條件隊列-signal源碼分析
signal方法
signal方法 是從Contidion頭部開始選一個合法的節(jié)點 轉(zhuǎn)換到SyncQueue中
public void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* 一個判斷 判斷是否用于鎖的線程和釋放線程是同一個 子類從寫實現(xiàn)
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 使得條件隊列中的第一個沒有被cancel的節(jié)點 enq到同步隊列的尾部
*/
private void doSignal(Node first) {
do {
/*
* 這邊說明條件隊列只有first 一個節(jié)點轉(zhuǎn)移完first節(jié)點設置lastWaiter也為null
* 設置first的nextWaiter 等于null
*/
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;//first 要被加入到同步隊列中 修改nextWaiter==null
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 將node節(jié)點從調(diào)節(jié)隊列中轉(zhuǎn)換到同步隊列中 如果返回是true 那說明轉(zhuǎn)換成功
*/
final boolean transferForSignal(Node node) {
/*
* 如果當前的CAS操作失敗 說明node節(jié)點的狀態(tài)已經(jīng)不是condition了 可能已經(jīng)被cancel了 所以返回false
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//將當前的node節(jié)點 加入到同步隊列中 獨占鎖的時候已經(jīng)分析過 返回的節(jié)點p是node節(jié)點的prev節(jié)點
int ws = p.waitStatus;
/*
* 這邊ws是node的prev 節(jié)點p的狀態(tài) 如果p的ws 大于0 那說明p已經(jīng)cancel了 那就可以直接喚醒node節(jié)點
* 這邊不明白的可以去結(jié)合shouldParkAfterFailedAcquire 方法看下 這個方法里面有如果node的pre節(jié)點是Cancel的話 會做重寫尋找pre節(jié)點
* 同樣的下面的CAS 操作將node的前驅(qū)節(jié)點P的ws狀態(tài)修改為signal失敗 說明當前的p節(jié)點的狀態(tài)已經(jīng)被別的線程修改了
* 那就要去喚醒node節(jié)點線程去獲取資源鎖
* 之前我們獨占鎖的時候都說過 同步隊列中 節(jié)點都是通過自己的前驅(qū)節(jié)點去喚醒的
*/
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal 方法比較簡單 上面我也描述過了 有些地方如果看不懂 還是要結(jié)合整個await方法互相看下 每一個判斷都存在道理
signalAll方法
signalAll方法 是將所有ConditiaonQueue中node節(jié)點轉(zhuǎn)換到SyncQueue中
public void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* 移除條件隊列中所有節(jié)點 挨個轉(zhuǎn)移到同步隊列中
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;//因為所以節(jié)點 都已經(jīng)轉(zhuǎn)移 所以條件隊列就為null 了
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);//循環(huán)轉(zhuǎn)移 直到最后一個nextWaiter等于null
}
從代碼上 我們也能看到signalAll就是做一個所有節(jié)點的轉(zhuǎn)移操作,doSignalAll方法入口就是設置當前的 lastWaiter = firstWaiter = null 保證了一個整體的操作文狱,如果有人想問 為什么不直接把ConditionQueue接到SyncQueue的后面 不就好了么粥鞋,為什么還要挨個去循環(huán),那是因為2中隊列的結(jié)構(gòu)不一樣瞄崇,沒法直接全部轉(zhuǎn)移呻粹,Sync是用next和prev連接前后節(jié)點的但是Condition 是用NextWaiter連接后面的節(jié)點的壕曼,是一個單向鏈表,2者沒法直接關(guān)聯(lián)等浊!
總結(jié)Sync-Queue和Conditian-Queue
Sync-Queue:
Condition-Queue:
上面就是SyncQueue和ConditionQueue的流程圖
寫了5篇文章分析了下AQS的源碼 大部分源碼都已經(jīng)做了注解,如果看不明白的腮郊,多看幾遍 ,對著源碼看筹燕,第一篇可以看我的注解轧飞,第二遍可以嘗試自己單獨看源碼,是否能看明白撒踪,最好自己能debug走一遍 看下过咬,一定的能夠加深影響,最后文中如果有些的不對的制妄,希望大家能夠指正
后面我會整理下掸绞,把所有代碼放到github 里面 方便大家看
==預告:后面幾篇我會寫下具體實現(xiàn)AQS的java的的類,ReentrantLock,Semaphore,CountDownLatch,DelayQueue等等==