前言
引自:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
Java中的大部分同步類(Lock、Semaphore师痕、ReentrantLock等)都是基于AbstractQueuedSynchronizer(簡稱為AQS)實現(xiàn)的沪哺。AQS是一種提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能以及隊列模型的簡單框架悲幅。本文會從應(yīng)用層逐漸深入到原理層,并通過ReentrantLock的基本特性和ReentrantLock與AQS的關(guān)聯(lián),來深入解讀AQS相關(guān)獨占鎖的知識點祟身,同時采取問答的模式來幫助大家理解AQS。由于篇幅原因涕烧,本篇文章主要闡述AQS中獨占鎖的邏輯和Sync Queue月而,不講述包含共享鎖和Condition Queue的部分(本篇文章核心為AQS原理剖析,只是簡單介紹了ReentrantLock议纯,感興趣同學(xué)可以閱讀一下ReentrantLock的源碼)父款。
下面列出本篇文章的大綱和思路,以便于大家更好地理解:
1 ReentrantLock
1.1 ReentrantLock特性概覽
ReentrantLock意思為可重入鎖瞻凤,指的是一個線程能夠?qū)σ粋€臨界資源重復(fù)加鎖憨攒。為了幫助大家更好地理解ReentrantLock的特性,我們先將ReentrantLock跟常用的Synchronized進行比較阀参,其特性如下(藍色部分為本篇文章主要剖析的點):
下面通過偽代碼肝集,進行更加直觀的比較:
// **************************Synchronized的使用方式**************************// 1.用于代碼塊synchronized (this) {}// 2.用于對象synchronized (object) {}// 3.用于方法public synchronized void test (){}// 4.可重入for(inti =0; i <100; i++) {synchronized (this) {}}// **************************ReentrantLock的使用方式**************************public void test () throw Exception{// 1.初始化選擇公平鎖、非公平鎖ReentrantLocklock=newReentrantLock(true);// 2.可用于代碼塊lock.lock();try{try{// 3.支持多種加鎖方式蛛壳,比較靈活; 具有可重入特性if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }}finally{// 4.手動釋放鎖lock.unlock()}}finally{lock.unlock();}}
1.2 ReentrantLock與AQS的關(guān)聯(lián)
通過上文我們已經(jīng)了解杏瞻,ReentrantLock支持公平鎖和非公平鎖(關(guān)于公平鎖和非公平鎖的原理分析,可參考《不可不說的Java“鎖”事》)衙荐,并且ReentrantLock的底層就是由AQS來實現(xiàn)的捞挥。那么ReentrantLock是如何通過公平鎖和非公平鎖與AQS關(guān)聯(lián)起來呢? 我們著重從這兩者的加鎖過程來理解一下它們與AQS之間的關(guān)系(加鎖過程中與AQS的關(guān)聯(lián)比較明顯忧吟,解鎖流程后續(xù)會介紹)砌函。
非公平鎖源碼中的加鎖流程如下:
// java.util.concurrent.locks.ReentrantLock#NonfairSync// 非公平鎖staticfinalclass NonfairSync extends Sync {...finalvoid lock() {if(compareAndSetState(0,1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}? ...}
這塊代碼的含義為:
若通過CAS設(shè)置變量State(同步狀態(tài))成功,也就是獲取鎖成功溜族,則將當前線程設(shè)置為獨占線程讹俊。
若通過CAS設(shè)置變量State(同步狀態(tài))失敗,也就是獲取鎖失敗煌抒,則進入Acquire方法進行后續(xù)處理仍劈。
第一步很好理解,但第二步獲取鎖失敗后摧玫,后續(xù)的處理策略是怎么樣的呢耳奕?這塊可能會有以下思考:
某個線程獲取鎖失敗的后續(xù)流程是什么呢绑青?有以下兩種可能:
(1) 將當前線程獲鎖結(jié)果設(shè)置為失敗,獲取鎖流程結(jié)束屋群。這種設(shè)計會極大降低系統(tǒng)的并發(fā)度闸婴,并不滿足我們實際的需求。所以就需要下面這種流程芍躏,也就是AQS框架的處理流程邪乍。
(2) 存在某種排隊等候機制,線程繼續(xù)等待对竣,仍然保留獲取鎖的可能庇楞,獲取鎖流程仍在繼續(xù)。
對于問題1的第二種情況否纬,既然說到了排隊等候機制吕晌,那么就一定會有某種隊列形成,這樣的隊列是什么數(shù)據(jù)結(jié)構(gòu)呢临燃?
處于排隊等候機制中的線程睛驳,什么時候可以有機會獲取鎖呢?
如果處于排隊等候機制中的線程一直無法獲取鎖膜廊,還是需要一直等待嗎乏沸,還是有別的策略來解決這一問題?
帶著非公平鎖的這些問題爪瓜,再看下公平鎖源碼中獲鎖的方式:
// java.util.concurrent.locks.ReentrantLock#FairSyncstaticfinalclass FairSync extends Sync {? ...finalvoid lock() {acquire(1);}? ...}
看到這塊代碼蹬跃,我們可能會存在這種疑問:Lock函數(shù)通過Acquire方法進行加鎖,但是具體是如何加鎖的呢铆铆?
結(jié)合公平鎖和非公平鎖的加鎖流程蝶缀,雖然流程上有一定的不同,但是都調(diào)用了Acquire方法薄货,而Acquire方法是FairSync和UnfairSync的父類AQS中的核心方法扼劈。
對于上邊提到的問題,其實在ReentrantLock類源碼中都無法解答菲驴,而這些問題的答案,都是位于Acquire方法所在的類AbstractQueuedSynchronizer中骑冗,也就是本文的核心——AQS赊瞬。下面我們會對AQS以及ReentrantLock和AQS的關(guān)聯(lián)做詳細介紹(相關(guān)問題答案會在2.3.5小節(jié)中解答)。
2 AQS
首先贼涩,我們通過下面的架構(gòu)圖來整體了解一下AQS框架:
上圖中有顏色的為Method巧涧,無顏色的為Attribution。
總的來說遥倦,AQS框架共分為五層谤绳,自上而下由淺入深占锯,從AQS對外暴露的API到底層基礎(chǔ)數(shù)據(jù)。
當有自定義同步器接入時缩筛,只需重寫第一層所需要的部分方法即可消略,不需要關(guān)注底層具體的實現(xiàn)流程。當自定義同步器進行加鎖或者解鎖操作時瞎抛,先經(jīng)過第一層的API進入AQS內(nèi)部方法艺演,然后經(jīng)過第二層進行鎖的獲取,接著對于獲取鎖失敗的流程桐臊,進入第三層和第四層的等待隊列處理胎撤,而這些處理方式均依賴于第五層的基礎(chǔ)數(shù)據(jù)提供層。
下面我們會從整體到細節(jié)断凶,從流程到方法逐一剖析AQS框架伤提,主要分析過程如下:
2.1 原理概覽
AQS核心思想是,如果被請求的共享資源空閑认烁,那么就將當前請求資源的線程設(shè)置為有效的工作線程肿男,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用砚著,就需要一定的阻塞等待喚醒機制來保證鎖分配次伶。這個機制主要用的是CLH隊列的變體實現(xiàn)的,將暫時獲取不到鎖的線程加入到隊列中稽穆。
CLH:Craig冠王、Landin and Hagersten隊列,是單向鏈表舌镶,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO)柱彻,AQS是通過將每條請求共享資源的線程封裝成一個節(jié)點來實現(xiàn)鎖的分配。
主要原理圖如下:
AQS使用一個Volatile的int類型的成員變量來表示同步狀態(tài)餐胀,通過內(nèi)置的FIFO隊列來完成資源獲取的排隊工作哟楷,通過CAS完成對State值的修改。
2.1.1 AQS數(shù)據(jù)結(jié)構(gòu)
先來看下AQS中最基本的數(shù)據(jù)結(jié)構(gòu)——Node否灾,Node即為上面CLH變體隊列中的節(jié)點卖擅。
解釋一下幾個方法和屬性值的含義:
方法和屬性值含義
waitStatus當前節(jié)點在隊列中的狀態(tài)
thread表示處于該節(jié)點的線程
prev前驅(qū)指針
predecessor返回前驅(qū)節(jié)點,沒有的話拋出npe
nextWaiter指向下一個處于CONDITION狀態(tài)的節(jié)點(由于本篇文章不講述Condition Queue隊列墨技,這個指針不多介紹)
next后繼指針
線程兩種鎖的模式:
模式含義
SHARED表示線程以共享的模式等待鎖
EXCLUSIVE表示線程正在以獨占的方式等待鎖
waitStatus有下面幾個枚舉值:
枚舉含義
0當一個Node被初始化的時候的默認值
CANCELLED為1惩阶,表示線程獲取鎖的請求已經(jīng)取消了
CONDITION為-2,表示節(jié)點在等待隊列中扣汪,節(jié)點線程等待喚醒
PROPAGATE為-3断楷,當前線程處在SHARED情況下,該字段才會使用
SIGNAL為-1崭别,表示線程已經(jīng)準備好了冬筒,就等資源釋放了
2.1.2 同步狀態(tài)State
在了解數(shù)據(jù)結(jié)構(gòu)后恐锣,接下來了解一下AQS的同步狀態(tài)——State。AQS中維護了一個名為state的字段舞痰,意為同步狀態(tài)土榴,是由Volatile修飾的,用于展示當前臨界資源的獲鎖情況匀奏。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivatevolatileintstate;
下面提供了幾個訪問這個字段的方法:
方法名描述
protected final int getState()獲取State的值
protected final void setState(int newState)設(shè)置State的值
protected final boolean compareAndSetState(int expect, int update)使用CAS方式更新State
這幾個方法都是Final修飾的鞭衩,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀態(tài)來實現(xiàn)多線程的獨占模式和共享模式(加鎖過程)娃善。
對于我們自定義的同步工具论衍,需要自定義獲取同步狀態(tài)和釋放狀態(tài)的方式,也就是AQS架構(gòu)圖中的第一層:API層聚磺。
2.2 AQS重要方法與ReentrantLock的關(guān)聯(lián)
從架構(gòu)圖中可以得知坯台,AQS提供了大量用于自定義同步器實現(xiàn)的Protected方法。自定義同步器實現(xiàn)的相關(guān)方法也只是為了通過修改State字段來實現(xiàn)多線程的獨占模式或者共享模式瘫寝。自定義同步器需要實現(xiàn)以下方法(ReentrantLock需要實現(xiàn)的方法如下蜒蕾,并不是全部):
方法名描述
protected boolean isHeldExclusively()該線程是否正在獨占資源。只有用到Condition才需要去實現(xiàn)它焕阿。
protected boolean tryAcquire(int arg)獨占方式咪啡。arg為獲取鎖的次數(shù),嘗試獲取資源暮屡,成功則返回True撤摸,失敗則返回False。
protected boolean tryRelease(int arg)獨占方式褒纲。arg為釋放鎖的次數(shù)准夷,嘗試釋放資源,成功則返回True莺掠,失敗則返回False衫嵌。
protected int tryAcquireShared(int arg)共享方式。arg為獲取鎖的次數(shù)彻秆,嘗試獲取資源楔绞。負數(shù)表示失敗唇兑;0表示成功墓律,但沒有剩余可用資源;正數(shù)表示成功幔亥,且有剩余資源。
protected boolean tryReleaseShared(int arg)共享方式察纯。arg為釋放鎖的次數(shù)帕棉,嘗試釋放資源针肥,如果釋放后允許喚醒后續(xù)等待結(jié)點返回True,否則返回False香伴。
一般來說慰枕,自定義同步器要么是獨占方式,要么是共享方式即纲,它們也只需實現(xiàn)tryAcquire-tryRelease具帮、tryAcquireShared-tryReleaseShared中的一種即可。AQS也支持自定義同步器同時實現(xiàn)獨占和共享兩種方式低斋,如ReentrantReadWriteLock蜂厅。ReentrantLock是獨占鎖,所以實現(xiàn)了tryAcquire-tryRelease膊畴。
以非公平鎖為例掘猿,這里主要闡述一下非公平鎖與AQS之間方法的關(guān)聯(lián)之處,具體每一處核心方法的作用會在文章后面詳細進行闡述唇跨。
為了幫助大家理解ReentrantLock和AQS之間方法的交互過程稠通,以非公平鎖為例,我們將加鎖和解鎖的交互流程單獨拎出來強調(diào)一下买猖,以便于對后續(xù)內(nèi)容的理解改橘。
加鎖:
通過ReentrantLock的加鎖方法Lock進行加鎖操作。
會調(diào)用到內(nèi)部類Sync的Lock方法玉控,由于Sync#lock是抽象方法飞主,根據(jù)ReentrantLock初始化選擇的公平鎖和非公平鎖,執(zhí)行相關(guān)內(nèi)部類的Lock方法奸远,本質(zhì)上都會執(zhí)行AQS的Acquire方法既棺。
AQS的Acquire方法會執(zhí)行tryAcquire方法,但是由于tryAcquire需要自定義同步器實現(xiàn)懒叛,因此執(zhí)行了ReentrantLock中的tryAcquire方法丸冕,由于ReentrantLock是通過公平鎖和非公平鎖內(nèi)部類實現(xiàn)的tryAcquire方法,因此會根據(jù)鎖類型不同薛窥,執(zhí)行不同的tryAcquire胖烛。
tryAcquire是獲取鎖邏輯,獲取失敗后诅迷,會執(zhí)行框架AQS的后續(xù)邏輯佩番,跟ReentrantLock自定義同步器無關(guān)。
解鎖:
通過ReentrantLock的解鎖方法Unlock進行解鎖罢杉。
Unlock會調(diào)用內(nèi)部類Sync的Release方法趟畏,該方法繼承于AQS。
Release中會調(diào)用tryRelease方法滩租,tryRelease需要自定義同步器實現(xiàn)赋秀,tryRelease只在ReentrantLock中的Sync實現(xiàn)利朵,因此可以看出,釋放鎖的過程猎莲,并不區(qū)分是否為公平鎖绍弟。
釋放成功后,所有處理由AQS框架完成著洼,與自定義同步器無關(guān)樟遣。
通過上面的描述,大概可以總結(jié)出ReentrantLock加鎖解鎖時API層核心方法的映射關(guān)系身笤。
2.3 通過ReentrantLock理解AQS
ReentrantLock中公平鎖和非公平鎖在底層是相同的豹悬,這里以非公平鎖為例進行分析。
在非公平鎖中展鸡,有一段這樣的代碼:
// java.util.concurrent.locks.ReentrantLockstaticfinalclass NonfairSync extends Sync {...finalvoid lock() {if(compareAndSetState(0,1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}? ...}
看一下這個Acquire是怎么寫的:
// java.util.concurrent.locks.AbstractQueuedSynchronizerpublic final void acquire(int arg) {if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
再看一下tryAcquire方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizerprotected boolean tryAcquire(int arg) {thrownewUnsupportedOperationException();}
可以看出屿衅,這里只是AQS的簡單實現(xiàn),具體獲取鎖的實現(xiàn)方法是由各自的公平鎖和非公平鎖單獨實現(xiàn)的(以ReentrantLock為例)莹弊。如果該方法返回了True涤久,則說明當前線程獲取鎖成功,就不用往后執(zhí)行了忍弛;如果獲取失敗响迂,就需要加入到等待隊列中。下面會詳細解釋線程是何時以及怎樣被加入進等待隊列中的细疚。
2.3.1 線程加入等待隊列
2.3.1.1 加入隊列的時機
當執(zhí)行Acquire(1)時蔗彤,會通過tryAcquire獲取鎖。在這種情況下疯兼,如果獲取鎖失敗然遏,就會調(diào)用addWaiter加入到等待隊列中去。
2.3.1.2 如何加入隊列
獲取鎖失敗后吧彪,會執(zhí)行addWaiter(Node.EXCLUSIVE)加入等待隊列待侵,具體實現(xiàn)方法如下:
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivateNode addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if(pred !=null) {node.prev = pred;if(compareAndSetTail(pred, node)) {pred.next = node;returnnode;}}enq(node);returnnode;}privatefinalboolean compareAndSetTail(Nodeexpect, Node update) {returnunsafe.compareAndSwapObject(this, tailOffset,expect, update);}
主要的流程如下:
通過當前的線程和鎖模式新建一個節(jié)點。
Pred指針指向尾節(jié)點Tail姨裸。
將New中Node的Prev指針指向Pred秧倾。
通過compareAndSetTail方法,完成尾節(jié)點的設(shè)置傀缩。這個方法主要是對tailOffset和Expect進行比較那先,如果tailOffset的Node和Expect的Node地址是相同的,那么設(shè)置Tail的值為Update的值赡艰。
// java.util.concurrent.locks.AbstractQueuedSynchronizerstatic{try{stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));}catch(Exception ex) {thrownewError(ex);? }}
從AQS的靜態(tài)代碼塊可以看出售淡,都是獲取一個對象的屬性相對于該對象在內(nèi)存當中的偏移量,這樣我們就可以根據(jù)這個偏移量在對象內(nèi)存當中找到這個屬性。tailOffset指的是tail對應(yīng)的偏移量揖闸,所以這個時候會將new出來的Node置為當前隊列的尾節(jié)點苦掘。同時,由于是雙向鏈表楔壤,也需要將前一個節(jié)點指向尾節(jié)點。
如果Pred指針是Null(說明等待隊列中沒有元素)惯驼,或者當前Pred指針和Tail指向的位置不同(說明被別的線程已經(jīng)修改)蹲嚣,就需要看一下Enq的方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate Node enq(final Node node) {for(;;) {Node t = tail;if(t ==null) {// Must initializeif(compareAndSetHead(newNode()))tail = head;}else{node.prev = t;if(compareAndSetTail(t, node)) {t.next = node;returnt;}}}}
如果沒有被初始化祟牲,需要進行初始化一個頭結(jié)點出來隙畜。但請注意,初始化的頭結(jié)點并不是當前線程節(jié)點说贝,而是調(diào)用了無參構(gòu)造函數(shù)的節(jié)點议惰。如果經(jīng)歷了初始化或者并發(fā)導(dǎo)致隊列中有元素,則與之前的方法相同乡恕。其實言询,addWaiter就是一個在雙端鏈表添加尾節(jié)點的操作,需要注意的是傲宜,雙端鏈表的頭結(jié)點是一個無參構(gòu)造函數(shù)的頭結(jié)點运杭。
總結(jié)一下,線程獲取鎖的時候函卒,過程大體如下:
當沒有線程獲取到鎖時辆憔,線程1獲取鎖成功。
線程2申請鎖报嵌,但是鎖被線程1占有虱咧。
如果再有線程要獲取鎖,依次在隊列中往后排隊即可锚国。
回到上邊的代碼腕巡,hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節(jié)點的方法。如果返回False跷叉,說明當前線程可以爭取共享資源逸雹;如果返回True,說明隊列中存在有效節(jié)點云挟,當前線程必須加入到等待隊列中梆砸。
// java.util.concurrent.locks.ReentrantLockpublic final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail;// Read fields in reverse initialization orderNode h = head;Node s;returnh != t && ((s = h.next) ==null|| s.thread != Thread.currentThread());}
看到這里,我們理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什么要判斷的頭結(jié)點的下一個節(jié)點园欣?第一個節(jié)點儲存的數(shù)據(jù)是什么帖世?
雙向鏈表中,第一個節(jié)點為虛節(jié)點,其實并不存儲任何信息日矫,只是占位赂弓。真正的第一個有數(shù)據(jù)的節(jié)點,是在第二個節(jié)點開始的哪轿。當h != t時: 如果(s = h.next) == null盈魁,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head窃诉,沒有將Head指向Tail杨耙,此時隊列中有元素,需要返回True(這塊具體見下邊代碼分析)飘痛。 如果(s = h.next) != null珊膜,說明此時隊列中至少有一個有效節(jié)點。如果此時s.thread == Thread.currentThread()宣脉,說明等待隊列的第一個有效節(jié)點中的線程與當前線程相同车柠,那么當前線程是可以獲取資源的;如果s.thread != Thread.currentThread()塑猖,說明等待隊列的第一個有效節(jié)點線程與當前線程不同竹祷,當前線程必須加入進等待隊列。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enqif(t ==null) {// Must initializeif(compareAndSetHead(newNode()))tail = head;}else{node.prev = t;if(compareAndSetTail(t, node)) {t.next = node;returnt;}}
節(jié)點入隊不是原子操作萌庆,所以會出現(xiàn)短暫的head != tail溶褪,此時Tail指向最后一個節(jié)點,而且Tail指向Head践险。如果Head沒有指向Tail(可見5猿妈、6、7行)巍虫,這種情況下也需要將相關(guān)線程加入隊列中彭则。所以這塊代碼是為了解決極端情況下的并發(fā)問題。
2.3.1.3 等待隊列中線程出隊列時機
回到最初的源碼:
// java.util.concurrent.locks.AbstractQueuedSynchronizerpublic final void acquire(int arg) {if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
上文解釋了addWaiter方法占遥,這個方法其實就是把對應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊列里俯抖,返回的是一個包含該線程的Node。而這個Node會作為參數(shù)瓦胎,進入到acquireQueued方法中芬萍。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。
總的來說搔啊,一個線程獲取鎖失敗了柬祠,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去獲取鎖负芋,直到獲取成功或者不再需要獲嚷住(中斷)。
下面我們從“何時出隊列?”和“如何出隊列莽龟?”兩個方向來分析一下acquireQueued源碼:
// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) {// 標記是否成功拿到資源booleanfailed =true;try{// 標記等待過程中是否中斷過booleaninterrupted =false;// 開始自旋蠕嫁,要么獲取鎖,要么中斷for(;;) {// 獲取當前節(jié)點的前驅(qū)節(jié)點finalNode p = node.predecessor();// 如果p是頭結(jié)點毯盈,說明當前節(jié)點在真實數(shù)據(jù)隊列的首部剃毒,就嘗試獲取鎖(別忘了頭結(jié)點是虛節(jié)點)if(p == head && tryAcquire(arg)) {// 獲取鎖成功,頭指針移動到當前nodesetHead(node);p.next =null;// help GCfailed =false;returninterrupted;}// 說明p為頭節(jié)點且當前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結(jié)點搂赋,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點的waitStatus為-1)迟赃,防止無限循環(huán)浪費資源。具體兩個方法下面細細分析if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted =true;}}finally{if(failed)cancelAcquire(node);}}
注:setHead方法是把當前節(jié)點置為虛節(jié)點厂镇,但并沒有修改waitStatus,因為它是一直需要用的數(shù)據(jù)左刽。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate void setHead(Node node) {head = node;node.thread =null;node.prev =null;}// java.util.concurrent.locks.AbstractQueuedSynchronizer// 靠前驅(qū)節(jié)點判斷當前線程是否應(yīng)該被阻塞private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 獲取頭結(jié)點的節(jié)點狀態(tài)intws = pred.waitStatus;// 說明頭結(jié)點處于喚醒狀態(tài)if(ws == Node.SIGNAL)returntrue;// 通過枚舉值我們知道waitStatus>0是取消狀態(tài)if(ws >0) {do{// 循環(huán)向前查找取消節(jié)點捺信,把取消節(jié)點從隊列中剔除node.prev = pred = pred.prev;}while(pred.waitStatus >0);pred.next = node;}else{// 設(shè)置前任節(jié)點等待狀態(tài)為SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}returnfalse;}
parkAndCheckInterrupt主要用于掛起當前線程,阻塞調(diào)用棧欠痴,返回當前線程的中斷狀態(tài)迄靠。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivatefinalboolean parkAndCheckInterrupt() {? ? LockSupport.park(this);returnThread.interrupted();}
上述方法的流程圖如下:
從上圖可以看出,跳出當前循環(huán)的條件是當“前置節(jié)點是頭結(jié)點喇辽,且當前線程獲取鎖成功”掌挚。為了防止因死循環(huán)導(dǎo)致CPU資源被浪費,我們會判斷前置節(jié)點的狀態(tài)來決定是否要將當前線程掛起菩咨,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):
從隊列中釋放節(jié)點的疑慮打消了,那么又有新問題了:
shouldParkAfterFailedAcquire中取消節(jié)點是怎么生成的呢?什么時候會把一個節(jié)點的waitStatus設(shè)置為-1灭必?
是在什么時間釋放節(jié)點通知到被掛起的線程呢理肺?
2.3.2 CANCELLED狀態(tài)節(jié)點生成
acquireQueued方法中的Finally代碼:
// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) {booleanfailed =true;try{? ? ...for(;;) {finalNode p = node.predecessor();if(p == head && tryAcquire(arg)) {...failed =false;? ? ? ? ...}...}finally{if(failed)cancelAcquire(node);}}
通過cancelAcquire方法,將Node的狀態(tài)標記為CANCELLED云茸。接下來是目,我們逐行來分析這個方法的原理:
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate void cancelAcquire(Node node) {// 將無效節(jié)點過濾if(node ==null)return;// 設(shè)置該節(jié)點不關(guān)聯(lián)任何線程,也就是虛節(jié)點node.thread =null;Node pred = node.prev;// 通過前驅(qū)節(jié)點标捺,跳過取消狀態(tài)的nodewhile(pred.waitStatus >0)node.prev = pred = pred.prev;// 獲取過濾后的前驅(qū)節(jié)點的后繼節(jié)點Node predNext = pred.next;// 把當前node的狀態(tài)設(shè)置為CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果當前節(jié)點是尾節(jié)點懊纳,將從后往前的第一個非取消狀態(tài)的節(jié)點設(shè)置為尾節(jié)點// 更新失敗的話,則進入else亡容,如果更新成功嗤疯,將tail的后繼節(jié)點設(shè)置為nullif(node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext,null);}else{intws;// 如果當前節(jié)點不是head的后繼節(jié)點,1:判斷當前節(jié)點前驅(qū)節(jié)點的是否為SIGNAL萍倡,2:如果不是身弊,則把前驅(qū)節(jié)點設(shè)置為SINGAL看是否成功// 如果1和2中有一個為true,再判斷當前節(jié)點的線程是否為null// 如果上述條件都滿足,把當前節(jié)點的前驅(qū)節(jié)點的后繼指針指向當前節(jié)點的后繼節(jié)點if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread !=null) {Node next = node.next;if(next !=null&& next.waitStatus <=0)compareAndSetNext(pred, predNext, next);}else{// 如果當前節(jié)點是head的后繼節(jié)點阱佛,或者上述條件不滿足帖汞,那就喚醒當前節(jié)點的后繼節(jié)點unparkSuccessor(node);}node.next = node;// help GC}}
當前的流程:
獲取當前節(jié)點的前驅(qū)節(jié)點,如果前驅(qū)節(jié)點的狀態(tài)是CANCELLED凑术,那就一直往前遍歷翩蘸,找到第一個waitStatus <= 0的節(jié)點,將找到的Pred節(jié)點和當前Node關(guān)聯(lián)淮逊,將當前Node設(shè)置為CANCELLED催首。
根據(jù)當前節(jié)點的位置,考慮以下三種情況:
(1) 當前節(jié)點是尾節(jié)點泄鹏。
(2) 當前節(jié)點是Head的后繼節(jié)點郎任。
(3) 當前節(jié)點不是Head的后繼節(jié)點,也不是尾節(jié)點备籽。
根據(jù)上述第二條舶治,我們來分析每一種情況的流程。
當前節(jié)點是尾節(jié)點车猬。
當前節(jié)點是Head的后繼節(jié)點霉猛。
當前節(jié)點不是Head的后繼節(jié)點,也不是尾節(jié)點珠闰。
通過上面的流程惜浅,我們對于CANCELLED節(jié)點狀態(tài)的產(chǎn)生和變化已經(jīng)有了大致的了解,但是為什么所有的變化都是對Next指針進行了操作伏嗜,而沒有對Prev指針進行操作呢坛悉?什么情況下會對Prev指針進行操作?
執(zhí)行cancelAcquire的時候承绸,當前節(jié)點的前置節(jié)點可能已經(jīng)從隊列中出去了(已經(jīng)執(zhí)行過Try代碼塊中的shouldParkAfterFailedAcquire方法了)吹散,如果此時修改Prev指針,有可能會導(dǎo)致Prev指向另一個已經(jīng)移除隊列的Node八酒,因此這塊變化Prev指針不安全空民。 shouldParkAfterFailedAcquire方法中,會執(zhí)行下面的代碼羞迷,其實就是在處理Prev指針界轩。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執(zhí)行,進入該方法后衔瓮,說明共享資源已被獲取浊猾,當前節(jié)點之前的節(jié)點都不會出現(xiàn)變化,因此這個時候變更Prev指針比較安全热鞍。
do{node.prev = pred = pred.prev;}while(pred.waitStatus >0);
2.3.3 如何解鎖
我們已經(jīng)剖析了加鎖過程中的基本流程葫慎,接下來再對解鎖的基本流程進行分析衔彻。由于ReentrantLock在解鎖的時候,并不區(qū)分公平鎖和非公平鎖偷办,所以我們直接看解鎖的源碼:
// java.util.concurrent.locks.ReentrantLockpublic void unlock() {sync.release(1);}
可以看到艰额,本質(zhì)釋放鎖的地方,是通過框架來完成的椒涯。
// java.util.concurrent.locks.AbstractQueuedSynchronizerpublic final boolean release(int arg) {if(tryRelease(arg)) {Node h = head;if(h !=null&& h.waitStatus !=0)unparkSuccessor(h);returntrue;}returnfalse;}
在ReentrantLock里面的公平鎖和非公平鎖的父類Sync定義了可重入鎖的釋放鎖機制柄沮。
// java.util.concurrent.locks.ReentrantLock.Sync// 方法返回當前鎖是不是沒有被線程持有protected final boolean tryRelease(int releases) {// 減少可重入次數(shù)intc = getState() - releases;// 當前線程不是持有鎖的線程,拋出異常if(Thread.currentThread() != getExclusiveOwnerThread())thrownewIllegalMonitorStateException();booleanfree =false;// 如果持有線程全部釋放废岂,將當前獨占鎖所有線程設(shè)置為null祖搓,并更新stateif(c ==0) {free =true;setExclusiveOwnerThread(null);}setState(c);returnfree;}
我們來解釋下述源碼:
// java.util.concurrent.locks.AbstractQueuedSynchronizerpublic final boolean release(int arg) {// 上邊自定義的tryRelease如果返回true,說明該鎖沒有被任何線程持有if(tryRelease(arg)) {// 獲取頭結(jié)點Node h = head;// 頭結(jié)點不為空并且頭結(jié)點的waitStatus不是初始化節(jié)點情況湖苞,解除線程掛起狀態(tài)if(h !=null&& h.waitStatus !=0)unparkSuccessor(h);returntrue;}returnfalse;}
這里的判斷條件為什么是h != null && h.waitStatus != 0拯欧?
h == null Head還沒初始化。初始情況下财骨,head == null哈扮,第一個節(jié)點入隊,Head會被初始化一個虛擬節(jié)點蚓再。所以說,這里如果還沒來得及入隊包各,就會出現(xiàn)head == null 的情況摘仅。
h != null && waitStatus == 0 表明后繼節(jié)點對應(yīng)的線程仍在運行中,不需要喚醒问畅。
h != null && waitStatus < 0 表明后繼節(jié)點可能被阻塞了娃属,需要喚醒。
再看一下unparkSuccessor方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate void unparkSuccessor(Node node) {// 獲取頭結(jié)點waitStatusintws = node.waitStatus;if(ws <0)compareAndSetWaitStatus(node, ws,0);// 獲取當前節(jié)點的下一個節(jié)點Node s = node.next;// 如果下個節(jié)點是null或者下個節(jié)點被cancelled护姆,就找到隊列最開始的非cancelled的節(jié)點if(s ==null|| s.waitStatus >0) {s =null;// 就從尾部節(jié)點開始找矾端,到隊首,找到隊列第一個waitStatus<0的節(jié)點卵皂。for(Node t = tail; t !=null&& t != node; t = t.prev)if(t.waitStatus <=0)s = t;}// 如果當前節(jié)點的下個節(jié)點不為空秩铆,而且狀態(tài)<=0,就把當前節(jié)點unparkif(s !=null)LockSupport.unpark(s.thread);}
為什么要從后往前找第一個非Cancelled的節(jié)點呢灯变?原因如下殴玛。
之前的addWaiter方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivateNode addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if(pred !=null) {node.prev = pred;if(compareAndSetTail(pred, node)) {pred.next = node;returnnode;}}enq(node);returnnode;}
我們從這里可以看到,節(jié)點入隊并不是原子操作添祸,也就是說滚粟,node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執(zhí)行刃泌,如果這個時候執(zhí)行了unparkSuccessor方法凡壤,就沒辦法從前往后找了署尤,所以需要從后往前找。還有一點原因亚侠,在產(chǎn)生CANCELLED狀態(tài)節(jié)點的時候曹体,先斷開的是Next指針,Prev指針并未斷開盖奈,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node混坞。
綜上所述,如果是從前往后找钢坦,由于極端情況下入隊的非原子操作和CANCELLED節(jié)點產(chǎn)生過程中斷開Next指針的操作究孕,可能會導(dǎo)致無法遍歷所有的節(jié)點。所以爹凹,喚醒對應(yīng)的線程后厨诸,對應(yīng)的線程就會繼續(xù)往下執(zhí)行。繼續(xù)執(zhí)行acquireQueued方法以后禾酱,中斷如何處理微酬?
2.3.4 中斷恢復(fù)后的執(zhí)行流程
喚醒后,會執(zhí)行return Thread.interrupted();颤陶,這個函數(shù)返回的是當前執(zhí)行線程的中斷狀態(tài)颗管,并清除。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivatefinalboolean parkAndCheckInterrupt() {LockSupport.park(this);returnThread.interrupted();}
再回到acquireQueued代碼滓走,當parkAndCheckInterrupt返回True或者False的時候垦江,interrupted的值不同,但都會執(zhí)行下次循環(huán)搅方。如果這個時候獲取鎖成功比吭,就會把當前interrupted返回。
// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) {booleanfailed =true;try{booleaninterrupted =false;for(;;) {finalNode p = node.predecessor();if(p == head && tryAcquire(arg)) {setHead(node);p.next =null;// help GCfailed =false;returninterrupted;}if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted =true;}}finally{if(failed)cancelAcquire(node);}}
如果acquireQueued為True姨涡,就會執(zhí)行selfInterrupt方法衩藤。
// java.util.concurrent.locks.AbstractQueuedSynchronizerstaticvoidselfInterrupt() {Thread.currentThread().interrupt();}
該方法其實是為了中斷線程。但為什么獲取了鎖以后還要中斷線程呢涛漂?這部分屬于Java提供的協(xié)作式中斷知識內(nèi)容赏表,感興趣同學(xué)可以查閱一下。這里簡單介紹一下:
當中斷線程被喚醒時匈仗,并不知道被喚醒的原因底哗,可能是當前線程在等待中被中斷,也可能是釋放了鎖以后被喚醒锚沸。因此我們通過Thread.interrupted()方法檢查中斷標記(該方法返回了當前線程的中斷狀態(tài)跋选,并將當前線程的中斷標識設(shè)置為False),并記錄下來哗蜈,如果發(fā)現(xiàn)該線程被中斷過前标,就再中斷一次坠韩。
線程在等待資源的過程中被喚醒,喚醒后還是會不斷地去嘗試獲取鎖炼列,直到搶到鎖為止只搁。也就是說,在整個流程中俭尖,并不響應(yīng)中斷氢惋,只是記錄中斷記錄。最后搶到鎖返回了稽犁,那么如果被中斷過的話焰望,就需要補充一次中斷。
這里的處理方式主要是運用線程池中基本運作單元Worder中的runWorker已亥,通過Thread.interrupted()進行額外的判斷處理熊赖,感興趣的同學(xué)可以看下ThreadPoolExecutor源碼。
2.3.5 小結(jié)
我們在1.3小節(jié)中提出了一些問題虑椎,現(xiàn)在來回答一下震鹉。
Q:某個線程獲取鎖失敗的后續(xù)流程是什么呢?
A:存在某種排隊等候機制捆姜,線程繼續(xù)等待传趾,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續(xù)泥技。
Q:既然說到了排隊等候機制浆兰,那么就一定會有某種隊列形成,這樣的隊列是什么數(shù)據(jù)結(jié)構(gòu)呢零抬?
A:是CLH變體的FIFO雙端隊列。
Q:處于排隊等候機制中的線程宽涌,什么時候可以有機會獲取鎖呢平夜?
A:可以詳細看下2.3.1.3小節(jié)。
Q:如果處于排隊等候機制中的線程一直無法獲取鎖卸亮,需要一直等待么忽妒?還是有別的策略來解決這一問題?
A:線程所在節(jié)點的狀態(tài)會變成取消狀態(tài)兼贸,取消狀態(tài)的節(jié)點會從隊列中釋放段直,具體可見2.3.2小節(jié)。
Q:Lock函數(shù)通過Acquire方法進行加鎖溶诞,但是具體是如何加鎖的呢鸯檬?
A:AQS的Acquire會調(diào)用tryAcquire方法,tryAcquire由各個自定義同步器實現(xiàn)螺垢,通過tryAcquire完成加鎖過程喧务。
3 AQS應(yīng)用
3.1 ReentrantLock的可重入應(yīng)用
ReentrantLock的可重入性是AQS很好的應(yīng)用之一赖歌,在了解完上述知識點以后,我們很容易得知ReentrantLock實現(xiàn)可重入的方法功茴。在ReentrantLock里面庐冯,不管是公平鎖還是非公平鎖,都有一段邏輯坎穿。
公平鎖:
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquireif(c ==0) {if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);returntrue;}}elseif(current == getExclusiveOwnerThread()) {int nextc = c + acquires;if(nextc <0)thrownewError("Maximum lock count exceeded");setState(nextc);returntrue;}
非公平鎖:
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquireif(c ==0) {if(compareAndSetState(0, acquires)){setExclusiveOwnerThread(current);returntrue;}}elseif(current == getExclusiveOwnerThread()) {int nextc = c + acquires;if(nextc <0)// overflowthrownewError("Maximum lock count exceeded");setState(nextc);returntrue;}
從上面這兩段都可以看到展父,有一個同步狀態(tài)State來控制整體可重入的情況。State是Volatile修飾的玲昧,用于保證一定的可見性和有序性栖茉。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivatevolatileintstate;
接下來看State這個字段主要的過程:
State初始化的時候為0,表示沒有任何線程持有鎖酌呆。
當有線程持有該鎖時衡载,值就會在原來的基礎(chǔ)上+1,同一個線程多次獲得鎖是隙袁,就會多次+1痰娱,這里就是可重入的概念。
解鎖也是對這個字段-1菩收,一直到0梨睁,此線程對鎖釋放。
3.2 JUC中的應(yīng)用場景
除了上邊ReentrantLock的可重入性的應(yīng)用娜饵,AQS作為并發(fā)編程的框架坡贺,為很多其他同步工具提供了良好的解決方案。下面列出了JUC中的幾種同步工具箱舞,大體介紹一下AQS的應(yīng)用場景:
同步工具同步工具與AQS的關(guān)聯(lián)
ReentrantLock使用AQS保存鎖重復(fù)持有的次數(shù)遍坟。當一個線程獲取鎖時,ReentrantLock記錄當前獲得鎖的線程標識晴股,用于檢測是否重復(fù)獲取愿伴,以及錯誤線程試圖解鎖操作時異常情況的處理。
Semaphore使用AQS同步狀態(tài)來保存信號量的當前計數(shù)电湘。tryRelease會增加計數(shù)隔节,acquireShared會減少計數(shù)。
CountDownLatch使用AQS同步狀態(tài)來表示計數(shù)寂呛。計數(shù)為0時怎诫,所有的Acquire操作(CountDownLatch的await方法)才可以通過。
ReentrantReadWriteLock使用AQS同步狀態(tài)中的16位保存寫鎖持有的次數(shù)贷痪,剩下的16位用于保存讀鎖的持有次數(shù)幻妓。
ThreadPoolExecutorWorker利用AQS同步狀態(tài)實現(xiàn)對獨占線程變量的設(shè)置(tryAcquire和tryRelease)。
3.3 自定義同步工具
了解AQS基本原理以后劫拢,按照上面所說的AQS知識點涌哲,自己實現(xiàn)一個同步工具胖缤。
publicclass LeeLock? {privatestaticclass Sync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire (int arg) {returncompareAndSetState(0,1);? ? ? ? }@Overrideprotected boolean tryRelease (int arg) {? ? ? ? ? ? setState(0);returntrue;? ? ? ? }@Overrideprotected boolean isHeldExclusively () {returngetState() ==1;? ? ? ? }? ? }privateSync sync =newSync();public void lock () {? ? ? ? sync.acquire(1);? ? }public void unlock () {? ? ? ? sync.release(1);? ? }}
通過我們自己定義的Lock完成一定的同步功能。
publicclassLeeMain{staticintcount =0;staticLeeLock leeLock =newLeeLock();public static void main (String[] args) throws InterruptedException{? ? ? ? Runnable runnable =newRunnable() {? ? ? ? ? ? @Override
? ? ? ? ? ? public void run (){try{? ? ? ? ? ? ? ? ? ? leeLock.lock();for(inti =0; i <10000; i++) {? ? ? ? ? ? ? ? ? ? ? ? count++;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }catch(Exception e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? ? ? leeLock.unlock();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? };? ? ? ? Thread thread1 =newThread(runnable);? ? ? ? Thread thread2 =newThread(runnable);? ? ? ? thread1.start();? ? ? ? thread2.start();? ? ? ? thread1.join();? ? ? ? thread2.join();? ? ? ? System.out.println(count);? ? }}
上述代碼每次運行結(jié)果都會是20000阀圾。通過簡單的幾行代碼就能實現(xiàn)同步功能哪廓,這就是AQS的強大之處。
總結(jié)
我們?nèi)粘i_發(fā)中使用并發(fā)的場景太多初烘,但是對并發(fā)內(nèi)部的基本框架原理了解的人卻不多涡真。由于篇幅原因,本文僅介紹了可重入鎖ReentrantLock的原理和AQS原理肾筐,希望能夠成為大家了解AQS和ReentrantLock等同步器的“敲門磚”