CountDownLatch試用場景
來自于Javadoc的解釋
A synchronization aid that allows one or more threads to wait until
a set of operations being performed in other threads completes.
表示一個或者多個線程等待一系列的操作完成。
CountDownLatch是一個同步工具類舷手,用來協(xié)調(diào)多個線程之間的同步着倾,或者說起到線程之間的通信作用拾酝,簡單的說CountDownLatch就是一個計(jì)數(shù)器燕少,能夠使一個或者多個線程等待另外一些線程操作完成之后卡者,再繼續(xù)執(zhí)行,計(jì)數(shù)器的數(shù)量就是線程的數(shù)量客们,當(dāng)每個線程完成自己的任務(wù)之后崇决,計(jì)數(shù)器減一
當(dāng)計(jì)數(shù)器的數(shù)值變?yōu)?的時候,表示所有線程都完成了自己的任務(wù)底挫,等待在CountDownLatch上的線程可以繼續(xù)執(zhí)行自己的任務(wù)恒傻。
例如當(dāng)需要對兩個表進(jìn)行查詢,然后將查詢結(jié)果合并進(jìn)行下一步操作建邓,由于對兩個表的查詢時IO密集型操作盈厘,我們可以考慮使用多線程來提高性能,但是我們需要在兩個查詢操作完成之后通知主線程進(jìn)行合并操作等官边,我們可以用CountDownLatch來完成沸手,代碼如下,通過Thread.sleep(1000)來模擬一個IO耗時的操作
public class CountDownLatchDemo {
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, 8, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args){
CountDownLatch countDownLatch = new CountDownLatch(2);
poolExecutor.execute(() -> {
System.out.println("query from table A");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
poolExecutor.execute(() -> {
System.out.println("query from table B");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("merge result");
}
}
執(zhí)行結(jié)果為
query from table A
query from table B
merge result
CountDownLatch源碼解析
CountDownLatch通過內(nèi)部類Sync來完成線程間的同步和通信注簿,
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
Sync實(shí)現(xiàn)了AQS的兩個模板方法tryAcquireShared和tryReleaseShared契吉,分別表示獲取同步鎖和釋放同步鎖的操作。CountDownLatch中最重要的兩個方法分別為countDown()和await()
countDown方法如下所示
public void countDown() {
sync.releaseShared(1);
}
直接調(diào)用AQS的releaseShared方法來修改同步狀態(tài)诡渴,將state的值減一捐晶。
await方法如下所示
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
直接調(diào)用AQS的acquireSharedInterruptibly方法,該方法如下
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
首先調(diào)用tryAcquireShared方法妄辩,該方法若state為0則返回1惑灵,反之返回-1,即若state為0時眼耀,該方法直接返回泣棋,若state不為0,該方法會執(zhí)行doAcquireSharedInterruptibly方法畔塔,
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly方法會自旋的調(diào)用Sync實(shí)現(xiàn)的tryAcquireShared方法潭辈,直至state為0鸯屿,該方法才能返回,以此來達(dá)到阻塞線程的目的把敢。