上一篇博客中峦剔,我們提到AQS
的隊列管理是基于CLH鎖隊列實現(xiàn)的霹陡,所以首先我們來看下CLH鎖隊列
皆尔。
1 CLH鎖隊列
CLH鎖隊列
本質(zhì)上是一個基于鏈表的FIFO自旋鎖隊列姚建,隊列中的每一個節(jié)點實質(zhì)上是一個自旋鎖:在阻塞時不斷循環(huán)讀取狀態(tài)變量矫俺,當(dāng)前驅(qū)節(jié)點釋放同步對象使用權(quán)后,跳出循環(huán)掸冤,執(zhí)行同步代碼厘托。其基本結(jié)構(gòu)如下:
隊列中每一個節(jié)點有兩個成員:
- 節(jié)點狀態(tài)變量
- 前驅(qū)指針:pred
head,tail并不是實際節(jié)點,只是為了表示隊列的首尾稿湿,被稱為dumb node铅匹。
在如此結(jié)構(gòu)之下,其enqueue操作邏輯如下:
do { pred = tail;
} while(!tail.compareAndSet(pred, node));
其lock操作如下:
public void lock() {
final Node node = new Node();
node.locked = true;
// 一個CAS操作即可將當(dāng)前線程對應(yīng)的節(jié)點加入到隊列中饺藤,
// 并且同時獲得了前繼節(jié)點的引用包斑,然后就是等待前繼釋放鎖
Node pred = this.tail.getAndSet(node);
this.prev.set(pred);
while (pred.locked) {// 進入自旋
}
}
可以看到其自旋邏輯考杉。
而其dequeue操做更加簡單:
head = node;
從面的操作,可以看到CLH鎖隊列
有如下優(yōu)勢:
- 隊列的入列舰始、出列操作原子性完成崇棠,無需加鎖,高效
- 判斷當(dāng)前隊列等待是否為空同樣簡單丸卷,只需檢查head是否為tail即可
- 每個節(jié)點獨立維護其狀態(tài)變量枕稀,避免了集中狀態(tài)管理的內(nèi)存競爭
2 AQS
進程隊列
AQS
進程隊列相比于CLH鎖隊列
主要做了兩處修改:
- 每個節(jié)點新增一個next指針。由于
AQS
隊列中的進程不僅有自旋等待谜嫉,還包括阻塞等待的情況萎坷。阻塞等待的隊列需要其他隊列主動喚醒。這就要求隊列中某個節(jié)點出列時需要顯式告知其后繼節(jié)點沐兰,因而需要加入next指針 - 節(jié)點狀態(tài)變量status由一個bit替換成一個int哆档。這主要是由于
AQS
下的狀態(tài)更加復(fù)雜
首先來看下AQS隊列節(jié)點
的基本結(jié)構(gòu):
static final class Node {
// 表明節(jié)點是否以共享模式等待的標(biāo)記
static final Node SHARED = new Node();
// 表明節(jié)點是否以獨占模式等待的標(biāo)記
static final Node EXCLUSIVE = null;
// 表明線程已被取消
static final int CANCELLED = 1;
// 表明后續(xù)節(jié)點的線程需要unparking
static final int SIGNAL = -1;
// 表明線程正在等待一個條件
static final int CONDITION = -2;
// 表明下一次acquireShared應(yīng)該無條件傳播
static final int PROPAGATE = -3;
/*
* 狀態(tài)字段,只能取下面的值:
* SIGNAL(-1): 這個結(jié)點的后繼是(或很快是)阻塞的(通過park)住闯,所以當(dāng)前結(jié)點
* 必須unpark它的后繼瓜浸,當(dāng)它釋放或取消時。為了避免競爭比原,acquire方法必須
* 首先表明它們需要一個信號插佛,然后再次嘗試原子性acquire,如果失敗了就阻塞量窘。
*
* CANCELLED(1): 這個結(jié)點由于超時或中斷已被取消雇寇。結(jié)點從不離開這種狀態(tài)。尤其是蚌铜,
* 這種狀態(tài)的線程從不再次阻塞锨侯。
*
* CONDITION(-2): 這個結(jié)點當(dāng)前在一個條件隊列上。它將不會用于sync隊列的結(jié)點冬殃,
* 直到被轉(zhuǎn)移囚痴,在那時,結(jié)點的狀態(tài)將被設(shè)為0.
* 這個值在這里的使用與其他字段的使用沒有關(guān)系造壮,僅僅是簡化結(jié)構(gòu)渡讼。
*
* PROPAGATE(-3): releaseShared應(yīng)該傳遞給其他結(jié)點。這是在doReleaseShared里設(shè)置
* (僅僅是頭結(jié)點)以確保傳遞繼續(xù)耳璧,即使其他操作有干涉。
*
* 0: 非以上任何值展箱。
*
* 值是組織為數(shù)字的用以簡化使用旨枯。非負(fù)值表示結(jié)點不需要信號。這樣混驰,大部分代碼不需要
* 檢查特定的值攀隔,只需要(檢查)符號皂贩。
*
* 對于普通同步結(jié)點,字段初始化為0昆汹;對于條件結(jié)點初始化為CONDITION(-2)明刷。
* 通過CAS操作修改(或者,當(dāng)允許時满粗,用無條件volatile寫辈末。)
*/
volatile int waitStatus;
/*
* 連接到當(dāng)前結(jié)點/線程依賴的用來檢查等待狀態(tài)的前驅(qū)結(jié)點。
* 在進入隊列時賦值映皆,只在出隊列時置為空(為了GC考慮)挤聘。
* 根據(jù)前驅(qū)結(jié)點的取消,我們使查找一個非取消結(jié)點的while循環(huán)短路捅彻,這個總是會退出组去,
* 因為頭結(jié)點從不會是取消了的:一個結(jié)點成為頭只能是一次成功的acquire操作結(jié)果。
*
* 一個取消了的線程從不會在獲取操作成功步淹,線程只能取消自己从隆,不能是其他結(jié)點。
*/
volatile Node prev;
/*
* 連接到當(dāng)前結(jié)點/線程釋放時解除阻塞的后續(xù)結(jié)點缭裆。
* 在入隊列時賦值广料,在繞過已取消前驅(qū)節(jié)點時調(diào)整,出隊列時置為空(for GC)幼驶。
* 入隊操作不會給前驅(qū)結(jié)點的next字段賦值艾杏,直到附件后(把新節(jié)點賦值給隊列的tail屬性?)盅藻,
* 所以看到next字段為空不一定表示它就是隊列的尾結(jié)點购桑。然而,如果next字段看起來是空氏淑,
* 我們可以從tail向前遍歷進行雙重檢查勃蜘。
* 被取消了的結(jié)點的next字段被設(shè)置為指向它自己而不是空,這讓isOnSyncQueue變得容易假残。
*/
volatile Node next;
/*
* 列隊在這個結(jié)點的線程缭贡,在構(gòu)造時初始化,用完后置空辉懒。
*/
volatile Thread thread;
/*
* 連接到下一個在條件上等待的結(jié)點或是特殊的值SHARED阳惹。
* 因為條件隊列只在獨占模式下持有時訪問,我們只需要一個簡單的鏈表隊列來持有在條件上等待的結(jié)點眶俩。
* 他們?nèi)缓蟊晦D(zhuǎn)移到隊列去re-acquire莹汤。
* 因為條件只能是獨占的,我們通過用一個特殊的值來表明共享模式 來節(jié)省一個字段颠印。
*/
Node nextWaiter;
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
接下來我們就來看下其主要操作的主要邏輯纲岭。
3 enqueue
由于AQS
隊列節(jié)點包括pred和next兩個指針抹竹,無法通過一次原子操作更新兩個指針。所以添加結(jié)點到隊列的操作最重要的是要保證:即使添加的CAS操作失敗了止潮,也不能影響隊列結(jié)點現(xiàn)有的連接關(guān)系窃判。
對于新加結(jié)點:
- 在CAS之前指向它的預(yù)期前驅(qū)
- CAS成功之后再更新預(yù)期前驅(qū)的后繼指針。
在步驟1成功之后喇闸、步驟2完成之前袄琳,其他線程通過結(jié)點的 “next” 連接可能看到“尾結(jié)點”(即代碼里的 pred)的 “next” 為空,但其實隊列里已經(jīng)加入新的結(jié)點仅偎,這也是為什么通過 “next” 連接遍歷隊列時碰到后繼為空的跨蟹,必須從原子地更新的 “tail” 結(jié)點向后遍歷。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 嘗試enq的快速路徑橘沥;失敗后回退到完整的enq窗轩。
Node pred = tail;
if (pred != null) {
// 把新結(jié)點的前驅(qū)指向pred,必須在下面的CAS完成之前設(shè)置座咆,
// 這樣確保一旦CAS成功后痢艺,從tail向后遍歷是ok的。
node.prev = pred;// 步驟 1
if (compareAndSetTail(pred, node)) { //CAS
// 新節(jié)點成功進入隊列
// 前驅(qū)結(jié)點的next字段指向新結(jié)點介陶,建立完整的連接堤舒。
pred.next = node; // 步驟 2
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 隊列是空,必須初始化哺呜。
if (compareAndSetHead(new Node())) // 原子地設(shè)置頭結(jié)點
tail = head; // 尾結(jié)點也指向頭結(jié)點
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 步驟 1
t.next = node; // 步驟 2
// 在把新結(jié)點設(shè)置為tail后才能更新前驅(qū)的next字段舌缤,這樣,即使CAS失敗了也不會影響原來的連接關(guān)系某残。
return t;
}
}
}
}
4 acquire
acquire方法不提供絕對公平的保證国撵,因為現(xiàn)在在加入隊列之前先進行tryAcquire操作,如果這個線程先于頭結(jié)點鎖定玻墅,那么頭結(jié)點就只能繼續(xù)等待了介牙。這種情形稱為闖入。
這個acquire之所以先嘗試獲取是為了在無競爭的情況下提高性能澳厢,并可以實現(xiàn)非公平的獲取环础。如果要保證絕對的公平性,則可以在子類實現(xiàn)的tryAcquire方法里判斷當(dāng)前線程是否是頭結(jié)點剩拢,是則嘗試獲取线得,不是則直接返回false。
// 以獨占模式獲取
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 首先嘗試獲取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 失敗后加入等待隊列裸扶,再從隊列里再次嘗試獲瓤蚨肌;成功獲取后才返回呵晨,
// 返回的boolean表示線程是否曾經(jīng)被中斷魏保。
// 在acquireQueued方法里,線程可能被反復(fù)park摸屠、unpark谓罗,直到獲取鎖。
selfInterrupt(); // 重新設(shè)置中斷狀態(tài)位季二,是否執(zhí)行取決于acquireQueued的返回值
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 線程是否曾被中斷是由這個變量記錄的檩咱。
for (;;) { // 死循環(huán),用于acquire失敗后重試
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 前驅(qū)是頭結(jié)點胯舷,繼續(xù)嘗試獲取
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 檢測是否需要等待刻蚯,如果需要,則park當(dāng)前線程
// 只有前驅(qū)在等待時才進入等待桑嘶,否則繼續(xù)重試
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 線程進入等待,需要其他線程來喚醒這個線程以繼續(xù)執(zhí)行
interrupted = true; // 只要線程在等待過程中被中斷過一次就會記錄下來
}
} finally {
if (failed)
// acquire失敗讨便,取消acquire
cancelAcquire(node);
}
}
/*
* 這個方法是信號控制的核心霸褒。檢查和更新沒有成功獲取的結(jié)點的狀態(tài)盈蛮。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驅(qū)結(jié)點也在等待废菱,說明這是一個穩(wěn)定的等待狀態(tài)。
return true ;
if (ws > 0) {
// 前驅(qū)結(jié)點已取消抖誉,向前遍歷直到找到一個非取消結(jié)點殊轴。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把找到的結(jié)點的后繼指向node梳凛,那么當(dāng)前pred與node之間的已取消結(jié)點就不再被引用了韧拒,可以被垃圾回收十性。
pred.next = node;
} else {
// 前驅(qū)的狀態(tài)必是 0 或 PROPAGATE之一。表明需要一個信號楷掉,但不park先烹植。
// 調(diào)用者需要重試來確保它在park之前沒法獲取。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// park當(dāng)前執(zhí)行線程草雕, 其他線程unpark這個線程后繼續(xù)執(zhí)行
LockSupport.park( this);
return Thread.interrupted();
}
5 release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* 如果status是負(fù)的(比如墩虹,可能需要信號)嘗試清除預(yù)期的信號。
* 如果這失敗了或status被其他等待線程修改也是沒關(guān)系的诫钓。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 準(zhǔn)備unpark的線程在后繼里持有菌湃,一般就是下一個結(jié)點。
* 但如果被取消或是空场梆,從tail向后遍歷來找到實際的非取消后繼纯路。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 沒有直接后繼或直接后繼不需要通知
s = null;
// 從tail向后遍歷,查找需要通知的結(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到一個后不跳出循環(huán)是為了找到最老的需要通知的結(jié)點顶岸。
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 結(jié)點不為null辖佣,喚醒后繼的等待線程
LockSupport.unpark(s.thread);
}