countdownlatch源碼分析

countdownlatch是JDK提供的一個線程控制的工具類,雖然代碼短少琼掠,實現(xiàn)簡單腋腮,但是它的作用卻十分的大勒虾。

1.從一個例子開始####

1.現(xiàn)有一文件,文件的大小超過100G瘸彤,現(xiàn)在的需求是修然,計算文件中每一行數(shù)據(jù)的MD5值。
2.現(xiàn)在要實現(xiàn)一個RPC請求模型质况,要求實現(xiàn)愕宋,RPC過程中的請求超時的判斷和處理流程。

先說第一個場景结榄,第一個場景需要計算所有文件的MD5中贝,但是100G文件處理相對較大,那么我們勢必要利用多線程去并行處理大文件臼朗,并將最后的結(jié)果輸出邻寿。但是如何控制主線程在所有線程結(jié)束之后結(jié)束蝎土,是一個需要思考的過程。

第二個場景绣否,在RPC請求發(fā)出后誊涯,我們需要在一定時間內(nèi)等待請求的響應(yīng),在超時之后沒有響應(yīng)的蒜撮,我們需要拋出異常暴构。

上面兩種場景,其實用wait notify都可以解決段磨,但是實現(xiàn)起來是比較麻煩的取逾,但是用countdownlatch解決起來十分簡單。

拿第一個例子來說苹支,我們簡單的實現(xiàn)一下:

package countdownlatch;

import com.google.common.base.Charsets;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 多線程處理一個文件
 */
public class MultiThread {
    private static ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
    private static CountDownLatch latch;
    private static final int ThreadNum = 10;

    static {
        for (int i = 0; i < 10; i++) {
            blockingQueue.add("test" + i);
        }
        latch = new CountDownLatch(10);
    }

    /**
     * 用blockQueue中的元素模擬文件分片
     * @return
     */
    public static String getFileSplit() {
        return blockingQueue.poll();
    }

    static class myThread implements Runnable {

        public void run() {
            System.out.println(Thread.currentThread().getName() + "begin running...");
            String m = getFileSplit();
            HashFunction hf = Hashing.md5();
            HashCode hc = hf.newHasher()
                    .putString(m, Charsets.UTF_8)
                    .hash();
            System.out.println(hc.toString());
            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + "ended");
        }
    }

    public static void main(String args[]){
        System.out.println("主線程開始運(yùn)行");
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i=0;i<ThreadNum;i++){
            service.execute(new Thread(new myThread()));
        }
        service.shutdown();
        System.out.println("線程已經(jīng)全部運(yùn)行");
        System.out.println("等待所有線程運(yùn)行結(jié)束");
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主線程退出");
    }
}

輸出是這樣的:
主線程開始運(yùn)行
線程已經(jīng)全部運(yùn)行
等待所有線程運(yùn)行結(jié)束
pool-1-thread-2begin running...
pool-1-thread-6begin running...
pool-1-thread-1begin running...
pool-1-thread-3begin running...
pool-1-thread-5begin running...
pool-1-thread-9begin running...
pool-1-thread-8begin running...
pool-1-thread-10begin running...
pool-1-thread-7begin running...
pool-1-thread-4begin running...
b04083e53e242626595e2b8ea327e525
5e40d09fa0529781afd1254a42913847
8ad8757baa8564dc136c1e07507f4a98
86985e105f79b95d6bc918fb45ec7727
739969b53246b2c727850dbb3490ede6
5a105e8b9d40e1329780d62ea2265d8a
4cfad7076129962ee70c36839a1e3e15
ad0234829205b9033196ba818f7a872b
f6f4061a1bddc1c04d8109b39f581270
e3d704f3542b44a621ebed70dc0efe13
pool-1-thread-3ended
pool-1-thread-2ended
pool-1-thread-10ended
pool-1-thread-4ended
pool-1-thread-7ended
pool-1-thread-5ended
pool-1-thread-6ended
pool-1-thread-8ended
pool-1-thread-1ended
pool-1-thread-9ended
主線程退出

從輸出我們可以看出砾隅,主線程確實是等所有線程結(jié)束后才退出的,這也正是我們預(yù)期的結(jié)果沐序,有的童鞋就說了琉用,我可以利用join實現(xiàn)和你一樣的效果,但是Join是基于wait實現(xiàn)的策幼,每一個線程join另一個線程就會有一個線程進(jìn)入wait狀態(tài)邑时,也就是說同一時刻只有一個線程在運(yùn)行,多余的CPU都浪費掉了特姐,這顯然不是很合理晶丘。

2.說說countdownlatch的API####

countdownlatch的API真的很少,下圖是這個工具類的完整結(jié)構(gòu)唐含。


Paste_Image.png

我們可以看到主要方法有三個:await()浅浮,await(long, TimeUnit),countDown()

