第10章 Java并發(fā)包中線程同步器原理剖析

目錄

CountDownLatch原理剖析

日常開發(fā)中經(jīng)常遇到一個線程需要等待一些線程都結(jié)束后才能繼續(xù)向下運行的場景,在CountDownLatch出現(xiàn)之前通常使用join方法來實現(xiàn)缚窿,但join方法不夠靈活,所以開發(fā)了CountDownLatch在张。

示例

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // 添加任務(wù)
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // 模擬運行時間
                Thread.sleep(1000);
                System.out.println("thread one over...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 遞減計數(shù)器
                countDownLatch.countDown();
            }
        }
    });

    // 同上
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("thread two over...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
        }
    });

    System.out.println("wait all child thread over!");
    // 阻塞直到被interrupt或計數(shù)器遞減至0
    countDownLatch.await();
    System.out.println("all child thread over!");
    executorService.shutdown();
}

輸出為:

wait all child thread over!
thread one over...
thread two over...
all child thread over!

CountDownLatch相對于join方法的優(yōu)點大致有兩點:

  • 調(diào)用一個子線程的join方法后斤蔓,該線程會一直阻塞直到子線程運行完畢订框,而CountDownLatch允許子線程運行完畢或在運行過程中遞減計數(shù)器,也就是說await方法不一定要等到子線程運行結(jié)束才返回坯台。
  • 使用線程池來管理線程一般都是直接添加Runnable到線程池聂沙,這時就沒有辦法再調(diào)用線程的join方法了秆麸,而仍可在子線程中遞減計數(shù)器,也就是說CountDownLatch相比join方法可以更靈活地控制線程的同步及汉。

類圖結(jié)構(gòu)

由圖可知蛔屹,CountDownLatch是基于AQS實現(xiàn)的。

由下面的代碼可知豁生,CountDownLatch的計數(shù)器值就是AQS的state值

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Sync(int count) {
    setState(count);
}

源碼解析

void await()

當(dāng)線程調(diào)用CountDownLatch的await方法后漫贞,當(dāng)前線程會被阻塞甸箱,直到CountDownLatch的計數(shù)器值遞減至0或者其他線程調(diào)用了當(dāng)前線程的interrupt方法。

public void await() throws InterruptedException {
    // 允許中斷(中斷時拋出異常)
    sync.acquireSharedInterruptibly(1);
}

// AQS的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // state=0時tryAcquireShared方法返回1迅脐,直接返回
    // 否則執(zhí)行doAcquireSharedInterruptibly方法
    if (tryAcquireShared(arg) < 0)
        // state不為0芍殖,調(diào)用該方法使await方法阻塞
        doAcquireSharedInterruptibly(arg);
}

// Sync的方法(重寫了AQS中的該方法)
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// AQS的方法
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 獲取state值,state=0時r=1谴蔑,直接返回豌骏,不再阻塞
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 若state不為0則阻塞調(diào)用await方法的線程
            // 等到其他線程執(zhí)行countDown方法使計數(shù)器遞減至0
            // (state變?yōu)?)或該線程被interrupt時
            // 該線程才能繼續(xù)向下運行
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

boolean await(long timeout, TimeUnit unit)

相較于上面的await方法,調(diào)用此方法后調(diào)用線程最多被阻塞timeout時間(單位由unit指定)隐锭,即使計數(shù)器沒有遞減至0或調(diào)用線程沒有被interrupt窃躲,調(diào)用線程也會繼續(xù)向下運行。

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

void countDown()

遞減計數(shù)器钦睡,當(dāng)計數(shù)器的值為0(即state=0)時會喚醒所有因調(diào)用await方法而被阻塞的線程蒂窒。

public void countDown() {
    // 將計數(shù)器減1
    sync.releaseShared(1);
}

// AQS的方法
public final boolean releaseShared(int arg) {
    // 當(dāng)state被遞減至0時tryReleaseShared返回true
    // 會執(zhí)行doReleaseShared方法喚醒因調(diào)用await方法阻塞的線程
    // 否則如果state不是0的話什么也不做
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// Sync重寫的AQS中的方法
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        // 如果state已經(jīng)為0,沒有遞減必要,直接返回
        // 否則會使state變成負(fù)數(shù)
        if (c == 0)
            return false;
        int nextc = c-1;
        // 通過CAS遞減state的值
        if (compareAndSetState(c, nextc))
            // 如果state被遞減至0洒琢,返回true以進(jìn)行后續(xù)喚醒工作
            return nextc == 0;
    }
}

CyclicBarrier原理探究

CountDownLatch的計數(shù)器時一次性的秧秉,也就是說當(dāng)計數(shù)器至變?yōu)?后,再調(diào)用await和countDown方法會直接返回衰抑。而CyclicBarrier則解決了此問題象迎。CyclicBarrier是回環(huán)屏障的意思,它可以使一組線程全部達(dá)到一個狀態(tài)后再全部同時執(zhí)行呛踊,然后重置自身狀態(tài)又可用于下一次的狀態(tài)同步砾淌。

示例

