前言
JDK 并发包是基于两个基础类 Unsafe 和 LockSupport 实现的:
- Unsafe 提供基于 OS 系统调用的内存 CAS 操作
- LockSupport 提供基于 OS 系统调用的线程阻塞、唤醒
在上面两个核心类的基础上,JDK 并发包基于链表实现的队列,实现了核心类 AbstractQueuedSynchronizer,这个同步器是并发包其他一切工具类的底层实现核心;
AQS 源码分析
由于 JDK 里 AQS 的源码风格比较像 C++,易读性也比较差,我重新整理了下代码注释和结构,便于分析
package zxl.org.aqs;
/**
 * Queue node used by the AQS queues. Every node stands for one thread.
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:34
 */
public class Node {

    // ---- mode markers, stored in nextWaiter ----

    /** Marker instance: a node whose {@code nextWaiter} is this object waits in shared mode. */
    static final Node SHARED = new Node();
    /** Marker for exclusive mode; it is simply {@code null}. */
    static final Node EXCLUSIVE = null;

    // ---- possible waitStatus values (0 is the default, un-flagged state) ----

    /** The wait was cancelled. This is the only positive status value. */
    static final int CANCELLED = 1;
    /** Author's note: the awaited event has happened. */
    static final int SIGNAL = -1;
    /** The node is waiting for an event (a condition) to happen. */
    static final int CONDITION = -2;
    /** "Propagate"; the author presumes it is used by shared nodes. */
    static final int PROPAGATE = -3;

    // ---- node state ----

    /** The key attribute: the thread this node represents. */
    volatile Thread thread;

    /** Backward link; nodes can form a doubly linked list. */
    volatile Node prev;
    /** Forward link of the doubly linked list. */
    volatile Node next;

    /**
     * {@link #SHARED} for a shared-mode node, {@code null} for an exclusive-mode node.
     */
    Node nextWaiter;

    /**
     * One of the four status constants above, or 0.
     */
    volatile int waitStatus;

    /** Creates an empty node with no thread, no mode and status 0. */
    public Node() {
    }

    /**
     * Creates a node for {@code thread} waiting in the given mode.
     *
     * @param thread the thread the node represents
     * @param mode   {@link #SHARED} or {@link #EXCLUSIVE}
     */
    public Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * Creates a node for {@code thread} with an explicit initial status.
     *
     * @param thread     the thread the node represents
     * @param waitStatus the initial status value
     */
    public Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

    /**
     * @return {@code true} when this node carries the {@link #SHARED} marker
     */
    public final boolean isShared() {
        return SHARED == nextWaiter;
    }

