[toc]
1.類結(jié)構(gòu)及注釋部分
1.1 類結(jié)構(gòu)
CountDownLatch是我們常用的并發(fā)工具,主要用于倒數(shù)計(jì)數(shù)等場(chǎng)景拿霉,如在zookeeper連接管理中用于初始化連接數(shù)。CountDownlatch是AbstractQueueSynchronizer的共享模式實(shí)現(xiàn)。實(shí)際上匾竿,我們可以理解AQS為什么沒有將所有方法定義為abstract方法绰沥,這是因?yàn)樽宇惪梢愿鶕?jù)共享還是獨(dú)占模式來(lái)自由選擇需要實(shí)現(xiàn)的方法篱蝇。
CountDownLatch的類結(jié)構(gòu)如下:
1.2
翻譯部分:
CountDownLatch是一種同步工具,允許一個(gè)或多個(gè)線程等待揪利,直到在其他線程中執(zhí)行的一組操作完成為止态兴。
CountDownLatch使用count進(jìn)行初始化,await方法將當(dāng)前線程阻塞疟位,直到調(diào)用countDown方法而導(dǎo)致當(dāng)前計(jì)數(shù)器歸零瞻润,此后所有的線程均被釋放,并且await的后續(xù)調(diào)用將立即返回甜刻。這是一種一次性的現(xiàn)象绍撞,計(jì)數(shù)器無(wú)法重置,如果需要用于重置的計(jì)數(shù)器版本得院,請(qǐng)考慮使用CyclicBarrier傻铣。
CountDownLatch是一種多功能的同步工具,可以用于多種用途祥绞,當(dāng)CountDownLatch初始化為1時(shí)非洲,用于一個(gè)簡(jiǎn)單的on/off開關(guān)鸭限。所有調(diào)用await的線程在await等待,直到被調(diào)用countDown两踏,初始化為N的CountDownLatch可用于使一個(gè)線程等待直到N線程已完成某項(xiàng)操作或者某項(xiàng)操作已完成N次败京。
CountDownLatch的一個(gè)有用的屬性是,它不需要調(diào)用countDown的線程在繼續(xù)計(jì)數(shù)之前就等待計(jì)數(shù)值到達(dá)0梦染,它只是防止任何線程經(jīng)過await直到所有的線程都可以通過赡麦。
用法示例:
這是一組類,其中一組線程使用兩個(gè)CountDownLatch帕识。
第一個(gè)是啟動(dòng)信號(hào)泛粹,可防止任何worker繼續(xù)前進(jìn),直到driver都完成準(zhǔn)備為止肮疗。第二個(gè)是完成信號(hào)晶姊,允許driver等到所有worker都完成為止。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal,
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}}
另外一個(gè)典型的用法是將問題分為N個(gè)部分族吻,用Runnable描述每個(gè)部分帽借,該Runnable執(zhí)行該部分并在CountDownLatch中遞減計(jì)數(shù),并將所有的Runnable排隊(duì)給執(zhí)行程序超歌。當(dāng)所有的子部分都執(zhí)行完成時(shí)砍艾,協(xié)調(diào)線程將能夠通過等待。當(dāng)線程必須以此方法反復(fù)遞減計(jì)數(shù)器時(shí)巍举,請(qǐng)使用CyclicBarrier脆荷。
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}}
內(nèi)存一致性分析,在計(jì)數(shù)器到達(dá)0之前懊悯,在調(diào)用CountDown()之前在線程中執(zhí)行操作happen-before于在另外一個(gè)線程中await()成功返回的動(dòng)作蜓谋。
2. 內(nèi)部類Sync
這是AQS的實(shí)現(xiàn)類,主要實(shí)現(xiàn)了兩個(gè)方法炭分,tryAcquireShared和tryReleaseShared桃焕。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//根據(jù)state是否為0來(lái)決定此方法的返回,這是因?yàn)榕趺绻麨?观堂,則說明條件已經(jīng)達(dá)到,await的方法阻塞的線程需要被喚醒呀忧,反之則說明條件沒有達(dá)到师痕,后續(xù)線程需要繼續(xù)WAIT。這說明tryAcquireShared方法是一個(gè)控制AQS在共享模式下后續(xù)運(yùn)行的方法而账。
protected int tryAcquireShared(int acquires) {
//判斷state是否為0胰坟,如果為0則返回1,反之返回-1
return (getState() == 0) ? 1 : -1;
}
//釋放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//死循環(huán)
for (;;) {
//拿到state
int c = getState();
//判斷是否為0 如果為0 返回false 即倒計(jì)時(shí)器不能再減
if (c == 0)
return false;
//計(jì)算減1之后的值
int nextc = c-1;
//cas修改這個(gè)值
if (compareAndSetState(c, nextc))
//只有當(dāng)減去之后為0 則返回true 其他情況都返回false
return nextc == 0;
}
}
}
Sync類只重寫了兩個(gè)方法泞辐,tryAcquireShared與tryReleaseShared笔横。
tryAcquireShared是用戶控制acquire流程的方法竞滓,此處將其重寫為當(dāng)state為0時(shí)返回1,否則返回-1.只有當(dāng)doAcquireShared小于0的時(shí)候才會(huì)執(zhí)行doAcquireSharedInterruptibly或者doAcquireShared吹缔。也就是說虽界,當(dāng)state不為0的時(shí)候,就可以將調(diào)用countDownLatch的await方法的線程進(jìn)行阻塞涛菠。
tryReleaseShared方法則是設(shè)計(jì)為countDown方法所使用。當(dāng)state調(diào)用countDown之后減1為0 則返回true撇吞。這樣才會(huì)執(zhí)行doReleaseShared方法俗冻,將前面阻塞的Node的線程都喚醒。
3.構(gòu)造方法
此方法用于初始化CountDownLatch牍颈。
public CountDownLatch(int count) {
//校驗(yàn)count不能為負(fù)數(shù)迄薄。
if (count < 0) throw new IllegalArgumentException("count < 0");
//之后new Sync
this.sync = new Sync(count);
}
count的值就是AQS中state的值。
4.其他方法
4.1 await
此處調(diào)用acquireSharedInterruptibly方法煮岁。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法除了一開始判斷中斷狀態(tài)之外讥蔽,實(shí)際上調(diào)用的是前面重寫的tryAcquireShared方法。
4.2 await(long timeout, TimeUnit unit)
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
此方法與前面的同理画机,只是加入了超時(shí)時(shí)間冶伞。
4.3 countDown
public void countDown() {
sync.releaseShared(1);
}
此方法調(diào)用前面重寫的releaseShared,當(dāng)減1之后值為0的時(shí)候步氏,就調(diào)用doReleaseShard响禽,將wait的線程喚醒。
4.4 getCount
public long getCount() {
return sync.getCount();
}
這個(gè)getCount主要是返回state的值荚醒。此方法也重寫在Sync中了芋类。
int getCount() {
return getState();
}
5. 總結(jié)
CountDownLatch是在AQS基礎(chǔ)之上實(shí)現(xiàn)的一個(gè)倒計(jì)時(shí)器,這個(gè)類先初始化count界阁,之后在state不為0的時(shí)候?qū)⒄{(diào)用await的線程阻塞侯繁,之后當(dāng)其他線程調(diào)用countDown的時(shí)候,回逐漸將state減少泡躯,直到state為0的時(shí)候贮竟,將之前被阻塞的線程喚醒。
這是AQS基于共享模式的一種實(shí)現(xiàn)精续,所謂共享模式就是對(duì)于AQS的操作坝锰,不關(guān)心state為非0的時(shí)候,獲得資源的線程究竟是誰(shuí)重付,只用關(guān)心state的狀態(tài)顷级。如果是獨(dú)占模式,則除了關(guān)心state的狀態(tài)之外确垫,還需要關(guān)心獲得阻塞的線程是誰(shuí)弓颈。
也就是說帽芽,ReentrantLock,如果一旦被線程持有,那么其他任何線程在想要獲得鎖的時(shí)候翔冀,就會(huì)返回失敗导街,只有這個(gè)線程自己可以再次重入繼續(xù)獲得鎖,從而將鏈表增加纤子。而共享模式則只用關(guān)心state的狀態(tài)搬瑰。