【Java并發(fā)】 - CountDownLatch使用以及原理
概述
CountDownLatch是一個(gè)用來(lái)控制并發(fā)的很常見(jiàn)的工具岁钓,它允許一個(gè)或者多個(gè)線程等待其他的線程執(zhí)行到某一操作,比如說(shuō)需要去解析一個(gè)excel的數(shù)據(jù)血淌,為了更快的解析則每個(gè)sheet都使用一個(gè)線程去進(jìn)行解析顽馋,但是最后的匯總數(shù)據(jù)的工作則需要等待每個(gè)sheet的解析工作完成之后才能進(jìn)行谓厘,這就可以使用CountDownLatch。
使用
例子:
這里有三個(gè)線程(main寸谜,thread1竟稳,thread2),其中main線程將調(diào)用countDownLatch的await方法去等待另外兩個(gè)線程的某個(gè)操作的結(jié)束(調(diào)用countDownLatch的countDown方法)熊痴。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException{
CountDownLatch countDownLatch = new CountDownLatch(2){
@Override
public void await() throws InterruptedException {
super.await();
System.out.println(Thread.currentThread().getName() + " count down is ok");
}
};
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
//do something
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is done");
countDownLatch.countDown();
}
}, "thread1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
//do something
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is done");
countDownLatch.countDown();
}
}, "thread2");
thread1.start();
thread2.start();
countDownLatch.await();
}
}
輸出:
test thread1 is done
test thread2 is done
test thread3 is done
test thread3 count down is ok
main count down is ok
實(shí)現(xiàn)原理
CountDownLatch類實(shí)際上是使用計(jì)數(shù)器的方式去控制的他爸,不難想象當(dāng)我們初始化CountDownLatch的時(shí)候傳入了一個(gè)int變量這個(gè)時(shí)候在類的內(nèi)部初始化一個(gè)int的變量,每當(dāng)我們調(diào)用countDownt()方法的時(shí)候就使得這個(gè)變量的值減1果善,而對(duì)于await()方法則去判斷這個(gè)int的變量的值是否為0诊笤,是則表示所有的操作都已經(jīng)完成,否則繼續(xù)等待岭埠。
實(shí)際上如果了解AQS的話應(yīng)該很容易想到可以使用AQS的共享式獲取同步狀態(tài)的方式來(lái)完成這個(gè)功能盏混。而CountDownLatch實(shí)際上也就是這么做的。
從結(jié)構(gòu)上來(lái)看CountDownLatch的實(shí)現(xiàn)還是很簡(jiǎn)單的惜论,通過(guò)很常見(jiàn)的繼承AQS的方式來(lái)完成自己的同步器许赃。
CountDownLatch的同步器實(shí)現(xiàn):
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//初始化state
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//嘗試獲取同步狀態(tài)
//只有當(dāng)同步狀態(tài)為0的時(shí)候返回大于0的數(shù)1
//同步狀態(tài)不為0則返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//自旋+CAS的方式釋放同步狀態(tài)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
比較關(guān)鍵的地方是tryAquireShared()方法的實(shí)現(xiàn),因?yàn)樵诟割惖腁QS中aquireShared()方法在調(diào)用tryAquireShared()方法的時(shí)候的判斷依據(jù)是返回值是否大于零馆类。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//失敗則進(jìn)入等待隊(duì)列
doAcquireShared(arg);
}
同步器的實(shí)現(xiàn)相對(duì)都比較簡(jiǎn)單混聊,主要思路和上面基本一致。
CountDownLatch的主要方法(本身代碼量就很少就直接貼了)
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
//初始化一個(gè)同步器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//調(diào)用同步器的acquireSharedInterruptibly方法
//并且是響應(yīng)中斷的
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//調(diào)用同步器的releaseShared方法去讓state減1
public void countDown() {
sync.releaseShared(1);
}
//獲取剩余的count
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
最后:由于CountDownLatch需要開(kāi)發(fā)人員很明確需要等待的條件乾巧,否則很容易造成await()方法一直阻塞的情況句喜。