什么是AQS?
AQS(AbstractQueuedSynchronizer): 是并發(fā)容器J.U.C(java.util.concurrent)下locks包內(nèi)的一個類. 它實現(xiàn)了一個FIFO(FirstIn汁汗、FisrtOut先進先出)的隊列. 底層實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表.
AQS的核心思想是, 如果被請求的共享資源是空閑的, 則將當(dāng)前請求資源的線程設(shè)置為有效線程, 并且將共享的資源設(shè)置為鎖定的狀態(tài). 如果被請求的共享資源被占用, 那么就需要一套線程阻塞/等待以及喚醒進行鎖分配的機制, 這個機制AQS是用CLH(參考:
備注1
)隊列鎖實現(xiàn)的, 就是將暫時獲取不到鎖的線程加入到隊列中進行等待.AQS定義兩種資源共享方式: Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)
AQS同步器是用一個int變量state來表示狀態(tài). 同步功能使用的方法都是子類繼承AbstractQueuedSynchronizer類實現(xiàn)的. 子類通過繼承同步器實現(xiàn)自身需要的方法來管理state狀態(tài), 管理的方式就是通過
accquire()/accquireShared()/release()/releaseShared()
等方法來操作狀態(tài). 在多線程環(huán)境下狀態(tài)的操作必須保證其原子性, 所以子類在狀態(tài)的管理中需要使用AQS同步器提供的三個方法操作state:getState()/setState(int)/compareAndSetState(int, int)
.子類推薦被定義為自定義同步裝置的內(nèi)部類
(大佬都是這么實現(xiàn)的, 跟著沒毛病)
.
備注1:
CLH鎖即Craig, Landin, and Hagersten (CLH) locks. CLH鎖是一個自旋鎖尊惰。能確保無饑餓性. 提供先來先服務(wù)的公平性.
CLH鎖也是一種基于鏈表的可擴展专缠、高性能、公平的自旋鎖, 申請線程僅僅在本地變量上自旋, 它不斷輪詢前驅(qū)的狀態(tài), 假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋
AQS中的數(shù)據(jù)結(jié)構(gòu)
節(jié)點和同步隊列
Node節(jié)點屬性
- 節(jié)點的狀態(tài)waitStatus.
CANCELLED(1): 表示當(dāng)前節(jié)點被取消, 進入改狀態(tài)的節(jié)點將不會在發(fā)生變化.
SIGNAL(-1): 表示后繼結(jié)點在等待當(dāng)前結(jié)點喚醒. 后繼結(jié)點入隊時, 會將前繼結(jié)點的狀態(tài)更新為SIGNAL.
CONDITION:(-2) : 表示當(dāng)前節(jié)點在condition隊列中進行等待. 當(dāng)其他線程調(diào)用了Condition的signal()方法后, CONDITION狀態(tài)的結(jié)點將從等待隊列轉(zhuǎn)移到同步隊列中, 等待獲取同步鎖.
PROPAGATE(-3): 共享模式下, 前繼結(jié)點不僅會喚醒其后繼結(jié)點, 同時也可能會喚醒后繼的后繼結(jié)點.
值為0, 表示當(dāng)前節(jié)點在sync隊列中勾笆,等待著獲取同步鎖.
- Node prev: 前驅(qū)節(jié)點.
- Node next: 后繼節(jié)點.
- Node nextWaiter: 存儲condition隊列中的后繼節(jié)點.
- Thread thread: 當(dāng)前線程.
同步隊列數(shù)據(jù)結(jié)構(gòu)
核心方法分析
public final void acquire(int arg)
該方法是獨占模式下線程獲取共享資源的入口, 如果獲取到資源后, 線程直接返回.否則將進入等待隊列, 直到獲取到資源為止(整個過程忽略中斷的影響
). 這就是Lock.lock()的語義, 你也可以自定義Lock頂層接口, 參考 Doug Lea對Lock的定義.
public final void acquire(int var1) {
if (!tryAcquire(var1) && acquireQueued(addWaiter(Node.EXCLUSIVE), var1)) {
selfInterrupt();
}
}
函數(shù)流程如下:
- tryAcquire(): 嘗試直接獲取資源, 如果成功直接返回
(調(diào)用tryAcquire更改狀態(tài),需要保證原子性. 這里體現(xiàn)了非公平鎖, 每個線程獲取鎖時會嘗試直接搶占加塞一次, 而CLH隊列中可能還有別的線程在等待)
. - addWaiter(): 如果獲取不到, 將當(dāng)前線程構(gòu)造成節(jié)點Node并加入sync隊列的尾部, 并且標(biāo)記為獨占模式.
- acquireQueued(): 使線程阻塞在等待隊列中獲取資源, 一直獲取到資源后才返回. 如果在整個等待過程中被中斷過, 則返回true, 否則返回false.
- 如果線程在等待過程中被中斷過, 它是不響應(yīng)的. 只是獲取資源后才再進行自我中斷selfInterrupt(), 將中斷補上
(響應(yīng)前面說的, 整個等待過程忽略中斷的影響)
.
1. tryAcquire()方法
此方法嘗試去獲取獨占資源. 如果獲取成功, 則直接返回true, 否則直接返回false. 這也正是tryLock()的語義, 還是那句話. 當(dāng)然不僅僅只限于tryLock().
如下是tryAcquire()的源碼
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這里throw異常是留給我們進行實現(xiàn)的. AQS只是一個框架, 具體資源的獲取和釋放邏輯由我們自定義同步器去實現(xiàn)(就像ReentrantLock類)
. 需要自定義實現(xiàn)的方法都沒有定義成abstract, 由我們根據(jù)同步器獨占/共享自有選擇.
2. addWaiter(Node)方法
private Node addWaiter(Node mode) {
// 以給定模式構(gòu)造結(jié)點. mode有兩種: EXCLUSIVE(獨占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試直接將節(jié)點放到sync隊列尾部,
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果放入尾部失敗, 調(diào)用enq()入隊
enq(node);
return node;
}
3. enq(Node)方法
private Node enq(final Node node) {
//CAS"自旋", 直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) { // 隊列為空, 創(chuàng)建一個空的結(jié)點作為head結(jié)點, 并將tail也指向它, 這是一個初始化的動作
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程, 放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
CAS自旋volatile變量, 保證了可見性, 操作上又是原子方法. 這是一種很經(jīng)典的用法
4. acquireQueued(Node, int)方法
當(dāng)節(jié)點進入同步隊列后, 接下來就是要等待獲取鎖(訪問控制), 同一時刻只有一個線程在運行, 其他都要進入等待狀態(tài). 每個線程節(jié)點都是獨立的, 他們進行自旋判斷, 當(dāng)發(fā)現(xiàn)前驅(qū)節(jié)點是頭結(jié)點并且獲取了狀態(tài)(tryAcquire()自己實現(xiàn)原子性操作)
, 那這個線程就可以運行了.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//標(biāo)記是否可以成功拿到狀態(tài)
try {
boolean interrupted = false;//處理過程中是否被中斷過
for (;;) {//自旋
final Node p = node.predecessor();//獲取當(dāng)前節(jié)點的前驅(qū)節(jié)點
//如果前驅(qū)節(jié)點是head, 當(dāng)前節(jié)點就是排第二. 這個時候可以嘗試去獲取資源了(頭結(jié)點可能釋放完喚醒自己了)
if (p == head && tryAcquire(arg)) {
setHead(node);//設(shè)置頭節(jié)點為當(dāng)前節(jié)點
p.next = null; // help GC setHead()中node.prev已置為null, 此處再將head.next置為null. 方便gc回收head節(jié)點.
failed = false;//標(biāo)記成功獲取資源
return interrupted;
}
//不滿足喚醒條件, 調(diào)用park()進入waiting狀態(tài), 等待unpark(). 如果等待的過程被中斷, 線程會從park()中醒過來, 發(fā)現(xiàn)拿不到資源后繼續(xù)進入park()中等待.
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;//如果線程被終端, 標(biāo)記interrupted為true, 等待線程獲取到資源后在中斷
}
} finally {
if (failed)//如果等待過程中沒有成功獲取資源(不可控異常), 取消線程在隊列的等待
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()方法如果發(fā)現(xiàn)前驅(qū)節(jié)點狀態(tài)不是SIGNAL, 會標(biāo)記前驅(qū)節(jié)點狀態(tài)為SIGNAL(-1). 如果發(fā)現(xiàn)前驅(qū)節(jié)點放棄等待了就一直往前找節(jié)點, 直到找到正常等待的節(jié)點排隊到它后面.
parkAndCheckInterrupt()使線程進入waiting狀態(tài), 如果發(fā)現(xiàn)被喚醒, 檢查是不是被中斷了并且清除狀態(tài).
acquire()方法總結(jié)
- 嘗試直接插隊獲取資源, 如果不成功進入同步隊列排隊.
- 調(diào)用park()進入waiting狀態(tài), 等待前驅(qū)節(jié)點調(diào)用unpark()或者interrupt()喚醒自己.
interrupt()喚醒拿不到資源繼續(xù)進入waiting狀態(tài)
. - 被喚醒后嘗試獲取資源, 如果獲取不到資源進入2流程, 獲取到資源就執(zhí)行后續(xù)代碼
(如果等待過程被中斷過此時會調(diào)用selfInterrupt()將中斷補上)
.
public final boolean release(int arg)
該方法是獨占模式下線程釋放共享資源的入口.
public final boolean release(int arg) {
if (tryRelease(arg)) {//釋放資源, 自定義函數(shù)實現(xiàn)
Node h = head;
if (h != null && h.waitStatus != 0)//拿到頭結(jié)點
unparkSuccessor(h);//喚醒等待隊列中的下一個線程
return true;
}
return false;
}
1. tryRelease(arg)方法
需要我們實現(xiàn)的獨占資源釋放函數(shù).
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2. unparkSuccessor(node) 方法
喚醒等待隊列中的下一個線程
private void unparkSuccessor(Node node) {
//當(dāng)前線程節(jié)點的狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//設(shè)置當(dāng)前線程的節(jié)點狀態(tài)為0, 因為已經(jīng)釋放資源
Node s = node.next; //找到下一個需要喚醒的節(jié)點
if (s == null || s.waitStatus > 0) {//下一個節(jié)點為空或者已經(jīng)放棄等待就取消喚醒操作
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//從后往前找有效的節(jié)點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒有效節(jié)點
}
下一個有效的線程被喚醒后處在acquireQueued()的自旋流程中, 然后進入資源判斷獲取(if (p == head && tryAcquire(arg))).
public final void acquireShared(int arg)
此方法是共享模式下線程獲取共享資源的頂層入口. 它會獲取指定量的資源(state), 獲取成功后直接返回, 獲取失敗進入等待隊列, 直到獲取到資源(整個過程忽略中斷的影響
).參考ReentrantReadWriteLock設(shè)計.
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//改方法需要自定義同步器實現(xiàn). 返回語義負(fù)數(shù)表示失敗, 0或者大于零表示獲取成功.
doAcquireShared(arg);//小于零進入等待隊列, 獲取資源后返回
}
1. doAcquireShared(arg)方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入隊列的尾部, 模式為共享. addWaiter()方法參考上面介紹
boolean failed = true;//成功失敗標(biāo)識
try {
boolean interrupted = false;//是否中斷標(biāo)識
for (;;) {//CAS自旋
final Node p = node.predecessor();//獲取前驅(qū)節(jié)點
if (p == head) {//前驅(qū)節(jié)點為頭結(jié)點, 嘗試獲取資源(此處有可能是前驅(qū)節(jié)點喚醒了自己)
int r = tryAcquireShared(arg);//獲取資源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//將head指向自己, 此時r>0, 還有剩余資源喚醒后續(xù)排隊線程
p.next = null; // help GC
if (interrupted)// 中斷標(biāo)識
selfInterrupt();//補上中斷
failed = false;
return;
}
}
//不滿足喚醒條件, 調(diào)用park()進入waiting狀態(tài), 等待unpark(). 如果等待的過程被中斷, 線程會從park()中醒過來, 發(fā)現(xiàn)拿不到資源后繼續(xù)進入park()中等待.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2. setHeadAndPropagate(Node, int)方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);/head指向自己
//如果還有剩余量, 繼續(xù)喚醒下一個排隊的線程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
acquireShared()方法總結(jié)
- tryAcquireShared()方法嘗試獲取資源, 成功直接返回, 如果不成功進入同步隊列排隊.
- 調(diào)用park()進入waiting狀態(tài), 等待前驅(qū)節(jié)點調(diào)用unpark()或者interrupt()喚醒自己.
- 被喚醒后嘗試獲取資源, 如果獲取不到資源進入2流程, 獲取到資源就執(zhí)行后續(xù)代碼.
其實同acquir()方法一樣, 只不過該方法在自己拿到資源后回去喚醒后繼線程
public final boolean releaseShared(int arg)
該方法是共享模式下線程釋放共享資源的入口. 跟獨占模式下的資源釋放方法release()很相似, 不同的是獨占模式一般是完全釋放資源(state=0)后才允許去喚醒其他線程, 而共享模式往往不會這么控制, 具體實現(xiàn)要看自定義同步器的邏輯.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放資源, 該方法需要自定義共享同步器實現(xiàn).
doReleaseShared();//喚醒后繼節(jié)點
return true;
}
return false;
}
1. tryReleaseShared()方法
需要我們自己實現(xiàn)的共享資源釋放方法.
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
2. doReleaseShared()方法
該方法是用來喚醒后繼節(jié)點的.
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒后繼節(jié)點
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) //head節(jié)點如果發(fā)生變化即退出自旋
break;
}
}
releaseShared()方法總結(jié)
- tryReleaseShared()方法進行共享資源的釋放.
- doReleaseShared()方法用來喚醒后繼節(jié)點.
以上是幾個AQS常用的資源獲取和釋放的基本方法, 其實還有一些方法和上面分析的方法略有不同, 如下:
- 獨占式獲取資源
1. acquireInterruptibly(int arg): 類似于acquire()方法, 不同的地方是該方法響應(yīng)外界對線程的中斷信號, 并且拋出InterruptedException()異常.
2. tryAcquireNanos(int arg, long nanosTimeout) : 類似于acquire()方法, 同樣響應(yīng)中斷拋出InterruptedException()異常, 并且該方法有獲取超時時間.
- 共享式獲取資源
1. acquireSharedInterruptibly(int arg): 類似acquireInterruptibly()方法的共享實現(xiàn), 同樣響應(yīng)中斷拋出InterruptedException()異常.
2. tryAcquireSharedNanos(int arg, long nanosTimeout)(): 類似tryAcquireNanos()方法的共享實現(xiàn), 同樣響應(yīng)中斷拋出InterruptedException()異常, 并且該方法有獲取超時時間.
測試案例
1. ExclusiveLock(自定義獨占鎖)
ExclusiveLock是互斥的不可重入鎖實現(xiàn), 對鎖資源State的操作只有0和1兩個狀態(tài), 0代表未鎖定,1代表鎖定. 按照上面的分析, 我們需要實現(xiàn)AQS的tryAcquire()和tryRelease()方法.
public class ExclusiveLock implements Lock {
//自定義內(nèi)部類同步器
private static class ExclusiveSync extends AbstractQueuedSynchronizer {
//判斷是否是鎖定狀態(tài)
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//嘗試獲取資源, 如果成功直接返回. 獲取成功返回true, 否則返回false.
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)){//狀態(tài)變更必須為CAS原子操作, 保證原子性
setExclusiveOwnerThread(Thread.currentThread());//同樣也是原子操作
return true;
}
return false;
}
//嘗試釋放資源
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0){
throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
//創(chuàng)建自定義同步器的實現(xiàn)
private final ExclusiveSync sync = new ExclusiveSync();
//獲取資源, 同acquire()語義一樣, 獲取不到進入同步隊列等待成功返回
@Override
public void lock() {
sync.acquire(1);
}
//判斷鎖是否被占有
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//獲取資源, 立刻返回結(jié)果
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
//
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
//釋放資源
@Override
public void unlock() {
sync.release(1);
}
}
1. ShareLock(自定義共享鎖)
ShareLock為一個共享同步器的實現(xiàn), 設(shè)計同一時刻可以有兩個線程獲取到資源, 超過兩個進行同步隊列阻塞. 按照上面的分析, 我們實現(xiàn)AQS的tryAcquireShared()和tryReleaseShared()方法.
public class ShareLock implements Lock {
public static class ShareSync extends AbstractQueuedSynchronizer{
//定義同步器的初始狀態(tài)為2
ShareSync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
@Override
protected int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current + reduceCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
private final ShareSync sync = new ShareSync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return null;
}
}