CountDownLatch or CyclicBarrier

簡介

JDK中提供了一些用于線程之間協(xié)同等待的工具類饵隙,CountDownLatch和CyclicBarrier就是最典型的兩個線程同步輔助類。
<b>CountDownLatch :</b>一個同步輔助類刚梭,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待
<b>CyclicBarrier :</b>一個同步輔助類,它允許一組線程互相等待梭域,直到到達(dá)某個公共屏障點 (common barrier point)酥夭。

二者功能實現(xiàn)的區(qū)別:

1.CountDownLatch是一次性的赐纱,CyclicBarrier可以重設(shè)置。
2.CountDownLatch強(qiáng)調(diào)一個線程等多個線程完成某件事情采郎。CyclicBarrier是多個線程互等千所,等大家都完成。
3.CyclicBarrier有g(shù)etNumberWaiting接口返回被阻塞的線程數(shù)

二者功能介紹:

<h4>使用CountDownLatch實現(xiàn):</h4>
1蒜埋、5個運動員相繼都準(zhǔn)備就緒
2淫痰、教練員響起發(fā)令槍
3、運動員起跑
流程圖:
<pre>

Paste_Image.png

</pre>
demo code:
<pre>
public class TestCountDownLatch {
private static final int RUNNER_NUMBER = 5; // 運動員個數(shù)
private static final Random RANDOM = new Random();
public static void main(String[] args) {
// 用于判斷發(fā)令之前運動員是否已經(jīng)完全進(jìn)入準(zhǔn)備狀態(tài)整份,需要等待5個運動員待错,所以參數(shù)為5
CountDownLatch readyLatch = new CountDownLatch(RUNNER_NUMBER);
// 用于判斷裁判是否已經(jīng)發(fā)令,只需要等待一個裁判烈评,所以參數(shù)為1
CountDownLatch startLatch = new CountDownLatch(1);
for (int i = 0; i < RUNNER_NUMBER; i++) {
Thread t = new Thread(new Runner((i + 1) + "號運動員", readyLatch, startLatch));
t.start();
}
try {
readyLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("裁判:所有運動員準(zhǔn)備完畢火俄,開始...");
startLatch.countDown();
}
static class Runner implements Runnable {
private CountDownLatch readyLatch;
private CountDownLatch startLatch;
private String name;
public Runner(String name, CountDownLatch readyLatch, CountDownLatch startLatch) {
this.name = name;
this.readyLatch = readyLatch;
this.startLatch = startLatch;
}
public void run() {
int readyTime = RANDOM.nextInt(1000);
try {
Thread.sleep(readyTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":我已經(jīng)準(zhǔn)備完畢.");
readyLatch.countDown();
try {
startLatch.await(); // 等待裁判發(fā)開始命令
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":開跑...");
}
}
}
---------------------------輸出----------------------------------------------------------
3號運動員:我已經(jīng)準(zhǔn)備完畢.
1號運動員:我已經(jīng)準(zhǔn)備完畢.
4號運動員:我已經(jīng)準(zhǔn)備完畢.
5號運動員:我已經(jīng)準(zhǔn)備完畢.
2號運動員:我已經(jīng)準(zhǔn)備完畢.
裁判:所有運動員準(zhǔn)備完畢,開始...
3號運動員:開跑...
4號運動員:開跑...
1號運動員:開跑...
2號運動員:開跑...
5號運動員:開跑...
</pre>
<h4>使用CyclicBarrier模擬實現(xiàn):</h4>
1讲冠、前端調(diào)用restful api,已知商品id以后,調(diào)用后端商品起價接口瓜客、商品圖片信息接口
2、調(diào)用后端商品起價接口竿开、商品圖片信息接口
3谱仪、在匯總模塊中將多個接口的值拼接組合返回給前端
流程圖:
<pre>
Paste_Image.png

</pre>
demo code:
<pre>
public class TestCyclicBarrier {
private static final int THREAD_NUMBER = 2;
private static final Random RANDOM = new Random();
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {
public void run() {
System.out.println("2個接口都調(diào)用完成進(jìn)行后續(xù)處理。否彩。疯攒。");
}
});
for (int i = 0; i < THREAD_NUMBER; i++) {
Thread t = new Thread(new Worker(barrier,i));
t.start();
}
}
static class Worker implements Runnable {
private CyclicBarrier barrier;
private int apiIndex;
public Worker(CyclicBarrier barrier,int apiIndex) {
this.barrier = barrier;
this.apiIndex = apiIndex;
}
public void run() {
int time = RANDOM.nextInt(1000);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接口" + apiIndex + "調(diào)用完成");
try {
barrier.await(); // 等待所有線程都調(diào)用過此函數(shù)才能進(jìn)行后續(xù)動作
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
---------------------------輸出----------------------------------------------------------
接口0調(diào)用完成
接口1調(diào)用完成
2個接口都調(diào)用完成進(jìn)行后續(xù)處理。列荔。敬尺。
</pre>
<h4>CountDownLatch源碼:</h4>
CountDownLatch.countDown:
<pre>
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //嘗試將初始化的state減運算
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed枚尼,只執(zhí)行這里,中間一大段都會被跳過
break;
}
}
</pre>
CountDownLatch.await:
<pre>
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //state狀態(tài)不為0.既
doAcquireSharedInterruptibly(arg);
}
Node節(jié)點的waitStatus取值:
static final int CANCELLED = 1; //節(jié)點因為超時或者中斷被取消砂吞。該狀態(tài)不會再發(fā)生變署恍,而且被取消節(jié)點對應(yīng)的線程不會再發(fā)生阻塞。
static final int SIGNAL = -1; //后繼節(jié)點將被或者已經(jīng)被阻塞呜舒,所以當(dāng)前節(jié)點在釋放或者取消時锭汛,需要unpark它的后繼節(jié)點。
static final int CONDITION = -2; //該狀態(tài)僅供在條件隊列中的節(jié)點使用袭蝗。當(dāng)該節(jié)點轉(zhuǎn)移到同步隊列中時唤殴,該狀態(tài)將被設(shè)置為0。
static final int PROPAGATE = -3; //僅在共享模式下使用到腥。在doReleaseShared()方法中朵逝,僅僅會設(shè)置頭節(jié)點的狀態(tài)為PROPAGATE。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //共享的形式
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //前一個節(jié)點
if (p == head) {
int r = tryAcquireShared(arg); //判斷state值
if (r >= 0) {
setHeadAndPropagate(node, r); //設(shè)置頭結(jié)點為node,且最終設(shè)置頭結(jié)點的waitStatus為Node.PROPAGATE且喚醒node后面的狀態(tài)<0的某個結(jié)點(如果有的話)
p.next = null; // help GC
failed = false;
return; //結(jié)束阻塞
}
}
if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 判斷前一個結(jié)點的狀態(tài)來確定結(jié)點是否應(yīng)該被阻塞
parkAndCheckInterrupt()) //阻塞且判斷是否被中斷乡范,如果被中斷則拋異常中斷
throw new InterruptedException();
}
} finally {
if (failed) //異常的話acquire失敗配名,在阻塞隊列中取消node,如果node為頭結(jié)點的話,且喚醒node后面的狀態(tài)<0的某個結(jié)點(如果有的話)
cancelAcquire(node);
}
}
</pre>
<h4>CyclicBarrier源碼:</h4>
CyclicBarrier:因為這個類代碼量比較少全局分析一下:
<pre>
public class CyclicBarrier {
private static class Generation {
boolean broken = false; //當(dāng)前代被中止
}
private final ReentrantLock lock = new ReentrantLock(); //lock
private final Condition trip = lock.newCondition();
private final int parties; //屏障需要攔截的任務(wù)數(shù)
private final Runnable barrierCommand; //線程都執(zhí)行結(jié)束到這個屏障以后執(zhí)行執(zhí)行的任務(wù)
private Generation generation = new Generation(); //當(dāng)前代,主要是為了reset的時候預(yù)留
private int count; //用于統(tǒng)計剩余任務(wù)數(shù)晋辆,初始化的時候=parties渠脉。為0的時候結(jié)束釋放鎖,lock.unlock();
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //通知所有被阻塞的線程
// set up next generation
count = parties;
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true; //中斷
count = parties; //count被重置
trip.signalAll();
}
private int dowait(boolean timed, long nanos) //可以定義是否帶超時的等待
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken) //屏障被中斷
throw new BrokenBarrierException();
if (Thread.interrupted()) { //當(dāng)前線程被中斷
breakBarrier(); //喚醒所有
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //如果barrierCommand不為空的話執(zhí)行任務(wù)瓶佳,不是start重啟一個線程
ranAction = true;
nextGeneration(); //notice all 且更新?lián)Q代
return 0;
} finally {
if (!ranAction)
breakBarrier(); //如果執(zhí)行barrierCommand失敗的話芋膘,喚醒所有wait的線程
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;)
try {
if (!timed) //如果沒有設(shè)置wait時間話
trip.await(); //阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); //設(shè)置等待時間
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { //未中斷
breakBarrier();//則重新中斷
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public CyclicBarrier(int parties, Runnable barrierAction) { //初始化
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) { //只有線程數(shù),沒有task霸饲,表示到達(dá)barrier后什么也不做
this(parties, null);
}
public int getParties() {
return parties;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit) //設(shè)置阻塞的最大時間
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public boolean isBroken() { //返回屏障的阻塞狀態(tài)为朋,感覺用ReentrantReadWriteLock 的readLock效率更高
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() { //重置
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;//返回被阻塞的線程數(shù)
} finally {
lock.unlock();
}
}
}
</pre>
<b>思考:</b>
1、CountDownLatch和CyclicBarrier都是通過設(shè)置的阻塞任務(wù)數(shù)減操作厚脉,而不是網(wǎng)上有些人說的CyclicBarrier就是通過count加知道任務(wù)數(shù)(JDK 1.7)
2习寸、CountDownLatch更多是直接結(jié)合AQS來做阻塞使用的是共享鎖,而CyclicBarrier是直接用ReentrantLock來實現(xiàn)使用的是排它鎖傻工,雖然ReentrantLock也是使用了AQS來實現(xiàn)霞溪,
3、CyclicBarrier的代碼結(jié)構(gòu)看起來更簡單清晰中捆,CountDownLatch用的很多基礎(chǔ)的AQS的方法
4威鹿、CountDownLatch偏向于計數(shù),CyclicBarrier可以指定柵欄結(jié)束以后的任務(wù)也方便重置當(dāng)前柵欄

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末轨香,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子幼东,更是在濱河造成了極大的恐慌臂容,老刑警劉巖科雳,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異脓杉,居然都是意外死亡糟秘,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門球散,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尿赚,“玉大人,你說我怎么就攤上這事蕉堰×杈唬” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵屋讶,是天一觀的道長冰寻。 經(jīng)常有香客問我,道長皿渗,這世上最難降的妖魔是什么斩芭? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮乐疆,結(jié)果婚禮上划乖,老公的妹妹穿的比我還像新娘。我一直安慰自己挤土,他們只是感情好琴庵,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著耕挨,像睡著了一般细卧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上筒占,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天贪庙,我揣著相機(jī)與錄音,去河邊找鬼翰苫。 笑死止邮,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的奏窑。 我是一名探鬼主播导披,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼埃唯!你這毒婦竟也來了撩匕?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤墨叛,失蹤者是張志新(化名)和其女友劉穎止毕,沒想到半個月后模蜡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡扁凛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年忍疾,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谨朝。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡卤妒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出字币,到底是詐尸還是另有隱情则披,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布纬朝,位于F島的核電站收叶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏共苛。R本人自食惡果不足惜判没,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望隅茎。 院中可真熱鬧澄峰,春花似錦、人聲如沸辟犀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽堂竟。三九已至魂毁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間出嘹,已是汗流浹背席楚。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留税稼,地道東北人烦秩。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像郎仆,于是被迫代替她去往敵國和親只祠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容