引言
AbstractQueuedSynchronizer挑围,隊列同步器,簡稱AQS恢氯,它是java并發(fā)用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。
一般使用AQS的主要方式是繼承鼓寺,子類通過實現(xiàn)它提供的抽象方法來管理同步狀態(tài)勋拟,主要管理的方式是通過tryAcquire和tryRelease類似的方法來操作狀態(tài),同時侄刽,AQS提供以下線程安全的方法來對狀態(tài)進行操作:
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);
AQS本身是沒有實現(xiàn)任何同步接口的指黎,它僅僅只是定義了同步狀態(tài)的獲取和釋放的方法來供自定義的同步組件的使用朋凉。
注:AQS主要是怎么使用的呢州丹?
在java的同步組件中,AQS的子類一般是同步組件的靜態(tài)內(nèi)部類杂彭。
AQS是實現(xiàn)同步組件的關(guān)鍵墓毒,它倆的關(guān)系可以這樣描述:同步組件是面向使用者的,它定義了使用者與組件交互的接口亲怠,隱藏了具體的實現(xiàn)細節(jié)所计;而AQS面向的是同步組件的實現(xiàn)者,它簡化了具體的實現(xiàn)方式团秽,屏蔽了線程切換相關(guān)底層操作主胧,它們倆一起很好的對使用者和實現(xiàn)者所關(guān)注的領(lǐng)域做了一個隔離。
AQS實現(xiàn)分析
接下來將從實現(xiàn)的角度來具體分析AQS是如何來完成線程同步的习勤。
同步隊列分析
AQS的實現(xiàn)依賴內(nèi)部的同步隊列(FIFO雙向隊列)來完成同步狀態(tài)的管理踪栋,假如當(dāng)前線程獲取同步狀態(tài)失敗,AQS會將該線程以及等待狀態(tài)等信息構(gòu)造成一個Node图毕,并將其加入同步隊列夷都,同時阻塞當(dāng)前線程。當(dāng)同步狀態(tài)釋放時予颤,喚醒隊列的首節(jié)點囤官。
-
Node
Node主要包含以下成員變量:
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
- ```waitStatus```:節(jié)點狀態(tài)冬阳,主要有這幾種狀態(tài):
1. ```CANCELLED```:當(dāng)前線程被取消;
2. ```SIGNAL```:當(dāng)前節(jié)點的后繼節(jié)點需要運行党饮;
3. ```CONDITION```:當(dāng)前節(jié)點在等待condition肝陪;
4. ```PROPAGATE```:當(dāng)前場景下后續(xù)的acquireShared可以執(zhí)行;
5. 0:當(dāng)前節(jié)點在sync隊列中等待獲取鎖刑顺。
-
prev
:前驅(qū)節(jié)點见坑; -
next
:后繼節(jié)點; -
thread
:進入隊列的當(dāng)前線程捏检; -
nextWaiter
:存儲condition隊列中的后繼節(jié)點荞驴。
Node是sync隊列和condition隊列構(gòu)建的基礎(chǔ),AQS擁有三個成員變量:
對于鎖的獲取贯城,請求形成節(jié)點將其掛在隊列尾部熊楼,至于資源的轉(zhuǎn)移,是從頭到尾進行能犯,隊列的基本結(jié)構(gòu)就出來了:
-
同步隊列插入/刪除節(jié)點
-
節(jié)點插入
AQS提供基于CAS的設(shè)置尾節(jié)點的方法:
CAS設(shè)置尾節(jié)點
需要傳遞當(dāng)前線程認為的尾節(jié)點和當(dāng)前節(jié)點鲫骗,設(shè)置成功后,當(dāng)前節(jié)點與尾節(jié)點建立關(guān)聯(lián)踩晶。
同步隊列插入節(jié)點-
節(jié)點刪除
同步隊列遵循FIFO执泰,首節(jié)點是獲取同步狀態(tài)成功的節(jié)點,首節(jié)點的線程在釋放同步狀態(tài)之后將會喚醒后繼節(jié)點渡蜻,后繼節(jié)點將會在獲取同步狀態(tài)成功的時候?qū)⒆约涸O(shè)置為首節(jié)點术吝。同步隊列刪除節(jié)點
-
注:設(shè)置首節(jié)點是由獲取同步狀態(tài)成功的線程來完成,因為每次只會有一個線程能夠成功的獲取到同步狀態(tài)茸苇,所以排苍,設(shè)置首節(jié)點并不需要CAS來保證。
AQS源碼解析
AQS提供以下接口以供實現(xiàn)自定義同步器:
-
protected boolean tryAcquire(int arg)
獨占式獲取同步狀態(tài)学密,該方法的實現(xiàn)需要先查詢當(dāng)前的同步狀態(tài)是否可以獲取淘衙,如果可以獲取再進行獲取腻暮; -
protected boolean tryRelease(int arg)
釋放狀態(tài)彤守; -
protected int tryAcquireShared(int arg)
共享式獲取同步狀態(tài); -
protected boolean tryReleaseShared(int arg)
共享式釋放狀態(tài)哭靖; -
protected boolean isHeldExclusively()
獨占模式下具垫,判斷同步狀態(tài)是否已經(jīng)被占用。
使用者可以根據(jù)實際情況使用這些接口自定義同步組件款青。
AQS提供兩種方式來操作同步狀態(tài)做修,獨占式與共享式,下面就針對性做一下源碼分析。
獨占式同步狀態(tài)獲取 - acquire實現(xiàn)
具體執(zhí)行流程如下:
- 調(diào)用
tryAcquire
方法嘗試獲取同步狀態(tài)饰及; - 如果獲取不到同步狀態(tài)蔗坯,將當(dāng)前線程構(gòu)造成節(jié)點Node并加入同步隊列;
- 再次嘗試獲取燎含,如果還是沒有獲取到那么將當(dāng)前線程從線程調(diào)度器上摘下宾濒,進入等待狀態(tài)。
下面我們具體來看一下節(jié)點的構(gòu)造以及加入同步隊列部分的代碼實現(xiàn)屏箍。
- addWaiter實現(xiàn)
- 使用當(dāng)前thread構(gòu)造Node绘梦;
- 嘗試在隊尾插入節(jié)點,如果尾節(jié)點已經(jīng)存在赴魁,就做以下操作:
- 分配引用T指向尾節(jié)點卸奉;
- 將待插入節(jié)點的prev指針指向尾節(jié)點;
- 如果尾節(jié)點還為T颖御,將當(dāng)前尾節(jié)點設(shè)置為帶待插入節(jié)點榄棵;
- T的next指針指向待插入節(jié)點。
- 快速在隊尾插入節(jié)點潘拱,失敗則進入
enq(Node node)
方法疹鳄。
- enq實現(xiàn)
enq的邏輯可以確保Node可以有順序的添加到同步隊列中,具體的加入隊列的邏輯如下:
- 初始化同步隊列:如果尾節(jié)點為空芦岂,分配一個頭結(jié)點瘪弓,并將尾節(jié)點指向頭結(jié)點;
- 節(jié)點入隊禽最,通過CAS將節(jié)點設(shè)置為尾節(jié)點腺怯,以此在隊尾做節(jié)點插入。
可以看出弛随,整個enq方法通過“死循環(huán)”來保證節(jié)點的正確插入瓢喉。
進入同步隊列之后接下來就是同步狀態(tài)的獲取了宁赤,或者說是訪問控制acquireQueued
舀透。對于同步隊列中的線程,在同一時刻只能由隊列首節(jié)點獲取同步狀態(tài)决左,其他的線程進入等待愕够,直到符合條件才能繼續(xù)進行。
- acquireQueued實現(xiàn)
- 獲取當(dāng)前節(jié)點的前驅(qū)節(jié)點佛猛;
- 如果當(dāng)前節(jié)點的前驅(qū)節(jié)點是頭節(jié)點惑芭,并且可以獲取同步狀態(tài),設(shè)置當(dāng)前節(jié)點為頭結(jié)點继找,該節(jié)點占有鎖遂跟;
- 不滿足條件的線程進入等待狀態(tài)。
在整個方法中,當(dāng)前線程一直都在“死循環(huán)”中嘗試獲取同步狀態(tài):
從代碼的邏輯也可以看出幻锁,其實在節(jié)點與節(jié)點之間在循環(huán)檢查的過程中是不會相互通信的凯亮,僅僅只是判斷自己當(dāng)前的前驅(qū)是不是頭結(jié)點,這樣設(shè)計使得節(jié)點的釋放符合FIFO哄尔,同時也避免了過早通知假消。
注:過早通知是指前驅(qū)節(jié)點不是頭結(jié)點的線程由于中斷被喚醒。
-
acquire實現(xiàn)總結(jié)
- 同步狀態(tài)維護:
對同步狀態(tài)的操作是原子岭接、非阻塞的富拗,通過AQS提供的對狀態(tài)訪問的方法來對同步狀態(tài)進行操作,并且利用CAS來確保原子操作鸣戴; - 狀態(tài)獲瓤谢Α:
一旦線程成功的修改了同步狀態(tài),那么該線程會被設(shè)置為同步隊列的頭節(jié)點窄锅; - 同步隊列維護:
不符合獲取同步狀態(tài)的線程會進入等待狀態(tài)谅阿,直到符合條件被喚醒再開始執(zhí)行。
整個執(zhí)行流程如下:
acquire流程圖 - 同步狀態(tài)維護:
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)的邏輯之后酬滤,就需要釋放同步狀態(tài)签餐,讓后續(xù)節(jié)點可以獲取到同步狀態(tài),調(diào)用方法release(int arg)
方法可以釋放同步狀態(tài)盯串。
獨占式同步狀態(tài)釋放 - release實現(xiàn)
- 嘗試釋放狀態(tài)氯檐,
tryRelease
保證將狀態(tài)重置回去,同樣采用CAS來保證操作的原子性; - 釋放成功后萨螺,調(diào)用
unparkSuccessor
喚醒當(dāng)前節(jié)點的后繼節(jié)點線程鹊漠。
- unparkSuccessor實現(xiàn)
取出當(dāng)前節(jié)點的next節(jié)點,將該節(jié)點線程喚醒河泳,被喚醒的線程獲取同步狀態(tài)。這里主要通過
LockSupport
的unpark
方法喚醒線程年栓。
共享式同步狀態(tài)獲取
共享式獲取與獨占式獲取最主要的區(qū)別就是在同一時刻能否有多個線程可以同時獲取到同步狀態(tài)拆挥。這兩種不同的方式在獲取資源區(qū)別如下圖所示:
- 共享式訪問資源時,其他共享式訪問都是被允許的某抓;
- 獨占式訪問資源時纸兔,在同一時刻只能有一個訪問,其他的訪問都被阻塞否副。
AQS提供acquireShared
方法來支持共享式獲取同步狀態(tài)汉矿。
- acquireShared實現(xiàn)
- 調(diào)用
tryAcquireShared(int arg)
方法嘗試獲取同步狀態(tài):
tryAcquireShared
方法返回值 > 0時,表示能夠獲取到同步狀態(tài)备禀; - 獲取失敗調(diào)用
doAcquireShared(int arg)
方法進入同步隊列洲拇。
- doAcquireShared實現(xiàn)
- 獲取當(dāng)前節(jié)點的前驅(qū)節(jié)點奈揍;
- 如果當(dāng)前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點,并且獲取到的共享同步狀態(tài) > 0赋续,設(shè)置當(dāng)前節(jié)點的為頭結(jié)點打月,獲取同步狀態(tài)成功;
- 不滿足條件的線程自旋等待蚕捉。
與獨占式獲取同步狀態(tài)一樣奏篙,共享式獲取也是需要釋放同步狀態(tài)的,AQS提供releaseShared(int arg)
方法可以釋放同步狀態(tài)迫淹。
共享式同步狀態(tài)釋放 - releaseShared實現(xiàn)
- 調(diào)用
tryReleaseShared
方法釋放狀態(tài)秘通; - 調(diào)用
doReleaseShared
方法喚醒后繼節(jié)點;
獨占式超時獲取 - doAcquireNanos
該方法提供了超時獲取同步狀態(tài)調(diào)用敛熬,假如在指定的時間段內(nèi)可以獲取到同步狀態(tài)返回true肺稀,否則返回false。它是acquireInterruptibly(int arg)
的增強版应民。
-
acquireInterruptibly實現(xiàn)
該方法提供了獲取同步狀態(tài)的能力话原,同樣,在無法獲取同步狀態(tài)時會進入同步隊列诲锹,這類似于acquire的功能繁仁,但是它和acquire還是區(qū)別的:acquireInterruptibly可以在外界對當(dāng)前線程進行中斷的時候可以提前獲取到同步狀態(tài)的操作,換個通俗易懂的解釋吧:類似于synchronized獲取鎖時归园,這時候外界對當(dāng)前線程中斷了黄虱,線程獲取鎖的這個操作能夠及時響應(yīng)中斷并且提前返回。acquireInterruptibly實現(xiàn)- 判斷當(dāng)前線程是否被中斷庸诱,如果已經(jīng)被中斷捻浦,拋出
InterruptedException
異常并將中斷標(biāo)志位置為false; - 獲取同步狀態(tài)桥爽,獲取成功并返回朱灿,獲取不成功調(diào)用
doAcquireInterruptibly(int arg)
排隊等待。
- 判斷當(dāng)前線程是否被中斷庸诱,如果已經(jīng)被中斷捻浦,拋出
-
doAcquireInterruptibly實現(xiàn)
doAcquireInterruptibly實現(xiàn)- 構(gòu)造節(jié)點Node钠四,加入同步隊列盗扒;
- 假如當(dāng)前節(jié)點是首節(jié)點并且可以獲取到同步狀態(tài),將當(dāng)前節(jié)點設(shè)置為頭結(jié)點形导,其他節(jié)點自旋等待环疼;
- 節(jié)點每次被喚醒的時候,需要進行中斷檢測朵耕,假如當(dāng)前線程被中斷,拋出異常
InterruptedException
淋叶,退出循環(huán)阎曹。
-
doAcquireNanos實現(xiàn)
該方法在支持中斷響應(yīng)的基礎(chǔ)上,增加了超時獲取的特性。針對超時獲取处嫌,主要在于計算出需要睡眠的時間間隔nanosTimeout栅贴,如果nanosTimeout > 0表示當(dāng)前線程還需要睡眠,反之返回false熏迹。
doAcquireNanos實現(xiàn)- nanosTimeout <= 0檐薯,表明當(dāng)前線程不需要睡眠,返回false注暗,不能獲取到同步狀態(tài)坛缕;
- 不滿足條件的線程加入同步隊列;
- 假如當(dāng)前節(jié)點是首節(jié)點捆昏,并且可以獲取到同步狀態(tài)赚楚,將當(dāng)前節(jié)點設(shè)置為頭結(jié)點并退出,返回true骗卜,表明在指定的時間內(nèi)可以獲取到同步狀態(tài)宠页;
- 不滿足條件3的線程,計算出當(dāng)前休眠時間寇仓,nanosTimeout = 原有nanosTimeout + deadline(睡眠之前記錄的時間)- now(
System.nanoTime()
:當(dāng)前時間):
- 如果
nanosTimeout <= 0
举户,返回超時未獲取到同步狀態(tài); - 如果
nanosTimeout > 0 && nanosTimeout <= 1000L
遍烦,線程快速自旋
注:為什么不直接進入超時等待呢敛摘?原因在于非常短的超時等待是無法做到十分精確的,如果這時候再進入超時等待會讓nanosTimeout的超時從整體上表現(xiàn)的不精確乳愉,所以兄淫,在超時非常短的情況下,AQS都會無條件進入快速自旋蔓姚;
- 如果nanosTimeout > 1000L
捕虽,線程通過LockSupport.parkNanos
進入超時等待。
整個流程可以總結(jié)如下圖所示:
后記
在上述對AQS進行了實現(xiàn)層面的分析之后坡脐,我們就一個案例來加深對AQS的理解泄私。
案例:設(shè)計一個AQS同步器,該工具在同一時刻备闲,只能有兩個線程能夠訪問晌端,其他的線程阻塞。
設(shè)計分析:針對上述案例恬砂,我們可以這樣定義AQS咧纠,設(shè)定一個初始狀態(tài)為2,每一個線程獲取一次就-1泻骤,正確的狀態(tài)為:0漆羔,1梧奢,2,0表示新的線程獲取同步狀態(tài)時阻塞演痒。由于資源數(shù)量大與1亲轨,需要實現(xiàn)tryAcquireShared
和tryReleaseShared
方法。
代碼實現(xiàn):
public class LockInstance implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int state) {
if (state <= 0) {
throw new IllegalArgumentException("count must large than 0");
}
setState(state);
}
@Override
public int tryAcquireShared(int arg) {
for (;;) {
System.out.println("try acquire....");
int current = getState();
int now = current - arg;
if (now < 0 || compareAndSetState(current, now)) {
return now;
}
}
}
@Override
public boolean tryReleaseShared(int arg) {
for(;;) {
System.out.println("try release....");
int current = getState();
int now = current + arg;
if (compareAndSetState(current, now)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.tryReleaseShared(1);
}
@Override
public Condition newCondition() {
return null;
}
}