[03][01][05] 常見并發(fā)工具的基本原理分析

線程這塊的一些工具類,基本都會(huì)以原理為主,希望大家能有一個(gè)這樣的意識(shí),通過分析別人代碼的設(shè)計(jì)和實(shí)現(xiàn),給自己提供積累一些方法和工具

Condition

在前面學(xué)習(xí) synchronized 的時(shí)候,有講到 wait/notify 的基本使用,結(jié)合 synchronized 可以實(shí)現(xiàn)對(duì)線程的通信.那么這個(gè)時(shí)候我就在思考了,既然 J.U.C 里面提供了鎖的實(shí)現(xiàn)機(jī)制,那 J.U.C 里面有沒有提供類似的線程通信的工具呢油昂?于是找阿找,發(fā)現(xiàn)了一個(gè) Condition 工具類

Condition 是一個(gè)多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個(gè)條件(condition),只有滿足條件時(shí),線程才會(huì)被喚醒

Condition 的基本使用

ConditionWait

public class ConditionDemoWait implements Runnable{
    private Lock lock;
    private Condition condition;

    public ConditionDemoWait(Lock lock, Condition condition){
        this.lock=lock;
        this.condition=condition;
    }

    @Override
    public void run() {
        System.out.println("begin - ConditionDemoWait");
        try {
            lock.lock();
            condition.await();
            System.out.println("end - ConditionDemoWait");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

ConditionSignal

public class ConditionDemoSignal implements Runnable{
    private Lock lock;
    private Condition condition;

    public ConditionDemoSignal(Lock lock, Condition condition){
        this.lock=lock;
        this.condition=condition;
    }

    @Override
    public void run() {
        System.out.println("begin - ConditionDemoSignal");
        try {
            lock.lock();
            condition.signal();
            System.out.println("end - ConditionDemoSignal");
        }finally {
            lock.unlock();
        }
    }
}

通過這個(gè)案例簡(jiǎn)單實(shí)現(xiàn)了 wait 和 notify 的功能,當(dāng)調(diào)用 await 方法后,當(dāng)前線程會(huì)釋放鎖并等待,而其他線程調(diào)用 condition 對(duì)象的 signal 或者 signalall 方法通知并被阻塞的線程,然后自己執(zhí)行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼續(xù)執(zhí)行,最后釋放鎖

所以,condition 中兩個(gè)最重要的方法,一個(gè)是 await,一個(gè)是 signal 方法

  • await: 把當(dāng)前線程阻塞掛起
  • signal: 喚醒阻塞的線程

Condition 源碼分析

調(diào)用 Condition,需要獲得 Lock 鎖,所以意味著會(huì)存在一個(gè) AQS 同步隊(duì)列,在上面那個(gè)案例中,假如兩個(gè)線程同時(shí)運(yùn)行的話,那么 AQS 的隊(duì)列可能是下面這種情況

image

那么這個(gè)時(shí)候 ThreadA 調(diào)用了 condition.await 方法,它做了什么事情呢革娄?

condition.await

調(diào)用 Condition 的 await() 方法(或者以 await 開頭的方法 ),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài).當(dāng)從 await() 方法返回時(shí),當(dāng)前線程一定獲取了 Condition 相關(guān)聯(lián)的鎖

public final void await() throws InterruptedException {
    if (Thread.interrupted()) { //表示 await 允許被中斷
        throw new InterruptedException();
    }

    Node node = addConditionWaiter(); //創(chuàng)建一個(gè)新的節(jié)點(diǎn),節(jié)點(diǎn)狀態(tài)為 condition冕碟,采用的數(shù)據(jù)結(jié)構(gòu)仍然是鏈表
    int savedState = fullyRelease(node); //釋放當(dāng)前的鎖拦惋,得到鎖的狀態(tài),并喚醒 AQS 隊(duì)列中的一個(gè)線程

    int interruptMode = 0;

    //如果當(dāng)前節(jié)點(diǎn)沒有在同步隊(duì)列上安寺,即還沒有被 signal,則將當(dāng)前線程阻塞
    while (!isOnSyncQueue(node)) {//判斷這個(gè)節(jié)點(diǎn)是否在 AQS 隊(duì)列上厕妖,第一次判斷的是 false,因?yàn)榍懊嬉呀?jīng)釋放鎖了
        LockSupport.park(this); //通過 park 掛起當(dāng)前線程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }

    // 當(dāng)這個(gè)線程醒來,會(huì)嘗試拿鎖, 當(dāng) acquireQueued 返回 false 就是拿到鎖了
    // interruptMode != THROW_IE -> 表示這個(gè)線程沒有成功將 node 入隊(duì), 但 signal 執(zhí)行了 enq 方法讓其入隊(duì)了
    // 將這個(gè)變量設(shè)置成 REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }

    // 如果 node 的下一個(gè)等待者不是 null, 則進(jìn)行清理,清理 Condition 隊(duì)列上的節(jié)點(diǎn)
    // 如果是 null ,就沒有什么好清理的了
    if (node.nextWaiter != null) {// clean up if
        cancelled
    }

    unlinkCancelledWaiters();

    // 如果線程被中斷了,需要拋出異常.或者什么都不做
    if (interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
}

addConditionWaiter

這個(gè)方法的主要作用是把當(dāng)前線程封裝成 Node,添加到等待隊(duì)列.這里的隊(duì)列不再是雙向鏈表,而是單向鏈表

private Node addConditionWaiter() {
    Node t = lastWaiter;

    // 如果 lastWaiter 不等于空并且 waitStatus 不等于 CONDITION 時(shí)挑庶,把沖好這個(gè)節(jié)點(diǎn)從鏈表中移除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    //構(gòu)建一個(gè) Node言秸,waitStatus=CONDITION。這里的鏈表是一個(gè)單向的迎捺,所以相比 AQS 來說會(huì)簡(jiǎn)單很多
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    if (t == null) {
        firstWaiter = node;
    } else {
        t.nextWaiter = node;
    }

    lastWaiter = node;
    return node;
}

圖解分析

執(zhí)行完 addConditionWaiter 這個(gè)方法之后,就會(huì)產(chǎn)生一個(gè)這樣的 condition 隊(duì)列

image

fullyRelease

fullRelease,就是徹底的釋放鎖,什么叫徹底呢,就是如果當(dāng)前鎖存在多次重入,那么在這個(gè)方法中只需要釋放一次就會(huì)把所有的重入次數(shù)歸零

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();

        //獲得重入的次數(shù)
        if (release(savedState)) {//釋放鎖并且喚醒下一個(gè)同步隊(duì)列中的線程
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed) {
            node.waitStatus = Node.CANCELLED;
        }
    }
}

圖解分析

此時(shí),同步隊(duì)列會(huì)觸發(fā)鎖的釋放和重新競(jìng)爭(zhēng).ThreadB 獲得了鎖


image

isOnSyncQueue

判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中,返回 false 表示不在,返回 true 表示在

如果不在 AQS 同步隊(duì)列,說明當(dāng)前節(jié)點(diǎn)沒有喚醒去爭(zhēng)搶同步鎖,所以需要把當(dāng)前線程阻塞起來,直到其他的線程調(diào)用 signal 喚醒

如果在 AQS 同步隊(duì)列,意味著它需要去競(jìng)爭(zhēng)同步鎖去獲得執(zhí)行程序執(zhí)行權(quán)限

為什么要做這個(gè)判斷呢举畸?原因是在 condition 隊(duì)列中的節(jié)點(diǎn)會(huì)重新加入到 AQS 隊(duì)列去競(jìng)爭(zhēng)鎖.也就是當(dāng)調(diào)用 signal 的時(shí)候,會(huì)把當(dāng)前節(jié)點(diǎn)從 condition 隊(duì)列轉(zhuǎn)移到 AQS 隊(duì)列

  • 大家思考一下,基于現(xiàn)在的邏輯結(jié)構(gòu).如何去判斷 ThreadA 這個(gè)節(jié)點(diǎn)是否存在于 AQS 隊(duì)列中呢?
  • 如果 ThreadA 的 waitStatus 的狀態(tài)為 CONDITION,說明它存在于 condition 隊(duì)列中,不在 AQS 隊(duì)列.因?yàn)?AQS 隊(duì)列的狀態(tài)一定不可能有 CONDITION
  • 如果 node.prev 為空,說明也不存在于 AQS 隊(duì)列,原因是 prev=null 在 AQS 隊(duì)列中只有一種可能性,就是它是 head 節(jié)點(diǎn),head 節(jié)點(diǎn)意味著它是獲得鎖的節(jié)點(diǎn).
  • 如果 node.next 不等于空,說明一定存在于 AQS 隊(duì)列中,因?yàn)橹挥?AQS 隊(duì)列才會(huì)存在 next 和 prev 的關(guān)系
  • findNodeFromTail,表示從 tail 節(jié)點(diǎn)往前掃描 AQS 隊(duì)列,一旦發(fā)現(xiàn) AQS 隊(duì)列的節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)相等,說明節(jié)點(diǎn)一定存在于 AQS 隊(duì)列中
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null) {
        return false;
    }
    if (node.next != null) {// If has successor, it must be on queue
        return true;
    }
    return findNodeFromTail(node);
}

