CyclicBarrier
如果說CountDownLatch是一次性的侦啸,那么CyclicBarrier正好可以循環(huán)使用纹坐。它允許一組線程互相等待贿讹,直到到達(dá)某個公共屏障點(diǎn) (common barrier point)萎羔。所謂屏障點(diǎn)就是一組任務(wù)執(zhí)行完畢的時刻滔吠。
清單1 一個使用CyclicBarrier的例子
package xylz.study.concurrency.lock;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {
? final CyclicBarrier barrier;
? final int MAX_TASK;
? public CyclicBarrierDemo(int cnt) {
? barrier = new CyclicBarrier(cnt + 1);
? MAX_TASK = cnt;
? }? public void doWork(final Runnable work) {
? new Thread() {? public void run() {
? work.run();
? try {
? int index = barrier.await();
? doWithIndex(index);
? } catch (InterruptedException e) {
? return;
? } catch (BrokenBarrierException e) {
? return;
? }
? }
? }.start();
? }? private void doWithIndex(int index) {
? if (index == MAX_TASK / 3) {
? System.out.println("Left 30%.");
? } else if (index == MAX_TASK / 2) {
? System.out.println("Left 50%");
? } else if (index == 0) {
? System.out.println("run over");
? }
? }? public void waitForNext() {
? try {
? doWithIndex(barrier.await());
? } catch (InterruptedException e) {
? return;
? } catch (BrokenBarrierException e) {
? return;
? }
? }? public static void main(String[] args) {
? final int count = 10;
? CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
? for (int i = 0; i < 100; i++) {
? demo.doWork(new Runnable() {? public void run() {
? //do something
? try {
? Thread.sleep(1000L);
? } catch (Exception e) {
? return;
? }
? }
? });
? if ((i + 1) % count == 0) {
? demo.waitForNext();
? }
? }
? }}
清單1描述的是一個周期性處理任務(wù)的例子抵恋,在這個例子中有一對的任務(wù)(100個)焕议,希望每10個為一組進(jìn)行處理,當(dāng)前僅當(dāng)上一組任務(wù)處理完成后才能進(jìn)行下一組馋记,另外在每一組任務(wù)中号坡,當(dāng)任務(wù)剩下50%懊烤,30%以及所有任務(wù)執(zhí)行完成時向觀察者發(fā)出通知。
在這個例子中宽堆,CyclicBarrierDemo 構(gòu)建了一個count+1的任務(wù)組(其中一個任務(wù)時為了外界方便掛起主線程)腌紧。每一個子任務(wù)里,人物本身執(zhí)行完畢后都需要等待同組內(nèi)其它任務(wù)執(zhí)行完成后才能繼續(xù)畜隶。同時在剩下任務(wù)50%壁肋、30%已經(jīng)0時執(zhí)行特殊的其他任務(wù)(發(fā)通知)。
很顯然CyclicBarrier有以下幾個特點(diǎn):
- await()方法將掛起線程籽慢,直到同組的其它線程執(zhí)行完畢才能繼續(xù)
- await()方法返回線程執(zhí)行完畢的索引浸遗,注意,索引時從任務(wù)數(shù)-1開始的箱亿,也就是第一個執(zhí)行完成的任務(wù)索引為parties-1,最后一個為0跛锌,這個parties為總?cè)蝿?wù)數(shù),清單中是cnt+1
- CyclicBarrier 是可循環(huán)的届惋,顯然名稱說明了這點(diǎn)髓帽。在清單1中,每一組任務(wù)執(zhí)行完畢就能夠執(zhí)行下一組任務(wù)脑豹。
另外除了CyclicBarrier除了以上特點(diǎn)外郑藏,還有以下幾個特點(diǎn):
- 如果屏障操作不依賴于掛起的線程,那么任何線程都可以執(zhí)行屏障操作瘩欺。在清單1中可以看到并沒有指定那個線程執(zhí)行50%必盖、30%、0%的操作俱饿,而是一組線程(cnt+1)個中任何一個線程只要到達(dá)了屏障點(diǎn)都可以執(zhí)行相應(yīng)的操作
- CyclicBarrier 的構(gòu)造函數(shù)允許攜帶一個任務(wù)歌粥,這個任務(wù)將在0%屏障點(diǎn)執(zhí)行,它將在await()==0后執(zhí)行稍途。
- CyclicBarrier 如果在await時因?yàn)橹袛喔罅摺⑹ ⒊瑫r等原因提前離開了屏障點(diǎn)械拍,那么任務(wù)組中的其他任務(wù)將立即被中斷突勇,以InterruptedException異常離開線程。
- 所有await()之前的操作都將在屏障點(diǎn)之前運(yùn)行坷虑,也就是CyclicBarrier 的內(nèi)存一致性效果
CyclicBarrier 的所有API如下:
-
public CyclicBarrier(int parties) 創(chuàng)建一個新的
CyclicBarrier
甲馋,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,但它不會在啟動 barrier 時執(zhí)行預(yù)定義的操作迄损。 -
public CyclicBarrier(int parties, Runnable barrierAction) 創(chuàng)建一個新的
CyclicBarrier
定躏,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,并在啟動 barrier 時執(zhí)行給定的屏障操作,該操作由最后一個進(jìn)入 barrier 的線程執(zhí)行痊远。 -
public int await() throws InterruptedException, BrokenBarrierException 在所有參與者都已經(jīng)在此 barrier 上調(diào)用
await
方法之前垮抗,將一直等待。 - public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有參與者都已經(jīng)在此屏障上調(diào)用 await 方法之前將一直等待,或者超出了指定的等待時間碧聪。
- *public int getNumberWaiting() *返回當(dāng)前在屏障處等待的參與者數(shù)目冒版。此方法主要用于調(diào)試和斷言。
- public int getParties() 返回要求啟動此 barrier 的參與者數(shù)目逞姿。
- public boolean isBroken() 查詢此屏障是否處于損壞狀態(tài)辞嗡。
- public void reset() 將屏障重置為其初始狀態(tài)。
針對以上API滞造,下面來探討下CyclicBarrier 的實(shí)現(xiàn)原理续室,以及為什么有這樣的API。
清單2 CyclicBarrier.await*()的實(shí)現(xiàn)片段
? private int dowait(boolean timed, long nanos)
? throws InterruptedException, BrokenBarrierException,
? TimeoutException {
? final ReentrantLock lock = this.lock;
? lock.lock();
? try {
? final Generation g = generation;
? if (g.broken)
? throw new BrokenBarrierException();? if (Thread.interrupted()) {
? breakBarrier();
? throw new InterruptedException();
? }? int index = --count;
? if (index == 0) { // tripped
? boolean ranAction = false;
? try {
? final Runnable command = barrierCommand;
? if (command != null)
? command.run();
? ranAction = true;
? nextGeneration();
? return 0;
? } finally {
? if (!ranAction)
? breakBarrier();
? }
? }? // loop until tripped, broken, interrupted, or timed out
? for (;;) {
? try {
? if (!timed)
? trip.await();
? else if (nanos > 0L)
? nanos = trip.awaitNanos(nanos);
? } catch (InterruptedException ie) {
? if (g == generation && ! g.broken) {
? breakBarrier();
? throw ie;
? } else {
? Thread.currentThread().interrupt();
? }
? }? if (g.broken)
? throw new BrokenBarrierException();? if (g != generation)
? return index;? if (timed && nanos <= 0L) {
? breakBarrier();
? throw new TimeoutException();
? }
? }
? } finally {
? lock.unlock();
? }
}
清單2有點(diǎn)復(fù)雜谒养,這里一點(diǎn)一點(diǎn)的剖析挺狰,并且還原到最原始的狀態(tài)。
利用前面學(xué)到的知識买窟,我們知道要想讓線程等待其他線程執(zhí)行完畢她渴,那么已經(jīng)執(zhí)行完畢的線程(進(jìn)入await*()方法)就需要park(),直到超時或者被中斷蔑祟,或者被其它線程喚醒。
前面說過CyclicBarrier 的特點(diǎn)是要么大家都正常執(zhí)行完畢沉唠,要么大家都異常被中斷疆虚,不會其中有一個被中斷而其它正常執(zhí)行完畢的現(xiàn)象存在。這種特點(diǎn)叫all-or-none满葛。類似的概念是原子操作中的要么大家都執(zhí)行完径簿,要么一個操作都不執(zhí)行完。當(dāng)前這其實(shí)是兩個概念了嘀韧。要完成這樣的特點(diǎn)就必須有一個狀態(tài)來描述曾經(jīng)是否有過線程被中斷(broken)了篇亭,這樣后面執(zhí)行完的線程就該知道是否需要繼續(xù)等待了。而在CyclicBarrier 中Generation 就是為了完成這件事情的锄贷。Generation的定義非常簡單译蒂,整個結(jié)構(gòu)就只有一個變量boolean broken = false;,定義是否發(fā)生了broken操作谊却。
由于有競爭資源的存在(broken/index)柔昼,所以毫無疑問需要一把鎖lock。拿到鎖后整個過程是這樣的:
- 檢查是否存在中斷位(broken)炎辨,如果存在就立即以BrokenBarrierException異常返回捕透。此異常描述的是線程進(jìn)入屏障被破壞的等待狀態(tài)。否則進(jìn)行2。
- 檢查當(dāng)前線程是否被中斷乙嘀,如果是那么就設(shè)置中斷位(使其它將要進(jìn)入等待的線程知道)末购,另外喚醒已經(jīng)等待的線程,同時以InterruptedException異常返回虎谢,表示線程要處理中斷盟榴。否則進(jìn)行3。
- 將剩余任務(wù)數(shù)減1嘉冒,如果此時剩下的任務(wù)數(shù)為0曹货,也就是達(dá)到了公共屏障點(diǎn),那么就執(zhí)行屏障點(diǎn)任務(wù)(如果有的話)讳推,同時創(chuàng)建新的Generation(在這個過程中會喚醒其它所有線程顶籽,因此當(dāng)前線程是屏障點(diǎn)線程,那么其它線程就都應(yīng)該在等待狀態(tài))银觅。否則進(jìn)行4礼饱。
- 到這里說明還沒有到達(dá)屏障點(diǎn),那么此時線程就應(yīng)該park()究驴。很顯然在下面的for循環(huán)中就是要park線程镊绪。這里park線程采用的是Condition.await()方法。也就是trip.await()洒忧。為什么需要Condition蝴韭?因?yàn)樗械腶wait()其實(shí)等待的都是一個條件,一旦條件滿足就應(yīng)該都被喚醒熙侍,所以Condition整好滿足這個特點(diǎn)榄鉴。所以到這里就會明白為什么在步驟3中到達(dá)屏障點(diǎn)時創(chuàng)建新的Generation的時候是一定要喚醒其它線程的原因了。
上面4個步驟其實(shí)只是描述主體結(jié)構(gòu)蛉抓,事實(shí)上整個過程中有非常多的邏輯來處理異常引發(fā)的問題庆尘,比如執(zhí)行屏障點(diǎn)任務(wù)引發(fā)的異常,park線程超時引發(fā)的中斷異常和超時異常等等巷送。所以對于await()而言驶忌,異常的處理比業(yè)務(wù)邏輯的處理更復(fù)雜,這就解釋了為什么await()的時候可能引發(fā)InterruptedException,BrokenBarrierException,TimeoutException 三種異常笑跛。
清單3 生成下一個循環(huán)周期并喚醒其它線程
private void nextGeneration() {
? trip.signalAll();
? count = parties;
? generation = new Generation();
}
清單3 描述了如何生成下一個循環(huán)周期的過程付魔,在這個過程中當(dāng)然需要使用Condition.signalAll()喚醒所有已經(jīng)執(zhí)行完成并且正在等待的線程。另外這里count描述的是還有多少線程需要執(zhí)行飞蹂,是為了線程執(zhí)行完畢索引計(jì)數(shù)抒抬。
isBroken() 方法描述的就是generation.broken,也即線程組是否發(fā)生了異常晤柄。這里再一次解釋下為什么要有這個狀態(tài)的存在擦剑。
如果一個將要位于屏障點(diǎn)或者已經(jīng)位于屏障點(diǎn)的而執(zhí)行屏障點(diǎn)任務(wù)的線程發(fā)生了異常,那么即使喚醒了其它等待的線程,其它等待的線程也會因?yàn)檠h(huán)等待而“死去”惠勒,因?yàn)樵僖矝]有一個線程來喚醒這些第二次進(jìn)行park的線程了赚抡。還有一個意圖是,如果屏障點(diǎn)都已經(jīng)損壞了纠屋,那么其它將要等待屏障點(diǎn)的再線程掛起就沒有意義了涂臣。
寫到這里的時候非常不幸,用了4年多了臺燈終于“壽終正寢了”售担。
其實(shí)CyclicBarrier 還有一個reset方法赁遗,描述的是手動立即將所有線程中斷,恢復(fù)屏障點(diǎn)族铆,進(jìn)行下一組任務(wù)的執(zhí)行岩四。也就是與重新創(chuàng)建一個新的屏障點(diǎn)相比,可能維護(hù)的代價要小一些(減少同步哥攘,減少上一個CyclicBarrier 的管理等等)剖煌。
本來是想和Semaphore 一起將的,最后發(fā)現(xiàn)鋪開后就有點(diǎn)長了逝淹,而且也不利于理解和吸收耕姊,所以放到下一篇吧。
參考資料: