1赖舟、引言
在JDK1.5之前,一般是靠synchronized
關(guān)鍵字來實現(xiàn)線程對共享變量的互斥訪問硫椰。synchronized
是在字節(jié)碼上加指令铜邮,依賴于底層操作系統(tǒng)的Mutex Lock
實現(xiàn)。
而從JDK1.5以后java界的一位大神—— Doug Lea 開發(fā)了AbstractQueuedSynchronizer
(AQS)組件酱虎,使用原生java代碼實現(xiàn)了synchronized
語義雨膨。換句話說,Doug Lea沒有使用更“高級”的機器指令读串,也不依靠JDK編譯時的特殊處理聊记,僅用一個普普通通的類就完成了代碼塊的并發(fā)訪問控制撒妈,比那些費力不討好的實現(xiàn)不知高到哪里去了。
java.util.concurrent
包有多重要無需多言甥雕,一言以蔽之踩身,是Doug Lea大爺對天下所有Java程序員的憐憫。
AQS定義了一套多線程訪問共享資源的同步器框架社露,是整個java.util.concurrent
包的基石挟阻,Lock
、ReadWriteLock
峭弟、CountDowndLatch
附鸽、CyclicBarrier
、Semaphore
瞒瘸、ThreadPoolExecutor
等都是在AQS的基礎(chǔ)上實現(xiàn)的坷备。
2、實現(xiàn)原理
并發(fā)控制的核心是鎖的獲取與釋放情臭,鎖的實現(xiàn)方式有很多種省撑,AQS采用的是一種改進的CLH鎖。
2.1 CLH鎖
CLH(Craig, Landin, and Hagersten locks)是一種自旋鎖俯在,能確保無饑餓性竟秫,提供先來先服務(wù)的公平性。
何謂自旋鎖跷乐?它是為實現(xiàn)保護共享資源而提出一種鎖機制肥败。其實,自旋鎖與互斥鎖比較類似愕提,它們都是為了解決對某項資源的互斥使用馒稍。無論是互斥鎖,還是自旋鎖浅侨,在任何時刻纽谒,最多只能有一個保持者,也就是說如输,在任何時刻最多只能有一個執(zhí)行單元獲得鎖佛舱。但是兩者在調(diào)度機制上略有不同。對于互斥鎖挨决,如果資源已經(jīng)被占用,資源申請者只能進入睡眠狀態(tài)订歪。但是自旋鎖不會引起調(diào)用者睡眠脖祈,如果自旋鎖已經(jīng)被別的執(zhí)行單元保持,調(diào)用者就一直循環(huán)在那里看是否該自旋鎖的保持者已經(jīng)釋放了鎖刷晋,“自旋”一詞就是因此而得名盖高。
CLH鎖是一種基于鏈表的可擴展慎陵、高性能、公平的自旋鎖喻奥,申請線程只在本地變量上自旋席纽,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋撞蚕。
CLH隊列中的節(jié)點QNode
中含有一個locked
字段润梯,該字段若為true表示該線程需要獲取鎖,且不釋放鎖甥厦,為false表示線程釋放了鎖纺铭。節(jié)點之間是通過隱形的鏈表相連,之所以叫隱形的鏈表是因為這些節(jié)點之間沒有明顯的next指針刀疙,而是通過myPred
所指向的節(jié)點的變化情況來影響myNode
的行為舶赔。
CLHLock上還有一個尾指針,始終指向隊列的最后一個節(jié)點谦秧。
當一個線程需要獲取鎖時竟纳,會創(chuàng)建一個新的QNode
,將其中的locked
設(shè)置為true表示需要獲取鎖疚鲤,然后使自己成為隊列的尾部锥累,同時獲取一個指向其前趨的引用myPred
,然后該線程就在前趨節(jié)點的locked
字段上旋轉(zhuǎn)石咬,直到前趨節(jié)點釋放鎖揩悄。當一個線程需要釋放鎖時,將當前節(jié)點的locked
域設(shè)置為false鬼悠,同時回收前趨節(jié)點删性。如上圖所示,線程A需要獲取鎖焕窝,其myNode
域為true蹬挺,些時tail指向線程A的節(jié)點,然后線程B也加入到線程A后面它掂,tail
指向線程B的節(jié)點巴帮。然后線程A和B都在它的myPred
域上旋轉(zhuǎn),一旦它的myPred
節(jié)點的locked
字段變?yōu)閒alse虐秋,它就可以獲取鎖榕茧。
2.2 AQS數(shù)據(jù)模型
AQS維護了一個volatile int state
(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。
AQS的內(nèi)部隊列是CLH同步鎖的一種變形客给。其主要從以下方面進行了改造:
- 在結(jié)構(gòu)上引入了頭節(jié)點和尾節(jié)點用押,分別指向隊列的頭和尾,嘗試獲取鎖靶剑、入隊列蜻拨、釋放鎖等實現(xiàn)都與頭尾節(jié)點相關(guān)池充,
- 為了可以處理timeout和cancel操作,每個node維護一個指向前驅(qū)的指針缎讼。如果一個node的前驅(qū)被cancel收夸,這個node可以前向移動使用前驅(qū)的狀態(tài)字段
- 在每個node里面使用一個狀態(tài)字段來控制阻塞/喚醒,而不是自旋
- head節(jié)點使用的是傀儡節(jié)點
FIFO隊列中的節(jié)點有AQS的靜態(tài)內(nèi)部類Node
定義:
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 獨占模式
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* CANCELLED血崭,值為1卧惜,表示當前的線程被取消
* SIGNAL,值為-1功氨,表示當前節(jié)點的后繼節(jié)點包含的線程需要運行序苏,也就是unpark;
* CONDITION捷凄,值為-2忱详,表示當前節(jié)點在等待condition,也就是在condition隊列中跺涤;
* PROPAGATE匈睁,值為-3,表示當前場景下后續(xù)的acquireShared能夠得以執(zhí)行桶错;
* 值為0航唆,表示當前節(jié)點在sync隊列中,等待著獲取鎖院刁。
*/
volatile int waitStatus;
// 前驅(qū)結(jié)點
volatile Node prev;
// 后繼結(jié)點
volatile Node next;
// 與該結(jié)點綁定的線程
volatile Thread thread;
// 存儲condition隊列中的后繼節(jié)點
Node nextWaiter;
// 是否為共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅(qū)結(jié)點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node
類中有兩個常量SHARE
和EXCLUSIVE
糯钙,顧名思義這兩個常量用于表示這個節(jié)點支持共享模式還是獨占模式,共享模式指的是允許多個線程獲取同一個鎖而且可能獲取成功退腥,獨占模式指的是一個鎖如果被一個線程持有任岸,其他線程必須等待。多個線程讀取一個文件可以采用共享模式狡刘,而當有一個線程在寫文件時不會允許另一個線程寫這個文件享潜,這就是獨占模式的應(yīng)用場景。
2.3 CAS操作
AQS有三個重要的變量:
// 隊頭結(jié)點
private transient volatile Node head;
// 隊尾結(jié)點
private transient volatile Node tail;
// 代表共享資源
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
compareAndSetState
方法是以樂觀鎖的方式更新共享資源嗅蔬。
獨占鎖是一種悲觀鎖剑按,synchronized
就是一種獨占鎖,會導(dǎo)致其它所有需要鎖的線程掛起澜术,等待持有鎖的線程釋放鎖艺蝴。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是鸟废,每次不加鎖而是假設(shè)沒有沖突而去完成某項操作猜敢,如果因為沖突失敗就重試,直到成功為止。樂觀鎖用到的機制就是CAS锣枝,即Compare And Swap
。
CAS 指的是現(xiàn)代 CPU 廣泛支持的一種對內(nèi)存中的共享數(shù)據(jù)進行操作的一種特殊指令兰英。這個指令會對內(nèi)存中的共享數(shù)據(jù)做原子的讀寫操作撇叁。簡單介紹一下這個指令的操作過程:
首先,CPU 會將內(nèi)存中將要被更改的數(shù)據(jù)與期望的值做比較畦贸。然后陨闹,當這兩個值相等時,CPU 才會將內(nèi)存中的數(shù)值替換為新的值薄坏。否則便不做操作趋厉。最后,CPU 會將舊的數(shù)值返回胶坠。
這一系列的操作是原子的君账。它們雖然看似復(fù)雜,但卻是 Java 5 并發(fā)機制優(yōu)于原有鎖機制的根本沈善。簡單來說乡数,CAS 的含義是“我認為原有的值應(yīng)該是什么,如果是闻牡,則將原有的值更新為新值净赴,否則不做修改,并告訴我原來的值是多少”罩润。
CAS通過調(diào)用JNI(Java Native Interface)調(diào)用實現(xiàn)的玖翅。JNI允許java調(diào)用其他語言,而CAS就是借助C語言來調(diào)用CPU底層指令實現(xiàn)的割以。Unsafe
是CAS的核心類金度,它提供了硬件級別的原子操作。
Doug Lea大神在java同步器中大量使用了CAS技術(shù)拳球,鬼斧神工的實現(xiàn)了多線程執(zhí)行的安全性审姓。CAS不僅在AQS的實現(xiàn)中隨處可見,也是整個java.util.concurrent
包的基石祝峻。
可以發(fā)現(xiàn)魔吐,head
、tail
莱找、state
三個變量都是volatile
的酬姆。
volatile
是輕量級的synchronized
,它在多處理器開發(fā)中保證了共享變量的“可見性”奥溺。可見性的意思是當一個線程修改一個共享變量時辞色,另外一個線程能讀到這個修改的值。如果一個字段被聲明成volatile
浮定,Java線程內(nèi)存模型確保所有線程看到這個變量的值是一致的相满。
volatile變量也存在一些局限:不能用于構(gòu)建原子的復(fù)合操作层亿,因此當一個變量依賴舊值時就不能使用volatile變量。而CAS呢立美,恰恰可以提供對共享變量的原子的讀寫操作匿又。
volatile保證共享變量的可見性,CAS保證更新操作的原子性建蹄,簡直是絕配碌更!把這些特性整合在一起,就形成了整個concurrent包得以實現(xiàn)的基石洞慎。如果仔細分析concurrent包的源代碼實現(xiàn)痛单,會發(fā)現(xiàn)一個通用化的實現(xiàn)模式:
- 首先,聲明共享變量為volatile劲腿;
- 然后旭绒,使用CAS的原子條件更新來實現(xiàn)線程之間的同步;
- 同時谆棱,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的內(nèi)存語義來實現(xiàn)線程之間的通信快压。
AQS,非阻塞數(shù)據(jù)結(jié)構(gòu)和原子變量類(java.util.concurrent.atomic包中的類)垃瞧,這些concurrent包中的基礎(chǔ)類都是使用這種模式來實現(xiàn)的蔫劣,而concurrent包中的高層類又是依賴于這些基礎(chǔ)類來實現(xiàn)的。從整體來看个从,concurrent包的實現(xiàn)示意圖如下:
3脉幢、源碼解讀
前面提到過,AQS定義兩種資源共享方式:
-
Exclusive:獨占嗦锐,只有一個線程能執(zhí)行嫌松,如
ReentrantLock
-
Share:共享,多個線程可同時執(zhí)行奕污,如
Semaphore
/CountDownLatch
不同的自定義同步器爭用共享資源的方式也不同萎羔。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state
的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等)碳默,AQS已經(jīng)在頂層實現(xiàn)好了贾陷。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:
-
isHeldExclusively()
:該線程是否正在獨占資源。只有用到condition
才需要去實現(xiàn)它嘱根。 -
tryAcquire(int)
:獨占方式髓废。嘗試獲取資源,成功則返回true该抒,失敗則返回false慌洪。 -
tryRelease(int)
:獨占方式。嘗試釋放資源,成功則返回true冈爹,失敗則返回false涌攻。 -
tryAcquireShared(int)
:共享方式。嘗試獲取資源频伤。負數(shù)表示失斞⑵帷;0表示成功剂买,但沒有剩余可用資源;正數(shù)表示成功癌蓖,且有剩余資源瞬哼。 -
tryReleaseShared(int)
:共享方式。嘗試釋放資源租副,成功則返回true坐慰,失敗則返回false。
以ReentrantLock
為例用僧,state
初始化為0结胀,表示未鎖定狀態(tài)。A線程lock()
時责循,會調(diào)用tryAcquire()
獨占該鎖并將state+1糟港。此后,其他線程再tryAcquire()
時就會失敗院仿,直到A線程unlock()
到state=0(即釋放鎖)為止秸抚,其它線程才有機會獲取該鎖。當然歹垫,釋放鎖之前剥汤,A線程自己是可以重復(fù)獲取此鎖的(state會累加),這就是可重入的概念排惨。但要注意吭敢,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的暮芭。
再以CountDownLatch
以例鹿驼,任務(wù)分為N個子線程去執(zhí)行,state
也初始化為N(注意N要與線程個數(shù)一致)谴麦。這N個子線程是并行執(zhí)行的蠢沿,每個子線程執(zhí)行完后countDown()
一次,state
會CAS減1匾效。等到所有子線程都執(zhí)行完后(即state=0)舷蟀,會unpark()
主調(diào)用線程,然后主調(diào)用線程就會從await()
函數(shù)返回,繼續(xù)后余動作野宜。
一般來說扫步,自定義同步器要么是獨占方法,要么是共享方式匈子,他們也只需實現(xiàn)tryAcquire-tryRelease
河胎、tryAcquireShared-tryReleaseShared
中的一種即可。但AQS也支持自定義同步器同時實現(xiàn)獨占和共享兩種方式虎敦,如ReentrantReadWriteLock
游岳。
3.1 acquire(int)
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源其徙,線程直接返回胚迫,否則進入等待隊列,直到獲取到資源為止唾那,且整個過程忽略中斷的影響访锻。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了闹获。
函數(shù)流程如下:
-
tryAcquire()
嘗試直接去獲取資源期犬,如果成功則直接返回; -
addWaiter()
將該線程加入等待隊列的尾部避诽,并標記為獨占模式龟虎; -
acquireQueued()
使線程在等待隊列中獲取資源,一直獲取到資源后才返回沙庐。如果在整個等待過程中被中斷過遣总,則返回true,否則返回false轨功。 - 如果線程在等待過程中被中斷過旭斥,它是不響應(yīng)的。只是獲取資源后才再進行自我中斷
selfInterrupt()
古涧,將中斷補上垂券。
下面再來看看每個方法的實現(xiàn)代碼。
3.1.1 tryAcquire(int)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
此方法嘗試去獲取獨占資源羡滑。如果獲取成功菇爪,則直接返回true,否則直接返回false柒昏。
AQS只是一個框架凳宙,在這里定義了一個接口,具體資源的獲取交由自定義同步器去實現(xiàn)了(通過state的get/set/CAS)职祷,至于能不能重入氏涩,能不能加塞届囚,那就看具體的自定義同步器怎么去設(shè)計了。當然是尖,自定義同步器在進行資源訪問時要考慮線程安全的影響意系。
這里之所以沒有定義成abstract
,是因為獨占模式下只用實現(xiàn)tryAcquire-tryRelease
饺汹,而共享模式下只用實現(xiàn)tryAcquireShared-tryReleaseShared
蛔添。如果都定義成abstract
,那么每個模式也要去實現(xiàn)另一模式下的接口兜辞。說到底迎瞧,Doug Lea還是站在咱們開發(fā)者的角度,盡量減少不必要的工作量逸吵。
3.1.2 addWaiter(Node)
private Node addWaiter(Node mode) {
// 使用當前線程構(gòu)造結(jié)點
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 如果隊尾結(jié)點不為空夹攒,將當前節(jié)點插入隊尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 隊尾結(jié)點為空(隊列還沒有初始化),則轉(zhuǎn)調(diào)enq入隊
enq(node);
return node;
}
其中胁塞,compareAndSetTail方法也是調(diào)用Unsafe類實現(xiàn)CAS操作,更新隊尾压语。
3.1.3 enq(Node)
private Node enq(final Node node) {
for (;;) { // CAS自旋啸罢,直到插入成功
Node t = tail;
if (t == null) { // 隊尾為空,則先初始化隊列胎食,new一個傀儡節(jié)點
if (compareAndSetHead(new Node()))
tail = head; // 頭尾指針都指向傀儡節(jié)點
} else { // 插入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這段代碼的精髓就在于CAS自旋volatile
變量扰才,也是AtomicInteger
、AtomicBoolean
等原子量的靈魂厕怜。
3.1.4 acquireQueued(Node, int)
通過tryAcquire()
和addWaiter()
衩匣,如果線程獲取資源失敗,已經(jīng)被放入等待隊列尾部了粥航。但是琅捏,后面還有一項重要的事沒干,就是讓線程進入阻塞狀態(tài)递雀,直到其他線程釋放資源后喚醒自己。過程跟在銀行辦理業(yè)務(wù)時排隊拿號有點相似,acquireQueued()
就是干這件事:在等待隊列中排隊拿號(中間沒其它事干可以休息)陈哑,直到拿到號后再返回合瓢。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 是否獲取到了資源
try {
boolean interrupted = false; // 等待過程中有沒有被中斷
for (;;) { // 自旋,直到
final Node p = node.predecessor();
// 前驅(qū)是head杨凑,則有資格去嘗試獲取資源
if (p == head && tryAcquire(arg)) {
// 獲取資源成功滤奈,將自己置為隊頭,并回收其前驅(qū)(舊的隊頭)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 獲取資源失敗撩满,
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果獲取資源失敗后蜒程,會調(diào)用兩個函數(shù)绅你,shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
,下面來看看它倆是干什么的搞糕。
3.1.5 shouldParkAfterFailedAcquire(Node, Node)
從名字可以猜出來勇吊,該函數(shù)的作用是“在獲取資源失敗后是否需要阻塞”:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前驅(qū)狀態(tài)
if (ws == Node.SIGNAL)
// Node.SIGNAL,代表前驅(qū)釋放資源后會通知后繼結(jié)點
return true;
if (ws > 0) { // 代表前驅(qū)已取消任務(wù)窍仰,相當于退出了等待隊列
do { // 一個個往前找汉规,找到最近一個正常等待的前驅(qū),排在它的后面
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驅(qū)狀態(tài)正常驹吮,則將其狀態(tài)置為SIGNAL针史,意為,釋放資源后通知后繼結(jié)點
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整個流程中碟狞,如果前驅(qū)節(jié)點的狀態(tài)不是SIGNAL
啄枕,那么自己就不能安心去休息,需要去找個安心的休息點族沃,同時可以再嘗試下看有沒有機會輪到自己拿號频祝。
3.1.6 parkAndCheckInterrupt()
如果線程找好安全休息點后,那就可以安心去休息了脆淹。此方法就是讓線程去休息常空,真正進入等待狀態(tài)。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 使線程進入waiting狀態(tài)
return Thread.interrupted();
}
park()
會讓當前線程進入waiting
狀態(tài)盖溺。在此狀態(tài)下漓糙,有兩種途徑可以喚醒該線程:被unpark()
或被interrupt()
。
3.1.7 小結(jié)
總結(jié)下acquire的流程:
- 調(diào)用自定義同步器的
tryAcquire()
嘗試直接去獲取資源烘嘱,如果成功則直接返回昆禽; - 沒成功,則
addWaiter()
將該線程加入等待隊列的尾部蝇庭,并標記為獨占模式醉鳖; -
acquireQueued()
使線程在等待隊列中休息,有機會時(輪到自己哮内,會被unpark()
)會去嘗試獲取資源辐棒。獲取到資源后才返回。如果在整個等待過程中被中斷過牍蜂,則返回true漾根,否則返回false。 - 如果線程在等待過程中被中斷過鲫竞,它是不響應(yīng)的辐怕。只是獲取資源后才再進行自我中斷
selfInterrupt()
,將中斷補上从绘。
3.2 release(int)
release()是acquire()的逆操作寄疏,是獨占模式下線程釋放共享資源的頂層入口是牢。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源陕截。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) // 狀態(tài)不為0驳棱,證明需要喚醒后繼結(jié)點
unparkSuccessor(h);
return true;
}
return false;
}
3.2.1 tryRelease(int)
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
跟tryAcquire()
一樣,這個方法是需要自定義同步器去實現(xiàn)的农曲。正常來說社搅,tryRelease()
都會成功的,因為這是獨占模式乳规,該線程來釋放資源形葬,那么它肯定已經(jīng)拿到獨占資源了,直接減掉相應(yīng)量的資源即可暮的,也不需要考慮線程安全的問題笙以。
3.2.2 unparkSuccessor(Node)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 將當前結(jié)點狀態(tài)置零
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 后繼結(jié)點為空或者已取消
s = null;
// 從隊尾開始向前尋找,找到第一個正常的后繼結(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒該結(jié)點上的線程
}
邏輯并不復(fù)雜冻辩,一句話概括:用unpark()
喚醒等待隊列中最前邊的那個未放棄線程猖腕。
3.3 acquireShared(int)
此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源恨闪,獲取成功則直接返回倘感,獲取失敗則進入等待隊列,直到獲取到資源為止凛剥,整個過程忽略中斷。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) { // 留給子類實現(xiàn)
throw new UnsupportedOperationException();
}
這里tryAcquireShared()
依然需要自定義同步器去實現(xiàn)轻姿。但是AQS已經(jīng)把其返回值的語義定義好了:負值代表獲取失斃缰椤;0代表獲取成功互亮,但沒有剩余資源犁享;正數(shù)表示獲取成功,還有剩余資源豹休,其他線程還可以去獲取炊昆。
3.3.1 doAcquireShared(int)
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 以共享模式加入隊尾
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 前驅(qū)是隊頭(隊頭肯定是已經(jīng)拿到資源的結(jié)點)
int r = tryAcquireShared(arg); // 嘗試獲取資源
if (r >= 0) { // 獲取資源成功
setHeadAndPropagate(node, r); // 將自己置為隊頭,若還有剩余資源威根,向后傳播
p.next = null; // help GC
if (interrupted)
selfInterrupt(); // 如果等待過程中被打斷過凤巨,此時將中斷補上。
failed = false;
return;
}
}
// 判斷狀態(tài)洛搀,尋找合適的前驅(qū)敢茁,進入waiting狀態(tài),等著被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
該函數(shù)的功能類似于獨占模式下的acquireQueued()
留美。
跟獨占模式比彰檬,有一點需要注意的是伸刃,這里只有線程是head.next
時(“老二”),才會去嘗試獲取資源逢倍,有剩余的話還會喚醒之后的隊友捧颅。那么問題就來了,假如老大用完后釋放了5個資源较雕,而老二需要6個碉哑,老三需要1個,老四需要2個郎笆。因為老大先喚醒老二谭梗,老二一看資源不夠自己用繼續(xù)park(),也更不會去喚醒老三和老四了宛蚓。獨占模式激捏,同一時刻只有一個線程去執(zhí)行,這樣做未嘗不可凄吏;但共享模式下远舅,多個線程是可以同時執(zhí)行的,現(xiàn)在因為老二的資源需求量大痕钢,而把后面量小的老三和老四也都卡住了图柏。
3.3.2 setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 將自己置為隊頭
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 后繼結(jié)點也為共享模式,則觸發(fā)釋放資源函數(shù)
doReleaseShared();
}
}
此方法在setHead()
的基礎(chǔ)上多了一步任连,就是自己蘇醒的同時蚤吹,如果條件符合(比如還有剩余資源),還會去喚醒后繼節(jié)點随抠,畢竟是共享模式裁着。
3.4 releaseShared(int)
此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源拱她,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源二驰。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 嘗試釋放資源
doReleaseShared(); // 釋放成功,繼續(xù)喚醒后繼結(jié)點
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) { // 留給子類實現(xiàn)
throw new UnsupportedOperationException();
}
跟獨占模式下的release()
相似秉沼,但有一點稍微需要注意:獨占模式下的tryRelease()
在完全釋放掉資源(state=0)后桶雀,才會返回true去喚醒其他線程,這主要是基于可重入的考量唬复;而共享模式下的releaseShared()
則沒有這種要求矗积,多線程可并發(fā)執(zhí)行,不適用于可重入敞咧。
3.4.1 doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 頭結(jié)點不為空且有后繼結(jié)點
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 頭結(jié)點狀態(tài)漠魏,SIGNAL——>0
continue; // 狀態(tài)更新失敗則循環(huán)進行,直到成功
unparkSuccessor(h); // 喚醒后繼結(jié)點
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 頭結(jié)點狀態(tài)妄均,0——>PROPAGATE
continue; // 持續(xù)循環(huán)柱锹,直到狀態(tài)更新成功
}
if (h == head) // 頭結(jié)點沒變哪自,則結(jié)束循環(huán);否則繼續(xù)
break;
}
}
其余函數(shù)已經(jīng)在上面分析過了禁熏。至此壤巷,AQS的獨占模式與共享模式下的實現(xiàn)原理剖析的差不多了,代碼是最好的老師瞧毙。
除了上面分析的核心方法胧华,AQS還有定義了附帶超時功能的tryAcquireNanos()
/tryAcquireSharedNanos()
方法,以及響應(yīng)中斷的acquireInterruptibly()
/acquireSharedInterruptibly()
方法宙彪,其核心流程與通用方法大同小異矩动,不再贅述。
4释漆、應(yīng)用實例
我們利用AQS來實現(xiàn)一個不可重入的互斥鎖實現(xiàn)悲没。鎖資源(AQS里的state)只有兩種狀態(tài):0表示未鎖定,1表示鎖定男图。下邊是Mutex的核心源碼:
public class Mutex {
/**
* 靜態(tài)內(nèi)部類示姿,自定義同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
return getState() == 1; // 是否有資源可用
}
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) { // state:0——>1,代表獲取鎖
setExclusiveOwnerThread(Thread.currentThread()); // 設(shè)置當前占用資源的線程
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0); // state:1——>0逊笆,代表釋放鎖
return true;
}
}
private final Sync sync = new Sync();
/**
* 獲取鎖栈戳,可能會阻塞
*/
public void lock() {
sync.acquire(1);
}
/**
* 嘗試獲取鎖,無論成功或失敗难裆,立即返回
*/
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 釋放鎖
*/
public void unlock() {
sync.release(1);
}
}
同步類在實現(xiàn)時一般都將自定義同步器(sync
)定義為內(nèi)部類子檀,供自己使用;而同步類自己(Mutex)則實現(xiàn)某個接口乃戈,對外服務(wù)褂痰。當然,接口的實現(xiàn)要直接依賴sync
偏化,它們在語義上也存在某種對應(yīng)關(guān)系脐恩。而sync
只用實現(xiàn)資源state
的獲取-釋放方式tryAcquire-tryRelelase
镐侯,至于線程的排隊侦讨、等待、喚醒等苟翻,上層的AQS都已經(jīng)實現(xiàn)好了韵卤,我們不用關(guān)心。
ReentrantLock
/CountDownLatch
/Semphore
這些同步類的實現(xiàn)方式都差不多崇猫,不同的地方就在獲取沈条、釋放資源的方式tryAcquire-tryRelelase
。