一、AQS概述
AQS全名AbstractQueuedSynchronizer,意為抽象隊(duì)列同步器,JUC(java.util.concurrent包)下面的Lock和其他一些并發(fā)工具類都是基于它來實(shí)現(xiàn)的生真。AQS維護(hù)了一個volatile的state和一個CLH(FIFO)雙向隊(duì)列。
二屋吨、分析
state
state是一個由volatile修飾的int變量针贬,它的訪問方式有三種:
- getState()
- setState(int newState)
- compareAndSetState(int expect, int update)
/**
* 由volatile修飾的state
*/
private volatile int state;
/**
* 基于內(nèi)存可見性的讀
*/
protected final int getState() {
return state;
}
/**
* 基于內(nèi)存可見性的寫
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 使用CAS+volatile,基于原子性與可見性的對state進(jìn)行設(shè)值
*/
protected final boolean compareAndSetState(int expect, int update) {
// 使用Unsafe類赋兵,調(diào)用JNI方法
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
資源獲取主要有兩種形式:
- 獨(dú)占式(EXCLUSIVE)
僅有一個線程能在同一時刻獲取到資源并處理笔咽,如ReentrantLock的實(shí)現(xiàn)。
- 共享式(SHARED)
多個線程可以同時獲取到資源并處理霹期,如Semaphore/CountDownLatch等叶组。
AQS中大部分邏輯已經(jīng)被實(shí)現(xiàn),集成類只需要重寫state的獲壤臁(acquire)與釋放(release)方法甩十,因?yàn)樵贏QS中,這些方法默認(rèn)定義的實(shí)現(xiàn)方式都是拋出不支持操作異常帕膜,所以按需實(shí)現(xiàn)即可枣氧。
其中需要繼承類重寫的方法有:
- tryAcquire(int arg)
此方法是獨(dú)占式的獲取資源方法,成功則返回true,失敗返回false垮刹。
- tryRelease(int arg)
此方法是獨(dú)占式的釋放資源方法达吞,成功則返回true,失敗返回false。
- tryAcquireShared(int arg)
此方法是共享式的獲取資源方法荒典,返回負(fù)數(shù)表示失敗酪劫,0表示獲取成功吞鸭,但是沒有可用資源,正數(shù)表示獲取成功覆糟,且有可用資源刻剥。
- tryReleaseShared(int arg)
此方法是共享式的釋放資源方法,如果允許喚醒后續(xù)等待線程則返回true,不允許則返回false滩字。
- isHeldExclusively()
判斷當(dāng)前線程是否正在獨(dú)享資源造虏,是則返回true,否則返回false。
CLH(FIFO)隊(duì)列
AQS中是通過內(nèi)部類Node來維護(hù)一個CLH隊(duì)列的麦箍。源碼如下:
static final class Node {
/** 標(biāo)記共享式訪問 */
static final Node SHARED = new Node();
/** 標(biāo)記獨(dú)占式訪問 */
static final Node EXCLUSIVE = null;
/** 字段waitStatus的值漓藕,表示當(dāng)前節(jié)點(diǎn)已取消等待 */
static final int CANCELLED = 1;
/**字段waitStatus的值,表示當(dāng)前節(jié)點(diǎn)取消或釋放資源后挟裂,通知下一個節(jié)點(diǎn) */
static final int SIGNAL = -1;
/** 表示正在等待觸發(fā)條件 */
static final int CONDITION = -2;
/**
* 表示下一個共享獲取應(yīng)無條件傳播
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: 表示當(dāng)前節(jié)點(diǎn)取消或釋放資源后享钞,通知下一個節(jié)點(diǎn)
* CANCELLED: 表示當(dāng)前節(jié)點(diǎn)已取消等待
* CONDITION: 表示正在等待觸發(fā)條件
* PROPAGATE: 表示下一個共享獲取應(yīng)無條件傳播
*/
volatile int waitStatus;
/**
* 前節(jié)點(diǎn)
*/
volatile Node prev;
/**
* 下一個節(jié)點(diǎn)
*/
volatile Node next;
/**
* 節(jié)點(diǎn)對應(yīng)線程
*/
volatile Thread thread;
/**
* 下一個等待的節(jié)點(diǎn)
*/
Node nextWaiter;
/**
* 是否是共享式訪問
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前節(jié)點(diǎn)
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 共享式訪問的構(gòu)造函數(shù)
}
Node(Thread thread, Node mode) { // 用于被添加等待者使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 用于Condition使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
獨(dú)占模式-獲取資源
使用AQS中的acquire(int arg)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
該方法分為4個部分:
- tryAcquire()
需要自己實(shí)現(xiàn)的方法,如果獲取到資源使用權(quán)诀蓉,則返回true,反之fasle栗竖。如果獲取到資源,返回true,!true為false,根據(jù)&&的短路性渠啤,則不會執(zhí)行后續(xù)方法狐肢,直接跳過程序。如果未獲取到資源沥曹,返回false,!false為true,則進(jìn)入后續(xù)方法处坪。
- addWaiter()
如果未獲取到資源使用權(quán),則首先會調(diào)用此方法架专。上源碼:
private Node addWaiter(Node mode) {
// 封裝當(dāng)前線程和獨(dú)占模式
Node node = new Node(Thread.currentThread(), mode);
// 獲取尾部節(jié)點(diǎn)
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS設(shè)置尾部節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
// 將為節(jié)點(diǎn)的下一節(jié)點(diǎn)指向當(dāng)前node
pred.next = node;
return node;
}
}
// 如果尾結(jié)點(diǎn)為空或者設(shè)置尾結(jié)點(diǎn)失敗
enq(node);
return node;
}
private Node enq(final Node node) {
// 如果CAS設(shè)置未成功則死循環(huán)
for (;;) {
// 獲得尾結(jié)點(diǎn)
Node t = tail;
// 如果尾節(jié)點(diǎn)為空,說明CLH隊(duì)列為空玄帕,需要初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 設(shè)置當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
node.prev = t;
// CAS設(shè)置當(dāng)前節(jié)點(diǎn)為尾結(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
// 設(shè)置后驅(qū)節(jié)點(diǎn)
t.next = node;
return t;
}
}
}
}
- acquiredQueued()
final boolean acquireQueued(final Node node, int arg) {
// 標(biāo)識資源獲取是否失敗
boolean failed = true;
try {
// 標(biāo)識線程是否中斷
boolean interrupted = false;
for (;;) {
// 獲得當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點(diǎn)為頭結(jié)點(diǎn)部脚,說明快到當(dāng)前節(jié)點(diǎn)了,嘗試獲取資源
if (p == head && tryAcquire(arg)) {
// 獲取資源成功
// 設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn)
setHead(node);
// 取消前驅(qū)節(jié)點(diǎn)(以前的頭部)的后節(jié)點(diǎn)裤纹,方便GC回收
p.next = null; // help GC
// 標(biāo)識未失敗
failed = false;
// 返回中斷標(biāo)志
return interrupted;
}
// 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭結(jié)點(diǎn)或獲取資源失敗
// 需要用shouldParkAfterFailedAcquire函數(shù)判斷是否需要阻塞該節(jié)點(diǎn)持有的線程
// 如果需要阻塞委刘,則執(zhí)行parkAndCheckInterrupt方法,并設(shè)置被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果最終獲取資源失敗
if (failed)
// 當(dāng)前節(jié)點(diǎn)取消獲取資源
cancelAcquire(node);
}
}
- selfInterrupt()
中斷當(dāng)前線程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
獨(dú)占模式-釋放資源
release() 釋放資源并喚醒后繼線程
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 獲取頭結(jié)點(diǎn)
Node h = head;
// 頭結(jié)點(diǎn)不為空且等待狀態(tài)值不為0
if (h != null && h.waitStatus != 0)
// 喚醒后續(xù)等待線程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果等待狀態(tài)值小于0
if (ws < 0)
// 使用CAS將waitStatus設(shè)置為0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果當(dāng)前節(jié)點(diǎn)沒有后繼節(jié)點(diǎn)或者后繼節(jié)點(diǎn)放棄競爭資源
if (s == null || s.waitStatus > 0) {
s = null;
// 從隊(duì)列尾部循環(huán)直到當(dāng)前節(jié)點(diǎn)鹰椒,找到最近的且等待狀態(tài)值小于0的節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到的后繼節(jié)點(diǎn)不為空,則喚醒其持有的線程
if (s != null)
LockSupport.unpark(s.thread);
}