簡(jiǎn)介
AbstractQueuedSynchronizer(簡(jiǎn)稱AQS)是JAVA中用來實(shí)現(xiàn)鎖的基礎(chǔ)框架拖吼,其內(nèi)部維護(hù)了一個(gè)等待隊(duì)列,用來存儲(chǔ)等待獲取鎖的線程巨坊。隊(duì)列遵循FIFO(先進(jìn)先出)的原則,每次入隊(duì)列都是添加到隊(duì)列的尾部,tail總是指向尾部節(jié)點(diǎn)檩帐;出隊(duì)列就是將head節(jié)點(diǎn)設(shè)置指向head的next節(jié)點(diǎn),head節(jié)點(diǎn)中沒有線程另萤,其代表當(dāng)前獲取到鎖的線程湃密。其可實(shí)現(xiàn)共享鎖和獨(dú)占鎖兩種方式的鎖。
+------+ prev +-----+ +-----+
| head | <---- | | <---- | tail|
+------+ +-----+ +-----+
隊(duì)列節(jié)點(diǎn)類
static final class Node {
/** 共享模式標(biāo)識(shí) */
static final Node SHARED = new Node();
/** 獨(dú)占模式標(biāo)識(shí) */
static final Node EXCLUSIVE = null;
/** 節(jié)點(diǎn)狀態(tài)值四敞,表示線程被取消 */
static final int CANCELLED = 1;
/** 節(jié)點(diǎn)狀態(tài)值泛源,表示后一個(gè)節(jié)點(diǎn)需要被unparking喚醒 */
static final int SIGNAL = -1;
/** 節(jié)點(diǎn)狀態(tài)值,表示在等待某一個(gè)條件目养,此節(jié)點(diǎn)在condition隊(duì)列中俩由,而不是sync隊(duì)列中 */
static final int CONDITION = -2;
/** 節(jié)點(diǎn)狀態(tài)值,喚醒可以向后傳遞癌蚁,共享模式時(shí)使用 */
static final int PROPAGATE = -3;
/**
* 節(jié)點(diǎn)狀態(tài)幻梯,sync時(shí)默認(rèn)為0;condition時(shí)默認(rèn)初始化為Node.CONDITION
*/
volatile int waitStatus;
/**
* 前一個(gè)節(jié)點(diǎn)
*/
volatile Node prev;
/**
* 后一個(gè)節(jié)點(diǎn)
*/
volatile Node next;
/**
* 節(jié)點(diǎn)代表的線程
*/
volatile Thread thread;
/**
* sync時(shí)為標(biāo)識(shí)位
*/
Node nextWaiter;
/**
* 節(jié)點(diǎn)是否為共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)努释,前一個(gè)節(jié)點(diǎn)為空則拋異常
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) { // addWaiter時(shí)使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Condition時(shí)使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中的屬性
/**
* 等待隊(duì)列的頭結(jié)點(diǎn)碘梢,惰性初始化。除初始化時(shí)伐蒂,只能通過setHead方法修改煞躬。
* 注意:如果頭結(jié)點(diǎn)存在,它的狀態(tài)確保不能是CANCELLED狀態(tài)逸邦。
*/
private transient volatile Node head;
/**
* 等待隊(duì)列的尾結(jié)點(diǎn)恩沛,惰性初始化。只能通過enq方法添加新的等待節(jié)點(diǎn)
*/
private transient volatile Node tail;
/**
* 同步狀態(tài)標(biāo)識(shí)
*/
private volatile int state;
AQS中的方法
- enq缕减,即入隊(duì)列(enqueue)方法雷客,將節(jié)點(diǎn)插入到隊(duì)列的尾部
/**
* 插入節(jié)點(diǎn)到隊(duì)列尾部,必要時(shí)需要進(jìn)行初始化
* @param node 需要被插入的節(jié)點(diǎn)
* @return 此節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 尾節(jié)點(diǎn)為空桥狡,表示隊(duì)列未被初始化搅裙,設(shè)置頭節(jié)點(diǎn)為一個(gè)空的Node,并將tail指向頭結(jié)點(diǎn)裹芝。設(shè)置完后進(jìn)行下一次循環(huán)部逮,進(jìn)入else中
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; //設(shè)置node的前節(jié)點(diǎn)為隊(duì)列的尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
t.next = node; //原子更新尾節(jié)點(diǎn)成功后,node成為新的尾節(jié)點(diǎn)tail嫂易,原來的尾節(jié)點(diǎn)t成為node的前節(jié)點(diǎn)兄朋,設(shè)置t的后節(jié)點(diǎn)為node
return t;
}
}
}
}
- addWaiter,為當(dāng)前線程創(chuàng)建節(jié)點(diǎn)并加入到等待隊(duì)列中
/**
* 為當(dāng)前線程創(chuàng)建節(jié)點(diǎn)并加入到隊(duì)列中
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 與enq方法中的else中功能相同
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //尾節(jié)點(diǎn)tail為null或者更新node為尾節(jié)點(diǎn)失敗怜械,執(zhí)行此處
return node;
}
- setHead蜈漓,設(shè)置隊(duì)列的頭結(jié)點(diǎn)穆桂,也是出隊(duì)列。
/**
* 設(shè)置隊(duì)列的頭結(jié)點(diǎn)融虽,也是出隊(duì)列享完。被acquire方法調(diào)用
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- unparkSuccessor,喚醒掛起的節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
/*
* 當(dāng)前結(jié)點(diǎn)狀態(tài)小于0時(shí)有额,設(shè)置狀態(tài)為0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 喚醒的線程被successor持有般又,通常情況下就是下一個(gè)節(jié)點(diǎn)。但是如果被取消或者為null了巍佑,
* 需要從隊(duì)列尾tail向前查找離node最近的未被取消的successor
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //喚醒線程茴迁。【對(duì)同一線程先unpark再park萤衰,park是不會(huì)阻塞線程的堕义。】
}
- cancelAcquire脆栋,取消正在嘗試獲取鎖的node
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳過已取消的前節(jié)點(diǎn)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// 如果node為尾節(jié)點(diǎn)倦卖,則將有效的前節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); //尾節(jié)點(diǎn)的next設(shè)置為null
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
// 1. pred不是頭節(jié)點(diǎn)
// 2. pred的狀態(tài)為Node.SIGNAL或者狀態(tài)<=0同時(shí)可以設(shè)置為Node.SIGNAL狀態(tài)
// 3. pred的持有的線程不為空
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); //pred的next節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- shouldParkAfterFailedAcquire,檢查與更新獲取鎖失敗的節(jié)點(diǎn)的狀態(tài)椿争,如果線程需要掛起怕膛,返回true.
/**
*
* 檢查與更新獲取鎖失敗的節(jié)點(diǎn)的狀態(tài),如果線程需要掛起秦踪,返回true.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 這個(gè)節(jié)點(diǎn)已經(jīng)設(shè)置為允許釋放鎖release來喚醒它的狀態(tài)褐捻,所以它可以被掛起
*/
return true;
if (ws > 0) {
/*
* 跳過所有已經(jīng)被取消的前節(jié)點(diǎn)
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 此處的waitStatus狀態(tài)為0或者PROPAGATE。
* 表示此節(jié)點(diǎn)需要一個(gè)喚醒信號(hào)椅邓,但是還沒有被掛起柠逞。
* 調(diào)用者需要重試來確保它在掛起前不能獲取到鎖
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- selfInterrupt,中斷當(dāng)前線程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- parkAndCheckInterrupt景馁,掛起當(dāng)前線程并檢查是否中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- acquireQueued边苹,對(duì)于已經(jīng)在隊(duì)列中的節(jié)點(diǎn),以獨(dú)占的方式獲取鎖
/**
* 對(duì)于已經(jīng)在隊(duì)列中的節(jié)點(diǎn)裁僧,以獨(dú)占的方式獲取鎖。condition的wait和acquire方法會(huì)使用它
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否失敗
try {
boolean interrupted = false; //是否中斷
for (;;) {
/**
* 隊(duì)列中的所有節(jié)點(diǎn)都在此處循環(huán)調(diào)用慕购,
* 只有pred為頭節(jié)點(diǎn)的節(jié)點(diǎn)可能獲取到鎖聊疲,
* 這時(shí)重新設(shè)置獲取到鎖的節(jié)點(diǎn)為頭結(jié)點(diǎn)(獲取到鎖的節(jié)點(diǎn)出隊(duì)列)并返回
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //前節(jié)點(diǎn)為頭節(jié)點(diǎn)同時(shí)獲取鎖成功
setHead(node); //將node節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //檢查是否需要被掛起
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- acquire,獲取獨(dú)占鎖
/**
* 調(diào)用tryAcquire獲取鎖沪悲,如果成功則結(jié)束
* 如果失敗获洲,則用addWaiter為當(dāng)前線程創(chuàng)建獨(dú)占鎖節(jié)點(diǎn)加入到等待隊(duì)列中
* 調(diào)用acquireQueued方法不斷循環(huán)檢查是否能獲取到鎖或者掛起線程,直到獲取到鎖后返回
* 可用來實(shí)現(xiàn)Lock#lock方法
* @param arg
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- release殿如,釋放鎖
**
* 釋放鎖贡珊。
* 可用來實(shí)現(xiàn){@link Lock#unlock}
*
* @param arg arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
/*
* 釋放鎖成功最爬,頭結(jié)點(diǎn)不為空且狀態(tài)不為0,則喚醒頭結(jié)點(diǎn)后面節(jié)點(diǎn)的線程门岔;
* 喚醒后爱致,被喚醒的線程則在acquireQueued方法的for循環(huán)里循環(huán)獲取鎖
*/
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- tryAcquire, 需子類實(shí)現(xiàn)寒随,以獨(dú)占的方式獲取鎖
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- tryRelease糠悯,需子類實(shí)現(xiàn),釋放獨(dú)占所
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}