假設(shè)一個任務(wù)由階段1、階段2恋技、階段3組成拇舀,每個線程要串行地執(zhí)行階段1、階段2蜻底、階段3骄崩,當(dāng)多個線程執(zhí)行該任務(wù)時,必須要保證所有線程的階段1都執(zhí)行完畢后才能進(jìn)入階段2薄辅,當(dāng)所有線程的階段2都執(zhí)行完畢后才能進(jìn)入階段3要拂,可用下面的代碼實現(xiàn):

public static void main(String[] args) {
    // 等待兩個線程同步
    CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // 運行兩個子線程,當(dāng)兩個子線程的step1都執(zhí)行完畢后才會執(zhí)行step2
    // 當(dāng)兩個子線程的step2都執(zhí)行完畢后才會執(zhí)行step3
    for(int i = 0; i < 2; i++) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    System.out.println(Thread.currentThread() + " step1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + " step2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + " step3");
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }
    executorService.shutdown();
}

輸出如下:

Thread[pool-1-thread-1,5,main] step1
Thread[pool-1-thread-2,5,main] step1
Thread[pool-1-thread-1,5,main] step2
Thread[pool-1-thread-2,5,main] step2
Thread[pool-1-thread-2,5,main] step3
Thread[pool-1-thread-1,5,main] step3

類圖結(jié)構(gòu)

CyclicBarrier基于ReentrantLock實現(xiàn)站楚,本質(zhì)上還是基于AQS的脱惰。parties用于記錄線程個數(shù),表示多少個線程調(diào)用await方法后窿春,所有線程才會沖破屏障往下運行拉一。count一開始等于parties,當(dāng)由線程調(diào)用await方法時會遞減1旧乞,當(dāng)count變成0時到達(dá)屏障點蔚润,所有調(diào)用await的線程會一起往下執(zhí)行,此時要重置CyclicBarrier尺栖,再次令count=parties嫡纠。

lock用于保證更新計數(shù)器count的原子性。lock的條件變量trip用于支持線程間使用await和signalAll進(jìn)行通信延赌。

以下是CyclicBarrier的構(gòu)造函數(shù):

public CyclicBarrier(int parties) {
    this(parties, null);
}

// barrierAction為達(dá)到屏障點(parties個線程調(diào)用了await方法)時執(zhí)行的任務(wù)
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

Generation的定義如下:

private static class Generation {
    // 記錄當(dāng)前屏障是否可以被打破
    boolean broken = false;
}

源碼分析

int await()

當(dāng)前線程調(diào)用該方法時會阻塞除盏,直到滿足以下條件之一才會返回:

  • parties個線程調(diào)用了await方法,也就是到達(dá)屏障點
  • 其他線程調(diào)用了當(dāng)前線程的interrupt方法
  • Generation對象的broken標(biāo)志被設(shè)置為true挫以,拋出BrokenBarrierExecption
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // false表示不設(shè)置超時時間者蠕,此時后面參數(shù)無意義
        // dowait稍后具體分析
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

boolean await(long timeout, TimeUnit unit)

相比于await(),等待超時會返回false掐松。

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
           // 設(shè)置了超時時間
           // dowait稍后分析     
    return dowait(true, unit.toNanos(timeout));
}

int dowait(boolean timed, long nanos)

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // 屏障已被打破則拋出異常
        if (g.broken)
            throw new BrokenBarrierException();

        // 線程中斷則拋出異常
        if (Thread.interrupted()) {
            // 打破屏障
            // 會做三件事
            // 1. 設(shè)置generation的broken為true
            // 2. 重置count為parites
            // 3. 調(diào)用signalAll激活所有等待線程
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        // 到達(dá)了屏障點
        if (index == 0) {
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    // 執(zhí)行每一次到達(dá)屏障點所需要執(zhí)行的任務(wù)
                    command.run();
                ranAction = true;
                // 重置狀態(tài)蠢棱,進(jìn)入下一次屏障
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果index不為0
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 執(zhí)行此處時锌杀,有可能其他線程已經(jīng)調(diào)用了nextGeneration方法
                // 此時應(yīng)該使當(dāng)前線程正常執(zhí)行下去
                // 否則打破屏障
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();
            // 如果此次屏障已經(jīng)結(jié)束,則正常返回
            if (g != generation)
                return index;
            // 如果是因為超時泻仙,則打破屏障并拋出異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

// 打破屏障
private void breakBarrier() {
    // 設(shè)置打破標(biāo)志
    generation.broken = true;
    // 重置count
    count = parties;
    // 喚醒所有等待的線程
    trip.signalAll();
}

private void nextGeneration() {
    // 喚醒當(dāng)前屏障下所有被阻塞的線程
    trip.signalAll();
    // 重置狀態(tài)糕再,進(jìn)入下一次屏障
    count = parties;
    generation = new Generation();
}

Semaphore原理探究

Semaphore信號量也是一個同步器,與CountDownLatch和CyclicBarrier不同的是玉转,它內(nèi)部的計數(shù)器是遞增的突想,并且在初始化時可以指定計數(shù)器的初始值(通常為0),但不必知道需要同步的線程個數(shù)究抓,而是在需要同步的地方調(diào)用acquire方法時指定需要同步的線程個數(shù)猾担。

示例

public static void main(String[] args) throws InterruptedException {
    final int THREAD_COUNT = 2;
    // 初始信號量為0
    Semaphore semaphore = new Semaphore(0);
    ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);

    for (int i = 0; i < THREAD_COUNT; i++){
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread() + " over");
                // 信號量+1
                semaphore.release();
            }
        });
    }

    // 當(dāng)信號量達(dá)到2時才停止阻塞
    semaphore.acquire(2);
    System.out.println("all child thread over!");

    executorService.shutdown();
}

