前言
??JDK中為了處理線程之間的同步問題,除了提供鎖機制之外,還提供了幾個非常有用的并發(fā)工具類:CountDownLatch面殖、CyclicBarrier、Semphore赂毯、Exchanger、Phaser拣宰;
??CountDownLatch党涕、CyclicBarrier、Semphore巡社、Phaser 這四個工具類提供一種并發(fā)流程的控制手段膛堤;而Exchanger工具類則提供了在線程之間交換數(shù)據(jù)的一種手段。
簡介
??CountDownLatch 允許一個或多個線程等待其他線程完成操作晌该。單詞Latch的意思是“門閂”肥荔,所以沒有打開時,N個人是不能進(jìn)入屋內(nèi)的朝群,也就是N個線程是不能往下執(zhí)行的燕耿,從而控制線程執(zhí)行任務(wù)的時機,使線程以“組團(tuán)”的方式一起執(zhí)行任務(wù)潜圃。
??CountDownLatch 類 在創(chuàng)建時缸棵,給定一個計數(shù)count舟茶。線程調(diào)用CountDownLatch 對象的awiat( )方法時谭期,判斷這個計數(shù)count是否為0,如果不為0吧凉,就進(jìn)入等待狀態(tài)隧出。其他線程在完成一定任務(wù)時,調(diào)用CountDownLatch 的countDown()方法阀捅,使計數(shù)count減一胀瞪。直到count的值等于0或者少于0時,便是等待線程的運行時機,將會繼續(xù)往下運行凄诞。
CountDownLatch的API接口
方法名稱 描 述
void await() 使當(dāng)前線程在鎖存器倒計數(shù)至零之前一直等待圆雁,除非線程被中斷。
boolean await(long timeout, TimeUnit unit) 使當(dāng)前線程在鎖存器倒計數(shù)至零之前一直等待帆谍,除非線程被中斷或超出了指定的等待時間伪朽。
void countDown() 遞減鎖存器的計數(shù),如果計數(shù)到達(dá)零汛蝙,則釋放所有等待的線程烈涮。
long getCount() 返回當(dāng)前計數(shù)。
String toString() 返回標(biāo)識此鎖存器及其狀態(tài)的字符串窖剑。
注意:
await()也可以被多個線程同時調(diào)用坚洽,從而實現(xiàn)多個線程 等待其他的多個線程完成某部分操作。
下面是API文檔介紹的兩個經(jīng)典用法:
@ Example1
:
Driver類中創(chuàng)建了一組worker 線程西土,所有的worker線程必須等待Driver類完成初始化動作讶舰,才能往下運行。完成初始化動作后需了,Driver類也必須等待所有worker線程完成才能結(jié)束绘雁。本例子中使用了兩個CountDownLatch類:
startSignal是一個啟動信號,在 driver 為繼續(xù)執(zhí)行 worker 做好準(zhǔn)備之前援所,它會阻止所有的 worker 繼續(xù)執(zhí)行庐舟。
doneSignal是一個完成信號,它允許 driver 在完成所有 worker 之前一直等待住拭。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
@ Example2
:另一種典型用法是挪略,將一個問題分成 N 個部分(N個小任務(wù)),然后將這些任務(wù)Runnable交由線程池來完成,每個子任務(wù)執(zhí)行完成滔岳,就計數(shù)一次杠娱,主線程則等待這些子任務(wù)完成。當(dāng)所有的子部分完成后谱煤,主線程就能夠通過 await摊求。(當(dāng)線程必須用這種方法反復(fù)倒計數(shù)時,可改為使用 CyclicBarrier刘离。)
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
應(yīng)用場景
??假如有這樣一個需求室叉,當(dāng)我們需要解析一個Excel里多個sheet的數(shù)據(jù)時,可以考慮使用多線程硫惕,每個線程解析一個sheet里的數(shù)據(jù)茧痕,等到所有的sheet都解析完之后,程序需要提示解析完成恼除。在這個需求中踪旷,要實現(xiàn)主線程等待所有線程完成sheet的解析操作,最簡單的做法是使用join。代碼如下:
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
}
});
Thread parser2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser2 finish");
}
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束令野。其實現(xiàn)原理是不停檢查join線程是否存活舀患,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)wait,代碼片段如下气破,wait(0)表示永遠(yuǎn)等待下去构舟。
while (isAlive()) {
wait(0);
}
直到j(luò)oin線程中止后,線程的this.notifyAll會被調(diào)用堵幽,調(diào)用notifyAll是在JVM里實現(xiàn)的狗超,所以JDK里看不到,有興趣的同學(xué)可以看看JVM源碼朴下。JDK不推薦在線程實例上使用wait努咐,notify和notifyAll方法。
而在JDK1.5之后的并發(fā)包中提供的
CountDownLatch也可以實現(xiàn)join的這個功能殴胧,并且比join的功能更多渗稍。
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的構(gòu)造函數(shù)接收一個int類型的參數(shù)作為計數(shù)器,如果你想等待N個點完成团滥,這里就傳入N竿屹。
當(dāng)我們調(diào)用一次CountDownLatch的countDown方法時,N就會減1灸姊,CountDownLatch的await會阻塞當(dāng)前線程拱燃,直到N變成零。
由于countDown方法可以用在任何地方力惯,所以這里說的N個點碗誉,可以是N個線程,也可以是1個線程里的N個執(zhí)行步驟
父晶。用在多個線程時哮缺,你只需要把這個CountDownLatch的引用傳遞到線程里。
其他方法:
如果有某個解析sheet的線程處理的比較慢甲喝,我們不可能讓主線程一直等待尝苇,所以我們可以使用另外一個帶指定時間的await方法,await(long time, TimeUnit unit): 這個方法等待特定時間后埠胖,就會不再阻塞當(dāng)前線程糠溜。join也有類似的方法。
注意:
計數(shù)器必須大于等于0押袍,只是等于0時候诵冒,計數(shù)器就是零,調(diào)用await方法時不會阻塞當(dāng)前線程谊惭。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內(nèi)部計數(shù)器的值。
一個線程調(diào)用countDown方法 happen-before 另外一個線程調(diào)用await方法。
CountDownLatch 的源碼分析
最后圈盔,我們簡單看一下 CountDownLatch是怎么實現(xiàn)的:
public class CountDownLatch {
private final Sync sync;
public CountDownLatch(int count) {//構(gòu)造器
//count少于0將拋出異常
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
//........
}
在創(chuàng)建countDownLatch豹芯,其構(gòu)造器里面創(chuàng)建了一個sync類,并且await()驱敲、countDown方法都是都是通過此類來實現(xiàn)的铁蹈。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
//設(shè)置state的值為countDownLatch的計數(shù)的數(shù)目
setState(count);
}
int getCount() {
return getState();
}
//如果state值為0.也就是計數(shù)完成了,就不可以再獲取共享鎖众眨,這也是為什么CountLatch只能用一次
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//是否可以釋放共享鎖
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1; //狀態(tài)state減一
if (compareAndSetState(c, nextc))
return nextc == 0;//計數(shù)到0了握牧,表示釋放鎖成功。
}
}
}
與大部分的并發(fā)工具類一樣娩梨,都是繼承使用了JDK提供的強大的AQS框架類AbstractQueuedSynchronizer沿腰,而且使用的還是共享鎖,共享鎖能允許線程進(jìn)入的線程數(shù)目狈定,就是CountDownLatch傳入的參數(shù)颂龙。