線程這塊的一些工具類,基本都會(huì)以原理為主,希望大家能有一個(gè)這樣的意識(shí),通過分析別人代碼的設(shè)計(jì)和實(shí)現(xiàn),給自己提供積累一些方法和工具
Condition
在前面學(xué)習(xí) synchronized 的時(shí)候,有講到 wait/notify 的基本使用,結(jié)合 synchronized 可以實(shí)現(xiàn)對(duì)線程的通信.那么這個(gè)時(shí)候我就在思考了,既然 J.U.C 里面提供了鎖的實(shí)現(xiàn)機(jī)制,那 J.U.C 里面有沒有提供類似的線程通信的工具呢油昂?于是找阿找,發(fā)現(xiàn)了一個(gè) Condition 工具類
Condition 是一個(gè)多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個(gè)條件(condition),只有滿足條件時(shí),線程才會(huì)被喚醒
Condition 的基本使用
ConditionWait
public class ConditionDemoWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition){
this.lock=lock;
this.condition=condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
ConditionSignal
public class ConditionDemoSignal implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoSignal(Lock lock, Condition condition){
this.lock=lock;
this.condition=condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end - ConditionDemoSignal");
}finally {
lock.unlock();
}
}
}
通過這個(gè)案例簡(jiǎn)單實(shí)現(xiàn)了 wait 和 notify 的功能,當(dāng)調(diào)用 await 方法后,當(dāng)前線程會(huì)釋放鎖并等待,而其他線程調(diào)用 condition 對(duì)象的 signal 或者 signalall 方法通知并被阻塞的線程,然后自己執(zhí)行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼續(xù)執(zhí)行,最后釋放鎖
所以,condition 中兩個(gè)最重要的方法,一個(gè)是 await,一個(gè)是 signal 方法
- await: 把當(dāng)前線程阻塞掛起
- signal: 喚醒阻塞的線程
Condition 源碼分析
調(diào)用 Condition,需要獲得 Lock 鎖,所以意味著會(huì)存在一個(gè) AQS 同步隊(duì)列,在上面那個(gè)案例中,假如兩個(gè)線程同時(shí)運(yùn)行的話,那么 AQS 的隊(duì)列可能是下面這種情況
那么這個(gè)時(shí)候 ThreadA 調(diào)用了 condition.await 方法,它做了什么事情呢革娄?
condition.await
調(diào)用 Condition 的 await() 方法(或者以 await 開頭的方法 ),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài).當(dāng)從 await() 方法返回時(shí),當(dāng)前線程一定獲取了 Condition 相關(guān)聯(lián)的鎖
public final void await() throws InterruptedException {
if (Thread.interrupted()) { //表示 await 允許被中斷
throw new InterruptedException();
}
Node node = addConditionWaiter(); //創(chuàng)建一個(gè)新的節(jié)點(diǎn),節(jié)點(diǎn)狀態(tài)為 condition冕碟,采用的數(shù)據(jù)結(jié)構(gòu)仍然是鏈表
int savedState = fullyRelease(node); //釋放當(dāng)前的鎖拦惋,得到鎖的狀態(tài),并喚醒 AQS 隊(duì)列中的一個(gè)線程
int interruptMode = 0;
//如果當(dāng)前節(jié)點(diǎn)沒有在同步隊(duì)列上安寺,即還沒有被 signal,則將當(dāng)前線程阻塞
while (!isOnSyncQueue(node)) {//判斷這個(gè)節(jié)點(diǎn)是否在 AQS 隊(duì)列上厕妖,第一次判斷的是 false,因?yàn)榍懊嬉呀?jīng)釋放鎖了
LockSupport.park(this); //通過 park 掛起當(dāng)前線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 當(dāng)這個(gè)線程醒來,會(huì)嘗試拿鎖, 當(dāng) acquireQueued 返回 false 就是拿到鎖了
// interruptMode != THROW_IE -> 表示這個(gè)線程沒有成功將 node 入隊(duì), 但 signal 執(zhí)行了 enq 方法讓其入隊(duì)了
// 將這個(gè)變量設(shè)置成 REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 如果 node 的下一個(gè)等待者不是 null, 則進(jìn)行清理,清理 Condition 隊(duì)列上的節(jié)點(diǎn)
// 如果是 null ,就沒有什么好清理的了
if (node.nextWaiter != null) {// clean up if
cancelled
}
unlinkCancelledWaiters();
// 如果線程被中斷了,需要拋出異常.或者什么都不做
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}
addConditionWaiter
這個(gè)方法的主要作用是把當(dāng)前線程封裝成 Node,添加到等待隊(duì)列.這里的隊(duì)列不再是雙向鏈表,而是單向鏈表
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果 lastWaiter 不等于空并且 waitStatus 不等于 CONDITION 時(shí)挑庶,把沖好這個(gè)節(jié)點(diǎn)從鏈表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//構(gòu)建一個(gè) Node言秸,waitStatus=CONDITION。這里的鏈表是一個(gè)單向的迎捺,所以相比 AQS 來說會(huì)簡(jiǎn)單很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) {
firstWaiter = node;
} else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
圖解分析
執(zhí)行完 addConditionWaiter 這個(gè)方法之后,就會(huì)產(chǎn)生一個(gè)這樣的 condition 隊(duì)列
fullyRelease
fullRelease,就是徹底的釋放鎖,什么叫徹底呢,就是如果當(dāng)前鎖存在多次重入,那么在這個(gè)方法中只需要釋放一次就會(huì)把所有的重入次數(shù)歸零
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//獲得重入的次數(shù)
if (release(savedState)) {//釋放鎖并且喚醒下一個(gè)同步隊(duì)列中的線程
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed) {
node.waitStatus = Node.CANCELLED;
}
}
}
圖解分析
此時(shí),同步隊(duì)列會(huì)觸發(fā)鎖的釋放和重新競(jìng)爭(zhēng).ThreadB 獲得了鎖
isOnSyncQueue
判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中,返回 false 表示不在,返回 true 表示在
如果不在 AQS 同步隊(duì)列,說明當(dāng)前節(jié)點(diǎn)沒有喚醒去爭(zhēng)搶同步鎖,所以需要把當(dāng)前線程阻塞起來,直到其他的線程調(diào)用 signal 喚醒
如果在 AQS 同步隊(duì)列,意味著它需要去競(jìng)爭(zhēng)同步鎖去獲得執(zhí)行程序執(zhí)行權(quán)限
為什么要做這個(gè)判斷呢举畸?原因是在 condition 隊(duì)列中的節(jié)點(diǎn)會(huì)重新加入到 AQS 隊(duì)列去競(jìng)爭(zhēng)鎖.也就是當(dāng)調(diào)用 signal 的時(shí)候,會(huì)把當(dāng)前節(jié)點(diǎn)從 condition 隊(duì)列轉(zhuǎn)移到 AQS 隊(duì)列
- 大家思考一下,基于現(xiàn)在的邏輯結(jié)構(gòu).如何去判斷 ThreadA 這個(gè)節(jié)點(diǎn)是否存在于 AQS 隊(duì)列中呢?
- 如果 ThreadA 的 waitStatus 的狀態(tài)為 CONDITION,說明它存在于 condition 隊(duì)列中,不在 AQS 隊(duì)列.因?yàn)?AQS 隊(duì)列的狀態(tài)一定不可能有 CONDITION
- 如果 node.prev 為空,說明也不存在于 AQS 隊(duì)列,原因是 prev=null 在 AQS 隊(duì)列中只有一種可能性,就是它是 head 節(jié)點(diǎn),head 節(jié)點(diǎn)意味著它是獲得鎖的節(jié)點(diǎn).
- 如果 node.next 不等于空,說明一定存在于 AQS 隊(duì)列中,因?yàn)橹挥?AQS 隊(duì)列才會(huì)存在 next 和 prev 的關(guān)系
- findNodeFromTail,表示從 tail 節(jié)點(diǎn)往前掃描 AQS 隊(duì)列,一旦發(fā)現(xiàn) AQS 隊(duì)列的節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)相等,說明節(jié)點(diǎn)一定存在于 AQS 隊(duì)列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null) {
return false;
}
if (node.next != null) {// If has successor, it must be on queue
return true;
}
return findNodeFromTail(node);
}
Condition.signal
await 方法會(huì)阻塞 ThreadA,然后 ThreadB 搶占到了鎖獲得了執(zhí)行權(quán)限,這個(gè)時(shí)候在 ThreadB 中調(diào)用了 Condition 的 signal() 方法,將會(huì)喚醒在等待隊(duì)列中節(jié)點(diǎn)
public final void signal() {
if (!isHeldExclusively()) { //先判斷當(dāng)前線程是否獲得了鎖凳枝,這個(gè)判斷比較簡(jiǎn)單抄沮,直接用獲得鎖的線程和當(dāng)前線程相比即可
throw new IllegalMonitorStateException();
}
Node first = firstWaiter; // 拿到 Condition 隊(duì)列上第一個(gè)節(jié)點(diǎn)
if (first != null) {
doSignal(first);
}
}
Condition.doSignal
對(duì) condition 隊(duì)列中從首部開始的第一個(gè) condition 狀態(tài)的節(jié)點(diǎn),執(zhí)行 transferForSignal 操作,將 node 從 condition 隊(duì)列中轉(zhuǎn)換到 AQS 隊(duì)列中,同時(shí)修改 AQS 隊(duì)列中原先尾節(jié)點(diǎn)的狀態(tài)
private void doSignal(Node first) {
do {//從 Condition 隊(duì)列中刪除 first 節(jié)點(diǎn)
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null; // 將 next 節(jié)點(diǎn)設(shè)置成 null
}
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
AQS.transferForSignal
該方法先是 CAS 修改了節(jié)點(diǎn)狀態(tài),如果成功,就將這個(gè)節(jié)點(diǎn)放到 AQS 隊(duì)列中,然后喚醒這個(gè)節(jié)點(diǎn)上的線程.此時(shí),那個(gè)節(jié)點(diǎn)就會(huì)在 await 方法中蘇醒
final boolean transferForSignal(Node node) {
//更新節(jié)點(diǎn)的狀態(tài)為 0,如果更新失敗岖瑰,只有一種可能就是節(jié)點(diǎn)被 CANCELLED 了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
return false;
}
//調(diào)用 enq叛买,把當(dāng)前節(jié)點(diǎn)添加到 AQS 隊(duì)列。并且返回返回按當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)蹋订,也就是原 tail 節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一個(gè)節(jié)點(diǎn)的狀態(tài)被取消了, 或者嘗試設(shè)置上一個(gè)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 失敗了(SIGNAL 表示:他的 next 節(jié)點(diǎn)需要停止阻塞 ),
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
LockSupport.unpark(node.thread); // 喚醒節(jié)點(diǎn)上的線程
}
//如果 node 的 prev 節(jié)點(diǎn)已經(jīng)是 signal 狀態(tài)率挣,那么被阻塞的 ThreadA 的喚醒工作由 AQS 隊(duì)列來完成
return true;
}
圖解分析
執(zhí)行完 doSignal 以后,會(huì)把 condition 隊(duì)列中的節(jié)點(diǎn)轉(zhuǎn)移到 aqs 隊(duì)列上,邏輯結(jié)構(gòu)圖如下
這個(gè)時(shí)候會(huì)判斷 ThreadA 的 prev 節(jié)點(diǎn)也就是 head 節(jié)點(diǎn)的 waitStatus,如果大于 0 或者設(shè)置 SIGNAL 失敗,表示節(jié)點(diǎn)被設(shè)置成了 CANCELLED 狀態(tài).這個(gè)時(shí)候會(huì)喚醒 ThreadA 這個(gè)線程.否則就基于 AQS 隊(duì)列的機(jī)制來喚醒,也就是等到 ThreadB 釋放鎖之后來喚醒 ThreadA
被阻塞的線程喚醒后的邏輯
前面在分析 await 方法時(shí),線程會(huì)被阻塞.而通過 signal 被喚醒之后又繼續(xù)回到上次執(zhí)行的邏輯中標(biāo)注為紅色部分的代碼 checkInterruptWhileWaiting 這個(gè)方法是干嘛呢?其實(shí)從名字就可以看出來,就是 ThreadA 在 condition 隊(duì)列被阻塞的過程中,有沒有被其他線程觸發(fā)過中斷請(qǐng)求
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
if (node.nextWaiter != null) {//clean up if cancelled
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}
checkInterruptWhileWaiting
如果當(dāng)前線程被中斷,則調(diào)用 transferAfterCancelledWait 方法判斷后續(xù)的處理應(yīng)該是拋出 InterruptedException 還是重新中斷
這里需要注意的地方是,如果第一次 CAS 失敗了,則不能判斷當(dāng)前線程是先進(jìn)行了中斷還是先進(jìn)行了 signal 方法的調(diào)用,可能是先執(zhí)行了 signal 然后中斷,也可能是先執(zhí)行了中斷,后執(zhí)行了 signal,當(dāng)然,這兩個(gè)操作肯定是發(fā)生在 CAS 之前.這時(shí)需要做的就是等待當(dāng)前線程的 node 被添加到 AQS 隊(duì)列后,也就是 enq 方法返回后,返回 false 告訴 checkInterruptWhileWaiting 方法返回 REINTERRUPT(1),后續(xù)進(jìn)行重新中斷
簡(jiǎn)單來說,該方法的返回值代表當(dāng)前線程是否在 park 的時(shí)候被中斷喚醒,如果為 true 表示中斷在 signal 調(diào)用之前,signal 還未執(zhí)行,那么這個(gè)時(shí)候會(huì)根據(jù) await 的語義,在 await 時(shí)遇到中斷需要拋出 interruptedException,返回 true 就是告訴 checkInterruptWhileWaiting 返回 THROW_IE(-1).如果返回 false,否則表示 signal 已經(jīng)執(zhí)行過了,只需要重新響應(yīng)中斷即可
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
final boolean transferAfterCancelledWait(Node node) {
// 使用 cas 修改節(jié)點(diǎn)狀態(tài)露戒,如果還能修改成功难礼,說明線程被中斷時(shí),signal 還沒有被調(diào)用
// 這里有一個(gè)知識(shí)點(diǎn)玫锋,就是線程被喚醒蛾茉,并不一定是在 java 層面執(zhí)行了 locksupport.unpark,
// 也可能是調(diào)用了線程的 interrupt () 方法撩鹿,這個(gè)方法會(huì)更新一個(gè)中斷標(biāo)識(shí)谦炬,并且會(huì)喚醒處于阻塞狀態(tài)下的線程
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); //如果 cas 成功,則把 node 添加到 AQS 隊(duì)列
return true;
}
// 如果 cas 失敗,則判斷當(dāng)前 node 是否已經(jīng)在 AQS 隊(duì)列上键思,如果不在础爬,則讓給其他線程執(zhí)行
// 當(dāng) node 被觸發(fā)了 signal 方法時(shí),node 就會(huì)被加到 aqs 隊(duì)列上
while (!isOnSyncQueue(node)) {//循 環(huán)檢測(cè) node 是否已經(jīng)成功添加到 AQS 隊(duì)列中, 如果沒有吼鳞,則通過 yield看蚜,
Thread.yield();
}
return false;
}
acquireQueued
這個(gè)方法在講 aqs 的時(shí)候說過,是的當(dāng)前被喚醒的節(jié)點(diǎn) ThreadA 去搶占同步鎖.并且要恢復(fù)到原本的重入次數(shù)狀態(tài).調(diào)用完這個(gè)方法之后,AQS 隊(duì)列的狀態(tài)如下將 head 節(jié)點(diǎn)的 waitStatus 設(shè)置為-1,Signal 狀態(tài)
reportInterruptAfterWait
根據(jù) checkInterruptWhileWaiting 方法返回的中斷標(biāo)識(shí)來進(jìn)行中斷上報(bào)
如果是 THROW_IE,則拋出中斷異常
如果是 REINTERRUPT,則重新響應(yīng)中斷
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE) {
throw new InterruptedException();
} else if (interruptMode == REINTERRUPT) {
selfInterrupt();
}
}
Condition 總結(jié)
await 和 signal 的總結(jié)
我把前面的整個(gè)分解的圖再通過一張整體的結(jié)構(gòu)圖來表述,線程 awaitThread 先通過 lock.lock() 方法獲取鎖成功后調(diào)用了 condition.await 方法進(jìn)入等待隊(duì)列,而另一個(gè)線程 signalThread 通過 lock.lock() 方法獲取鎖成功后調(diào)用了 condition.signal 或者 signalAll 方法,使得線程 awaitThread 能夠有機(jī)會(huì)移入到同步隊(duì)列中,當(dāng)其他線程釋放 lock 后使得線程 awaitThread 能夠有機(jī)會(huì)獲取 lock,從而使得線程 awaitThread 能夠從 await 方法中退出執(zhí)行后續(xù)操作.如果 awaitThread 獲取 lock 失敗會(huì)直接進(jìn)入到同步隊(duì)列
- 阻塞:await() 方法中,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在 AQS 等待隊(duì)列,則阻塞當(dāng)前線程,如果在等待隊(duì)列,則自旋等待嘗試獲取鎖
- 釋放:signal() 后,節(jié)點(diǎn)會(huì)從 condition 隊(duì)列移動(dòng)到 AQS 等待隊(duì)列,則進(jìn)入正常鎖的獲取流程
了解完 Lock 以及 Condition 之后,意味著我們對(duì)于 J.U.C 里面的鎖機(jī)制以及線程通信機(jī)制有了一個(gè)全面和深入的了解,接下來我們來看看其他比較常用的一些工具
限制
JUC 中提供了幾個(gè)比較用的并發(fā)工具類, 比如 CountDownLatch,CyclicBarrier,Semaphore. 其實(shí)在以前我們課堂的演示代碼中,或多或少都有用到過這樣一些 api,接下來我們會(huì)帶大家去深入研究一些常用的 api
CountDownLatch
countdownlatch 是一個(gè)同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程的操作執(zhí)行完畢再執(zhí)行.從命名可以解讀到 countdown 是倒數(shù)的意思,類似于我們倒計(jì)時(shí)的概念
countdownlatch 提供了兩個(gè)方法,一個(gè)是 countDown,一個(gè)是 await,countdownlatch 初始化的時(shí)候需要傳入一個(gè)整數(shù),在這個(gè)整數(shù)倒數(shù)到 0 之前,調(diào)用了 await 方法的程序都必須要等待,然后通過 countDown 來倒數(shù)
使用案例
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行中");
countDownLatch.countDown();
System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行完畢");
}, "t1").start();
new Thread(() -> {
System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行中");
countDownLatch.countDown();
System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行完畢");
}, "t2").start();
new Thread(() -> {
System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行中");
countDownLatch.countDown();
System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行完畢");
}, "t3").start();
countDownLatch.await();
System.out.println("所有線程執(zhí)行完畢");
}
從代碼的實(shí)現(xiàn)來看,有點(diǎn)類似 join 的功能,但是比 join 更加靈活.CountDownLatch 構(gòu)造函數(shù)會(huì)接收一個(gè) int 類型的參數(shù)作為計(jì)數(shù)器的初始值,當(dāng)調(diào)用 CountDownLatch 的 countDown 方法時(shí),這個(gè)計(jì)數(shù)器就會(huì)減一.通過 await 方法去阻塞去阻塞主流程
模擬高并發(fā)場(chǎng)景
static CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ThreadName:" + Thread.currentThread().getName());
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Demo().start();
}
countDownLatch.countDown();
}
總的來說,凡事涉及到需要指定某個(gè)人物在執(zhí)行之前,要等到前置人物執(zhí)行完畢之后才執(zhí)行的場(chǎng)景,都可以使用 CountDownLatch
CountDownLatch 源碼分析
對(duì)于 CountDownLatch,我們僅僅需要關(guān)心兩個(gè)方法,一個(gè)是 countDown() 方法,另一個(gè)是 await() 方法.countDown() 方法每次調(diào)用都會(huì)將 state 減 1,直到 state 的值為 0;而 await 是一個(gè)阻塞方法,當(dāng) state 減 為 0 的時(shí)候,await 方法才會(huì)返回.await 可以被多個(gè)線程調(diào)用,大家在這個(gè)時(shí)候腦子里要有個(gè)圖:所有調(diào)用了 await 方法的線程阻塞在 AQS 的阻塞隊(duì)列中,等待條件滿足(state == 0),將線程從隊(duì)列中一個(gè)個(gè)喚醒過來
acquireSharedInterruptibly
countdownlatch 也用到了 AQS,在 CountDownLatch 內(nèi)部寫了一個(gè) Sync 并且繼承了 AQS 這個(gè)抽象類重寫了 AQS 中的共享鎖方法.首先看到下面這個(gè)代碼,這塊代碼主要是判斷是否到了共享鎖;( 在
CountDownLatch,使用的是共享鎖機(jī)制,因?yàn)?CountDownLatch 并不需要實(shí)現(xiàn)互斥的特性 )
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (tryAcquireShared(arg) < 0) {//state 如果不等于 0赔桌,說明當(dāng)前線程需要加入到共享鎖隊(duì)列中
doAcquireSharedInterruptibly(arg);
}
}
doAcquireSharedInterruptibly
- addWaiter 設(shè)置為 shared 模式.
- tryAcquire 和 tryAcquireShared 的返回值不同,因此會(huì)多出一個(gè)判斷過程
- 在判斷前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)后,調(diào)用了 setHeadAndPropagate 方法,而不是簡(jiǎn)單的更新一下頭節(jié)點(diǎn)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
//創(chuàng)建一個(gè)共享模式的節(jié)點(diǎn)添加到隊(duì)列中
boolean failed = true;
try {
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);// 就判斷嘗試獲取鎖
if (r >= 0) {//r>=0 表示獲取到了執(zhí)行權(quán)限供炎,這個(gè)時(shí)候因?yàn)?state!=0,所以不會(huì)執(zhí)行這段代碼
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//阻塞線程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
throw new InterruptedException();
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
圖解分析
加入這個(gè)時(shí)候有 3 個(gè)線程調(diào)用了 await 方法,由于這個(gè)時(shí)候 state 的值還不為 0,所以這三個(gè)線程都會(huì)加入到 AQS 隊(duì)列中.并且三個(gè)線程都處于阻塞狀態(tài)
CountDownLatch.countDown
由于線程被 await 方法阻塞了,所以只有等到 countdown 方法使得 state=0 的時(shí)候才會(huì)被喚醒,我們來看看 countdown 做了什么
- 只有當(dāng) state 減為 0 的時(shí)候,tryReleaseShared 才返回 true, 否則只是簡(jiǎn)單的 state = state - 1
- 如果 state=0, 則調(diào)用 doReleaseShared 喚醒處于 await 狀態(tài)下的線程
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 用自旋的方法實(shí)現(xiàn) state 減 1
protected boolean
tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
int c = getState();
if (c == 0) {
return false;
}
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
AQS. doReleaseShared
共享鎖的釋放和獨(dú)占鎖的釋放有一定的差別
前面喚醒鎖的邏輯和獨(dú)占鎖是一樣,先判斷頭結(jié)點(diǎn)是不是 SIGNAL 狀態(tài),如果是,則修改為 0,并且喚醒頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
// PROPAGATE:標(biāo)識(shí)為 PROPAGATE 狀態(tài)的節(jié)點(diǎn)疾党,是共享鎖模式下的節(jié)點(diǎn)狀態(tài)音诫,處于這個(gè)狀態(tài)下的節(jié)點(diǎn),會(huì)對(duì)線程的喚醒進(jìn)行傳播
private void doReleaseShared() {
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue;
}
// loop to recheck cases
unparkSuccessor(h);
}
// 這個(gè) CAS 失敗的場(chǎng)景是:執(zhí)行到這里的時(shí)候雪位,剛好有一個(gè)節(jié)點(diǎn)入隊(duì)竭钝,入隊(duì)會(huì)將這個(gè) ws 設(shè)置為 - 1
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
// loop on failed CAS
}
// 如果到這里的時(shí)候,前面喚醒的線程已經(jīng)占領(lǐng)了 head雹洗,那么再循環(huán)
// 通過檢查頭節(jié)點(diǎn)是否改變了香罐,如果改變了就繼續(xù)循環(huán)
if (h == head) {
// loop if head changed
break;
}
}
}
- h == head:說明頭節(jié)點(diǎn)還沒有被剛剛用 unparkSuccessor 喚醒的線程(這里可以理解為 ThreadB) 占有,此時(shí) break 退出循環(huán).
- h != head:頭節(jié)點(diǎn)被剛剛喚醒的線程(這里可以理解為 ThreadB) 占有,那么這里重新進(jìn)入下一輪循環(huán),喚醒下一個(gè)節(jié)點(diǎn)(這里是 ThreadB ).我們知道,等到 ThreadB 被喚醒后,其實(shí)是會(huì)主動(dòng)喚醒 ThreadC...
doAcquireSharedInterruptibly
一旦 ThreadA 被喚醒,代碼又會(huì)繼續(xù)回到 doAcquireSharedInterruptibly 中來執(zhí)行.如果當(dāng)前 state 滿足 =0 的條件,則會(huì)執(zhí)行 setHeadAndPropagate 方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (; ; ) {//被喚醒的線程進(jìn)入下一次循環(huán)繼續(xù)判斷
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; //把當(dāng)前節(jié)點(diǎn)移除 aqs 隊(duì)列
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
throw new InterruptedException();
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
setHeadAndPropagate
這個(gè)方法的主要作用是把被喚醒的節(jié)點(diǎn),設(shè)置成 head 節(jié) 點(diǎn). 然后繼續(xù)喚醒隊(duì)列中的其他線程.由于現(xiàn)在隊(duì)列中有 3 個(gè)線程處于阻塞狀態(tài),一旦 ThreadA 被喚醒,并且設(shè)置為 head 之后,會(huì)繼續(xù)喚醒后續(xù)的 ThreadB
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) {
doReleaseShared();
}
}
}
Semaphore
semaphore 也就是我們常說的信號(hào)燈,semaphore 可以控制同時(shí)訪問的線程個(gè)數(shù),通過 acquire 獲取一個(gè)許可,如果沒有就等待,通過 release 釋放一個(gè)許可.有點(diǎn)類似限流的作用.叫信號(hào)燈的原因也和他的用處有關(guān),比如某商場(chǎng)就 5 個(gè)停車位,每個(gè)停車位只能停一輛車,如果這個(gè)時(shí)候來了 10 輛車,必須要等前面有空的車位才能進(jìn)入.
使用案例
public class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Car(i, semaphore).start();
}
}
static class Car extends Thread {
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
public void run() {
try {
semaphore.acquire();//獲取一個(gè)許可
System.out.println("第" + num + "占用一個(gè)停車位");
TimeUnit.SECONDS.sleep(2);
System.out.println("第" + num + "倆車走嘍");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用場(chǎng)景
Semaphore 比較常見的就是用來做限流操作了
Semaphore 源碼分析
從 Semaphore 的功能來看,我們基本能猜測(cè)到它的底層實(shí)現(xiàn)一定是基于 AQS 的共享所,因?yàn)樾枰獙?shí)現(xiàn)多個(gè)線程共享一個(gè)領(lǐng)排池
創(chuàng)建 Semaphore 實(shí)例的時(shí)候,需要一個(gè)參數(shù) permits,這個(gè)基本上可以確定是設(shè)置給 AQS 的 state 的,然后每個(gè)線程調(diào)用 acquire 的時(shí)候,執(zhí)行 state = state - 1,release 的時(shí)候執(zhí)行 state = state + 1,當(dāng)然,acquire 的時(shí)候,如果 state = 0,說明沒有資源了,需要等待其他線程 release
Semaphore 分公平策略和非公平策略
FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int
tryAcquireShared(int acquires) {
for (; ; ) {
// 區(qū)別就在于是不是會(huì)先判斷是否有線程在排隊(duì),然后才進(jìn)行 CAS 減操作
if (hasQueuedPredecessors()) {
return -1;
}
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
NofairSync
通過對(duì)比發(fā)現(xiàn)公平和非公平的區(qū)別就在于是否多了一個(gè) hasQueuedPredecessors 的判斷
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int
tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
int remaining = available -
acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
由于后面的代碼和 CountDownLatch 的是完全一樣,都是基于共享鎖的實(shí)現(xiàn),所以也就沒必要再花時(shí)間來分析了
CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic) 的屏障(Barrier).它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn) ) 時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)工作.CyclicBarrier 默認(rèn)的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用 await 方法告訴 CyclicBarrier 當(dāng)前線程已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞
使用場(chǎng)景
當(dāng)存在需要所有的子任務(wù)都完成時(shí),才執(zhí)行主任務(wù),這個(gè)時(shí)候就可以選擇使用 CyclicBarrier
使用案例
DataImportThread
public class DataImportThread extends Thread {
private CyclicBarrier cyclicBarrier;
private String path;
public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
this.cyclicBarrier = cyclicBarrier;
this.path = path;
}
@Override
public void run() {
System.out.println("開始導(dǎo)入:" + path + " 位置的數(shù)據(jù)");
try {
cyclicBarrier.await();//阻塞
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CycliBarrierDemo
public class CycliBarrierDemo extends Thread {
@Override
public void run() {
System.out.println("開始進(jìn)行數(shù)據(jù)分析");
}
public static void main(String[] args) {
CyclicBarrier cycliBarrier = new CyclicBarrier(3, new CycliBarrierDemo());
new Thread(new DataImportThread(cycliBarrier, "file 1")).start();
new Thread(new DataImportThread(cycliBarrier, "file 2")).start();
new Thread(new DataImportThread(cycliBarrier, "file 3")).start();
}
}
注意點(diǎn)
- 對(duì)于指定計(jì)數(shù)值 parties,若由于某種原因,沒有足夠的線程調(diào)用 CyclicBarrier 的 await,則所有調(diào)用 await 的線程都會(huì)被阻塞
- 同樣的 CyclicBarrier 也可以調(diào)用 await(timeout, unit),設(shè)置超時(shí)時(shí)間,在設(shè)定時(shí)間內(nèi),如果沒有足夠線程到達(dá),則解除阻塞狀態(tài),繼續(xù)工作
- 通過 reset 重置計(jì)數(shù),會(huì)使得進(jìn)入 await 的線程出現(xiàn) BrokenBarrierException
- 如果采用是 CyclicBarrier(int parties, Runnable barrierAction) 構(gòu)造方法,執(zhí)行 barrierAction 操作的是最后一個(gè)到達(dá)的線程
實(shí)現(xiàn)原理
CyclicBarrier 相比 CountDownLatch 來說,要簡(jiǎn)單很多,源碼實(shí)現(xiàn)是基于 ReentrantLock 和 Condition 的組合使用.看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個(gè)柵欄,因?yàn)樗臇艡?Barrier) 可以重復(fù)使用(Cyclic)