AQS的三個(gè)核心點(diǎn)
- state
- 協(xié)作類實(shí)現(xiàn)的獲取鎖/釋放鎖的方法
- FIFO隊(duì)列
關(guān)于state
state
是用來判斷是否有線程占用當(dāng)前鎖,與另一個(gè)參數(shù)exclusiveOwnerThread
配合使用
以ReentrantLock獲取鎖為例
/**
* ReentrantLock 獲取非公平鎖的代碼
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 設(shè)置state -》從0到1
if (compareAndSetState(0, 1))
// 將占用的線程改成當(dāng)前線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 設(shè)置失敗杀怠,說明當(dāng)前的鎖被其他線程占用目锭,嘗試獲取鎖
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
關(guān)于協(xié)作類實(shí)現(xiàn)的釋放鎖/獲取鎖的方法
以上面的例子為例,當(dāng)線程要獲取的鎖被其他線程占用的時(shí)候滑频,就需要我們?nèi)プ远ㄒ粋€(gè)獲取鎖的邏輯
public final void acquire(int arg) {
// tryAcquire 就是協(xié)作類自定義的獲取鎖的邏輯
if (!tryAcquire(arg) &&
// 獲取失敗橙垢,統(tǒng)一交給AQS管理(添加等待節(jié)點(diǎn)豺憔,放入隊(duì)列中驶拱,將當(dāng)前線程掛起)-這套屬于固有的邏輯,不需要協(xié)作類去實(shí)現(xiàn)(實(shí)現(xiàn)成本高晶衷,且屬于重復(fù)代碼)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那么來重點(diǎn)看看 tryAcquire
方法 (接著以非公平鎖為例)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 再次獲取一下state
int c = getState();
if (c == 0) {
// 說明鎖被釋放蓝纲,再次cas設(shè)置state
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果持有鎖的線程是當(dāng)前線索,state+1(可重入)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再次嘗試晌纫,獲取的到鎖還好說税迷。假如獲取不到鎖,就需要用到了剛剛提到的FIFO隊(duì)列
AQS核心內(nèi)容 FIFO隊(duì)列及入隊(duì)出隊(duì)規(guī)則-入隊(duì)
此處結(jié)合著我們剛剛將的流程來锹漱,不單獨(dú)針對(duì)各個(gè)點(diǎn)做敘述
單獨(dú)再將這塊代碼拿出來
if (!tryAcquire(arg) &&
// 獲取失敗箭养,統(tǒng)一交給AQS管理(添加等待節(jié)點(diǎn),放入隊(duì)列中哥牍,將當(dāng)前線程掛起)-這套屬于固有的邏輯毕泌,不需要協(xié)作類去實(shí)現(xiàn)(實(shí)現(xiàn)成本高,且屬于重復(fù)代碼)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
添加等待節(jié)點(diǎn)
private Node addWaiter(Node mode) {
// 新建一個(gè)節(jié)點(diǎn)對(duì)象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 已經(jīng)有等待節(jié)點(diǎn)的情況
if (pred != null) {
node.prev = pred;
// CAS將當(dāng)前節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 入隊(duì)(此處是沒有等待節(jié)點(diǎn)的情況)
enq(node);
return node;
}
// CAS操作
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 新建一個(gè)節(jié)點(diǎn)嗅辣,并將其設(shè)置成頭節(jié)點(diǎn)撼泛。并將尾節(jié)點(diǎn)也指向這個(gè)新建的頭節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 再將我們具體的節(jié)點(diǎn)入隊(duì),并將具體的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
嘗試讓隊(duì)列中的頭節(jié)點(diǎn)獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
final Node p = node.predecessor();
// 前置節(jié)點(diǎn)是head澡谭,說明當(dāng)前節(jié)點(diǎn)是第一個(gè)等待鎖的節(jié)點(diǎn)(此時(shí)也會(huì)讓當(dāng)前節(jié)點(diǎn)再次去嘗試獲取鎖 即tryAcquire方法)
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功的處理
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 獲取鎖失敗的處理 shouldParkAfterFailedAcquire (這個(gè)方法一會(huì)單獨(dú)講)
if (shouldParkAfterFailedAcquire(p, node) &&
// 掛起當(dāng)前線程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 響應(yīng)中斷
if (failed)
// 撤銷嘗試獲取鎖(這個(gè)也一會(huì)在再講)
cancelAcquire(node);
}
}
是否要掛起當(dāng)獲取鎖失敗方法解析 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前綴節(jié)點(diǎn)的等待狀態(tài)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 前面有節(jié)點(diǎn)已經(jīng)做好被喚醒的準(zhǔn)備愿题,就等資源釋放,去獲取鎖,當(dāng)前節(jié)點(diǎn)可以被掛起
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 說明是個(gè)從尾到頭的查找過程
do {
node.prev = pred = pred.prev;
// >0 說明前置節(jié)點(diǎn)已經(jīng)被取消潘酗,可以直接刪除
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 將前置節(jié)點(diǎn)設(shè)置為SIGNAL的狀態(tài)杆兵,先入隊(duì)的線程先被喚醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
此處放一個(gè)WaitStatus枚舉的狀態(tài)表
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
釋放鎖的代碼
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
Node h = head;
// 如果有等待節(jié)點(diǎn),嘗試喚醒(即使創(chuàng)建的臨時(shí)節(jié)點(diǎn)仔夺,也會(huì)在實(shí)際入隊(duì)的過程中將臨時(shí)節(jié)點(diǎn)改成SIGNAL狀態(tài))
if (h != null && h.waitStatus != 0)
// 喚醒
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
釋放鎖
protected final boolean tryRelease(int releases) {
// state - release
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state = 0琐脏,代表釋放鎖
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
AQS核心內(nèi)容 FIFO隊(duì)列及入隊(duì)出隊(duì)規(guī)則-出隊(duì)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 頭節(jié)點(diǎn)<0,設(shè)置為初始狀態(tài) (小于0可能是非虛擬節(jié)點(diǎn)作為頭節(jié)點(diǎn)的情況)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 從后往前找囚灼,找到最前面的可以被喚醒的節(jié)點(diǎn)(此處不做無效節(jié)點(diǎn)的刪除骆膝,刪除的操作在入隊(duì)的時(shí)候做)
// 猜測(cè):此處也是從后往前找的原因估計(jì)是為了不和入隊(duì)時(shí)候刪除的代碼沖突
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒節(jié)點(diǎn),再走入剛剛的acquireQueued方法
LockSupport.unpark(s.thread);
}
撤銷等待節(jié)點(diǎn) cancelAcquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
// 刪除前面取消的節(jié)點(diǎn)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
// 尾節(jié)點(diǎn)灶体,直接刪除當(dāng)前
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 找到前一個(gè)被喚醒的節(jié)點(diǎn)
Node next = node.next;
// 如果當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是符合被喚醒的條件阅签,
// 將前一個(gè)符合被喚醒的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
一個(gè)問題
關(guān)于剛剛為什么要從尾節(jié)點(diǎn)往前找去添加節(jié)點(diǎn)
node.prev = pred; compareAndSetTail(pred, node) 這兩個(gè)地方可以看作Tail入隊(duì)的原子操作,但是此時(shí)pred.next = node;還沒執(zhí)行蝎抽,如果這個(gè)時(shí)候執(zhí)行了unparkSuccessor方法政钟,就沒辦法從前往后找了,所以需要從后往前找樟结。還有一點(diǎn)原因养交,在產(chǎn)生CANCELLED狀態(tài)節(jié)點(diǎn)的時(shí)候,先斷開的是Next指針瓢宦,Prev指針并未斷開碎连,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node。