總體介紹
一個(gè)同步器:能夠讓一個(gè)或者多個(gè)線程等待等待某個(gè)條件的到來(lái)再繼續(xù)執(zhí)行元暴。
大家可以把CountDownLatch初始化的值認(rèn)為是有N道門,剛開(kāi)始是線程調(diào)用await方法發(fā)現(xiàn)門關(guān)著的,所以就只有等待婴渡。而外界條件的變化是通過(guò)countDown來(lái)實(shí)現(xiàn)俺抽,可以認(rèn)為countDown一次就是打開(kāi)一道門,當(dāng)countDown的次數(shù)為N時(shí)沸伏,則全部的門打開(kāi)糕珊,則剛才在await處等待的線程將繼續(xù)執(zhí)行任務(wù)。
看下具體的代碼實(shí)現(xiàn):
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
...
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
}
CountDownLatch的代碼非常簡(jiǎn)單毅糟,內(nèi)部有一個(gè)基于AQS實(shí)現(xiàn)的同步器红选,重載了構(gòu)造方法,然后就是await和countDown方法姆另。
同步器的實(shí)現(xiàn)
我們先看一下同步器的實(shí)現(xiàn):
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { //同步器的構(gòu)造方法喇肋,必須傳入初始化狀態(tài)
setState(count);
}
//獲取狀態(tài)
int getCount() {
return getState();
}
/**
AQS的tryAcquireShared的實(shí)現(xiàn),看到tryAcquireShared我們更多的會(huì)想起ReentrantReadWriteLock迹辐;
它的同步器同樣對(duì)tryAcquireShared進(jìn)行了實(shí)現(xiàn)蝶防,但是他們之間的實(shí)現(xiàn)有很大區(qū)別:
這里的tryAcquireShared過(guò)程根本沒(méi)有對(duì)state進(jìn)行累加,反而是只判斷state是否等于0明吩,
所以這里大家就可以把state想象成我們剛才說(shuō)的門间学,state為多少就有多少門。
**/
//代碼1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//這里的tryReleaseShared與讀寫鎖的實(shí)現(xiàn)邏輯大致一樣
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
await方法
接下來(lái)我們來(lái)看下是如果利用AQS實(shí)現(xiàn)latch的,首先我們看await方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); //調(diào)用同步器獲取鎖
}
同步器調(diào)用AQS的方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
這里整體的方法已經(jīng)在AQS的源碼解析過(guò)專門講解過(guò)低葫,共享節(jié)點(diǎn)如果放入同步隊(duì)列详羡,如何等待,如果冒泡共享鎖等嘿悬,這里不再專門說(shuō)实柠,我們要關(guān)注的核心是tryAcquireShared,由代碼1我們可以看到tryAcquireShared的實(shí)現(xiàn)只有當(dāng)state為0時(shí)才會(huì)獲取鎖成功(CountDownLatch的初始會(huì)將state出初始化為非0)善涨,否則都是獲取鎖失敗窒盐,而獲取鎖失敗的線程自然就進(jìn)入了同步隊(duì)列進(jìn)行等待。這就是實(shí)現(xiàn)一個(gè)或者多少線程并發(fā)的情況躯概,協(xié)同的等待某個(gè)條件登钥。
countDown方法
再來(lái)看下如何實(shí)現(xiàn)打開(kāi)這N個(gè)門的:
public void countDown() {
sync.releaseShared(1);
}
看到這段代碼其實(shí)就已經(jīng)足夠了,每次countDown都會(huì)釋放一次鎖(其實(shí)就是state減1娶靡,其實(shí)就是打開(kāi)一道門)牧牢,當(dāng)初始化的值N都被減去后,則在await上等待鎖的線程將會(huì)被喚醒(釋放鎖的線程會(huì)嘗試喚醒等待鎖的節(jié)點(diǎn))姿锭,從而繼續(xù)執(zhí)行任務(wù)塔鳍。
總結(jié):
1.CountDownLatch使用AQS的共享模式實(shí)現(xiàn)。
2.初始化N時(shí)其實(shí)相當(dāng)于初始化了N把共享鎖呻此,只是這N把鎖不是通過(guò)tryAcquireShared來(lái)獲取的轮纫,而是直接初始化的。
3.業(yè)務(wù)線程調(diào)用await相當(dāng)于就是等待初始化的N把鎖釋放
4.countDown其實(shí)就相當(dāng)于釋放著N把鎖焚鲜,一次釋放一把