java并發(fā)之CyclicBarrier
知識導(dǎo)讀
- CyclicBarrier是一個阻塞器森书,當(dāng)阻塞線程達(dá)到CyclicBarrier指定數(shù)量時澎迎,所有線程被喚醒執(zhí)行,否則阻塞該線程
- CyclicBarrier可以循環(huán)使用
- CyclicBarrier是基于ReentrantLock和ReentrantLock的一個Condition實現(xiàn),每次調(diào)用await方法后加鎖灌闺,然后計數(shù)減1,減之后不為0,則調(diào)用Condition的await方法坏瞄,當(dāng)前線程進入阻塞等待狀態(tài)桂对;當(dāng)計數(shù)減之后為0則signalAll,激活所有等待線程鸠匀。
- CyclicBarrier上阻塞的線程蕉斜,任意一個線程被中斷、超時缀棍、執(zhí)行異常都會導(dǎo)致CyclicBarrier被打破宅此,從而導(dǎo)致所有阻塞線程被喚醒
原理
CyclicBarrier屏障初始化時規(guī)定一個數(shù)目,然后計算調(diào)用了CyclicBarrier.await()進入等待的線程數(shù)爬范。
- 如果線程數(shù)未達(dá)到這個數(shù)量父腕,則線程阻塞等待;
- 當(dāng)線程數(shù)達(dá)到了這個數(shù)目時青瀑,所有進入等待狀態(tài)的線程被喚醒并繼續(xù)運行
CyclicBarrier中使用了一個ReentrantLock和該ReentrantLock的一個Condition璧亮。使用Condition.await方法進行線程阻塞。使用Condition.notifyAll方法喚醒所有阻塞線程斥难。
CyclicBarrier使用了兩個數(shù)來記錄屏障允許通過的線程數(shù)閾值枝嘶。parties值不變,count值用來記錄剩余需阻塞的線程哑诊。當(dāng)count=0的時候群扶,屏障被打開,重置count=parties镀裤,實現(xiàn)CyclicBarrier的重復(fù)使用竞阐。
CyclicBarrier柵欄被放開分兩種情況
- 線程數(shù)達(dá)到閾值: 打開柵欄,喚醒所有線程淹禾,柵欄重復(fù)使用
- 運行異常: 打破柵欄馁菜,喚醒所有線程,柵欄不可再用铃岔,包括任意線程被中斷汪疮、超時峭火、運行異常等導(dǎo)致的異常
源碼分析
CyclicBarrier內(nèi)部封裝了一個ReentrantLock和一個Condition用于進行線程的阻塞和喚醒。
定義了parties變量用來記錄線程數(shù)的閾值智嚷,初始化時賦值卖丸,count變量用來動態(tài)記錄剩余需要進入的線程數(shù)
Generation變量可以用于區(qū)分CyclicBarrier是被正常打開還是異常后打破
barrierCommand變量是一個Runnable任務(wù),當(dāng)柵欄被正常打開的時候執(zhí)行該任務(wù)盏道。
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties; //阻塞的閾值稍浆,不會變
this.count = parties; //記錄 還剩多少個阻塞會喚醒所有線程
this.barrierCommand = barrierAction;
}
CyclicBarrier的await方法,用于實現(xiàn)在CyclicBarrier柵欄上進行阻塞線程猜嘱。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
await方法調(diào)用dowait方法衅枫,實現(xiàn)具體邏輯
- 在進入方法前首先要獲取ReentrantLock鎖資源
- 限制被打破的柵欄不能重復(fù)使用
- 如果線程被中斷,調(diào)用breakBarrier方法打破柵欄朗伶,柵欄不可再用
- 每進來一個工作線程弦撩,count-1,當(dāng)count值為0的時候论皆,則調(diào)用nextGeneration方法打開柵欄
- 如果count不為0益楼,則自旋調(diào)用Condition的await方法,阻塞當(dāng)前線程
- 如果超時点晴、中斷感凤、異常則調(diào)用breakBarrier方法打破柵欄
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)//被打破的柵欄不能再使用
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier(); //被中斷,則打破柵欄
throw new InterruptedException();
}
int index = --count; //每次執(zhí)行 count-1
if (index == 0) { //當(dāng)index = 0 的時候粒督,打開柵欄陪竿,喚醒所有等待線程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();//正常結(jié)束,則喚醒所有線程坠陈,重置Generation
return 0;
} finally {
if (!ranAction)
breakBarrier();// 喚醒所有等待的線程
}
}
//自旋萨惑,直到線程被interrupted,或者超時或者被notifyAll喚醒
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//標(biāo)記中斷信號
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
//超時 打破柵欄
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
正常結(jié)束調(diào)用nextGeneration方法打開柵欄仇矾,所有的線程被喚醒執(zhí)行
- 調(diào)用Condition的signalAll方法庸蔼,喚醒等待的所有線程
- 重置count為parties,這時CyclicBarrier可以重復(fù)使用的關(guān)鍵
- 創(chuàng)建一個新的可用的Generation
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
非正常結(jié)束贮匕,調(diào)用breakBarrier方法打破柵欄姐仅,所有的線程被喚醒執(zhí)行
- 標(biāo)記generation被打破。不能再用了
- 重置count為parties刻盐,這時CyclicBarrier可以重復(fù)使用的關(guān)鍵
- 調(diào)用Condition的signalAll方法掏膏,喚醒等待的所有線程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
CyclicBarrier對外提供了reset方法,可以不用等阻塞線程達(dá)到閾值敦锌,然后打開柵欄馒疹,激活所有的線程
- 調(diào)用breakBarrier方法,喚醒所有線程
- 創(chuàng)建一個新的Generation
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}