類圖結(jié)構(gòu)

由圖可知,Semaphore還是使用AQS實現(xiàn)的刺下,并且可以選取公平性策略(默認(rèn)為非公平性的)绑嘹。

源碼解析

void acquire()

表示當(dāng)前線程希望獲取一個信號量資源,如果當(dāng)前信號量大于0橘茉,則當(dāng)前信號量的計數(shù)減1工腋,然后該方法直接返回。否則如果當(dāng)前信號量等于0畅卓,則被阻塞擅腰。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 可以被中斷
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調(diào)用Sync子類方法嘗試獲取,這里根據(jù)構(gòu)造函數(shù)決定公平策略
    if (tryAcquireShared(arg) < 0)
        // 將當(dāng)前線程放入阻塞隊列翁潘,然后再次嘗試
        // 如果失敗則掛起當(dāng)前線程
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared由Sync的子類實現(xiàn)以根據(jù)公平性采取相應(yīng)的行為趁冈。

以下是非公平策略NofairSync的實現(xiàn):

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        // 如果剩余信號量小于0直接返回
        // 否則如果更新信號量成功則返回
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

假設(shè)線程A調(diào)用了acquire方法嘗試獲取信號量但因信號量不足被阻塞,這時線程B通過release增加了信號量拜马,此時線程C完全可以調(diào)用acquire方法成功獲取到信號量(如果信號量足夠的話)渗勘,這就是非公平性的體現(xiàn)。

下面是公平性的實現(xiàn):

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 關(guān)鍵在于先判斷AQS隊列中是否已經(jīng)有元素要獲取信號量
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

hasQueuedPredecessors方法(可參看第6章 Java并發(fā)包中鎖原理剖析)用于判斷當(dāng)前線程的前驅(qū)節(jié)點是否也在等待獲取該資源俩莽,如果是則自己放棄獲取的權(quán)限呀邢,然后當(dāng)前線程會被放入AQS中,否則嘗試去獲取豹绪。

void acquire(int permits)

可獲取多個信號量。

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

void acquireUninterruptibly()

不對中斷進(jìn)行響應(yīng)申眼。

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

void acquireUninterruptibly(int permits)

不對中斷進(jìn)行相應(yīng)并且可獲取多個信號量瞒津。

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

void release()

使信號量加1,如果當(dāng)前有線程因為調(diào)用acquire方法被阻塞而被放入AQS中的話括尸,會根據(jù)公平性策略選擇一個信號量個數(shù)能被滿足的線程進(jìn)行激活巷蚪。

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // 嘗試釋放資源(增加信號量)
    if (tryReleaseShared(arg)) {
        // 釋放資源成功則根據(jù)公平性策略喚醒AQS中阻塞的線程
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

void release(int permits)

可增加多個信號量。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

更多

相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末濒翻,一起剝皮案震驚了整個濱河市屁柏,隨后出現(xiàn)的幾起案子啦膜,更是在濱河造成了極大的恐慌,老刑警劉巖淌喻,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件僧家,死亡現(xiàn)場離奇詭異,居然都是意外死亡裸删,警方通過查閱死者的電腦和手機八拱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涯塔,“玉大人肌稻,你說我怎么就攤上這事∝拜” “怎么了爹谭?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長榛搔。 經(jīng)常有香客問我诺凡,道長,這世上最難降的妖魔是什么药薯? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任绑洛,我火速辦了婚禮,結(jié)果婚禮上童本,老公的妹妹穿的比我還像新娘真屯。我一直安慰自己,他們只是感情好穷娱,可當(dāng)我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布绑蔫。 她就那樣靜靜地躺著,像睡著了一般泵额。 火紅的嫁衣襯著肌膚如雪配深。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天嫁盲,我揣著相機與錄音篓叶,去河邊找鬼。 笑死羞秤,一個胖子當(dāng)著我的面吹牛缸托,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瘾蛋,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼俐镐,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了哺哼?” 一聲冷哼從身側(cè)響起佩抹,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤叼风,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后棍苹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體无宿,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年廊勃,在試婚紗的時候發(fā)現(xiàn)自己被綠了懈贺。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡坡垫,死狀恐怖梭灿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情冰悠,我是刑警寧澤堡妒,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站溉卓,受9級特大地震影響皮迟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜桑寨,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一伏尼、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧尉尾,春花似錦爆阶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至肢藐,卻和暖如春故河,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背吆豹。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工鱼的, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人痘煤。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓凑阶,卻偏偏與公主長得像,于是被迫代替她去往敵國和親速勇。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容