Condition.signal

await 方法會(huì)阻塞 ThreadA,然后 ThreadB 搶占到了鎖獲得了執(zhí)行權(quán)限,這個(gè)時(shí)候在 ThreadB 中調(diào)用了 Condition 的 signal() 方法,將會(huì)喚醒在等待隊(duì)列中節(jié)點(diǎn)

public final void signal() {
    if (!isHeldExclusively()) { //先判斷當(dāng)前線程是否獲得了鎖凳枝,這個(gè)判斷比較簡(jiǎn)單抄沮,直接用獲得鎖的線程和當(dāng)前線程相比即可
        throw new IllegalMonitorStateException();
    }

    Node first = firstWaiter; // 拿到 Condition 隊(duì)列上第一個(gè)節(jié)點(diǎn)
    if (first != null) {
        doSignal(first);
    }
}

Condition.doSignal

對(duì) condition 隊(duì)列中從首部開始的第一個(gè) condition 狀態(tài)的節(jié)點(diǎn),執(zhí)行 transferForSignal 操作,將 node 從 condition 隊(duì)列中轉(zhuǎn)換到 AQS 隊(duì)列中,同時(shí)修改 AQS 隊(duì)列中原先尾節(jié)點(diǎn)的狀態(tài)

private void doSignal(Node first) {
    do {//從 Condition 隊(duì)列中刪除 first 節(jié)點(diǎn)
        if ( (firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null; // 將 next 節(jié)點(diǎn)設(shè)置成 null
        }

        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

AQS.transferForSignal

該方法先是 CAS 修改了節(jié)點(diǎn)狀態(tài),如果成功,就將這個(gè)節(jié)點(diǎn)放到 AQS 隊(duì)列中,然后喚醒這個(gè)節(jié)點(diǎn)上的線程.此時(shí),那個(gè)節(jié)點(diǎn)就會(huì)在 await 方法中蘇醒

final boolean transferForSignal(Node node) {

    //更新節(jié)點(diǎn)的狀態(tài)為 0,如果更新失敗岖瑰,只有一種可能就是節(jié)點(diǎn)被 CANCELLED 了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        return false;
    }

    //調(diào)用 enq叛买,把當(dāng)前節(jié)點(diǎn)添加到 AQS 隊(duì)列。并且返回返回按當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)蹋订,也就是原 tail 節(jié)點(diǎn)
    Node p = enq(node);
    int ws = p.waitStatus;

    // 如果上一個(gè)節(jié)點(diǎn)的狀態(tài)被取消了, 或者嘗試設(shè)置上一個(gè)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 失敗了(SIGNAL 表示:他的 next 節(jié)點(diǎn)需要停止阻塞 ),
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        LockSupport.unpark(node.thread); // 喚醒節(jié)點(diǎn)上的線程
    }

    //如果 node 的 prev 節(jié)點(diǎn)已經(jīng)是 signal 狀態(tài)率挣,那么被阻塞的 ThreadA 的喚醒工作由 AQS 隊(duì)列來完成
    return true;
}

圖解分析

執(zhí)行完 doSignal 以后,會(huì)把 condition 隊(duì)列中的節(jié)點(diǎn)轉(zhuǎn)移到 aqs 隊(duì)列上,邏輯結(jié)構(gòu)圖如下

這個(gè)時(shí)候會(huì)判斷 ThreadA 的 prev 節(jié)點(diǎn)也就是 head 節(jié)點(diǎn)的 waitStatus,如果大于 0 或者設(shè)置 SIGNAL 失敗,表示節(jié)點(diǎn)被設(shè)置成了 CANCELLED 狀態(tài).這個(gè)時(shí)候會(huì)喚醒 ThreadA 這個(gè)線程.否則就基于 AQS 隊(duì)列的機(jī)制來喚醒,也就是等到 ThreadB 釋放鎖之后來喚醒 ThreadA

image

被阻塞的線程喚醒后的邏輯

前面在分析 await 方法時(shí),線程會(huì)被阻塞.而通過 signal 被喚醒之后又繼續(xù)回到上次執(zhí)行的邏輯中標(biāo)注為紅色部分的代碼 checkInterruptWhileWaiting 這個(gè)方法是干嘛呢?其實(shí)從名字就可以看出來,就是 ThreadA 在 condition 隊(duì)列被阻塞的過程中,有沒有被其他線程觸發(fā)過中斷請(qǐng)求

public final void await() throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }

    if (node.nextWaiter != null) {//clean up if cancelled
        unlinkCancelledWaiters();
    }

    if (interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
}

