一個多線程協(xié)同器品追,它可以讓一組線程相互等待,當(dāng)?shù)却臄?shù)量達(dá)到預(yù)設(shè)數(shù)量時這組線程通過等待繼續(xù)工作冯丙。說得形象點肉瓦,CyclicBarrier就好比汽車站滾動發(fā)車的模式,把客車看著CyclicBarrier银还,乘客看著是各個任務(wù)線程风宁,當(dāng)乘客到達(dá)客車時,需要等待另外的乘客蛹疯,當(dāng)乘客到齊后自動發(fā)車戒财,如果等待乘客超時了,則將乘客全部趕下車(司機太兇殘了)捺弦,然后重新安排依次上車(是否要上車由乘客自己決定)饮寞;每個上車的乘客都需要判斷自己是否是這輛車的最后一個乘客,如果不是列吼,則上車后立即開始睡覺幽崩,如果是最后一個,則他需要叫醒所有乘客寞钥。當(dāng)然客車站在創(chuàng)建這些客車的時候可能會做一些額外的事情慌申,例如所有乘客到齊后,司機給大伙一人發(fā)一瓶礦泉水,或者是其它的蹄溉,但是前提條件就是乘客到齊咨油。
圖中的CyclicBarrier需要等待8個線程到達(dá)后才會“發(fā)車”,目前已經(jīng)到達(dá)的線程有4個柒爵,還需要等待4個線程役电;線程上車的過程(也就是進(jìn)入await的過程)是要進(jìn)行排隊的,這里是通過ReentrantLock來實現(xiàn)的棉胀,上車后的睡眠是通過鎖的條件等待Condition來實現(xiàn)的法瑟。
首先看一下它的內(nèi)部整體結(jié)構(gòu)
public class CyclicBarrier {
//一個標(biāo)識,標(biāo)識這一次的協(xié)同是否完成(正常完成唁奢,異常完成)
private static class Generation {
boolean broken = false;
}
//線程進(jìn)入條件等待時需要獲取鎖
private final ReentrantLock lock = new ReentrantLock();
//等待條件
private final Condition trip = lock.newCondition();
//每次需要協(xié)同的線程數(shù)(客車的準(zhǔn)載數(shù))
private final int parties;
//這組線程(parties)滿足協(xié)同條件后需要做的一件事情
private final Runnable barrierCommand;
//標(biāo)識實例霎挟,一個generation代表一次線程協(xié)同
private Generation generation = new Generation();
//還需要等待的線程數(shù)量(還未上車的乘客數(shù))
private int count;
//最后一個乘客上車后使用的工具,喚醒所有乘客麻掸,
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation(); //換一輛車
}
//等待超時后氓扛,司機生氣了,就用這個方法把大家叫醒论笔,然后把這輛車標(biāo)記為broken,把所有人趕下車
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
//車輛的構(gòu)造器采郎,客車占為車輛設(shè)置的規(guī)則
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//同上,只是線程協(xié)同完成后不需要做額外的動作
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
一些核心方法
**/
}
核心方法
CyclicBarrier的核心方法是await狂魔,該方法是線程相互等待的關(guān)鍵蒜埋,它有兩種實現(xiàn),一種是帶等待超時的最楷,一種是不會等待超時:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
從代碼可以看出整份,其核心都是使用了dowait這個方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); //獲取鎖,這里可以看出線程進(jìn)入等待是單線程的
try {
final Generation g = generation; //當(dāng)前的實例標(biāo)記
if (g.broken) //新到達(dá)的線程判斷實例是否正常籽孙,如不正常則拋出異常
throw new BrokenBarrierException();
if (Thread.interrupted()) { //新到達(dá)的線程判斷線程中斷狀態(tài)
breakBarrier(); //如果線程被中斷烈评,則標(biāo)記當(dāng)前實例為中斷,并喚醒所有等待線程
throw new InterruptedException();
}
int index = --count; //上車成功犯建,還需要上車的人數(shù)減1
if (index == 0) { // tripped //判斷是否是最后一個乘客
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //最后一個乘客上車成功讲冠,人滿,則執(zhí)行客車統(tǒng)一的規(guī)定
ranAction = true;
nextGeneration(); //喚醒本車所有乘客适瓦,并幫助喚來下一輛車竿开。
return 0;
} finally {
if (!ranAction) //如果在執(zhí)行客車統(tǒng)一任務(wù)的時候出了問題,則整趟車標(biāo)記為broken,喚醒所有乘客并趕下車
breakBarrier();
}
}
//如果上車的不是最后一個乘客
for (;;) {
try {
if (!timed)
trip.await(); //不需要判斷睡眠時間玻熙,一直睡
else if (nanos > 0L) //設(shè)置睡眠時間并睡眠
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { //如果在失眠過程中被中斷(這里不是被正常喚醒否彩,是被中斷)
if (g == generation && ! g.broken) { //如果沒有換車,并且客車也沒被標(biāo)記為broken
breakBarrier(); //則被中斷的線程(乘客)負(fù)責(zé)將該輛車標(biāo)記為中斷
throw ie;
} else {
Thread.currentThread().interrupt(); //如果已經(jīng)換車嗦随,或者被標(biāo)記為了broken,則保存中斷狀態(tài)列荔,繼續(xù)后面的執(zhí)行
}
}
if (g.broken)
throw new BrokenBarrierException(); //被標(biāo)記為了broken(這可能自己前面標(biāo)的,也可能其它線程標(biāo)的),則所有的線程都拋出異常贴浙。
if (g != generation) //如果是正常被喚醒筷转,則直接返回還需上車的人(理論上應(yīng)該是0)
return index;
if (timed && nanos <= 0L) { //如果是應(yīng)為等待超時,則拋出TimeoutException
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
總結(jié)
從代碼可以看出悬而,CyclicBarrier的實現(xiàn)是利用條件等待,用到條件等待當(dāng)然就會用到鎖锭汛。
多個線程協(xié)調(diào)過程中笨奠,只要有一個線程被中斷或者發(fā)生異常,則整個協(xié)調(diào)取消唤殴。
CyclicBarrier與CountDownLatch異同點
相同點:
1.都能讓多個線程協(xié)調(diào)般婆,在某一個點上等待
不同點:
1.CyclicBarrier是多個線程自行協(xié)同,當(dāng)線程到達(dá)等待數(shù)量時自動放行朵逝,而CountDownLatch是多個線程阻塞后蔚袍,需要外界條件達(dá)到某種狀態(tài)的時候才會被統(tǒng)一喚醒,即CyclicBarrier只需要各個線程await配名,而CountDownLatch還需要額外是countDown啤咽。
2.實現(xiàn)上,CyclicBarrier是使用獨占鎖+Condition實現(xiàn)的渠脉,而CountDownLatch是自己實現(xiàn)AQS宇整,利用共享鎖的原理實現(xiàn)。
3.CountDownLatch一旦滿足條件后需要重新初始化才能再使用芋膘,而CyclicBarrier可以循環(huán)使用鳞青。