并發(fā)編程之CyclicBarrier原理與使用

點(diǎn)贊再看只磷,養(yǎng)成習(xí)慣撮执,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章。本文 GitHub org_hejianhui/JavaStudy 已收錄吃粒,有我的系列文章锨络。

前言

控制并發(fā)流程的工具類赌躺,作用就是幫助我們程序員更容易的讓線程之間合作,讓線程之間相互配合來滿足業(yè)務(wù)邏輯羡儿。比如讓線程A等待線程B執(zhí)行完畢后再執(zhí)行等合作策略礼患。

控制并發(fā)流程的工具類主要有:

作用 說明
Semaphore 信號量,可以通過控制“許可證”的數(shù)量掠归,來保證線程之間的配合 線程只有拿到“許可證”后才能繼續(xù)運(yùn)行缅叠,相比于其它的同步器,更靈活
CyclicBarrier 線程會等待虏冻,直到足夠多線程達(dá)到了事先規(guī)定的數(shù)目肤粱。一旦達(dá)到觸發(fā)條件,就可以進(jìn)行下一步的動作 適用于線程之間相互等待處理結(jié)果的就緒場景
Phaser 和CyclicBarrier類似厨相,但是計(jì)數(shù)可變 Java7加入的
CountDownLatch 和CyclicBarrier類似领曼,數(shù)量遞減到0時(shí),觸發(fā)動作 不可重復(fù)使用
Exchanger 讓兩個(gè)線程在合適時(shí)交換對象 適用場景:當(dāng)兩個(gè)線程工作在同一個(gè)類的不同實(shí)例上時(shí)蛮穿,用于交換數(shù)據(jù)
Condition 可以控制線程的“等待”和“喚醒” 是Object.wait() 的升級版

簡介

從字面意思看庶骄,這個(gè)類的中文意思是“循環(huán)柵欄”。大概的意思就是一個(gè)可循環(huán)利用的屏障践磅。它的作用就是會讓所有線程都等待完成后才會繼續(xù)下一步行動单刁。

舉個(gè)例子,就像生活中我們會約朋友到某個(gè)餐廳一起吃飯府适,有些朋友可能會早到羔飞,有些朋友可能會晚到,但這個(gè)餐廳規(guī)定必須等到所有人到期之后才會讓我們進(jìn)去檐春。這里的朋友們就各個(gè)線程逻淌,餐廳就是CyclicBarrier。

在JUC包中為我們提供了一個(gè)同步工具類能夠很好的模擬這類場景喇聊,它就是CyclicBarrier類恍风。利用CyclicBarrier類可以實(shí)現(xiàn)一組線程相互等待,當(dāng)所有線程都到達(dá)某個(gè)屏障點(diǎn)后再進(jìn)行后續(xù)的操作誓篱。下圖演示了這一過程朋贬。


應(yīng)用場景

可用于多線程計(jì)數(shù)數(shù)據(jù),最后合并計(jì)數(shù)結(jié)果的場景窜骄。

使用CyclicBarrier實(shí)現(xiàn)等待的線程都被稱為參與方锦募。參與方只需要執(zhí)行cyclicBarrier.await() 就可以實(shí)現(xiàn)等待。由于CyclicBarrier內(nèi)部維護(hù)了一個(gè)顯示鎖邻遏,這可以知道參與方中誰最后一個(gè)執(zhí)行cyclicBarrier.await() 糠亩。當(dāng)最后一個(gè)線程執(zhí)行完虐骑,會使得使用相應(yīng)CyclicBarrier實(shí)例的其他參與方被喚醒,而最后一個(gè)線程自身不會被暫停赎线。其流程圖如下:


public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() ->{
            System.out.println("****召喚神龍");
        });
        for(int i = 1;i <= 7; i++){
            int finalI = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 收集到第"+ finalI +"顆龍珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }

源碼分析

CyclicBarrier 類圖



CyclicBarrier是包含了 “ReentrantLock對象lock” 和 “Condition對象trip”廷没,它是通過獨(dú)占鎖實(shí)現(xiàn)的。

其內(nèi)部主要變量和方法如下:


成員變量

//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
//線程攔截器
private final Condition trip = lock.newCondition();
//每次攔截的線程數(shù)
private final int parties;
//換代前執(zhí)行的任務(wù)
private final Runnable barrierCommand;
//表示柵欄的當(dāng)前代
private Generation generation = new Generation();
//計(jì)數(shù)器
private int count;
 
//靜態(tài)內(nèi)部類Generation
private static class Generation {
  boolean broken = false;
}

可以看到 CyclicBarrier 內(nèi)部是通過條件隊(duì)列 trip 來對線程進(jìn)行阻塞的垂寥,并且其內(nèi)部維護(hù)了兩個(gè) int 型的變量 parties 和 count:

  • parties 表示每次攔截的線程數(shù)颠黎,該值在構(gòu)造時(shí)進(jìn)行賦值;
  • count 是內(nèi)部計(jì)數(shù)器滞项,它的初始值和 parties 相同狭归,以后隨著每次 await 方法的調(diào)用而減 1,直到減為 0 就將所有線程喚醒文判。