checkInterruptWhileWaiting

如果當(dāng)前線程被中斷,則調(diào)用 transferAfterCancelledWait 方法判斷后續(xù)的處理應(yīng)該是拋出 InterruptedException 還是重新中斷

這里需要注意的地方是,如果第一次 CAS 失敗了,則不能判斷當(dāng)前線程是先進(jìn)行了中斷還是先進(jìn)行了 signal 方法的調(diào)用,可能是先執(zhí)行了 signal 然后中斷,也可能是先執(zhí)行了中斷,后執(zhí)行了 signal,當(dāng)然,這兩個(gè)操作肯定是發(fā)生在 CAS 之前.這時(shí)需要做的就是等待當(dāng)前線程的 node 被添加到 AQS 隊(duì)列后,也就是 enq 方法返回后,返回 false 告訴 checkInterruptWhileWaiting 方法返回 REINTERRUPT(1),后續(xù)進(jìn)行重新中斷

簡(jiǎn)單來說,該方法的返回值代表當(dāng)前線程是否在 park 的時(shí)候被中斷喚醒,如果為 true 表示中斷在 signal 調(diào)用之前,signal 還未執(zhí)行,那么這個(gè)時(shí)候會(huì)根據(jù) await 的語義,在 await 時(shí)遇到中斷需要拋出 interruptedException,返回 true 就是告訴 checkInterruptWhileWaiting 返回 THROW_IE(-1).如果返回 false,否則表示 signal 已經(jīng)執(zhí)行過了,只需要重新響應(yīng)中斷即可

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 使用 cas 修改節(jié)點(diǎn)狀態(tài)露戒,如果還能修改成功难礼,說明線程被中斷時(shí),signal 還沒有被調(diào)用
    // 這里有一個(gè)知識(shí)點(diǎn)玫锋,就是線程被喚醒蛾茉,并不一定是在 java 層面執(zhí)行了 locksupport.unpark,
    // 也可能是調(diào)用了線程的 interrupt () 方法撩鹿,這個(gè)方法會(huì)更新一個(gè)中斷標(biāo)識(shí)谦炬,并且會(huì)喚醒處于阻塞狀態(tài)下的線程
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node); //如果 cas 成功,則把 node 添加到 AQS 隊(duì)列
        return true;
    }

    // 如果 cas 失敗,則判斷當(dāng)前 node 是否已經(jīng)在 AQS 隊(duì)列上键思,如果不在础爬,則讓給其他線程執(zhí)行
    // 當(dāng) node 被觸發(fā)了 signal 方法時(shí),node 就會(huì)被加到 aqs 隊(duì)列上
    while (!isOnSyncQueue(node)) {//循 環(huán)檢測(cè) node 是否已經(jīng)成功添加到 AQS 隊(duì)列中, 如果沒有吼鳞,則通過 yield看蚜,
        Thread.yield();
    }

    return false;
}

