1. 簡介
在上篇中我們介紹了SyclicBarrier類的使用,通過SyclicBarrier我們可以完成一些分批執(zhí)行匯總的任務昨忆,而此次介紹的CountDownLatch則是實現(xiàn)類似“倒計時”的功能室叉。
2. Api分析
CountDownLatch源碼很簡潔,它提供兩個典型的方法來實現(xiàn)“倒計時功能”。CountDownLatch在構(gòu)造方法中指定門閂鎖latch的個數(shù)术裸,當latch的個數(shù)為0的時候則喚醒所有等待狀態(tài)下的線程旭蠕。
- countDown():通過調(diào)用countDown方法實現(xiàn)門閂鎖-1的效果停团。
- await():讓當前線程進入等待。
- getCount():獲取門閂鎖的個數(shù)掏熬。
下面我們通過一個示例代碼來看一下:
public class MyTest {
static CountDownLatch countDownLatch;
public static void main(String[]args){
countDownLatch = new CountDownLatch(3);
for(int i=0;i<3;i++){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "--到達車門");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "--已登車");
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "--登車完畢");
}
}
在示例代碼中佑稠,通過構(gòu)造函數(shù)創(chuàng)建一個指定latch個數(shù)為3的CountDownLatch的方法,同時創(chuàng)建了三個線程旗芬,在每個線程的內(nèi)部分別調(diào)用一次countDown()方法舌胶,最后在主線程中調(diào)用await方法進行等待阻塞,最后完成輸出疮丛。
執(zhí)行結(jié)果:
Thread-2--到達車門
Thread-2--已登車
Thread-0--到達車門
Thread-0--已登車
Thread-1--到達車門
main--登車完畢
Thread-1--已登車
從上面可以得知CountDownLatch以下特點:
- 線程中執(zhí)行countDown方法后能繼續(xù)執(zhí)行后續(xù)代碼幔嫂,線程不會阻塞等待;
- latch個數(shù)為0的時候會立即喚醒await等待的線程这刷;
3. CountDownLatch源碼解析
結(jié)合上面對CountDownLatch功能的描述婉烟,假如讓我們實現(xiàn)一個這樣的功能,我們首先想到的思路應該是在工具類中維持一個Count計數(shù)變量暇屋,然后維持對該變量的判斷似袁。那么我們來看看CountDownLatch是怎么實現(xiàn)的。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 讓當前線程進行等待咐刨,直到鎖個數(shù)為0或者當前線程interrupt
* 如果當前鎖latch個數(shù)為0昙衅,則立即返回
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 減少latch鎖,當latch鎖個數(shù)為0的時候釋放所有等待線程定鸟。
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
首先從構(gòu)造方法來看而涉,CountDownLatch提供了有參構(gòu)造函數(shù):
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
這里的Sync類是CountDownLatch內(nèi)部集成AQS實現(xiàn)的一個內(nèi)部類。
Sync(int count) {
setState(count);
}
在Sync的構(gòu)造函數(shù)中联予,調(diào)用的是AQS的setState方法修改state值啼县,將state值修改為count值。
通過上面的介紹沸久,對于CountDownLatch類來說季眷,使用最多的就是countDown和await方法。
countDown()方法
public void countDown() {
sync.releaseShared(1);
}
/**AQS類中定義的該方法*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在countDown方法內(nèi)部是調(diào)用Sync的releaseShared方法卷胯,嘗試釋放公共鎖狀態(tài)子刮。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
采用自旋的方式,獲取當前state值窑睁,如果state值為0就返回false挺峡,即釋放失敗葵孤。反之則state-1,通過CAS操作成功橱赠,返回nextc == 0尤仍;當最后通過咨詢的方式返回true的時候(state ==0),則執(zhí)行releaseShared方法中的doReleaseShared()方法狭姨,在doReleaseShared()方法內(nèi)部通過unparkSuccessor()方法喚醒阻塞的線程開始執(zhí)行吓著。
await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/***AQS中的方法*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
這里調(diào)用acquireSharedInterruptibly()方法申請鎖,如果鎖被占有state送挑!=0,則通過doAcquireSharedInterruptibly()進行鎖申請暖眼。
4. CountDownLatch的使用場景
- 確保某個計算在其需要的所有資源都被初始化之后才繼續(xù)執(zhí)行惕耕。
- 確保某個服務在其依賴的所有其他服務都已啟動后才啟動。
- 等待知道某個操作的所有者都就緒在繼續(xù)執(zhí)行诫肠。