await():阻塞當(dāng)前線程捷枯,直到latch的值為0滚秩,或當(dāng)前線程被中斷

     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.

await(long, TimeUnit):阻塞當(dāng)前線程,知道latch為0淮捆,線程被中斷郁油,或者超時。

     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.

countDown():使latch的值減小1

       Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.

3.說說countdownlatch的實現(xiàn)

countdownlatch其實是基于同步器AbstractQueuedSynchronizer實現(xiàn)的攀痊,ReentrantLock其實也是基于AbstractQueuedSynchronizer實現(xiàn)的桐腌,那么好像預(yù)示了什么。

首先看構(gòu)造函數(shù):

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

構(gòu)造函數(shù)的參數(shù)是一個整數(shù)值苟径,意思是說需要多少個latch案站。
實體化Sync,sync是countdownlatch的內(nèi)部類棘街,它繼承了AbstractQueuedSynchronizer蟆盐。

 Sync(int count) {
            setState(count);
        }

主要是將latch的值賦予AbstractQueuedSynchronizer的State
再看await()方法:

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await()內(nèi)調(diào)用了 sync.acquireSharedInterruptibly(1) 承边;

/**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

這里先檢測了線程中斷狀態(tài),中斷了則拋出異常舱禽,接下來調(diào)用tryAcquireShared炒刁,tryAcquireShared是Syn的實現(xiàn)的:

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

其實就是簡單的獲取了同步器的state,判斷是否為0誊稚,之前博客里面有寫ReentrantLock翔始,兩者的機(jī)制是一樣的。因為countDownLacth實例化之后的State一般不是0里伯,那么此方法返回-1.進(jìn)入doAcquireSharedInterruptibly:

/**
/**
    * Acquires in shared interruptible mode.
    * @param arg the acquire argument
    */
   private void doAcquireSharedInterruptibly(int arg)
       throws InterruptedException {
       final Node node = addWaiter(Node.SHARED);
       boolean failed = true;
       try {
           for (;;) {
               final Node p = node.predecessor();
               if (p == head) {
                   int r = tryAcquireShared(arg);
                   if (r >= 0) {
                       setHeadAndPropagate(node, r);
                       p.next = null; // help GC
                       failed = false;
                       return;
                   }
               }
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   throw new InterruptedException();
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }

這段代碼是比較熟悉的在ReentrantLock中分析過城瞎,這里關(guān)鍵的點是parkAndCheckInterrupt()

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     * 
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

執(zhí)行到此處時,線程會阻塞疾瓮,知道有其他線程喚醒此線程脖镀,執(zhí)行await之后,上文中的主線程阻塞在這狼电。
接下來分析下countDown():

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

調(diào)用了Sync的releaseShared:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

接下來是tryReleaseShared

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

此方法是用CAS減小State的值蜒灰。如果State=0那么嘗試喚醒等待線程,執(zhí)行doReleaseShared:

    /**
     * Release action for shared mode -- signal successor and ensure
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

這里需要關(guān)注一點就是unparkSuccessor肩碟,這個方法是喚醒上文中的主線程强窖。至此countdownlatch的主流程就走通了。

不得不說countdownlatch是一個很高的線程控制工具削祈,極大的方便了我們開發(fā)翅溺。由于知識能力有限,上面是自己的一點見識髓抑,有什么錯誤還望提出咙崎,便于我及時改進(jìn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吨拍,一起剝皮案震驚了整個濱河市褪猛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌羹饰,老刑警劉巖握爷,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異严里,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)追城,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進(jìn)店門刹碾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人座柱,你說我怎么就攤上這事迷帜∥锸妫” “怎么了?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵戏锹,是天一觀的道長冠胯。 經(jīng)常有香客問我,道長锦针,這世上最難降的妖魔是什么荠察? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮奈搜,結(jié)果婚禮上悉盆,老公的妹妹穿的比我還像新娘。我一直安慰自己馋吗,他們只是感情好焕盟,可當(dāng)我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著宏粤,像睡著了一般脚翘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上绍哎,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天来农,我揣著相機(jī)與錄音,去河邊找鬼蛇摸。 笑死备图,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的赶袄。 我是一名探鬼主播揽涮,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼饿肺!你這毒婦竟也來了蒋困?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤敬辣,失蹤者是張志新(化名)和其女友劉穎雪标,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體溉跃,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡村刨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了撰茎。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嵌牺。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出逆粹,到底是詐尸還是另有隱情募疮,我是刑警寧澤,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布僻弹,位于F島的核電站阿浓,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏蹋绽。R本人自食惡果不足惜芭毙,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蟋字。 院中可真熱鬧稿蹲,春花似錦、人聲如沸鹊奖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽忠聚。三九已至设哗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間两蟀,已是汗流浹背网梢。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留赂毯,地道東北人战虏。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像党涕,于是被迫代替她去往敵國和親烦感。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內(nèi)容