????????CountDownLatch中count down是倒數(shù)的意思,latch則是門閂的含義。整體含義可以理解為倒數(shù)的門栓晌端,似乎有一點“三二一,芝麻開門”的感覺恬砂。CountDownLatch的作用也是如此,在構造CountDownLatch的時候需要傳入一個整數(shù)n,在這個整數(shù)“倒數(shù)”到0之前诈乒,主線程需要等待在門口,而這個“倒數(shù)”過程則是由各個執(zhí)行線程驅(qū)動的梧奢,每個線程執(zhí)行完一個任務“倒數(shù)”一次⊙菅鳎總結來說亲轨,CountDownLatch的作用就是等待其他的線程都執(zhí)行完任務,必要時可以對各個任務的執(zhí)行結果進行匯總鸟顺,然后主線程才繼續(xù)往下執(zhí)行惦蚊。
????????CountDownLatch主要有兩個方法:countDown()和await()。countDown()方法用于使計數(shù)器減一讯嫂,其一般是執(zhí)行任務的線程調(diào)用蹦锋,await()方法則使調(diào)用該方法的線程處于等待狀態(tài),其一般是主線程調(diào)用欧芽。這里需要注意的是莉掂,countDown()方法并沒有規(guī)定一個線程只能調(diào)用一次,當同一個線程調(diào)用多次countDown()方法時千扔,每次都會使計數(shù)器減一巫湘;另外,await()方法也并沒有規(guī)定只能有一個線程執(zhí)行該方法昏鹃,如果多個線程同時執(zhí)行await()方法尚氛,那么這幾個線程都將處于等待狀態(tài),并且以共享模式享有同一個鎖洞渤。如下是其使用示例:
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
Service service = new Service(latch);
Runnable task = () -> service.exec();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(task);
thread.start();
}
System.out.println("main thread await. ");
latch.await();
System.out.println("main thread finishes await. ");
}
}
public class Service {
private CountDownLatch latch;
public Service(CountDownLatch latch) {
this.latch = latch;
}
public void exec() {
try {
System.out.println(Thread.currentThread().getName() + " execute task. ");
sleep(2);
System.out.println(Thread.currentThread().getName() + " finished task. ");
} finally {
latch.countDown();
}
}
private void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
????????在上面的例子中阅嘶,首先聲明了一個CountDownLatch對象,并且由主線程創(chuàng)建了5個線程载迄,分別執(zhí)行任務讯柔,在每個任務中,當前線程會休眠2秒护昧。在啟動線程之后魂迄,主線程調(diào)用了CountDownLatch.await()方法,此時惋耙,主線程將在此處等待創(chuàng)建的5個線程執(zhí)行完任務之后才繼續(xù)往下執(zhí)行捣炬。如下是執(zhí)行結果:
Thread-0 execute task.
Thread-1 execute task.
Thread-2 execute task.
Thread-3 execute task.
Thread-4 execute task.
main thread await.
Thread-0 finished task.
Thread-4 finished task.
Thread-3 finished task.
Thread-1 finished task.
Thread-2 finished task.
main thread finishes await.
????????從輸出結果可以看出,主線程先啟動了五個線程绽榛,然后主線程進入等待狀態(tài)湿酸,當這五個線程都執(zhí)行完任務之后主線程才結束了等待。上述代碼中需要注意的是灭美,在執(zhí)行任務的線程中推溃,使用了try...finally結構,該結構可以保證創(chuàng)建的線程發(fā)生異常時CountDownLatch.countDown()方法也會執(zhí)行届腐,也就保證了主線程不會一直處于等待狀態(tài)铁坎。
????????CountDownLatch非常適合于對任務進行拆分蜂奸,使其并行執(zhí)行,比如某個任務執(zhí)行2s硬萍,其對數(shù)據(jù)的請求可以分為五個部分扩所,那么就可以將這個任務拆分為5個子任務,分別交由五個線程執(zhí)行襟铭,執(zhí)行完成之后再由主線程進行匯總碌奉,此時,總的執(zhí)行時間將決定于執(zhí)行最慢的任務寒砖,平均來看赐劣,還是大大減少了總的執(zhí)行時間。
????????另外一種比較合適使用CountDownLatch的地方是使用某些外部鏈接請求數(shù)據(jù)的時候哩都,比如圖片魁兼。在本人所從事的項目中就有類似的情況,因為我們使用的圖片服務只提供了獲取單個圖片的功能漠嵌,而每次獲取圖片的時間不等咐汞,一般都需要1.5s~2s。當我們需要批量獲取圖片的時候儒鹿,比如列表頁需要展示一系列的圖片化撕,如果使用單個線程順序獲取,那么等待時間將會極長约炎,此時我們就可以使用CountDownLatch對獲取圖片的操作進行拆分植阴,并行的獲取圖片,這樣也就縮短了總的獲取時間圾浅。
????????CountDownLatch是基于AbstractQueuedSynchronizer實現(xiàn)的掠手,在AbstractQueuedSynchronizer中維護了一個volatile類型的整數(shù)state,volatile可以保證多線程環(huán)境下該變量的修改對每個線程都可見狸捕,并且由于該屬性為整型喷鸽,因而對該變量的修改也是原子的。創(chuàng)建一個CountDownLatch對象時灸拍,所傳入的整數(shù)n就會賦值給state屬性做祝,當countDown()方法調(diào)用時,該線程就會嘗試對state減一株搔,而調(diào)用await()方法時剖淀,當前線程就會判斷state屬性是否為0,如果為0纤房,則繼續(xù)往下執(zhí)行,如果不為0翻诉,則使當前線程進入等待狀態(tài)炮姨,直到某個線程將state屬性置為0捌刮,其就會喚醒在await()方法中等待的線程。如下是countDown()方法的源代碼:
public void countDown() {
sync.releaseShared(1);
}
????????這里sync也即一個繼承了AbstractQueuedSynchronizer的類實例舒岸,該類是CountDownLatch的一個內(nèi)部類绅作,其聲明如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState(); // 獲取當前state屬性的值
if (c == 0) // 如果state為0,則說明當前計數(shù)器已經(jīng)計數(shù)完成蛾派,直接返回
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 使用CAS算法對state進行設置
return nextc == 0; // 設置成功后返回當前是否為最后一個設置state的線程
}
}
}
????????這里tryReleaseShared(int)方法即對state屬性進行減一操作的代碼俄认。可以看到洪乍,CAS也即compare and set的縮寫眯杏,jvm會保證該方法的原子性,其會比較state是否為c壳澳,如果是則將其設置為nextc(自減1)岂贩,如果state不為c,則說明有另外的線程在getState()方法和compareAndSetState()方法調(diào)用之間對state進行了設置巷波,當前線程也就沒有成功設置state屬性的值萎津,其會進入下一次循環(huán)中,如此往復抹镊,直至其成功設置state屬性的值锉屈,即countDown()方法調(diào)用成功。
????????在countDown()方法中調(diào)用的sync.releaseShared(1)調(diào)用時實際還是調(diào)用的tryReleaseShared(int)方法垮耳,如下是releaseShared(int)方法的實現(xiàn):
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
????????可以看到颈渊,在執(zhí)行sync.releaseShared(1)方法時,其在調(diào)用tryReleaseShared(int)方法時會在無限for循環(huán)中設置state屬性的值氨菇,設置成功之后其會根據(jù)設置的返回值(此時state已經(jīng)自減了一)儡炼,即當前線程是否為將state屬性設置為0的線程,來判斷是否執(zhí)行if塊中的代碼查蓉。doReleaseShared()方法主要作用是喚醒調(diào)用了await()方法的線程乌询。需要注意的是,如果有多個線程調(diào)用了await()方法豌研,這些線程都是以共享的方式等待在await()方法處的妹田,試想,如果以獨占的方式等待鹃共,那么當計數(shù)器減少至零時鬼佣,就只有一個線程會被喚醒執(zhí)行await()之后的代碼,這顯然不符合邏輯霜浴。如下是doReleaseShared()方法的實現(xiàn)代碼:
private void doReleaseShared() {
for (;;) {
Node h = head; // 記錄等待隊列中的頭結點的線程
if (h != null && h != tail) { // 頭結點不為空晶衷,且頭結點不等于尾節(jié)點
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // SIGNAL狀態(tài)表示當前節(jié)點正在等待被喚醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 清除當前節(jié)點的等待狀態(tài)
continue;
unparkSuccessor(h); // 喚醒當前節(jié)點的下一個節(jié)點
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果h還是指向頭結點,說明前面這段代碼執(zhí)行過程中沒有其他線程對頭結點進行過處理
break;
}
}
????????在doReleaseShared()方法中(始終注意當前方法是最后一個執(zhí)行countDown()方法的線程執(zhí)行的),首先判斷頭結點不為空晌纫,且不為尾節(jié)點税迷,說明等待隊列中有等待喚醒的線程,這里需要說明的是锹漱,在等待隊列中箭养,頭節(jié)點中并沒有保存正在等待的線程,其只是一個空的Node對象哥牍,真正等待的線程是從頭節(jié)點的下一個節(jié)點開始存放的毕泌,因而會有對頭結點是否等于尾節(jié)點的判斷。在判斷等待隊列中有正在等待的線程之后嗅辣,其會清除頭結點的狀態(tài)信息撼泛,并且調(diào)用unparkSuccessor(Node)方法喚醒頭結點的下一個節(jié)點,使其繼續(xù)往下執(zhí)行辩诞。如下是unparkSuccessor(Node)方法的具體實現(xiàn):
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 清除當前節(jié)點的等待狀態(tài)
Node s = node.next;
if (s == null || s.waitStatus > 0) { // s的等待狀態(tài)大于0說明該節(jié)點中的線程已經(jīng)被外部取消等待了
s = null;
// 從隊列尾部往前遍歷坎弯,找到最后一個處于等待狀態(tài)的節(jié)點,用s記錄下來
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒離傳入節(jié)點最近的處于等待狀態(tài)的節(jié)點線程
}
????????可以看到译暂,unparkSuccessor(Node)方法的作用是喚醒離傳入節(jié)點最近的一個處于等待狀態(tài)的線程抠忘,使其繼續(xù)往下執(zhí)行。前面我們講到過外永,等待隊列中的線程可能有多個崎脉,而調(diào)用countDown()方法的線程只喚醒了一個處于等待狀態(tài)的線程,這里剩下的等待線程是如何被喚醒的呢伯顶?其實這些線程是被當前喚醒的線程喚醒的囚灼。具體的我們可以看看await()方法的具體執(zhí)行過程。如下是await()方法的代碼:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
????????await()方法實際還是調(diào)用了Sync對象的方法acquireSharedInterruptibly(int)方法祭衩,如下是該方法的具體實現(xiàn):
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
????????可以看到acquireSharedInterruptibly(int)方法判斷當前線程是否需要以共享狀態(tài)獲取執(zhí)行權限灶体,這里tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一個模板方法,其具體實現(xiàn)在前面的Sync類中掐暮,可以看到蝎抽,其主要是判斷state是否為零,如果為零則返回1路克,表示當前線程不需要進行權限獲取樟结,可直接執(zhí)行后續(xù)代碼,返回-1則表示當前線程需要進行共享權限精算。具體的獲取執(zhí)行權限的代碼在doAcquireSharedInterruptibly(int)方法中瓢宦,如下是該方法的具體實現(xiàn):
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 使用當前線程創(chuàng)建一個共享模式的節(jié)點
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // 獲取當前節(jié)點的前一個節(jié)點
if (p == head) { // 判斷前一個節(jié)點是否為頭結點
int r = tryAcquireShared(arg); // 查看當前線程是否獲取到了執(zhí)行權限
if (r >= 0) { // 大于0表示獲取了執(zhí)行權限
setHeadAndPropagate(node, r); // 將當前節(jié)點設置為頭結點,并且喚醒后面處于等待狀態(tài)的節(jié)點
p.next = null; // help GC
failed = false;
return;
}
}
// 走到這一步說明沒有獲取到執(zhí)行權限灰羽,就使當前線程進入“擱置”狀態(tài)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
????????在doAcquireSharedInterruptibly(int)方法中驮履,首先使用當前線程創(chuàng)建一個共享模式的節(jié)點鱼辙。然后在一個for循環(huán)中判斷當前線程是否獲取到執(zhí)行權限,如果有(r >= 0判斷)則將當前節(jié)點設置為頭節(jié)點疲吸,并且喚醒后續(xù)處于共享模式的節(jié)點座每;如果沒有前鹅,則對調(diào)用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法使當前線程處于“擱置”狀態(tài)摘悴,該“擱置”狀態(tài)是由操作系統(tǒng)進行的,這樣可以避免該線程無限循環(huán)而獲取不到執(zhí)行權限舰绘,造成資源浪費蹂喻,這里也就是線程處于等待狀態(tài)的位置,也就是說當線程被阻塞的時候就是阻塞在這個位置捂寿。當有多個線程調(diào)用await()方法而進入等待狀態(tài)時口四,這幾個線程都將等待在此處。這里回過頭來看前面將的countDown()方法秦陋,其會喚醒處于等待隊列中離頭節(jié)點最近的一個處于等待狀態(tài)的線程蔓彩,也就是說該線程被喚醒之后會繼續(xù)從這個位置開始往下執(zhí)行,此時執(zhí)行到tryAcquireShared(int)方法時驳概,發(fā)現(xiàn)r大于0(因為state已經(jīng)被置為0了)赤嚼,該線程就會調(diào)用setHeadAndPropagate(Node, int)方法,并且退出當前循環(huán)顺又,也就開始執(zhí)行awat()方法之后的代碼更卒。這里我們看看setHeadAndPropagate(Node, int)方法的具體實現(xiàn):
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 將當前節(jié)點設置為頭節(jié)點
// 檢查喚醒過程是否需要往下傳遞,并且檢查頭結點的等待狀態(tài)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 如果下一個節(jié)點是嘗試以共享狀態(tài)獲取獲取執(zhí)行權限的節(jié)點稚照,則將其喚醒
doReleaseShared();
}
}
????????setHeadAndPropagate(Node, int)方法主要作用是設置當前節(jié)點為頭結點蹂空,并且將喚醒工作往下傳遞,在傳遞的過程中果录,其會判斷被傳遞的節(jié)點是否是以共享模式嘗試獲取執(zhí)行權限的上枕,如果不是,則傳遞到該節(jié)點處為止(一般情況下弱恒,等待隊列中都只會都是處于共享模式或者處于獨占模式的節(jié)點)辨萍。也就是說,頭結點會依次喚醒后續(xù)處于共享狀態(tài)的節(jié)點斤彼,這也就是共享鎖與獨占鎖的實現(xiàn)方式分瘦。這里doReleaseShared()方法也就是我們前面講到的會將離頭結點最近的一個處于等待狀態(tài)的節(jié)點喚醒的方法。