理解多線程的并發(fā)鎖劝萤,可結(jié)合多進(jìn)程的分布式鎖(如Zookeeper的互斥鎖沮协、讀寫鎖的實(shí)現(xiàn)原理)住册,本質(zhì)是相通的
介紹
AQS牢屋,AbstractQueuedSynchronizer趁矾,即隊(duì)列同步器耙册。它是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架(如ReentrantLock、CountDownLatch毫捣、CyclicBarrier详拙、ReentrantReadWriteLock帝际、Semaphore等),JUC并發(fā)包的作者(Doug Lea)期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)饶辙。它是JUC并發(fā)包中的核心基礎(chǔ)組件蹲诀。
看一眼注釋
/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state.
**/
AQS使用一個int類型的成員變量state來表示同步狀態(tài),當(dāng)state>0時表示已經(jīng)獲取了鎖弃揽,當(dāng)state = 0時表示釋放了鎖脯爪。它提供了三個方法(getState()、setState(int newState)矿微、compareAndSetState(int expect,int update))來對同步狀態(tài)state進(jìn)行操作痕慢,當(dāng)然AQS可以確保對state的操作是安全的。
AQS通過內(nèi)置的FIFO同步隊(duì)列來完成資源獲取線程的排隊(duì)工作涌矢,如果當(dāng)前線程獲取同步狀態(tài)失斒靥搿(鎖)時,AQS則會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成一個節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列蒿辙,同時會阻塞當(dāng)前線程拇泛,當(dāng)同步狀態(tài)釋放時,則會把節(jié)點(diǎn)中的線程喚醒思灌,使其再次嘗試獲取同步狀態(tài)俺叭。
AQS主要提供了如下一些方法:
getState():
返回同步狀態(tài)的當(dāng)前值;
setState(int newState):
設(shè)置當(dāng)前同步狀態(tài)泰偿;
compareAndSetState(int expect, int update):
使用CAS設(shè)置當(dāng)前狀態(tài)熄守,該方法能夠保證狀態(tài)設(shè)置的原子性;
tryAcquire(int arg):
獨(dú)占式獲取同步狀態(tài)耗跛,獲取同步狀態(tài)成功后裕照,其他線程需要等待該線程釋放同步狀態(tài)才能獲取同步狀態(tài);
tryRelease(int arg):
獨(dú)占式釋放同步狀態(tài)调塌;
tryAcquireShared(int arg):
共享式獲取同步狀態(tài)晋南,返回值大于等于0則表示獲取成功,否則獲取失敻崂负间;
tryReleaseShared(int arg):
共享式釋放同步狀態(tài);
isHeldExclusively():
當(dāng)前同步器是否在獨(dú)占式模式下被線程占用姜凄,一般該方法表示是否被當(dāng)前線程所獨(dú)占政溃;
acquire(int arg):
獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同步狀態(tài)成功态秧,則由該方法返回董虱,否則,將會進(jìn)入同步隊(duì)列等待申鱼,該方法將會調(diào)用可重寫的tryAcquire(int arg)方法愤诱;
acquireInterruptibly(int arg):
與acquire(int arg)相同藏鹊,但是該方法響應(yīng)中斷,當(dāng)前線程為獲取到同步狀態(tài)而進(jìn)入到同步隊(duì)列中转锈,如果當(dāng)前線程被中斷盘寡,則該方法會拋出InterruptedException異常并返回;
tryAcquireNanos(int arg,long nanos):
超時獲取同步狀態(tài)撮慨,如果當(dāng)前線程在nanos時間內(nèi)沒有獲取到同步狀態(tài)竿痰,那么將會返回false,已經(jīng)獲取則返回true砌溺;
acquireShared(int arg):
共享式獲取同步狀態(tài)影涉,如果當(dāng)前線程未獲取到同步狀態(tài),將會進(jìn)入同步隊(duì)列等待规伐,與獨(dú)占式的主要區(qū)別是在同一時刻可以有多個線程獲取到同步狀態(tài)蟹倾;
acquireSharedInterruptibly(int arg):
共享式獲取同步狀態(tài),響應(yīng)中斷猖闪;
tryAcquireSharedNanos(int arg, long nanosTimeout):
共享式獲取同步狀態(tài)鲜棠,增加超時限制;
release(int arg):
獨(dú)占式釋放同步狀態(tài)培慌,該方法會在釋放同步狀態(tài)之后豁陆,將同步隊(duì)列中第一個節(jié)點(diǎn)包含的線程喚醒;
releaseShared(int arg):
共享式釋放同步狀態(tài)吵护;
引子
JDK中的并發(fā)鎖工具類或多或少都被大家用過不止一次盒音,比如ReentrantLock,我們知道ReentrantLock的功能是實(shí)現(xiàn)代碼段的并發(fā)訪問控制馅而,也就是通常意義上所說的鎖祥诽,在沒有看到AbstractQueuedSynchronizer前,可能會以為它的實(shí)現(xiàn)是通過類似于synchronized瓮恭,通過對對象加鎖來實(shí)現(xiàn)的雄坪。但事實(shí)上它僅僅是一個工具類!沒有使用更“高級”的機(jī)器指令偎血,不是關(guān)鍵字诸衔,也不依靠JDK編譯時的特殊處理,僅僅作為一個普普通通的類就完成了代碼塊的并發(fā)訪問控制颇玷,這就更讓人疑問它怎么實(shí)現(xiàn)的代碼塊的并發(fā)訪問控制的了。那就讓我們一起來仔細(xì)看下Doug Lea怎么去實(shí)現(xiàn)的這個鎖就缆。
設(shè)計(jì)思想
- AQS從設(shè)計(jì)之初(即僅單單的從AQS本身來說)帖渠,它僅僅只是提供獨(dú)占鎖和共享鎖兩種方式,在這里請大家牢記竭宰,從AQS本身來說空郊,是不存在所謂的公平與非公平性份招。AQS基于模板模式設(shè)計(jì),因?yàn)閺腁PI的設(shè)計(jì)之初篡悟,作者(Doug Lea)已經(jīng)預(yù)訂了任何一個子類只能支持AQS當(dāng)中的獨(dú)占鎖和共享鎖當(dāng)中的一種攒读。所以AQS沒有抽象方法损离,所有方法都有默認(rèn)實(shí)現(xiàn),這有區(qū)別于傳統(tǒng)的模板模式谐腰。作者這樣做的目的是讓子類更加清潔,不需要實(shí)現(xiàn)無關(guān)的抽象方法涩盾。
- AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer十气。AbstractOwnableSynchronizer簡稱AOS,AOS其實(shí)相對來說是比較簡單的春霍,AOS里面只有一個屬性砸西,那就是exclusiveOwnerThread,也就是用來標(biāo)識當(dāng)前占有鎖的線程址儒。另外還有2個方法芹枷,分別用來get和set這個exclusiveOwnerThread。
- 作者為什么需要將持有鎖的線程的標(biāo)識向上抽攘ぁ杖狼?這是值得我們思考的。在JDK的源碼中有對AQS這樣的一段描述:
A synchronizer that may be exclusively owned by a thread. This class provides a basis for creating locks and related synchronizers that may entail a notion of ownership. The AbstractOwnableSynchronizer class itself does not manage or use this information. However, subclasses and tools may use appropriately maintained values to help control and monitor access and provide diagnostics.
簡單翻譯如下:同步器是需要被線程互斥訪問的妖爷。AOS提供了一個基本的概念蝶涩,那就是創(chuàng)建鎖時賦予一個對于這個鎖的所有權(quán)。AOS本身并不會去管理或者去使用這些信息絮识。然而子類或者其他工具類或許會在適當(dāng)?shù)臅r候去維護(hù)這些信息用來控制和監(jiān)聽訪問控制權(quán)绿聘。
AOS源碼如下,為了閱讀方便次舌,我去掉了源碼中的注釋熄攘,但是我強(qiáng)烈建議你一定要記得去閱讀它,這樣你才能從框架的設(shè)計(jì)者口中得到最準(zhǔn)確的關(guān)于這個類或者接口的設(shè)計(jì)說明彼念。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
到了這里挪圾,我們需要回答一個問題,那就是作者為什么需要將持有鎖的線程的標(biāo)識向上抽戎鹕场哲思?其實(shí)原因是很簡單的。AQS誕生與JDK1.5吩案,而AOS是在JDK1.6才出現(xiàn)的棚赔。也就是說在整個AQS的生命過程中,都沒有用到AOS中聲明的屬性或方法,這些屬性或方法是在AQS的子類中才用到的靠益。也就是說丧肴,在JDK1.6以后,Doug Lea對AQS的子類實(shí)現(xiàn)做出了增強(qiáng)胧后。那么Doug Lea為什么不直接把AOS中聲明的屬性或方法直接放在AQS中芋浮?或許Doug Lea認(rèn)為如果這樣做,是對AQS的一種侵入壳快,因?yàn)锳QS根本不需要這些纸巷,所以就往上抽取了一層。
前奏
在深入分析AQS之前濒憋,先從AQS的功能上說明下AQS何暇,站在使用者的角度,AQS的功能可以分為兩類:獨(dú)占功能和共享功能
凛驮,它的所有子類中裆站,要么實(shí)現(xiàn)并使用了它獨(dú)占功能的API,要么使用了共享鎖的功能黔夭,而不會同時使用兩套API宏胯,即便是它最有名的子類ReentrantReadWriteLock,也是通過兩個內(nèi)部類:讀鎖和寫鎖本姥,分別實(shí)現(xiàn)的兩套API來實(shí)現(xiàn)的
肩袍,為什么這么做,后面我們再分析婚惫,到目前為止氛赐,我們只需要明白AQS在功能上有獨(dú)占控制和共享控制兩種功能即可。
獨(dú)占鎖
對于ReentrantLock先舷,使用過的同學(xué)應(yīng)該都知道艰管,通常是這么用它的:
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
try {
// do something
}finally {
reentrantLock.unlock();
}
}
ReentrantLock會保證 do something在同一時間只有一個線程在執(zhí)行這段代碼,或者說蒋川,同一時刻只有一個線程的lock方法會返回牲芋。其余線程會被掛起,直到獲取鎖捺球。從這里可以看出缸浦,其實(shí)ReentrantLock實(shí)現(xiàn)的就是一個獨(dú)占鎖的功能:有且只有一個線程獲取到鎖,其余線程全部掛起氮兵,直到該擁有鎖的線程釋放鎖裂逐,被掛起的線程被喚醒重新開始競爭鎖。沒錯胆剧,ReentrantLock使用的就是AQS的獨(dú)占API實(shí)現(xiàn)的絮姆。
那現(xiàn)在我們就從ReentrantLock的實(shí)現(xiàn)開始一起看看重入鎖是怎么實(shí)現(xiàn)的醉冤。
首先看lock方法:
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
ReentrantLock內(nèi)部有代理類完成具體操作秩霍,ReentrantLock只是封裝了統(tǒng)一的一套API而已篙悯。值得注意的是,使用過ReentrantLock的同學(xué)應(yīng)該知道铃绒,ReentrantLock又分為公平鎖和非公平鎖鸽照,所以,ReentrantLock內(nèi)部只有兩個sync的實(shí)現(xiàn):
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
// 默認(rèn)是非公平鎖,因?yàn)榫S護(hù)公平需要額外的處理成本
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平鎖:每個線程搶占鎖的順序?yàn)橄群笳{(diào)用lock方法的順序依次獲取鎖,類似于排隊(duì)吃飯灾票。
非公平鎖:每個線程搶占鎖的順序不定峡谊,誰運(yùn)氣好,誰就獲取到鎖刊苍,和調(diào)用lock方法的先后順序無關(guān)既们,類似于堵車時,加塞的那些XXXX正什。
到這里婴氮,通過ReentrantLock的功能和鎖的所謂排不排隊(duì)的方式斯棒,我們是否可以這么猜測ReentrantLock或者AQS的實(shí)現(xiàn)(現(xiàn)在不清楚誰去實(shí)現(xiàn)這些功能):有那么一個被volatile修飾的標(biāo)志位叫做key,用來表示有沒有線程拿走了鎖旨怠,或者說渠驼,鎖還存不存在,還需要一個線程安全的隊(duì)列鉴腻,維護(hù)一堆被掛起的線程迷扇,以至于當(dāng)鎖被歸還時,能通知到這些被掛起的線程爽哎,可以來競爭獲取鎖了蜓席。
至于公平鎖和非公平鎖,唯一的區(qū)別是在獲取鎖的時候是直接去獲取鎖课锌,還是進(jìn)入隊(duì)列排隊(duì)的問題了厨内。為了驗(yàn)證我們的猜想祈秕,我們繼續(xù)看一下ReentrantLock中公平鎖的實(shí)現(xiàn):
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
調(diào)用到了AQS的acquire方法:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
從方法名字上看語義是,嘗試獲取鎖雏胃,獲取不到則創(chuàng)建一個waiter(當(dāng)前線程)后放到隊(duì)列中请毛,這和我們猜測的好像很類似.
看下tryAcquire方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
留空了,Doug Lea是想留給子類去實(shí)現(xiàn)(既然要給子類實(shí)現(xiàn)瞭亮,應(yīng)該用抽象方法方仿,但是Doug Lea沒有這么做,原因是AQS有兩種功能统翩,面向兩種使用場景仙蚜,需要給子類定義的方法都是抽象方法了,會導(dǎo)致子類無論如何都需要實(shí)現(xiàn)另外一種場景的抽象方法厂汗,顯然委粉,這對子類來說是不友好的。)
看下FairSync的tryAcquire方法:
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
// 當(dāng)前線程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
getState方法是AQS的方法娶桦,因?yàn)樵贏QS里面有個叫statede的標(biāo)志位 :
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
事實(shí)上贾节,這個state就是前面我們猜想的那個“key”!
回到tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();//獲取當(dāng)前線程
int c = getState(); //獲取父類AQS中的標(biāo)志位
if (c == 0) {
if (!hasQueuedPredecessors() &&
//如果隊(duì)列中沒有其他線程 說明沒有線程正在占有鎖趟紊!
compareAndSetState(0, acquires)) {
//修改一下狀態(tài)位氮双,注意:這里的acquires是在lock的時候傳遞來的,從上面的圖中可以知道霎匈,這個值是寫死的1
setExclusiveOwnerThread(current);
//如果通過CAS操作將狀態(tài)為更新成功則代表當(dāng)前線程獲取鎖戴差,因此,將當(dāng)前線程設(shè)置到AQS的一個變量中铛嘱,說明這個線程拿走了鎖暖释。
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//如果不為0 意味著,鎖已經(jīng)被拿走了墨吓,但是球匕,因?yàn)镽eentrantLock是重入鎖,
//是可以重復(fù)lock,unlock的帖烘,只要成對出現(xiàn)行亮曹。一次。這里還要再判斷一次 獲取鎖的線程是不是當(dāng)前請求鎖的線程秘症。
int nextc = c + acquires;//如果是的照卦,累加在state字段上就可以了。
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
到此乡摹,如果如果獲取鎖役耕,tryAcquire返回true,反之聪廉,返回false瞬痘,回到AQS的acquire方法故慈。
如果沒有獲取到鎖,按照我們的描述框全,應(yīng)該將當(dāng)前線程放到隊(duì)列中去察绷,只不過,在放之前竣况,需要做些包裝克婶。
先看addWaiter方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
用當(dāng)前線程去構(gòu)造一個Node對象筒严,mode是一個表示Node類型的字段丹泉,僅僅表示這個節(jié)點(diǎn)是獨(dú)占的,還是共享的鸭蛙,或者說摹恨,AQS的這個隊(duì)列中,哪些節(jié)點(diǎn)是獨(dú)占的娶视,哪些是共享的晒哄。
這里lock調(diào)用的是AQS獨(dú)占的API,當(dāng)然肪获,可以寫死是獨(dú)占狀態(tài)的節(jié)點(diǎn)寝凌。
創(chuàng)建好節(jié)點(diǎn)后,將節(jié)點(diǎn)加入到隊(duì)列尾部孝赫,此處较木,在隊(duì)列不為空的時候,先嘗試通過cas方式修改尾節(jié)點(diǎn)為最新的節(jié)點(diǎn)青柄,如果修改失敗伐债,意味著有并發(fā),這個時候才會進(jìn)入enq中死循環(huán)致开,“自旋”方式修改峰锁。
將線程的節(jié)點(diǎn)接入到隊(duì)里中后,當(dāng)然還需要做一件事:將當(dāng)前線程掛起双戳!這個事虹蒋,由acquireQueued來做.
在解釋acquireQueued之前,我們需要先看下AQS中隊(duì)列的內(nèi)存結(jié)構(gòu)飒货,我們知道魄衅,隊(duì)列由Node類型的節(jié)點(diǎn)組成,其中至少有兩個變量膏斤,一個封裝線程徐绑,一個封裝節(jié)點(diǎn)類型。
而實(shí)際上莫辨,它的內(nèi)存結(jié)構(gòu)是這樣的(第一次節(jié)點(diǎn)插入時傲茄,第一個節(jié)點(diǎn)是一個空節(jié)點(diǎn)毅访,如果不為空,則表示有一個線程已經(jīng)獲取鎖盘榨。事實(shí)上喻粹,隊(duì)列的第一個節(jié)點(diǎn)就是代表持有鎖的節(jié)點(diǎn)):
黃色節(jié)點(diǎn)為隊(duì)列默認(rèn)的頭節(jié)點(diǎn),每次有線程競爭失敗守呜,進(jìn)入隊(duì)列后其實(shí)都是插入到隊(duì)列的尾節(jié)點(diǎn)(tail后面)后面山憨。這個從enq方法可以看出來,上文中有提到enq方法為將節(jié)點(diǎn)插入隊(duì)列的方法:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 用到了unsafe 操作:
/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
再回來看看
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//如果當(dāng)前的節(jié)點(diǎn)是head說明他是隊(duì)列中第一個“有效的”節(jié)點(diǎn)郁竟,因此嘗試獲取,上文中有提到這個類是交給子類去擴(kuò)展的棚亩。
setHead(node);//成功后,將上圖中的黃色節(jié)點(diǎn)移除讥蟆,Node1變成頭節(jié)點(diǎn)勒虾。
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
//否則瘸彤,檢查前一個節(jié)點(diǎn)的狀態(tài)為,看當(dāng)前獲取鎖失敗的線程是否需要掛起钧栖。
parkAndCheckInterrupt())
//如果需要拯杠,借助JUC包下的LockSopport類的靜態(tài)方法Park掛起當(dāng)前線程,直到被喚醒潭陪。
interrupted = true;
}
} finally {
if (failed) //如果有異常
cancelAcquire(node);// 取消請求,對應(yīng)到隊(duì)列操作老厌,就是將當(dāng)前節(jié)點(diǎn)從隊(duì)列中移除黎炉。
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這塊代碼有幾點(diǎn)需要說明:
1. Node節(jié)點(diǎn)中慷嗜,除了存儲當(dāng)前線程丹壕,節(jié)點(diǎn)類型菌赖,隊(duì)列中前后元素的變量沐序,還有一個叫waitStatus的變量策幼,改變量用于描述節(jié)點(diǎn)的狀態(tài),為什么需要這個狀態(tài)呢刁愿?
原因是:AQS的隊(duì)列中,在有并發(fā)時觉壶,肯定會存取一定數(shù)量的節(jié)點(diǎn)件缸,每個節(jié)點(diǎn)代表了一個線程的狀態(tài)他炊,有的線程可能“等不及”獲取鎖了,需要放棄競爭蚕苇,退出隊(duì)列涩笤,有的線程在等待一些條件滿足盒件,滿足后才恢復(fù)執(zhí)行(這里的描述很像某個J.U.C包下的工具類炒刁,ReentrankLock的Condition,事實(shí)上罗心,Condition同樣也是AQS的子類)等等,總之俏脊,各個線程有各個線程的狀態(tài)爷贫,但總需要一個變量來描述它补憾,這個變量就叫waitStatus,它有四種狀態(tài):
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
// 節(jié)點(diǎn)含有等待隊(duì)列的頭尾指針
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/·
private volatile int state;
分別表示:
- 節(jié)點(diǎn)取消 CANCELLED
- 節(jié)點(diǎn)等待觸發(fā) SIGNAL
- 節(jié)點(diǎn)等待條件 CONDITION
- 節(jié)點(diǎn)狀態(tài)需要向后傳播腾务。 PROPAGATE
只有當(dāng)前節(jié)點(diǎn)的前一個節(jié)點(diǎn)為SIGNAL時削饵,才能當(dāng)前節(jié)點(diǎn)才能被掛起。
- 對線程的掛起及喚醒操作是通過使用UNSAFE類調(diào)用JNI方法實(shí)現(xiàn)的启昧。當(dāng)然密末,還提供了掛起指定時間后喚醒的API跛璧,在后面我們會講到追城。
到此為止,一個線程對于鎖的一次競爭才告于段落教硫,結(jié)果有兩種辆布,要么成功獲取到鎖(不用進(jìn)入到AQS隊(duì)列中)锋玲,要么,獲取失敗伞插,被掛起,等待下次喚醒后繼續(xù)循環(huán)嘗試獲取鎖舀瓢,值得注意的是耗美,AQS的隊(duì)列為FIFO隊(duì)列商架,所以,每次被CPU假喚醒备图,且當(dāng)前線程不是出在頭節(jié)點(diǎn)的位置揽涮,也是會被掛起的弃鸦。AQS通過這樣的方式唬格,實(shí)現(xiàn)了競爭的排隊(duì)策略颜说。
看完了獲取鎖门粪,在看看釋放鎖,具體看代碼之前乾吻,我們可以先繼續(xù)猜下拟蜻,釋放操作需要做哪些事情:
因?yàn)楂@取鎖的線程的節(jié)點(diǎn)酝锅,此時在AQS的頭節(jié)點(diǎn)位置,所以爸舒,可能需要將頭節(jié)點(diǎn)移除扭勉。
而應(yīng)該是直接釋放鎖,然后找到AQS的頭節(jié)點(diǎn)忠聚,通知它可以來競爭鎖了咒林。
是不是這樣呢?我們繼續(xù)來看下爷光,同樣我們用ReentrantLock的FairSync來說明
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unlock方法調(diào)用了AQS的release方法蛀序,同樣傳入了參數(shù)1,和獲取鎖的相應(yīng)對應(yīng)遣鼓,獲取一個鎖骑祟,標(biāo)示為+1气笙,釋放一個鎖潜圃,標(biāo)志位-1放坏。
同樣抄瓦,release為空方法饵隙,子類自己實(shí)現(xiàn)邏輯:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
釋放鎖鸳劳,成功后,找到AQS的頭節(jié)點(diǎn)涵紊,并喚醒它即可:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
值得注意的是摸柄,尋找的順序是從隊(duì)列尾部開始往前去找的最前面的一個waitStatus小于0的節(jié)點(diǎn)驱负。
到此,ReentrantLock的lock和unlock方法已經(jīng)基本解析完畢了宇挫,唯獨(dú)還剩下一個非公平鎖NonfairSync沒說器瘪,其實(shí)绘雁,它和公平鎖的唯一區(qū)別就是獲取鎖的方式不同庐舟,一個是按前后順序一次獲取鎖,一個是搶占式的獲取鎖历帚,那ReentrantLock是怎么實(shí)現(xiàn)的呢抹缕?再看兩段代碼:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 非公平鎖 直接進(jìn)行一次搶的計(jì)算
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 沒搶到 則走隊(duì)列
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
小結(jié)
總的來說,思路其實(shí)并不復(fù)雜睹簇,還是使用的標(biāo)志位+隊(duì)列的方式寥闪,記錄獲取鎖疲憋、競爭鎖、釋放鎖等一系列鎖的狀態(tài)埃脏,或許用更準(zhǔn)確一點(diǎn)的描述的話彩掐,應(yīng)該是使用的標(biāo)志位+隊(duì)列的方式,記錄鎖狗超、競爭朴下、釋放等一系列獨(dú)占的狀態(tài)殴胧,因?yàn)檎驹贏QS的層面state可以表示鎖溃肪,也可以表示其他狀態(tài),它并不關(guān)心它的子類把它變成一個什么工具類羔沙,而只是提供了一套維護(hù)一個獨(dú)占狀態(tài)扼雏。甚至夯膀,最準(zhǔn)確的是AQS只是維護(hù)了一個狀態(tài)诱建,因?yàn)椋瑒e忘了茎匠,它還有一套共享狀態(tài)的API诵冒,所以谊惭,AQS只是維護(hù)一個狀態(tài),一個控制各個線程何時可以訪問的狀態(tài)悄雅,它只對狀態(tài)負(fù)責(zé)告组,而這個狀態(tài)表示什么含義木缝,由子類自己去定義。
AbstractQueuedSynchronizer暴露的API
分三個方面:
- 一:獨(dú)占鎖 與共享鎖
- 二:不響應(yīng)中斷 與 響應(yīng)中斷
- 三:aquire 與 release
我們知道放案,AQS僅僅只是提供獨(dú)占鎖和共享鎖兩種方式吱殉,但是每種方式都有響應(yīng)中斷和不響應(yīng)中斷的區(qū)別友雳,所以說AQS鎖的更細(xì)粒度的劃分為:
- 不響應(yīng)中斷的獨(dú)占鎖(acquire)
- 響應(yīng)中斷的獨(dú)占鎖(acquireInterruptibly)
- 不響應(yīng)中斷的共享鎖(acquireShared)
- 響應(yīng)中斷的共享鎖(acquireSharedInterruptibly)
這四種方式在AQS中的入口在上面已經(jīng)標(biāo)注铅匹,而釋放鎖的方式只有兩種包斑,獨(dú)占鎖的釋放與共享鎖的釋放罗丰。分別為:
- 獨(dú)占鎖的釋放(release)
- 共享鎖的釋放(releaseShared)
因?yàn)锳QS是基于模板模式的設(shè)計(jì),可想而知找御,上面的方法萎坷,子類不應(yīng)該去覆蓋沐兰,因?yàn)檫@些方法定義了整體流程住闯,事實(shí)上作者也阻止你去覆蓋它比原,因?yàn)檫@些方法都是final的。
在上面所有的方法中雇寇,都調(diào)用了與之相對應(yīng)的try方法锨侯。在這里需要注意的一點(diǎn)是冬殃,acquire和acquireInterruptibly在AQS中調(diào)用的是同一個try方法审葬,acquireShared和acquireSharedInterruptibly也是調(diào)用相同的try方法涣觉,并且try方法在AQS中都提供了空實(shí)現(xiàn)官册。
也就是說,作者暗示著子類應(yīng)該去重寫這些try方法皂贩,至于如何去重寫try方法明刷,完全是子類的自由满粗。
例如: ReentrantLock是一個典型的獨(dú)占鎖映皆,它提供了對try方法的實(shí)現(xiàn)捅彻,并且提供了兩種實(shí)現(xiàn)方式步淹。這兩種不同的try方式诚撵,就衍生出了公平與非公平的概念寿烟。即ReentrantLock提供如下:
- 非公平方式的不響應(yīng)中斷的獨(dú)占鎖
- 非公平方式的響應(yīng)中斷的獨(dú)占鎖
- 公平方式的不響應(yīng)中斷的獨(dú)占鎖
- 公平方式的響應(yīng)中斷的獨(dú)占鎖
CLH 存儲線程隊(duì)列 + 競爭資源的Status
AQS的基本數(shù)據(jù)結(jié)構(gòu)為Node筛武,關(guān)于Node徘六,JDK作者寫了詳細(xì)的注釋勃蜘,這里我大致總結(jié)幾點(diǎn):
- 1.1 AbstractQueuedSynchronizer的等待隊(duì)列是CLH隊(duì)列的變種缭贡,CLH隊(duì)列通常用于自旋鎖阳惹,AQS中的CLH可以簡單的理解為“等待鎖的線程隊(duì)列”莹汤。
- 1.2 每個節(jié)點(diǎn)中持有一個名為"status"的字段用于一條線程是否應(yīng)當(dāng)阻塞的追蹤。
- 1.3 一條線程所在節(jié)點(diǎn)如果它處于隊(duì)列頭的下一個節(jié)點(diǎn)抹竹,那么它會嘗試去acquire窃判。因?yàn)轭^節(jié)點(diǎn)是一個dummy(虛擬的袄琳、假的)節(jié)點(diǎn)唆樊,也就是說頭節(jié)點(diǎn)不持有任何線程刻蟹。所以舆瘪,一條線程所在節(jié)點(diǎn)如果它處于隊(duì)列頭節(jié)點(diǎn)的下一個節(jié)點(diǎn),那么他會嘗試去acquire,但是并不保證成功舌缤。
- 1.4 要進(jìn)入隊(duì)列国撵,你只需要自動將它拼接在隊(duì)列尾部即可玻墅;如果獲取了鎖澳厢,你只需要設(shè)置header字段即可剩拢。
CLH鎖
CLH(Craig, Landin, and Hagersten locks): 是一個自旋鎖,能確保無饑餓性贯钩,提供先來先服務(wù)的公平性角雷。
CLH鎖也是一種基于鏈表的可擴(kuò)展勺三、高性能、公平的自旋鎖季二,申請線程只在本地變量上自旋檩咱,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋胯舷。
- CLH隊(duì)列中的結(jié)點(diǎn)QNode中含有一個locked字段刻蚯,該字段若為true表示該線程需要獲取鎖,且不釋放鎖桑嘶,為false表示線程釋放了鎖炊汹。結(jié)點(diǎn)之間是通過隱形的鏈表相連逃顶,之所以叫隱形的鏈表是因?yàn)檫@些結(jié)點(diǎn)之間沒有明顯的next指針讨便,而是通過myPred所指向的結(jié)點(diǎn)的變化情況來影響myNode的行為充甚。CLHLock上還有一個尾指針,始終指向隊(duì)列的最后一個結(jié)點(diǎn)霸褒。CLHLock的類圖如下所示:
當(dāng)一個線程需要獲取鎖時:
1.創(chuàng)建一個的QNode伴找,將其中的locked設(shè)置為true表示需要獲取鎖
2.線程對tail域調(diào)用getAndSet方法,使自己成為隊(duì)列的尾部废菱,同時獲取一個指向其前趨結(jié)點(diǎn)的引用myPred
3.該線程就在前趨結(jié)點(diǎn)的locked字段上旋轉(zhuǎn)技矮,直到前趨結(jié)點(diǎn)釋放鎖
4.當(dāng)一個線程需要釋放鎖時,將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false殊轴,同時回收前趨結(jié)點(diǎn)
如下圖衰倦,線程A需要獲取鎖,其myNode域?yàn)閠rue旁理,tail指向線程A的結(jié)點(diǎn)樊零,然后線程B也加入到線程A后面,tail指向線程B的結(jié)點(diǎn)孽文。然后線程A和B都在其myPred域上旋轉(zhuǎn)驻襟,一旦它的myPred結(jié)點(diǎn)的locked字段變?yōu)閒alse,它就可以獲取鎖叛溢。明顯線程A的myPred locked域?yàn)閒alse塑悼,此時線程A獲取到了鎖。
整個CLH的代碼如下楷掉,其中用到了ThreadLocal類厢蒜,將QNode綁定到每一個線程上,同時用到了AtomicReference,對尾指針的修改正是調(diào)用它的getAndSet()操作來實(shí)現(xiàn)的烹植,它能夠保證以原子方式更新對象引用斑鸦。
public class CLHLock {
AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
ThreadLocal<QNode> myPred;
ThreadLocal<QNode> myNode;
public static class QNode {
//注意這個地方 如果不加volatile則會導(dǎo)致線程永遠(yuǎn)死循環(huán)
//關(guān)于volatile的用法在我的另外一篇文章 http://www.cnblogs.com/daxin/p/3364014.html
public volatile boolean locked = false;
}
public CLHLock() {
myNode = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return new QNode();
}
};
myPred = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return null;
}
};
}
public void lock() {
QNode qnode = myNode.get();
qnode.locked = true;
QNode pred = tail.getAndSet(qnode);
myPred.set(pred);
while (pred.locked) {
//非阻塞算法
}
}
public void unlock() {
QNode qnode = myNode.get();
qnode.locked = false;
myNode.set(myPred.get());
}
}
CLH同步隊(duì)列
在CLH同步隊(duì)列中,一個節(jié)點(diǎn)表示一個線程草雕,它保存著線程的引用(thread)巷屿、狀態(tài)(waitStatus)、前驅(qū)節(jié)點(diǎn)(prev)墩虹、后繼節(jié)點(diǎn)(next)
CLH同步隊(duì)列結(jié)構(gòu)圖如下:
- 入列
學(xué)了數(shù)據(jù)結(jié)構(gòu)的我們嘱巾,CLH隊(duì)列入列是再簡單不過了,無非就是tail指向新節(jié)點(diǎn)诫钓、新節(jié)點(diǎn)的prev指向當(dāng)前最后的節(jié)點(diǎn)旬昭,當(dāng)前最后一個節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)。代碼我們可以看看addWaiter(Node node)方法:
- 出列
CLH同步隊(duì)列遵循FIFO菌湃,首節(jié)點(diǎn)的線程釋放同步狀態(tài)后问拘,將會喚醒它的后繼節(jié)點(diǎn)(next),而后繼節(jié)點(diǎn)將會在獲取同步狀態(tài)成功時將自己設(shè)置為首節(jié)點(diǎn),這個過程非常簡單骤坐,head執(zhí)行該節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的next和當(dāng)前節(jié)點(diǎn)的prev即可绪杏,注意在這個過程是不需要使用CAS來保證的,因?yàn)橹挥幸粋€線程能夠成功獲取到同步狀態(tài)纽绍。
- 整體示意圖
AQS共享功能的實(shí)現(xiàn)
以CountDownLatch為例蕾久,CountDownLatch常被用在多線程環(huán)境下,它在初始時需要指定一個計(jì)數(shù)器的大小拌夏,然后可被多個線程并發(fā)的實(shí)現(xiàn)減1操作腔彰,并在計(jì)數(shù)器為0后調(diào)用await方法的線程被喚醒,從而實(shí)現(xiàn)多線程間的協(xié)作辖佣。它在多線程環(huán)境下的基本使用方式為:D Lea在源碼注釋里已經(jīng)給了兩個經(jīng)典的應(yīng)用例子:
* <p><b>Sample usage:</b> Here is a pair of classes in which a group
* of worker threads use two countdown latches:
* <ul>
* <li>The first is a start signal that prevents any worker from proceeding
* until the driver is ready for them to proceed;
* <li>The second is a completion signal that allows the driver to wait
* until all workers have completed.
* </ul>
*
* <pre> {@code
* class Driver { // ...
* void main() throws InterruptedException {
* CountDownLatch startSignal = new CountDownLatch(1);
* CountDownLatch doneSignal = new CountDownLatch(N);
*
* for (int i = 0; i < N; ++i) // create and start threads
* new Thread(new Worker(startSignal, doneSignal)).start();
*
* doSomethingElse(); // don't let run yet
* startSignal.countDown(); // let all threads proceed
* doSomethingElse();
* doneSignal.await(); // wait for all to finish
* }
* }
*
* class Worker implements Runnable {
* private final CountDownLatch startSignal;
* private final CountDownLatch doneSignal;
* Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
* this.startSignal = startSignal;
* this.doneSignal = doneSignal;
* }
* public void run() {
* try {
* startSignal.await();
* doWork();
* doneSignal.countDown();
* } catch (InterruptedException ex) {} // return;
* }
*
* void doWork() { ... }
* }}</pre>
*
* <p>Another typical usage would be to divide a problem into N parts,
* describe each part with a Runnable that executes that portion and
* counts down on the latch, and queue all the Runnables to an
* Executor. When all sub-parts are complete, the coordinating thread
* will be able to pass through await. (When threads must repeatedly
* count down in this way, instead use a {@link CyclicBarrier}.)
*
* <pre> {@code
* class Driver2 { // ...
* void main() throws InterruptedException {
* CountDownLatch doneSignal = new CountDownLatch(N);
* Executor e = ...
*
* for (int i = 0; i < N; ++i) // create and start threads
* e.execute(new WorkerRunnable(doneSignal, i));
*
* doneSignal.await(); // wait for all to finish
* }
* }
*
* class WorkerRunnable implements Runnable {
* private final CountDownLatch doneSignal;
* private final int i;
* WorkerRunnable(CountDownLatch doneSignal, int i) {
* this.doneSignal = doneSignal;
* this.i = i;
* }
* public void run() {
* try {
* doWork(i);
* doneSignal.countDown();
* } catch (InterruptedException ex) {} // return;
* }
*
* void doWork() { ... }
* }}</pre>
再看一下CountDownlatch源碼,代碼不多
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
// 我們可以繼承AbstractQueuedSynchronizer來實(shí)現(xiàn)自己線程鎖
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
// 重入鎖里是從1開始增或減搓逾,這里直接設(shè)置計(jì)數(shù)卷谈!
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
// 獲取共享鎖狀態(tài)
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
// 直接構(gòu)造了帶計(jì)數(shù)的共享標(biāo)志sync
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 直接調(diào)用了AQS的acquireSharedInterruptibly
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 計(jì)數(shù)標(biāo)志 減少
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
讓多線程等待
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 支持線程中斷
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
從方法名上看,這個方法的調(diào)用是響應(yīng)線程的打斷的霞篡,所以在前兩行會檢查下線程是否被打斷世蔗。接著,嘗試著獲取共享鎖朗兵,小于0污淋,表示獲取失敗,通過本系列的上半部分的解讀余掖, 我們知道AQS在獲取鎖的思路是寸爆,先嘗試直接獲取鎖,如果失敗會將當(dāng)前線程放在隊(duì)列中盐欺,按照FIFO的原則等待鎖赁豆。而對于共享鎖也是這個思路,如果和獨(dú)占鎖一致冗美,這里的tryAcquireShared應(yīng)該是個空方法魔种,留給子類去判斷:
再看看CountDownLatch:
protected int tryAcquireShared(int acquires) {
// 0的話標(biāo)志鎖可以獲取,已經(jīng)減為0了粉洼,否則不成功
return (getState() == 0) ? 1 : -1;
}
如果state變成0了节预,則返回1,表示獲取成功属韧,否則返回-1則表示獲取失敗安拟。
看到這里,讀者可能會發(fā)現(xiàn)挫剑, await方法的獲取方式更像是在獲取一個獨(dú)占鎖去扣,那為什么這里還會用tryAcquireShared呢?
CountDownLatch的await方法是不是只能在主線程中調(diào)用?
答案是否定的愉棱,CountDownLatch的await方法可以在多個線程中調(diào)用
當(dāng)CountDownLatch的計(jì)數(shù)器為0后唆铐,調(diào)用await的方法都會依次返回。 也就是說可以多個線程同時在等待await方法返回奔滑,所以它被設(shè)計(jì)成了實(shí)現(xiàn)tryAcquireShared方法艾岂,獲取的是一個共享鎖,鎖在所有調(diào)用await方法的線程間共享朋其,所以叫共享鎖王浴。
如果獲取共享鎖失敗(返回了-1梅猿,說明state不為0氓辣,也就是CountDownLatch的計(jì)數(shù)器還不為0),進(jìn)入調(diào)用doAcquireSharedInterruptibly方法中袱蚓,按照我們上述的猜想钞啸,應(yīng)該是要將當(dāng)前線程放入到隊(duì)列中去。
在這之前喇潘,我們再回顧一下AQS隊(duì)列的數(shù)據(jù)結(jié)構(gòu):AQS是一個雙向鏈表体斩,通過節(jié)點(diǎn)中的next,pre變量分別指向當(dāng)前節(jié)點(diǎn)后一個節(jié)點(diǎn)和前一個節(jié)點(diǎn)颖低。其中絮吵,每個節(jié)點(diǎn)中都包含了一個線程和一個類型變量:表示當(dāng)前節(jié)點(diǎn)是獨(dú)占節(jié)點(diǎn)還是共享節(jié)點(diǎn),頭節(jié)點(diǎn)中的線程為正在占有鎖的線程忱屑,而后的所有節(jié)點(diǎn)的線程表示為正在等待獲取鎖的線程蹬敲。
進(jìn)入等待:
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 插入等待隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果是頭(可以理解為leader),則進(jìn)行一次waitStatus判斷
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設(shè)置頭 并喚醒之后可能等待的線程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其中讓線程等待的代碼:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 是通過前一個節(jié)點(diǎn)的狀態(tài)進(jìn)行判斷的想幻,如果前一個節(jié)點(diǎn)已經(jīng)是等待信號的狀態(tài)粱栖,則當(dāng)前節(jié)點(diǎn)可以放心等待
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這里有幾點(diǎn)需要說明的:
- setHeadAndPropagate方法:
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//如果當(dāng)前節(jié)點(diǎn)是SIGNAL意味著,它正在等待一個信號脏毯,
//或者說闹究,它在等待被喚醒,因此做兩件事食店,1是重置waitStatus標(biāo)志位渣淤,2是重置成功后,喚醒下一個節(jié)點(diǎn)。
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//如果本身頭節(jié)點(diǎn)的waitStatus是出于重置狀態(tài)(waitStatus==0)的吉嫩,將其設(shè)置為“傳播”狀態(tài)价认。
//意味著需要將狀態(tài)向后一個節(jié)點(diǎn)傳播。
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
為什么要這么做呢自娩?這就是共享功能和獨(dú)占功能最不一樣的地方用踩,對于獨(dú)占功能來說,有且只有一個線程(通常只對應(yīng)一個節(jié)點(diǎn),拿ReentantLock舉例脐彩,如果當(dāng)前持有鎖的線程重復(fù)調(diào)用lock()方法碎乃,那根據(jù)本系列上半部分我們的介紹,我們知道惠奸,會被包裝成多個節(jié)點(diǎn)在AQS的隊(duì)列中梅誓,所以用一個線程來描述更準(zhǔn)確),能夠獲取鎖佛南,但是對于共享功能來說梗掰。
共享的狀態(tài)是可以被共享的,也就是意味著其他AQS隊(duì)列中的其他節(jié)點(diǎn)也應(yīng)能第一時間知道狀態(tài)的變化嗅回。因此及穗,一個節(jié)點(diǎn)獲取到共享狀態(tài)流程圖是這樣的:
比如現(xiàn)在有如下隊(duì)列:
當(dāng)Node1調(diào)用tryAcquireShared成功后,更換了頭節(jié)點(diǎn):
Node1變成了頭節(jié)點(diǎn)然后調(diào)用unparkSuccessor()方法喚醒了Node2绵载、Node2中持有的線程A出于上面流程圖的park node的位置拥坛,
線程A被喚醒后,重復(fù)黃色線條的流程尘分,重新檢查調(diào)用tryAcquireShared方法,看能否成功丸氛,如果成功培愁,則又更改頭節(jié)點(diǎn),重復(fù)以上步驟缓窜,以實(shí)現(xiàn)節(jié)點(diǎn)自身獲取共享鎖成功后定续,喚醒下一個共享類型節(jié)點(diǎn)的操作,實(shí)現(xiàn)共享狀態(tài)的向后傳遞禾锤。
2.其實(shí)對于doAcquireShared方法私股,AQS還提供了集中類似的實(shí)現(xiàn):
分別對應(yīng)了:
- 帶參數(shù)請求共享鎖。 (忽略中斷)
- 帶參數(shù)請求共享鎖恩掷,且響應(yīng)中斷倡鲸。(每次循環(huán)時,會檢查當(dāng)前線程的中斷狀態(tài)黄娘,以實(shí)現(xiàn)對線程中斷的響應(yīng))
- 帶參數(shù)請求共享鎖但是限制等待時間峭状。(第二個參數(shù)設(shè)置超時時間,超出時間后逼争,方法返回优床。)
比較特別的為最后一個doAcquireSharedNanos方法,我們一起看下它怎么實(shí)現(xiàn)超時時間的控制的誓焦。
因?yàn)樵摲椒ê推溆喃@取共享鎖的方法邏輯是類似的胆敞,我用紅色框圈出了它所不一樣的地方,也就是實(shí)現(xiàn)超時時間控制的地方。
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到移层,其實(shí)就是在進(jìn)入方法時仍翰,計(jì)算出了一個“deadline”,每次循環(huán)的時候用當(dāng)前時間和“deadline”比較幽钢,大于“dealine”說明超時時間已到歉备,直接返回方法。
注意這行代碼:
nanosTimeout > spinForTimeoutThreshold
從變量的字面意思可知匪燕,這是拿超時時間和超時自旋的最小作比較蕾羊,在這里Doug Lea把超時自旋的閾值設(shè)置成了1000ns,即只有超時時間大于1000ns才會去掛起線程,否則帽驯,再次循環(huán)龟再,以實(shí)現(xiàn)“自旋”操作。這是“自旋”在AQS中的應(yīng)用之處尼变。
看完await方法利凑,我們再來看下countDown()方法:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 其中CountDownLatch 的
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 如果早已是0 則已經(jīng)喚醒過
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 是通過更新計(jì)數(shù) 觸發(fā)到0
return nextc == 0;
}
}
死循環(huán)更新state的值,實(shí)現(xiàn)state的減1操作嫌术,之所以用死循環(huán)是為了確保state值的更新成功哀澈。
從上文的分析中可知,如果state的值為0度气,在CountDownLatch中意味:所有的子線程已經(jīng)執(zhí)行完畢割按,這個時候可以喚醒調(diào)用await()方法的線程了,而這些線程正在AQS的隊(duì)列中磷籍,并被掛起的适荣,
所以下一步應(yīng)該去喚醒AQS隊(duì)列中的頭節(jié)點(diǎn)了(AQS的隊(duì)列為FIFO隊(duì)列),然后由頭節(jié)點(diǎn)去依次喚醒AQS隊(duì)列中的其他共享節(jié)點(diǎn)院领。
如果tryReleaseShared返回true,進(jìn)入doReleaseShared()方法:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal.
* ===But if it does not, status is set to PROPAGATE to
* ===ensure that upon release, propagation continues.
* ===Additionally, we must loop in case a new node is added
* ===while we are doing this. Also, unlike other uses of
* ===unparkSuccessor, we need to know if CAS to reset status
* ===fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 只有到了頭結(jié)點(diǎn)弛矛,跳出循環(huán)
if (h == head) // loop if head changed
break;
}
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這里有幾點(diǎn)值得注意:
- 與AQS的獨(dú)占功能一樣,共享鎖是否可以被獲取的判斷為空方法比然,交由子類去實(shí)現(xiàn)丈氓。
- 與AQS的獨(dú)占功能不同,當(dāng)鎖被頭節(jié)點(diǎn)獲取后强法,獨(dú)占功能是只有頭節(jié)點(diǎn)獲取鎖扒寄,其余節(jié)點(diǎn)的線程繼續(xù)沉睡,等待鎖被釋放后拟烫,才會喚醒下一個節(jié)點(diǎn)的線程该编,而共享功能是只要頭節(jié)點(diǎn)獲取鎖成功,就在喚醒自身節(jié)點(diǎn)對應(yīng)的線程的同時硕淑,繼續(xù)喚醒AQS隊(duì)列中的下一個節(jié)點(diǎn)的線程课竣,每個節(jié)點(diǎn)在喚醒自身的同時還會喚醒下一個節(jié)點(diǎn)對應(yīng)的線程嘉赎,以實(shí)現(xiàn)共享狀態(tài)的“向后傳播”,從而實(shí)現(xiàn)共享功能于樟。
Ref:
不得不說InfoQ上的文字質(zhì)量真不錯9酢!
http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer
http://blog.csdn.net/pfnie/article/details/77599618
http://blog.csdn.net/chenssy/article/details/60479594