CyclicBarrier 也是 AQS 的同步組件
CyclicBarrier
CyclicBarrier
也是一個同步輔助類 , 它允許一組線程相互等待 , 直到到達某個公共的屏障點 , 通過它可以完成多個線程之間相互等待 ,只有當每個線程都準備好之后, 才能各自繼續(xù)往下執(zhí)行后續(xù)的操作, 和 CountDownLatch
相似的地方就是, 它也是通過計數器來實現的. 當某個線程調用了 await()
方法之后, 該線程就進入了等待狀態(tài) . 而且計數器就進行 +1
操作 , 當計數器的值達到了我們設置的初始值的時候 , 之前調用了await()
方法而進入等待狀態(tài)的線程會被喚醒繼續(xù)執(zhí)行后續(xù)的操作. 因為 CyclicBarrier
釋放線程之后可以重用, 所以又稱之為循環(huán)屏障 . CyclicBarrier
使用場景和 CountDownLatch
很相似 , 可以用于多線程計算數據, 最后合并計算結果的應用場景 .
CyclicBarrier 與 CountDownLatch 區(qū)別
CountDownLatch
的計數器只能使用一次 , 而CyclicBarrier
的計數器可以使用reset
重置 循環(huán)使用-
CountDownLatch
主要事項 1 個 或者 n 個線程需要等待其它線程完成某項操作之后才能繼續(xù)往下執(zhí)行 , 其描述的是 1 個 或者 n 個線程與其它線程的關系 ;CyclicBarrier
主要是實現了 1 個或者多個線程之間相互等待,直到所有的線程都滿足條件之后, 才執(zhí)行后續(xù)的操作 , 其描述的是內部各個線程相互等待的關系 .CyclicBarrier
假如有 5 個線程都調用了await()
方法 , 那這個 5 個線程就等著 , 當這 5 個線程都準備好之后, 它們有各自往下繼續(xù)執(zhí)行 , 如果這 5 個線程在后續(xù)有一個計算發(fā)生錯誤了 , 這里可以重置計數器 , 并讓這 5 個線程再執(zhí)行一遍 .
CyclicBarrier 代碼演示
- 演示代碼1
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by Charles
*/
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5); // 5 個線程同步等待
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await(); // 有一個線程準備 ok 了 , 當達到上面設置的5個線程 的時候 , 后續(xù)代碼就開始執(zhí)行了
log.info("{} continue", threadNum);
}
}
控制臺輸出
- 演示代碼 2 (帶有等待時間)
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* Created by Charles
*/
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS); //等待2000 毫秒后繼續(xù)執(zhí)行
} catch (BrokenBarrierException | TimeoutException e) {
log.warn("BarrierException",e);
}
log.info("{} continue", threadNum);
}
}
控制臺輸出結果如下圖 :