在上一篇文章中我們對Lock
和AbstractQueuedSynchronizer(AQS)
有了初步的認識瓶堕。在同步組件的實現(xiàn)中,AQS
是核心部分症歇,同步組件的實現(xiàn)者通過使用AQS
提供的模板方法實現(xiàn)同步組件語義郎笆,AQS
則實現(xiàn)了對同步狀態(tài)的管理谭梗,以及對阻塞線程進行排隊,等待通知等等一些底層的實現(xiàn)處理宛蚓。AQS
的核心也包括了這些方面:同步隊列激捏,獨占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖凄吏,超時等待鎖獲取這些特性的實現(xiàn)缩幸,而這些實際上則是AQS
提供出來的模板方法,歸納整理如下:
獨占式鎖:
void acquire(int arg):
獨占式獲取同步狀態(tài)竞思,如果獲取失敗則插入同步隊列進行等待表谊;
void acquireInterruptibly(int arg):
與acquire
方法相同,但在同步隊列中進行等待的時候可以檢測中斷盖喷;
boolean tryAcquireNanos(int arg, long nanosTimeout):
在acquireInterruptibly
基礎(chǔ)上增加了超時等待功能爆办,在超時時間內(nèi)沒有獲得同步狀態(tài)返回false;
boolean release(int arg):
釋放同步狀態(tài),該方法會喚醒在同步隊列中的下一個節(jié)點
共享式鎖:
void acquireShared(int arg):
共享式獲取同步狀態(tài)课梳,與獨占式的區(qū)別在于同一時刻有多個線程獲取同步狀態(tài)距辆;
void acquireSharedInterruptibly(int arg):
在acquireShared方法基礎(chǔ)上增加了能響應中斷的功能;
boolean tryAcquireSharedNanos(int arg, long nanosTimeout):
在acquireSharedInterruptibly基礎(chǔ)上增加了超時等待的功能暮刃;
boolean releaseShared(int arg):
共享式釋放同步狀態(tài)
要想掌握AQS
的底層實現(xiàn)跨算,其實也就是對這些模板方法的邏輯進行學習。在學習這些模板方法之前椭懊,我們得首先了解下AQS
中的同步隊列是一種什么樣的數(shù)據(jù)結(jié)構(gòu)诸蚕,因為同步隊列是AQS
對同步狀態(tài)的管理的基石。
2. 同步隊列
AQS提供了一個基于FIFO隊列氧猬,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架背犯。該同步器(以下簡稱同步器)利用了一個int來表示狀態(tài),期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)盅抚。使用的方法是繼承漠魏,子類通過繼承同步器并需要實現(xiàn)它的方法來管理其狀態(tài),管理的方式就是通過類似acquire和release的方式來操縱狀態(tài)妄均。然而多線程環(huán)境中對狀態(tài)的操縱必須確保原子性柱锹,因此子類對于狀態(tài)的把握,需要使用這個同步器提供的以下三個方法對狀態(tài)進行操作:
- java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
- java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
- java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)
子類推薦被定義為自定義同步裝置的內(nèi)部類丰包,同步器自身沒有實現(xiàn)任何同步接口禁熏,它僅僅是定義了若干acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式烫沙,當它被定義為一個排他模式時匹层,其他線程對其的獲取就被阻止隙笆,而共享模式對于多個線程獲取都可以成功锌蓄。
同步器是實現(xiàn)鎖的關(guān)鍵升筏,利用同步器將鎖的語義實現(xiàn),然后在鎖的實現(xiàn)中聚合同步器瘸爽∧茫可以這樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為剪决,而每個鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個線程進行加鎖灵汪,排除兩個以上的線程),但是實現(xiàn)是依托給同步器來完成柑潦;同步器面向的是線程訪問和資源控制享言,它定義了線程對資源是否能夠獲取以及線程的排隊等操作。鎖和同步器很好的隔離了二者所需要關(guān)注的領(lǐng)域渗鬼,嚴格意義上講览露,同步器可以適用于除了鎖以外的其他同步設(shè)施上(包括鎖)。
同步器的開始提到了其實現(xiàn)依賴于一個FIFO隊列譬胎,那么隊列中的元素Node就是保存著線程引用和線程狀態(tài)的容器差牛,每個線程對同步器的訪問,都可以看做是隊列中的一個節(jié)點堰乔。Node的主要包含以下成員變量:
Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
以上五個成員變量主要負責保存該節(jié)點的線程引用偏化,同步等待隊列(以下簡稱sync隊列)的前驅(qū)和后繼節(jié)點,同時也包括了同步狀態(tài)镐侯。
屬性名稱 | 描述 |
---|---|
int waitStatus | 表示節(jié)點的狀態(tài)侦讨。其中包含的狀態(tài)有:1. CANCELLED,值為1苟翻,表示當前的線程被取消搭伤; 2. SIGNAL,值為-1袜瞬,表示當前節(jié)點的后繼節(jié)點包含的線程需要運行怜俐,也就是unpark; 3. CONDITION邓尤,值為-2拍鲤,表示當前節(jié)點在等待condition,也就是在condition隊列中汞扎; 4. PROPAGATE季稳,值為-3,表示當前場景下后續(xù)的acquireShared能夠得以執(zhí)行澈魄; 5. 值為0景鼠,表示當前節(jié)點在sync隊列中,等待著獲取鎖。
|
Node prev | 前驅(qū)節(jié)點铛漓,比如當前節(jié)點被取消溯香,那就需要前驅(qū)節(jié)點和后繼節(jié)點來完成連接。 |
Node next | 后繼節(jié)點浓恶。 |
Node nextWaiter | 存儲condition隊列中的后繼節(jié)點玫坛。 |
Thread thread | 入隊列時的當前線程。 |
節(jié)點成為sync隊列和condition隊列構(gòu)建的基礎(chǔ)包晰,在同步器中就包含了sync隊列湿镀。同步器擁有三個成員變量:sync隊列的頭結(jié)點head、sync隊列的尾節(jié)點tail和狀態(tài)state伐憾。對于鎖的獲取勉痴,請求形成節(jié)點,將其掛載在尾部树肃,而鎖資源的轉(zhuǎn)移(釋放再獲仁赐取)是從頭部開始向后進行。對于同步器維護的狀態(tài)state扫外,多個線程對其的獲取將會產(chǎn)生一個鏈式的結(jié)構(gòu)莉钙。
現(xiàn)在我們知道了節(jié)點的數(shù)據(jù)結(jié)構(gòu)類型,并且每個節(jié)點擁有其前驅(qū)和后繼節(jié)點筛谚,很顯然這是一個雙向隊列磁玉。同樣的我們可以用一段demo看一下。
public class LockDemo {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
thread.start();
}
}
}
實例代碼中開啟了5個線程驾讲,先獲取鎖之后再睡眠10S中蚊伞,實際上這里讓線程睡眠是想模擬出當線程無法獲取鎖時進入同步隊列的情況。通過debug吮铭,當Thread-4(在本例中最后一個線程)獲取鎖失敗后進入同步時时迫,AQS時現(xiàn)在的同步隊列如圖所示:
Thread-0先獲得鎖后進行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進入同步隊列谓晌,同時也可以很清楚的看出來每個節(jié)點有兩個域:prev
(前驅(qū))和next
(后繼)掠拳,并且每個節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS
中有兩個重要的成員變量:
private transient volatile Node head;
private transient volatile Node tail;
也就是說AQS
實際上通過頭尾指針來管理同步隊列纸肉,同時實現(xiàn)包括獲取鎖失敗的線程進行入隊溺欧,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖如下:
通過對源碼的理解以及做實驗的方式柏肪,現(xiàn)在我們可以清楚的知道這樣幾點:
-
節(jié)點的數(shù)據(jù)結(jié)構(gòu)姐刁,即
AQS
的靜態(tài)內(nèi)部類Node
,節(jié)點的等待狀態(tài)等信息; -
同步隊列是一個雙向隊列烦味,
AQS
通過持有頭尾指針管理同步隊列聂使;
那么,節(jié)點如何進行入隊和出隊是怎樣做的了?實際上這對應著鎖的獲取和釋放兩個操作:獲取鎖失敗進行入隊操作柏靶,獲取鎖成功進行出隊操作弃理。
3. 獨占鎖
3.1 獨占鎖的獲取(acquire方法)
我們繼續(xù)通過看源碼和debug的方式來看宿礁,還是以上面的demo為例案铺,調(diào)用lock()
方法是獲取獨占式鎖蔬芥,獲取失敗就將當前線程加入同步隊列梆靖,成功則線程執(zhí)行。而lock()
方法實際上會調(diào)用AQS
的acquire()方法笔诵,源碼如下
public final void acquire(int arg) {
//先看同步狀態(tài)是否獲取成功返吻,如果成功則方法結(jié)束返回
//若失敗則先調(diào)用addWaiter()方法再調(diào)用acquireQueued()方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述邏輯主要包括:
- 嘗試獲取(調(diào)用tryAcquire更改狀態(tài)乎婿,需要保證原子性)测僵;
在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個線程能夠?qū)顟B(tài)進行成功修改谢翎,而沒有成功修改的線程將進入sync隊列排隊捍靠。 - 如果獲取不到,將當前線程構(gòu)造成節(jié)點Node并加入sync隊列森逮;
進入隊列的每個線程都是一個節(jié)點Node榨婆,從而形成了一個雙向隊列,類似CLH隊列褒侧,這樣做的目的是線程間的通信會被限制在較小規(guī)模(也就是兩個節(jié)點左右)良风。 - 再次嘗試獲取,如果沒有獲取到那么將當前線程從線程調(diào)度器上摘下闷供,進入等待狀態(tài)烟央。
總結(jié): 使用LockSupport將當前線程unpark,關(guān)于LockSupport后續(xù)會詳細介紹歪脏。關(guān)鍵信息請看注釋疑俭,acquire
根據(jù)當前獲得同步狀態(tài)成功與否做了兩件事情:1. 成功,則方法結(jié)束返回婿失,2. 失敗怠硼,則先調(diào)用addWaiter()
然后在調(diào)用acquireQueued()
方法。
獲取同步狀態(tài)失敗移怯,入隊操作
當線程獲取獨占式鎖失敗后就會將當前線程加入同步隊列香璃,那么加入隊列的方式是怎樣的了?我們接下來就應該去研究一下addWaiter()
和acquireQueued()
舟误。addWaiter()
源碼如下:
private Node addWaiter(Node mode) {
// 1\. 將當前線程構(gòu)建成Node類型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 2\. 當前尾節(jié)點是否為null葡秒?
Node pred = tail;
if (pred != null) {
// 2.2 將當前節(jié)點尾插入的方式插入同步隊列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 2.1\. 當前同步隊列尾節(jié)點為null,說明當前線程是第一個加入同步隊列進行等待的線程
enq(node);
return node;
}
上述邏輯主要包括:
- 使用當前線程構(gòu)造Node;
對于一個節(jié)點需要做的是將當節(jié)點前驅(qū)節(jié)點指向尾節(jié)點(current.prev = tail)眯牧,尾節(jié)點指向它(tail = current)蹋岩,原有的尾節(jié)點的后繼節(jié)點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點的設(shè)置來保證的学少,也就是compareAndSetTail來完成的剪个。 - 先行嘗試在隊尾添加;
如果尾節(jié)點已經(jīng)有了版确,然后做如下操作:
(1)分配引用T指向尾節(jié)點扣囊;
(2)將節(jié)點的前驅(qū)節(jié)點更新為尾節(jié)點(current.prev = tail);
(3)如果尾節(jié)點是T绒疗,那么將當尾節(jié)點設(shè)置為該節(jié)點(tail = current侵歇,原子更新);
(4)T的后繼節(jié)點指向當前節(jié)點(T.next = current)吓蘑。
注意第3點是要求原子的惕虑。
這樣可以以最短路徑O(1)的效果來完成線程入隊,是最大化減少開銷的一種方式磨镶。 - 如果隊尾添加失敗或者是第一個入隊的節(jié)點溃蔫。
如果是第1個節(jié)點,也就是sync隊列沒有初始化琳猫,那么會進入到enq這個方法伟叛,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這個方法沸移。
可以看到enq的邏輯是確保進入的Node都會有機會順序的添加到sync隊列中痪伦,而加入的步驟如下:
(1)如果尾節(jié)點為空,那么原子化的分配一個頭節(jié)點雹锣,并將尾節(jié)點指向頭節(jié)點网沾,這一步是初始化;
(2)然后是重復在addWaiter中做的工作蕊爵,但是在一個while(true)的循環(huán)中辉哥,直到當前節(jié)點入隊為止。
進入sync隊列之后攒射,接下來就是要進行鎖的獲取醋旦,或者說是訪問控制了,只有一個線程能夠在同一時刻繼續(xù)的運行会放,而其他的進入等待狀態(tài)饲齐。而每個線程都是一個獨立的個體,它們自省的觀察咧最,當條件滿足的時候(自己的前驅(qū)是頭結(jié)點并且原子性的獲取了狀態(tài))捂人,那么這個線程能夠繼續(xù)運行御雕。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//1\. 構(gòu)造頭結(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2\. 尾插入,CAS操作失敗自旋嘗試
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在上面的分析中我們可以看出在第1步中會先創(chuàng)建頭結(jié)點滥搭,說明同步隊列是帶頭結(jié)點的鏈式存儲結(jié)構(gòu)酸纲。帶頭結(jié)點與不帶頭結(jié)點相比,會在入隊和出隊的操作中獲得更大的便捷性瑟匆,因此同步隊列選擇了帶頭結(jié)點的鏈式存儲結(jié)構(gòu)闽坡。那么帶頭節(jié)點的隊列初始化時機是什么?自然而然是在tail為null時愁溜,即當前線程是第一次插入同步隊列疾嗅。compareAndSetTail(t, node)
方法會利用CAS
操作設(shè)置尾節(jié)點,如果CAS
操作失敗會在for (;;)
for死循環(huán)中不斷嘗試祝谚,直至成功return返回為止宪迟。因此酣衷,對enq()
方法可以做這樣的總結(jié):
- 在當前線程是第一個加入同步隊列時交惯,調(diào)用compareAndSetHead(new Node())方法,完成鏈式隊列的頭結(jié)點的初始化穿仪;
- 自旋不斷嘗試CAS尾插入節(jié)點直至成功為止席爽。
現(xiàn)在我們已經(jīng)很清楚獲取獨占式鎖失敗的線程包裝成Node
然后插入同步隊列的過程了?那么緊接著會有下一個問題啊片?在同步隊列中的節(jié)點(線程)會做什么事情了來保證自己能夠有機會獲得獨占式鎖了只锻?帶著這樣的問題我們就來看看acquireQueued()
方法,從方法名就可以很清楚紫谷,這個方法的作用就是排隊獲取鎖的過程齐饮,源碼如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1\. 獲得當前節(jié)點的先驅(qū)節(jié)點
final Node p = node.predecessor();
// 2\. 當前節(jié)點能否獲取獨占式鎖
// 2.1 如果當前節(jié)點的先驅(qū)節(jié)點是頭結(jié)點并且成功獲取同步狀態(tài),即可以獲得獨占式鎖
if (p == head && tryAcquire(arg)) {
//隊列頭指針用指向當前節(jié)點
setHead(node);
//釋放前驅(qū)節(jié)點
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.2 獲取鎖失敗笤昨,線程進入等待狀態(tài)等待獲取獨占式鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上述邏輯主要包括:
- 獲取當前節(jié)點的前驅(qū)節(jié)點祖驱;
需要獲取當前節(jié)點的前驅(qū)節(jié)點,而頭結(jié)點所對應的含義是當前站有鎖且正在運行瞒窒。 - 當前驅(qū)節(jié)點是頭結(jié)點并且能夠獲取狀態(tài)捺僻,代表該當前節(jié)點占有鎖;
如果滿足上述條件崇裁,那么代表能夠占有鎖匕坯,根據(jù)節(jié)點對鎖占有的含義茶没,設(shè)置頭結(jié)點為當前節(jié)點革屠。 - 否則進入等待狀態(tài)猜拾。
如果沒有輪到當前節(jié)點運行脆侮,那么將當前線程從線程調(diào)度器上摘下狡孔,也就是進入等待狀態(tài)适荣。
這里針對acquire做一下總結(jié):
- 狀態(tài)的維護雨女;
需要在鎖定時宇弛,需要維護一個狀態(tài)(int類型),而對狀態(tài)的操作是原子和非阻塞的腰耙,通過同步器提供的對狀態(tài)訪問的方法對狀態(tài)進行操縱榛丢,并且利用compareAndSet來確保原子性的修改。 - 狀態(tài)的獲韧ε印晰赞;
一旦成功的修改了狀態(tài),當前線程或者說節(jié)點选侨,就被設(shè)置為頭節(jié)點掖鱼。 - sync隊列的維護。
在獲取資源未果的過程中條件不符合的情況下(不該自己援制,前驅(qū)節(jié)點不是頭節(jié)點或者沒有獲取到資源)進入睡眠狀態(tài)戏挡,停止線程調(diào)度器對當前節(jié)點線程的調(diào)度。
這時引入的一個釋放的問題晨仑,也就是說使睡眠中的Node或者說線程獲得通知的關(guān)鍵褐墅,就是前驅(qū)節(jié)點的通知,而這一個過程就是釋放洪己,釋放會通知它的后繼節(jié)點從睡眠中返回準備運行妥凳。整體示意圖為下圖:
獲取鎖成功,出隊操作
獲取鎖的節(jié)點出隊的邏輯是:
//隊列頭結(jié)點引用指向當前節(jié)點
setHead(node);
//釋放前驅(qū)節(jié)點
p.next = null; // help GC
failed = false;
return interrupted;
setHead()
方法為:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
將當前節(jié)點通過setHead()
方法設(shè)置為隊列的頭結(jié)點答捕,然后將之前的頭結(jié)點的next
域設(shè)置為null
并且pre
域也為null
逝钥,即與隊列斷開,無任何引用方便GC
時能夠?qū)?nèi)存進行回收拱镐。示意圖如下:
那么當獲取鎖失敗的時候會調(diào)用shouldParkAfterFailedAcquire()
方法和parkAndCheckInterrupt()
方法艘款,看看他們做了什么事情。shouldParkAfterFailedAcquire()
方法源碼為:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire()
方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
使用CAS
將節(jié)點狀態(tài)由INITIAL
設(shè)置成SIGNAL
沃琅,表示當前線程阻塞哗咆。當compareAndSetWaitStatus
設(shè)置失敗則說明shouldParkAfterFailedAcquire
方法返回false
,然后會在acquireQueued()
方法中for (;;)
死循環(huán)中會繼續(xù)重試阵难,直至compareAndSetWaitStatus
設(shè)置節(jié)點狀態(tài)位為SIGNAL
時shouldParkAfterFailedAcquire
返回true
時才會執(zhí)行方法parkAndCheckInterrupt()
方法岳枷,該方法的源碼為:
private final boolean parkAndCheckInterrupt() {
//使得該線程阻塞
LockSupport.park(this);
return Thread.interrupted();
}
該方法的關(guān)鍵是會調(diào)用LookSupport.park()
方法(關(guān)于LookSupport
會在以后的文章進行討論),該方法是用來阻塞當前線程的呜叫。因此到這里就應該清楚了空繁,acquireQueued()
在自旋過程中主要完成了兩件事情:
- 如果當前節(jié)點的前驅(qū)節(jié)點是頭節(jié)點,并且能夠獲得同步狀態(tài)的話朱庆,當前線程能夠獲得鎖該方法執(zhí)行結(jié)束退出盛泡;
- 獲取鎖失敗的話,先將節(jié)點狀態(tài)設(shè)置成SIGNAL娱颊,然后調(diào)用LookSupport.park方法使得當前線程阻塞傲诵。
經(jīng)過上面的分析凯砍,獨占式鎖的獲取過程也就是acquire()
方法的執(zhí)行流程如下圖所示:
3.2 獨占鎖的釋放(release()方法)
獨占鎖的釋放就相對來說比較容易理解了,廢話不多說先來看下源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
這段代碼邏輯就比較容易理解了拴竹,如果同步狀態(tài)釋放成功(tryRelease
返回true
)則會執(zhí)行if塊中的代碼悟衩,當head
指向的頭結(jié)點不為null
,并且該節(jié)點的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()
方法栓拜。unparkSuccessor
方法源碼:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//頭節(jié)點的后繼節(jié)點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//后繼節(jié)點不為null時喚醒該線程
LockSupport.unpark(s.thread);
}
源碼的關(guān)鍵信息請看注釋座泳,首先獲取頭節(jié)點的后繼節(jié)點,當后繼節(jié)點的時候會調(diào)用LookSupport.unpark()
方法幕与,該方法會喚醒該節(jié)點的后繼節(jié)點所包裝的線程挑势。因此,每一次鎖釋放后就會喚醒隊列中該節(jié)點的后繼節(jié)點所引用的線程啦鸣,從而進一步可以佐證獲得鎖的過程是一個FIFO
(先進先出)的過程潮饱。
到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學習源碼的方式非常深刻的學習到了獨占式鎖的獲取和釋放的過程以及同步隊列诫给∠憷可以做一下總結(jié):
- 線程獲取鎖失敗,線程被封裝成Node進行入隊操作蝙搔,核心方法在于addWaiter()和enq()缕溉,同時enq()完成對同步隊列的頭結(jié)點初始化工作以及CAS操作失敗的重試;
- 線程獲取鎖是一個自旋的過程考传,當且僅當 當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且成功獲得同步狀態(tài)時吃型,節(jié)點出隊即該節(jié)點引用的線程獲得鎖,否則僚楞,當不滿足條件時就會調(diào)用LookSupport.park()方法使得線程阻塞勤晚;
- 釋放鎖的時候會喚醒后繼節(jié)點;
總體來說:在獲取同步狀態(tài)時泉褐,AQS
維護一個同步隊列赐写,獲取同步狀態(tài)失敗的線程會加入到隊列中進行自旋;移除隊列(或停止自旋)的條件是前驅(qū)節(jié)點是頭結(jié)點并且成功獲得了同步狀態(tài)膜赃。在釋放同步狀態(tài)時挺邀,同步器會調(diào)用unparkSuccessor()
方法喚醒后繼節(jié)點。
獨占鎖特性學習
3.3 可中斷式獲取鎖(acquireInterruptibly方法)
我們知道lock
相較于synchronized
有一些更方便的特性跳座,比如能響應中斷以及超時等待等特性端铛,現(xiàn)在我們依舊采用通過學習源碼的方式來看看能夠響應中斷是怎么實現(xiàn)的∑>欤可響應中斷式鎖可調(diào)用方法lock.lockInterruptibly()
;而該方法其底層會調(diào)用AQS
的acquireInterruptibly
方法禾蚕,源碼為:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
//線程獲取鎖失敗
doAcquireInterruptibly(arg);
}
在獲取同步狀態(tài)失敗后就會調(diào)用doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//將節(jié)點插入到同步隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//獲取鎖出隊
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//線程中斷拋異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
關(guān)鍵信息請看注釋,現(xiàn)在看這段代碼就很輕松了吧:),與acquire
方法邏輯幾乎一致狂丝,唯一的區(qū)別是當parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷换淆,代碼拋出被中斷異常哗总。
3.4 超時等待式獲取鎖(tryAcquireNanos()方法)
通過調(diào)用lock.tryLock(timeout,TimeUnit)
方式達到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:
- 在超時時間內(nèi)倍试,當前線程成功獲取了鎖讯屈;
- 當前線程在超時時間內(nèi)被中斷;
- 超時時間結(jié)束县习,仍未獲得鎖返回false耻煤。
我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學習底層具體是怎么實現(xiàn)的,該方法會調(diào)用AQS
的方法tryAcquireNanos()
,源碼為:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
//實現(xiàn)超時等待的效果
doAcquireNanos(arg, nanosTimeout);
}
很顯然這段源碼最終是靠doAcquireNanos
方法實現(xiàn)超時等待的效果准颓,該方法源碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1\. 根據(jù)超時時間和當前時間計算出截止時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//2\. 當前線程獲得鎖出隊列
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 3.1 重新計算超時時間
nanosTimeout = deadline - System.nanoTime();
// 3.2 已經(jīng)超時返回false
if (nanosTimeout <= 0L)
return false;
// 3.3 線程阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 3.4 線程被中斷拋出被中斷異常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
程序邏輯如圖所示:
程序邏輯同獨占鎖可響應中斷式獲取基本一致哈蝇,唯一的不同在于獲取鎖失敗后,對超時時間的處理上攘已,在第1步會先計算出按照現(xiàn)在時間和超時時間計算出理論上的截止時間炮赦,比如當前時間是8h10min,超時時間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout
計算出剛好達到超時時間時的系統(tǒng)時間就是8h 10min+10min = 8h 20min样勃。然后根據(jù)deadline - System.nanoTime()
就可以判斷是否已經(jīng)超時了吠勘,比如,當前系統(tǒng)時間是8h 30min很明顯已經(jīng)超過了理論上的系統(tǒng)時間8h 20min峡眶,deadline - System.nanoTime()
計算出來就是一個負數(shù)剧防,自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos
使得當前線程阻塞辫樱,同時在3.4步增加了對中斷的檢測峭拘,若檢測出被中斷直接拋出被中斷異常。
4. 共享鎖
4.1 執(zhí)行過程概述
獲取鎖的過程:
- 當線程調(diào)用
acquireShared()
申請獲取鎖資源時狮暑,如果成功鸡挠,則進入臨界區(qū)。 - 當獲取鎖失敗時搬男,則創(chuàng)建一個共享類型的節(jié)點并進入一個
FIFO
等待隊列拣展,然后被掛起等待喚醒。 - 當隊列中的等待線程被喚醒以后就重新嘗試獲取鎖資源缔逛,如果成功則喚醒后面還在等待的共享節(jié)點并把該喚醒事件傳遞下去备埃,即會依次喚醒在該節(jié)點后面的所有共享節(jié)點,然后進入臨界區(qū)褐奴,否則繼續(xù)掛起等待按脚。
釋放鎖過程:
- 當線程調(diào)用
releaseShared()
進行鎖資源釋放時,如果釋放成功歉糜,則喚醒隊列中等待的節(jié)點乘寒,如果有的話。
4.2 源碼深入分析
基于上面所說的共享鎖執(zhí)行流程匪补,我們接下來看下源碼實現(xiàn)邏輯:
首先來看下獲取鎖的方法acquireShared()伞辛,如下
public final void acquireShared(int arg) {
//嘗試獲取共享鎖烂翰,返回值小于0表示獲取失敗
if (tryAcquireShared(arg) < 0)
//執(zhí)行獲取鎖失敗以后的方法
doAcquireShared(arg);
}
這里tryAcquireShared()
方法是留給用戶去實現(xiàn)具體的獲取鎖邏輯的。關(guān)于該方法的實現(xiàn)有兩點需要特別說明:
該方法必須自己檢查當前上下文是否支持獲取共享鎖蚤氏,如果支持再進行獲取甘耿。
該方法返回值是個重點。其一竿滨、由上面的源碼片段可以看出返回值小于0表示獲取鎖失敗佳恬,需要進入等待隊列。其二于游、如果返回值等于0表示當前線程獲取共享鎖成功毁葱,但它后續(xù)的線程是無法繼續(xù)獲取的,也就是不需要把它后面等待的節(jié)點喚醒贰剥。最后倾剿、如果返回值大于0,表示當前線程獲取共享鎖成功且它后續(xù)等待的節(jié)點也有可能繼續(xù)獲取共享鎖成功蚌成,也就是說此時需要把后續(xù)節(jié)點喚醒讓它們?nèi)L試獲取共享鎖前痘。
有了上面的約定,我們再來看下doAcquireShared方法的實現(xiàn):
//參數(shù)不多說担忧,就是傳給acquireShared()的參數(shù)
private void doAcquireShared(int arg) {
//添加等待節(jié)點的方法跟獨占鎖一樣芹缔,唯一區(qū)別就是節(jié)點類型變?yōu)榱斯蚕硇停辉儋樖? final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//表示前面的節(jié)點已經(jīng)獲取到鎖瓶盛,自己會嘗試獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
//注意上面說的最欠, 等于0表示不用喚醒后繼節(jié)點,大于0需要
if (r >= 0) {
//這里是重點蓬网,獲取到鎖以后的喚醒操作窒所,后面詳細說
setHeadAndPropagate(node, r);
p.next = null;
//如果是因為中斷醒來則設(shè)置中斷標記位
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//掛起邏輯跟獨占鎖一樣,不再贅述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//獲取失敗的取消邏輯跟獨占鎖一樣帆锋,不再贅述
if (failed)
cancelAcquire(node);
}
}
獨占鎖模式獲取成功以后設(shè)置頭結(jié)點然后返回中斷狀態(tài),結(jié)束流程禽额。而共享鎖模式獲取成功以后锯厢,調(diào)用了setHeadAndPropagate
方法,從方法名就可以看出除了設(shè)置新的頭結(jié)點以外還有一個傳遞動作脯倒,一起看下代碼:
//兩個入?yún)⑹导粋€是當前成功獲取共享鎖的節(jié)點,一個就是tryAcquireShared方法的返回值藻丢,注意上面說的剪撬,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //記錄當前頭節(jié)點
//設(shè)置新的頭節(jié)點,即把當前獲取到鎖的節(jié)點設(shè)置為頭節(jié)點
//注:這里是獲取到鎖之后的操作悠反,不需要并發(fā)控制
setHead(node);
//這里意思有兩種情況是需要執(zhí)行喚醒操作
//1.propagate > 0 表示調(diào)用方指明了后繼節(jié)點需要被喚醒
//2.頭節(jié)點后面的節(jié)點需要被喚醒(waitStatus<0)残黑,不論是老的頭結(jié)點還是新的頭結(jié)點
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果當前節(jié)點的后繼節(jié)點是共享類型或者沒有后繼節(jié)點馍佑,則進行喚醒
//這里可以理解為除非明確指明不需要喚醒(后繼等待節(jié)點是獨占類型),否則都要喚醒
if (s == null || s.isShared())
//后面詳細說
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
最終的喚醒操作也很復雜梨水,專門拿出來分析一下:
注:這個喚醒操作在releaseShare()
方法里也會調(diào)用拭荤。
private void doReleaseShared() {
for (;;) {
//喚醒操作由頭結(jié)點開始,注意這里的頭節(jié)點已經(jīng)是上面新設(shè)置的頭結(jié)點了
//其實就是喚醒上面新獲取到共享鎖的節(jié)點的后繼節(jié)點
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后繼節(jié)點需要被喚醒
if (ws == Node.SIGNAL) {
//這里需要控制并發(fā)疫诽,因為入口有setHeadAndPropagate跟release兩個舅世,避免兩次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//執(zhí)行喚醒操作
unparkSuccessor(h);
}
//如果后繼節(jié)點暫時不需要喚醒,則把當前節(jié)點狀態(tài)設(shè)置為PROPAGATE確保以后可以傳遞下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果頭結(jié)點沒有發(fā)生變化奇徒,表示設(shè)置完成雏亚,退出循環(huán)
//如果頭結(jié)點發(fā)生變化,比如說其他線程獲取到了鎖摩钙,為了使自己的喚醒動作可以傳遞评凝,必須進行重試
if (h == head)
break;
}
}
接下來看下釋放共享鎖的過程:
public final boolean releaseShared(int arg) {
//嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
//喚醒過程,詳情見上面分析
doReleaseShared();
return true;
}
return false;
}
注:上面的setHeadAndPropagate()方法表示等待隊列中的線程成功獲取到共享鎖腺律,這時候它需要喚醒它后面的共享節(jié)點(如果有)奕短,但是當通過releaseShared()方法去釋放一個共享鎖的時候,接下來等待獨占鎖跟共享鎖的線程都可以被喚醒進行嘗試獲取匀钧。
4.3 總結(jié)
跟獨占鎖相比翎碑,共享鎖的主要特征在于當一個在等待隊列中的共享節(jié)點成功獲取到鎖以后(它獲取到的是共享鎖),既然是共享之斯,那它必須要依次喚醒后面所有可以跟它一起共享當前鎖資源的節(jié)點日杈,毫無疑問,這些節(jié)點必須也是在等待共享鎖(這是大前提佑刷,如果等待的是獨占鎖莉擒,那前面已經(jīng)有一個共享節(jié)點獲取鎖了,它肯定是獲取不到的)瘫絮。當共享鎖被釋放的時候涨冀,可以用讀寫鎖為例進行思考,當一個讀鎖被釋放麦萤,此時不論是讀鎖還是寫鎖都是可以競爭資源的鹿鳖。
5. 總結(jié):
如果獲取共享鎖失敗后,將請求共享鎖的線程封裝成Node
對象放入AQS
的隊列中壮莹,并掛起Node
對象對應的線程翅帜,實現(xiàn)請求鎖線程的等待操作。待共享鎖可以被獲取后命满,從頭節(jié)點開始涝滴,依次喚醒頭節(jié)點及其以后的所有共享類型的節(jié)點。實現(xiàn)共享狀態(tài)的傳播。這里有幾點值得注意:
1. 與AQS
的獨占功能一樣歼疮,共享鎖是否可以被獲取的判斷為空方法杂抽,交由子類去實現(xiàn)。
2. 與AQS
的獨占功能不同腋妙,當鎖被頭節(jié)點獲取后默怨,獨占功能是只有頭節(jié)點獲取鎖,其余節(jié)點的線程繼續(xù)沉睡骤素,等待鎖被釋放后匙睹,才會喚醒下一個節(jié)點的線程,而共享功能是只要頭節(jié)點獲取鎖成功济竹,就在喚醒自身節(jié)點對應的線程的同時痕檬,繼續(xù)喚醒AQS
隊列中的下一個節(jié)點的線程,每個節(jié)點在喚醒自身的同時還會喚醒下一個節(jié)點對應的線程送浊,以實現(xiàn)共享狀態(tài)的“向后傳播”梦谜,從而實現(xiàn)共享功能。
以上的分析都是從AQS
子類的角度去看待AQS
的部分功能的袭景,而如果直接看待AQS
唁桩,或許可以這么去解讀:
首先,AQS
并不關(guān)心“是什么鎖”耸棒,對于AQS
來說它只是實現(xiàn)了一系列的用于判斷“資源”是否可以訪問的API
,并且封裝了在“訪問資源”受限時將請求訪問的線程的加入隊列荒澡、掛起、喚醒等操作与殃,AQS
只關(guān)心“資源不可以訪問時单山,怎么處理?”幅疼、“資源是可以被同時訪問米奸,還是在同一時間只能被一個線程訪問?”爽篷、“如果有線程等不及資源了悴晰,怎么從AQS
的隊列中退出?”等一系列圍繞資源訪問的問題狼忱,而至于“資源是否可以被訪問膨疏?”這個問題則交給AQS
的子類去實現(xiàn)。
當AQS
的子類是實現(xiàn)獨占功能時钻弄,例如ReentrantLock
,“資源是否可以被訪問”被定義為只要AQS的state變量不為0者吁,并且持有鎖的線程不是當前線程窘俺,則代表資源不能訪問。
當AQS
的子類是實現(xiàn)共享功能時,例如:CountDownLatch
瘤泪,“資源是否可以被訪問”被定義為只要AQS
的state
變量不為0灶泵,說明資源不能訪問。這是典型的將規(guī)則和操作分開的設(shè)計思路:規(guī)則子類定義对途,操作邏輯因為具有公用性赦邻,放在父類中去封裝。當然实檀,正式因為AQS
只是關(guān)心“資源在什么條件下可被訪問”惶洲,所以子類還可以同時使用AQS
的共享功能和獨占功能的API
以實現(xiàn)更為復雜的功能。
比如:ReentrantReadWriteLock
膳犹,我們知道ReentrantReadWriteLock
的中也有一個叫Sync
的內(nèi)部類繼承了AQS
恬吕,而AQS
的隊列可以同時存放共享鎖和獨占鎖,對于ReentrantReadWriteLock
來說分別代表讀鎖和寫鎖须床,當隊列中的頭節(jié)點為讀鎖時铐料,代表讀操作可以執(zhí)行,而寫操作不能執(zhí)行豺旬,因此請求寫操作的線程會被掛起钠惩,當讀操作依次推出后,寫鎖成為頭節(jié)點族阅,請求寫操作的線程被喚醒篓跛,可以執(zhí)行寫操作,而此時的讀請求將被封裝成Node
放入AQS
的隊列中耘分。如此往復举塔,實現(xiàn)讀寫鎖的讀寫交替進行。
參考文獻
《java并發(fā)編程的藝術(shù)》