1 CyclicBarrier.Generation內(nèi)部類
private static class Generation {
boolean broken = false;
}
2 CyclicBarrier
2.1 CyclicBarrier中的字段
(1)lock:執(zhí)行breakBarrier方法和nextGeneration方法都需要獲取該鎖乃秀。
(2)trip:和lock綁定的Condition對象楼雹。
(3)parties:通過一個柵欄需要的線程數(shù)量筐咧。
(4)barrierCommand:在所有線程通過柵欄之前需要執(zhí)行的任務(wù)轨香。
(5)generation:一個generation代表一個柵欄。
(6)count:通過當(dāng)前柵欄需要的線程數(shù)量招刨。
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
2.2 CyclicBarrier中的構(gòu)造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
2.3 CyclicBarrier中的await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
2.3.1 CyclicBarrier中的dowait方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
// 如果執(zhí)行barrierCommand中的run方法拋出異常
if (!ranAction)
// 打破柵欄
breakBarrier();
}
}
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 運行到這里艰争,說明其它線程調(diào)用了當(dāng)前線程的interrupt方法
// 如果尚未更換柵欄并且尚未打破柵欄
if (g == generation && ! g.broken) {
// 打破柵欄
breakBarrier();
// 拋出異常
throw ie;
} else {
// 如果已經(jīng)更換柵欄上陕,恢復(fù)當(dāng)前線程的中斷狀態(tài)
// 如果尚未更換柵欄并且已經(jīng)打破柵欄,恢復(fù)當(dāng)前線程的中斷狀態(tài)
Thread.currentThread().interrupt();
}
}
// 如果已經(jīng)打破柵欄
if (g.broken)
// 拋出異常
throw new BrokenBarrierException();
// 如果已經(jīng)更換柵欄
if (g != generation)
return index;
// 如果阻塞超時
if (timed && nanos <= 0L) {
// 打破柵欄
breakBarrier();
// 拋出異常
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
2.3.1.1 CyclicBarrier中的breakBarrier方法
// 打破柵欄
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
2.3.1.2 CyclicBarrier中的nextGeneration方法
// 更換柵欄
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
3 CountDownLatch和CyclicBarrier的主要區(qū)別
(1)CountDownLatch可以讓一個或多個線程等待其它線程完成一些任務(wù)阶捆,再開始執(zhí)行各自的任務(wù)凌节;CyclicBarrier可以讓多個線程相互等待,當(dāng)所有線程到達之后洒试,再開始執(zhí)行各自的任務(wù)倍奢。
(2)CountDownLatch不能被重置,CyclicBarrier可以被重置垒棋。
(3)CountDownLatch利用共享鎖實現(xiàn)卒煞,CyclicBarrier利用ReentrantLock+Condition實現(xiàn)。
4 CyclicBarrier使用案例
public class Test {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("導(dǎo)游開始分發(fā)護照5鸺堋E显!!");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("導(dǎo)游分發(fā)了所有護照9远0缛摹!");
}
});
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new TravelTask(cyclicBarrier, "薩拉赫"));
executorService.execute(new TravelTask(cyclicBarrier, "馬內(nèi)"));
executorService.execute(new TravelTask(cyclicBarrier, "菲爾米諾"));
executorService.execute(new TravelTask(cyclicBarrier, "范迪克"));
executorService.execute(new TravelTask(cyclicBarrier, "阿利森"));
executorService.shutdown();
}
}
public class TravelTask implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public TravelTask(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
Random rand = new Random();
try {
TimeUnit.SECONDS.sleep(rand.nextInt(10) + 1);
System.out.println(name + "到達集合區(qū)域---");
cyclicBarrier.await();
System.out.println(name + "開始旅行啦...");
} catch (Exception e) {
e.printStackTrace();
}
}
}