Java 中的同步器

  • CountDownLatch
  • CyclicBarrier
  • Semaphore

CountDownLatch

1. CountDownLatch 的使用

private void countDownTest() {
        // 1. 首先我們聲明一個(gè)CountDownLatch實(shí)例胜宇,參數(shù)為我們需要同步的線程個(gè)數(shù)
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    Log.i(TAG, "run: Thread  A  run");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 2. 在操作完畢后通知
                    countDownLatch.countDown();
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    Log.i(TAG, "run: Thread  B  run");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 2. 操作完畢后通知
                    countDownLatch.countDown();

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.i(TAG, "run: Thread  B  run   next");
                }
            }
        });

        try {
             // 3. 在需要同步的線程進(jìn)行等待
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Log.i(TAG, "countDownTest: main---- run");

        executorService.shutdown();
    }

我們看下輸出的日志情況

ph_MainActivity: run: Thread  A  run
ph_MainActivity: run: Thread  B  run
ph_MainActivity: countDownTest: main---- run
ph_MainActivity: run: Thread  B  run   next

從使用的方法及結(jié)果我們可以看到颅夺,CountDownLatch 可以實(shí)現(xiàn)join 的功能契耿,但是比join更靈活奋早,可以結(jié)合線程池使用默责;并且可以在線程執(zhí)行的任何時(shí)刻進(jìn)行同步过牙,不是必須在任務(wù)結(jié)束時(shí)
2. CountDownLaunch 原理解析

CountDownLaunch — UML圖.png

從UML圖中我們得知其使用的AQS實(shí)現(xiàn)的莺丑。

  • 構(gòu)造方法
// count 是線程在通過之前必須被調(diào)用的countDown的次數(shù)
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Sync(int count) {
            setState(count);
        }
  • await :當(dāng)線程調(diào)用await 方法后線程會(huì)被阻塞谴返,當(dāng)其他線程調(diào)用了相應(yīng)次數(shù)的countdown 方法,計(jì)數(shù)器的state 的值為0 時(shí)命贴;或者其他線程調(diào)用了本線程的intrrupt 方法后 會(huì)拋出異常放回
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

那不Sync 中沒有實(shí)現(xiàn)acquireSharedInterruptibly,我們?cè)贏QS中看下

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
             // 如果獲取失敗則進(jìn)入阻塞隊(duì)列
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared 在Sync 中有實(shí)現(xiàn)

// 如果當(dāng)前同步器的狀態(tài) 為0 的話道宅,表示可獲得鎖,否則進(jìn)入阻塞隊(duì)列
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
  • await(long timeout, TimeUnit unit) 與await 方法類似胸蛛,只不過當(dāng)超時(shí)會(huì)返回false 而結(jié)束等待
  • countDown:調(diào)用該方法后計(jì)數(shù)器值會(huì)遞減污茵,遞減后如果計(jì)數(shù)器值為0則喚醒所有因調(diào)用await 方法二阻塞的線程。
public void countDown() {
        // 委托Sync 調(diào)用AQS方法
        sync.releaseShared(1);
    }
// 共享模式下的釋放
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 這個(gè)方法就是線程在獲得鎖時(shí)葬项,喚醒后續(xù)節(jié)點(diǎn)時(shí)調(diào)用的方法
            doReleaseShared();
            return true;
        }
        return false;
    }

釋放鎖主要是在tryReleaseShared 中做的泞当,在Sync 中有實(shí)現(xiàn)

// 對(duì) state 進(jìn)行遞減,直到 state 變成 0民珍;
        // state 遞減為 0 時(shí)襟士,返回 true盗飒,其余返回 false
        protected boolean tryReleaseShared(int releases) {
            // 自旋保證 CAS 一定可以成功
            for (;;) {
                int c = getState();
                // state 已經(jīng)是 0 了,直接返回 false
                if (c == 0)
                    return false;
                // 對(duì) state 進(jìn)行遞減
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

我們可以看到CountDownLaunch 主要使用了AQS實(shí)現(xiàn)陋桂,主要通過重寫 tryAcquireShared 和 tryReleaseShared 方法進(jìn)行了控制箩兽。


CyclicBarrier

1. CyclicBarrier 使用

final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: cyclicBarrier over!");
            }
        });
        
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Log.i(TAG, "run: A  =====1  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: A  =====2  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: A  =====3  ");
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.i(TAG, "run: B  =====1  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: B  =====2  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: B  =====3  ");
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });

運(yùn)行結(jié)果:

ph_MainActivity: run: A  =====1  
ph_MainActivity: run: B  =====1  
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: B  =====2  
ph_MainActivity: run: A  =====2  
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: A  =====3  
ph_MainActivity: run: B  =====3  

CyclicBarrier 使多個(gè)線程相互等待,假如計(jì)數(shù)器為n章喉,前n-1個(gè)線程都會(huì)因?yàn)榈竭_(dá)屏障而被阻塞,當(dāng)?shù)趎個(gè)線程調(diào)用await 后身坐,計(jì)數(shù)器的值為0了秸脱,這時(shí)候會(huì)發(fā)通知喚醒前n-1個(gè)線程。并且CyclicBarrier 是可以復(fù)用的部蛇,可以定制突破屏障后的操作
2. CyclicBarrier 實(shí)現(xiàn)

CyclicBarrier -- UML圖.png

