CyclicBarrier 源碼分析 (基于Java 8)

1. CyclicBarrier 定義

CyclicBarrier 內(nèi)部是通過 ReeantrantLock, Condition 以及計(jì)數(shù)器count, 來控制線程的執(zhí)行; 當(dāng)所有線程都到達(dá)統(tǒng)一的地方再執(zhí)行接下來的代碼.

特點(diǎn):
1. CyclicBarrier 區(qū)別于 CountDownLatch 是可以重用
2. 用于CyclicBarrier的線程其中有一個(gè)被中斷或等待超時(shí), 都會(huì)造成, barrier broken, 并且重置初始的值 generation

先看一個(gè)簡單的 demo


import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Created by xjk on 2016/5/9.
 */
public class TestCyclicBarrier {

    private static final Logger logger = Logger.getLogger(TestCyclicBarrier.class);

    private static final int THREAD_NUM = 5;


    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() {
            public void run() {
                logger.info("Inside Barrier");
            }
        });

        List<Thread> threads = new ArrayList<>();
        for(int i = 0; i < THREAD_NUM; i++){
            Thread thread = new Thread(new WorkerThread(cb));
            threads.add(thread);
            thread.start();
        }

        // wait until done
        for(Thread thread : threads){
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info("All Thread done()");
    }



    public static class WorkerThread implements Runnable{

        CyclicBarrier barrier;

        public WorkerThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        public void run() {
            try {
                logger.info("Working's waiting");
                // 線程在這里等待, 直到所有線程都到達(dá)barrier
                barrier.await();
                logger.info("Thread ID:" + Thread.currentThread().getId() + " Working");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

執(zhí)行結(jié)果:

[2017-02-15 14:12:39,506] INFO  Thread-0 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-3 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-1 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-2 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-4 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,509] INFO  Thread-4 (TestCyclicBarrier.java:23) - Inside Barrier
[2017-02-15 14:12:39,510] INFO  Thread-4 (TestCyclicBarrier.java:60) - Thread ID:15 Working
[2017-02-15 14:12:39,510] INFO  Thread-0 (TestCyclicBarrier.java:60) - Thread ID:11 Working
[2017-02-15 14:12:39,510] INFO  Thread-3 (TestCyclicBarrier.java:60) - Thread ID:14 Working
[2017-02-15 14:12:39,511] INFO  Thread-2 (TestCyclicBarrier.java:60) - Thread ID:13 Working
[2017-02-15 14:12:39,510] INFO  Thread-1 (TestCyclicBarrier.java:60) - Thread ID:12 Working
[2017-02-15 14:12:39,512] INFO  main (TestCyclicBarrier.java:42) - All Thread done()

執(zhí)行步驟:
(1) 一共有5個(gè)線程要求它們都到達(dá) barrier.await() 才能繼續(xù)向下執(zhí)行
(2) 前4個(gè)線程調(diào)用 barrier.await() 時(shí)其實(shí)時(shí)在內(nèi)部統(tǒng)一調(diào)用 Reeantrant.lock()獲取 lock, 然后再調(diào)用 Condition.await() 將lock釋放, 等待喚醒
(3) 第五個(gè)線程到達(dá) barrier.await() 處, 先調(diào)用 Reeantrant.lock() 然后發(fā)現(xiàn)自己是最后一個(gè)線程, 則直接調(diào)用 Condition.signalAll() 喚醒其他線程, 最后自己釋放 lock
(4) 其他4個(gè)線程被 signal 了都爭相獲取 lock, 最后又釋放

2. CyclicBarrier 構(gòu)造函數(shù)

下面兩個(gè)構(gòu)造函數(shù)的主要區(qū)別在于 command, 用于當(dāng)所有線程都到達(dá) barrier 時(shí)執(zhí)行的

/**
 * 指定 barrierCommand 的構(gòu)造 CyclicBarrier
 */
public CyclicBarrier(int parties, Runnable barrierCommand) {
    if(parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierCommand;
}

/**
 * 構(gòu)造 CyclicBarrier
 */
public CyclicBarrier(int parties){
    this(parties, null);
}
3. CyclicBarrier 主要屬性
private static class Generation{
    boolean broken = false;
}

/** The lock for guarding barrier entry */
/** 全局的重入 lock */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
/** 控制線程等待  */
private final Condition trip = lock.newCondition();
/** The number of parties */
/** 參與到這次 barrier 的參與者個(gè)數(shù) */
private final int parties;
/** The command to run when tripped */
/** 到達(dá) barrier 時(shí)執(zhí)行的command */
private final Runnable barrierCommand;
/** The current generation */
/** 初始化 generation */
private Generation generation = new Generation();
4. CyclicBarrier 生成 generation 方法

這是在 一個(gè) barrier 完成后, 重新初始化值

/**
 * Updates state on barrier trip and wakes up everyone.
 * Called only while holding lock.
 */
/** 生成下一個(gè) generation */
private void nextGeneration(){
    // signal completion of last generation
    // 喚醒所有等待的線程來獲取 AQS 的state的值
    trip.signalAll();
    // set up next generation
    // 重新賦值計(jì)算器
    count = parties;
    // 重新初始化 generation
    generation = new Generation();
}
5. CyclicBarrier breakBarrier 方法

breakBarrier 主要用于等待的線程當(dāng)被中斷, 或等待超時(shí)執(zhí)行

/**
 * Sets current barrier generation as broken and wakes up everyone
 * Called only while holding lock
 */
/** 當(dāng)某個(gè)線程被中斷 / 等待超時(shí) 則將 broken = true, 并且喚醒所有等待中的線程 */
private void breakBarrier(){
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
6. CyclicBarrier 主方法 awaitXX

await 方法主要用于等待獲取, 具體看下面的 comment

/**
 * 進(jìn)行等待所有線程到達(dá) barrier
 * 除非: 其中一個(gè)線程被 inetrrupt
 */
public int await() throws InterruptedException, BrokenBarrierException{
    try{
        return dowait(false, 0L);
    }catch (TimeoutException toe){
        throw new Error(toe); // cannot happen
    }
}

/**
 * 進(jìn)行等待所有線程到達(dá) barrier
 * 除非: 等待超時(shí)
 */
public int await(long timeout, TimeUnit unit) throws Exception{
    return dowait(true, unit.toNanos(timeout));
}

/**
 * Main barrier code, covering the various policies
 */
/**
 * CyclicBarrier 的核心方法, 主要是所有線程都獲取一個(gè) ReeantrantLock 來控制
 */
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException{
    final ReentrantLock lock = this.lock;
    lock.lock();                            // 1. 獲取 ReentrantLock
    try{
        final Generation g = generation;

        if(g.broken){                       // 2. 判斷 generation 是否已經(jīng) broken
            throw new BrokenBarrierException();
        }

        if(Thread.interrupted()){           // 3. 判斷線程是否中斷, 中斷后就 breakBarrier
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;                // 4. 更新已經(jīng)到達(dá) barrier 的線程數(shù)
        if(index == 0){ // triped           // 5. index == 0 說明所有線程到達(dá)了 barrier
            boolean ranAction = false;
            try{
                final Runnable command = barrierCommand;
                if(command != null){        // 6. 最后一個(gè)線程到達(dá) barrier, 執(zhí)行 command
                    command.run();
                }
                ranAction = true;
                nextGeneration();           // 7. 更新 generation
                return 0;
            }finally {
                if(!ranAction){
                    breakBarrier();
                }
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for(;;){
            try{
                if(!timed){
                    trip.await();           // 8. 沒有進(jìn)行 timeout 的 await
                }else if(nanos > 0L){
                    nanos = trip.awaitNanos(nanos); // 9. 進(jìn)行 timeout 方式的等待
                }
            }catch (InterruptedException e){
                if(g == generation && !g.broken){ // 10. 等待的過程中線程被中斷, 則直接喚醒所有等待的 線程, 重置 broken 的值
                    breakBarrier();
                    throw e;
                }else{
                    /**
                     * We're about to finish waiting even if we had not
                     * been interrupted, so this interrupt is deemed to
                     * "belong" to subsequent execution
                     */
                    /**
                     * 情況
                     *  1. await 拋 InterruptedException && g != generation
                     *      所有線程都到達(dá) barrier(這是會(huì)更新 generation), 并且進(jìn)行喚醒所有的線程; 但這時(shí) 當(dāng)前線程被中斷了
                     *      沒關(guān)系, 當(dāng)前線程還是能獲取 lock, 但是為了讓外面的程序知道自己被中斷過, 所以自己中斷一下
                     *  2. await 拋 InterruptedException && g == generation && g.broken = true
                     *      其他線程觸發(fā)了 barrier broken, 導(dǎo)致 g.broken = true, 并且進(jìn)行 signalALL(), 但就在這時(shí)
                     *      當(dāng)前的線程也被 中斷, 但是為了讓外面的程序知道自己被中斷過, 所以自己中斷一下
                     *
                     */
                    Thread.currentThread().interrupt();
                }
            }



            if(g.broken){                       // 11. barrier broken 直接拋異常
                throw new BrokenBarrierException();
            }

            if(g != generation){                 // 12. 所有線程到達(dá) barrier 直接返回
                return index;
            }

            if(timed && nanos <= 0L){           // 13. 等待超時(shí)直接拋異常, 重置 generation
                breakBarrier();
                throw new TimeoutException();
            }
        }
    }finally {
        lock.unlock();                          // 14. 調(diào)用 awaitXX 獲取lock后進(jìn)行釋放lock
    }
}
7. CyclicBarrier 一般方法
/**
 * 判斷 barrier 是否 broken = true
 */
public boolean isBroken(){
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        return generation.broken;
    }finally {
        lock.unlock();
    }
}

// 重置 barrier
public void reset(){
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        breakBarrier();  // break the current generation
        nextGeneration(); // start a new generation
    }finally {
        lock.unlock();
    }
}

/**
 * 獲取等待中的線程
 */
public int getNumberWaiting(){
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        return parties - count;
    }finally {
        lock.unlock();
    }
}
8. 總結(jié)

CyclicBarrier 主要用 ReeantrantLock 與 Condition 來控制線程資源的獲取, 在理解 CyclicBarrier時(shí), 首先需要理解 ReentrantLock, Condition.

參考:
Java 8 源碼分析 Condition
Java 8 源碼分析 ReentrantLock
Java多線程之JUC包:CyclicBarrier源碼學(xué)習(xí)筆記

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末击碗,一起剝皮案震驚了整個(gè)濱河市脊奋,隨后出現(xiàn)的幾起案子缆娃,更是在濱河造成了極大的恐慌廷粒,老刑警劉巖枝哄,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異聂示,居然都是意外死亡彬檀,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門溯泣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來虐秋,“玉大人,你說我怎么就攤上這事垃沦】透” “怎么了?”我有些...
    開封第一講書人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵肢簿,是天一觀的道長靶剑。 經(jīng)常有香客問我蜻拨,道長,這世上最難降的妖魔是什么桩引? 我笑而不...
    開封第一講書人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任缎讼,我火速辦了婚禮,結(jié)果婚禮上坑匠,老公的妹妹穿的比我還像新娘血崭。我一直安慰自己,他們只是感情好笛辟,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開白布功氨。 她就那樣靜靜地躺著序苏,像睡著了一般手幢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上忱详,一...
    開封第一講書人閱讀 49,842評(píng)論 1 290
  • 那天围来,我揣著相機(jī)與錄音,去河邊找鬼匈睁。 笑死监透,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的航唆。 我是一名探鬼主播胀蛮,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼糯钙!你這毒婦竟也來了粪狼?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤任岸,失蹤者是張志新(化名)和其女友劉穎再榄,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體享潜,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡困鸥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了剑按。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片疾就。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖艺蝴,靈堂內(nèi)的尸體忽然破棺而出虐译,到底是詐尸還是另有隱情,我是刑警寧澤吴趴,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布漆诽,位于F島的核電站侮攀,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏厢拭。R本人自食惡果不足惜兰英,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望供鸠。 院中可真熱鬧畦贸,春花似錦、人聲如沸楞捂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寨闹。三九已至胶坠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間繁堡,已是汗流浹背沈善。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留椭蹄,地道東北人闻牡。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像绳矩,于是被迫代替她去往敵國和親罩润。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容