前言
AQS(AbstractQueuedSynchronizer)
是一個(gè)抽象類蛉腌,預(yù)定義了一些需要由我們自己實(shí)現(xiàn)的方法恰响,用來(lái)構(gòu)建自定義的同步工具拓颓。
抽象類同接口一樣讲坎,都無(wú)法利用new語(yǔ)句創(chuàng)建對(duì)象實(shí)例,它存在意義在于預(yù)先幫我們實(shí)現(xiàn)好了一些方法政己,并暴露出一些抽象方法由子類實(shí)現(xiàn)酌壕,本質(zhì)上相當(dāng)于一個(gè)模版掏愁。
先看一下AbstractQueuedSynchronizer
類里有哪些方法需要我們自己實(shí)現(xiàn)。
//獨(dú)占模式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
// 共享模式
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
上述五種方法卵牍,雖然沒(méi)有明確要求子類實(shí)現(xiàn)果港,但是可以看到其方法內(nèi)部沒(méi)有任何判斷邏輯,而是直接拋出了一個(gè)UnsupportedOperationException()
的異常糊昙,表示不支持該操作辛掠,顯然當(dāng)我們實(shí)現(xiàn)定制化同步工具時(shí),這5
個(gè)方法就是關(guān)鍵所在释牺。而這5
個(gè)方法又可以分為獨(dú)占模式
和共享模式兩大類
萝衩。
獨(dú)占模式,比如互斥鎖没咙,一個(gè)鎖在一個(gè)時(shí)刻只能由一個(gè)線程占用猩谊,其他線程必須阻塞等待該線程釋放鎖。而共享模式祭刚,比如讀鎖牌捷,可以由多個(gè)線程共同持有。
這里我們介紹的是獨(dú)占模式涡驮,也就是前三個(gè)方法暗甥,我們先來(lái)分析一下這三個(gè)方法分別代表的含義。
方法名 | 作用 |
---|---|
boolean tryAcquire(int arg) | 嘗試去獲取同步狀態(tài) |
boolean tryRelease(int arg) | 嘗試去釋放同步狀態(tài) |
boolean isHeldExclusively | 判斷當(dāng)前線程有無(wú)獲取到同步狀態(tài) |
這里先解釋一下捉捅,什么是同步狀態(tài)淋袖。在AQS
類內(nèi)部,給我們定義了一個(gè)私有的類變量state
private volatile int state;
這個(gè)state
變量由volatile
關(guān)鍵詞修飾锯梁,所以保證了其在多個(gè)線程之間的可見(jiàn)性。在獨(dú)占模式下焰情,我們可以定義這個(gè)state
只有0
和1
兩個(gè)狀態(tài), 假設(shè)0
表示已有線程進(jìn)入同步代碼塊陌凳,則其他線程阻塞等待,假設(shè)1
表示當(dāng)前沒(méi)有線程進(jìn)入同步代碼塊内舟,則當(dāng)前線程可以嘗試進(jìn)入合敦。那么當(dāng)一個(gè)線程想進(jìn)入同步代碼塊時(shí),會(huì)判斷state
是否是0
验游,若是則修改其為1
充岛,表示自己已經(jīng)成功進(jìn)入同步代碼塊,其他線程需要等待耕蝉,這個(gè)過(guò)程就是tryAcquire
崔梗。而當(dāng)已經(jīng)在同步代碼塊內(nèi)部的線程準(zhǔn)備離開(kāi)時(shí),必須要修改當(dāng)前state
由1
變?yōu)?code>0,這個(gè)過(guò)程就是tryRelease
垒在。
理解了這個(gè)是不是就很容易說(shuō)自己實(shí)現(xiàn)tryAcquire
方法呢蒜魄?比如如下代碼。
//錯(cuò)誤的實(shí)現(xiàn)
@Override
protected boolean tryAcquire(int arg) {
if (state == 0) {
state = 1;
return true;
} else {
return false;
}
}
顯然在并發(fā)場(chǎng)景下這里會(huì)發(fā)生錯(cuò)誤,比如線程A
和線程B
同時(shí)判斷state==0
為true
谈为,然后都修改state = 1
,此時(shí)線程A
和B
都進(jìn)入同步代碼塊旅挤,這顯然和我們獨(dú)占模式的設(shè)計(jì)沖突了。這是一個(gè)經(jīng)典的比較并更新的場(chǎng)景伞鲫,需要我們保證這兩個(gè)操作的原子性粘茄。AQS
內(nèi)部給我們提供了幾個(gè)訪問(wèn)和設(shè)置state
值的方法。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setState(int newState) {
state = newState;
}
protected final int getState() {
return state;
}
可以看到第一個(gè)compareAndSetState
方法利用CAS
保證了原子性秕脓,而第二個(gè)setState
則沒(méi)有柒瓣,setState
方法可以用在我們之前所說(shuō)的tryRelease
方法里,因?yàn)楠?dú)占模式下同步代碼塊內(nèi)部同一時(shí)刻最多只存在一個(gè)線程撒会,所以不會(huì)發(fā)生并發(fā)錯(cuò)誤嘹朗,只要簡(jiǎn)單的通過(guò)setState
方法將state
的值由1
設(shè)置為0
就可以了。所以正確的tryAcquire
實(shí)現(xiàn)如下:
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
現(xiàn)在只要我們實(shí)現(xiàn)了自己的tryAcquire
方法和tryRelease
方法邏輯诵肛,就可以自定義一個(gè)同步工具了屹培,當(dāng)一個(gè)線程tryAcquire
成功時(shí),就會(huì)發(fā)生阻塞怔檩。當(dāng)一個(gè)線程tryRelease
成功時(shí)褪秀,就離開(kāi)了同步代碼塊。AQS
已經(jīng)幫我們實(shí)現(xiàn)了諸如阻塞等待等的復(fù)雜方法薛训,tryAcquire
和tryRelease
只是它們判斷邏輯中小小的一環(huán)而已媒吗,下面看一下AQS
的具體執(zhí)行流程。
acquire()
tryAcquire
方法的注釋里有一段話乙埃,This method is always invoked by the thread performing acquire. If this method reports failure, the acquire method may queue the thread.
說(shuō)的是tryAcquire
都會(huì)被acquire
執(zhí)行闸英,并且如果返回false
,就有可能把當(dāng)前的請(qǐng)求線程加入到一個(gè)等待隊(duì)列中去介袜,直到被喚醒甫何。我們先看下這個(gè)等待隊(duì)列是什么樣的。
- 等待隊(duì)列的節(jié)點(diǎn)類
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS
內(nèi)部定義了一個(gè)靜態(tài)內(nèi)部類Node
遇伞,也就是這個(gè)等待隊(duì)列的節(jié)點(diǎn)辙喂,從Node
的定義上可以看出,這個(gè)等待隊(duì)列實(shí)際上是一個(gè)雙向鏈表鸠珠,鏈表中的每一個(gè)節(jié)點(diǎn)代表了對(duì)應(yīng)的Thread
巍耗,同時(shí)還給每個(gè)Node
設(shè)置了waitStatus
狀態(tài)(之后細(xì)說(shuō))。
AQS
內(nèi)部定義指向這個(gè)隊(duì)列的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)的引用渐排,同樣用volatile
修飾炬太,保證多線程這間的可見(jiàn)性。
private transient volatile Node head;
private transient volatile Node tail;
之前說(shuō)了驯耻,我們自己實(shí)現(xiàn)的方法tryAcquire
只是acquire
執(zhí)行邏輯之的一環(huán)娄琉,我們來(lái)看看acquire
具體是如何執(zhí)行次乓,并使用這個(gè)雙向鏈表結(jié)構(gòu)的隊(duì)列的。
-
acquire
方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
當(dāng)tryAcquire
失敗時(shí)孽水,說(shuō)明此時(shí)已有其他線程在同步代碼塊內(nèi)部票腰,需要等待,于是執(zhí)行addWaiter
方法女气。
-
addWaiter
方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter
方法的功能是創(chuàng)建一個(gè)代表當(dāng)前線程的Node
加入到等待隊(duì)列中去杏慰,其執(zhí)行邏輯為
- 為當(dāng)前進(jìn)入同步代碼塊失敗的線程,創(chuàng)建一個(gè)對(duì)應(yīng)的
Node
實(shí)例炼鞠,這里的mode
有兩種Node.EXCLUSIVE
和Node.SHARED
缘滥,在獨(dú)占模式下也就是選擇Node.EXCLUSIVE
。- 拿到當(dāng)前等待隊(duì)列的尾節(jié)點(diǎn)
tail
谒主,如果當(dāng)前等待隊(duì)列不為空且執(zhí)行CAS
操作插入成功朝扼,則該節(jié)點(diǎn)成為新的tail
,則返回。- 否則調(diào)用
enq
方法進(jìn)行插入霎肯,并返回擎颖。
值得關(guān)注是,若當(dāng)前隊(duì)列不為空观游,這個(gè)插入操作是需要通過(guò)CAS
來(lái)完成的搂捧,否則可能造成并發(fā)錯(cuò)誤,使得多個(gè)節(jié)點(diǎn)都誤以為自己插入成功懂缕,實(shí)際上最終真正插入成功的只有一個(gè)允跑,從而造成一些節(jié)點(diǎn)丟失的現(xiàn)象。在這里搪柑,多個(gè)線程在第一步都會(huì)把自己的prev
指向當(dāng)前隊(duì)列的尾部聋丝,然后通過(guò)CAS,使得只有一個(gè)節(jié)點(diǎn)成為新的tail
節(jié)點(diǎn)工碾,其余的線程會(huì)再去執(zhí)行enq
方法潮针。
然后再看下未通過(guò)CAS成功插入的節(jié)點(diǎn)會(huì)發(fā)生什么。
-
enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq
方法整體上采用了一個(gè)無(wú)限循環(huán)結(jié)構(gòu)倚喂,也就是不斷的嘗試插入,直到成功為止瓣戚。
- 判斷當(dāng)前隊(duì)列是否為空端圈,如果為空則插入一個(gè)新節(jié)點(diǎn)作為頭部,該節(jié)點(diǎn)不表示任何線程子库,具體作用之后會(huì)提到舱权。
- 若當(dāng)前隊(duì)列不為空,采用和之前
addWaiter
里一樣的方法仑嗅,執(zhí)行CAS
插入宴倍,不成功則重試张症,直到成功為止。
當(dāng)一個(gè)代表特定線程的Node
被加入到隊(duì)列中(尾部)之后鸵贬,acquire
會(huì)接著調(diào)用acquireQueued
方法俗他,acquireQueued
方法具體實(shí)現(xiàn)如下。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued
內(nèi)部實(shí)現(xiàn)整體上還是一個(gè)無(wú)限循環(huán)阔逼,具體執(zhí)行流程如下
- 拿到之前加入隊(duì)列的節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
- 判斷這個(gè)前驅(qū)節(jié)點(diǎn)是不是
head
節(jié)點(diǎn)兆衅,(注意我們之前提到過(guò)head
節(jié)點(diǎn)是不指向任何線程的 ), 如果是head
節(jié)點(diǎn)嗜浮,那么說(shuō)明當(dāng)前節(jié)點(diǎn)排在等待隊(duì)列的隊(duì)首羡亩,即有可能馬上就會(huì)被喚醒,所以阻塞該線程之前危融,再嘗試一下看看能否進(jìn)入同步代碼塊畏铆。- 如果運(yùn)氣好,
tryAcquire
成功吉殃,那么這個(gè)等待的線程說(shuō)明可以進(jìn)入同步代碼塊了辞居,下面要做的就是把該節(jié)點(diǎn)設(shè)為新的頭節(jié)點(diǎn)(因?yàn)檫@個(gè)節(jié)點(diǎn)已經(jīng)沒(méi)用了)。并且把之前的頭節(jié)點(diǎn)的next
指針設(shè)置成null
寨腔,來(lái)讓GC
把它回收掉速侈。- 如果不能不滿足上述條件,說(shuō)明當(dāng)前線程需要阻塞等待了迫卢,但在真正進(jìn)入阻塞等待狀態(tài)(調(diào)用
parkAndCheckInterrupt
方法)之前倚搬,還會(huì)再進(jìn)行一次判斷,是不是應(yīng)該wait
, 如果不是乾蛤,再重新執(zhí)行循環(huán)流程每界。這個(gè)再判斷的方法就是shouldParkAfterFailedAcquire
。
-
shouldParkAfterFailedAcquire
方法的功能就如同它的名字一樣家卖,判斷在tryAcquire之后眨层,是否要進(jìn)行阻塞等待狀態(tài)
。該方法傳入的參數(shù)是當(dāng)前節(jié)點(diǎn)node
的其前驅(qū)節(jié)點(diǎn)pred
上荡。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0){
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這里出現(xiàn)了我們之前沒(méi)有介紹的waitStatus
,Node
節(jié)點(diǎn)內(nèi)部定義了幾種Status
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
CANCELLED = 1
表示當(dāng)前線程已經(jīng)被取消趴樱,可以忽略并把它移出隊(duì)列了,SIGNAL = -1
表示當(dāng)前節(jié)點(diǎn)的后繼結(jié)點(diǎn)
需要被喚醒, condition
表示該線程處于條件等待狀態(tài)酪捡。waitStatus
的初始狀態(tài)為0
叁征。基于此我們?cè)賮?lái)看shouldParkAfterFailedAcquire
的執(zhí)行流程逛薇。
- 獲取當(dāng)前節(jié)點(diǎn)其前驅(qū)節(jié)點(diǎn)的
waitStatus
,如果waitStatus== node.SIGNAL
捺疼,說(shuō)明其后繼節(jié)點(diǎn)(也就是當(dāng)前節(jié)點(diǎn))處于一個(gè)等待被喚醒的狀態(tài)
,也就是說(shuō)處于阻塞等待狀態(tài),因此返回true
永罚。- 如果
ws > 0
也就是其處于CANCELLED
狀態(tài)啤呼,因當(dāng)從隊(duì)列中移除卧秘,于是利用node.prev = pred = pred.prev;
移除所有CANCELLED
節(jié)點(diǎn)。- 否則官扣,利用CAS操作將前驅(qū)節(jié)點(diǎn)的
waitSatus
設(shè)置為SINGNAL
, 表示其后繼節(jié)點(diǎn)應(yīng)當(dāng)處于阻塞且需要被喚醒狀態(tài)翅敌,返回false
,當(dāng)處于外層的循環(huán)下一次執(zhí)行事,就會(huì)返回true
并開(kāi)始阻塞醇锚。
也就是說(shuō)shouldParkAfterFailedAcquire
哼御,除了給當(dāng)前節(jié)點(diǎn)賦予需要被喚醒狀態(tài)的同時(shí)(通過(guò)設(shè)置前驅(qū)節(jié)點(diǎn)的waitStatus
來(lái)完成),還復(fù)雜移除當(dāng)前節(jié)點(diǎn)前驅(qū)中的代表已經(jīng)被取消的線程的節(jié)點(diǎn)焊唬。
release()
相較于acquire
恋昼,release
方法的邏輯要簡(jiǎn)單很多
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 首先判斷是否能夠成功釋放同步狀態(tài),若可以赶促,則獲取當(dāng)前隊(duì)列的頭節(jié)點(diǎn)液肌,否則返回false。
- 獲得當(dāng)前隊(duì)列的頭節(jié)點(diǎn), 如果當(dāng)前隊(duì)列不為空鸥滨,且其
waitStatus != 0
嗦哆,說(shuō)明其后繼節(jié)點(diǎn)所關(guān)聯(lián)的可能是一個(gè)處于阻塞狀態(tài)需要被喚醒的線程,于是調(diào)用unparkSuccessor
喚醒它婿滓,這里體現(xiàn)了第一個(gè)不代表任何線程的頭節(jié)點(diǎn)的作用老速。- 無(wú)論是否成功喚醒一個(gè)等待中的線程,最終都會(huì)釋放同步狀態(tài)凸主,返回true橘券。
再來(lái)看下unparkSuccessor
方法做了什么。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 首先如果當(dāng)前節(jié)點(diǎn)的
waitStatus<0
卿吐,則將其清0旁舰,因?yàn)樗暮罄^節(jié)點(diǎn)所代表的線程會(huì)被喚醒。- 找到要被喚醒的線程嗡官,如果要被喚醒的線程被取消或者為null箭窜,則反向從隊(duì)尾開(kāi)始找到離隊(duì)首最近的未被取消的線程節(jié)點(diǎn)。
3.如果線程存在衍腥,則調(diào)用LockSupport.unpark(s.thread)
方法磺樱,將其從阻塞狀態(tài)中恢復(fù)。
應(yīng)用
ReentrantLock
是JUC
包里提供的一個(gè)支持公平/非公平的可重入鎖婆咸,其實(shí)際就是基于AQS
構(gòu)建的一個(gè)同步工具竹捉。
- ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
....
}
static final class NonfairSync extends Sync {
....
}
static final class FairSync extends Sync {
....
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
....
}
其內(nèi)部包含一個(gè)繼承于 AQS
的抽象子類sync
,并在sync
的基礎(chǔ)上實(shí)現(xiàn)了NonfairSync
非公平鎖和FairSync
公平鎖來(lái)實(shí)現(xiàn)對(duì)應(yīng)的功能擅耽。
下面讓我們也基于AQS
,模仿ReentrantLock
中的寫(xiě)法,來(lái)實(shí)現(xiàn)自己的一個(gè)互斥鎖物遇。
public class MyLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private Sync sync;
public MyLock(){
sync = new Sync();
}
public void lock(){
sync.acquire(1);
}
public void unlock(){
sync.release(1);
}
}