1. CyclicBarrier 定義
CyclicBarrier 內(nèi)部是通過 ReeantrantLock, Condition 以及計(jì)數(shù)器count, 來控制線程的執(zhí)行; 當(dāng)所有線程都到達(dá)統(tǒng)一的地方再執(zhí)行接下來的代碼.
特點(diǎn):
1. CyclicBarrier 區(qū)別于 CountDownLatch 是可以重用
2. 用于CyclicBarrier的線程其中有一個(gè)被中斷或等待超時(shí), 都會(huì)造成, barrier broken, 并且重置初始的值 generation
先看一個(gè)簡單的 demo
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Created by xjk on 2016/5/9.
*/
public class TestCyclicBarrier {
private static final Logger logger = Logger.getLogger(TestCyclicBarrier.class);
private static final int THREAD_NUM = 5;
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() {
public void run() {
logger.info("Inside Barrier");
}
});
List<Thread> threads = new ArrayList<>();
for(int i = 0; i < THREAD_NUM; i++){
Thread thread = new Thread(new WorkerThread(cb));
threads.add(thread);
thread.start();
}
// wait until done
for(Thread thread : threads){
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("All Thread done()");
}
public static class WorkerThread implements Runnable{
CyclicBarrier barrier;
public WorkerThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
public void run() {
try {
logger.info("Working's waiting");
// 線程在這里等待, 直到所有線程都到達(dá)barrier
barrier.await();
logger.info("Thread ID:" + Thread.currentThread().getId() + " Working");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
執(zhí)行結(jié)果:
[2017-02-15 14:12:39,506] INFO Thread-0 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO Thread-3 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO Thread-1 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO Thread-2 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO Thread-4 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,509] INFO Thread-4 (TestCyclicBarrier.java:23) - Inside Barrier
[2017-02-15 14:12:39,510] INFO Thread-4 (TestCyclicBarrier.java:60) - Thread ID:15 Working
[2017-02-15 14:12:39,510] INFO Thread-0 (TestCyclicBarrier.java:60) - Thread ID:11 Working
[2017-02-15 14:12:39,510] INFO Thread-3 (TestCyclicBarrier.java:60) - Thread ID:14 Working
[2017-02-15 14:12:39,511] INFO Thread-2 (TestCyclicBarrier.java:60) - Thread ID:13 Working
[2017-02-15 14:12:39,510] INFO Thread-1 (TestCyclicBarrier.java:60) - Thread ID:12 Working
[2017-02-15 14:12:39,512] INFO main (TestCyclicBarrier.java:42) - All Thread done()
執(zhí)行步驟:
(1) 一共有5個(gè)線程要求它們都到達(dá) barrier.await() 才能繼續(xù)向下執(zhí)行
(2) 前4個(gè)線程調(diào)用 barrier.await() 時(shí)其實(shí)時(shí)在內(nèi)部統(tǒng)一調(diào)用 Reeantrant.lock()獲取 lock, 然后再調(diào)用 Condition.await() 將lock釋放, 等待喚醒
(3) 第五個(gè)線程到達(dá) barrier.await() 處, 先調(diào)用 Reeantrant.lock() 然后發(fā)現(xiàn)自己是最后一個(gè)線程, 則直接調(diào)用 Condition.signalAll() 喚醒其他線程, 最后自己釋放 lock
(4) 其他4個(gè)線程被 signal 了都爭相獲取 lock, 最后又釋放
2. CyclicBarrier 構(gòu)造函數(shù)
下面兩個(gè)構(gòu)造函數(shù)的主要區(qū)別在于 command, 用于當(dāng)所有線程都到達(dá) barrier 時(shí)執(zhí)行的
/**
* 指定 barrierCommand 的構(gòu)造 CyclicBarrier
*/
public CyclicBarrier(int parties, Runnable barrierCommand) {
if(parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierCommand;
}
/**
* 構(gòu)造 CyclicBarrier
*/
public CyclicBarrier(int parties){
this(parties, null);
}
3. CyclicBarrier 主要屬性
private static class Generation{
boolean broken = false;
}
/** The lock for guarding barrier entry */
/** 全局的重入 lock */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
/** 控制線程等待 */
private final Condition trip = lock.newCondition();
/** The number of parties */
/** 參與到這次 barrier 的參與者個(gè)數(shù) */
private final int parties;
/** The command to run when tripped */
/** 到達(dá) barrier 時(shí)執(zhí)行的command */
private final Runnable barrierCommand;
/** The current generation */
/** 初始化 generation */
private Generation generation = new Generation();
4. CyclicBarrier 生成 generation 方法
這是在 一個(gè) barrier 完成后, 重新初始化值
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
/** 生成下一個(gè) generation */
private void nextGeneration(){
// signal completion of last generation
// 喚醒所有等待的線程來獲取 AQS 的state的值
trip.signalAll();
// set up next generation
// 重新賦值計(jì)算器
count = parties;
// 重新初始化 generation
generation = new Generation();
}
5. CyclicBarrier breakBarrier 方法
breakBarrier 主要用于等待的線程當(dāng)被中斷, 或等待超時(shí)執(zhí)行
/**
* Sets current barrier generation as broken and wakes up everyone
* Called only while holding lock
*/
/** 當(dāng)某個(gè)線程被中斷 / 等待超時(shí) 則將 broken = true, 并且喚醒所有等待中的線程 */
private void breakBarrier(){
generation.broken = true;
count = parties;
trip.signalAll();
}
6. CyclicBarrier 主方法 awaitXX
await 方法主要用于等待獲取, 具體看下面的 comment
/**
* 進(jìn)行等待所有線程到達(dá) barrier
* 除非: 其中一個(gè)線程被 inetrrupt
*/
public int await() throws InterruptedException, BrokenBarrierException{
try{
return dowait(false, 0L);
}catch (TimeoutException toe){
throw new Error(toe); // cannot happen
}
}
/**
* 進(jìn)行等待所有線程到達(dá) barrier
* 除非: 等待超時(shí)
*/
public int await(long timeout, TimeUnit unit) throws Exception{
return dowait(true, unit.toNanos(timeout));
}
/**
* Main barrier code, covering the various policies
*/
/**
* CyclicBarrier 的核心方法, 主要是所有線程都獲取一個(gè) ReeantrantLock 來控制
*/
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException{
final ReentrantLock lock = this.lock;
lock.lock(); // 1. 獲取 ReentrantLock
try{
final Generation g = generation;
if(g.broken){ // 2. 判斷 generation 是否已經(jīng) broken
throw new BrokenBarrierException();
}
if(Thread.interrupted()){ // 3. 判斷線程是否中斷, 中斷后就 breakBarrier
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 4. 更新已經(jīng)到達(dá) barrier 的線程數(shù)
if(index == 0){ // triped // 5. index == 0 說明所有線程到達(dá)了 barrier
boolean ranAction = false;
try{
final Runnable command = barrierCommand;
if(command != null){ // 6. 最后一個(gè)線程到達(dá) barrier, 執(zhí)行 command
command.run();
}
ranAction = true;
nextGeneration(); // 7. 更新 generation
return 0;
}finally {
if(!ranAction){
breakBarrier();
}
}
}
// loop until tripped, broken, interrupted, or timed out
for(;;){
try{
if(!timed){
trip.await(); // 8. 沒有進(jìn)行 timeout 的 await
}else if(nanos > 0L){
nanos = trip.awaitNanos(nanos); // 9. 進(jìn)行 timeout 方式的等待
}
}catch (InterruptedException e){
if(g == generation && !g.broken){ // 10. 等待的過程中線程被中斷, 則直接喚醒所有等待的 線程, 重置 broken 的值
breakBarrier();
throw e;
}else{
/**
* We're about to finish waiting even if we had not
* been interrupted, so this interrupt is deemed to
* "belong" to subsequent execution
*/
/**
* 情況
* 1. await 拋 InterruptedException && g != generation
* 所有線程都到達(dá) barrier(這是會(huì)更新 generation), 并且進(jìn)行喚醒所有的線程; 但這時(shí) 當(dāng)前線程被中斷了
* 沒關(guān)系, 當(dāng)前線程還是能獲取 lock, 但是為了讓外面的程序知道自己被中斷過, 所以自己中斷一下
* 2. await 拋 InterruptedException && g == generation && g.broken = true
* 其他線程觸發(fā)了 barrier broken, 導(dǎo)致 g.broken = true, 并且進(jìn)行 signalALL(), 但就在這時(shí)
* 當(dāng)前的線程也被 中斷, 但是為了讓外面的程序知道自己被中斷過, 所以自己中斷一下
*
*/
Thread.currentThread().interrupt();
}
}
if(g.broken){ // 11. barrier broken 直接拋異常
throw new BrokenBarrierException();
}
if(g != generation){ // 12. 所有線程到達(dá) barrier 直接返回
return index;
}
if(timed && nanos <= 0L){ // 13. 等待超時(shí)直接拋異常, 重置 generation
breakBarrier();
throw new TimeoutException();
}
}
}finally {
lock.unlock(); // 14. 調(diào)用 awaitXX 獲取lock后進(jìn)行釋放lock
}
}
7. CyclicBarrier 一般方法
/**
* 判斷 barrier 是否 broken = true
*/
public boolean isBroken(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
return generation.broken;
}finally {
lock.unlock();
}
}
// 重置 barrier
public void reset(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
}finally {
lock.unlock();
}
}
/**
* 獲取等待中的線程
*/
public int getNumberWaiting(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
return parties - count;
}finally {
lock.unlock();
}
}
8. 總結(jié)
CyclicBarrier 主要用 ReeantrantLock 與 Condition 來控制線程資源的獲取, 在理解 CyclicBarrier時(shí), 首先需要理解 ReentrantLock, Condition.
參考:
Java 8 源碼分析 Condition
Java 8 源碼分析 ReentrantLock
Java多線程之JUC包:CyclicBarrier源碼學(xué)習(xí)筆記