9.CyclicBarrier

CyclicBarrier

如果說CountDownLatch是一次性的侦啸,那么CyclicBarrier正好可以循環(huán)使用纹坐。它允許一組線程互相等待贿讹,直到到達(dá)某個公共屏障點(diǎn) (common barrier point)萎羔。所謂屏障點(diǎn)就是一組任務(wù)執(zhí)行完畢的時刻滔吠。

清單1 一個使用CyclicBarrier的例子

package xylz.study.concurrency.lock;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

? final CyclicBarrier barrier;

? final int MAX_TASK;

? public CyclicBarrierDemo(int cnt) {
? barrier = new CyclicBarrier(cnt + 1);
? MAX_TASK = cnt;
? }

? public void doWork(final Runnable work) {
? new Thread() {

? public void run() {
? work.run();
? try {
? int index = barrier.await();
? doWithIndex(index);
? } catch (InterruptedException e) {
? return;
? } catch (BrokenBarrierException e) {
? return;
? }
? }
? }.start();
? }

? private void doWithIndex(int index) {
? if (index == MAX_TASK / 3) {
? System.out.println("Left 30%.");
? } else if (index == MAX_TASK / 2) {
? System.out.println("Left 50%");
? } else if (index == 0) {
? System.out.println("run over");
? }
? }

? public void waitForNext() {
? try {
? doWithIndex(barrier.await());
? } catch (InterruptedException e) {
? return;
? } catch (BrokenBarrierException e) {
? return;
? }
? }

? public static void main(String[] args) {
? final int count = 10;
? CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
? for (int i = 0; i < 100; i++) {
? demo.doWork(new Runnable() {

? public void run() {
? //do something
? try {
? Thread.sleep(1000L);
? } catch (Exception e) {
? return;
? }
? }
? });
? if ((i + 1) % count == 0) {
? demo.waitForNext();
? }
? }
? }

}

清單1描述的是一個周期性處理任務(wù)的例子抵恋,在這個例子中有一對的任務(wù)(100個)焕议,希望每10個為一組進(jìn)行處理,當(dāng)前僅當(dāng)上一組任務(wù)處理完成后才能進(jìn)行下一組馋记,另外在每一組任務(wù)中号坡,當(dāng)任務(wù)剩下50%懊烤,30%以及所有任務(wù)執(zhí)行完成時向觀察者發(fā)出通知。

在這個例子中宽堆,CyclicBarrierDemo 構(gòu)建了一個count+1的任務(wù)組(其中一個任務(wù)時為了外界方便掛起主線程)腌紧。每一個子任務(wù)里,人物本身執(zhí)行完畢后都需要等待同組內(nèi)其它任務(wù)執(zhí)行完成后才能繼續(xù)畜隶。同時在剩下任務(wù)50%壁肋、30%已經(jīng)0時執(zhí)行特殊的其他任務(wù)(發(fā)通知)。

很顯然CyclicBarrier有以下幾個特點(diǎn):

  • await()方法將掛起線程籽慢,直到同組的其它線程執(zhí)行完畢才能繼續(xù)
  • await()方法返回線程執(zhí)行完畢的索引浸遗,注意,索引時從任務(wù)數(shù)-1開始的箱亿,也就是第一個執(zhí)行完成的任務(wù)索引為parties-1,最后一個為0跛锌,這個parties為總?cè)蝿?wù)數(shù),清單中是cnt+1
  • CyclicBarrier 是可循環(huán)的届惋,顯然名稱說明了這點(diǎn)髓帽。在清單1中,每一組任務(wù)執(zhí)行完畢就能夠執(zhí)行下一組任務(wù)脑豹。

