countdownlatch是JDK提供的一個線程控制的工具類,雖然代碼短少琼掠,實現(xiàn)簡單腋腮,但是它的作用卻十分的大勒虾。
1.從一個例子開始####
1.現(xiàn)有一文件,文件的大小超過100G瘸彤,現(xiàn)在的需求是修然,計算文件中每一行數(shù)據(jù)的MD5值。
2.現(xiàn)在要實現(xiàn)一個RPC請求模型质况,要求實現(xiàn)愕宋,RPC過程中的請求超時的判斷和處理流程。
先說第一個場景结榄,第一個場景需要計算所有文件的MD5中贝,但是100G文件處理相對較大,那么我們勢必要利用多線程去并行處理大文件臼朗,并將最后的結(jié)果輸出邻寿。但是如何控制主線程在所有線程結(jié)束之后結(jié)束蝎土,是一個需要思考的過程。
第二個場景绣否,在RPC請求發(fā)出后誊涯,我們需要在一定時間內(nèi)等待請求的響應(yīng),在超時之后沒有響應(yīng)的蒜撮,我們需要拋出異常暴构。
上面兩種場景,其實用wait notify都可以解決段磨,但是實現(xiàn)起來是比較麻煩的取逾,但是用countdownlatch解決起來十分簡單。
拿第一個例子來說苹支,我們簡單的實現(xiàn)一下:
package countdownlatch;
import com.google.common.base.Charsets;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 多線程處理一個文件
*/
public class MultiThread {
private static ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
private static CountDownLatch latch;
private static final int ThreadNum = 10;
static {
for (int i = 0; i < 10; i++) {
blockingQueue.add("test" + i);
}
latch = new CountDownLatch(10);
}
/**
* 用blockQueue中的元素模擬文件分片
* @return
*/
public static String getFileSplit() {
return blockingQueue.poll();
}
static class myThread implements Runnable {
public void run() {
System.out.println(Thread.currentThread().getName() + "begin running...");
String m = getFileSplit();
HashFunction hf = Hashing.md5();
HashCode hc = hf.newHasher()
.putString(m, Charsets.UTF_8)
.hash();
System.out.println(hc.toString());
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
System.out.println(Thread.currentThread().getName() + "ended");
}
}
public static void main(String args[]){
System.out.println("主線程開始運(yùn)行");
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i=0;i<ThreadNum;i++){
service.execute(new Thread(new myThread()));
}
service.shutdown();
System.out.println("線程已經(jīng)全部運(yùn)行");
System.out.println("等待所有線程運(yùn)行結(jié)束");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主線程退出");
}
}
輸出是這樣的:
主線程開始運(yùn)行
線程已經(jīng)全部運(yùn)行
等待所有線程運(yùn)行結(jié)束
pool-1-thread-2begin running...
pool-1-thread-6begin running...
pool-1-thread-1begin running...
pool-1-thread-3begin running...
pool-1-thread-5begin running...
pool-1-thread-9begin running...
pool-1-thread-8begin running...
pool-1-thread-10begin running...
pool-1-thread-7begin running...
pool-1-thread-4begin running...
b04083e53e242626595e2b8ea327e525
5e40d09fa0529781afd1254a42913847
8ad8757baa8564dc136c1e07507f4a98
86985e105f79b95d6bc918fb45ec7727
739969b53246b2c727850dbb3490ede6
5a105e8b9d40e1329780d62ea2265d8a
4cfad7076129962ee70c36839a1e3e15
ad0234829205b9033196ba818f7a872b
f6f4061a1bddc1c04d8109b39f581270
e3d704f3542b44a621ebed70dc0efe13
pool-1-thread-3ended
pool-1-thread-2ended
pool-1-thread-10ended
pool-1-thread-4ended
pool-1-thread-7ended
pool-1-thread-5ended
pool-1-thread-6ended
pool-1-thread-8ended
pool-1-thread-1ended
pool-1-thread-9ended
主線程退出
從輸出我們可以看出砾隅,主線程確實是等所有線程結(jié)束后才退出的,這也正是我們預(yù)期的結(jié)果沐序,有的童鞋就說了琉用,我可以利用join實現(xiàn)和你一樣的效果,但是Join是基于wait實現(xiàn)的策幼,每一個線程join另一個線程就會有一個線程進(jìn)入wait狀態(tài)邑时,也就是說同一時刻只有一個線程在運(yùn)行,多余的CPU都浪費掉了特姐,這顯然不是很合理晶丘。
2.說說countdownlatch的API####
countdownlatch的API真的很少,下圖是這個工具類的完整結(jié)構(gòu)唐含。
我們可以看到主要方法有三個:await()浅浮,await(long, TimeUnit),countDown()
await():阻塞當(dāng)前線程捷枯,直到latch的值為0滚秩,或當(dāng)前線程被中斷
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
await(long, TimeUnit):阻塞當(dāng)前線程,知道latch為0淮捆,線程被中斷郁油,或者超時。
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted},
* or the specified waiting time elapses.
countDown():使latch的值減小1
Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
3.說說countdownlatch的實現(xiàn)
countdownlatch其實是基于同步器AbstractQueuedSynchronizer實現(xiàn)的攀痊,ReentrantLock其實也是基于AbstractQueuedSynchronizer實現(xiàn)的桐腌,那么好像預(yù)示了什么。
首先看構(gòu)造函數(shù):
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
構(gòu)造函數(shù)的參數(shù)是一個整數(shù)值苟径,意思是說需要多少個latch案站。
實體化Sync,sync是countdownlatch的內(nèi)部類棘街,它繼承了AbstractQueuedSynchronizer蟆盐。
Sync(int count) {
setState(count);
}
主要是將latch的值賦予AbstractQueuedSynchronizer的State
再看await()方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await()內(nèi)調(diào)用了 sync.acquireSharedInterruptibly(1) 承边;
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
這里先檢測了線程中斷狀態(tài),中斷了則拋出異常舱禽,接下來調(diào)用tryAcquireShared炒刁,tryAcquireShared是Syn的實現(xiàn)的:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
其實就是簡單的獲取了同步器的state,判斷是否為0誊稚,之前博客里面有寫ReentrantLock翔始,兩者的機(jī)制是一樣的。因為countDownLacth實例化之后的State一般不是0里伯,那么此方法返回-1.進(jìn)入doAcquireSharedInterruptibly:
/**
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這段代碼是比較熟悉的在ReentrantLock中分析過城瞎,這里關(guān)鍵的點是parkAndCheckInterrupt()
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
執(zhí)行到此處時,線程會阻塞疾瓮,知道有其他線程喚醒此線程脖镀,執(zhí)行await之后,上文中的主線程阻塞在這狼电。
接下來分析下countDown():
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
調(diào)用了Sync的releaseShared:
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
接下來是tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
此方法是用CAS減小State的值蜒灰。如果State=0那么嘗試喚醒等待線程,執(zhí)行doReleaseShared:
/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
這里需要關(guān)注一點就是unparkSuccessor肩碟,這個方法是喚醒上文中的主線程强窖。至此countdownlatch的主流程就走通了。
不得不說countdownlatch是一個很高的線程控制工具削祈,極大的方便了我們開發(fā)翅溺。由于知識能力有限,上面是自己的一點見識髓抑,有什么錯誤還望提出咙崎,便于我及時改進(jìn)。