java并發(fā)之CyclicBarrier

java并發(fā)之CyclicBarrier

知識導(dǎo)讀

  • CyclicBarrier是一個阻塞器森书,當(dāng)阻塞線程達(dá)到CyclicBarrier指定數(shù)量時澎迎,所有線程被喚醒執(zhí)行,否則阻塞該線程
  • CyclicBarrier可以循環(huán)使用
  • CyclicBarrier是基于ReentrantLock和ReentrantLock的一個Condition實現(xiàn),每次調(diào)用await方法后加鎖灌闺,然后計數(shù)減1,減之后不為0,則調(diào)用Condition的await方法坏瞄,當(dāng)前線程進入阻塞等待狀態(tài)桂对;當(dāng)計數(shù)減之后為0則signalAll,激活所有等待線程鸠匀。
  • CyclicBarrier上阻塞的線程蕉斜,任意一個線程被中斷、超時缀棍、執(zhí)行異常都會導(dǎo)致CyclicBarrier被打破宅此,從而導(dǎo)致所有阻塞線程被喚醒

原理

CyclicBarrier屏障初始化時規(guī)定一個數(shù)目,然后計算調(diào)用了CyclicBarrier.await()進入等待的線程數(shù)爬范。

  • 如果線程數(shù)未達(dá)到這個數(shù)量父腕,則線程阻塞等待;
  • 當(dāng)線程數(shù)達(dá)到了這個數(shù)目時青瀑,所有進入等待狀態(tài)的線程被喚醒并繼續(xù)運行

CyclicBarrier中使用了一個ReentrantLock和該ReentrantLock的一個Condition璧亮。使用Condition.await方法進行線程阻塞。使用Condition.notifyAll方法喚醒所有阻塞線程斥难。

CyclicBarrier使用了兩個數(shù)來記錄屏障允許通過的線程數(shù)閾值枝嘶。parties值不變,count值用來記錄剩余需阻塞的線程哑诊。當(dāng)count=0的時候群扶,屏障被打開,重置count=parties镀裤,實現(xiàn)CyclicBarrier的重復(fù)使用竞阐。

CyclicBarrier柵欄被放開分兩種情況

  • 線程數(shù)達(dá)到閾值: 打開柵欄,喚醒所有線程淹禾,柵欄重復(fù)使用
  • 運行異常: 打破柵欄馁菜,喚醒所有線程,柵欄不可再用铃岔,包括任意線程被中斷汪疮、超時峭火、運行異常等導(dǎo)致的異常

源碼分析

CyclicBarrier內(nèi)部封裝了一個ReentrantLock和一個Condition用于進行線程的阻塞和喚醒。

定義了parties變量用來記錄線程數(shù)的閾值智嚷,初始化時賦值卖丸,count變量用來動態(tài)記錄剩余需要進入的線程數(shù)

Generation變量可以用于區(qū)分CyclicBarrier是被正常打開還是異常后打破

barrierCommand變量是一個Runnable任務(wù),當(dāng)柵欄被正常打開的時候執(zhí)行該任務(wù)盏道。

private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;  //阻塞的閾值稍浆,不會變
    this.count = parties; //記錄  還剩多少個阻塞會喚醒所有線程
    this.barrierCommand = barrierAction;
}

CyclicBarrier的await方法,用于實現(xiàn)在CyclicBarrier柵欄上進行阻塞線程猜嘱。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

await方法調(diào)用dowait方法衅枫,實現(xiàn)具體邏輯

  1. 在進入方法前首先要獲取ReentrantLock鎖資源
  2. 限制被打破的柵欄不能重復(fù)使用
  3. 如果線程被中斷,調(diào)用breakBarrier方法打破柵欄朗伶,柵欄不可再用
  4. 每進來一個工作線程弦撩,count-1,當(dāng)count值為0的時候论皆,則調(diào)用nextGeneration方法打開柵欄
  5. 如果count不為0益楼,則自旋調(diào)用Condition的await方法,阻塞當(dāng)前線程
  6. 如果超時点晴、中斷感凤、異常則調(diào)用breakBarrier方法打破柵欄
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)//被打破的柵欄不能再使用
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier(); //被中斷,則打破柵欄
            throw new InterruptedException();
        }

        int index = --count; //每次執(zhí)行 count-1
        if (index == 0) {  //當(dāng)index = 0 的時候粒督,打開柵欄陪竿,喚醒所有等待線程
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();//正常結(jié)束,則喚醒所有線程坠陈,重置Generation
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();// 喚醒所有等待的線程
            }
        }
        //自旋萨惑,直到線程被interrupted,或者超時或者被notifyAll喚醒
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                  //標(biāo)記中斷信號
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            //超時 打破柵欄
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

正常結(jié)束調(diào)用nextGeneration方法打開柵欄仇矾,所有的線程被喚醒執(zhí)行

  1. 調(diào)用Condition的signalAll方法庸蔼,喚醒等待的所有線程
  2. 重置count為parties,這時CyclicBarrier可以重復(fù)使用的關(guān)鍵
  3. 創(chuàng)建一個新的可用的Generation
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

非正常結(jié)束贮匕,調(diào)用breakBarrier方法打破柵欄姐仅,所有的線程被喚醒執(zhí)行

  1. 標(biāo)記generation被打破。不能再用了
  2. 重置count為parties刻盐,這時CyclicBarrier可以重復(fù)使用的關(guān)鍵
  3. 調(diào)用Condition的signalAll方法掏膏,喚醒等待的所有線程
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

CyclicBarrier對外提供了reset方法,可以不用等阻塞線程達(dá)到閾值敦锌,然后打開柵欄馒疹,激活所有的線程

  1. 調(diào)用breakBarrier方法,喚醒所有線程
  2. 創(chuàng)建一個新的Generation
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乙墙,一起剝皮案震驚了整個濱河市颖变,隨后出現(xiàn)的幾起案子生均,更是在濱河造成了極大的恐慌,老刑警劉巖腥刹,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件马胧,死亡現(xiàn)場離奇詭異,居然都是意外死亡衔峰,警方通過查閱死者的電腦和手機佩脊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來垫卤,“玉大人威彰,你說我怎么就攤上這事『校” “怎么了抱冷?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵崔列,是天一觀的道長梢褐。 經(jīng)常有香客問我,道長赵讯,這世上最難降的妖魔是什么盈咳? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮边翼,結(jié)果婚禮上鱼响,老公的妹妹穿的比我還像新娘。我一直安慰自己组底,他們只是感情好丈积,可當(dāng)我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著债鸡,像睡著了一般江滨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上厌均,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天唬滑,我揣著相機與錄音,去河邊找鬼棺弊。 笑死晶密,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的模她。 我是一名探鬼主播稻艰,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼侈净!你這毒婦竟也來了尊勿?” 一聲冷哼從身側(cè)響起归苍,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎运怖,沒想到半個月后拼弃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡摇展,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年吻氧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咏连。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡盯孙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出祟滴,到底是詐尸還是另有隱情振惰,我是刑警寧澤,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布垄懂,位于F島的核電站骑晶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏草慧。R本人自食惡果不足惜桶蛔,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望漫谷。 院中可真熱鬧仔雷,春花似錦、人聲如沸舔示。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惕稻。三九已至竖共,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缩宜,已是汗流浹背肘迎。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留锻煌,地道東北人妓布。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像宋梧,于是被迫代替她去往敵國和親匣沼。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,585評論 2 359