AQS是指AbstractQueuedSynchronizer迫肖,抽象隊(duì)列同步锅劝。AQS是多個(gè)重要接口實(shí)現(xiàn)的工具類包括之前講的ReentrantLock、ReentrantReadWriteLock蟆湖、Semaphore故爵、CountDownLatch、Condition接口由ConditionObject實(shí)現(xiàn)隅津,而ConditionObject又是AQS的內(nèi)部類诬垂,所以AQS是實(shí)現(xiàn)其功能的重要工具類。下面是AQS的大致類圖:
- AQS是一個(gè)通過內(nèi)置的FIFO雙向隊(duì)列來完成線程的排隊(duì)工作(內(nèi)部通過結(jié)點(diǎn)head和tail記錄隊(duì)首和隊(duì)尾元素伦仍,元素的結(jié)點(diǎn)類型為Node類型结窘,后面我們會(huì)看到Node的具體構(gòu)造)。
- Node中的thread用來存放進(jìn)入AQS隊(duì)列中的線程引用充蓝,Node結(jié)點(diǎn)內(nèi)部的SHARED表示標(biāo)記線程是因?yàn)楂@取共享資源失敗被阻塞添加到隊(duì)列中的隧枫;Node中的EXCLUSIVE表示線程因?yàn)楂@取獨(dú)占資源失敗被阻塞添加到隊(duì)列中的。waitStatus表示當(dāng)前線程的等待狀態(tài):
①CANCELLED=1:表示線程因?yàn)橹袛嗷蛘叩却瑫r(shí)谓苟,需要從等待隊(duì)列中取消等待官脓;
②SIGNAL=-1:當(dāng)前線程thread1占有鎖,隊(duì)列中的head(僅僅代表頭結(jié)點(diǎn)涝焙,里面沒有存放線程引用)的后繼結(jié)點(diǎn)node1處于等待狀態(tài)仑撞,如果已占有鎖的線程thread1釋放鎖或被CANCEL之后就會(huì)通知這個(gè)結(jié)點(diǎn)node1去獲取鎖執(zhí)行。
?③CONDITION=-2:表示結(jié)點(diǎn)在等待隊(duì)列中(這里指的是等待在某個(gè)lock的condition上搀缠,關(guān)于Condition的原理下面會(huì)寫到)艺普,當(dāng)持有鎖的線程調(diào)用了Condition的signal()方法之后歧譬,結(jié)點(diǎn)會(huì)從該condition的等待隊(duì)列轉(zhuǎn)移到該lock的同步隊(duì)列上瑰步,去競爭lock缩焦。(注意:這里的同步隊(duì)列就是我們說的AQS維護(hù)的FIFO隊(duì)列,等待隊(duì)列則是每個(gè)condition關(guān)聯(lián)的隊(duì)列)
?④PROPAGTE=-3:表示下一次共享狀態(tài)獲取將會(huì)傳遞給后繼結(jié)點(diǎn)獲取這個(gè)共享同步狀態(tài)盖桥。
同步狀態(tài)值(state)干什么用的揩徊?
①首先在多線程競爭的條件下塑荒,采用CAS的方式來獲取和設(shè)置同步狀態(tài)值(state)齿税。
②同步狀態(tài)值state代表獲取鎖的線程加鎖的次數(shù)炊豪,如果線程獲取鎖溜在,那么state加1變?yōu)?掖肋。如果線程釋放鎖志笼,那么state減1變?yōu)?纫溃。
③volatile實(shí)現(xiàn)原則還是將緩存中的數(shù)據(jù)寫入到主存紊浩,每個(gè)線程都是從主存中讀取值坊谁。保證了數(shù)據(jù)的一致性口芍。
- AQS中維持了一個(gè)單一的volatile修飾的狀態(tài)信息state(AQS通過Unsafe的相關(guān)方法鬓椭,以原子性的方式由線程去獲取這個(gè)state)。AQS提供了getState()翘瓮、setState()春畔、compareAndSetState()函數(shù)修改值(實(shí)際上調(diào)用的是unsafe的compareAndSwapInt方法)律姨。下面是AQS中的部分成員變量以及更新state的方法
//這就是我們剛剛說到的head結(jié)點(diǎn)臼疫,懶加載的(只有競爭失敗需要構(gòu)建同步隊(duì)列的時(shí)候烫堤,才會(huì)創(chuàng)建這個(gè)head)鸽斟,如果頭節(jié)點(diǎn)存在富蓄,它的waitStatus不能為CANCELLED
private transient volatile Node head;
//當(dāng)前同步隊(duì)列尾節(jié)點(diǎn)的引用立倍,也是懶加載的口注,只有調(diào)用enq方法的時(shí)候會(huì)添加一個(gè)新的wait node
private transient volatile Node tail;
//AQS核心:同步狀態(tài)
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
這個(gè)方法采用cas原則來保證狀態(tài)設(shè)置的原子性寝志,cas本身采用樂觀鎖的方式來實(shí)現(xiàn)的材部,從而不會(huì)產(chǎn)生線程的阻塞問題败富。由于采用volatile讀寫語義兽叮,那么線程訪問是保持一致性的猾愿。
理由:因?yàn)関olatile實(shí)現(xiàn)原則是將緩存中的數(shù)據(jù)寫入到主存中的蒂秘。所以每個(gè)線程讀寫的數(shù)據(jù)都是從主存中獲取來的姻僧,而不是每個(gè)線程緩存的數(shù)據(jù)撇贺,所以保證了一致性松嘶。
- AQS的設(shè)計(jì)模式是基于模板方法模式的翠订。使用時(shí)候需要繼承同步器并重寫指定的方法尽超,并且通常將子類推薦為定義同步組件的靜態(tài)內(nèi)部類似谁,子類重寫這些方法之后,AQS工作時(shí)使用的是提供的模板方法斜筐,在這些模板方法中調(diào)用子類重寫的方法顷链。其中子類可以重寫的方法
//獨(dú)占式的獲取同步狀態(tài)嗤练,實(shí)現(xiàn)該方法需要查詢當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期煞抬,然后再進(jìn)行CAS設(shè)置同步狀態(tài)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//獨(dú)占式的釋放同步狀態(tài)革答,等待獲取同步狀態(tài)的線程可以有機(jī)會(huì)獲取同步狀態(tài)
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的獲取同步狀態(tài)
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//嘗試將狀態(tài)設(shè)置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用途茫。
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//當(dāng)前同步器是否在獨(dú)占模式下被線程占用囊卜,一般該方法表示是否被當(dāng)前線程所獨(dú)占
protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
- AQS的內(nèi)部類ConditionObject是通過結(jié)合鎖實(shí)現(xiàn)線程同步栅组,ConditionObject可以直接訪問AQS的變量(state笑窜、queue)排截,ConditionObject是個(gè)條件變量 断傲,每個(gè)ConditionObject對(duì)應(yīng)一個(gè)隊(duì)列用來存放線程調(diào)用condition條件變量的await方法之后被阻塞的線程认罩。
AQS模式:
AQS分為獨(dú)占模式和共享模式,獨(dú)占模式下其他線程無法獲取該線程鎖宦搬。在共享模式下AQS維護(hù)了一個(gè)等待資源的FIFO先進(jìn)先出隊(duì)列间校,在線程1資源被釋放后憔足,AQS會(huì)從頭結(jié)點(diǎn)開始依次喚醒隊(duì)列中的線程2等線程2結(jié)束后再喚醒后面的從前至后的所有結(jié)點(diǎn)滓彰,使他們對(duì)應(yīng)的線程恢復(fù)執(zhí)行揭绑,直到隊(duì)列為空洗做。
AQS中的獨(dú)占模式
了解了一下AQS的基本組成,這里通過ReentrantLock的非公平鎖實(shí)現(xiàn)來具體分析AQS的獨(dú)占模式的加鎖和釋放鎖的過程撰筷。
非公平鎖的加鎖流程
簡單說來毕籽,AQS會(huì)把所有的請(qǐng)求線程構(gòu)成一個(gè)CLH隊(duì)列关筒,當(dāng)一個(gè)線程執(zhí)行完畢(lock.unlock())時(shí)會(huì)激活自己的后繼節(jié)點(diǎn)蒸播,但正在執(zhí)行的線程并不在隊(duì)列中袍榆,而那些等待執(zhí)行的線程全部處于阻塞狀態(tài)(park())塘揣,如下圖所示亲铡。
? 1. 假設(shè)這個(gè)時(shí)候在初始情況下赞草,還沒有多任務(wù)來請(qǐng)求競爭這個(gè)state厨疙,這時(shí)候如果第一個(gè)線程thread1調(diào)用了lock方法請(qǐng)求獲得鎖檀头,首先會(huì)通過CAS的方式將state更新為1暑始,表示自己thread1獲得了鎖廊镜,并將獨(dú)占鎖的線程持有者設(shè)置為thread1唉俗。
final void lock() {
if (compareAndSetState(0, 1))
//setExclusiveOwnerThread是AbstractOwnableSynchronizer的方法,AQS繼承了AbstractOwnableSynchronizer
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
- 這個(gè)時(shí)候有另一個(gè)線程thread2來嘗試或者鎖股缸,同樣也調(diào)用lock方法吱雏,嘗試通過CAS的方式將state更新為1歧杏,但是由于之前已經(jīng)有線程持有了state犬绒,所以thread2這一步CAS失斂Α(前面的thread1已經(jīng)獲取state并且沒有釋放)沮协,就會(huì)調(diào)用acquire(1)方法(該方法是AQS提供的模板方法,它會(huì)調(diào)用子類的tryAcquire方法)聘殖。非公平鎖的實(shí)現(xiàn)中奸腺,AQS的模板方法acquire(1)就會(huì)調(diào)用NofairSync的tryAcquire方法突照,而tryAcquire方法又調(diào)用的Sync的nonfairTryAcquire方法讹蘑,所以我們看看nonfairTryAcquire的流程座慰。
//NofairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//(1)獲取當(dāng)前線程
final Thread current = Thread.currentThread();
//(2)獲得當(dāng)前同步狀態(tài)state
int c = getState();
//(3)如果state==0版仔,表示沒有線程獲取
if (c == 0) {
//(3-1)那么就嘗試以CAS的方式更新state的值
if (compareAndSetState(0, acquires)) {
//(3-2)如果更新成功益缎,就設(shè)置當(dāng)前獨(dú)占模式下同步狀態(tài)的持有者為當(dāng)前線程
setExclusiveOwnerThread(current);
//(3-3)獲得成功之后然想,返回true
return true;
}
}
//(4)這里是重入鎖的邏輯
else if (current == getExclusiveOwnerThread()) {
//(4-1)判斷當(dāng)前占有state的線程就是當(dāng)前來再次獲取state的線程之后变泄,就計(jì)算重入后的state
int nextc = c + acquires;
//(4-2)這里是風(fēng)險(xiǎn)處理
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//(4-3)通過setState無條件的設(shè)置state的值杖刷,(因?yàn)檫@里也只有一個(gè)線程操作state的值滑燃,即
//已經(jīng)獲取到的線程表窘,所以沒有進(jìn)行CAS操作)
setState(nextc);
return true;
}
//(5)沒有獲得state乐严,也不是重入昂验,就返回false
return false;
}
總結(jié)來說就是:
1既琴、獲取當(dāng)前將要去獲取鎖的線程thread2。
2逆济、獲取當(dāng)前AQS的state的值奖慌。如果此時(shí)state的值是0简僧,那么我們就通過CAS操作獲取鎖涎劈,然后設(shè)置AQS的線程占有者為thread2蛛枚。很明顯蹦浦,在當(dāng)前的這個(gè)執(zhí)行情況下盲镶,state的值是1不是0蝌诡,因?yàn)槲覀兊膖hread1還沒有釋放鎖浦旱。所以CAS失敗颁湖,后面第3步的重入邏輯也不會(huì)進(jìn)行
3甥捺、如果當(dāng)前將要去獲取鎖的線程等于此時(shí)AQS的exclusiveOwnerThread的線程镰禾,則此時(shí)將state的值加1吴侦,這是重入鎖的實(shí)現(xiàn)方式妈倔。
4、最終thread2執(zhí)行到這里會(huì)返回false毅哗。
? (3)上面的thread2加鎖失敗虑绵,返回false翅睛。那么根據(jù)開始我們講到的AQS概述就應(yīng)該將thread2構(gòu)造為一個(gè)Node結(jié)點(diǎn)加入同步隊(duì)列中。因?yàn)镹ofairSync的tryAcquire方法是由AQS的模板方法acquire()來調(diào)用的疏旨,那么我們看看該方法的源碼以及執(zhí)行流程檐涝。
//(1)tryAcquire谁榜,這里thread2執(zhí)行返回了false凡纳,那么就會(huì)執(zhí)行addWaiter將當(dāng)前線程構(gòu)造為一個(gè)結(jié)點(diǎn)加入同步隊(duì)列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS共享模式
AQS框架數(shù)據(jù)結(jié)構(gòu)是一個(gè)先進(jìn)先出的雙向隊(duì)列,當(dāng)多個(gè)線程進(jìn)行競爭資源時(shí)撕瞧,那些競爭失敗的線程會(huì)加入到隊(duì)列中丛版。他向上層提供了很多接口,其中一個(gè)是acquireShared獲取共享模式的接口页畦。本文將會(huì)根據(jù)這個(gè)接口一步步分析,獲取資源失敗的線程是怎么進(jìn)入到隊(duì)列中的,進(jìn)入到隊(duì)列中又是怎么出隊(duì)列再次競爭資源的,下面是acquireShared執(zhí)行的一個(gè)大致流程:
多個(gè)線程通過調(diào)用tryAcquireShared方法獲取共享資源豫缨,返回值大于等于0則獲取資源成功,返回值小于0則獲取失敗端朵。
當(dāng)前線程獲取共享資源失敗后冲呢,通過調(diào)用addWaiter方法把該線程封裝為Node節(jié)點(diǎn)敬拓,并設(shè)置該節(jié)點(diǎn)為共享模式。然后把該節(jié)點(diǎn)添加到隊(duì)列的尾部厕诡。
添加到尾部后,判斷該節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)是不是隊(duì)列的頭節(jié)點(diǎn)灵嫌,如果是頭節(jié)點(diǎn)寿羞,那么該節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)出隊(duì)列并獲取共享資源,同時(shí)調(diào)用setHeadAndPropagate方法把該節(jié)點(diǎn)設(shè)置為新的頭節(jié)點(diǎn),同時(shí)喚醒隊(duì)列中所有共享類型的節(jié)點(diǎn),去獲取共享資源。如果獲取失敗,則再次加入到隊(duì)列中霞幅。
如果該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),那么通過for循環(huán)進(jìn)行自旋轉(zhuǎn)等待,直到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)司恳,結(jié)束自旋绍傲。
這就是AQS共享模式競爭資源失敗的大致流程烫饼。
AQS總結(jié):
1.狀態(tài)變量state杠纵,AQS中定義了一個(gè)狀態(tài)變量state,它有以下兩種使用方法:
(1)互斥鎖
當(dāng)AQS只實(shí)現(xiàn)為互斥鎖的時(shí)候铝量,每次只要原子更新state的值從0變?yōu)?成功了就獲取了鎖慢叨,可重入是通過不斷把state原子更新加1實(shí)現(xiàn)的拍谐。
(2)互斥鎖 + 共享鎖
當(dāng)AQS需要同時(shí)實(shí)現(xiàn)為互斥鎖+共享鎖的時(shí)候赠尾,低16位存儲(chǔ)互斥鎖的狀態(tài)气嫁,高16位存儲(chǔ)共享鎖的狀態(tài)寸宵,主要用于實(shí)現(xiàn)讀寫鎖崖面∥自保互斥鎖是一種獨(dú)占鎖简识,每次只允許一個(gè)線程獨(dú)占感猛,且當(dāng)一個(gè)線程獨(dú)占時(shí),其它線程將無法再獲取互斥鎖及共享鎖颈走,但是它自己可以獲取共享鎖立由。共享鎖同時(shí)允許多個(gè)線程占有锐膜,只要有一個(gè)線程占有了共享鎖枣耀,所有線程(包括自己)都將無法再獲取互斥鎖捞奕,但是可以獲取共享鎖颅围。
2.AQS隊(duì)列
AQS中維護(hù)了一個(gè)隊(duì)列恨搓,獲取鎖失敵M亍(非tryLock())的線程都將進(jìn)入這個(gè)隊(duì)列中排隊(duì)弄抬,等待鎖釋放后喚醒下一個(gè)排隊(duì)的線程(互斥鎖模式下)宪郊。
3.Condition隊(duì)列
AQS中還有另一個(gè)非常重要的內(nèi)部類ConditionObject,它實(shí)現(xiàn)了Condition接口依啰,主要用于實(shí)現(xiàn)條件鎖速警。ConditionObject中也維護(hù)了一個(gè)隊(duì)列闷旧,這個(gè)隊(duì)列主要用于等待條件的成立鸠匀,當(dāng)條件成立時(shí)逾柿,其它線程將signal這個(gè)隊(duì)列中的元素,將其移動(dòng)到AQS的隊(duì)列中父腕,等待占有鎖的線程釋放鎖后被喚醒弱匪。Condition典型的運(yùn)用場景是在BlockingQueue中的實(shí)現(xiàn),當(dāng)隊(duì)列為空時(shí)璧亮,獲取元素的線程阻塞在notEmpty條件上萧诫,一旦隊(duì)列中添加了一個(gè)元素,將通知notEmpty條件枝嘶,將其隊(duì)列中的元素移動(dòng)到AQS隊(duì)列中等待被喚醒帘饶。
4.設(shè)計(jì)模式之—模版方法
AQS抽象類巧妙地用了模版方法面定義了一系列的模板方法,比如下面這些:
// 獲取互斥鎖
public final void acquire(int arg) {
// tryAcquire(arg)需要子類實(shí)現(xiàn)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 獲取互斥鎖可中斷
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire(arg)需要子類實(shí)現(xiàn)
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 獲取共享鎖
public final void acquireShared(int arg) {
// tryAcquireShared(arg)需要子類實(shí)現(xiàn)
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 獲取共享鎖可中斷
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared(arg)需要子類實(shí)現(xiàn)
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 釋放互斥鎖
public final boolean release(int arg) {
// tryRelease(arg)需要子類實(shí)現(xiàn)
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 釋放共享鎖
public final boolean releaseShared(int arg) {
// tryReleaseShared(arg)需要子類實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
獲取鎖群扶、釋放鎖的這些方法基本上都穿插在ReentrantLock及刻、ReentrantReadWriteLock、Semaphore竞阐、CountDownLatch的源碼解析中了,這些工具類后面會(huì)降講到峭火。
需要子類實(shí)現(xiàn)的方法
上面一起學(xué)習(xí)了AQS中幾個(gè)重要的模板方法,下面我們?cè)僖黄饘W(xué)習(xí)下幾個(gè)需要子類實(shí)現(xiàn)的方法:
// 互斥模式下使用:嘗試獲取鎖
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 互斥模式下使用:嘗試釋放鎖
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下使用:嘗試獲取鎖
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下使用:嘗試釋放鎖
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 如果當(dāng)前線程獨(dú)占著鎖,返回true
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
這幾個(gè)方法為什么不直接定義成抽象方法呢?
因?yàn)樽宇愔灰獙?shí)現(xiàn)這幾個(gè)方法中的一部分就可以實(shí)現(xiàn)一個(gè)同步器了,所以不需要定義成抽象方法猾漫。
總結(jié)
今天我們大概講了下AQS中幾個(gè)重要的組成部分:
(1)狀態(tài)變量state陪竿;
(2)AQS隊(duì)列;
(3)Condition隊(duì)列;
(4)模板方法花枫;
(5)需要子類實(shí)現(xiàn)的方法馒疹;
本文參考:
https://www.cnblogs.com/tong-yuan/p/abstractqueuedsynchronizer.html
AQS源碼詳細(xì)分析資料:
https://www.cnblogs.com/fsmly/p/11274572.html
https://blog.csdn.net/ya_1249463314/article/details/77838509