AbstractQueuedSynchronizer是juc包下面解決資源競爭的基礎,功能主要包括三部分:
第一部分Condition監(jiān)視器街佑,已在Condition源碼解析文章中做了分析。
第二部分獨占模式陨献,ReentrantLock帚屉,ReentrantReadWriteLock的寫鎖都是基于AbstractQueuedSynchronizer的獨占鎖實現(xiàn)的。
第三部分共享模式牵祟,ReentrantReadWriteLock的讀鎖,CountDownLatch猴伶,Semaphore都是基于AbstractQueuedSynchronizer的共享鎖實現(xiàn)的课舍。
AbstractQueuedSynchronizer實現(xiàn)由兩個隊列實現(xiàn),一個Sync queue他挎。一個Condition Queue筝尾,Condition Queue再第一篇文章中已介紹。
Sync queue是實現(xiàn)共享鎖和獨占鎖的基礎办桨。
本文分析獨占鎖實現(xiàn)筹淫。
AbstractQueuedSynchronizer 類變量
// 記錄Sync Queue 的頭
private transient volatile Node head;
/**
* 記錄Sync Queue 的尾部
* method enq to add new wait node.
*/
private transient volatile Node tail;
// 同步狀態(tài),lock呢撞,計數(shù)器损姜,信號箱的資源都是維護這個狀態(tài),類似于資源總數(shù)
private volatile int state;
Node類
該類是兩個隊列實現(xiàn)的關鍵殊霞,Sync queue摧阅,Condition Queue里的節(jié)點都是Node節(jié)點
static final class Node {
/**
SHARED 和EXCLUSIVE 主要區(qū)分共享模式和獨占模式
**/
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 取消狀態(tài)
static final int CANCELLED = 1;
// 該狀態(tài)說明后繼節(jié)點需要被喚醒,獨占模式
static final int SIGNAL = -1;
// 該狀態(tài)說明節(jié)點是Condition下的節(jié)點绷蹲,參考Condition解析
static final int CONDITION = -2;
// 該狀態(tài)用于共享模式棒卷,無條件傳播喚醒
static final int PROPAGATE = -3;
// waitStatus 表示node節(jié)點的狀態(tài)顾孽,0初始結束狀態(tài)
volatile int waitStatus;
// Sync Queue是雙向鏈表
volatile Node prev;
volatile Node next;
// node線程
volatile Thread thread;
// Condition queue 單向鏈表
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
獨占獲取資源方法
獨占獲取資源的入口如下:
// 忽略中斷異常
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 拋出中斷異常
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 超時獲取,并拋出中斷異常
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
tryAcquire留給工具自己去實現(xiàn)比规,用于判斷是否滿足獲取資源要求
本文重點分析acquireQueued若厚,doAcquireInterruptibly,doAcquireNanos實現(xiàn)蜒什。
acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獨占模式一次只能喚醒一個節(jié)點测秸,即喚醒head的后繼節(jié)點,這里判斷節(jié)點是否是head的后繼節(jié)點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果是head的后繼節(jié)點灾常,說明沒有競爭霎冯,直接把改節(jié)點設置成head節(jié)點,然后返回中斷狀態(tài)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是head的后繼節(jié)點钞瀑,說明存在競爭肃晚,需要封裝改節(jié)點,然后等待被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
/**
* 該方法檢查是否滿足節(jié)點被封裝的要求
* 即pred的狀態(tài)是否是SIGNAL仔戈,SIGNAL代表后繼節(jié)點需要被喚醒
* 不滿足要求,修改滿足要求拧廊,返回false监徘,再來一次
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 滿足要求,返回true
if (ws == Node.SIGNAL)
return true;
// 大于0表示節(jié)點的前驅節(jié)點被取消吧碾,需要跳過已被取消的所有節(jié)點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 否則把前驅節(jié)點設置成SIGNAL狀態(tài)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
// 該方法封裝阻塞線程凰盔,runnable->waiting,被喚醒之后檢查是否是中斷喚醒
private final boolean parkAndCheckInterrupt() {
// 這里阻塞,線程狀態(tài)由runnable變成waiting狀態(tài)倦春,直到被喚醒
LockSupport.park(this);
// 被喚醒之后户敬,檢查是否是中斷喚醒
return Thread.interrupted();
}
cancelAcquire
//因異常取消獲取節(jié)點
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 1. 線程引用清空
Node pred = node.prev;
while (pred.waitStatus > 0) // 2. 若前繼節(jié)點是 CANCELLED 的, 則也一并清除
node.prev = pred = pred.prev;
Node predNext = pred.next; // 3. 這里的 predNext也是需要清除的(只不過在清除時的 CAS 操作需要 它)
node.waitStatus = Node.CANCELLED; // 4. 設置成清除狀態(tài)
if (node == tail && compareAndSetTail(node, pred)) { // 5. 若需要清除額節(jié)點是尾節(jié)點, 則直接 設置pred為尾節(jié)點,并刪除predNext
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || // 6. 如果pred沒被取消睁本,設置pred的waitStatus==SIGNAL 表示后繼節(jié)點需要喚醒
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 7. next.waitStatus <= 0 表示 next 是個一個想要獲取lock的節(jié)點
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 若 pred 是頭節(jié)點, 直接喚醒下node的next節(jié)點尿庐。
}
node.next = node; // help GC
}
}
以上是獨占方式獲取資源的過程,其中doAcquireInterruptibly呢堰,doAcquireNanos兩方法實現(xiàn)和acquireQueued實現(xiàn)區(qū)別不是很大抄瑟,支持拋出中斷異常,和超時等待枉疼,不做具體分析皮假。
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果head不為空,并列有后繼節(jié)點需要被喚醒骂维,直接喚醒后繼節(jié)點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 喚醒后繼節(jié)點之前惹资,先把node設置成最終狀態(tài)0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到node的后繼節(jié)點,如果被取消航闺,設置成null褪测,遍歷,直到拿到需要被喚醒的節(jié)點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//不為空,喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
以上就是獨占模式下獲取資源和釋放資源的過程汰扭,之后會繼續(xù)分析共享模式實現(xiàn)稠肘。