CyclicBarrier(同步容器)
作用
它允許一組線程相互等待，直到所有線程都到達一個公共的屏障點，才開始執行下面的操作。舉例：例如坐公交車，等所有人都坐上車了，車才啟動出發。
方法
public CyclicBarrier(int parties);
public CyclicBarrier(int parties, Runnable barrierAction);
private void nextGeneration();
private void breakBarrier();
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException;
public int getParties();
public int await() throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException;
public boolean isBroken();
public void reset();
public int getNumberWaiting();
示例
不設定阻塞時間
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author scottxuan
*/
@Slf4j
public class CyclicBarrierExample1 {
//線程數(shù)
private final static int threadNum = 4;
//初始化線程同步數(shù)量
final static CyclicBarrier barrier = new CyclicBarrier(2);
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
final int num = i;
service.execute(()->{
update(num);
});
Thread.sleep(500);
}
service.shutdown();
}
public static void update(int num){
try {
log.info("thread ready {}",num);
//線程阻塞
barrier.await();
log.info("thread continue {}",num);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//輸出結果：4個線程，同步數為2；每當2個線程就緒，阻塞釋放，開始執行await()後續代碼
//01:48:07.213 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 0
//01:48:07.715 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 1
//01:48:07.715 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 1
//01:48:07.715 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 0
//01:48:08.221 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 2
//01:48:08.736 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 3
//01:48:08.736 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 3
//01:48:08.736 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 2
設定阻塞時間一
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates {@link CyclicBarrier#await(long, TimeUnit)} when the timeout expires.
 *
 * <p>Four tasks are submitted 2 s apart against a two-party barrier, each waiting
 * at most 1 s, so no two tasks are ever waiting at the barrier together.
 *
 * @author scottxuan
 */
@Slf4j
public class CyclicBarrierExample2 {
    // Number of tasks submitted by main.
    private static final int THREAD_COUNT = 4;
    // Number of threads that must call await() before the barrier trips.
    static final CyclicBarrier barrier = new CyclicBarrier(2);

    /**
     * Submits {@code THREAD_COUNT} tasks, pausing 2 s after each submission.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < THREAD_COUNT; i++) {
                final int num = i;
                service.execute(() -> update(num));
                // The gap is longer than the await timeout, so each task waits alone.
                Thread.sleep(2000);
            }
        } finally {
            // Release the pool even if the sleep above is interrupted.
            service.shutdown();
        }
    }

    /**
     * Logs that the task is ready, waits at the barrier for at most one second,
     * then logs either that it continues or that the wait failed.
     *
     * @param num index of the task, used only in the log messages
     */
    public static void update(int num) {
        try {
            log.info("thread ready {}", num);
            // Waits up to 1 s for the other party to arrive.
            barrier.await(1000, TimeUnit.MILLISECONDS);
            log.info("thread continue {}", num);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            log.error("barrier await error");
        } catch (java.util.concurrent.BrokenBarrierException
                | java.util.concurrent.TimeoutException e) {
            // Message-only on purpose: keeps the log identical to the sample output
            // listed after this class.
            log.error("barrier await error");
        }
    }
}
//輸出結果：總共4個線程，同步數為2，線程每隔2秒才啟動一個。第一個線程阻塞了1秒後超時(TimeoutException)，未湊齊同步的數量(同步數為2)而報錯，並使屏障進入broken狀態；由於之後沒有調用reset()，後續線程調用await時立即拋出BrokenBarrierException（見下方時間戳），因此全部報錯
//02:05:32.487 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 0
//02:05:33.497 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
//02:05:34.492 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 1
//02:05:34.492 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
//02:05:36.504 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 2
//02:05:36.504 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
//02:05:38.516 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 3
//02:05:38.516 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
設定阻塞時間二
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates {@link CyclicBarrier#await(long, TimeUnit)} with a one-party barrier.
 *
 * <p>Two tasks are submitted 2 s apart. Because the barrier needs only one party,
 * each call to await is itself the last arrival and the 1 s timeout never applies.
 *
 * @author scottxuan
 */
@Slf4j
public class CyclicBarrierExample2 {
    // Number of tasks submitted by main.
    private static final int THREAD_COUNT = 2;
    // A single party: every await() call trips the barrier on its own.
    static final CyclicBarrier barrier = new CyclicBarrier(1);

    /**
     * Submits {@code THREAD_COUNT} tasks, pausing 2 s after each submission.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < THREAD_COUNT; i++) {
                final int num = i;
                service.execute(() -> update(num));
                Thread.sleep(2000);
            }
        } finally {
            // Release the pool even if the sleep above is interrupted.
            service.shutdown();
        }
    }

    /**
     * Logs that the task is ready, calls the timed await, then logs either that
     * it continues or that the wait failed.
     *
     * @param num index of the task, used only in the log messages
     */
    public static void update(int num) {
        try {
            log.info("thread ready {}", num);
            // Timed wait of up to 1 s; with a single party it returns without waiting.
            barrier.await(1000, TimeUnit.MILLISECONDS);
            log.info("thread continue {}", num);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            log.error("barrier await error");
        } catch (java.util.concurrent.BrokenBarrierException
                | java.util.concurrent.TimeoutException e) {
            log.error("barrier await error");
        }
    }
}
//結果輸出：總共2個線程，同步數為1。每個線程調用await時即已符合同步的數量(同步數量為1)，因此不會阻塞到1秒超時，而是立即解除阻塞（見下方時間戳，僅相差數毫秒），直接執行await之後的代碼
//02:08:40.742 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 0
//02:08:40.746 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread continue 0
//02:08:42.751 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 1
//02:08:42.752 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread continue 1
帶 Runnable 的構造方法
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author scottxuan
*/
@Slf4j
public class CyclicBarrierExample3 {
//線程數(shù)
private final static int threadNum = 2;
//初始化線程同步數(shù)量
final static CyclicBarrier barrier = new CyclicBarrier(2,() -> {
log.info("is ready over");
});
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
final int num = i;
service.execute(()->{
update(num);
});
}
service.shutdown();
}
public static void update(int num){
try {
log.info("thread ready {}",num);
//線程阻塞
barrier.await();
log.info("thread continue {}",num);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//輸出結果：所有線程 ready 就緒之後，構造方法中傳入的 Runnable 先執行（由最後一個到達屏障的線程執行）；Runnable 執行結束後，await() 之後的代碼才開始執行
//02:13:09.648 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread ready 1
//02:13:09.648 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread ready 0
//02:13:09.652 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - is ready over
//02:13:09.652 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread continue 0
//02:13:09.652 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread continue 1