1、前言
AQS(AbstractQueuedSynchronizer)是java.util.concurrent的基礎(chǔ)昭躺。也是Doug Lea大神為廣大java開發(fā)作出的卓越貢獻(xiàn)。J.U.C中的工具類如Semaphore乖寒、CountDownLatch代芜、ReentrantLock退子、ReentrantReadWriteLock等都極大程度依賴了AQS.
我們就簡(jiǎn)單的看一下ReentrantLock的具體實(shí)現(xiàn)钉蒲。
public void lock() { sync.lock();}
public void unlock() { sync.release(1);}
public Condition newCondition() { return sync.newCondition();}
public boolean isLocked() { return sync.isLocked();}
public int getHoldCount() { return sync.getHoldCount();}
對(duì)的切端,你沒有看錯(cuò),這些工具類都是這樣憑借一個(gè)Sync的內(nèi)部類做出的實(shí)現(xiàn)顷啼。而這樣一個(gè)內(nèi)部類繼承了AQS,由此可見AQS對(duì)于J.U.S來說是基石般的存在踏枣,本文也將通過ReentrantLock帶領(lǐng)大家深入的了解AQS。
2线梗、AQS的簡(jiǎn)介
AQS的設(shè)計(jì)思路和原理等我高大上知識(shí)這里就不涉及了。想要了解的可以閱讀Doug Lea大師的對(duì)這一部分的解讀怠益。大師原著
當(dāng)然仪搔,筆者也大力推薦讀者能把每個(gè)類前面的注釋都能讀一讀,在研究類的源碼之前能夠?qū)︻愑幸粋€(gè)系統(tǒng)的視圖蜻牢。
如果大體的看一下AQS的話就能發(fā)現(xiàn)這個(gè)類有三個(gè)非常重要的屬性.
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
看到這里我們大膽的做一個(gè)猜想(其實(shí)我都已經(jīng)知道了)
- AQS維護(hù)了一個(gè)隊(duì)列,并記錄隊(duì)列的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)
- 隊(duì)列中的節(jié)點(diǎn)應(yīng)該是因請(qǐng)求資源而阻塞的線程
- AQS同樣維護(hù)了一個(gè)狀態(tài),這個(gè)狀態(tài)應(yīng)該是判斷線程能否獲取到鎖的依據(jù),如果不能,就加入到隊(duì)列
接下來我們來看看這個(gè)Node具體是如何實(shí)現(xiàn)的.
volatile Node prev;//此節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)烤咧。
volatile Node next;//此節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)
volatile Thread thread;節(jié)點(diǎn)綁定的線程。
volatile int waitStatus;//節(jié)點(diǎn)的等待狀態(tài)抢呆,一個(gè)節(jié)點(diǎn)可能位于以下幾種狀態(tài)
//該狀態(tài)表示節(jié)點(diǎn)超時(shí)或被中斷就會(huì)被踢出隊(duì)列
static final int CANCELLED = 1;
//表示節(jié)點(diǎn)的繼任節(jié)點(diǎn)需要成為BLOCKED狀態(tài)(例如通過LockSupport.park()操作)煮嫌,因此一個(gè)節(jié)點(diǎn)一旦被釋放(解鎖)或者取消就需要喚醒他的繼任節(jié)點(diǎn).
static final int SIGNAL = -1;
//表明節(jié)點(diǎn)對(duì)應(yīng)的線程因?yàn)椴粷M足一個(gè)條件(Condition)而被阻塞。
static final int CONDITION = -2;
可以看出,隊(duì)列是一個(gè)雙向隊(duì)列,并且隊(duì)列中的節(jié)點(diǎn)有幾種可以選擇的狀態(tài)值
再看的話就能看到AQS中定義的幾個(gè)重要的放方法
public final void acquire(int arg);//請(qǐng)求獲取獨(dú)占式資源(鎖)
public final boolean release(int arg);//請(qǐng)求釋放獨(dú)占式資源(鎖)
public final void acquireShared(int arg);//請(qǐng)求獲取共享式資源
public final boolean releaseShared(int arg);//請(qǐng)求釋放共享式資源
//獨(dú)占方式抱虐。嘗試獲取資源昌阿,成功則返回true,失敗則返回false
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//獨(dú)占方式恳邀。嘗試釋放資源懦冰,成功則返回true,失敗則返回false谣沸。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享方式刷钢。嘗試獲取資源。負(fù)數(shù)表示失斎楦健内地;0表示成功,但沒有剩余可用資源赋除;正數(shù)表示成功阱缓,且有剩余資源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected int tryReleaseShared(int arg) {
throw new UnsupportOperationException();
}
可以看到AQS用acquire()和release()方法提供對(duì)資源的獲取和釋放
但是try**()結(jié)構(gòu)的方法都是只拋出了異常,很顯然這類方法是需要子類去實(shí)現(xiàn)的.
這也因?yàn)锳QS定義了兩種資源共享方式:Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock)和Share(共享,多個(gè)線程可以同時(shí)執(zhí)行,如Semaphone/CountDownLatch), AQS負(fù)責(zé)獲取資源(修改state的狀態(tài)),而自定義同步器負(fù)責(zé)就要實(shí)現(xiàn)上述方法告訴AQS獲取資源的規(guī)則.
兩個(gè)重要的狀態(tài)
1、AQS的state
- state可以理解有多少線程獲取了資源,即有多少線程獲取了鎖,初始時(shí)state=0表示沒有線程獲取鎖.
- 獨(dú)占鎖時(shí),這個(gè)值通常為1或者0
- 如果獨(dú)占鎖可重入時(shí),即一個(gè)線程可以多次獲取這個(gè)鎖時(shí),每獲取一次,state就加1
- 一旦有線程想要獲得鎖,就可以通過對(duì)state進(jìn)行CAS增量操作,即原子性的增加state的值
- 其他線程發(fā)現(xiàn)state不為0,這時(shí)線程已經(jīng)不能獲得鎖(獨(dú)占鎖),就會(huì)進(jìn)入AQS的隊(duì)列中等待.
- 釋放鎖是仍然是通過CAS來減小state的值,如果減小到0就表示鎖完全釋放(獨(dú)占鎖)
2举农、Node 中的waitStatus
- Node的正常狀態(tài)是0
- 對(duì)于處在隊(duì)列中的節(jié)點(diǎn)來說,前一個(gè)節(jié)點(diǎn)有喚醒后一個(gè)節(jié)點(diǎn)的任務(wù)
- 所以對(duì)與當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)來說
- 如果waitStatus > 0, 則節(jié)點(diǎn)處于cancel狀態(tài),應(yīng)踢出隊(duì)列
- 如果waitStatus = 0, 則將waitStatus改為-1(signal)
- 因此隊(duì)列中節(jié)點(diǎn)的狀態(tài)應(yīng)該為-1,-1,-1,0
源碼解讀
這一塊開始解讀源碼的實(shí)現(xiàn)部分, 仍然只關(guān)心上面提到的幾種方法.
1茬祷、acquire(int)
此方法是AQS實(shí)現(xiàn)獨(dú)占式資源獲取的頂層方法,這個(gè)方法和reentrantLock.lock()等有著相同的語義.下面我們開始看源碼
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個(gè)函數(shù)共調(diào)用了4個(gè)方法, 其中tryAcquire(arg)是在子類Sync中實(shí)現(xiàn), 其余在均是AQS中提供.
而這個(gè)方法的流程比較簡(jiǎn)單
- tryAcquire()嘗試獲取資源,如果成功, 則方法結(jié)束, 否則
- addWaiter()方法以獨(dú)占方式將線程加入隊(duì)列的尾部
- acquireQueued()方法是線程在等待隊(duì)列中等待獲取資源
- selfInterrupt(), 如果線程在等待過程中被中斷過,在這里相應(yīng)中斷. 應(yīng)該知道的是,線程在等待過程中是不響應(yīng)中斷的, 只有獲取資源后才能自我中斷.
看不懂不要緊,你已經(jīng)知道了acquire()的大致過程,下面我們一一解讀這其中的4個(gè)方法
流程1、tryAcquire()
此方法嘗試去獲取獨(dú)占資源。如果獲取成功祭犯,則直接返回true秸妥,否則直接返回false。tryAcquire()方法前面已經(jīng)說過,這個(gè)方法是在子類中是實(shí)現(xiàn)的. 而在ReentrantLock中,這個(gè)方法也正是tryLock()的語義.如下是ReentrantLock對(duì)tryAcquire()實(shí)現(xiàn)的源碼:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//等于0表示當(dāng)前鎖未被其他線程獲取到
if (!hasQueuedPredecessors() //檢查隊(duì)列中是否有線程在當(dāng)前線程的前面
&& compareAndSetState(0, acquires)) {//CAS操作state,鎖獲取成功
setExclusiveOwnerThread(current); //設(shè)置當(dāng)前線程為占有鎖的線程
return true;
}
} else if (current == getExclusiveOwnerThread()) {//非0,鎖已經(jīng)被獲取,并且是當(dāng)前線程獲取.支持可重入鎖
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //更改狀態(tài)位,
return true;
}
return false;//未能獲取鎖
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
- 如果head=tail, 則隊(duì)列未被初始化, 返回false, 否則
- 如果隊(duì)列中沒有線程正在等待, 返回true, 否則
- 如果當(dāng)前線程是隊(duì)列中的第一個(gè)元素, 返回true,否則返回false
流程2沃粗、addWaiter(int)
再看acquire()的第二個(gè)流程,獲取鎖失敗, 則將線程加入隊(duì)列尾部, 返回新加入的節(jié)點(diǎn)
private Node addWaiter(Node mode) {
//以給定的模式構(gòu)建節(jié)點(diǎn),節(jié)點(diǎn)有共享和獨(dú)占兩種模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS更新尾部節(jié)點(diǎn)
pred.next = node; //雙向隊(duì)列
return node;
}
}
enq(node); //如果隊(duì)列沒有初始化,程序就會(huì)到這一步.
return node;
}
private Node enq(final Node node) {
for (;;) {//經(jīng)典的CAS配合使用方式, 一直循環(huán)知道CAS更新成功.
Node t = tail;
if (t == null) {//隊(duì)列為空, 沒有初始化,必須初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //設(shè)置尾節(jié)點(diǎn),此時(shí)的head是頭節(jié)點(diǎn),不存放數(shù)據(jù)
t.next = node;
return t;
}
}
}
}
流程3粥惧、acquireQueued()
addWaiter()完成后返回新加入隊(duì)列的節(jié)點(diǎn), 緊接著進(jìn)入下一個(gè)流程acquireQueued(), 在這個(gè)方法中, 會(huì)實(shí)現(xiàn)線程節(jié)點(diǎn)的阻塞和喚醒. 所有節(jié)點(diǎn)在這個(gè)方法的處理下,等待資源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否拿到資源
try {
boolean interrupted = false; //等待過程中是否被中斷過
for (;;) { //又是一個(gè)自旋配合CAS設(shè)置變量
final Node p = node.predecessor(); //當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {
//如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn), 則當(dāng)前節(jié)點(diǎn)已經(jīng)具有資格嘗試獲取資源
//(可能是頭節(jié)點(diǎn)任務(wù)完成之后喚醒的后繼節(jié)點(diǎn), 當(dāng)然也可能是被interrup了)
setHead(node); //獲取資源后,設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果不能獲取資源, 就進(jìn)入waiting狀態(tài)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //獲取前一個(gè)節(jié)點(diǎn)的狀態(tài),還記得waitStatus是上面意思嘸?
if (ws == Node.SIGNAL)
/*
*如果前驅(qū)節(jié)點(diǎn)完成任務(wù)后能夠喚醒自己,那么當(dāng)前節(jié)點(diǎn)就可以放心的睡覺了.
*記住,喚醒當(dāng)前節(jié)點(diǎn)的任務(wù)是前驅(qū)節(jié)點(diǎn)完成
*/
return true;
if (ws > 0) { //ws大于0表示節(jié)點(diǎn)已經(jīng)被取消,應(yīng)該踢出隊(duì)列.
do {
//節(jié)點(diǎn)的前驅(qū)引用指向更前面的沒有被取消的節(jié)點(diǎn). 所以被取消的節(jié)點(diǎn)沒有引用之后會(huì)被GC
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//找到了合適的前驅(qū)節(jié)點(diǎn)后,將其狀態(tài)設(shè)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
接下來是 parkAndCheckInterrupt() 方法, 真正讓節(jié)點(diǎn)進(jìn)入waiting狀態(tài)的方法,實(shí)在這個(gè)方法中調(diào)用的..
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //JNI調(diào)用, 使線程進(jìn)入waiting狀態(tài)
return Thread.interrupted(); //檢查是否被中斷
}
流程4、selfInterrupt()
上面也說了, acquire()方法不是立即響應(yīng)中斷的. 而是在獲取資源后進(jìn)行自我中斷處理
小結(jié)
到此,acquire()的過程已經(jīng)分析完畢, 我們就知道reentrantLock.lock()的全部過程.總的來說, 就是嘗試獲取資源, 如果獲取不到就進(jìn)入等待隊(duì)列變成等待狀態(tài).具體細(xì)節(jié)前面已經(jīng)詳細(xì)敘述過.
release(int)
講了如何獲取到資源,接下來就應(yīng)該如何釋放資源.這個(gè)方法會(huì)在獨(dú)占的模式下釋放指定的資源(減小state).這個(gè)語義也是reentrantLock.unlock()最盅;
public final boolean release(int arg) {
if (tryRelease(arg)) { //嘗試釋放資源
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //喚醒隊(duì)列的下一個(gè)節(jié)點(diǎn)
return true;
}
return false;
}
1 tryRelease()
釋放指定量的資源,這個(gè)方法是在子類中實(shí)現(xiàn)的.我們以reentrantLock.unlock()為例解讀資源釋放的過程
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //state減去指定的量,
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //獨(dú)占鎖模式下,state為0時(shí)表示沒有線程獲取鎖,這時(shí)才算是當(dāng)前線程完全釋放鎖
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2 unparkSuccessor(Node node)
用這個(gè)方法喚醒后繼節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) { //waitStatus表示節(jié)點(diǎn)已經(jīng)被取消,應(yīng)該踢出隊(duì)列
s = null;
//從后想前找到最靠前的合法節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這個(gè)方法也比較簡(jiǎn)單,就是用一個(gè)JNI方法unpark()喚醒隊(duì)列中下一個(gè)需要處理的節(jié)點(diǎn),.
非公平鎖
上述介紹完ReentrantLock中的公平鎖,首先回顧一下公平鎖的整個(gè)流程
1突雪、ReentrantLock.lock(),這個(gè)請(qǐng)求回交由內(nèi)部類Sync處理。因?yàn)橛泄胶头枪降膮^(qū)分涡贱,所以Sync轉(zhuǎn)而把任務(wù)交給子類NonfairSync或者FairSync處理咏删。
2、在具體來看看FairSycn中的lock具體怎么實(shí)現(xiàn)
final void lock() { acquire(1);}
acquire()的具體實(shí)現(xiàn)上面已經(jīng)介紹问词,接下來登場(chǎng)的就是非公平鎖的實(shí)現(xiàn)督函。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在非公平鎖中,線程請(qǐng)求資源是會(huì)先查看鎖是否被占用激挪,如果鎖空閑就直接占用鎖辰狡,否則究進(jìn)行acquire()。而這個(gè)函數(shù)是有AQS具體實(shí)現(xiàn)垄分,所以宛篇,如果當(dāng)前鎖被占用,非公平鎖就享受和公平鎖一樣的待遇薄湿,就是老老實(shí)實(shí)的進(jìn)入等待隊(duì)列叫倍,等前任節(jié)點(diǎn)喚醒自己。
總結(jié)
到這里我們已經(jīng)講述了獨(dú)占鎖的獲取和釋放. 當(dāng)然沒有涉及的還很多.如共享模式我們還沒有涉及.以及響應(yīng)中斷的acquireInterruptibly()也沒有涉及.
至于響應(yīng)中斷,實(shí)現(xiàn)起來與上面介紹的并無太大區(qū)別.共享式鎖獲取與釋放待后續(xù)篇章再來研究.
水平有限,水平有限,有任何不當(dāng)之處,還請(qǐng)指正.本文也還在打磨之中