CycliBarrier 有一個(gè)靜態(tài)內(nèi)部類 Generation过椎,該類的對象代表柵欄的當(dāng)前代,就像玩游戲時(shí)代表的本局有些戏仓,利用它可以實(shí)現(xiàn)循環(huán)等待疚宇。barrierCommand 表示換代前執(zhí)行的任務(wù),當(dāng) count 減為 0 時(shí)表示本局游戲結(jié)束柜去,需要轉(zhuǎn)到下一句灰嫉。在轉(zhuǎn)到下一句游戲之前會將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過指定 barrierCommand 來執(zhí)行自己的任務(wù)嗓奢。

構(gòu)造函數(shù)

主要提供了兩個(gè)構(gòu)造方法

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必須同時(shí)到達(dá)barrier的線程個(gè)數(shù)”。
    this.parties = parties;
    // count表示“處在等待狀態(tài)的線程個(gè)數(shù)”浑厚。
    this.count = parties;
    // barrierCommand表示“parties個(gè)線程到達(dá)barrier時(shí)股耽,會執(zhí)行的動作”。
    this.barrierCommand = barrierAction;
}

解析

  • parties 是參與線程的個(gè)數(shù)
  • 第二個(gè)構(gòu)造方法有一個(gè)Runnable參數(shù)钳幅,這個(gè)參數(shù)的意思是最后一個(gè)到達(dá)線程要執(zhí)行的動作物蝙。

重要方法

CyclicBarrier類最主要的功能就是使先到達(dá)屏障點(diǎn)的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法敢艰,分別是定時(shí)等待和非定時(shí)等待诬乞。

await()方法

//非定時(shí)等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}
 
//定時(shí)等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

解析

  • 線程調(diào)用await()表示總結(jié)已經(jīng)到達(dá)柵欄
  • BrokenBarrierException表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個(gè)線程await()時(shí)被中斷或者超時(shí)钠导。

dowait()方法

可以看到不管是定時(shí)等待還是非定時(shí)等待震嫉,它們都調(diào)用了dowait方法,只不過是傳入的參數(shù)不同而已牡属。下面我們就來看看dowait方法都做了些什么票堵。

//核心等待方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  // 顯示鎖
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    //檢查當(dāng)前柵欄是否被打翻
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    //檢查當(dāng)前線程是否被中斷
    if (Thread.interrupted()) {
      //如果當(dāng)前線程被中斷會做以下三件事
      //1.打翻當(dāng)前柵欄
      //2.喚醒攔截的所有線程
      //3.拋出中斷異常
      breakBarrier();
      throw new InterruptedException();
    }
    //每次都將計(jì)數(shù)器的值減1
    int index = --count;
    //計(jì)數(shù)器的值減為0則需喚醒所有線程并轉(zhuǎn)換到下一代
    if (index == 0) {
      boolean ranAction = false;
      try {
        //喚醒所有線程前先執(zhí)行指定的任務(wù)
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        //喚醒所有線程并轉(zhuǎn)到下一代
        nextGeneration();
        return 0;
      } finally {
        //確保在任務(wù)未成功執(zhí)行時(shí)能將所有線程喚醒
        if (!ranAction) {
          breakBarrier();
        }
      }
    }
 
    //如果計(jì)數(shù)器不為0則執(zhí)行此循環(huán)
    for (;;) {
      try {
        //根據(jù)傳入的參數(shù)來決定是定時(shí)等待還是非定時(shí)等待
        if (!timed) {
          trip.await();
        }else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        //若當(dāng)前線程在等待期間被中斷則打翻柵欄喚醒其他線程
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          //若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待, 則直接調(diào)用中斷操作
          Thread.currentThread().interrupt();
        }
      }
      //如果線程因?yàn)榇蚍瓥艡诓僮鞫粏拘褎t拋出異常
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      //如果線程因?yàn)閾Q代操作而被喚醒則返回計(jì)數(shù)器的值
      if (g != generation) {
        return index;
      }
      //如果線程因?yàn)闀r(shí)間到了而被喚醒則打翻柵欄并拋出異常
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

