節(jié)點(diǎn)刪除原文鏈接:http://www.reibang.com/p/df0d7d6571de
參考資料:https://www.cnblogs.com/sheeva/p/6472949.html
引言
AbstractQueuedSynchronizer,隊(duì)列同步器辱士,簡稱AQS泪掀,它是java并發(fā)用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。
一般使用AQS的主要方式是繼承颂碘,子類通過實(shí)現(xiàn)它提供的抽象方法來管理同步狀態(tài)异赫,主要管理的方式是通過tryAcquire和tryRelease類似的方法來操作狀態(tài),同時,AQS提供以下線程安全的方法來對狀態(tài)進(jìn)行操作:
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(intexpect,intupdate);
AQS本身是沒有實(shí)現(xiàn)任何同步接口的祝辣,它僅僅只是定義了同步狀態(tài)的獲取和釋放的方法來供自定義的同步組件的使用贴妻。
注:AQS主要是怎么使用的呢?
在java的同步組件中蝙斜,AQS的子類一般是同步組件的靜態(tài)內(nèi)部類名惩。
AQS是實(shí)現(xiàn)同步組件的關(guān)鍵,它倆的關(guān)系可以這樣描述:同步組件是面向使用者的孕荠,它定義了使用者與組件交互的接口娩鹉,隱藏了具體的實(shí)現(xiàn)細(xì)節(jié);而AQS面向的是同步組件的實(shí)現(xiàn)者稚伍,它簡化了具體的實(shí)現(xiàn)方式弯予,屏蔽了線程切換相關(guān)底層操作,它們倆一起很好的對使用者和實(shí)現(xiàn)者所關(guān)注的領(lǐng)域做了一個隔離个曙。
AQS實(shí)現(xiàn)分析
接下來將從實(shí)現(xiàn)的角度來具體分析AQS是如何來完成線程同步的锈嫩。
同步隊(duì)列分析
AQS的實(shí)現(xiàn)依賴內(nèi)部的同步隊(duì)列(FIFO雙向隊(duì)列)來完成同步狀態(tài)的管理,假如當(dāng)前線程獲取同步狀態(tài)失敗垦搬,AQS會將該線程以及等待狀態(tài)等信息構(gòu)造成一個Node呼寸,并將其加入同步隊(duì)列,同時阻塞當(dāng)前線程猴贰。當(dāng)同步狀態(tài)釋放時对雪,喚醒隊(duì)列的首節(jié)點(diǎn)。
Node
Node主要包含以下成員變量:
static final class Node{
????volatile int waitStatus;
????volatile Node prev;
????volatile Node next;
????volatile Thread thread;
????Node nextWaiter;
}
waitStatus:節(jié)點(diǎn)狀態(tài)米绕,主要有這幾種狀態(tài):
Node節(jié)點(diǎn)狀態(tài)
1.```CANCELLED```:當(dāng)前線程被取消瑟捣;
2.```SIGNAL```:當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)需要運(yùn)行;
3.```CONDITION```:當(dāng)前節(jié)點(diǎn)在等待condition栅干;
4.```PROPAGATE```:當(dāng)前場景下后續(xù)的acquireShared可以執(zhí)行迈套;
5.```0```:當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中等待獲取鎖。
prev:前驅(qū)節(jié)點(diǎn)碱鳞;
next:后繼節(jié)點(diǎn)交汤;
thread:進(jìn)入隊(duì)列的當(dāng)前線程;
nextWaiter:存儲condition隊(duì)列中的后繼節(jié)點(diǎn)劫笙。
Node是sync隊(duì)列和condition隊(duì)列構(gòu)建的基礎(chǔ)芙扎,AQS擁有三個成員變量:
AQS成員變量
對于鎖的獲取,請求形成節(jié)點(diǎn)將其掛在隊(duì)列尾部填大,至于資源的轉(zhuǎn)移戒洼,是從頭到尾進(jìn)行,隊(duì)列的基本結(jié)構(gòu)就出來了:
AQS同步隊(duì)列結(jié)構(gòu)
1.節(jié)點(diǎn)插入
AQS提供基于CAS的設(shè)置尾節(jié)點(diǎn)的方法:
CAS設(shè)置尾節(jié)點(diǎn)
2.節(jié)點(diǎn)刪除
同步隊(duì)列遵循FIFO允华,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn)圈浇,首節(jié)點(diǎn)的線程在釋放同步狀態(tài)之后將會喚醒后繼節(jié)點(diǎn)寥掐,后繼節(jié)點(diǎn)將會在獲取同步狀態(tài)成功的時候?qū)⒆约涸O(shè)置為首節(jié)點(diǎn)。
注:設(shè)置首節(jié)點(diǎn)是由獲取同步狀態(tài)成功的線程來完成磷蜀,因?yàn)槊看沃粫幸粋€線程能夠成功的獲取到同步狀態(tài)召耘,所以,設(shè)置首節(jié)點(diǎn)并不需要CAS來保證褐隆。
AQS源碼解析
AQS提供以下接口以供實(shí)現(xiàn)自定義同步器:
1.protected boolean tryAcquire(int arg)
獨(dú)占式獲取同步狀態(tài)污它,該方法的實(shí)現(xiàn)需要先查詢當(dāng)前的同步狀態(tài)是否可以獲取,如果可以獲取再進(jìn)行獲仁衫贬;
2.protected boolean tryRelease(int arg)
釋放狀態(tài);
3.protected int tryAcquireShared(int arg)
共享式獲取同步狀態(tài)歇攻;
4.protected boolean tryReleaseShared(int arg)
共享式釋放狀態(tài)固惯;
5.protected boolean isHeldExclusively()
獨(dú)占模式下,判斷同步狀態(tài)是否已經(jīng)被占用缴守。
使用者可以根據(jù)實(shí)際情況使用這些接口自定義同步組件葬毫。
AQS提供兩種方式來操作同步狀態(tài),獨(dú)占式與共享式屡穗,下面就針對性做一下源碼分析贴捡。
獨(dú)占式同步狀態(tài)獲取 - acquire實(shí)現(xiàn)
獨(dú)占式同步狀態(tài)獲取
具體執(zhí)行流程如下:
調(diào)用tryAcquire方法嘗試獲取同步狀態(tài);
如果獲取不到同步狀態(tài)鸡捐,將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)Node并加入同步隊(duì)列栈暇;
再次嘗試獲取麻裁,如果還是沒有獲取到那么將當(dāng)前線程從線程調(diào)度器上摘下箍镜,進(jìn)入等待狀態(tài)。
下面我們具體來看一下節(jié)點(diǎn)的構(gòu)造以及加入同步隊(duì)列部分的代碼實(shí)現(xiàn)煎源。
addWaiter實(shí)現(xiàn)
addWaiter實(shí)現(xiàn)
使用當(dāng)前thread構(gòu)造Node色迂;
嘗試在隊(duì)尾插入節(jié)點(diǎn),如果尾節(jié)點(diǎn)已經(jīng)存在手销,就做以下操作:
- 分配引用T指向尾節(jié)點(diǎn)歇僧;
- 將待插入節(jié)點(diǎn)的prev指針指向尾節(jié)點(diǎn);
- 如果尾節(jié)點(diǎn)還為T锋拖,將當(dāng)前尾節(jié)點(diǎn)設(shè)置為帶待插入節(jié)點(diǎn)诈悍;
- T的next指針指向待插入節(jié)點(diǎn)。
快速在隊(duì)尾插入節(jié)點(diǎn)兽埃,失敗則進(jìn)入enq(Node node)方法侥钳。
enq實(shí)現(xiàn)
enq的邏輯可以確保Node可以有順序的添加到同步隊(duì)列中,具體的加入隊(duì)列的邏輯如下:
初始化同步隊(duì)列:如果尾節(jié)點(diǎn)為空柄错,分配一個頭結(jié)點(diǎn)舷夺,并將尾節(jié)點(diǎn)指向頭結(jié)點(diǎn)苦酱;
節(jié)點(diǎn)入隊(duì),通過CAS將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)给猾,以此在隊(duì)尾做節(jié)點(diǎn)插入疫萤。
可以看出,整個enq方法通過“死循環(huán)”來保證節(jié)點(diǎn)的正確插入敢伸。
進(jìn)入同步隊(duì)列之后接下來就是同步狀態(tài)的獲取了扯饶,或者說是訪問控制acquireQueued。對于同步隊(duì)列中的線程详拙,在同一時刻只能由隊(duì)列首節(jié)點(diǎn)獲取同步狀態(tài)帝际,其他的線程進(jìn)入等待稳其,直到符合條件才能繼續(xù)進(jìn)行耿戚。
acquireQueued實(shí)現(xiàn)
獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)蹬挺;
如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)建邓,并且可以獲取同步狀態(tài)贯要,設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn)讹蘑,該節(jié)點(diǎn)占有鎖启上;
不滿足條件的線程進(jìn)入等待狀態(tài)就漾。
在整個方法中矿微,當(dāng)前線程一直都在“死循環(huán)”中嘗試獲取同步狀態(tài):
節(jié)點(diǎn)自旋獲取同步狀態(tài)
從代碼的邏輯也可以看出痕慢,其實(shí)在節(jié)點(diǎn)與節(jié)點(diǎn)之間在循環(huán)檢查的過程中是不會相互通信的,僅僅只是判斷自己當(dāng)前的前驅(qū)是不是頭結(jié)點(diǎn)涌矢,這樣設(shè)計(jì)使得節(jié)點(diǎn)的釋放符合FIFO掖举,同時也避免了過早通知。
注:過早通知是指前驅(qū)節(jié)點(diǎn)不是頭結(jié)點(diǎn)的線程由于中斷被喚醒娜庇。
public final boolean release(int arg) {
????if (tryRelease(arg)) {
????????Node h =head;
????????if (h !=null && h.waitStatus !=0)
????????????unparkSuccessor(h);
????????return true;
????}
????return false;
}
release方法中塔次,若能夠釋放成功,則會通過unparkSuccessor()方法來重新設(shè)置頭節(jié)點(diǎn)與其下一個節(jié)點(diǎn)的喚醒觸發(fā)名秀。
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling.? It is OK if this
* fails or if status is changed by waiting thread.
*/
? ? int ws = node.waitStatus;
if (ws <0)
compareAndSetWaitStatus(node, ws,0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node.? But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
? ? Node s = node.next;
if (s ==null || s.waitStatus >0) {
s =null;
for (Node t =tail; t !=null && t != node; t = t.prev)
if (t.waitStatus <=0)
s = t;
}
if (s !=null)
LockSupport.unpark(s.thread);
}
最終励负,執(zhí)行流程又回到了acquireQueued方法的自旋中,此時的頭節(jié)點(diǎn)已經(jīng)是release了鎖匕得,所以下一節(jié)點(diǎn)能夠獲取成功继榆。(下文會詳述)
acquire實(shí)現(xiàn)總結(jié)
1.同步狀態(tài)維護(hù):
對同步狀態(tài)的操作是原子、非阻塞的汁掠,通過AQS提供的對狀態(tài)訪問的方法來對同步狀態(tài)進(jìn)行操作略吨,并且利用CAS來確保原子操作;
2.狀態(tài)獲瓤稼濉:
一旦線程成功的修改了同步狀態(tài)翠忠,那么該線程會被設(shè)置為同步隊(duì)列的頭節(jié)點(diǎn);
3.同步隊(duì)列維護(hù):
不符合獲取同步狀態(tài)的線程會進(jìn)入等待狀態(tài)羔砾,直到符合條件被喚醒再開始執(zhí)行负间。
整個執(zhí)行流程如下:
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)的邏輯之后偶妖,就需要釋放同步狀態(tài),讓后續(xù)節(jié)點(diǎn)可以獲取到同步狀態(tài)政溃,調(diào)用方法release(int arg)方法可以釋放同步狀態(tài)趾访。
獨(dú)占式同步狀態(tài)釋放 - release實(shí)現(xiàn)
1.嘗試釋放狀態(tài),tryRelease保證將狀態(tài)重置回去董虱,同樣采用CAS來保證操作的原子性扼鞋;
2.釋放成功后,調(diào)用unparkSuccessor喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程愤诱。
unparkSuccessor實(shí)現(xiàn)
取出當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn)云头,將該節(jié)點(diǎn)線程喚醒,被喚醒的線程獲取同步狀態(tài)淫半。這里主要通過LockSupport的unpark方法喚醒線程溃槐。
共享式同步狀態(tài)獲取
共享式獲取與獨(dú)占式獲取最主要的區(qū)別就是在同一時刻能否有多個線程可以同時獲取到同步狀態(tài)。這兩種不同的方式在獲取資源區(qū)別如下圖所示:
共享式訪問資源時科吭,其他共享式訪問都是被允許的昏滴;
獨(dú)占式訪問資源時,在同一時刻只能有一個訪問对人,其他的訪問都被阻塞谣殊。
AQS提供acquireShared方法來支持共享式獲取同步狀態(tài)。
acquireShared實(shí)現(xiàn)
調(diào)用tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài):
tryAcquireShared方法返回值 > 0時牺弄,表示能夠獲取到同步狀態(tài)姻几;
獲取失敗調(diào)用doAcquireShared(int arg)方法進(jìn)入同步隊(duì)列。
doAcquireShared實(shí)現(xiàn)
1.獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)势告;
2.如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)蛇捌,并且獲取到的共享同步狀態(tài) > 0,設(shè)置當(dāng)前節(jié)點(diǎn)的為頭結(jié)點(diǎn)培慌,獲取同步狀態(tài)成功豁陆;
3.不滿足條件的線程自旋等待柑爸。
與獨(dú)占式獲取同步狀態(tài)一樣吵护,共享式獲取也是需要釋放同步狀態(tài)的,AQS提供releaseShared(int arg)方法可以釋放同步狀態(tài)表鳍。
共享式同步狀態(tài)釋放 - releaseShared實(shí)現(xiàn)
調(diào)用tryReleaseShared方法釋放狀態(tài)馅而;
調(diào)用doReleaseShared方法喚醒后繼節(jié)點(diǎn);
獨(dú)占式超時獲取 - doAcquireNanos
該方法提供了超時獲取同步狀態(tài)調(diào)用譬圣,假如在指定的時間段內(nèi)可以獲取到同步狀態(tài)返回true瓮恭,否則返回false。它是acquireInterruptibly(int arg)的增強(qiáng)版厘熟。
acquireInterruptibly實(shí)現(xiàn)
該方法提供了獲取同步狀態(tài)的能力屯蹦,同樣维哈,在無法獲取同步狀態(tài)時會進(jìn)入同步隊(duì)列,這類似于acquire的功能登澜,但是它和acquire還是區(qū)別的:acquireInterruptibly可以在外界對當(dāng)前線程進(jìn)行中斷的時候可以提前獲取到同步狀態(tài)的操作阔挠,換個通俗易懂的解釋吧:類似于synchronized獲取鎖時,這時候外界對當(dāng)前線程中斷了脑蠕,線程獲取鎖的這個操作能夠及時響應(yīng)中斷并且提前返回购撼。
判斷當(dāng)前線程是否被中斷,如果已經(jīng)被中斷谴仙,拋出InterruptedException異常并將中斷標(biāo)志位置為false迂求;
獲取同步狀態(tài),獲取成功并返回晃跺,獲取不成功調(diào)用doAcquireInterruptibly(int arg)排隊(duì)等待揩局。
doAcquireInterruptibly實(shí)現(xiàn)
構(gòu)造節(jié)點(diǎn)Node,加入同步隊(duì)列掀虎;
假如當(dāng)前節(jié)點(diǎn)是首節(jié)點(diǎn)并且可以獲取到同步狀態(tài)谐腰,將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),其他節(jié)點(diǎn)自旋等待涩盾;
節(jié)點(diǎn)每次被喚醒的時候十气,需要進(jìn)行中斷檢測,假如當(dāng)前線程被中斷春霍,拋出異常InterruptedException砸西,退出循環(huán)。
doAcquireNanos實(shí)現(xiàn)
該方法在支持中斷響應(yīng)的基礎(chǔ)上址儒,增加了超時獲取的特性芹枷。針對超時獲取,主要在于計(jì)算出需要睡眠的時間間隔nanosTimeout莲趣,如果nanosTimeout > 0表示當(dāng)前線程還需要睡眠鸳慈,反之返回false。
nanosTimeout <= 0喧伞,表明當(dāng)前線程不需要睡眠走芋,返回false,不能獲取到同步狀態(tài)潘鲫;
不滿足條件的線程加入同步隊(duì)列翁逞;
假如當(dāng)前節(jié)點(diǎn)是首節(jié)點(diǎn),并且可以獲取到同步狀態(tài)溉仑,將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)并退出挖函,返回true,表明在指定的時間內(nèi)可以獲取到同步狀態(tài)浊竟;
不滿足條件3的線程怨喘,計(jì)算出當(dāng)前休眠時間津畸,nanosTimeout = 原有nanosTimeout + deadline(睡眠之前記錄的時間)- now(System.nanoTime():當(dāng)前時間):
如果nanosTimeout <= 0,返回超時未獲取到同步狀態(tài)必怜;
如果nanosTimeout > 0 && nanosTimeout <= 1000L洼畅,線程快速自旋
注:為什么不直接進(jìn)入超時等待呢?原因在于非常短的超時等待是無法做到十分精確的棚赔,如果這時候再進(jìn)入超時等待會讓nanosTimeout的超時從整體上表現(xiàn)的不精確帝簇,所以,在超時非常短的情況下靠益,AQS都會無條件進(jìn)入快速自旋丧肴;
- 如果nanosTimeout > 1000L,線程通過LockSupport.parkNanos進(jìn)入超時等待胧后。
整個流程可以總結(jié)如下圖所示:
后記
在上述對AQS進(jìn)行了實(shí)現(xiàn)層面的分析之后芋浮,我們就一個案例來加深對AQS的理解。
案例:設(shè)計(jì)一個AQS同步器壳快,該工具在同一時刻纸巷,只能有兩個線程能夠訪問,其他的線程阻塞眶痰。
設(shè)計(jì)分析:針對上述案例瘤旨,我們可以這樣定義AQS,設(shè)定一個初始狀態(tài)為2竖伯,每一個線程獲取一次就-1存哲,正確的狀態(tài)為:0,1七婴,2祟偷,0表示新的線程獲取同步狀態(tài)時阻塞。由于資源數(shù)量大與1打厘,需要實(shí)現(xiàn)tryAcquireShared和tryReleaseShared方法修肠。
代碼實(shí)現(xiàn):
public class LockInstance implements Lock {
? ? private final Sync sync = new Sync(2);
? ? private static final class Sync extends AbstractQueuedSynchronizer {
? ? ? Sync(int state) {
? ? ? ? ? ? if (state <= 0) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("count must large than 0");
? ? ? ? ? ? }
? ? ? ? ? ? setState(state);
? ? ? ? }
? ? ? ? @Override
? ? ? ? public int tryAcquireShared(int arg) {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? System.out.println("try acquire....");
? ? ? ? ? ? ? ? int current = getState();
? ? ? ? ? ? ? ? int now = current - arg;
? ? ? ? ? ? ? ? if (now < 0 || compareAndSetState(current, now)) {
? ? ? ? ? ? ? ? ? ? return now;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? @Override
? ? ? ? public boolean tryReleaseShared(int arg) {
? ? ? ? ? ? for(;;) {
? ? ? ? ? ? ? ? System.out.println("try release....");
? ? ? ? ? ? ? ? int current = getState();
? ? ? ? ? ? ? ? int now = current + arg;
? ? ? ? ? ? ? ? if (compareAndSetState(current, now)) {
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? @Override
? ? public void lock() {
? ? ? ? sync.acquireShared(1);
? ? }
? ? @Override
? ? public void lockInterruptibly() throws InterruptedException {
? ? ? ? sync.acquireInterruptibly(1);
? ? }
? ? @Override
? ? public boolean tryLock() {
? ? ? ? return sync.tryAcquireShared(1) >= 0;
? ? }
? ? @Override
? ? public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
? ? ? ? return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
? ? }
? ? @Override
? ? public void unlock() {
? ? ? ? sync.tryReleaseShared(1);
? ? }
? ? @Override
? ? public Condition newCondition() {
? ? ? ? return null;
? ? }
}