原創(chuàng)申明:本文由公眾號【猿燈塔】原創(chuàng),轉載請說明出處標注
今天呢!燈塔君跟大家講:? ? ? ? ? ? ? ? ? ? ??
并發(fā)工具CyclicBarrier源碼分析及應用
一.CyclicBarrier簡介
1.簡介
CyclicBarrier是一個同步的輔助類榄融,允許一組線程相互之間等待袱箱,達到一個共同點,再繼續(xù)執(zhí)行痊硕。 CyclicBarrier(循環(huán)屏障) 直譯為可循環(huán)使用(Cyclic)的屏障(Barrier)拌喉。它可以讓一組線程到 達一個屏障(同步點)時被阻塞速那,直到最后一個線程到達屏障時,屏障才會開門尿背,所有被屏障攔截的線程才 會繼續(xù)工作琅坡。
JDK中的描述:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. CyclicBarrier是一個同步輔助類,它允許一組線程相互等待直到所有線程都到達一個公共的屏障點残家。 在程序中有固定數(shù)量的線程,這些線程有時候必須等待彼此售躁,這種情況下坞淮,使用CyclicBarrier很有幫 助。這個屏障之所以用循環(huán)修飾陪捷,是因為在所有的線程釋放彼此之后回窘,這個屏障是可以重新使用的
2.運行機制
二.CyclicBarrier結構圖
三.CyclicBarrier方法說明
1.CyclicBarrier(parties)
初始化相互等待的線程數(shù)量的構造方法?
2.CyclicBarrier(parties,RunnablebarrierAction)
初始化相互等待的線程數(shù)量的構造方法以及屏障線程的構造方法 屏障線程的運行時機:等待的線程數(shù)量 = parties,CyclicBarrier打開屏障之前 舉例:在分組計算中市袖,每個線程負責一部分計算啡直,最終這些線程計算結束之后,交由屏障線程進行匯總計算
3苍碟、getParties()?
獲取CyclicBarrier打開屏障的線程數(shù)量酒觅,也成為方數(shù)?
4、getNumberWaiting()?
獲取真在CyclicBarrier上等待的線程數(shù)量
5微峰、await()
在CyclicBarrier上進行阻塞等待舷丹,直到發(fā)生以下情形之一:?
a.在CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放蜓肆,繼續(xù)執(zhí)行颜凯。?
b.當前線程被中斷谋币,則拋出InterruptedException異常,并停止等待症概,繼續(xù)執(zhí)行蕾额。?
c.其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常彼城,并停止等待诅蝶,繼續(xù)執(zhí)行。?
d.其他等待的線程超時精肃,則當前線程拋出BrokenBarrierException異常秤涩,并停止等待,繼續(xù)執(zhí)行司抱。?
e.其他線程調用CyclicBarrier.reset()方法筐眷,則當前線程拋出BrokenBarrierException異常,并 停止等待习柠,繼續(xù)執(zhí)行匀谣。?
6资溃、await(timeout,TimeUnit)
在CyclicBarrier上進行限時的阻塞等待,直到發(fā)生以下情形之一:?
a.在CyclicBarrier上等待的線程數(shù)量達到parties溶锭,則所有線程被釋放,繼續(xù)執(zhí)行趴捅。?
b.當前線程被中斷,則拋出InterruptedException異常拱绑,并停止等待,繼續(xù)執(zhí)行猎拨。?
c.當前線程等待超時,則拋出TimeoutException異常红省,并停止等待,繼續(xù)執(zhí)行类腮。?
d.其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待针饥,繼續(xù)執(zhí)行。?
e.其他等待的線程超時需频,則當前線程拋出BrokenBarrierException異常丁眼,并停止等待,繼續(xù)執(zhí)行昭殉。?
f.其他線程調用CyclicBarrier.reset()方法苞七,則當前線程拋出BrokenBarrierException異常,并 停止等待挪丢,繼續(xù)執(zhí)行蹂风。?
7、isBroken
獲取是否破損標志位broken的值乾蓬,此值有以下幾種情況:?
a.CyclicBarrier初始化時惠啄,broken=false,表示屏障未破損任内。?
b.如果正在等待的線程被中斷撵渡,則broken=true,表示屏障破損死嗦。?
c.如果正在等待的線程超時趋距,則broken=true,表示屏障破損越除。?
d.如果有線程調用CyclicBarrier.reset()方法节腐,則broken=false,表示屏障回到未破損狀態(tài)摘盆。?
8铜跑、reset?
使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事: a.如果有正在等待的線程骡澈,則會拋出BrokenBarrierException異常,且這些線程停止等待掷空,繼續(xù)執(zhí)行肋殴。 b.將是否破損標志位broken置為false。
四.源碼分析
首先看一下CyclicBarrier內部聲明的一些屬性?
/**用于保護屏障入口的鎖*/
private final ReentrantLock lock = new?
ReentrantLock();?
/**線程等待條件 */?
private final Condition trip = lock.newCondition();
?/** 記錄等待的線程數(shù) */?
private final int parties;?
/**所有線程到達屏障點后坦弟,首先執(zhí)行的命令?
*/ private final Runnable barrierCommand;?
private Generation generation = new Generation();?
/**實際中仍在等待的線程數(shù)护锤,每當有一個線程到達屏障點,
count值就會減一酿傍;當一次新的運算開始后烙懦,?
count的值被重置為parties*/?
private int count;
其中,Generation是CyclicBarrier的一個靜態(tài)內部類
它只有一個boolean類型的屬性赤炒,具體代碼如下:
private static class Generation { Generation() {} // prevent access constructor creation boolean broken; // initially false }
當使用構造方法創(chuàng)建CyclicBarrier實例的時候
就是給上面這些屬性賦值
//創(chuàng)建一個CyclicBarrier實例亏较,parties指定參與相互等待的線程數(shù)雪情,?
//barrierAction指定當所有線程到達屏障點之后巡通,
首先執(zhí)行的操作宴凉,該操作由最后一個進入屏障點線程執(zhí)行.?
public CyclicBarrier(int parties,?
Runnable barrierAction)?{?
if (parties <= 0) throw new IllegalArgumentException();?
this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
//創(chuàng)建一個CyclicBarrier實例弥锄,parties指定
?參與相互等待的線程數(shù)?
public CyclicBarrier(int parties) { this(parties, null); }
CyclicBarrier.await方法調用CyclicBarrier.dowait()
每次調用await()都會使計數(shù)器-1叉讥,當減少到0 時就會
喚醒所有的線程 图仓,當調用await()方法時救崔,當前線程已經
到達屏障點六孵,當前線程阻塞進入休眠狀態(tài)
//該方法被調用時表示當前線程已經到達屏障點劫窒,當前線程阻塞進入休眠狀態(tài)?
//直到所有線程都到達屏障點拆座,當前線程才會被喚醒?
public int await() throws InterruptedException, BrokenBarrierException { try {return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
當前線程已經到達屏障點孕索,當前線程阻塞進入休眠狀態(tài)
//該方法被調用時表示當前線程已經到達屏障點.當前? ?線程阻塞進入休眠狀態(tài)?
//在timeout指定的超時時間內搞旭,等待其他參與線程
? 到達屏障點?
//如果超出指定的等待時間則拋TimeoutException異常肄渗,如果該時間小于等于零,則此方法根本不會等待?
public int await(long timeout, TimeUnit unit)?
throws InterruptedException, BrokenBarrierException,?
TimeoutException { return dowait(true, unit.toNanos(timeout)); }
dowait()方法
private int dowait(boolean timed, long nanos)?
throws InterruptedException, BrokenBarrierException, TimeoutException?
{ //使用獨占資源鎖控制多線程并發(fā)進入這段代碼
final ReentrantLock lock = this.lock;?
//獨占鎖控制線程并發(fā)訪問?
lock.lock();?
try {final Generation g = generation;?
if (g.broken) throw new BrokenBarrierException();?
//檢查當前線程是否被中斷?
if (Thread.interrupted()) {?
//如果當前線程被中斷會做以下三件事?
//1.打翻當前柵欄?
//2.喚醒攔截的所有線程?
//3.拋出中斷異常?
breakBarrier();?
throw new InterruptedException(); }
//每調用一次await()方法,計數(shù)器就減一?
int index = --count;?
//計數(shù)器的值減為0則需喚醒所有線程并轉換到下一代?
if (index == 0) { // tripped boolean ranAction = false;?
try {
//如果在創(chuàng)建CyclicBarrier實例時設置了barrierAction翁垂,則先執(zhí)行?
barrierAction inal Runnable command = barrierCommand;
?if (command != null)?
? ? ?command.run();?
? ? ?ranAction = true;?
//當所有參與的線程都到達屏障點沿猜,為喚醒所有處于
? 休眠狀態(tài)的線程做準備工作?
//需要注意的是啼肩,喚醒所有阻塞線程不是在這里 nextGeneration();?
return 0;?
} finally {?
//確保在任務未成功執(zhí)行時能將所有線程喚醒?
if (!ranAction)
?breakBarrier();?
}?
}
// loop until tripped, broken,?
interrupted, or timed out?
//如果計數(shù)器不為0則執(zhí)行此循環(huán)
for (;;)?
{ try {
//根據(jù)傳入的參數(shù)來決定是定時等待還是非定時等待
if (!timed)?
//讓當前執(zhí)行的線程阻塞祈坠,處于休眠狀態(tài) trip.await();?
else if (nanos > 0L)?
//讓當前執(zhí)行的線程阻塞赦拘,在超時時間內處于休眠狀態(tài)?
nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {?
//若當前線程在等待期間被中斷則打翻柵欄喚醒其他線程
if (g == generation && ! g.broken) { breakBarrier();?
throw ie; }?
else {?
// We're about to finish waiting even?
? ?if we had not
// been interrupted, so this interrupt?
? ?is deemed to?
// "belong" to subsequent execution. Thread.currentThread().interrupt(); } }
//如果線程因為打翻柵欄操作而被喚醒則拋出異常?
if (g.broken) throw new BrokenBarrierException();?
if (g != generation) return index;?
//如果線程因為時間到了而被喚醒則打翻柵欄
? 并拋出異常?
if (timed && nanos <= 0L) {?
breakBarrier();?
throw new TimeoutException(); }?
}?
}?
finally { lock.unlock(); } }
每次調用await方法都會使內部的計數(shù)器臨時變量-1
當減少到0時躺同,就會調用nextGeneration方法
private void nextGeneration() {?
// signal completion of last generation trip.signalAll();?
// set up next generation?
count = parties; generation = new Generation(); }
在這里喚醒所有阻塞的線程?
提醒:在聲明CyclicBarrier的時候還可以傳一個Runnable的實現(xiàn)類蹋艺,當計數(shù)器減少到0時捎谨,
會執(zhí)行該 實現(xiàn)類?到這里CyclicBarrier的
實現(xiàn)原理基本已經都清楚了下面來深入源碼分析
一下線程阻塞代碼 trip.await()和線程喚醒trip.signalAll()的實現(xiàn)侍芝。
//await()是AQS內部類ConditionObject中的方法 public final void await() throws InterruptedException {?
//如果線程中斷拋異常?
if (Thread.interrupted())?
? ?throw new InterruptedException();?
//新建Node節(jié)點,并將新節(jié)點加入到Condition
? 等待隊列中?
//Condition等待隊列是AQS內部類
ConditionObject實現(xiàn)的凶赁,ConditionObject
有兩個屬性虱肄,
分別是firstWaiter和lastWaiter
都是Node類型?
//firstWaiter和lastWaiter分別用于代表Condition等待隊列的頭結點和尾節(jié)點 Node node = addConditionWaiter();?
//釋放獨占鎖咏窿,讓其它線程可以獲取到dowait()
? 方法中的獨占鎖?
int savedState = fullyRelease(node);?
int interruptMode = 0;?
//檢測此節(jié)點是否在資源等待隊列(AQS同步隊列)中集嵌, //如果不在,說明此線程還沒有競爭資源鎖的權利怜珍,
此線程繼續(xù)阻塞酥泛,直到檢測到此節(jié)點在 資源等待隊列上(AQS同步隊列)中
//這里出現(xiàn)了兩個等待隊列嫌拣,分別是Condition等待
? 隊列和AQS資源鎖等待隊列(或者說是 同步隊列) //Condition等待隊列是等待被喚醒的線程隊列
?AQS資源鎖等待隊列是等待獲取資源鎖 的隊列?
while (!isOnSyncQueue(node))?
{?
//阻塞當前線程异逐,當前線程進入休眠狀態(tài),可以看到
? 這里使用LockSupport.park阻 塞當前線程 LockSupport.park(this);?
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode?
= REINTERRUPT;?
if (node.nextWaiter != null)?
// clean up if cancelled unlinkCancelledWaiters();?
if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
//addConditionWaiter()是AQS內部類ConditionObject中的方法?
private Node addConditionWaiter()?
{ Node t = lastWaiter;?
// 將condition等待隊列中,節(jié)點狀態(tài)不是
CONDITION的節(jié)點箩祥,從condition等待隊列中移除
if (t != null && t.waitStatus != Node.CONDITION)?
{ unlinkCancelledWaiters();?
t = lastWaiter; }
//以下操作是用此線程構造一個節(jié)點,并將之加入到condition等待隊列尾部?
Node node = new Node(Thread.currentThread(), Node.CONDITION);?
if (t == null)?
firstWaiter = node;?
else
t.nextWaiter = node;?
lastWaiter = node; return node; }
//signalAll是AQS內部類ConditionObject中的方法
public final void signalAll() {
?if (!isHeldExclusively())?
throw new IllegalMonitorStateException(); //Condition等待隊列的頭結點
Node first = firstWaiter;?
if (first != null) doSignalAll(first); }
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do {Node next = first.nextWaiter; first.nextWaiter = null;
//將Condition等待隊列中的Node節(jié)點按之前順序
都轉移到了AQS同步隊列中 transferForSignal(first);?
first = next;?
} while (first != null); }
final boolean transferForSignal
(Node node) {?
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))?
return false;?
//這里將Condition等待隊列中的Node節(jié)點插入到AQS同步隊列的尾部?
Node p = enq(node);?
int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
//ReentrantLock#unlock()方法?
public void unlock() {?
//Sync是ReentrantLock的內部類,繼承自AbstractQueuedSynchronizer捐凭,
它是 ReentrantLock中公平鎖和非公平鎖的基礎實現(xiàn) sync.release(1); }
public final boolean release(int arg) {?
//釋放鎖?
if (tryRelease(arg)) {?
//AQS同步隊列頭結點?
Node h = head; if (h != null && h.waitStatus != 0)?
//喚醒節(jié)點中的線程?
unparkSuccessor(h); return true; }
return false; }
private void unparkSuccessor(Node node)?
{ int ws = node.waitStatus;?
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);?
Node s = node.next;?
if (s == null || s.waitStatus > 0)?
{ s = null; for (Node t = tail;?
t != null && t != node; t = t.prev)?
if (t.waitStatus <= 0) s = t; }
if (s != null) //喚醒阻塞線程 LockSupport.unpark(s.thread); }
365天干貨不斷,可以微信搜索「猿燈塔」第一時間閱讀茁肠,回復【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板