CountDownLatch和CyclicBarrier區(qū)別
CountDownLatch 和 CyclicBarrier 是 Java 并發(fā)包提供的兩個(gè)非常易用的線程同步工具類(lèi)约郁,把它們放在一起介紹是因?yàn)樗鼈冎g有點(diǎn)像歇盼,又很不同。
CountDownLatch 主要用來(lái)解決一個(gè)線程等待多個(gè)線程的場(chǎng)景籍救,可以類(lèi)比旅游團(tuán)團(tuán)長(zhǎng)要等待所有的游客到齊才能去下一個(gè)景點(diǎn)胸蛛。
CyclicBarrier 是一組線程之間互相等待污茵,只要有一個(gè)線程沒(méi)有完成,其他線程都要等待,更像是你和你老婆不離不棄葬项。
對(duì)于CountDownLatch來(lái)說(shuō)泞当,重點(diǎn)是那一個(gè)線程, 是它在等待,而另外那N個(gè)線程在把“某個(gè)事情”做完之后可以繼續(xù)等待民珍,可以終止襟士。
而對(duì)于CyclicBarrier來(lái)說(shuō),重點(diǎn)是那一組(N個(gè))線程嚷量,他們之間任何一個(gè)沒(méi)有完成陋桂,所有的線程都必須等待。
除此之外 CountDownLatch 的計(jì)數(shù)器是不能循環(huán)利用的津肛,也就是說(shuō)一旦計(jì)數(shù)器減到 0章喉,再有線程調(diào)用 await(),該線程會(huì)直接通過(guò)。
但CyclicBarrier 的計(jì)數(shù)器是可以循環(huán)利用的秸脱,而且具備自動(dòng)重置的功能落包,一旦計(jì)數(shù)器減到 0 會(huì)自動(dòng)重置到你設(shè)置的初始值。除此之外摊唇,CyclicBarrier 還可以設(shè)置回調(diào)函數(shù)咐蝇,可以說(shuō)是功能豐富。
CountDownLatch使用
我們假設(shè)旅游團(tuán)有3個(gè)游客巷查,團(tuán)長(zhǎng)要等到游客都到齊了之后才能出發(fā)去下一個(gè)景點(diǎn)
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
IntStream.rangeClosed(1, 3).forEach(i -> {
executor.execute(() -> {
System.out.println("游客" + i + "到了集合地點(diǎn)");
latch.countDown();
});
});
latch.await();
System.out.println("所有人員都已經(jīng)到齊了有序,出發(fā)去下個(gè)景點(diǎn)");
首先創(chuàng)建了一個(gè) CountDownLatch,計(jì)數(shù)器的初始值等于 3岛请,之后每當(dāng)一個(gè)團(tuán)員到達(dá)就對(duì)計(jì)數(shù)器執(zhí)行減 1操作(latch.countDown()實(shí)現(xiàn))旭寿。在主線程中,我們通過(guò)調(diào)用 latch.await() 來(lái)實(shí)現(xiàn)對(duì)計(jì)數(shù)器等于 0 的等待崇败。
CountDownLatch源碼分析
CountDownLatch是通過(guò)AQS來(lái)實(shí)現(xiàn)的盅称,這里的計(jì)數(shù)器的值實(shí)際上是AQS中State的值,也就是我們的state的值會(huì)被初始化為我們傳入的值。
當(dāng)我們調(diào)用coutnDown的時(shí)候?qū)嶋H上是減去state的值(計(jì)數(shù)器減1)
// CountDownLatch中代碼
public void countDown() {
sync.releaseShared(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
// countDown會(huì)調(diào)用這個(gè)方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
// 計(jì)數(shù)器的值減1
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
//AQS中的代碼
public final boolean releaseShared(int arg) {
// 當(dāng)state等于0的時(shí)候后室,去喚醒阻塞的線程
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
調(diào)用await的時(shí)候缩膝,會(huì)判斷當(dāng)前state的值是否等于0,如果等于0,就代表其他線程已經(jīng)執(zhí)行完成了岸霹,可以接著往下執(zhí)行疾层。否則就阻塞當(dāng)前線程。
// CountDownLatch中代碼
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// await會(huì)調(diào)用這個(gè)方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// AQS中的代碼
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 阻塞線程
doAcquireSharedInterruptibly(arg);
}
至此邏輯就很清晰了贡避。 當(dāng)我們調(diào)用countDown()方法的時(shí)候痛黎,會(huì)將AQS中的state的值減去1,當(dāng)state值變?yōu)?的時(shí)候會(huì)喚醒CLH隊(duì)列中阻塞的線程。當(dāng)我們調(diào)用await()方法的時(shí)候贸桶,會(huì)判斷state的值是否等于0舅逸,如果等于0則繼續(xù)往下執(zhí)行。如果不等于0則線程被阻塞皇筛,等待被喚醒(countDown()方法中會(huì)喚醒)琉历。
CyclicBarrier使用
周末的時(shí)候,我和我老婆一起去吃燒烤水醋,用CyclicBarrier來(lái)描述是這樣的
// 執(zhí)行回調(diào)的線程池旗笔,很重要的點(diǎn)
ExecutorService executor = Executors.newFixedThreadPool(1);
// 指定計(jì)數(shù)器的值和回調(diào)函數(shù)
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
executor.execute(() -> System.out.println("到齊了,出發(fā)吃燒烤"));
});
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("半個(gè)小時(shí)后拄踪,think123媳婦兒準(zhǔn)備好了");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
System.out.println("think123準(zhǔn)備好了");
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
CyclicBarrier源碼分析
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
}
上面展示了CyclicBarrier的重要屬性,它的屬性名稱(chēng)還是挺有趣的蝇恶。
為了實(shí)現(xiàn)一組線程相互等待,使用到了lock和condition惶桐,而parties則是表明一組線程的個(gè)數(shù)(計(jì)數(shù)器)撮弧,count表示當(dāng)前有多少個(gè)線程還未執(zhí)行完成潘懊。 barrierCommand表示當(dāng)所有線程都就緒時(shí),需要回調(diào)的函數(shù)贿衍。而generation是為了實(shí)現(xiàn)計(jì)數(shù)器循環(huán)利用授舟,你可以理解為版本。
接下來(lái)我們看看await方法是如何實(shí)現(xiàn)的贸辈,下面的代碼我只保留了核心代碼邏輯
public int await() {
return dowait(false, 0L);
}
private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
// 加鎖释树,獨(dú)占鎖
lock.lock();
final Generation g = generation;
//省略部分代碼
// 還有多少線程未執(zhí)行完成
int index = --count;
// 所有線程都執(zhí)行完成
if (index == 0) {
// 執(zhí)行barrierCommand回調(diào)函數(shù)
final Runnable command = barrierCommand;
// 注意這里調(diào)用的是Runnable的run方法,而不是thread.start()
if (command != null)
command.run();
ranAction = true;
// 重置計(jì)數(shù)器擎淤,并喚醒所有正在等待的線程
nextGeneration();
return 0;
}
// 只保留核心代碼
for (;;) {
// 阻塞線程
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
if (g != generation)
return index;
}
lock.unlock();
}
private void nextGeneration() {
// 喚醒所有正在休假的線程
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
await的邏輯很簡(jiǎn)單奢啥,主要就是判斷當(dāng)前線程是否是最后一個(gè)執(zhí)行完成的線程,如果是最后一個(gè)嘴拢,則需要執(zhí)行回調(diào)函數(shù)桩盲,然后喚醒其他所有被阻塞的線程并重置計(jì)數(shù)器。
如果不是最后一個(gè)執(zhí)行完的炊汤,則阻塞當(dāng)前線程正驻。
尤其需要注意的CyclicBarrier的回調(diào)函數(shù)執(zhí)行在一個(gè)回合里最后執(zhí)行await()的線程上,而且是同步調(diào)用回調(diào)函數(shù)抢腐,調(diào)用完之后,才會(huì)開(kāi)始第二回合襟交。
所以回調(diào)函數(shù)如果不另開(kāi)一線程異步執(zhí)行迈倍,就起不到性能優(yōu)化的作用了。
寫(xiě)到最后
你告訴我捣域,要是你你怎么選啼染?
如果覺(jué)得好看,記得三連喲焕梅。