循環(huán)屏障,它允許多個線程相互等待到一個障礙點之后才繼續(xù)執(zhí)行,可指定等待到屏障之后的執(zhí)行任務(wù),CyclicBarrier支持循環(huán)使用绍妨。
功能說明
CyclicBarrier實現(xiàn)了當(dāng)多個線程達(dá)到屏障之后才執(zhí)行的功能,可指定runable表示達(dá)到屏障之后執(zhí)行的任務(wù)柬脸。類似于CountDownLatch的功能他去。但是與CountDownLatch不同的是,CyclicBarrier可以通過reset方法重置進(jìn)行重復(fù)使用倒堕,所以Cyclicbarries適合更加復(fù)雜的場景灾测,例如可以實現(xiàn)執(zhí)行錯誤之后通過reset重試。
CyclicBarrier(int parties, Runnable barrierAction)//屏障數(shù)和等待后任務(wù)
使用場景
1垦巴、 實現(xiàn)多線程任務(wù)執(zhí)行完后的匯總媳搪。barrierAction表示匯總操作。
實現(xiàn)關(guān)鍵
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//每次嗲用await都需要獲取lock鎖
try {
final Generation g = generation;
if (g.broken)//如果當(dāng)前代broken
throw new BrokenBarrierException();
if (Thread.interrupted()) {//如果線程被中中斷骤宣,則breakBarriers秦爆,屏障將會不可用,除非調(diào)用reset開始新代
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 如果當(dāng)前時最后一個到達(dá)屏障的線程憔披,執(zhí)行runnable等限,并且開啟新的代爸吮,喚醒所有等待的線程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)//如果runnable執(zhí)行失敗,也會breakBarrier屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//不是最后一個到達(dá)屏障的線程望门,通過condition進(jìn)入等待形娇,直到被觸發(fā)、broken怒允、中斷埂软、超時
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)//表示開始新的代了
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
1、調(diào)用await方法的線程表示線程到達(dá)了屏障纫事。
2、是否全部到達(dá)屏障通過count來判斷所灸,因為涉及到多線程同步問題丽惶,所以內(nèi)部使用了ReenTrantLock來同步代碼。
3爬立、如果線程被中斷钾唬,那么會breakBarrier,屏障不可用了侠驯,并且喚醒所有等待的線程抡秆。
4、當(dāng)count為0時吟策,表示全部到達(dá)屏障了儒士,將執(zhí)行設(shè)置的runnable。然后開啟下一代檩坚,喚醒所有等待的線程着撩。如果執(zhí)行runnable失敗,則也會breakBarrier匾委。
5拖叙、否則表示不是最后一個到達(dá)屏障的線程,會通過condition來堵塞線程赂乐。直到最后一個線程執(zhí)行完之后喚醒或者超時薯鳍、中斷。
總結(jié)
CyclicBarrier通過一個count值來表示剩余到達(dá)屏障的線程挨措,由于涉及到并發(fā)操作挖滤,所以通過ReenTrantLock來實現(xiàn)同步,當(dāng)線程執(zhí)行await時运嗜,如果不是最后一個到達(dá)屏障的線程壶辜,那么會通過Condition來堵塞線程,當(dāng)count等于0時担租,表示最后一個到達(dá)屏障的線程砸民,將會喚醒等待中的線程。CyclicBarrier支持循環(huán)利用是因為其中引入了一個generation一代的概念,當(dāng)一代執(zhí)行完將開啟新的代(其實就是將數(shù)據(jù)重置了)