CyclicBarrier是基于獨(dú)占鎖實(shí)現(xiàn)的摊唇,底層還是基于AQS。

  • parties:用于記錄多少個(gè)線程調(diào)用await 才會(huì)沖破屏障的個(gè)數(shù)涯鲁,即我們初始化傳入的值

  • count:開始為parties的值巷查,當(dāng)調(diào)用一次await 后就-1,當(dāng)為0時(shí)到達(dá)屏障調(diào)用await的線程結(jié)束等待抹腿,隨后便會(huì)恢復(fù)為parties 的值用來復(fù)用岛请。

  • 初始化方法:只是進(jìn)行簡單的賦值

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • await()方法:當(dāng)線程調(diào)用該方法后會(huì)進(jìn)行阻塞,直到滿足一下某個(gè)條件才會(huì)繼續(xù)執(zhí)行:parties 為0警绩,即都到了屏障點(diǎn)崇败;其他線程調(diào)用了本線程的interrupt方法
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 獲取鎖并上鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            //如果屏障被打破則拋出BrokenBarrierException異常,在調(diào)用breakBarrier 方法時(shí)會(huì)被打破
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果線程被interrupt 則打破屏障并拋出異常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // count 進(jìn)行減1操作
            int index = --count;
            // 如果為0肩祥,即所有的線程都到達(dá)了屏障點(diǎn)
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 如果設(shè)置的破除屏障點(diǎn)后需要執(zhí)行的任務(wù)不為空則執(zhí)行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 喚醒所有的線程并重置
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 循環(huán)進(jìn)行等待后室,直到被喚醒、打破混狠,或者超時(shí)岸霹?TODO 為什么使用循環(huán)?将饺?贡避?
            for (;;) {
                try {
                    // 如果沒有設(shè)置超時(shí),則調(diào)用await方法直接進(jìn)行等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
private void nextGeneration() {
        // 喚醒所有的線程
        trip.signalAll();
        // 重置屏障參數(shù)
        count = parties;
        generation = new Generation();
    }
  • await(timeout, unit):與await 類似予弧,只不過當(dāng)超時(shí)后會(huì)拋出TimeOutException 返回

Semaphore

1. Semaphore 使用方法

final Semaphore semaphore  = new Semaphore(0);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: A===");
                semaphore.release();
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: B===");
                semaphore.release();

            }
        });
        semaphore.acquire(2);
        Log.i(TAG, "semaphT: 1=======end");

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: C===");
                semaphore.release();

            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.i(TAG, "run: D===");
                semaphore.release();

            }
        });
        semaphore.tryAcquire(2,1000,TimeUnit.MILLISECONDS);
        Log.i(TAG, "semaphT: 2=======end");

輸出結(jié)果:

ph_MainActivity: run: B===
ph_MainActivity: run: A===
ph_MainActivity: semaphT: 1=======end
ph_MainActivity: run: C===
ph_MainActivity: semaphT: 2=======end
ph_MainActivity: run: D===

Semaphore 和CyclicBarrier 類似可以重復(fù)使用


2. SemaphoreUML 圖

Semaphore — UML圖.png

Semaphore 的源碼我們就不再分析了如果感興趣可以去看一下

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末贸桶,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子桌肴,更是在濱河造成了極大的恐慌皇筛,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,744評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件坠七,死亡現(xiàn)場離奇詭異水醋,居然都是意外死亡旗笔,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門拄踪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蝇恶,“玉大人,你說我怎么就攤上這事惶桐〈榛。” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵姚糊,是天一觀的道長贿衍。 經(jīng)常有香客問我,道長救恨,這世上最難降的妖魔是什么贸辈? 我笑而不...
    開封第一講書人閱讀 58,242評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮肠槽,結(jié)果婚禮上擎淤,老公的妹妹穿的比我還像新娘。我一直安慰自己秸仙,他們只是感情好嘴拢,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評(píng)論 6 389
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著寂纪,像睡著了一般炊汤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上弊攘,一...
    開封第一講書人閱讀 51,215評(píng)論 1 299
  • 那天抢腐,我揣著相機(jī)與錄音,去河邊找鬼襟交。 笑死迈倍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的捣域。 我是一名探鬼主播啼染,決...
    沈念sama閱讀 40,096評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼焕梅!你這毒婦竟也來了迹鹅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,939評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤贞言,失蹤者是張志新(化名)和其女友劉穎斜棚,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡弟蚀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評(píng)論 2 333
  • 正文 我和宋清朗相戀三年蚤霞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片义钉。...
    茶點(diǎn)故事閱讀 39,745評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡昧绣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出捶闸,到底是詐尸還是另有隱情夜畴,我是刑警寧澤,帶...
    沈念sama閱讀 35,448評(píng)論 5 344
  • 正文 年R本政府宣布删壮,位于F島的核電站贪绘,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏醉锅。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評(píng)論 3 327
  • 文/蒙蒙 一发绢、第九天 我趴在偏房一處隱蔽的房頂上張望硬耍。 院中可真熱鬧,春花似錦边酒、人聲如沸经柴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坯认。三九已至,卻和暖如春氓涣,著一層夾襖步出監(jiān)牢的瞬間牛哺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評(píng)論 1 269
  • 我被黑心中介騙來泰國打工劳吠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留引润,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,776評(píng)論 2 369
  • 正文 我出身青樓痒玩,卻偏偏與公主長得像淳附,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蠢古,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容