簡(jiǎn)敘
之前我寫(xiě)過(guò)很多關(guān)于JUC下各種鎖的使用文章,但是都沒(méi)說(shuō)是如何實(shí)現(xiàn)的.如果你去看ReentrantLock
的源碼,你會(huì)發(fā)現(xiàn)它內(nèi)部有一個(gè)Sync
類(lèi),它繼承了AbstractQueuedSynchronizer
這個(gè)抽象類(lèi)實(shí)現(xiàn)了該類(lèi)的某些方法.這個(gè)就是我們今天要說(shuō)的AQS
.我們一定先要搞懂這個(gè)類(lèi)才能真正的了解到ReentrantLock
它是如何實(shí)現(xiàn)的.
閱讀預(yù)備知識(shí)點(diǎn)
CAS
:需要知道什么是CAS.簡(jiǎn)單的說(shuō)就是比較
和交換
.舉個(gè)簡(jiǎn)單的例子:例如你去更新數(shù)據(jù)中的訂單狀態(tài)為未支付
的訂單為已支付
.你的sql語(yǔ)句必須是update t_order set status = '已支付' where order_id = 'xxxx' and status = '未支付'
.而不應(yīng)該是update t_order set status = '已支付' where order_id = 'xxxx'
.如果還是不理解可以查看我們之前寫(xiě)的關(guān)于CAS的文章.volatile
:需要知道volatile
關(guān)鍵字的作用.簡(jiǎn)單的說(shuō)使用它修飾的變量,只要值發(fā)生改變?cè)谄渌€程能立馬看見(jiàn)改變后的值.可以參考我之前寫(xiě)的關(guān)于volatile相關(guān)的文章.CLH隊(duì)列
:一種基于單向鏈表
實(shí)現(xiàn)的隊(duì)列.基于該隊(duì)列可以實(shí)現(xiàn)一個(gè)簡(jiǎn)單高效的自旋鎖.在閱讀源碼中我們同時(shí)可以思考AQS是如何來(lái)實(shí)現(xiàn)可重入的.關(guān)于CLH隊(duì)列的介紹可以參照我之前寫(xiě)的文章CLHLOCK實(shí)現(xiàn)LockSupport
:用來(lái)創(chuàng)建鎖和其他同步類(lèi)的線程阻塞原語(yǔ).簡(jiǎn)單的說(shuō)就是用來(lái)阻塞和喚醒線程的.具體介紹可以查看pack和unpack.
上面說(shuō)的預(yù)備知識(shí)請(qǐng)保證先經(jīng)歷弄清楚,如果不是很清楚不太建議看關(guān)于AQS
的實(shí)現(xiàn).因?yàn)槿绻厦娴闹R(shí)點(diǎn)你不清楚,你看完也不知道為什么會(huì)這樣,到處都是各種疑問(wèn).當(dāng)然還有其他知識(shí)點(diǎn)我可能沒(méi)寫(xiě)上,但是這幾個(gè)比較重要的還請(qǐng)先弄清楚或者說(shuō)至少知道他們的作用.
解析
注意:本文是基于JDK1.8所寫(xiě),因?yàn)槲覜](méi)有看到其他版本的JDK源碼,如果你使用的不是JDK1.8可能會(huì)稍有不同,當(dāng)并不影響我們了解AQS
的實(shí)現(xiàn).
為什么AQS是abstract類(lèi)
AbstractQueuedSynchronizer
被聲明成一個(gè)abstract
類(lèi),而在java中abstract
一般代表這個(gè)類(lèi)有部分方法未被實(shí)現(xiàn),我們?cè)谟玫臅r(shí)候需要根據(jù)自己的具體需求來(lái)實(shí)現(xiàn).但是你查看源碼可以發(fā)現(xiàn)并沒(méi)有方法被聲明成abstract
.但是有部分方法內(nèi)部實(shí)現(xiàn)為:throw new UnsupportedOperationException();
.這部分方法主要如下:
//獨(dú)占方式獲取資源.true表示成功,false表示失敗
protected boolean tryAcquire(int arg)
//獨(dú)占方式釋放資源.true表示成功,false表示失敗
protected boolean tryRelease(int arg)
//共享方式獲取資源.返回值大于等于0是代表成功,小于0代表失敗
protected int tryAcquireShared(int arg)
//共享方式釋放鎖.true表示成,false表示失敗
protected boolean tryReleaseShared(int arg)
//對(duì)于調(diào)用的線程同步是以獨(dú)占的形式進(jìn)行的返回true,否則返回false.
protected boolean isHeldExclusively()
上面這些方法主要可以分為SHARED(共享)
和EXCLUSIVE(獨(dú)占)
這兩大類(lèi).而我們自定義的同步器一般就是獨(dú)占式例如ReentrantLock
或者共享式CountDownLatch
這兩大類(lèi).我們只需要實(shí)現(xiàn)tryAcquire-tryRelease
或者tryAcquireShared-tryReleaseShared
即可.但是也有同時(shí)實(shí)現(xiàn)這兩種的例如ReentrantReadWriteLock
.
CLH實(shí)現(xiàn)
AQS
中的CLH隊(duì)列是CLH的一種變種實(shí)現(xiàn).在我之前寫(xiě)的CLHLOCK
文章中指出了CLHLOCK
是無(wú)法實(shí)現(xiàn)鎖的可重入的.而AQS
是支持鎖重入的.下面介紹AQS
中的CLH
是如何實(shí)現(xiàn)的.
Node
AQS
內(nèi)部有一個(gè)Node
靜態(tài)內(nèi)部類(lèi),該類(lèi)就是組成CLH
隊(duì)列的節(jié)點(diǎn).它是對(duì)每一個(gè)等待獲取共享資源的線程的封裝
.先看下面的源碼我們注釋來(lái)了解一個(gè)Node
具體封裝了哪些內(nèi)容.
static final class Node {
//共享模式的標(biāo)記
static final Node SHARED = new Node();
//獨(dú)占模式的標(biāo)記
static final Node EXCLUSIVE = null;
//表示當(dāng)前節(jié)點(diǎn)已經(jīng)被取消了
static final int CANCELLED = 1;
//表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)等待喚醒.后繼節(jié)點(diǎn)在進(jìn)入等待前會(huì)將它的前節(jié)點(diǎn)(前節(jié)點(diǎn)有效)設(shè)置為該狀態(tài)
static final int SIGNAL = -1;
//表示該節(jié)點(diǎn)等待在condition上,當(dāng)其他線程調(diào)用signal()方法后,condition狀態(tài)的節(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列.
static final int CONDITION = -2;
//共享模式下,當(dāng)前節(jié)點(diǎn)喚醒后繼節(jié)點(diǎn)及其后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)
static final int PROPAGATE = -3;
//節(jié)點(diǎn)的等待狀態(tài).默認(rèn)為0.
volatile int waitStatus;
//前節(jié)點(diǎn)
volatile Node prev;
//后繼節(jié)點(diǎn)
volatile Node next;
//線程
volatile Thread thread;
//鏈接在條件隊(duì)列等待的下一個(gè)節(jié)點(diǎn)或者是特殊值SHARD
Node nextWaiter;
// 判斷當(dāng)前節(jié)點(diǎn)是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回前驅(qū)節(jié)點(diǎn)
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
從上面的Node的結(jié)構(gòu)可以看出Node組成的隊(duì)列是一個(gè)基于雙向鏈表
實(shí)現(xiàn)的隊(duì)列.每個(gè)因?yàn)楂@取共享資源的線程被阻塞等待時(shí)都將被封裝成一個(gè)Node
添加到等待隊(duì)列.線程本身以及線程的等待狀態(tài)都被封裝在該隊(duì)列中.
state
上面說(shuō)了封裝等待線程信息的Node,但是原生的CLH
實(shí)現(xiàn)的鎖沒(méi)法實(shí)現(xiàn)重入性
的.但是AQS
中卻實(shí)現(xiàn)了重入性
,AQS
是如何實(shí)現(xiàn)可重入性的呢?同時(shí)我們所說(shuō)的共享資源到底是什么呢?
上面兩個(gè)問(wèn)題的答案就是:state
.在AQS
內(nèi)部有一個(gè)變量,它的定義如下:
private volatile int state;
AQS
就是使用state
來(lái)表示共享資源.而且需要注意的是它是使用volatile
來(lái)修飾的.其實(shí)不僅僅只有這個(gè)變量是使用volatile
來(lái)修飾,還有很多變量都是使用了該詞修飾,主要作用還是為了多線程修改值在其他線程中能感知到.同時(shí)AQS
還提供了一個(gè)重要的方法用來(lái)修改state
的值,具體方法如下:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
上面是一個(gè)CAS
操作,expect
預(yù)期值也可以理解為舊值,update
要更新的值.在更新state
值的時(shí)候會(huì)先比較目前state
的值是不是和expect
值一樣,如果一樣說(shuō)明沒(méi)有其他線程修改過(guò)該值則更新成功.如果不一樣則說(shuō)明有其他線程修改過(guò)state
的值則該次修改失敗并返回false
.在AQS
中海油其他地方也使用CAS
操作,后面遇見(jiàn)了不會(huì)再做解釋了.
上面說(shuō)明state
用來(lái)表示共享資源,還有另外一個(gè)問(wèn)題就是為什么能重入?我摘取ReentrantLock
中一段源碼來(lái)解釋,該段代碼為內(nèi)部類(lèi)Sync
中非公平模式獲取資源的實(shí)現(xiàn).
// acquires 代表需要獲取的資源數(shù)
final boolean nonfairTryAcquire(int acquires) {
//獲取當(dāng)前線程
final Thread current = Thread.currentThread();
//獲取state的值
int c = getState();
//如果state=0,說(shuō)明資源沒(méi)有被任何線程占用.如果大于0,說(shuō)明已經(jīng)有線程獲取了資源.需要說(shuō)明是state代表資源,但是具體意思會(huì)因?yàn)楣蚕砟J胶酮?dú)占模式而有所不同
if (c == 0) {
//CAS操作更新資源值
if (compareAndSetState(0, acquires)) {
//將當(dāng)前線程設(shè)置成當(dāng)共享資源的擁有者.其實(shí)就是將當(dāng)前線程保存到AQS實(shí)例中
setExclusiveOwnerThread(current);
//返回獲取資源成功
return true;
}
}
//說(shuō)明已經(jīng)有線程占用了共享資源,且是同一線程
else if (current == getExclusiveOwnerThread()) {
//重入重入重入重入重入重入重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//修改state的值,很巧妙的實(shí)現(xiàn)了重入
setState(nextc);
return true;
}
//占用資源的不是當(dāng)前線程,說(shuō)明該次獲取鎖失敗
return false;
}
上面代碼解釋了如何重入性
是如何實(shí)現(xiàn)的.這里面有兩個(gè)關(guān)鍵點(diǎn)就在于state
和ownerThread
.當(dāng)線程再次重入獲取鎖時(shí),我們只需要比較當(dāng)前獲取鎖的線程是不是AQS
的擁有者時(shí)就能判斷是不是重入
.如果是只需要更新state
的值即可,這個(gè)設(shè)計(jì)真的牛逼.而如何保存AQS
的擁有者時(shí),這個(gè)實(shí)現(xiàn)也很簡(jiǎn)單,代碼如下:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueuedSynchronizer
繼承了這個(gè)類(lèi),這個(gè)類(lèi)就是通過(guò)一個(gè)屬性用來(lái)保存當(dāng)前擁有線程的.
結(jié)構(gòu)圖
通過(guò)上面的圖心里大概應(yīng)該有了模糊的概念了.下面這張圖將展示AQS
內(nèi)部的等待隊(duì)列的結(jié)構(gòu):
AQS
內(nèi)部使用head
和tail
用來(lái)保存隊(duì)列的頭結(jié)點(diǎn)和尾節(jié)點(diǎn).使用state
代表資源.每個(gè)Node
用來(lái)保存等待獲取共享資源的線程.這就是AQS
的核心內(nèi)容了.
代碼解析
從上面的內(nèi)容我們知道了獲取資源有兩種模式,一種為獨(dú)占模式,另一種為共享模式.我們下面就根據(jù)模式的不同做詳細(xì)的代碼分析整個(gè)流程:
獨(dú)占模式
獨(dú)占模式獲取鎖
如果我們要分析源碼首先要找到一個(gè)入口,這樣分析起來(lái)就比較容易了.獨(dú)占模式獲取共享資源的入口一共有下面三個(gè):
-
acquire(int arg)
:這個(gè)方法是不響應(yīng)中斷和超時(shí)的. -
acquireInterruptibly(int arg)
:這個(gè)是響應(yīng)中斷的. -
tryAcquireNanos(int arg, long nanosTimeout)
:這個(gè)是響應(yīng)超時(shí)的.
上面三個(gè)方法就是獲取獨(dú)占模式下獲取共享資源的入口了,具體實(shí)現(xiàn)有部分差別,但是核心過(guò)程都差不多.我這里只分析acquire
方法,其他的如果你自己有興趣可以自己分析.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果線程被中斷過(guò)則調(diào)用 Thread.currentThread().interrupt()
selfInterrupt();
}
- 首先線程嘗試去獲取鎖,如果獲取成功則繼續(xù)執(zhí)行業(yè)務(wù)邏輯,失敗則進(jìn)行下一步.
- 嘗試搶鎖失敗,調(diào)用
addWaiter()
將線程添加到尾部并標(biāo)記自己為獨(dú)占模式 - acquireQueued線程阻塞在等待隊(duì)列.
private Node addWaiter(Node mode) {
//創(chuàng)建Node節(jié)點(diǎn),根據(jù)mode參數(shù)設(shè)置該節(jié)點(diǎn)是共享還是排他模式
Node node = new Node(Thread.currentThread(), mode);
//獲取到尾節(jié)點(diǎn)
Node pred = tail;
if (pred != null) {
//尾節(jié)點(diǎn)不為空,說(shuō)明當(dāng)前隊(duì)列不為空
//設(shè)置當(dāng)前節(jié)點(diǎn)的前節(jié)點(diǎn)為獲取到的尾節(jié)點(diǎn)
node.prev = pred;
//CAS操作將當(dāng)前節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
//設(shè)置成功之后將前置節(jié)點(diǎn)的尾節(jié)點(diǎn)設(shè)置成自己
pred.next = node;
//返回當(dāng)前線程封裝的Node實(shí)例
return node;
}
}
//可能尾節(jié)點(diǎn)為空或者設(shè)置當(dāng)前節(jié)點(diǎn)為尾節(jié)點(diǎn)失敗
enq(node);
return node;
}
//快速插入隊(duì)列失敗,使用自旋方式再試
private Node enq(final Node node) {
//for循環(huán)自旋
for (;;) {
//獲取當(dāng)前隊(duì)列的尾節(jié)點(diǎn)
Node t = tail;
if (t == null) {
//尾節(jié)點(diǎn)為空,創(chuàng)建一個(gè)新節(jié)點(diǎn)作為頭尾節(jié)點(diǎn).CAS操作.然后再次循環(huán)
if (compareAndSetHead(new Node()))
tail = head;
} else {
//已經(jīng)存在尾節(jié)點(diǎn)了,將等待線程的node的前置節(jié)點(diǎn)為尾節(jié)點(diǎn)
node.prev = t;
//CAS操作將自己設(shè)置成尾節(jié)點(diǎn),失敗則自旋重試
if (compareAndSetTail(t, node)) {
//將自己設(shè)置成為節(jié)點(diǎn)成功,將前置節(jié)點(diǎn)的尾節(jié)點(diǎn)設(shè)置成自己,退出自旋
t.next = node;
return t;
}
}
}
}
上面代碼展示了將當(dāng)前線程封裝成Node
實(shí)例,并將它添加到隊(duì)列尾部的操作.在這個(gè)過(guò)程中首先會(huì)在addWaiter
快速的插入一次,如果這次失敗則進(jìn)入enq
自旋插入.這里面的精髓就在于自旋
和CAS
.
完成節(jié)點(diǎn)的插入之后,下面要進(jìn)行的就是使線程在隊(duì)列中阻塞等待.阻塞等待知道其他線程徹底釋放了資源然后喚醒自己.
final boolean acquireQueued(final Node node, int arg) {
//是否發(fā)生了異常
boolean failed = true;
try {
//標(biāo)記線程是否中斷
boolean interrupted = false;
//開(kāi)始自旋
for (;;) {
//獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
final Node p = node.predecessor();
//如果前置節(jié)點(diǎn)是頭節(jié)點(diǎn),說(shuō)明自己實(shí)際上是隊(duì)列的頭節(jié)點(diǎn)(頭節(jié)點(diǎn)實(shí)際上是個(gè)虛節(jié)點(diǎn)),那么可以嘗試再獲取一次鎖,如果成功那就不進(jìn)入等待了
if (p == head && tryAcquire(arg)) {
//獲取鎖成功了,將頭結(jié)點(diǎn)設(shè)計(jì)成自己
setHead(node);
// 去掉前置節(jié)點(diǎn)與當(dāng)前節(jié)點(diǎn)的引用,便于GC回收無(wú)用節(jié)點(diǎn)
p.next = null;
//設(shè)置為false
failed = false;
return interrupted;
}
//如果自己不是老二或者獲取鎖失敗,那就老老實(shí)實(shí)等待.但是進(jìn)入等待前需要告訴前面一個(gè)人記得叫醒自己,同時(shí)前面沒(méi)用人需要把他從隊(duì)列中移除.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//進(jìn)入到該代碼塊說(shuō)明線程被中斷了,修改中斷標(biāo)志
interrupted = true;
}
} finally {
//如果獲取鎖的過(guò)程中出現(xiàn)異常,則會(huì)取消當(dāng)前節(jié)點(diǎn)
if (failed)
cancelAcquire(node);
}
}
//將前置節(jié)點(diǎn)的狀態(tài)設(shè)置為-1(SIGNAL)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取當(dāng)前置節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
//前置節(jié)點(diǎn)已經(jīng)為-1了,說(shuō)明前置節(jié)點(diǎn)會(huì)叫我們醒來(lái)
if (ws == Node.SIGNAL)
return true;
//waitStatus說(shuō)明節(jié)點(diǎn)已經(jīng)被取消了,需要將無(wú)效節(jié)點(diǎn)刪除掉
if (ws > 0) {
//刪除當(dāng)前節(jié)點(diǎn)前的無(wú)效節(jié)點(diǎn)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前置節(jié)點(diǎn)正常,將前置節(jié)點(diǎn)狀態(tài)設(shè)置成-1,CAS操作
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//讓當(dāng)前線程進(jìn)入等待,醒來(lái)后返回當(dāng)前線程是否被中斷過(guò)
private final boolean parkAndCheckInterrupt() {
//調(diào)用park使線程進(jìn)入等待.它醒來(lái)的條件為其他線程調(diào)用unpack或者線程中斷
LockSupport.park(this);
return Thread.interrupted();
}
上面代碼就是讓線程進(jìn)入等待的全過(guò)程.它核心兩個(gè)操作就是告訴前置有效節(jié)點(diǎn)喚醒自己
和進(jìn)入等待
.
- 判斷當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)是不是頭節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)是頭節(jié)點(diǎn)則嘗試獲取一次鎖.
- 如果步驟1獲取鎖成功則返回退出了.如果沒(méi)有獲取成功,則從第一步從新再來(lái).
- 如果前置節(jié)點(diǎn)不是頭節(jié)點(diǎn),則進(jìn)入
shouldParkAfterFailedAcquire
邏輯.
shouldParkAfterFailedAcquire
主要作用就是讓自己"安心的休息".怎樣才能安心的休息呢?就是將當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)狀態(tài)設(shè)置為SIGNAL(-1)
.
- 獲取前置節(jié)點(diǎn)的狀態(tài),可能為
-1
,大于0
,小于或者等于0
這三種情況. -
-1
:不需要再做別的事了,直接返回true
-
大于0
:只要節(jié)點(diǎn)狀態(tài)大于0說(shuō)明節(jié)點(diǎn)是個(gè)無(wú)效節(jié)點(diǎn),這個(gè)時(shí)候會(huì)刪除當(dāng)前節(jié)點(diǎn)的無(wú)效節(jié)點(diǎn).這個(gè)里面的邏輯就是一個(gè)簡(jiǎn)單的鏈表刪除邏輯. -
小于或者等于0
:節(jié)點(diǎn)都是有效的節(jié)點(diǎn).我們使用CAS的方式將其設(shè)置成SIGNAL
狀態(tài)即可.
parkAndCheckInterrupt
讓線程進(jìn)入等待.這個(gè)里面代碼很簡(jiǎn)短,它就是通過(guò)調(diào)用LockSupport.park(this)
進(jìn)入等待的.再次強(qiáng)調(diào)文章前的預(yù)備知識(shí)請(qǐng)一定先了解!!!!
.
在finally
代碼塊中還有一個(gè)邏輯就是用來(lái)處理獲取資源失敗的情況.有些博文說(shuō)這個(gè)地方會(huì)在線程中斷和線程等待超時(shí)時(shí)就會(huì)調(diào)用cancelAcquire
用來(lái)取消節(jié)點(diǎn)在隊(duì)列中等待,這個(gè)不是不對(duì)
的.這個(gè)只會(huì)在獲取鎖的過(guò)程中出現(xiàn)異常才會(huì)取消當(dāng)前節(jié)點(diǎn).因?yàn)橹挥芯€程只有獲取到鎖和異常才會(huì)退出自旋,acquireQueued
是不會(huì)響應(yīng)超時(shí)和中斷的.而響應(yīng)中斷和超時(shí)的方法為:doAcquireInterruptibly
和doAcquireNanos
.
//取消在隊(duì)列中等待
private void cancelAcquire(Node node) {
//如果節(jié)點(diǎn)不存在,則直接忽略
if (node == null)
return;
//將節(jié)點(diǎn)上的線程引用設(shè)置為null
node.thread = null;
//跳過(guò)節(jié)點(diǎn)前面取消的節(jié)點(diǎn)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//獲取篩選后的前置節(jié)點(diǎn)的后置節(jié)點(diǎn)
Node predNext = pred.next;
//將當(dāng)前節(jié)點(diǎn)設(shè)置成取消狀態(tài)
node.waitStatus = Node.CANCELLED;
//如果當(dāng)前節(jié)點(diǎn)為尾節(jié)點(diǎn),將從后往前的第一個(gè)節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)
if (node == tail && compareAndSetTail(node, pred)) {
//去掉前置節(jié)點(diǎn)對(duì)當(dāng)前節(jié)點(diǎn)的引用.
compareAndSetNext(pred, predNext, null);
} else {
//前一個(gè)有效節(jié)點(diǎn)不是頭節(jié)點(diǎn)
int ws;
//如果當(dāng)前節(jié)點(diǎn)不是老二,1-判斷當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)是否為SIGNAL,2-如果不是則將它設(shè)置成SIGNAL
//如果1和2任意一個(gè)成功,再判斷當(dāng)前節(jié)點(diǎn)是不是null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//將當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)的后節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)的后置節(jié)點(diǎn)
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),或者上述條件不滿足,那就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn).(這個(gè)方法將會(huì)在解鎖的時(shí)候細(xì)說(shuō))
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
上面便是取消節(jié)點(diǎn)在隊(duì)列中等待的源碼.首先看前節(jié)點(diǎn)狀態(tài)是不是無(wú)效節(jié)點(diǎn)(即CANCEL
狀態(tài)),如果是就一直往前遍歷直到找到是有效的節(jié)點(diǎn),然后將找到的節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)關(guān)聯(lián),接著將當(dāng)前節(jié)點(diǎn)設(shè)置成CANCEL
.接著判斷當(dāng)前節(jié)點(diǎn)的位置做不同的處理方式:
-
當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)
:直接將前節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn),并把前節(jié)的后置節(jié)點(diǎn)設(shè)置成null
.
-
當(dāng)前節(jié)點(diǎn)是老二
:將當(dāng)前節(jié)點(diǎn)的后置節(jié)點(diǎn)設(shè)置成自己.
-
不上上面兩種
:將前節(jié)點(diǎn)的尾節(jié)點(diǎn)設(shè)置成當(dāng)前節(jié)點(diǎn)的后節(jié)點(diǎn),將自己的后節(jié)點(diǎn)設(shè)置成自己.
上面操作中我們只對(duì)next
做了操作并沒(méi)有對(duì)節(jié)點(diǎn)的pred
做修改.因?yàn)槿绻薷?code>pred可能導(dǎo)致pred
指向一個(gè)已經(jīng)被移除的節(jié)點(diǎn).在shouldParkAfterFailedAcquire
方法中我們可能會(huì)做下面這個(gè)操作:
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
當(dāng)進(jìn)入這個(gè)方法說(shuō)明共享資源已經(jīng)被其他線程獲取了,當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)都不會(huì)變化了所以在這個(gè)時(shí)候修改pred
是安全的.
獨(dú)占模式釋放鎖
上面我們講了獨(dú)占模式下如何獲取鎖,下面介紹如何釋放鎖.它的入口方法為release(int arg)
.
public final boolean release(int arg) {
//tryRelease根據(jù)自己的需求實(shí)現(xiàn)
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
解鎖過(guò)程相對(duì)比較簡(jiǎn)單,核心的方法就是unparkSuccessor
.
//喚醒節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
//獲取當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
Node s = node.next;
//如果下一個(gè)節(jié)點(diǎn)為空或者取消了,就找到隊(duì)列最開(kāi)始的有效節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) {
s = null;
//從隊(duì)列的尾部向頭部開(kāi)始找,找到隊(duì)列第一個(gè)有效狀態(tài)(waitStatus < 0)的節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果找到了,則喚醒封裝在s中的線程.
if (s != null)
LockSupport.unpark(s.thread);
}
可以發(fā)現(xiàn)解鎖喚醒線程的核心就是調(diào)用LockSupport.unpark(s.thread)
.而獲取鎖時(shí)線程的等待是通過(guò)LockSupport.park(this)
實(shí)現(xiàn)的.
上面代碼中還有個(gè)一個(gè)關(guān)鍵點(diǎn)就是如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)為空或者后繼節(jié)點(diǎn)無(wú)效則需要找到一個(gè)節(jié)點(diǎn)喚醒,這里面為什么是從尾節(jié)點(diǎn)往前找而不是往后找?
在入隊(duì)的操作中,我們的代碼如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//可能這個(gè)還未執(zhí)行
pred.next = node;
return node;
}
}
enq(node);
return node;
}
節(jié)點(diǎn)加入到尾節(jié)點(diǎn)會(huì)先將隊(duì)列之前的tail
節(jié)點(diǎn)設(shè)置為當(dāng)前當(dāng)前節(jié)點(diǎn)的pred
,然后通過(guò)一個(gè)CAS
操作將當(dāng)前節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn).如果設(shè)置當(dāng)前尾節(jié)點(diǎn)成功那么node.pred = pred
和compareAndSetTail(pred, node)
就可以看做是一個(gè)原子操作了.但是pred.next = node
這個(gè)操作并不能保證,很可能這個(gè)還未執(zhí)行.同時(shí)我們?cè)谌∠?jié)點(diǎn)時(shí),也是修改next
并未修改pred
.所以在這里才會(huì)從tail
往head
方向查找.
小結(jié)
上面是我對(duì)AQS中獨(dú)占模式獲取鎖和釋放鎖的所有分析.可能分析的不是那么精準(zhǔn)到位.后面還有兩部分內(nèi)容關(guān)于共享模式鎖的獲取和釋放
還有Condition實(shí)現(xiàn)
.這兩部分內(nèi)容會(huì)通過(guò)兩篇文章講述,歡迎關(guān)注查看后續(xù)更新內(nèi)容.