上面執(zhí)行的代碼相對比較容易看懂,我們再來看一下執(zhí)行流程:


  1. 獲得顯示鎖逮栅,判斷當(dāng)前線程狀態(tài)是否被中斷悴势,如果是窗宇,則執(zhí)行 breakBarrier 方法,喚醒之前阻塞的所有線程特纤,并將計(jì)數(shù)器重置军俊;
  2. 計(jì)數(shù)器 count 減 1,如果 count == 0捧存,表示最后一個(gè)線程達(dá)到柵欄粪躬,接著執(zhí)行之前指定的 Runnable 接口,同時(shí)執(zhí)行 nextGeneration 方法進(jìn)入下一代矗蕊;
  3. 否則短蜕,進(jìn)入自旋,判斷當(dāng)前線程是進(jìn)入定時(shí)等待還是非定時(shí)等待傻咖,如果在等待過程中被中斷朋魔,執(zhí)行 breakBarrier 方法,喚醒之前阻塞的所有線程卿操;
  4. 判斷是否是因?yàn)閳?zhí)行 breakBarrier 方法而被喚醒警检,如果是,則拋出異常害淤;
  5. 判斷是否是正常的換代操作而被喚醒扇雕,如果是,則返回計(jì)數(shù)器的值窥摄;
  6. 判斷是否是超時(shí)而被喚醒镶奉,如果是,則喚醒之前阻塞的所有線程崭放,并拋出異常哨苛;
  7. 釋放鎖。

breakBarrier()方法

private void breakBarrier() {
    generation.broken = true;//柵欄被打破
    count = parties;//重置count
    trip.signalAll();//喚醒之前阻塞的線程
}

nextGeneration()方法

private void nextGeneration() {
    //喚醒所以的線程
    trip.signalAll();
    //重置計(jì)數(shù)器
    count = parties;
    //重新開始
    generation = new Generation();
}

reset()方法

接下來看看柵欄重置的方法

// 重置barrier到初始狀態(tài)币砂,所有還在等待中的線程最終會拋出BrokenBarrierException建峭。
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

其它方法

CyclicBarrier 其它還提供了例如getParties,isBroken决摧,getNumberWaiting等方法亿蒸,都比較簡單,其中除了getParties由于parties被final修飾不可變掌桩,其余方法都會先去獲得互斥鎖边锁。

/**
 * 獲取當(dāng)前這一輪是否已經(jīng)broken。
 */
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

/**
 * 獲得當(dāng)前在barrier中等待的線程數(shù)拘鞋。
 */
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

總結(jié)

CountDownLatch和CyclicBarrier區(qū)別

CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待砚蓬,只不過它們側(cè)重點(diǎn)不同:

  • CountDownLatch一般用于一個(gè)或多個(gè)線程,等待其他線程執(zhí)行完任務(wù)后盆色,再才執(zhí)行灰蛙;
  • CyclicBarrier一般用于一組線程互相等待至某個(gè)狀態(tài)祟剔,然后這一組線程再同時(shí)執(zhí)行;
  • CountDownLatch 是一次性的摩梧,CyclicBarrier 是可循環(huán)利用的物延;
  • CountDownLathch是一個(gè)計(jì)數(shù)器,線程完成一個(gè)記錄一個(gè)仅父,計(jì)數(shù)器遞減叛薯,只能用一次。如下圖:
  • CyclicBarrier的計(jì)數(shù)器更像一個(gè)閥門笙纤,需要所有線程都到達(dá)耗溜,然后繼續(xù)執(zhí)行,計(jì)數(shù)器遞減省容,提供reset功能抖拴,可以多次使用。如下圖:

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新腥椒,可以公眾號搜一搜「 一角錢技術(shù) 」第一時(shí)間閱讀阿宅, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star笼蛛。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末洒放,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子滨砍,更是在濱河造成了極大的恐慌往湿,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惋戏,死亡現(xiàn)場離奇詭異煌茴,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)日川,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來矩乐,“玉大人龄句,你說我怎么就攤上這事∩⒑保” “怎么了分歇?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長欧漱。 經(jīng)常有香客問我职抡,道長,這世上最難降的妖魔是什么误甚? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任缚甩,我火速辦了婚禮谱净,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘擅威。我一直安慰自己壕探,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布郊丛。 她就那樣靜靜地躺著李请,像睡著了一般。 火紅的嫁衣襯著肌膚如雪厉熟。 梳的紋絲不亂的頭發(fā)上导盅,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機(jī)與錄音揍瑟,去河邊找鬼白翻。 笑死,一個(gè)胖子當(dāng)著我的面吹牛月培,可吹牛的內(nèi)容都是我干的嘁字。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼杉畜,長吁一口氣:“原來是場噩夢啊……” “哼纪蜒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起此叠,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤纯续,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后灭袁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體猬错,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年茸歧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了倦炒。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡软瞎,死狀恐怖逢唤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涤浇,我是刑警寧澤鳖藕,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站只锭,受9級特大地震影響著恩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一喉誊、第九天 我趴在偏房一處隱蔽的房頂上張望邀摆。 院中可真熱鬧,春花似錦裹驰、人聲如沸隧熙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贞盯。三九已至,卻和暖如春沪饺,著一層夾襖步出監(jiān)牢的瞬間躏敢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工整葡, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留件余,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓遭居,卻偏偏與公主長得像啼器,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子俱萍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353