另外除了CyclicBarrier除了以上特點(diǎn)外郑藏,還有以下幾個特點(diǎn):

  • 如果屏障操作不依賴于掛起的線程,那么任何線程都可以執(zhí)行屏障操作瘩欺。在清單1中可以看到并沒有指定那個線程執(zhí)行50%必盖、30%、0%的操作俱饿,而是一組線程(cnt+1)個中任何一個線程只要到達(dá)了屏障點(diǎn)都可以執(zhí)行相應(yīng)的操作
  • CyclicBarrier 的構(gòu)造函數(shù)允許攜帶一個任務(wù)歌粥,這個任務(wù)將在0%屏障點(diǎn)執(zhí)行,它將在await()==0后執(zhí)行稍途。
  • CyclicBarrier 如果在await時因?yàn)橹袛喔罅摺⑹ ⒊瑫r等原因提前離開了屏障點(diǎn)械拍,那么任務(wù)組中的其他任務(wù)將立即被中斷突勇,以InterruptedException異常離開線程。
  • 所有await()之前的操作都將在屏障點(diǎn)之前運(yùn)行坷虑,也就是CyclicBarrier 的內(nèi)存一致性效果

CyclicBarrier 的所有API如下:

  • public CyclicBarrier(int parties) 創(chuàng)建一個新的 CyclicBarrier甲馋,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,但它不會在啟動 barrier 時執(zhí)行預(yù)定義的操作迄损。
  • public CyclicBarrier(int parties, Runnable barrierAction) 創(chuàng)建一個新的 CyclicBarrier定躏,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,并在啟動 barrier 時執(zhí)行給定的屏障操作,該操作由最后一個進(jìn)入 barrier 的線程執(zhí)行痊远。
  • public int await() throws InterruptedException, BrokenBarrierException 在所有參與者都已經(jīng)在此 barrier 上調(diào)用 await 方法之前垮抗,將一直等待。
  • public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有參與者都已經(jīng)在此屏障上調(diào)用 await 方法之前將一直等待,或者超出了指定的等待時間碧聪。
  • *public int getNumberWaiting() *返回當(dāng)前在屏障處等待的參與者數(shù)目冒版。此方法主要用于調(diào)試和斷言。
  • public int getParties() 返回要求啟動此 barrier 的參與者數(shù)目逞姿。
  • public boolean isBroken() 查詢此屏障是否處于損壞狀態(tài)辞嗡。
  • public void reset() 將屏障重置為其初始狀態(tài)。

針對以上API滞造,下面來探討下CyclicBarrier 的實(shí)現(xiàn)原理续室,以及為什么有這樣的API。

清單2 CyclicBarrier.await*()的實(shí)現(xiàn)片段

? private int dowait(boolean timed, long nanos)
? throws InterruptedException, BrokenBarrierException,
? TimeoutException {
? final ReentrantLock lock = this.lock;
? lock.lock();
? try {
? final Generation g = generation;
? if (g.broken)
? throw new BrokenBarrierException();

? if (Thread.interrupted()) {
? breakBarrier();
? throw new InterruptedException();
? }

? int index = --count;
? if (index == 0) { // tripped
? boolean ranAction = false;
? try {
? final Runnable command = barrierCommand;
? if (command != null)
? command.run();
? ranAction = true;
? nextGeneration();
? return 0;
? } finally {
? if (!ranAction)
? breakBarrier();
? }
? }

? // loop until tripped, broken, interrupted, or timed out
? for (;;) {
? try {
? if (!timed)
? trip.await();
? else if (nanos > 0L)
? nanos = trip.awaitNanos(nanos);
? } catch (InterruptedException ie) {
? if (g == generation && ! g.broken) {
? breakBarrier();
? throw ie;
? } else {
? Thread.currentThread().interrupt();
? }
? }

? if (g.broken)
? throw new BrokenBarrierException();

? if (g != generation)
? return index;

? if (timed && nanos <= 0L) {
? breakBarrier();
? throw new TimeoutException();
? }
? }
? } finally {
? lock.unlock();
? }
}

清單2有點(diǎn)復(fù)雜谒养,這里一點(diǎn)一點(diǎn)的剖析挺狰,并且還原到最原始的狀態(tài)。

利用前面學(xué)到的知識买窟,我們知道要想讓線程等待其他線程執(zhí)行完畢她渴,那么已經(jīng)執(zhí)行完畢的線程(進(jìn)入await*()方法)就需要park(),直到超時或者被中斷蔑祟,或者被其它線程喚醒。