acquireQueued

這個(gè)方法在講 aqs 的時(shí)候說過,是的當(dāng)前被喚醒的節(jié)點(diǎn) ThreadA 去搶占同步鎖.并且要恢復(fù)到原本的重入次數(shù)狀態(tài).調(diào)用完這個(gè)方法之后,AQS 隊(duì)列的狀態(tài)如下將 head 節(jié)點(diǎn)的 waitStatus 設(shè)置為-1,Signal 狀態(tài)

image

reportInterruptAfterWait

根據(jù) checkInterruptWhileWaiting 方法返回的中斷標(biāo)識(shí)來進(jìn)行中斷上報(bào)
如果是 THROW_IE,則拋出中斷異常
如果是 REINTERRUPT,則重新響應(yīng)中斷

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE) {
        throw new InterruptedException();
    } else if (interruptMode == REINTERRUPT) {
        selfInterrupt();
    }
}

Condition 總結(jié)

await 和 signal 的總結(jié)

我把前面的整個(gè)分解的圖再通過一張整體的結(jié)構(gòu)圖來表述,線程 awaitThread 先通過 lock.lock() 方法獲取鎖成功后調(diào)用了 condition.await 方法進(jìn)入等待隊(duì)列,而另一個(gè)線程 signalThread 通過 lock.lock() 方法獲取鎖成功后調(diào)用了 condition.signal 或者 signalAll 方法,使得線程 awaitThread 能夠有機(jī)會(huì)移入到同步隊(duì)列中,當(dāng)其他線程釋放 lock 后使得線程 awaitThread 能夠有機(jī)會(huì)獲取 lock,從而使得線程 awaitThread 能夠從 await 方法中退出執(zhí)行后續(xù)操作.如果 awaitThread 獲取 lock 失敗會(huì)直接進(jìn)入到同步隊(duì)列

