點(diǎn)贊再看,養(yǎng)成習(xí)慣急黎,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章扎狱。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章勃教。
前言
控制并發(fā)流程的工具類淤击,作用就是幫助我們程序員更容易的讓線程之間合作,讓線程之間相互配合來滿足業(yè)務(wù)邏輯故源。比如讓線程A等待線程B執(zhí)行完畢后再執(zhí)行等合作策略污抬。
控制并發(fā)流程的工具類主要有:
類 | 作用 | 說明 |
---|---|---|
Semaphore | 信號(hào)量,可以通過控制“許可證”的數(shù)量绳军,來保證線程之間的配合 | 線程只有拿到“許可證”后才能繼續(xù)運(yùn)行印机,相比于其它的同步器,更靈活 |
CyclicBarrier | 線程會(huì)等待门驾,直到足夠多線程達(dá)到了事先規(guī)定的數(shù)目射赛。一旦達(dá)到觸發(fā)條件,就可以進(jìn)行下一步的動(dòng)作 | 適用于線程之間相互等待處理結(jié)果的就緒場(chǎng)景 |
Phaser | 和CyclicBarrier類似奶是,但是計(jì)數(shù)可變 | Java7加入的 |
CountDownLatch | 和CyclicBarrier類似楣责,數(shù)量遞減到0時(shí),觸發(fā)動(dòng)作 | 不可重復(fù)使用 |
Exchanger | 讓兩個(gè)線程在合適時(shí)交換對(duì)象 | 適用場(chǎng)景:當(dāng)兩個(gè)線程工作在同一個(gè)類的不同實(shí)例上時(shí)诫隅,用于交換數(shù)據(jù) |
Condition | 可以控制線程的“等待”和“喚醒” | 是Object.wait() 的升級(jí)版 |
簡(jiǎn)介
背景
- CountDownLatch是在Java1.5被引入腐魂,跟它一起被引入的工具類還有CyclicBarrier、Semaphore逐纬、ConcurrenthashMap和BlockingQueue。
- 在java.util.cucurrent包下削樊。
概念
- CountDownLatch是一個(gè)同步計(jì)數(shù)器豁生,他允許一個(gè)或者多個(gè)線程在另外一組線程執(zhí)行完成之前一直等待,基于AQS共享模式實(shí)現(xiàn)的漫贞。
- 是通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)的甸箱,計(jì)數(shù)器的初始值是線程的數(shù)量。每當(dāng)一個(gè)線程執(zhí)行完畢后迅脐,計(jì)數(shù)器的值就-1芍殖,當(dāng)計(jì)數(shù)器的值為0時(shí),表示所有線程都執(zhí)行完畢谴蔑,然后在閉鎖上等待的線程就可以恢復(fù)工作來豌骏。
關(guān)于 AQS龟梦,可以查看《并發(fā)編程之抽象隊(duì)列同步器AQS應(yīng)用ReentrantLock》
應(yīng)用場(chǎng)景
Zookeeper分布式鎖,Jmeter模擬高并發(fā)等
場(chǎng)景1 讓多個(gè)線程等待:模擬并發(fā)窃躲,讓并發(fā)線程一起執(zhí)行
為了模擬高并發(fā)计贰,讓一組線程在指定時(shí)刻(秒殺時(shí)間)執(zhí)行搶購,這些線程在準(zhǔn)備就緒后蒂窒,進(jìn)行等待(CountDownLatch.await())躁倒,直到秒殺時(shí)刻的到來,然后一擁而上洒琢。這也是本地測(cè)試接口并發(fā)的一個(gè)簡(jiǎn)易實(shí)現(xiàn)秧秉。
在這個(gè)場(chǎng)景中,CountDownLatch充當(dāng)?shù)氖且粋€(gè)發(fā)令槍
的角色衰抑;就像田徑賽跑時(shí)象迎,運(yùn)動(dòng)員會(huì)在起跑線做準(zhǔn)備動(dòng)作,等到發(fā)令槍一聲響停士,運(yùn)動(dòng)員就會(huì)奮力奔跑挖帘。和上面的秒殺場(chǎng)景類似。
代碼實(shí)現(xiàn)如下:
package com.niuh.tools;
import java.util.concurrent.CountDownLatch;
/**
* <p>
* CountDownLatch示例
* 場(chǎng)景1 讓多個(gè)線程等待:模擬并發(fā)恋技,讓并發(fā)線程一起執(zhí)行
* </p>
*/
public class CountDownLatchRunner1 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//準(zhǔn)備完畢……運(yùn)動(dòng)員都阻塞在這拇舀,等待號(hào)令
countDownLatch.await();
String parter = "【" + Thread.currentThread().getName() + "】";
System.out.println(parter + "開始執(zhí)行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);// 裁判準(zhǔn)備發(fā)令
countDownLatch.countDown();// 發(fā)令槍:執(zhí)行發(fā)令
}
}
運(yùn)行結(jié)果:
【Thread-2】開始執(zhí)行……
【Thread-4】開始執(zhí)行……
【Thread-3】開始執(zhí)行……
【Thread-0】開始執(zhí)行……
【Thread-1】開始執(zhí)行……
我們通過CountDownLatch.await(),讓多個(gè)參與者線程啟動(dòng)后阻塞等待蜻底,然后在主線程 調(diào)用CountDownLatch.countdown(1) 將計(jì)數(shù)減為0骄崩,讓所有線程一起往下執(zhí)行;以此實(shí)現(xiàn)了多個(gè)線程在同一時(shí)刻并發(fā)執(zhí)行薄辅,來模擬并發(fā)請(qǐng)求的目的要拂。
場(chǎng)景2 讓單個(gè)線程等待:多個(gè)線程(任務(wù))完成后,進(jìn)行匯總合并
很多時(shí)候站楚,我們的并發(fā)任務(wù)忧侧,存在前后依賴關(guān)系;比如數(shù)據(jù)詳情頁需要同時(shí)調(diào)用多個(gè)接口獲取數(shù)據(jù)劣坊,并發(fā)請(qǐng)求獲取到數(shù)據(jù)后政勃、需要進(jìn)行結(jié)果合并;或者多個(gè)數(shù)據(jù)操作完成后旧乞,需要數(shù)據(jù)check蔚润;這其實(shí)都是:在多個(gè)線程(任務(wù))完成后,進(jìn)行匯總合并的場(chǎng)景尺栖。
代碼實(shí)現(xiàn)如下:
package com.niuh.tools;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
/**
* <p>
* CountDownLatch示例
* 場(chǎng)景2 讓單個(gè)線程等待:多個(gè)線程(任務(wù))完成后嫡纠,進(jìn)行匯總合并
* </p>
*/
public class CountDownLatchRunner2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
System.out.println("finish" + index + Thread.currentThread().getName());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.await();// 主線程在阻塞,當(dāng)計(jì)數(shù)器==0,就喚醒主線程往下執(zhí)行除盏。
System.out.println("主線程:在所有任務(wù)運(yùn)行完成后叉橱,進(jìn)行結(jié)果匯總");
}
}
運(yùn)行結(jié)果:
finish3Thread-3
finish0Thread-0
finish1Thread-1
finish4Thread-4
finish2Thread-2
主線程:在所有任務(wù)運(yùn)行完成后,進(jìn)行結(jié)果匯總
在每個(gè)線程(任務(wù)) 完成的最后一行加上CountDownLatch.countDown()痴颊,讓計(jì)數(shù)器-1赏迟;當(dāng)所有線程完成-1,計(jì)數(shù)器減到0后蠢棱,主線程往下執(zhí)行匯總?cè)蝿?wù)锌杀。
源碼分析
本文基于JDK1.8
CountDownLatch 類圖
從圖中可以看出CountDownLatch是基于Sync類實(shí)現(xiàn)的,而Sync繼承AQS泻仙,使用的是AQS共享模式糕再。
其內(nèi)部主要變量和方法如下:
在我們方法中調(diào)用 awit()
和 countDown()
的時(shí)候,發(fā)生了幾個(gè)關(guān)鍵的調(diào)用關(guān)系玉转,如下圖所示:
其與AQS交互原理如下:
構(gòu)造函數(shù)
CountDownLatch類中只提供了一個(gè)構(gòu)造器突想,參數(shù)count為計(jì)數(shù)器的大小
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
這里需要注意,設(shè)置state的數(shù)量只有在初始化CountDownLatch的時(shí)候究抓,如果該state被減成了0猾担,就無法繼續(xù)使用這個(gè)CountDownLatch了,需要重新new一個(gè)刺下,這就是這個(gè)類不可重用的原因绑嘹,有另一個(gè)類也實(shí)現(xiàn)了類似的功能,但是可以重用橘茉,就是CyclicBarrier工腋。
內(nèi)部同步器
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//初始化,設(shè)置資源個(gè)數(shù)
Sync(int count) {
setState(count);
}
//獲取共享資源個(gè)數(shù)
int getCount() {
return getState();
}
//嘗試獲取共享鎖畅卓,只有當(dāng)共享資源個(gè)數(shù)為0的時(shí)候擅腰,才會(huì)返回1,否則為-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//釋放共享資源翁潘,通過CAS每次對(duì)state減1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
主要方法
類中有三個(gè)方法是最重要的
// 調(diào)用await()方法的線程會(huì)被掛起趁冈,它會(huì)等待直到count值為0才繼續(xù)執(zhí)行
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//和await()方法類似,只不過等待一定的時(shí)間后count值還沒變?yōu)?的化就會(huì)繼續(xù)執(zhí)行
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//將count值減1
public void countDown() {
sync.releaseShared(1);
}
await()方法
// 調(diào)用await()方法的線程會(huì)被掛起拜马,它會(huì)等待直到count值為0才繼續(xù)執(zhí)行
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
進(jìn)入AbstractQueuedSynchronizer #acquireSharedInterruptibly()方法.
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//等待過程不可中斷
if (Thread.interrupted())
throw new InterruptedException();
//這里的tryAcquireShared在AbstractQueuedSynchronizer中沒有實(shí)現(xiàn)箱歧,在上面介紹的Sync中實(shí)現(xiàn)的
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在上面介紹Sync類的時(shí)候#tryAcquireShared(),當(dāng)AQS的state = 0的時(shí)候才會(huì)返回1一膨,否則一直返回-1,如果返回-1洒沦,要執(zhí)行#doAcquireSharedInterruptibly()豹绪,進(jìn)入該方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//這里就把主線程加入隊(duì)列,隊(duì)列中有兩個(gè)節(jié)點(diǎn),第一個(gè)是虛擬節(jié)點(diǎn)瞒津,第二個(gè)就是主線程節(jié)點(diǎn)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//總共只有兩個(gè)節(jié)點(diǎn)蝉衣,主線程前一個(gè)就是首節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
//這里又執(zhí)行到CountDownLatch中Sync類中實(shí)現(xiàn)的方法,判斷state是否為0
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果state不為0巷蚪,這里會(huì)把主線程掛起阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這里使用AQS很神奇病毡,在阻塞隊(duì)列中就只加入了一個(gè)主線程,但是呢屁柏,只要其他線程沒有執(zhí)行完啦膜,那state就不為0,那主線程就在這里阻塞著淌喻,那問題了僧家,誰來喚醒這個(gè)主線程呢?就是 countDown() 這個(gè)方法裸删。
await(long timeout, TimeUnit unit)方法
該方法就是指定等待時(shí)間八拱,如果在規(guī)定的等待時(shí)間中沒有完成,就直接返回false涯塔,在主線程中可以根據(jù)這個(gè)狀態(tài)進(jìn)行后續(xù)的處理肌稻。
//和await()方法類似,只不過等待一定的時(shí)間后count值還沒變?yōu)?的化就會(huì)繼續(xù)執(zhí)行
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
countDown() 方法
//將count值減1
public void countDown() {
sync.releaseShared(1);
}
進(jìn)入AbstractQueuedSynchronizer #releaseShared方法
public final boolean releaseShared(int arg) {
//該方法同樣在AbstractQueuedSynchronizer中沒有實(shí)現(xiàn)匕荸,在CountDownLatch中實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
//喚醒主線程
doReleaseShared();
return true;
}
return false;
}
在分析Sync類的時(shí)候爹谭,介紹了tryReleaseShared(),該方法會(huì)把AQS的state減1,如果減1操作成功每聪,執(zhí)行喚醒主線程操作旦棉,進(jìn)入AbstractQueuedSynchronizer#tryReleaseShared()方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//首節(jié)點(diǎn)狀態(tài)為SIGNAL = -1
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒主線程,也就是隊(duì)列中的第二個(gè)節(jié)點(diǎn),如果線程沒有執(zhí)行完成药薯,主線程被喚醒之后绑洛,發(fā)現(xiàn)state依然不為零,會(huì)再次阻塞
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
總結(jié)
CountDownLatch 和 Semaphore 一樣都是共享模式下資源問題童本,這些源碼實(shí)現(xiàn)AQS的模版方法真屯,然后使用CAS+循環(huán)重試實(shí)現(xiàn)自己的功能。在RT多個(gè)資源調(diào)用穷娱,或者執(zhí)行某種操作依賴其他操作完成下可以發(fā)揮這個(gè)計(jì)數(shù)器的作用绑蔫。
CountDownLatch就只在隊(duì)列中放入一個(gè)主線程,然后不停的喚醒泵额,喚醒之后發(fā)現(xiàn)state還是不為0配深,就繼續(xù)等待。每個(gè)子線程執(zhí)行完都會(huì)對(duì)state進(jìn)行減1操作嫁盲,當(dāng)所有子線程都執(zhí)行完了篓叶,那state也就為0,這時(shí)候主線程被喚醒之后才可以繼續(xù)執(zhí)行。而這也正是CountDownLatch不可重用的原因缸托,如果想要重用左敌,需要重新new一個(gè),因?yàn)橹挥性趎ew的時(shí)候才可以設(shè)置資源的數(shù)量俐镐。
CountDownLatch與Thread.join
CountDownLatch的作用就是允許一個(gè)或多個(gè)線程等待其他線程完成操作矫限,看起來有點(diǎn)類似join() 方法,但其提供了比 join() 更加靈活的API佩抹。
CountDownLatch可以手動(dòng)控制在n個(gè)線程里調(diào)用n次countDown()方法使計(jì)數(shù)器進(jìn)行減一操作叼风,也可以在一個(gè)線程里調(diào)用n次執(zhí)行減一操作。
而 join() 的實(shí)現(xiàn)原理是不停檢查join線程是否存活匹摇,如果 join 線程存活則讓當(dāng)前線程永遠(yuǎn)等待咬扇。所以兩者之間相對(duì)來說還是CountDownLatch使用起來較為靈活。
CountDownLatch與CyclicBarrier
CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待廊勃,只不過它們側(cè)重點(diǎn)不同:
- CountDownLatch一般用于一個(gè)或多個(gè)線程懈贺,等待其他線程執(zhí)行完任務(wù)后,再才執(zhí)行坡垫;
- CyclicBarrier一般用于一組線程互相等待至某個(gè)狀態(tài)梭灿,然后這一組線程再同時(shí)執(zhí)行;
- CountDownLatch 是一次性的冰悠,CyclicBarrier 是可循環(huán)利用的堡妒;
- CountDownLathch是一個(gè)計(jì)數(shù)器,線程完成一個(gè)記錄一個(gè)溉卓,計(jì)數(shù)器遞減皮迟,只能用一次。如下圖:
- CyclicBarrier的計(jì)數(shù)器更像一個(gè)閥門桑寨,需要所有線程都到達(dá)伏尼,然后繼續(xù)執(zhí)行,計(jì)數(shù)器遞減尉尾,提供reset功能爆阶,可以多次使用。如下圖:
PS:以上代碼提交在 Github :https://github.com/Niuh-Study/niuh-juc-final.git
文章持續(xù)更新沙咏,可以搜一搜「 一角錢技術(shù) 」第一時(shí)間閱讀辨图, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star肢藐。