前面說過CyclicBarrier 的特點(diǎn)是要么大家都正常執(zhí)行完畢沉唠,要么大家都異常被中斷疆虚,不會其中有一個被中斷而其它正常執(zhí)行完畢的現(xiàn)象存在。這種特點(diǎn)叫all-or-none满葛。類似的概念是原子操作中的要么大家都執(zhí)行完径簿,要么一個操作都不執(zhí)行完。當(dāng)前這其實(shí)是兩個概念了嘀韧。要完成這樣的特點(diǎn)就必須有一個狀態(tài)來描述曾經(jīng)是否有過線程被中斷(broken)了篇亭,這樣后面執(zhí)行完的線程就該知道是否需要繼續(xù)等待了。而在CyclicBarrier 中Generation 就是為了完成這件事情的锄贷。Generation的定義非常簡單译蒂,整個結(jié)構(gòu)就只有一個變量boolean broken = false;,定義是否發(fā)生了broken操作谊却。

由于有競爭資源的存在(broken/index)柔昼,所以毫無疑問需要一把鎖lock。拿到鎖后整個過程是這樣的:

  1. 檢查是否存在中斷位(broken)炎辨,如果存在就立即以BrokenBarrierException異常返回捕透。此異常描述的是線程進(jìn)入屏障被破壞的等待狀態(tài)。否則進(jìn)行2。
  2. 檢查當(dāng)前線程是否被中斷乙嘀,如果是那么就設(shè)置中斷位(使其它將要進(jìn)入等待的線程知道)末购,另外喚醒已經(jīng)等待的線程,同時以InterruptedException異常返回虎谢,表示線程要處理中斷盟榴。否則進(jìn)行3。
  3. 將剩余任務(wù)數(shù)減1嘉冒,如果此時剩下的任務(wù)數(shù)為0曹货,也就是達(dá)到了公共屏障點(diǎn),那么就執(zhí)行屏障點(diǎn)任務(wù)(如果有的話)讳推,同時創(chuàng)建新的Generation(在這個過程中會喚醒其它所有線程顶籽,因此當(dāng)前線程是屏障點(diǎn)線程,那么其它線程就都應(yīng)該在等待狀態(tài))银觅。否則進(jìn)行4礼饱。
  4. 到這里說明還沒有到達(dá)屏障點(diǎn),那么此時線程就應(yīng)該park()究驴。很顯然在下面的for循環(huán)中就是要park線程镊绪。這里park線程采用的是Condition.await()方法。也就是trip.await()洒忧。為什么需要Condition蝴韭?因?yàn)樗械腶wait()其實(shí)等待的都是一個條件,一旦條件滿足就應(yīng)該都被喚醒熙侍,所以Condition整好滿足這個特點(diǎn)榄鉴。所以到這里就會明白為什么在步驟3中到達(dá)屏障點(diǎn)時創(chuàng)建新的Generation的時候是一定要喚醒其它線程的原因了。

上面4個步驟其實(shí)只是描述主體結(jié)構(gòu)蛉抓,事實(shí)上整個過程中有非常多的邏輯來處理異常引發(fā)的問題庆尘,比如執(zhí)行屏障點(diǎn)任務(wù)引發(fā)的異常,park線程超時引發(fā)的中斷異常和超時異常等等巷送。所以對于await()而言驶忌,異常的處理比業(yè)務(wù)邏輯的處理更復(fù)雜,這就解釋了為什么await()的時候可能引發(fā)InterruptedException,BrokenBarrierException,TimeoutException 三種異常笑跛。

清單3 生成下一個循環(huán)周期并喚醒其它線程

private void nextGeneration() {
? trip.signalAll();
? count = parties;
? generation = new Generation();
}

清單3 描述了如何生成下一個循環(huán)周期的過程付魔,在這個過程中當(dāng)然需要使用Condition.signalAll()喚醒所有已經(jīng)執(zhí)行完成并且正在等待的線程。另外這里count描述的是還有多少線程需要執(zhí)行飞蹂,是為了線程執(zhí)行完畢索引計(jì)數(shù)抒抬。

isBroken() 方法描述的就是generation.broken,也即線程組是否發(fā)生了異常晤柄。這里再一次解釋下為什么要有這個狀態(tài)的存在擦剑。