image
  • 阻塞:await() 方法中,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在 AQS 等待隊(duì)列,則阻塞當(dāng)前線程,如果在等待隊(duì)列,則自旋等待嘗試獲取鎖
  • 釋放:signal() 后,節(jié)點(diǎn)會(huì)從 condition 隊(duì)列移動(dòng)到 AQS 等待隊(duì)列,則進(jìn)入正常鎖的獲取流程

了解完 Lock 以及 Condition 之后,意味著我們對(duì)于 J.U.C 里面的鎖機(jī)制以及線程通信機(jī)制有了一個(gè)全面和深入的了解,接下來我們來看看其他比較常用的一些工具

限制

JUC 中提供了幾個(gè)比較用的并發(fā)工具類, 比如 CountDownLatch,CyclicBarrier,Semaphore. 其實(shí)在以前我們課堂的演示代碼中,或多或少都有用到過這樣一些 api,接下來我們會(huì)帶大家去深入研究一些常用的 api

CountDownLatch

countdownlatch 是一個(gè)同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程的操作執(zhí)行完畢再執(zhí)行.從命名可以解讀到 countdown 是倒數(shù)的意思,類似于我們倒計(jì)時(shí)的概念

countdownlatch 提供了兩個(gè)方法,一個(gè)是 countDown,一個(gè)是 await,countdownlatch 初始化的時(shí)候需要傳入一個(gè)整數(shù),在這個(gè)整數(shù)倒數(shù)到 0 之前,調(diào)用了 await 方法的程序都必須要等待,然后通過 countDown 來倒數(shù)

使用案例

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(3);
    new Thread(() -> {
        System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行中");
        countDownLatch.countDown();
        System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行完畢");
    }, "t1").start();

    new Thread(() -> {
        System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行中");
        countDownLatch.countDown();
        System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行完畢");
    }, "t2").start();

    new Thread(() -> {
        System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行中");
        countDownLatch.countDown();
        System.out.println("" + Thread.currentThread().getName() + "-執(zhí)行完畢");
    }, "t3").start();

    countDownLatch.await();
    System.out.println("所有線程執(zhí)行完畢");
}

從代碼的實(shí)現(xiàn)來看,有點(diǎn)類似 join 的功能,但是比 join 更加靈活.CountDownLatch 構(gòu)造函數(shù)會(huì)接收一個(gè) int 類型的參數(shù)作為計(jì)數(shù)器的初始值,當(dāng)調(diào)用 CountDownLatch 的 countDown 方法時(shí),這個(gè)計(jì)數(shù)器就會(huì)減一.通過 await 方法去阻塞去阻塞主流程

image

模擬高并發(fā)場(chǎng)景

static CountDownLatch countDownLatch = new CountDownLatch(1);

@Override
public void run() {
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("ThreadName:" + Thread.currentThread().getName());
}

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 1000; i++) {
        new Demo().start();
    }
    countDownLatch.countDown();
}

總的來說,凡事涉及到需要指定某個(gè)人物在執(zhí)行之前,要等到前置人物執(zhí)行完畢之后才執(zhí)行的場(chǎng)景,都可以使用 CountDownLatch

CountDownLatch 源碼分析

image

對(duì)于 CountDownLatch,我們僅僅需要關(guān)心兩個(gè)方法,一個(gè)是 countDown() 方法,另一個(gè)是 await() 方法.countDown() 方法每次調(diào)用都會(huì)將 state 減 1,直到 state 的值為 0;而 await 是一個(gè)阻塞方法,當(dāng) state 減 為 0 的時(shí)候,await 方法才會(huì)返回.await 可以被多個(gè)線程調(diào)用,大家在這個(gè)時(shí)候腦子里要有個(gè)圖:所有調(diào)用了 await 方法的線程阻塞在 AQS 的阻塞隊(duì)列中,等待條件滿足(state == 0),將線程從隊(duì)列中一個(gè)個(gè)喚醒過來

