前言:為什么要了解AQS药版?
在如今有很多高并發(fā)的場(chǎng)景下,都免不了使用多線程喻犁,使用多線程就避免不了了解鎖槽片。之前也提到了synchronized鎖(參見(jiàn)文章synchronized鎖)何缓,另一個(gè)常用的鎖就是ReentrantLock,而ReentrantLock底層實(shí)現(xiàn)就是AQS筐乳,當(dāng)然還有很多其他的實(shí)現(xiàn)歌殃,接下來(lái)我們一起了解下。
一蝙云、AQS是什么氓皱?
AQS全稱:AbstractQueuedSynchronizer,抽象隊(duì)列式同步器勃刨。
AQS是一個(gè)抽象類波材,它定義了一套多線程訪問(wèn)共享資源的同步器框架。通俗解釋身隐,AQS就像是一個(gè)隊(duì)列管理員廷区,當(dāng)多線程操作時(shí),對(duì)這些線程進(jìn)行排隊(duì)管理贾铝。
AQS本身是一個(gè)抽象類隙轻,所以并沒(méi)有單獨(dú)實(shí)現(xiàn)什么功能,但是很多功能都繼承了AQS類垢揩,依賴于其底層支持玖绿。如:ReentrantLock、Semaphore叁巨、CountDownLatch斑匪、CycleBarrier。
二锋勺、AQS如何實(shí)現(xiàn)的蚀瘸?
AQS主要通過(guò)維護(hù)了兩個(gè)變量來(lái)實(shí)現(xiàn)同步機(jī)制的
2.1、state
AQS使用一個(gè)volatile修飾的私有變量來(lái)表示同步狀態(tài)庶橱,當(dāng)state=0表示釋放了鎖贮勃,當(dāng)state>0表示獲得鎖。
/**
* The synchronization state.
*/
private volatile int state;
另外苏章,AQS提供了以下三個(gè)方法來(lái)對(duì)state進(jìn)行操作衙猪。
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.2、FIFO同步隊(duì)列
AQS通過(guò)內(nèi)置的FIFO同步隊(duì)列布近,來(lái)實(shí)現(xiàn)線程的排隊(duì)工作。
如果線程獲取當(dāng)前同步狀態(tài)失敗丝格,AQS會(huì)將當(dāng)前線程的信息封裝成一個(gè)Node節(jié)點(diǎn)撑瞧,加入同步隊(duì)列中,并且阻塞該線程显蝌,當(dāng)同步狀態(tài)釋放预伺,則會(huì)將隊(duì)列中的線程喚醒订咸,重新嘗試獲取同步狀態(tài)。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
三酬诀、AQS一些特點(diǎn)
3.1 共享鎖和獨(dú)占鎖
AQS實(shí)現(xiàn)的獨(dú)占鎖有ReentrantLock脏嚷,共享鎖有Semaphore,CountDownlatch,CycleBarrier瞒御。
3.1.1 要實(shí)現(xiàn)一個(gè)獨(dú)占鎖父叙,需要重寫tryAcquire,tryRelease方法
Acquire: tryAcquire(嘗試獲取鎖)肴裙、addWaiter(入隊(duì))趾唱、acquireQueued(隊(duì)列中的線程循環(huán)獲取鎖,失敗則掛起shouldParkAfterFailedAcquire)蜻懦。
Release:tryRelease(嘗試釋放鎖)甜癞、unparkSuccessor(喚醒后繼節(jié)點(diǎn))。
3.1.2 要實(shí)現(xiàn)共享鎖宛乃,需要重寫tryAcquireShared悠咱、tryReleaseShared
AcquireShared:tryAcquireShared,doAcquireShared征炼。
RealseShared:tryRealseShared析既、doReleaseShared。
3.2 等待狀態(tài)位
CANCELLED = 1:因?yàn)槌瑫r(shí)或中斷柒室,狀態(tài)位倍設(shè)置為取消渡贾,該線程不能去競(jìng)爭(zhēng)鎖,也不能轉(zhuǎn)換為其他狀態(tài)雄右;被檢測(cè)到之后會(huì)被踢出同步隊(duì)列空骚,被GC回收。
SIGNAL = -1:該節(jié)點(diǎn)的后繼節(jié)點(diǎn)被阻塞擂仍,到時(shí)需要喚醒
CONDITION = -2:該節(jié)點(diǎn)在條件隊(duì)列中(condition)囤屹,因?yàn)榈却龡l件而阻塞。
PROPAGATE = -3:使用在共享模式的頭節(jié)點(diǎn)可能處于此狀態(tài)逢渔,表示鎖的下一次獲取可以無(wú)條件傳播肋坚。
0 :無(wú)狀態(tài)
四、了解AQS的整體流程
可見(jiàn)下面流程圖
五肃廓、AQS詳細(xì)分析
5.1 Sync.nonfairTryAcquire
nonfairTryAcquire方法將是lock方法間接調(diào)用的第一個(gè)方法智厌,每次請(qǐng)求鎖時(shí)都會(huì)首先調(diào)用該方法。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 該方法會(huì)首先判斷當(dāng)前狀態(tài)盲赊,如果c==0說(shuō)明沒(méi)有線程正在競(jìng)爭(zhēng)該鎖铣鹏,如果不c !=0 說(shuō)明有線程正擁有了該鎖。
- 如果發(fā)現(xiàn)c==0哀蘑,則通過(guò)CAS設(shè)置該狀態(tài)值為acquires,acquires的初始調(diào)用值為1诚卸,每次線程重入該鎖都會(huì)+1葵第,每次unlock都會(huì)-1,但為0時(shí)釋放鎖合溺。如果CAS設(shè)置成功卒密,則可以預(yù)計(jì)其他任何線程調(diào)用CAS都不會(huì)再成功,也就認(rèn)為當(dāng)前線程得到了該鎖棠赛,也作為Running線程哮奇,很顯然這個(gè)Running線程并未進(jìn)入等待隊(duì)列。
- 如果c !=0 但發(fā)現(xiàn)自己已經(jīng)擁有鎖恭朗,只是簡(jiǎn)單地++acquires屏镊,并修改status值,但因?yàn)闆](méi)有競(jìng)爭(zhēng)痰腮,所以通過(guò)setStatus修改而芥,而非CAS,也就是說(shuō)這段代碼實(shí)現(xiàn)了偏向鎖的功能膀值,并且實(shí)現(xiàn)的非常漂亮棍丐。
5.2 AbstractQueuedSynchronizer.addWaiter
addWaiter方法負(fù)責(zé)把當(dāng)前無(wú)法獲得鎖的線程包裝為一個(gè)Node添加到隊(duì)尾:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其中參數(shù)mode是獨(dú)占鎖還是共享鎖,默認(rèn)為null沧踏,獨(dú)占鎖歌逢。追加到隊(duì)尾的動(dòng)作分兩步:
- 如果當(dāng)前隊(duì)尾已經(jīng)存在(tail!=null),則使用CAS把當(dāng)前線程更新為Tail翘狱。
- 如果當(dāng)前Tail為null或則線程調(diào)用CAS設(shè)置隊(duì)尾失敗秘案,則通過(guò)enq方法繼續(xù)設(shè)置Tail
下面是enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
該方法就是循環(huán)調(diào)用CAS,即使有高并發(fā)的場(chǎng)景潦匈,無(wú)限循環(huán)將會(huì)最終成功把當(dāng)前線程追加到隊(duì)尾(或設(shè)置隊(duì)頭)阱高。總而言之,addWaiter的目的就是通過(guò)CAS把當(dāng)前現(xiàn)在追加到隊(duì)尾茬缩,并返回包裝后的Node實(shí)例赤惊。
把線程要包裝為Node對(duì)象的主要原因,除了用Node構(gòu)造供虛擬隊(duì)列外凰锡,還用Node包裝了各種線程狀態(tài)未舟。
5.3 AbstractQueuedSynchronizer.acquireQueued
acquireQueued的主要作用是把已經(jīng)追加到隊(duì)列的線程節(jié)點(diǎn)(addWaiter方法返回值)進(jìn)行阻塞,但阻塞前又通過(guò)tryAccquire重試是否能獲得鎖掂为,如果重試成功能則無(wú)需阻塞裕膀,這里是非公平鎖的由來(lái)之二
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
仔細(xì)看看這個(gè)方法是個(gè)無(wú)限循環(huán),感覺(jué)如果p == head && tryAcquire(arg)條件不滿足循環(huán)將永遠(yuǎn)無(wú)法結(jié)束勇哗,當(dāng)然不會(huì)出現(xiàn)死循環(huán)昼扛,奧秘在于第12行的parkAndCheckInterrupt會(huì)把當(dāng)前線程掛起,從而阻塞住線程的調(diào)用棧智绸。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如前面所述野揪,LockSupport.park最終把線程交給系統(tǒng)(Linux)內(nèi)核進(jìn)行阻塞。當(dāng)然也不是馬上把請(qǐng)求不到鎖的線程進(jìn)行阻塞瞧栗,還要檢查該線程的狀態(tài)斯稳,比如如果該線程處于Cancel狀態(tài)則沒(méi)有必要,具體的檢查在shouldParkAfterFailedAcquire中:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
檢查原則在于:
- 規(guī)則1:如果前繼的節(jié)點(diǎn)狀態(tài)為SIGNAL迹恐,表明當(dāng)前節(jié)點(diǎn)需要unpark挣惰,則返回成功,此時(shí)acquireQueued方法的第12行(parkAndCheckInterrupt)將導(dǎo)致線程阻塞
- 規(guī)則2:如果前繼節(jié)點(diǎn)狀態(tài)為CANCELLED(ws>0)殴边,說(shuō)明前置節(jié)點(diǎn)已經(jīng)被放棄憎茂,則回溯到一個(gè)非取消的前繼節(jié)點(diǎn),返回false锤岸,acquireQueued方法的無(wú)限循環(huán)將遞歸調(diào)用該方法竖幔,直至規(guī)則1返回true,導(dǎo)致線程阻塞
- 規(guī)則3:如果前繼節(jié)點(diǎn)狀態(tài)為非SIGNAL是偷、非CANCELLED拳氢,則設(shè)置前繼的狀態(tài)為SIGNAL,返回false后進(jìn)入acquireQueued的無(wú)限循環(huán)蛋铆,與規(guī)則2同
總體看來(lái)馋评,shouldParkAfterFailedAcquire就是靠前繼節(jié)點(diǎn)判斷當(dāng)前線程是否應(yīng)該被阻塞,如果前繼節(jié)點(diǎn)處于CANCELLED狀態(tài)刺啦,則順便刪除這些節(jié)點(diǎn)重新構(gòu)造隊(duì)列留特。
至此,鎖住線程的邏輯已經(jīng)完成玛瘸,下面討論解鎖的過(guò)程蜕青。
5.4. 解鎖
請(qǐng)求鎖不成功的線程會(huì)被掛起在acquireQueued方法的第12行,12行以后的代碼必須等線程被解鎖鎖才能執(zhí)行捧韵,假如被阻塞的線程得到解鎖市咆,則執(zhí)行第13行,即設(shè)置interrupted = true再来,之后又進(jìn)入無(wú)限循環(huán)蒙兰。
從無(wú)限循環(huán)的代碼可以看出,并不是得到解鎖的線程一定能獲得鎖芒篷,必須在第6行中調(diào)用tryAccquire重新競(jìng)爭(zhēng)搜变,因?yàn)殒i是非公平的,有可能被新加入的線程獲得针炉,從而導(dǎo)致剛被喚醒的線程再次被阻塞挠他,這里充分體現(xiàn)了“非公平”。通過(guò)之后將要介紹的解鎖機(jī)制會(huì)看到篡帕,第一個(gè)被解鎖的線程就是Head殖侵,因此p == head的判斷基本都會(huì)成功贸呢。
解鎖代碼相對(duì)簡(jiǎn)單,主要體現(xiàn)在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:
class AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
class Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease與tryAcquire語(yǔ)義相同拢军,把如何釋放的邏輯延遲到子類中楞陷。tryRelease語(yǔ)義很明確:如果線程多次鎖定,則進(jìn)行多次釋放茉唉,直至status==0則真正釋放鎖固蛾,所謂釋放鎖即設(shè)置status為0,因?yàn)闊o(wú)競(jìng)爭(zhēng)所以沒(méi)有使用CAS度陆。
release的語(yǔ)義在于:如果可以釋放鎖艾凯,則喚醒隊(duì)列第一個(gè)線程(Head),具體喚醒代碼如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這段代碼的意思在于找出第一個(gè)可以u(píng)npark的線程懂傀,一般說(shuō)來(lái)head.next == head趾诗,Head就是第一個(gè)線程,但Head.next可能被取消或被置為null鸿竖,因此比較穩(wěn)妥的辦法是從后往前找第一個(gè)可用線程沧竟。貌似回溯會(huì)導(dǎo)致性能降低,其實(shí)這個(gè)發(fā)生的幾率很小缚忧,所以不會(huì)有性能影響悟泵。之后便是通知系統(tǒng)內(nèi)核繼續(xù)該線程,在Linux下是通過(guò)pthread_mutex_unlock完成闪水。之后糕非,被解鎖的線程進(jìn)入上面所說(shuō)的重新競(jìng)爭(zhēng)狀態(tài)。