在之前寫過的關(guān)于CountDownLatch的這篇文章中镀裤,我們通過使用"學(xué)生春游場景"這個示例來講述了CountDownLatch的使用抹蚀,在這個示例中我們這樣處理:老師拿了個包含50個同學(xué)名字的名單笨农,同學(xué)來一個就劃掉一個荠医,當(dāng)所有的同學(xué)都被劃掉后褂乍,說明所有的同學(xué)都到了持隧,這時候就可以出發(fā)了。"劃掉"也體現(xiàn)了"count down"的含義逃片。
繼續(xù)拿這個案例說來說屡拨,不過我們現(xiàn)在關(guān)注的不是"到齊-出發(fā)"這個先來后到的問題,而關(guān)注的是"學(xué)生上車"這個動作,我們現(xiàn)在的規(guī)定是:學(xué)生來了呀狼,不能立刻上車裂允,只有當(dāng)所有的學(xué)生都到齊之后,才能上車赠潦。這里叫胖,我們可以采用"簽到"的方式來記錄學(xué)生的到來數(shù)草冈,簽到滿50個后她奥,學(xué)生上車。
"簽到"是有加計數(shù)的含義怎棱,這和上面說的"劃掉"正好是相反的概念哩俭。
這里說"加計數(shù)"只是個人覺得從直觀上比較好的理解方式。我們后面講的CycleBarrier與CountDownLatch一樣內(nèi)部也有一個計數(shù)器拳恋,調(diào)用一次await凡资,就將計數(shù)減1,這和CountDownLatch計數(shù)處理的原理是一樣的谬运。只是CycleBarrier的本意是等所有的線程都到了再做處理隙赁,所以我覺得把調(diào)用一次await邏輯理解成為加上一次,直到加到滿足我們的總數(shù)梆暖,這樣能更好的理解伞访,因為await并不像countDown那樣具有很直觀的邏輯含義。
CountDownLatch的使用場景中被等待的學(xué)生線程是可以執(zhí)行完自己的邏輯的轰驳,而我們的等待線程就是要等被等待線程所有都執(zhí)行完厚掷,這也是CountDownLatch的語義所在。但是現(xiàn)在的要求是被等待線程到了某個點后就應(yīng)該停止级解,等待所有的線程都到達某個點冒黑。這時候使用CountDownLatch就沒法滿足我們的要求了。
Java提供了CyclicBarrier類勤哗,這個類可以滿足我們的需求抡爹,雖然我們現(xiàn)在還沒具體講這個類,不過可以先通過使用示例認(rèn)識下它芒划。
學(xué)生春游場景
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class SpringOuting {
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(50);// 簽到名單
Set<Thread> hashSet = new HashSet<>();
for (int i=1; i<=50; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "簽到...");
cyclicBarrier.await();// 簽到并在這里等起
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "上車了###");
}
}, "同學(xué)" + i);
hashSet.add(t);
}
Iterator<Thread> it = hashSet.iterator();
while (it.hasNext()) {
Thread t = it.next();
t.start();
Thread.sleep(200);
}
}
}
下面的代碼我們創(chuàng)建的了一個CyclicBarrier對象冬竟,規(guī)定了要簽到的人數(shù):
CyclicBarrier cyclicBarrier = new CyclicBarrier(50);
當(dāng)學(xué)生線程執(zhí)行,到了下面這里:
cyclicBarrier.await();// 簽到并等起
await()之后就掛起了腊状。
運行代碼后我們可以發(fā)現(xiàn)诱咏,等到最后一個學(xué)生線程執(zhí)行await方法后,其他等待的學(xué)生線程都被喚醒了缴挖,然后就各自執(zhí)行await后的代碼袋狞。
通過CyclicBarrier我們成功得實現(xiàn)了"等學(xué)生都到了才能上車"這個需求。
老師發(fā)出命令后才上車
現(xiàn)在又有一個要求,在所有的學(xué)生到來之后苟鸯,要先等老師大喊一聲:同學(xué)們上車吧同蜻。之后學(xué)生才陸續(xù)上車。
CyclicBarrier也提供了這種支持早处,對上面的代碼進行改造:
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class SpringOuting {
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(50, new Runnable(){
@Override
public void run() {
System.out.println("======同學(xué)們上車吧======");
}
});
Set<Thread> hashSet = new HashSet<>();
for (int i=1; i<=50; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "簽到...");
cyclicBarrier.await();// 簽到并子并在這里等起
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "上車了###");
}
}, "同學(xué)" + i);
hashSet.add(t);
}
Iterator<Thread> it = hashSet.iterator();
while (it.hasNext()) {
Thread t = it.next();
t.start();
Thread.sleep(200);
}
}
}
通過可以傳遞兩個參數(shù)的構(gòu)造方法創(chuàng)建了一個CyclicBarrier對象:
CyclicBarrier cyclicBarrier = new CyclicBarrier(50, new Runnable(){
@Override
public void run() {
System.out.println("======同學(xué)們上車吧======");
}
});
第二個參數(shù)是Runnable接口的實現(xiàn)续崖,看下API中的說明:
public CyclicBarrier(int parties, Runnable barrierAction)
Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.
創(chuàng)建一個新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)都進入等待狀態(tài)后啟動佛寿,并在啟動 barrier 時執(zhí)行給定的屏障操作壶熏,該操作由最后一個進入 barrier 的線程執(zhí)行。
某個時刻因為車沒到咸包,老師就通知已經(jīng)簽到的同學(xué)先回去
先來看代碼:
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class SpringOuting {
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(50, new Runnable(){
@Override
public void run() {
System.out.println("======同學(xué)們上車吧======");
}
});
Set<Thread> hashSet = new HashSet<>();
for (int i=1; i<=50; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "簽到...");
cyclicBarrier.await();// 簽到并子并在這里等起
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println(Thread.currentThread().getName() + "返回!!!");
return;
}
System.out.println(Thread.currentThread().getName() + "上車了###");
}
}, "同學(xué)" + i);
hashSet.add(t);
}
int i = 1;
Iterator<Thread> it = hashSet.iterator();
while (it.hasNext()) {
Thread t = it.next();
t.start();
Thread.sleep(200);
if (i == 10) {
System.out.println("%%%%%%%同學(xué)們桃序,車還沒有到,你們先回去%%%%%%%");
cyclicBarrier.reset();
}
i++;
}
}
}
運行代碼烂瘫,其中一次的結(jié)果如下:
同學(xué)41簽到...
同學(xué)30簽到...
同學(xué)33簽到...
同學(xué)11簽到...
同學(xué)28簽到...
同學(xué)45簽到...
同學(xué)47簽到...
同學(xué)10簽到...
同學(xué)12簽到...
同學(xué)37簽到...
%%%%%%%同學(xué)們媒熊,車還沒有到,你們先回去%%%%%%%
同學(xué)41返回!!!
同學(xué)45返回!!!
同學(xué)33返回!!!
同學(xué)11返回!!!
同學(xué)12返回!!!
同學(xué)30返回!!!
同學(xué)10返回!!!
同學(xué)47返回!!!
同學(xué)37返回!!!
同學(xué)28返回!!!
同學(xué)17簽到...
同學(xué)29簽到...
同學(xué)39簽到...
同學(xué)7簽到...
同學(xué)16簽到...
同學(xué)9簽到...
同學(xué)27簽到...
同學(xué)40簽到...
同學(xué)31簽到...
同學(xué)44簽到...
同學(xué)8簽到...
同學(xué)38簽到...
同學(xué)25簽到...
同學(xué)5簽到...
同學(xué)3簽到...
同學(xué)42簽到...
同學(xué)19簽到...
同學(xué)48簽到...
同學(xué)22簽到...
同學(xué)24簽到...
同學(xué)26簽到...
同學(xué)46簽到...
同學(xué)34簽到...
同學(xué)32簽到...
同學(xué)18簽到...
同學(xué)35簽到...
同學(xué)4簽到...
同學(xué)13簽到...
同學(xué)14簽到...
同學(xué)2簽到...
同學(xué)15簽到...
同學(xué)21簽到...
同學(xué)20簽到...
同學(xué)1簽到...
同學(xué)36簽到...
同學(xué)49簽到...
同學(xué)6簽到...
同學(xué)23簽到...
同學(xué)50簽到...
同學(xué)43簽到...
通過結(jié)果可以看到坟比,當(dāng)已簽到10個同學(xué)的時候芦鳍,老師突然通知車還沒到,已經(jīng)簽到的同學(xué)先回去葛账,正在等待上車的同學(xué)收到了這個通知就返回了柠衅;但是我們看到剩下的40位同學(xué)依然過來進行了簽到,并且全部都未上車注竿。
從運行代碼的IDE或命令窗口可以看到程序被掛起了茄茁。
為什么代碼會有這樣的表現(xiàn)?
我們可以看到與之前的代碼不同的是巩割,上面的代碼中增加了一段這樣的代碼:
if (i == 10) {
System.out.println("%%%%%%%同學(xué)們裙顽,車還沒有到,你們先回去%%%%%%%");
cyclicBarrier.reset();
}
要知道這段代碼的作用宣谈,我們需要知道reset的作用愈犹, API中描述如下:
public void reset()
Resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException.
將屏障重置為其初始狀態(tài)。如果存在參與者(們)目前在屏障處等待闻丑,則會喚醒它們漩怎,同時拋出一個BrokenBarrierException異常。
看了這個API的描述嗦嗡,我們應(yīng)該就可以說明上面程序表現(xiàn)那樣的行為的原因了:當(dāng)i=10的時候勋锤,這時候有10個學(xué)生線程處于等待狀態(tài),調(diào)用了reset方法會做兩個動作:
- 會重置狀態(tài)
也就是說之前10個學(xué)生的簽名作廢侥祭,簽名單重新需要50個同學(xué)簽到才能有效 - 通知這10個正在等待的線程
喚醒的時候會拋出BrokenBarrierException異常叁执,await方法會捕獲到茄厘,我們上面的代碼捕獲到后做了讓簽到的同學(xué)返回的處理。
屏障被重置了谈宛,重新需要50個同學(xué)簽到次哈,這時已經(jīng)有10個學(xué)生回去了,雖然后面的40個學(xué)生依然成功簽到吆录,但是如果那10個學(xué)生不回來重新簽到的話窑滞,所有的學(xué)生就都無法上車。
await() API
Waits until all parties have invoked await on this barrier.
在所有等待在屏障的參與者都調(diào)用了await 方法之前恢筝,此參與者將一直等待哀卫。
If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:
如果當(dāng)前線程不是最后一個到達的線程,出于調(diào)度目的滋恬,將禁用它聊训,且在發(fā)生以下情況之一前,該線程將一直處于休眠狀態(tài):
- The last thread arrives; or
- Some other thread interrupts the current thread; or
- Some other thread interrupts one of the other waiting threads; or
- Some other thread times out while waiting for barrier; or
- Some other thread invokes reset() on this barrier.
- 最后一個線程到達恢氯;或者
- 其他某個線程中斷當(dāng)前線程;或者
- 其他某個線程中斷另一個等待線程鼓寺;或者
- 其他某個線程在等待 barrier 時超時勋拟;或者
- 其他某個線程在此 barrier 上調(diào)用 reset()。
從這里可以看到對于中斷妈候,中斷當(dāng)前線程和中斷另一個等待的線程都會對當(dāng)前線程有喚醒影響敢靡。
If the current thread:
- has its interrupted status set on entry to this method; or
- is interrupted while waiting
then InterruptedException is thrown and the current thread's interrupted status is cleared.
如果當(dāng)前線程:
- 在進入此方法時已經(jīng)設(shè)置了該線程的中斷狀態(tài);或者
- 在等待時被中斷
則拋出 InterruptedException苦银,并且清除當(dāng)前線程的已中斷狀態(tài)啸胧。
If the barrier is reset() while any thread is waiting, or if the barrier is broken when await is invoked, or while any thread is waiting, then BrokenBarrierException is thrown.
如果在線程處于等待狀態(tài)時barrier被reset(),或者在調(diào)用await時barrier被損壞幔虏,抑或任意一個線程正處于等待狀態(tài)纺念,則拋出 BrokenBarrierException 異常。
If any thread is interrupted while waiting, then all other waiting threads will throw BrokenBarrierException and the barrier is placed in the broken state.
如果任何線程在等待時被中斷想括,則其他所有等待線程都將拋出 BrokenBarrierException異常陷谱,并將barrier置于損壞狀態(tài)。
If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue. If an exception occurs during the barrier action then that exception will be propagated in the current thread and the barrier is placed in the broken state.
如果當(dāng)前線程是最后一個到達的線程瑟蜈,并且構(gòu)造方法中提供了一個非空的屏障操作烟逊,則在允許其他線程繼續(xù)運行之前,當(dāng)前線程將運行該操作铺根。如果在執(zhí)行屏障操作過程中發(fā)生異常宪躯,則該異常將傳播到當(dāng)前線程中,并將barrier置于損壞狀態(tài)位迂。
CyclicBarrier工作原理
從兩個方面來研究CyclicBarrier的工作原理:
- 計數(shù)值
- 掛起和喚醒
計數(shù)值
CyclicBarrier通過計數(shù)值來計算到達的線程數(shù)访雪。類中屬性的定義如下:
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
這個是CyclicBarrier中的一個普通屬性予颤,不過現(xiàn)在問題來了CyclicBarrier對象會被多個線程使用,也就是說會并發(fā)得被訪問冬阳,那就需要保證每次只能一個線程對其修改操作蛤虐。CyclicBarrier是怎么做的呢?
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
從上面的代碼可以看到肝陪,使用了ReentrantLock進行加鎖處理驳庭,保證了count屬性的同步訪問。
掛起和喚醒
通過Condition接口的await和signalAll()方法實現(xiàn)氯窍。
底層的方法不是很難饲常,主要用ReentrantLock和Condition來操作。這兩個類不在本文講述范圍內(nèi)狼讨,所以工作原理的研究就只點到為止贝淤。
總結(jié)
下面的內(nèi)容摘抄自《Java并發(fā)編程的藝術(shù)》
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是政供,讓一組線程到達一個屏障(也可以叫同步點)時被掛起播聪,直到最后一個線程到達屏障時,屏障才會開門布隔,所有被屏障攔截的線程才會繼續(xù)運行离陶。
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置衅檀。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景招刨。例如,如果計算發(fā)生錯誤哀军,可以重置計數(shù)器沉眶,并讓線程重新執(zhí)行一次。
使用場景
下面的內(nèi)容摘抄自《Java并發(fā)編程的藝術(shù)》
CyclicBarrier可以用于多線程計算數(shù)據(jù)杉适,最后合并計算結(jié)果的場景谎倔。例如,用一個Excel保存了用戶所有銀行流水淘衙,每個Sheet保存一個賬戶近一年的每筆銀行流水传藏,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水彤守,都執(zhí)行完之后毯侦,得到每個sheet的日均流水,最后具垫,在用barrierAction用這些線程的計算結(jié)果侈离,計算出整個Excel的日均銀行流水。
實現(xiàn)原理
內(nèi)部通過ReentrantLock和Condition實現(xiàn)筝蚕。
給定一個令牌總數(shù)卦碾,線程調(diào)用await方法將令牌數(shù)減1铺坞,如果令牌剩余不為0,調(diào)用Condition的await方法將線程放入到條件隊列中洲胖,當(dāng)最后一個線程調(diào)用await方法后济榨,令牌剩余數(shù)為0,則通過Condtion的signalAll方法喚醒所有在條件隊列中等待線程绿映。