acquireSharedInterruptibly

countdownlatch 也用到了 AQS,在 CountDownLatch 內(nèi)部寫了一個(gè) Sync 并且繼承了 AQS 這個(gè)抽象類重寫了 AQS 中的共享鎖方法.首先看到下面這個(gè)代碼,這塊代碼主要是判斷是否到了共享鎖;( 在
CountDownLatch,使用的是共享鎖機(jī)制,因?yàn)?CountDownLatch 并不需要實(shí)現(xiàn)互斥的特性 )

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }

    if (tryAcquireShared(arg) < 0) {//state 如果不等于 0赔桌,說明當(dāng)前線程需要加入到共享鎖隊(duì)列中
        doAcquireSharedInterruptibly(arg);
    }
}

doAcquireSharedInterruptibly

  • addWaiter 設(shè)置為 shared 模式.
  • tryAcquire 和 tryAcquireShared 的返回值不同,因此會(huì)多出一個(gè)判斷過程
  • 在判斷前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)后,調(diào)用了 setHeadAndPropagate 方法,而不是簡(jiǎn)單的更新一下頭節(jié)點(diǎn)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);

    //創(chuàng)建一個(gè)共享模式的節(jié)點(diǎn)添加到隊(duì)列中
    boolean failed = true;
    try {
        for (; ; ) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);// 就判斷嘗試獲取鎖
                if (r >= 0) {//r>=0 表示獲取到了執(zhí)行權(quán)限供炎,這個(gè)時(shí)候因?yàn)?state!=0,所以不會(huì)執(zhí)行這段代碼
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            //阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            cancelAcquire(node);
        }
    }
}

圖解分析

加入這個(gè)時(shí)候有 3 個(gè)線程調(diào)用了 await 方法,由于這個(gè)時(shí)候 state 的值還不為 0,所以這三個(gè)線程都會(huì)加入到 AQS 隊(duì)列中.并且三個(gè)線程都處于阻塞狀態(tài)

image

CountDownLatch.countDown

由于線程被 await 方法阻塞了,所以只有等到 countdown 方法使得 state=0 的時(shí)候才會(huì)被喚醒,我們來看看 countdown 做了什么

  • 只有當(dāng) state 減為 0 的時(shí)候,tryReleaseShared 才返回 true, 否則只是簡(jiǎn)單的 state = state - 1
  • 如果 state=0, 則調(diào)用 doReleaseShared 喚醒處于 await 狀態(tài)下的線程
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// 用自旋的方法實(shí)現(xiàn) state 減 1
protected boolean
tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (; ; ) {
        int c = getState();
        if (c == 0) {
            return false;
        }

        int nextc = c - 1;
        if (compareAndSetState(c, nextc)) {
            return nextc == 0;
        }
    }
}

AQS. doReleaseShared

共享鎖的釋放和獨(dú)占鎖的釋放有一定的差別

前面喚醒鎖的邏輯和獨(dú)占鎖是一樣,先判斷頭結(jié)點(diǎn)是不是 SIGNAL 狀態(tài),如果是,則修改為 0,并且喚醒頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)

// PROPAGATE:標(biāo)識(shí)為 PROPAGATE 狀態(tài)的節(jié)點(diǎn)疾党,是共享鎖模式下的節(jié)點(diǎn)狀態(tài)音诫,處于這個(gè)狀態(tài)下的節(jié)點(diǎn),會(huì)對(duì)線程的喚醒進(jìn)行傳播
private void doReleaseShared() {
    for (; ; ) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    continue;
                }

                // loop to recheck cases
                unparkSuccessor(h);
            }
            // 這個(gè) CAS 失敗的場(chǎng)景是:執(zhí)行到這里的時(shí)候雪位,剛好有一個(gè)節(jié)點(diǎn)入隊(duì)竭钝,入隊(duì)會(huì)將這個(gè) ws 設(shè)置為 - 1
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                continue;
            }
            // loop on failed CAS
        }

        // 如果到這里的時(shí)候,前面喚醒的線程已經(jīng)占領(lǐng)了 head雹洗,那么再循環(huán)
        // 通過檢查頭節(jié)點(diǎn)是否改變了香罐,如果改變了就繼續(xù)循環(huán)
        if (h == head) {
            // loop if head changed
            break;
        }
    }
}
  • h == head:說明頭節(jié)點(diǎn)還沒有被剛剛用 unparkSuccessor 喚醒的線程(這里可以理解為 ThreadB) 占有,此時(shí) break 退出循環(huán).
  • h != head:頭節(jié)點(diǎn)被剛剛喚醒的線程(這里可以理解為 ThreadB) 占有,那么這里重新進(jìn)入下一輪循環(huán),喚醒下一個(gè)節(jié)點(diǎn)(這里是 ThreadB ).我們知道,等到 ThreadB 被喚醒后,其實(shí)是會(huì)主動(dòng)喚醒 ThreadC...

