寫在前面
上一節(jié)我們講到了CAS
的基本原理贸铜,JUC
下的atomic
類都是通過CAS
來實(shí)現(xiàn)的耙箍。它的核心思想就是比較并替換聪廉,其原子性是有操作系統(tǒng)底層來保證瞬痘。CAS
的無鎖并發(fā)原理很適合在某些場景下替換鎖,但是它仍然有很多缺點(diǎn)板熊,比如ABA的問題图云,比如長時間自旋的性能消耗。今天要講的是JUC
下的核心類AQS
邻邮,即AbstractQueuedSynchronizer
,抽象隊列同步器竣况,它是用來構(gòu)建鎖和其他同步組件的基礎(chǔ)框架,像ReentrantLock
,ReentrantReadWriteLock
都是用AQS來構(gòu)建的筒严。
AQS的基本結(jié)構(gòu)
AQS
內(nèi)部用了一個整型變量state
來表示同步狀態(tài)丹泉,通過內(nèi)置的CLH
隊列來完成資源獲取線程的排隊工作,CLH
隊列是一個FIFO
的雙向隊列鸭蛙,當(dāng)當(dāng)前線程嘗試獲取同步狀態(tài)失敗時摹恨,AQS
將會將當(dāng)前線程的等待狀態(tài),線程等信息構(gòu)造成一個Node
節(jié)點(diǎn)加入CLH
隊列尾部娶视,同時阻塞當(dāng)前線程晒哄。當(dāng)同步狀態(tài)釋放時,喚醒head
節(jié)點(diǎn)肪获,讓其再次嘗試獲取同步狀態(tài)寝凌。
同步狀態(tài)
同步器的主要使用方式是繼承,子類通過繼承AQS
并實(shí)現(xiàn)它的同步方法來管理同步狀態(tài)孝赫,上圖中的state
即為AQS
的同步狀態(tài)较木。
并提供了幾個操作同步狀態(tài)的方法:
protected final int getState() { //獲取state的值
return state;
}
protected final void setState(int newState) { //更新state的值
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) //cas更新state的值
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
這幾個方法都是final
修飾的,說明子類不能重寫青柄,又是protected
修飾的伐债,說明只能在子類中使用预侯。
由于AQS
是基于模板方法設(shè)計的,子類需要重寫指定的方法峰锁,其提供的模板方法分為3類:
- 獨(dú)占式獲取與釋放同步狀態(tài)
- 共享式獲取與釋放同步狀態(tài)
- 查詢同步隊列中的等待線程情況
同步器可重寫的方法如下表所示:
方法名稱 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 獨(dú)占式獲取同步狀態(tài) |
protected boolean tryRelease(int arg) | 獨(dú)占式釋放同步狀態(tài) |
protected int tryAcquireShared(int arg) | 共享式獲取同步狀態(tài) |
protected boolean tryReleaseShared(int arg) | 共享式釋放同步狀態(tài) |
protected boolean isHeldExclusively() | 當(dāng)前同步器是否在獨(dú)占模式下被占用 |
關(guān)于獨(dú)占模式和共享模式萎馅,一般來講,我們可以通過修改state
字段的同步狀態(tài)來實(shí)現(xiàn)虹蒋。
比如說校坑,在獨(dú)占模式下,state
的初始值是0千诬,每當(dāng)有線程要進(jìn)入獨(dú)占操作的時候耍目,CAS
操作下state
,將其改為1,當(dāng)?shù)诙€線程也需要進(jìn)行獨(dú)占操作的時候徐绑,發(fā)現(xiàn)state
的值已經(jīng)是1了邪驮,那么當(dāng)前線程會一直阻塞,直到獲取同步狀態(tài)成功的線程釋放了同步狀態(tài)傲茄,也就是把state
的值改為了0毅访,喚醒等待隊列的頭結(jié)點(diǎn)線程,重新加入競爭盘榨。
在共享模式下喻粹,假設(shè)允許n個線程并發(fā)執(zhí)行,那么初始化state
的值就為n草巡,超過這個值的線程就會等待守呜。一個線程嘗試獲取同步狀態(tài),就需要先判斷state
的值是不是大于0的山憨,如果不大于0意味著此時n個線程跑滿了查乒,需要阻塞等待,如果大于0郁竟,那么可以嘗試獲取同步狀態(tài)玛迄,并將state
的值減1。當(dāng)某個線程同步操作執(zhí)行結(jié)束后需要釋放同步狀態(tài)棚亩,也就是將state
的值加1蓖议。
說白了,在獨(dú)占模式下讥蟆,我們要重寫tryAcquire
,tryRelease
,isHeldExclusively
,在共享模式下勒虾,需要重寫tryAcquireShared
,tryReleaseShared
,舉個獨(dú)占鎖的栗子:
public class SyncDemo {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 當(dāng)state為0的時候嘗試獲取同步狀態(tài)
*
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) { //CAS更新state的值
setExclusiveOwnerThread(Thread.currentThread()); //設(shè)置占用獨(dú)占鎖的線程是當(dāng)前線程
return true;
}
return false;
}
/**
* 嘗試釋放同步狀態(tài),將state設(shè)置為0
*
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
setState(0);
setExclusiveOwnerThread(null);
return true;
}
/**
* 是否處于獨(dú)占狀態(tài)
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
}
這樣我們把lock()
和unlock()
方法暴露出去攻询,就可以當(dāng)成一個獨(dú)占鎖來使用了从撼。
同步隊列
AQS
依賴內(nèi)部的一個FIFO
的CLH
雙向隊列來完成同步狀態(tài)器的管理州弟。當(dāng)前線程獲取同步狀態(tài)失敗時钧栖,同步器會將當(dāng)前線程以及等待狀態(tài)信息等構(gòu)造成一個Node節(jié)點(diǎn)加入CLH
隊列尾部低零,同時阻塞當(dāng)前線程。當(dāng)同步狀態(tài)釋放時拯杠,會把首節(jié)點(diǎn)的線程喚醒掏婶,使其再次嘗試獲取同步狀態(tài)。
Node
節(jié)點(diǎn)是AQS
的一個靜態(tài)內(nèi)部類:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
同時我們看到AQS
有兩個成員變量head
和tail
對應(yīng)了CLH
隊列的首尾節(jié)點(diǎn):
Node
里的waitStatus
表示節(jié)點(diǎn)狀態(tài):
變量 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 等待超時或被中斷的線程節(jié)點(diǎn) |
SIGNAL | -1 | 后繼節(jié)點(diǎn)的線程處于等待狀態(tài) |
CONDITION | -2 | 節(jié)點(diǎn)在等待隊列中 |
PROPAGATE | -3 | 下一次共享式同步狀態(tài)獲取將會無條件地被傳播下去 |
INITIAL | 0 | 初始狀態(tài) |
thread
表示當(dāng)前線程潭陪。
prev
和next
表示同步隊列的前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)雄妥。
nextWaiter
表示等待隊列中的后繼節(jié)點(diǎn)。
Node節(jié)點(diǎn)構(gòu)成了同步隊列CLH
依溯,獲取同步狀態(tài)失敗的節(jié)點(diǎn)將被放入CLH
的尾部老厌。這一個操作是需要保證線程安全的,因?yàn)榭赡苡写罅揩@取同步狀態(tài)失敗的線程同時插入同步隊列尾節(jié)點(diǎn)黎炉。AQS
利用CAS
的操作提供了一個設(shè)置尾節(jié)點(diǎn)的方法:compareAndSetTail(Node expect, Node update)
枝秤。再看CLH
的首節(jié)點(diǎn),由于是FIFO的隊列慷嗜,首節(jié)點(diǎn)其實(shí)對應(yīng)的就是獲取同步狀態(tài)成功的節(jié)點(diǎn)淀弹,首節(jié)點(diǎn)在獲取同步狀態(tài)成功后會釋放掉占用的同步狀態(tài),并喚醒后繼節(jié)點(diǎn)(即CLH的第二個節(jié)點(diǎn))庆械,后繼節(jié)點(diǎn)在自己獲取同步狀態(tài)成功之后也會將自己設(shè)置為首節(jié)點(diǎn)薇溃。
下面對AQS
源碼進(jìn)行解析。
獨(dú)占式
所謂的獨(dú)占缭乘,就是指同一時刻沐序,只有一個線程能獲取到同步狀態(tài),例如堕绩,ReentrantLock就是獨(dú)占的薄啥。
獨(dú)占式獲取同步狀態(tài)
這里我們有一個很重要的議題就是,獲取同步狀態(tài)失敗的線程將會被包裝成Node
節(jié)點(diǎn)放入同步隊列CLH
里面逛尚,等待著同步狀態(tài)釋放后被喚醒垄惧。
通過調(diào)用AQS
的acquire(int arg)
方法可以獲取同步狀態(tài),來看代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這里首先執(zhí)行tryAcquire
方法绰寞,這個方法前面我們講過到逊,是由同步器的子類去實(shí)現(xiàn)的,那么如果返回true
則表示獲取同步狀態(tài)成功滤钱,下面就不必執(zhí)行了觉壶,返回false
的話,我們需要將此節(jié)點(diǎn)加入到同步隊列的尾部件缸。
嘗試獲取同步狀態(tài)失敗的話铜靶,接著會執(zhí)行addWaiter
方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //構(gòu)造新節(jié)點(diǎn),thread為當(dāng)前線程
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { //尾節(jié)點(diǎn)不為空他炊,將新節(jié)點(diǎn)插入尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //自旋插入新節(jié)點(diǎn)
return node;
}
addWaiter
用于向同步列表尾部插入新節(jié)點(diǎn)争剿,如果尾節(jié)點(diǎn)不為空已艰,直接CAS插入Node,否則執(zhí)行enq
方法蚕苇。
private Node enq(final Node node) {
for (;;) { //自旋
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) //尾節(jié)點(diǎn)為空哩掺,則新建節(jié)點(diǎn)當(dāng)做頭節(jié)點(diǎn)
tail = head;
} else { //尾節(jié)點(diǎn)不為空則CAS將新節(jié)點(diǎn)插入尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到,初始化開始涩笤,同步隊列里是沒有節(jié)點(diǎn)的嚼吞,經(jīng)過addWaiter
之后,同步隊列將插入兩個節(jié)點(diǎn)蹬碧,一個不攜帶任何線程信息head
節(jié)點(diǎn)(假設(shè)叫init節(jié)點(diǎn))舱禽,以及攜帶當(dāng)前線程信息的新節(jié)點(diǎn)Node(假設(shè)叫節(jié)點(diǎn)1)。示意圖如下:
其中節(jié)點(diǎn)1是我們真正插入的節(jié)點(diǎn)恩沽,代表的是獲取同步狀態(tài)失敗的線程(假設(shè)這個線程叫線程1)呢蔫,而init節(jié)點(diǎn)是初始化出來的節(jié)點(diǎn),它并沒有線程信息飒筑,waitStatus
都為0 片吊,表示節(jié)點(diǎn)都是剛初始化的。
最后协屡,addWaiter
方法返回了當(dāng)前節(jié)點(diǎn)node
,對應(yīng)于圖中的節(jié)點(diǎn)1俏脊。
addWaiter
方法走完,接著走acquireQueued
方法:
final boolean acquireQueued(final Node node, int arg) { //此時的node為addWaiter返回的節(jié)點(diǎn)
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //自旋
final Node p = node.predecessor(); //前驅(qū)節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) { //前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)則再次嘗試獲取同步狀態(tài)
setHead(node); //同步狀態(tài)獲取成功則將頭節(jié)點(diǎn)設(shè)為當(dāng)前節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued
方法里我們先看自旋里的第一個if
語句肤晓,它的邏輯是:如果在經(jīng)過addWaiter
方法我們將當(dāng)前節(jié)點(diǎn)加入到同步隊列的尾部之后爷贫,判斷當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)是head
節(jié)點(diǎn)补憾,則再次嘗試獲取同步狀態(tài)漫萄。這樣做是因?yàn)椋绻@取同步狀態(tài)的線程在與當(dāng)前線程競爭的時候雖然獲取了同步狀態(tài)盈匾,于是當(dāng)前線程乖乖地通過addWaiter
將自己加入到同步隊列的尾部腾务,但是如果獲取同步狀態(tài)的線程很快又把同步狀態(tài)釋放了,那么當(dāng)前線程就應(yīng)該再次嘗試能不能獲取到同步狀態(tài)削饵,如果能岩瘦,則把head
節(jié)點(diǎn)設(shè)置為自己,同時把thread
變量設(shè)為null
窿撬,意味著自己成為了init
節(jié)點(diǎn)启昧。(為什么要判斷前驅(qū)節(jié)點(diǎn)是head?因?yàn)樵谕疥犃?code>CLH中只有head
節(jié)點(diǎn)的后繼節(jié)點(diǎn)才有資格去獲取同步狀態(tài)劈伴。)
如果前驅(qū)節(jié)點(diǎn)不是head
節(jié)點(diǎn)密末,我們看自旋方法里的第二個if
語句,首先去執(zhí)行shouldParkAfterFailedAcquire(p, node)
方法,它傳入的第一個參數(shù)是前驅(qū)節(jié)點(diǎn)严里,第二個參數(shù)是當(dāng)前節(jié)點(diǎn)新啼。來看源碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire
主要是判斷waitStatus
即節(jié)點(diǎn)線程的狀態(tài),我們一步一步來田炭。
- 如果前驅(qū)節(jié)點(diǎn)的
waitStatus
等于SIGNAL师抄,即等于-1漓柑,則返回true
教硫。意味著前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn),即當(dāng)前節(jié)點(diǎn)可以被阻塞辆布。 - 如果前驅(qū)節(jié)點(diǎn)的
waitStatus
大于0瞬矩,則表示前驅(qū)節(jié)點(diǎn)是等待超時或被中斷的節(jié)點(diǎn),需要把之前所有waitStatus
大于0的節(jié)點(diǎn)都移除掉锋玲。 - 如果前驅(qū)節(jié)點(diǎn)的
waitStatus
不滿足于上兩種情況景用,則CAS
更新waitStatus
的值為SIGNAL,即-1惭蹂。此時表示當(dāng)前節(jié)點(diǎn)是可以被阻塞的伞插。那么在acquireQueued
自旋進(jìn)入下一次的時候,就會走到第一個if
語句了盾碗,最終返回true
媚污。
回到剛剛那張圖,初始化的時候init節(jié)點(diǎn)
和節(jié)點(diǎn)1的waitStatus
都為0 廷雅,在經(jīng)過shouldParkAfterFailedAcquire
方法之后耗美,init節(jié)點(diǎn)
的waitStatus
變成了SIGNAL,-1,如圖:
此時我們再看parkAndCheckInterrupt
方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這個方法很簡單航缀,就是立即阻塞當(dāng)前線程商架。
至此,在獨(dú)占模式下一個獲取同步狀態(tài)失敗的線程插入同步隊列尾部并阻塞當(dāng)前線程的流程就跑完了芥玉。
此時蛇摸,如果又來了一個線程(假設(shè)是線程2)進(jìn)行同步狀態(tài)的獲取,假如此時獲取到同步狀態(tài)的線程還沒有釋放灿巧,則最終結(jié)果會是這樣:
獨(dú)占式釋放同步狀態(tài)
我們上面講到皇型,獨(dú)占模式下,多個線程同時競爭砸烦,只有一個線程能獲取到同步狀態(tài)弃鸦,剩余的線程將會組裝成CLH
同步隊列進(jìn)行阻塞,等待被喚醒幢痘。當(dāng)獲取到同步狀態(tài)的線程釋放掉同步狀態(tài)時唬格,我們才會去喚醒下一個可以被喚醒的線程(即state
從1再次變成0時),釋放同步狀態(tài)的方法是release
:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
可以看到,獨(dú)占式地釋放同步狀態(tài)购岗,要求子類里重寫的tryRelease方法返回了true
(即state
從1變成了0)汰聋,并且head
節(jié)點(diǎn)不為null
且waitStatus
不為0。那么久去執(zhí)行真正的喚醒線程的方法unparkSuccessor
:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; //head節(jié)點(diǎn)的狀態(tài)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); //cas更新waitStatus為0
Node s = node.next; //有資格被喚醒的下一個節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) { //當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)或后繼節(jié)點(diǎn)被取消/中斷了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //tail回溯尋找第一個有資格被喚醒的線程
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //喚醒此線程
}
這段代碼的意思是喊积,head
節(jié)點(diǎn)的后繼節(jié)點(diǎn)如果有資格被喚醒(不是被取消或中斷的)烹困,那么就直接喚醒此線程;如果head
的后繼節(jié)點(diǎn)不滿足被喚醒的條件乾吻,就從tail
節(jié)點(diǎn)回溯尋找waitStatus
小于0的第一個節(jié)點(diǎn)髓梅,然后喚醒此節(jié)點(diǎn)的線程。這里有個疑問绎签,為什么不從node.next
往后尋找枯饿,而需要從tail
回溯向前呢?大概是因?yàn)?code>node.next也可能為null
吧诡必。
線程被喚醒之后此時同步隊列的情況是怎樣的呢奢方?我們在回看一下acquire
的代碼,尤其是acquireQueued
這個方法爸舒,因?yàn)?code>addWaiter僅僅是向同步隊列尾部添加節(jié)點(diǎn)蟋字。
final boolean acquireQueued(final Node node, int arg) { //此時的node為addWaiter返回的節(jié)點(diǎn)
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //自旋
final Node p = node.predecessor(); //前驅(qū)節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) { //前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)則再次嘗試獲取同步狀態(tài)
setHead(node); //同步狀態(tài)獲取成功則將頭節(jié)點(diǎn)設(shè)為當(dāng)前節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
是將前置節(jié)點(diǎn)的waitStatus
設(shè)為-1并剔除已中斷或取消的線程,而parkAndCheckInterrupt
方法是真正用來阻塞線程的:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
此時喚醒下一個可被喚醒的線程之后扭勉,LockSupport.park(this)
會恢復(fù)鹊奖,繼續(xù)向下走,返回當(dāng)前的線程阻斷狀態(tài)剖效。由于acquireQueued
里的自旋方法嫉入,此時被喚醒的線程會重新獲取同步狀態(tài),獲取成功則將head
節(jié)點(diǎn)設(shè)置成自己璧尸,并把thread
和pre
的信息置為null
咒林,最后,退出自旋爷光。畫個圖就是這樣的:
此時線程1獲取到同步狀態(tài)垫竞,就會將自己設(shè)成head
節(jié)點(diǎn)了,此時同步隊列里只有一個線程2是阻塞的蛀序。
可能有朋友會有疑問欢瞪,獲取到同步狀態(tài)并將head
節(jié)點(diǎn)設(shè)置為自己的前提不是應(yīng)該前置節(jié)點(diǎn)是head
節(jié)點(diǎn)么(p == head && tryAcquire(arg)
)?
這個沒關(guān)系,假如我們在喚醒下一個可被喚醒的線程之間徐裸,前面有線程被取消或中斷了遣鼓,那么找到的這個可被喚醒的線程必然不是head
的后繼節(jié)點(diǎn),但是因?yàn)樽孕拇嬖谥睾兀粯訒?jīng)過shouldParkAfterFailedAcquire
進(jìn)行節(jié)點(diǎn)過濾骑祟,這個節(jié)點(diǎn)必然會成為head
的后繼節(jié)點(diǎn)回懦。而由于剛剛這個可被喚醒線程已經(jīng)通過unpark
獲取到許可了,那么此時parkAndCheckInterrupt
的park
方法是不會阻塞的次企,所以再次循環(huán)后可以繼續(xù)將head
節(jié)點(diǎn)設(shè)置成當(dāng)前可被喚醒線程的節(jié)點(diǎn)怯晕。
共享式
共享式與獨(dú)占式獲取同步狀態(tài)最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。
共享式獲取同步狀態(tài)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
和獨(dú)占式的一樣缸棵,當(dāng)子類實(shí)現(xiàn)的tryAcquireShared
方法返回的值小于0舟茶,我們才去嘗試將線程加入同步隊列里。
來看doAcquireShared
方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); //向同步隊列尾部插入新節(jié)點(diǎn)
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
if (p == head) { //如果前驅(qū)節(jié)點(diǎn)是head節(jié)點(diǎn)堵第,則再次嘗試獲取同步狀態(tài)
int r = tryAcquireShared(arg);
if (r >= 0) { //如果拿到了同步狀態(tài)吧凉,此時r大于0為還有資源可用,需要傳遞喚醒后續(xù)節(jié)點(diǎn)型诚;r=0為當(dāng)前線程獲取到了鎖客燕,但是資源已占盡鸳劳,不需要喚醒后續(xù)節(jié)點(diǎn)狰贯。
setHeadAndPropagate(node, r); //獲取鎖以后的喚醒操作
p.next = null; // help GC
if (interrupted) //如果被中斷,設(shè)置中斷狀態(tài)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這里有個地方和獨(dú)占模式不一樣赏廓,setHeadAndPropagate
方法這個方法涵紊,獨(dú)占模式下只需要將頭節(jié)點(diǎn)設(shè)為自己就行,但在共享模式下幔摸,我們將head
節(jié)點(diǎn)設(shè)為了當(dāng)前節(jié)點(diǎn)后摸柄,還要去嘗試喚醒其他的可喚醒節(jié)點(diǎn)。(這個很好理解既忆,因?yàn)槭枪蚕砟J角海员厝灰蠹乙黄鹑帗?
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //老的頭結(jié)點(diǎn)
setHead(node); //將當(dāng)前線程設(shè)為頭結(jié)點(diǎn)
//如果還有資源或者頭結(jié)點(diǎn)的后續(xù)節(jié)點(diǎn)為待喚醒節(jié)點(diǎn)(h.waitStatus為SIGNAL或PROPAGATE)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//后繼節(jié)點(diǎn)不確定或?yàn)楣蚕砟J焦?jié)點(diǎn)
if (s == null || s.isShared())
doReleaseShared(); //同步狀態(tài)的釋放
}
}
共享式釋放同步狀態(tài)
可以看到,共享式釋放同步狀態(tài)里患雇,在tryReleaseShared
釋放資源成功的時候就會去執(zhí)行doReleaseShared
方法跃脊,這個也是上文setHeadAndPropagate
里作為傳播喚醒的方法。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
//同步隊列中有正在等待的線程
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //head結(jié)點(diǎn)正在運(yùn)行苛吱,后續(xù)線程需被喚醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //后繼節(jié)點(diǎn)成為head結(jié)點(diǎn)酪术,設(shè)置初始化狀態(tài)
continue; // loop to recheck cases
unparkSuccessor(h); //喚醒head節(jié)點(diǎn)后繼節(jié)點(diǎn)的線程
}
//將head節(jié)點(diǎn)的狀態(tài)設(shè)置為PROPAGATE,以便于傳播喚醒
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果頭結(jié)點(diǎn)發(fā)生變化翠储,證明其他線程獲取了鎖(setHeadAndPropagate里重設(shè)head節(jié)點(diǎn))绘雁,需要重試,否則退出循環(huán)
if (h == head) // loop if head changed
break;
}
}
共享式釋放同步狀態(tài)說的是援所,在tryReleaseShared
執(zhí)行成功的時候執(zhí)行doReleaseShared
傳播釋放后繼節(jié)點(diǎn)庐舟。假如現(xiàn)在同步隊列里沒有等待線程,則把目前唯一的head節(jié)點(diǎn)設(shè)置為PROPAGATE
住拭,這時候是說明資源過剩挪略。如果是在acquireShared
方法通過setHeadAndPropagate
正好到達(dá)doReleaseShared
方法耻涛,這時候需要判斷waitStatus
的狀態(tài),SIGNAL
就喚醒(PROPAGATE在shouldParkAfterFailedAcquire里會變成SIGNAL)瘟檩。如果同步隊列里有等待線程則都喚醒他們抹缕。
總結(jié)
本篇簡單分析了一下AQS
的基本概念和原理,其中還有一些沒提到的如中斷異常的acquireInterruptibly
墨辛,支持超時獲取的tryAcquireNanos
卓研,等待隊列(不是同步隊列)等。后續(xù)也將會基于AQS
分析可重入鎖ReentrantLock
,讀寫鎖ReentrantReadWriteLock
,以及同步工具CountdownLatch
,Semaphore
,CyclicBarrier
的原理和實(shí)現(xiàn)睹簇。