1.概述
CyclicBarrier是一個同步輔助類贞间,允許一組線程互相等待,直到到達(dá)某個公共屏障點(diǎn) (common barrier point)继找。如果一個程序中有固定的線程數(shù)喘帚,并且線程之間需要相互等待户辱,這時候CyclicBarrier是一個很好的選擇。之所以叫它c(diǎn)yclic疏哗,是因?yàn)樵卺尫诺却€程之后呛讲,它可以被重用。還是那句話返奉,開始之前你需要先了解AQS的實(shí)現(xiàn)機(jī)制贝搁。
CountDownLatch和CyclicBarrier的區(qū)別:
- CountDownLatch的作用是允許1或N個線程等待其他線程完成執(zhí)行;而CyclicBarrier則是允許N個線程相互等待衡瓶。
- CountDownLatch的計數(shù)器無法被重置徘公;CyclicBarrier的計數(shù)器可以被重置后使用,因此它被稱為是循環(huán)的barrier哮针。
2. 函數(shù)列表和核心參數(shù)
//-------------------------核心參數(shù)------------------------------
// 內(nèi)部類
private static class Generation {
boolean broken = false;
}
/** 守護(hù)barrier入口的鎖 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待條件关面,直到所有線程到達(dá)barrier */
private final Condition trip = lock.newCondition();
/** 要屏障的線程數(shù) */
private final int parties;
/* 當(dāng)線程都到達(dá)barrier,運(yùn)行的 barrierCommand*/
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
//等待到達(dá)barrier的參與線程數(shù)量十厢,count=0 -> tripped
private int count;
//-------------------------函數(shù)列表------------------------------
//構(gòu)造函數(shù)等太,指定參與線程數(shù)
public CyclicBarrier(int parties)
//構(gòu)造函數(shù),指定參與線程數(shù)蛮放,并在所有線程到達(dá)barrier之后執(zhí)行給定的barrierAction邏輯
public CyclicBarrier(int parties, Runnable barrierAction);
//等待所有的參與者到達(dá)barrier
public int await();
//等待所有的參與者到達(dá)barrier缩抡,或等待給定的時間
public int await(long timeout, TimeUnit unit);
//獲取參與等待到達(dá)barrier的線程數(shù)
public int getParties();
//查詢barrier是否處于broken狀態(tài)
public boolean isBroken();
//重置barrier為初始狀態(tài)
public void reset();
//返回等待barrier的線程數(shù)量
public int getNumberWaiting();
-
Generation:每個使用中的barrier都表示為一個
generation
實(shí)例。當(dāng)barrier觸發(fā)trip條件或重置時generation
隨之改變包颁。使用barrier時有很多generation
與線程關(guān)聯(lián)瞻想,由于不確定性的方式,鎖可能分配給等待的線程娩嚼。但是在同一時間只有一個是活躍的generation
(通過count
變量確定)蘑险,并且其余的要么被銷毀,要么被trip條件等待岳悟。如果有一個中斷佃迄,但沒有隨后的重置泼差,就不需要有活躍的generation
。CyclicBarrier
的可重用特性就是通過Generation
來實(shí)現(xiàn)呵俏,每一次觸發(fā)tripped都會new一個新的Generation堆缘。 -
barrierCommand:
CyclicBarrier
的另一個特性是在所有參與線程到達(dá)barrier觸發(fā)一個自定義函數(shù),這個函數(shù)就是barrierCommand
普碎,在CyclicBarrier
的構(gòu)造函數(shù)中初始化吼肥。
3. 使用示例
public class CyclicBarrierTest2 {
public static int SIZE = 5;
private static CyclicBarrier cyclicBarrier;
public static void main(String[] args) {
cyclicBarrier = new CyclicBarrier(SIZE, () -> {
//觸發(fā)barrier時執(zhí)行的函數(shù)
System.out.println(Thread.currentThread().getName() + " barrierAction finish");
});
for (int i=0;i<SIZE;i++){
new Thread(new InnerThread(),"Thread"+i).start();
}
}
static class InnerThread implements Runnable{
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " wait for barrier");
cyclicBarrier.await();
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " continued");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
4. 源碼解析
4.1 await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
//await實(shí)現(xiàn)
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//當(dāng)前generation
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();//線程被中斷,終止Barrier随常,喚醒所有等待線程
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//如果有barrierCommand潜沦,在所有parties到達(dá)之后運(yùn)行它
ranAction = true;
//更新barrier狀態(tài)并喚醒所有線程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//自旋等待 所有parties到達(dá) | generation被銷毀 | 線程中斷 | 超時
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();//超時,銷毀當(dāng)前barrier
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
說明:dowait()
是await()
的實(shí)現(xiàn)函數(shù)绪氛,它的作用就是讓當(dāng)前線程阻塞唆鸡,直到“有parties個線程到達(dá)barrier” 或 “當(dāng)前線程被中斷” 或 “超時”這3者之一發(fā)生,當(dāng)前線程才繼續(xù)執(zhí)行枣察。當(dāng)所有parties到達(dá)barrier(count=0
)争占,如果barrierCommand
不為空,則執(zhí)行barrierCommand
序目。然后調(diào)用nextGeneration()
進(jìn)行換代操作臂痕。
在for(;;)
自旋中。timed
是用來表示當(dāng)前是不是“超時等待”線程猿涨。如果不是握童,則通過trip.await()
進(jìn)行等待;否則叛赚,調(diào)用awaitNanos()
進(jìn)行超時等待澡绩。
小結(jié)
CyclicBarrier
主要通過獨(dú)占鎖ReentrantLock
和Condition
配合實(shí)現(xiàn)。類本身實(shí)現(xiàn)很簡單俺附,重點(diǎn)是分清CyclicBarrier
和CountDownLatch
的用法及區(qū)別肥卡,還有在jdk1.7新增的另外一個與它們相似的同步鎖Phaser
,在后面文章中會詳細(xì)講解事镣。