如果一個將要位于屏障點(diǎn)或者已經(jīng)位于屏障點(diǎn)的而執(zhí)行屏障點(diǎn)任務(wù)的線程發(fā)生了異常,那么即使喚醒了其它等待的線程,其它等待的線程也會因?yàn)檠h(huán)等待而“死去”惠勒,因?yàn)樵僖矝]有一個線程來喚醒這些第二次進(jìn)行park的線程了赚抡。還有一個意圖是,如果屏障點(diǎn)都已經(jīng)損壞了纠屋,那么其它將要等待屏障點(diǎn)的再線程掛起就沒有意義了涂臣。

寫到這里的時候非常不幸,用了4年多了臺燈終于“壽終正寢了”售担。

其實(shí)CyclicBarrier 還有一個reset方法赁遗,描述的是手動立即將所有線程中斷,恢復(fù)屏障點(diǎn)族铆,進(jìn)行下一組任務(wù)的執(zhí)行岩四。也就是與重新創(chuàng)建一個新的屏障點(diǎn)相比,可能維護(hù)的代價要小一些(減少同步哥攘,減少上一個CyclicBarrier 的管理等等)剖煌。

本來是想和Semaphore 一起將的,最后發(fā)現(xiàn)鋪開后就有點(diǎn)長了逝淹,而且也不利于理解和吸收耕姊,所以放到下一篇吧。

參考資料:

  1. 使用 CyclicBarrier 做線程間同步
  2. CyclicBarrier And CountDownLatch Tutorial
  3. 線程—CyclicBarrier
  4. Java線程學(xué)習(xí)筆記(十)CountDownLatch 和CyclicBarrier
  5. 關(guān)于多線程同步的初步教程--Barrier的設(shè)計(jì)及使用
  6. Thread coordination with CountDownLatch and CyclicBarrier
  7. 如何充分利用多核CPU栅葡,計(jì)算很大的List中所有整數(shù)的和
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末茉兰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子欣簇,更是在濱河造成了極大的恐慌邦邦,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,348評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件醉蚁,死亡現(xiàn)場離奇詭異,居然都是意外死亡鬼店,警方通過查閱死者的電腦和手機(jī)网棍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來妇智,“玉大人滥玷,你說我怎么就攤上這事∥±猓” “怎么了惑畴?”我有些...
    開封第一講書人閱讀 156,936評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長航徙。 經(jīng)常有香客問我如贷,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,427評論 1 283
  • 正文 為了忘掉前任杠袱,我火速辦了婚禮尚猿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘楣富。我一直安慰自己凿掂,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,467評論 6 385
  • 文/花漫 我一把揭開白布纹蝴。 她就那樣靜靜地躺著庄萎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪塘安。 梳的紋絲不亂的頭發(fā)上糠涛,一...
    開封第一講書人閱讀 49,785評論 1 290
  • 那天,我揣著相機(jī)與錄音耙旦,去河邊找鬼脱羡。 笑死,一個胖子當(dāng)著我的面吹牛免都,可吹牛的內(nèi)容都是我干的锉罐。 我是一名探鬼主播,決...
    沈念sama閱讀 38,931評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼绕娘,長吁一口氣:“原來是場噩夢啊……” “哼脓规!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起险领,我...
    開封第一講書人閱讀 37,696評論 0 266
  • 序言:老撾萬榮一對情侶失蹤侨舆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后绢陌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體挨下,經(jīng)...
    沈念sama閱讀 44,141評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,483評論 2 327
  • 正文 我和宋清朗相戀三年脐湾,在試婚紗的時候發(fā)現(xiàn)自己被綠了臭笆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,625評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡秤掌,死狀恐怖愁铺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情闻鉴,我是刑警寧澤茵乱,帶...
    沈念sama閱讀 34,291評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站孟岛,受9級特大地震影響瓶竭,放射性物質(zhì)發(fā)生泄漏督勺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,892評論 3 312
  • 文/蒙蒙 一在验、第九天 我趴在偏房一處隱蔽的房頂上張望玷氏。 院中可真熱鬧,春花似錦腋舌、人聲如沸盏触。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽赞辩。三九已至,卻和暖如春授艰,著一層夾襖步出監(jiān)牢的瞬間辨嗽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工淮腾, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留糟需,地道東北人。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓谷朝,卻偏偏與公主長得像洲押,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子圆凰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,492評論 2 348

推薦閱讀更多精彩內(nèi)容