CountDownLatch是一個(gè)同步工具類,用來協(xié)調(diào)多個(gè)線程之間的同步。
CountDownLatch能夠使一個(gè)線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。使用一個(gè)計(jì)數(shù)器進(jìn)行實(shí)現(xiàn)冗懦。計(jì)數(shù)器初始值為線程的數(shù)量。當(dāng)每一個(gè)線程完成自己任務(wù)后仇祭,計(jì)數(shù)器的值就會(huì)減一披蕉。當(dāng)計(jì)數(shù)器的值為0時(shí),表示所有的線程都已經(jīng)完成一些任務(wù)乌奇,然后在CountDownLatch上等待的線程就可以恢復(fù)執(zhí)行接下來的任務(wù)没讲。
舉個(gè)例子來說明CountDownLatch的使用:
百米賽跑,10名運(yùn)動(dòng)員選手到達(dá)場地等待裁判口令礁苗,裁判一聲口令爬凑,選手聽到后同時(shí)起跑,當(dāng)所有選手到達(dá)終點(diǎn)试伙,裁判進(jìn)行匯總排名嘁信。
public class CountDownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(10);
for(int i=0; i<10; i++){
Runnable runnable = new Runnable() {
@Override
public void run() {
try{
System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發(fā)布口令");
cdOrder.await();
System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("選手" + Thread.currentThread().getName() + "到達(dá)終點(diǎn)");
cdAnswer.countDown();
}catch (InterruptedException ie){
ie.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("裁判"+Thread.currentThread().getName()+"即將發(fā)布口令");
cdOrder.countDown();
System.out.println("裁判"+Thread.currentThread().getName()+"已發(fā)送口令,正在等待所有選手到達(dá)終點(diǎn)");
cdAnswer.await();
System.out.println("所有選手都到達(dá)終點(diǎn)");
System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績排名");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}
運(yùn)行結(jié)果如下:
選手pool-1-thread-1正在等待裁判發(fā)布口令
選手pool-1-thread-3正在等待裁判發(fā)布口令
選手pool-1-thread-2正在等待裁判發(fā)布口令
選手pool-1-thread-4正在等待裁判發(fā)布口令
選手pool-1-thread-5正在等待裁判發(fā)布口令
選手pool-1-thread-6正在等待裁判發(fā)布口令
選手pool-1-thread-7正在等待裁判發(fā)布口令
選手pool-1-thread-8正在等待裁判發(fā)布口令
選手pool-1-thread-9正在等待裁判發(fā)布口令
選手pool-1-thread-10正在等待裁判發(fā)布口令
裁判main即將發(fā)布口令
裁判main已發(fā)送口令疏叨,正在等待所有選手到達(dá)終點(diǎn)
選手pool-1-thread-3已接受裁判口令
選手pool-1-thread-2已接受裁判口令
選手pool-1-thread-1已接受裁判口令
選手pool-1-thread-9已接受裁判口令
選手pool-1-thread-8已接受裁判口令
選手pool-1-thread-7已接受裁判口令
選手pool-1-thread-6已接受裁判口令
選手pool-1-thread-5已接受裁判口令
選手pool-1-thread-4已接受裁判口令
選手pool-1-thread-10已接受裁判口令
選手pool-1-thread-7到達(dá)終點(diǎn)
選手pool-1-thread-3到達(dá)終點(diǎn)
選手pool-1-thread-10到達(dá)終點(diǎn)
選手pool-1-thread-9到達(dá)終點(diǎn)
選手pool-1-thread-1到達(dá)終點(diǎn)
選手pool-1-thread-5到達(dá)終點(diǎn)
選手pool-1-thread-4到達(dá)終點(diǎn)
選手pool-1-thread-2到達(dá)終點(diǎn)
選手pool-1-thread-8到達(dá)終點(diǎn)
選手pool-1-thread-6到達(dá)終點(diǎn)
所有選手都到達(dá)終點(diǎn)
裁判main匯總成績排名
具體看一下CountDownLatch是如何實(shí)現(xiàn)線程調(diào)度的潘靖。
首先看一下其構(gòu)造方法:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
繼續(xù)看一下Sync。
// Sync繼承自AQS
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 定義嘗試獲取共享鎖的方法
protected int tryAcquireShared(int acquires) {
// 當(dāng)狀態(tài)為0蚤蔓,則該線程獲取到該共享鎖
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// 減少count卦溢,當(dāng)檢測到狀態(tài)值為0時(shí),通知同步隊(duì)列中被掛起的線程
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
可以發(fā)現(xiàn)秀又,CountDownLatch是基于AQS共享鎖來實(shí)現(xiàn)的单寂,,只要共享鎖狀態(tài)值不為0,則請求共享鎖的線程均會(huì)添加到同步隊(duì)列中涮坐,阻塞掛起凄贩,等待被通知。
接著看一下await方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
直接調(diào)用的是AQS的acquireSharedInterruptibly方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared由Sync實(shí)現(xiàn)袱讹,即只要狀態(tài)不為0,則返回-1
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
CountDownLatch初始化后狀態(tài)值肯定不為0,所以當(dāng)前線程tryAcquireShared必然返回-1捷雕,繼續(xù)執(zhí)行doAcquireSharedInterruptibly方法椒丧。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 將當(dāng)前線程添加到同步隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 若當(dāng)前線程所在節(jié)點(diǎn)的前繼節(jié)點(diǎn)為頭節(jié)點(diǎn),則執(zhí)行tryAcquireShared嘗試獲取共享鎖
if (p == head) {
// 因?yàn)闋顟B(tài)值不為0救巷,tryAcquireShared必然返回-1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 將前繼節(jié)點(diǎn)狀態(tài)CAS更改為SIGNAL后壶熏,然后線程阻塞掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
綜上,持有CountDownLatch的線程只要調(diào)用await方法浦译,就會(huì)被添加進(jìn)AQS的同步隊(duì)列棒假,并被阻塞掛起。
那這些被阻塞掛起的線程啥時(shí)候會(huì)被喚醒繼續(xù)執(zhí)行呢精盅?
答案在CountDownLatch的countDown方法中:
public void countDown() {
sync.releaseShared(1);
}
底層調(diào)用的是AQS的releaseShared方法:
public final boolean releaseShared(int arg) {
// 若狀態(tài)執(zhí)行-1操作后帽哑,狀態(tài)值未歸零,tryReleaseShared返回false
// 若狀態(tài)執(zhí)行-1操作后叹俏,狀態(tài)值歸零妻枕,tryReleaseShared返回true
if (tryReleaseShared(arg)) {
// 若狀態(tài)值歸零,繼續(xù)執(zhí)行doReleaseShared方法
doReleaseShared();
return true;
}
return false;
}
當(dāng)狀態(tài)值歸零后粘驰,當(dāng)前線程會(huì)執(zhí)行AQS的doReleaseShared方法屡谐,doReleaseShared方法我們在之前AQS詳解的系列文章里詳細(xì)介紹過,該方法是一個(gè)"喚醒風(fēng)暴"蝌数,其會(huì)喚醒同步隊(duì)列中阻塞掛起的線程愕掏。
喚醒后的線程會(huì)進(jìn)行獲取鎖的操作,當(dāng)狀態(tài)值歸零后顶伞,由于tryReleaseShared恒返回1饵撑,代表任何線程均可以獲取共享鎖成功,當(dāng)線程獲取到鎖之后枝哄,會(huì)執(zhí)行setHeadAndPropagate方法將當(dāng)前節(jié)點(diǎn)置為頭節(jié)點(diǎn)并喚醒后繼節(jié)點(diǎn)肄梨,后繼節(jié)點(diǎn)被喚醒后執(zhí)行獲取鎖操作,如此反復(fù)挠锥,直到同步隊(duì)列中的所有阻塞線程被喚醒众羡。
需要注意的是:CountDownLatch是一次性的,計(jì)算器的值只能在構(gòu)造方法中初始化一次蓖租,之后沒有任何機(jī)制再次對其設(shè)置值粱侣,當(dāng)CountDownLatch使用完畢后,它不能再次被使用蓖宦。