doAcquireSharedInterruptibly

一旦 ThreadA 被喚醒,代碼又會(huì)繼續(xù)回到 doAcquireSharedInterruptibly 中來執(zhí)行.如果當(dāng)前 state 滿足 =0 的條件,則會(huì)執(zhí)行 setHeadAndPropagate 方法

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;

    try {
        for (; ; ) {//被喚醒的線程進(jìn)入下一次循環(huán)繼續(xù)判斷
            final Node p = node.predecessor();

            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; //把當(dāng)前節(jié)點(diǎn)移除 aqs 隊(duì)列
                    failed = false;
                    return;
                }
            }

            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            cancelAcquire(node);
        }
    }
}

setHeadAndPropagate

這個(gè)方法的主要作用是把被喚醒的節(jié)點(diǎn),設(shè)置成 head 節(jié) 點(diǎn). 然后繼續(xù)喚醒隊(duì)列中的其他線程.由于現(xiàn)在隊(duì)列中有 3 個(gè)線程處于阻塞狀態(tài),一旦 ThreadA 被喚醒,并且設(shè)置為 head 之后,會(huì)繼續(xù)喚醒后續(xù)的 ThreadB

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) {
            doReleaseShared();
        }
    }
}
image

Semaphore

semaphore 也就是我們常說的信號(hào)燈,semaphore 可以控制同時(shí)訪問的線程個(gè)數(shù),通過 acquire 獲取一個(gè)許可,如果沒有就等待,通過 release 釋放一個(gè)許可.有點(diǎn)類似限流的作用.叫信號(hào)燈的原因也和他的用處有關(guān),比如某商場(chǎng)就 5 個(gè)停車位,每個(gè)停車位只能停一輛車,如果這個(gè)時(shí)候來了 10 輛車,必須要等前面有空的車位才能進(jìn)入.

使用案例

public class Test {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            new Car(i, semaphore).start();
        }
    }

    static class Car extends Thread {
        private int num;
        private Semaphore semaphore;

        public Car(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        public void run() {
            try {
                semaphore.acquire();//獲取一個(gè)許可
                System.out.println("第" + num + "占用一個(gè)停車位");

                TimeUnit.SECONDS.sleep(2);
                System.out.println("第" + num + "倆車走嘍");

                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

使用場(chǎng)景

Semaphore 比較常見的就是用來做限流操作了

Semaphore 源碼分析

從 Semaphore 的功能來看,我們基本能猜測(cè)到它的底層實(shí)現(xiàn)一定是基于 AQS 的共享所,因?yàn)樾枰獙?shí)現(xiàn)多個(gè)線程共享一個(gè)領(lǐng)排池

創(chuàng)建 Semaphore 實(shí)例的時(shí)候,需要一個(gè)參數(shù) permits,這個(gè)基本上可以確定是設(shè)置給 AQS 的 state 的,然后每個(gè)線程調(diào)用 acquire 的時(shí)候,執(zhí)行 state = state - 1,release 的時(shí)候執(zhí)行 state = state + 1,當(dāng)然,acquire 的時(shí)候,如果 state = 0,說明沒有資源了,需要等待其他線程 release

Semaphore 分公平策略和非公平策略

FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int
    tryAcquireShared(int acquires) {
        for (; ; ) {
            // 區(qū)別就在于是不是會(huì)先判斷是否有線程在排隊(duì),然后才進(jìn)行 CAS 減操作
            if (hasQueuedPredecessors()) {
                return -1;
            }

            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }
}

NofairSync

通過對(duì)比發(fā)現(xiàn)公平和非公平的區(qū)別就在于是否多了一個(gè) hasQueuedPredecessors 的判斷

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int
    tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

final int nonfairTryAcquireShared(int acquires) {
    for (; ; ) {
        int available = getState();
        int remaining = available -
                acquires;
        if (remaining < 0 || compareAndSetState(available, remaining)) {
            return remaining;
        }
    }
}

由于后面的代碼和 CountDownLatch 的是完全一樣,都是基于共享鎖的實(shí)現(xiàn),所以也就沒必要再花時(shí)間來分析了

CyclicBarrier

CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic) 的屏障(Barrier).它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn) ) 時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)工作.CyclicBarrier 默認(rèn)的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用 await 方法告訴 CyclicBarrier 當(dāng)前線程已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞

使用場(chǎng)景

當(dāng)存在需要所有的子任務(wù)都完成時(shí),才執(zhí)行主任務(wù),這個(gè)時(shí)候就可以選擇使用 CyclicBarrier

使用案例

DataImportThread

public class DataImportThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    private String path;

    public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
        this.cyclicBarrier = cyclicBarrier;
        this.path = path;
    }

    @Override
    public void run() {
        System.out.println("開始導(dǎo)入:" + path + " 位置的數(shù)據(jù)");
        try {
            cyclicBarrier.await();//阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

CycliBarrierDemo

public class CycliBarrierDemo extends Thread {
    @Override
    public void run() {
        System.out.println("開始進(jìn)行數(shù)據(jù)分析");
    }

    public static void main(String[] args) {
        CyclicBarrier cycliBarrier = new CyclicBarrier(3, new CycliBarrierDemo());

        new Thread(new DataImportThread(cycliBarrier, "file 1")).start();

        new Thread(new DataImportThread(cycliBarrier, "file 2")).start();

        new Thread(new DataImportThread(cycliBarrier, "file 3")).start();
    }
}

注意點(diǎn)

  • 對(duì)于指定計(jì)數(shù)值 parties,若由于某種原因,沒有足夠的線程調(diào)用 CyclicBarrier 的 await,則所有調(diào)用 await 的線程都會(huì)被阻塞
  • 同樣的 CyclicBarrier 也可以調(diào)用 await(timeout, unit),設(shè)置超時(shí)時(shí)間,在設(shè)定時(shí)間內(nèi),如果沒有足夠線程到達(dá),則解除阻塞狀態(tài),繼續(xù)工作
  • 通過 reset 重置計(jì)數(shù),會(huì)使得進(jìn)入 await 的線程出現(xiàn) BrokenBarrierException
  • 如果采用是 CyclicBarrier(int parties, Runnable barrierAction) 構(gòu)造方法,執(zhí)行 barrierAction 操作的是最后一個(gè)到達(dá)的線程

實(shí)現(xiàn)原理

CyclicBarrier 相比 CountDownLatch 來說,要簡(jiǎn)單很多,源碼實(shí)現(xiàn)是基于 ReentrantLock 和 Condition 的組合使用.看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個(gè)柵欄,因?yàn)樗臇艡?Barrier) 可以重復(fù)使用(Cyclic)

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末时肿,一起剝皮案震驚了整個(gè)濱河市庇茫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嗜侮,老刑警劉巖港令,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啥容,死亡現(xiàn)場(chǎng)離奇詭異锈颗,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)咪惠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門击吱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人遥昧,你說我怎么就攤上這事覆醇。” “怎么了炭臭?”我有些...
    開封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵永脓,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我鞋仍,道長(zhǎng)常摧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮落午,結(jié)果婚禮上谎懦,老公的妹妹穿的比我還像新娘。我一直安慰自己溃斋,他們只是感情好界拦,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著梗劫,像睡著了一般享甸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上在跳,一...
    開封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天枪萄,我揣著相機(jī)與錄音,去河邊找鬼猫妙。 笑死瓷翻,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的割坠。 我是一名探鬼主播齐帚,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼彼哼!你這毒婦竟也來了对妄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤敢朱,失蹤者是張志新(化名)和其女友劉穎剪菱,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拴签,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡孝常,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蚓哩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片构灸。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖岸梨,靈堂內(nèi)的尸體忽然破棺而出喜颁,到底是詐尸還是另有隱情,我是刑警寧澤曹阔,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布半开,位于F島的核電站,受9級(jí)特大地震影響赃份,放射性物質(zhì)發(fā)生泄漏寂拆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望漓库。 院中可真熱鬧恃慧,春花似錦、人聲如沸渺蒿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽茂装。三九已至怠蹂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間少态,已是汗流浹背城侧。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留彼妻,地道東北人嫌佑。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像侨歉,于是被迫代替她去往敵國(guó)和親屋摇。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容