    /**
     * Returns the previous node.
     *
     * @return the node referenced by {@code prev}
     * @throws NullPointerException if there is no previous node
     */
    public final Node predecessor() throws NullPointerException {
        final Node previous = prev;
        if (previous != null) {
            return previous;
        }
        throw new NullPointerException();
    }
}
package zxl.org.aqs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
/**
 * Condition (wait) queue of an {@link AbstractQueuedSynchronizer}. A synchronizer has one
 * contention (sync) queue and may have any number of condition queues attached to it.
 *
 * <p>In the JDK this is an inner class of the synchronizer; it is extracted into its own
 * class here to make it easier to read, so the owning synchronizer is passed in explicitly.
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:43
 */
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** Timed waits only park when at least this many nanoseconds remain; otherwise they loop. */
    private static final long spinForTimeoutThreshold = 1000L;
    /** Interrupt mode: re-assert the interrupt flag when leaving the wait. */
    private static final int REINTERRUPT = 1;
    /** Interrupt mode: throw {@link InterruptedException} when leaving the wait. */
    private static final int THROW_IE = -1;

    /** Head of the singly linked condition queue (linked through {@code Node.nextWaiter}). */
    private transient Node firstWaiter;
    /** Tail of the condition queue. */
    private transient Node lastWaiter;
    /** Every condition belongs to exactly one synchronizer. */
    private final AbstractQueuedSynchronizer synchronizer;

    /**
     * Creates a condition bound to the given synchronizer.
     *
     * @param synchronizer the owning synchronizer
     * @throws NullPointerException if {@code synchronizer} is null
     */
    public ConditionObject(AbstractQueuedSynchronizer synchronizer) {
        // Fail fast: every operation below dereferences the synchronizer.
        if (synchronizer == null)
            throw new NullPointerException("synchronizer");
        this.synchronizer = synchronizer;
    }

    // ---- internal core methods ----

    /**
     * Appends a new CONDITION node for the current thread to the condition queue.
     * If the current last node is no longer in CONDITION state, the whole list is
     * cleaned up first.
     *
     * @return the newly created node
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            // The last node was reset to a non-CONDITION state: purge the list.
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null) {
            // Empty queue: the new node is both first and last.
            lastWaiter = firstWaiter = node;
        } else {
            // Append at the tail; the condition queue is singly linked.
            t.nextWaiter = node;
            lastWaiter = node;
        }
        return node;
    }

    /**
     * Removes every node whose status is not CONDITION from the singly linked condition queue.
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        // Last node seen so far that is still in CONDITION state.
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else
                trail = t;
            t = next;
        }
    }

    /**
     * Removes nodes from the front of the condition queue until one of them is successfully
     * transferred to the synchronizer's contention queue, or the queue is empty.
     * {@code transferForSignal} returns false for a node that is no longer in CONDITION state,
     * which is why the loop moves on to the next one.
     *
     * @param first the current first waiter (non-null)
     */
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!synchronizer.transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

    /**
     * Empties the condition queue and transfers every node to the synchronizer's
     * contention queue.
     *
     * @param first the current first waiter (non-null)
     */
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            synchronizer.transferForSignal(first);
            first = next;
        } while (first != null);
    }

    // ---- Condition interface ----

    /**
     * Moves the first waiter, if any, to the contention queue (see {@code doSignal}).
     *
     * @throws IllegalMonitorStateException if the synchronizer reports it is not held exclusively
     */
    @Override
    public final void signal() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    /**
     * Moves all waiters to the contention queue (see {@code doSignalAll}).
     *
     * @throws IllegalMonitorStateException if the synchronizer reports it is not held exclusively
     */
    @Override
    public final void signalAll() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    /**
     * Waits without responding to interruption:
     * <ol>
     * <li>join the condition queue;</li>
     * <li>block until the node has been moved to the contention queue;</li>
     * <li>block until the node re-acquires from the head of the contention queue.</li>
     * </ol>
     * An interrupt seen along the way is only recorded and re-asserted at the end.
     */
    @Override
    public final void awaitUninterruptibly() {
        Node node = addConditionWaiter();
        // Release whatever state is currently held; remember it for re-acquisition.
        int savedState = synchronizer.fullyRelease(node);
        boolean interrupted = false;
        // Keep parking until the node shows up on the contention queue; ignore interrupts.
        while (!synchronizer.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
        // acquireQueued loops (parking in between) until the node acquires as first in line.
        if (synchronizer.acquireQueued(node, savedState) || interrupted)
            AbstractQueuedSynchronizer.selfInterrupt();
    }

    /**
     * Same flow as {@link #awaitUninterruptibly()}, but interrupt-aware: if the thread is
     * interrupted while waiting it stops waiting passively and moves itself to the contention
     * queue. An interrupt does not always produce an exception: when it races with a signal
     * that already transferred the node, the interrupt flag is re-asserted instead.
     *
     * @throws InterruptedException if interrupted on entry, or interrupted before being signalled
     */
    @Override
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    /**
     * Checks (and clears) the interrupt flag after a park.
     *
     * @param node the waiting node
     * @return 0 if not interrupted; {@code THROW_IE} if this thread won the transfer of the node
     *         (interrupted before signal); {@code REINTERRUPT} otherwise
     */
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (synchronizer.transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    /**
     * Acts on the interrupt mode decided during the wait.
     *
     * @param interruptMode {@code THROW_IE}, {@code REINTERRUPT}, or any other value (no-op)
     * @throws InterruptedException if the mode is {@code THROW_IE}
     */
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            AbstractQueuedSynchronizer.selfInterrupt();
    }

    /**
     * Same flow as {@link #await()}, with a time limit on the wait for the transfer: once the
     * timeout has elapsed the node moves itself to the contention queue.
     *
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return an estimate of the remaining time ({@code nanosTimeout} minus the time spent);
     *         zero or negative means the time ran out. {@code Long.MIN_VALUE} is returned when
     *         the subtraction would otherwise wrap around.
     * @throws InterruptedException if interrupted on entry, or interrupted before being signalled
     */
    @Override
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        // Remember the requested timeout so the result can be sanity-checked below.
        final long initialNanos = nanosTimeout;
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                synchronizer.transferAfterCancelledWait(node);
                break;
            }
            // Only park for waits long enough to be worth it; otherwise just re-check.
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        // The remaining time can never legitimately exceed the requested timeout. If it does,
        // the subtraction wrapped around (e.g. a hugely negative nanosTimeout), so report
        // "no time left" instead of a bogus positive value.
        long remaining = deadline - System.nanoTime();
        return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
    }

    /**
     * Same flow as {@link #awaitNanos(long)}, with an absolute deadline.
     *
     * @param deadline the absolute time to wait until
     * @return {@code false} if the deadline passed and this thread performed the transfer
     *         itself (i.e. it timed out); {@code true} otherwise
     * @throws InterruptedException if interrupted on entry, or interrupted before being signalled
     */
    @Override
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = synchronizer.transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /**
     * Same flow as {@link #awaitNanos(long)}, with the timeout given as a value plus unit and
     * a boolean result. (The author notes the amount of duplicated code here.)
     *
     * @param time the maximum time to wait
     * @param unit the unit of {@code time}
     * @return {@code false} if the time ran out and this thread performed the transfer itself
     *         (i.e. it timed out); {@code true} otherwise
     * @throws InterruptedException if interrupted on entry, or interrupted before being signalled
     */
    @Override
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = synchronizer.transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    // ---- instrumentation helpers ----

    /**
     * @param sync the synchronizer to compare against
     * @return {@code true} if this condition was created for {@code sync}
     */
    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
        return sync == synchronizer;
    }

    /**
     * Walks the whole list looking for a node still in CONDITION state.
     *
     * @return {@code true} if at least one such node exists
     * @throws IllegalMonitorStateException if the synchronizer reports it is not held exclusively
     */
    protected final boolean hasWaiters() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                return true;
        }
        return false;
    }

    /**
     * Walks the whole list counting the nodes still in CONDITION state.
     *
     * @return the number of such nodes
     * @throws IllegalMonitorStateException if the synchronizer reports it is not held exclusively
     */
    protected final int getWaitQueueLength() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        int n = 0;
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                ++n;
        }
        return n;
    }

    /**
     * Walks the whole list collecting the threads of the nodes still in CONDITION state.
     *
     * @return a new collection of those threads (nodes without a thread are skipped)
     * @throws IllegalMonitorStateException if the synchronizer reports it is not held exclusively
     */
    protected final Collection<Thread> getWaitingThreads() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                Thread t = w.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
}
package zxl.org.aqs;
import sun.misc.Unsafe;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;
/**
 * Core of the AQS implementation.
 *
 * <p>It is only a few hundred lines long. When reading it, keep in mind that the logic is
 * not sequential: execution effectively jumps back and forth between threads.
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Threshold in nanoseconds (1 ms = 1,000 us = 1,000,000 ns). Timed acquires only park the
     * thread when more than this much time remains; below it they keep looping instead.
     */
    private static final long spinForTimeoutThreshold = 1000L;

    /** Sun's internal memory-access utility, used for the CAS operations in this class. */
    private static final Unsafe unsafe = loadUnsafe();

    /**
     * Obtains the {@link Unsafe} singleton.
     *
     * <p>{@code Unsafe.getUnsafe()} rejects callers that were not loaded by the bootstrap
     * class loader with a {@link SecurityException}, which is the case for this application
     * class. When that happens the singleton is read reflectively from the private static
     * {@code theUnsafe} field instead.
     *
     * @return the Unsafe instance
     */
    private static Unsafe loadUnsafe() {
        try {
            return Unsafe.getUnsafe();
        } catch (SecurityException notBootstrapLoaded) {
            try {
                java.lang.reflect.Field field = Unsafe.class.getDeclaredField("theUnsafe");
                field.setAccessible(true);
                return (Unsafe) field.get(null);
            } catch (Exception ex) {
                throw new Error(ex);
            }
        }
    }

    // Offsets of the CAS-updated fields inside their objects.
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /** CAS on {@code node.waitStatus}. */
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

    /** CAS on {@code node.next}. */
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /**
     * Decides, after {@code node} failed to acquire, whether its thread should now be parked.
     *
     * @param pred the node's predecessor
     * @param node the node that failed to acquire
     * @return {@code true} if the thread should park; {@code false} if the caller should retry
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // Predecessor is already flagged SIGNAL: safe to park.
            return true;
        if (ws > 0) {
            // Predecessor was cancelled: skip backwards over cancelled nodes and relink.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Status is 0 or negative but not SIGNAL (e.g. PROPAGATE): try to flag it SIGNAL.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // After cleaning up / changing the status, do not park yet; the caller loops again.
        return false;
    }

    /** Sets the interrupt flag of the current thread. */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    // The three core members: head and tail of a doubly linked list, plus a 32-bit int state.
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

    protected AbstractQueuedSynchronizer() {
    }

    /**
     * CAS that installs {@code update} as head only if head is currently {@code null}
     * (an optimistic-locking style update).
     *
     * @param update the new head
     * @return {@code true} if head was null and is now {@code update}
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS on {@code tail}.
     *
     * @param expect the expected current tail
     * @param update the new tail
     * @return {@code true} if the swap succeeded
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /** @return the current value of the volatile {@code state} field */
    protected final int getState() {
        return state;
    }

    /** Plain volatile write of {@code state}. */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * CAS on {@code state}.
     *
     * @param expect the expected current value
     * @param update the new value
     * @return {@code true} if the swap succeeded
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * Appends {@code node} to the contention queue, initialising the queue first if needed.
     *
     * @param node the node to append
     * @return the node that was the tail before {@code node} was appended
     */
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            if (t == null) {
                // Initialisation takes two steps: CAS head from null to a fresh node, then
                // assign tail. Other threads test the volatile tail, which is written last.
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // Linking takes three steps (set node.prev, CAS tail, set t.next) and is not
                // atomic. The node counts as enqueued once the tail CAS succeeds, even though
                // t.next may not point to it yet; that is why some traversals walk backwards
                // from tail.
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates a node for the current thread in the given mode and enqueues it.
     *
     * @param mode {@code Node.SHARED} or {@code Node.EXCLUSIVE}
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /** Makes {@code node} the head and clears its thread and prev references. */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Unparks the thread of a non-cancelled node that follows {@code node}, if there is one.
     * If the woken thread then acquires, it becomes the new head of the contention queue.
     *
     * @param node the node whose successor should be woken
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            // Waking the successor consumes this node's status: SIGNAL / PROPAGATE -> 0.
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // next is missing or cancelled: walk backwards from tail and keep the
            // non-cancelled node closest to 'node'. This copes with other threads
            // appending to the tail concurrently.
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }

    /**
     * Release step for shared mode. If head is SIGNAL, resets it to 0 and unparks a
     * non-cancelled successor; if head is 0, marks it PROPAGATE. Repeats while head
     * changes during an iteration.
     */
    private void doReleaseShared() {
        for (; ; ) {
            Node h = head;
            // Queue is non-empty.
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // A successor sets head's status to SIGNAL before it parks.
                if (ws == Node.SIGNAL) {
                    // The SIGNAL must be consumed by this thread before it may unpark.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    // CAS 0 -> PROPAGATE failed: status changed concurrently, re-check.
                    continue;
                }
            }
            if (h == head) {
                // Head did not change during this pass: done. If a woken thread replaced
                // the head in the meantime, loop and try to wake further waiters.
                break;
            }
        }
    }

    /**
     * Installs {@code node} as head and, when the conditions below hold, continues releasing
     * in shared mode.
     *
     * @param node      the node that just acquired
     * @param propagate the value returned by {@code tryAcquireShared}
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // Propagation of the shared acquire.
                doReleaseShared();
        }
    }

    /**
     * Cancels {@code node} and tidies up the list: its successor is either linked to a new
     * predecessor or woken up.
     *
     * @param node the node to cancel; ignored if null
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) {
            return;
        }
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
            return;
        }
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) {
                compareAndSetNext(pred, predNext, next);
            }
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }

    /**
     * Parks the current thread.
     *
     * @return {@code true} if the thread's interrupt flag was set (the flag is cleared)
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /**
     * Loops until {@code node} is directly behind head and {@code tryAcquire} succeeds;
     * on success the node becomes the new head.
     *
     * @param node the already-enqueued node
     * @param arg  the argument passed through to {@code tryAcquire}
     * @return {@code true} if the thread was interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // Woken up with the interrupt flag set: remember it and keep going.
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Exclusive acquire for the current thread that aborts on interrupt:
     * <ol>
     * <li>enqueue an exclusive node for the current thread;</li>
     * <li>compete for the head of the contention queue.</li>
     * </ol>
     *
     * @param arg the argument passed through to {@code tryAcquire}
     * @throws InterruptedException if the thread is interrupted while parked
     */
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same logic as {@link #doAcquireInterruptibly(int)} with a timeout: when the time runs
     * out the method returns {@code false}.
     *
     * @param arg          the argument passed through to {@code tryAcquire}
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return {@code true} if acquired, {@code false} if timed out
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Shared-mode counterpart of {@link #acquireQueued(Node, int)}. The difference is that
     * after a successful shared acquire it calls {@code setHeadAndPropagate}, which may go on
     * to call {@code doReleaseShared}.
     *
     * @param arg the argument passed through to {@code tryAcquireShared}
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Key line: if the current thread is A, this is where A may wake
                        // thread B (the thread of the node right behind this one), which
                        // resumes at the parkAndCheckInterrupt call below.
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // Resume point of a woken thread (thread B above).
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same as {@link #doAcquireShared(int)} but throws on interrupt.
     *
     * @param arg the argument passed through to {@code tryAcquireShared}
     * @throws InterruptedException if the thread is interrupted while parked
     */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Shared-mode acquire with a timeout.
     *
     * @param arg          the argument passed through to {@code tryAcquireShared}
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return {@code true} if acquired, {@code false} if timed out
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // ---- The four methods below are the main extension points; subclasses keep their
    // ---- synchronization state in the int 'state' field. All throw by default.

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Whether the current thread holds the lock. Throws by default; subclasses override.
     *
     * @return {@code true} if held exclusively by the current thread
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Exclusive acquire. Tries {@code tryAcquire} once as a fast path and returns if it
     * succeeds; otherwise goes through the normal queueing logic. An interrupt seen while
     * queued is re-asserted afterwards.
     *
     * @param arg the argument passed through to {@code tryAcquire}
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Same as {@link #acquire(int)} but interrupt-aware: throws if the interrupt flag is set.
     *
     * @param arg the argument passed through to {@code tryAcquire}
     * @throws InterruptedException if interrupted on entry or while waiting
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Exclusive acquire with a timeout.
     *
     * @param arg          the argument passed through to {@code tryAcquire}
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return {@code true} if acquired, {@code false} if timed out
     * @throws InterruptedException if interrupted on entry or while waiting
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * If {@code tryRelease} (subclass extension point) succeeds, wakes one node thread in the
     * contention queue.
     *
     * @param arg the argument passed through to {@code tryRelease}
     * @return the result of {@code tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // Only wake a successor when head's waitStatus is non-zero.
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Shared acquire: queues via {@code doAcquireShared} if {@code tryAcquireShared} returns
     * a negative value.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Interrupt-aware variant of {@link #acquireShared(int)}.
     *
     * @throws InterruptedException if interrupted on entry or while waiting
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Shared acquire with a timeout.
     *
     * @return {@code true} if acquired, {@code false} if timed out
     * @throws InterruptedException if interrupted on entry or while waiting
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Shared release: runs {@code doReleaseShared} if {@code tryReleaseShared} succeeds.
     *
     * @return the result of {@code tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /** @return {@code true} if head and tail differ, i.e. some node is queued */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    /** @return {@code true} if the queue has ever been initialised (head is non-null) */
    public final boolean hasContended() {
        return head != null;
    }

    /** @return the first queued thread, or {@code null} if head equals tail */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /**
     * Slow path of {@link #getFirstQueuedThread()}.
     *
     * @return the thread of the node closest to head that has one, or {@code null}
     */
    private Thread fullGetFirstQueuedThread() {
        Node h, s;
        Thread st;
        // The same check appears twice.
        // NOTE(review): presumably a deliberate second attempt in case head changed between
        // the reads of the first one - confirm against the JDK source.
        if (((h = head) != null && (s = h.next) != null &&
                s.prev == head && (st = s.thread) != null) ||
                ((h = head) != null && (s = h.next) != null &&
                        s.prev == head && (st = s.thread) != null))
            return st;
        // Fall back to walking from tail towards head, keeping the last thread seen.
        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * @param thread the thread to look for
     * @return {@code true} if some node reachable backwards from tail belongs to {@code thread}
     * @throws NullPointerException if {@code thread} is null
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * @return {@code true} if head's next node exists, is not shared and has a thread
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
                (s = h.next) != null &&
                !s.isShared() &&
                s.thread != null;
    }

    /**
     * @return {@code true} if the queue is non-empty and the node right behind head is
     *         missing or belongs to a thread other than the current one
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    // Instrumentation and monitoring methods

    /** @return the number of nodes, walking back from tail, that have a non-null thread */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /** @return a new collection of the threads of all queued nodes (tail-to-head order) */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /** @return a new collection of the threads of all queued non-shared nodes */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /** @return a new collection of the threads of all queued shared nodes */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /** @return the default string form followed by the state and whether the queue is empty */
    @Override
    public String toString() {
        int s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
                "[State = " + s + ", " + q + "empty queue]";
    }

    // Internal support methods for Conditions

    /**
     * @param node the node to test
     * @return {@code true} if {@code node} is on the contention queue
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // prev is set but next is not: search backwards from tail to be sure.
        return findNodeFromTail(node);
    }

    /** @return {@code true} if {@code node} is reachable walking backwards from tail */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (; ; ) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * Moves a node from a condition queue to the contention queue: changes its status from
     * CONDITION to 0, enqueues it, then checks the node that was the tail before it.
     *
     * @param node the node to transfer
     * @return {@code false} if the node was no longer in CONDITION state, {@code true} otherwise
     */
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append to the contention queue; enq returns the previous tail.
        Node p = enq(node);
        int ws = p.waitStatus;
        // If the previous tail is cancelled, or flagging it SIGNAL fails because another
        // thread changed its status concurrently, wake the node's thread directly.
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Called by a waiter that stopped waiting (interrupt or timeout) to move itself to the
     * contention queue.
     *
     * @param node the waiter's node
     * @return {@code true} if this call changed the status from CONDITION and enqueued the
     *         node; {@code false} if the status had already been changed, in which case this
     *         method yields until the node is on the contention queue
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Releases with the current state value. On success this wakes one node thread of the
     * contention queue (via {@link #release(int)}); on failure the node is marked CANCELLED.
     *
     * @param node the condition node of the releasing thread
     * @return the state value that was released
     * @throws IllegalMonitorStateException if {@code release} returns false
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // Instrumentation methods for conditions

    /** @return {@code true} if {@code condition} was created for this synchronizer */
    public final boolean owns(ConditionObject condition) {
        return condition.isOwnedBy(this);
    }

    /**
     * @return whether {@code condition} has waiters
     * @throws IllegalArgumentException if {@code condition} is not owned by this synchronizer
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * @return the number of waiters on {@code condition}
     * @throws IllegalArgumentException if {@code condition} is not owned by this synchronizer
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * @return the threads waiting on {@code condition}
     * @throws IllegalArgumentException if {@code condition} is not owned by this synchronizer
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }
}