徹底弄懂AQS

先來看看 AQS 有哪些屬性

// 頭結(jié)點(diǎn)吼和,你直接把它當(dāng)做 當(dāng)前持有鎖的線程 可能是最好理解的
private transient volatile Node head;

// 阻塞的尾節(jié)點(diǎn),每個(gè)新的節(jié)點(diǎn)進(jìn)來,都插入到最后,也就形成了一個(gè)鏈表
private transient volatile Node tail;

// 這個(gè)是最重要的,代表當(dāng)前鎖的狀態(tài)泽腮,0代表沒有被占用,大于 0 代表有線程持有當(dāng)前鎖
// 這個(gè)值可以大于 1,是因?yàn)殒i可以重入遵馆,每次重入都加上 1
private volatile int state;

// 代表當(dāng)前持有獨(dú)占鎖的線程,舉個(gè)最重要的使用例子戈二,因?yàn)殒i可以重入
// reentrantLock.lock()可以嵌套調(diào)用多次,所以每次用這個(gè)來判斷當(dāng)前線程是否已經(jīng)擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread;
image.png

等待隊(duì)列中每個(gè)線程被包裝成一個(gè) Node 實(shí)例绒北,數(shù)據(jù)結(jié)構(gòu)是鏈表峻汉,一起看看源碼:

static final class Node {
    // 標(biāo)識節(jié)點(diǎn)當(dāng)前在共享模式下
    static final Node SHARED = new Node();
    // 標(biāo)識節(jié)點(diǎn)當(dāng)前在獨(dú)占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個(gè)int常量是給waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代碼此線程取消了爭搶這個(gè)鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是瘤礁,其表示當(dāng)前node的后繼節(jié)點(diǎn)對應(yīng)的線程需要被喚醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    // =====================================================


    // 取值為上面的1、-1钝腺、-2拍屑、-3,或者0
    // 這么理解喷斋,暫時(shí)只需要知道如果這個(gè)值 大于0 代表此線程取消了等待蒜茴,
    //    ps: 半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的漓摩。腿椎。。
    volatile int waitStatus;
    // 前驅(qū)節(jié)點(diǎn)的引用
    volatile Node prev;
    // 后繼節(jié)點(diǎn)的引用
    volatile Node next;
    // 這個(gè)就是線程本尊
    volatile Thread thread;

}

Node 的數(shù)據(jù)結(jié)構(gòu)其實(shí)也挺簡單的夭咬,就是 thread + waitStatus + pre + next 四個(gè)屬性而已.

上面是一些基本的數(shù)據(jù)結(jié)構(gòu)啃炸,下面,我們開始說 ReentrantLock 的公平鎖卓舵。

ReentrantLock 在內(nèi)部用了內(nèi)部類 Sync 來管理鎖南用,所以真正的獲取鎖和釋放鎖是由 Sync 的實(shí)現(xiàn)類來控制的。

abstract static class Sync extends AbstractQueuedSynchronizer {
}

Sync 有兩個(gè)實(shí)現(xiàn)边器,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖)训枢,我們看 FairSync 部分。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
      // 爭鎖
    final void lock() {
        acquire(1);
    }
      // 來自父類AQS忘巧,
    // 我們看到恒界,這個(gè)方法,如果tryAcquire(arg) 返回true, 也就結(jié)束了砚嘴。
    // 否則十酣,acquireQueued方法會將線程壓到隊(duì)列中
    public final void acquire(int arg) { // 此時(shí) arg == 1
        // 首先調(diào)用tryAcquire(1)一下,名字上就知道际长,這個(gè)只是試一試
        // 因?yàn)橛锌赡苤苯泳统晒α四厮什桑簿筒恍枰M(jìn)隊(duì)列排隊(duì)了,
        if (!tryAcquire(arg) &&
            // tryAcquire(arg)沒有成功工育,這個(gè)時(shí)候需要把當(dāng)前線程掛起虾宇,放到阻塞隊(duì)列中。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
              selfInterrupt();
        }
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // 嘗試直接獲取鎖如绸,返回值是boolean嘱朽,代表是否獲取到鎖
    // 返回true:1.沒有線程在等待鎖;2.重入鎖怔接,線程本來就持有鎖搪泳,也就可以理所當(dāng)然可以直接獲取
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 此時(shí)此刻沒有線程持有鎖
        if (c == 0) {
            // 雖然此時(shí)此刻鎖是可以用的,但是這是公平鎖扼脐,既然是公平岸军,就得講究先來后到,
            // 看看有沒有別人在隊(duì)列中等了半天了
            if (!hasQueuedPredecessors() &&
                // 如果沒有線程在等待,那就用CAS嘗試一下艰赞,成功了就獲取到鎖了佣谐,
                // 不成功的話,只能說明一個(gè)問題方妖,就在剛剛幾乎同一時(shí)刻有個(gè)線程搶先了 =_=
                // 因?yàn)閯倓傔€沒人的台谍,我判斷過了
                compareAndSetState(0, acquires)) {

                // 到這里就是獲取到鎖了,標(biāo)記一下吁断,告訴大家,現(xiàn)在是我占用了鎖
                setExclusiveOwnerThread(current);
                return true;
            }
        }
          // 會進(jìn)入這個(gè)else if分支坞生,說明是重入了仔役,需要操作:state=state+1
        // 這里不存在并發(fā)問題
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 如果到這里,說明前面的if和else if都沒有返回true是己,說明沒有獲取到鎖
        // 回到上面一個(gè)外層調(diào)用方法繼續(xù)看:
        // if (!tryAcquire(arg) 
        //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        //     selfInterrupt();
        return false;
    }

    // 假設(shè)tryAcquire(arg) 返回false又兵,那么代碼將執(zhí)行:
      //        acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
    // 這個(gè)方法卒废,首先需要執(zhí)行:addWaiter(Node.EXCLUSIVE)

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    // 此方法的作用是把線程包裝成node沛厨,同時(shí)進(jìn)入到隊(duì)列中
    // 參數(shù)mode此時(shí)是Node.EXCLUSIVE,代表獨(dú)占模式
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 以下幾行代碼想把當(dāng)前node加到鏈表的最后面去摔认,也就是進(jìn)到阻塞隊(duì)列的最后
        Node pred = tail;

        // tail!=null => 隊(duì)列不為空(tail==head的時(shí)候逆皮,其實(shí)隊(duì)列是空的,不過不管這個(gè)吧)
        if (pred != null) { 
            // 將當(dāng)前的隊(duì)尾節(jié)點(diǎn)参袱,設(shè)置為自己的前驅(qū) 
            node.prev = pred; 
            // 用CAS把自己設(shè)置為隊(duì)尾, 如果成功后电谣,tail == node 了,這個(gè)節(jié)點(diǎn)成為阻塞隊(duì)列新的尾巴
            if (compareAndSetTail(pred, node)) { 
                // 進(jìn)到這里說明設(shè)置成功抹蚀,當(dāng)前node==tail, 將自己與之前的隊(duì)尾相連剿牺,
                // 上面已經(jīng)有 node.prev = pred,加上下面這句环壤,也就實(shí)現(xiàn)了和之前的尾節(jié)點(diǎn)雙向連接了
                pred.next = node;
                // 線程入隊(duì)了晒来,可以返回了
                return node;
            }
        }
        // 仔細(xì)看看上面的代碼,如果會到這里郑现,
        // 說明 pred==null(隊(duì)列是空的) 或者 CAS失敗(有線程在競爭入隊(duì))
        // 讀者一定要跟上思路湃崩,如果沒有跟上,建議先不要往下讀了懂酱,往回仔細(xì)看竹习,否則會浪費(fèi)時(shí)間的
        enq(node);
        return node;
    }

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    // 采用自旋的方式入隊(duì)
    // 之前說過,到這個(gè)方法只有兩種可能:等待隊(duì)列為空列牺,或者有線程競爭入隊(duì)整陌,
    // 自旋在這邊的語義是:CAS設(shè)置tail過程中,競爭一次競爭不到,我就多次競爭泌辫,總會排到的
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 之前說過随夸,隊(duì)列為空也會進(jìn)來這里
            if (t == null) { // Must initialize
                // 初始化head節(jié)點(diǎn)
                // 細(xì)心的讀者會知道原來 head 和 tail 初始化的時(shí)候都是 null 的
                // 還是一步CAS,你懂的震放,現(xiàn)在可能是很多線程同時(shí)進(jìn)來呢
                if (compareAndSetHead(new Node()))
                    // 給后面用:這個(gè)時(shí)候head節(jié)點(diǎn)的waitStatus==0, 看new Node()構(gòu)造方法就知道了

                    // 這個(gè)時(shí)候有了head宾毒,但是tail還是null,設(shè)置一下殿遂,
                    // 把tail指向head诈铛,放心,馬上就有線程要來了墨礁,到時(shí)候tail就要被搶了
                    // 注意:這里只是設(shè)置了tail=head幢竹,這里可沒return哦,沒有return恩静,沒有return
                    // 所以焕毫,設(shè)置完了以后,繼續(xù)for循環(huán)驶乾,下次就到下面的else分支了
                    tail = head;
            } else {
                // 下面幾行邑飒,和上一個(gè)方法 addWaiter 是一樣的,
                // 只是這個(gè)套在無限循環(huán)里级乐,反正就是將當(dāng)前線程排到隊(duì)尾疙咸,有線程競爭的話排不上重復(fù)排
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    // 現(xiàn)在,又回到這段代碼了
    // if (!tryAcquire(arg) 
    //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
    //     selfInterrupt();

    // 下面這個(gè)方法风科,參數(shù)node罕扎,經(jīng)過addWaiter(Node.EXCLUSIVE),此時(shí)已經(jīng)進(jìn)入阻塞隊(duì)列
    // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話丐重,
    // 意味著上面這段代碼將進(jìn)入selfInterrupt()腔召,所以正常情況下,下面應(yīng)該返回false
    // 這個(gè)方法非常重要扮惦,應(yīng)該說真正的線程掛起臀蛛,然后被喚醒后去獲取鎖,都在這個(gè)方法里了
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // p == head 說明當(dāng)前節(jié)點(diǎn)雖然進(jìn)到了阻塞隊(duì)列崖蜜,但是是阻塞隊(duì)列的第一個(gè)浊仆,因?yàn)樗那膀?qū)是head
                // 注意,阻塞隊(duì)列不包含head節(jié)點(diǎn)豫领,head一般指的是占有鎖的線程抡柿,head后面的才稱為阻塞隊(duì)列
                // 所以當(dāng)前節(jié)點(diǎn)可以去試搶一下鎖
                // 這里我們說一下,為什么可以去試試:
                // 首先等恐,它是隊(duì)頭洲劣,這個(gè)是第一個(gè)條件备蚓,其次,當(dāng)前的head有可能是剛剛初始化的node囱稽,
                // enq(node) 方法里面有提到郊尝,head是延時(shí)初始化的,而且new Node()的時(shí)候沒有設(shè)置任何線程
                // 也就是說战惊,當(dāng)前的head不屬于任何一個(gè)線程流昏,所以作為隊(duì)頭,可以去試一試吞获,
                // tryAcquire已經(jīng)分析過了, 忘記了請往前看一下况凉,就是簡單用CAS試操作一下state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 到這里,說明上面的if分支沒有成功各拷,要么當(dāng)前node本來就不是隊(duì)頭茎刚,
                // 要么就是tryAcquire(arg)沒有搶贏別人,繼續(xù)往下看
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 什么時(shí)候 failed 會為 true???
            // tryAcquire() 方法拋異常的情況
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    // 剛剛說過撤逢,會到這里就是沒有搶到鎖唄,這個(gè)方法說的是:"當(dāng)前線程沒有搶到鎖粮坞,是否需要掛起當(dāng)前線程蚊荣?"
    // 第一個(gè)參數(shù)是前驅(qū)節(jié)點(diǎn),第二個(gè)參數(shù)才是代表當(dāng)前線程的節(jié)點(diǎn)
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驅(qū)節(jié)點(diǎn)的 waitStatus == -1 莫杈,說明前驅(qū)節(jié)點(diǎn)狀態(tài)正常互例,當(dāng)前線程需要掛起,直接可以返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;

        // 前驅(qū)節(jié)點(diǎn) waitStatus大于0 筝闹,之前說過媳叨,大于0 說明前驅(qū)節(jié)點(diǎn)取消了排隊(duì)。
        // 這里需要知道這點(diǎn):進(jìn)入阻塞隊(duì)列排隊(duì)的線程會被掛起糊秆,而喚醒的操作是由前驅(qū)節(jié)點(diǎn)完成的平痰。
        // 所以下面這塊代碼說的是將當(dāng)前節(jié)點(diǎn)的prev指向waitStatus<=0的節(jié)點(diǎn)赔蒲,
        // 簡單說债热,就是為了找個(gè)好爹舶沿,因?yàn)槟氵€得依賴它來喚醒呢高镐,如果前驅(qū)節(jié)點(diǎn)取消了排隊(duì)刨仑,
        // 找前驅(qū)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)做爹量淌,往前遍歷總能找到一個(gè)好爹的
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 仔細(xì)想想硫狞,如果進(jìn)入到這個(gè)分支意味著什么
            // 前驅(qū)節(jié)點(diǎn)的waitStatus不等于-1和1残吩,那也就是只可能是0泣侮,-2紧唱,-3
            // 在我們前面的源碼中活尊,都沒有看到有設(shè)置waitStatus的深胳,所以每個(gè)新的node入隊(duì)時(shí)癣猾,waitStatu都是0
            // 正常情況下夸盟,前驅(qū)節(jié)點(diǎn)是之前的 tail,那么它的 waitStatus 應(yīng)該是 0
            // 用CAS將前驅(qū)節(jié)點(diǎn)的waitStatus設(shè)置為Node.SIGNAL(也就是-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 這個(gè)方法返回 false像捶,那么會再走一次 for 循序上陕,
        //     然后再次進(jìn)來此方法,此時(shí)會從第一個(gè)分支返回 true
        return false;
    }

    // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
    // 這個(gè)方法結(jié)束根據(jù)返回值我們簡單分析下:
    // 如果返回true, 說明前驅(qū)節(jié)點(diǎn)的waitStatus==-1拓春,是正常情況释簿,那么當(dāng)前線程需要被掛起,等待以后被喚醒
    //        我們也說過痘儡,以后是被前驅(qū)節(jié)點(diǎn)喚醒,就等著前驅(qū)節(jié)點(diǎn)拿到鎖枢步,然后釋放鎖的時(shí)候叫你好了
    // 如果返回false, 說明當(dāng)前不需要被掛起沉删,為什么呢?往后看

    // 跳回到前面是這個(gè)方法
    // if (shouldParkAfterFailedAcquire(p, node) &&
    //                parkAndCheckInterrupt())
    //                interrupted = true;

    // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true醉途,
    // 那么需要執(zhí)行parkAndCheckInterrupt():

    // 這個(gè)方法很簡單矾瑰,因?yàn)榍懊娣祷豻rue,所以需要掛起線程隘擎,這個(gè)方法就是負(fù)責(zé)掛起線程的
    // 這里用了LockSupport.park(this)來掛起線程殴穴,然后就停在這里了,等待被喚醒=======
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    // 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況

   // 仔細(xì)看shouldParkAfterFailedAcquire(p, node)货葬,我們可以發(fā)現(xiàn)采幌,其實(shí)第一次進(jìn)來的時(shí)候,一般都不會返回true的震桶,原因很簡單休傍,前驅(qū)節(jié)點(diǎn)的waitStatus=-1是依賴于后繼節(jié)點(diǎn)設(shè)置的。也就是說蹲姐,我都還沒給前驅(qū)設(shè)置-1呢磨取,怎么可能是true呢人柿,但是要看到,這個(gè)方法是套在循環(huán)里的忙厌,所以第二次進(jìn)來的時(shí)候狀態(tài)就是-1了凫岖。

    // 解釋下為什么shouldParkAfterFailedAcquire(p, node)返回false的時(shí)候不直接掛起線程:
    // => 是為了應(yīng)對在經(jīng)過這個(gè)方法后,node已經(jīng)是head的直接后繼節(jié)點(diǎn)了逢净。 為了減少線程陷入park哥放,線程阻塞的開銷切換比較大
}

解鎖操作
如果線程沒獲取到鎖,線程會被 LockSupport.park(this); 掛起停止汹胃,等待被喚醒婶芭。

// 喚醒的代碼還是比較簡單的,你如果上面加鎖的都看懂了着饥,下面都不需要看就知道怎么回事了
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // 往后看吧
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全釋放鎖
    boolean free = false;
    // 其實(shí)就是重入的問題犀农,如果c==0,也就是說沒有嵌套鎖了宰掉,可以釋放了呵哨,否則還不能釋放掉
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// 喚醒后繼節(jié)點(diǎn)
// 從上面調(diào)用處知道,參數(shù)node是head頭結(jié)點(diǎn)
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // 如果head節(jié)點(diǎn)當(dāng)前waitStatus<0, 將其修改為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 下面的代碼就是喚醒后繼節(jié)點(diǎn)轨奄,但是有可能后繼節(jié)點(diǎn)取消了等待(waitStatus==1)
    // 從隊(duì)尾往前找孟害,找到waitStatus<=0的所有節(jié)點(diǎn)中排在最前面的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從后往前找,仔細(xì)看代碼挪拟,不必?fù)?dān)心中間有節(jié)點(diǎn)取消(waitStatus==1)的情況
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒線程
        LockSupport.unpark(s.thread);
}
喚醒線程以后挨务,被喚醒的線程將從以下代碼中繼續(xù)往前走:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 剛剛線程被掛起在這里了
    return Thread.interrupted();
}
// 又回到這個(gè)方法了:acquireQueued(final Node node, int arg),這個(gè)時(shí)候玉组,node的前驅(qū)是head了

unparkSuccessor 方法中for循環(huán)從tail開始而不是head

上面AQS的代碼谎柄,我不明白的是這個(gè)for循環(huán),思路是找到后繼能夠有效的節(jié)點(diǎn)惯雳,為什么不從head開始朝巫,然后找到就break而非要從尾巴開始呢?石景?劈猿??

for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;

看插入的地方就會明白了潮孽。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //看這里
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

新節(jié)點(diǎn)pre指向tail揪荣,tail指向新節(jié)點(diǎn),這里后繼指向前驅(qū)的指針是由CAS操作保證線程安全的往史。而cas操作之后t.next=node之前变逃,可能會有其他線程進(jìn)來。所以出現(xiàn)了問題怠堪,從尾部向前遍歷是一定能遍歷到所有的節(jié)點(diǎn)揽乱。(cas和t.next=node不是原子性的名眉,導(dǎo)致在利用next指針遍歷節(jié)點(diǎn)時(shí),可能會出現(xiàn)凰棉,節(jié)點(diǎn)已經(jīng)插入即tail已經(jīng)更新损拢,而pre的next指針依然為null的情況,這將無法完整遍歷所有節(jié)點(diǎn))撒犀。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;                     // ①
                if (compareAndSetTail(t, node)) {  // ②
                    t.next = node;                 // ③            
                    return t;
                }
            }
        }
    }

① 處將新結(jié)點(diǎn) node 的 prev 引用指向當(dāng)前的 t, 即 tail 結(jié)點(diǎn). 然而, 由于 ①, ② 這兩行代碼的合在一起并非原子性的, 所以很有可能在設(shè)置 tail 時(shí)存在著競爭, 也即 tail 被其它線程更新過了. 所以要自旋操作, 即在死循環(huán)中操作, 直到成功為止. 自旋地 CAS volatile 變量是很經(jīng)典的用法. 如果設(shè)置成功了, 那么 從 node.prev 執(zhí)行完畢到正在用 CAS 設(shè)置 tail 時(shí), tail 變量是沒有被修改的, 所以如果 CAS成功, 那么 node.prev = t 一定是指向上一個(gè) tail 的. 同樣的, ②, ③ 合在一起也并非原子操作, 更重要的是, next field 的設(shè)置發(fā)生在 CAS 操作之后, 所以可能會存在 tail 已經(jīng)更新, 但是 last tail 的 next field 還未設(shè)置完畢, 即它的 lastTail.next 為 null 這種情況. 因此如果此時(shí)訪問該結(jié)點(diǎn)的 next 引用可能就會得到它在隊(duì)尾, 不存在后繼結(jié)點(diǎn)的"錯(cuò)覺". 而我們總是能夠通過從 tail 開始反向查找, 借助可靠的 prev 引用來定位到指定的結(jié)點(diǎn). 簡單總結(jié)一下, prev 引用的設(shè)置發(fā)生在 CAS之前, 因此如果 CAS 設(shè)置 tail 成功, 那么 prev 一定是正確地指向 last tail, 而 next 引用的設(shè)置發(fā)生在其后, 因而會存在一個(gè) tail 更新成功, 但是 last tail 的 next 引用還未設(shè)置的尷尬時(shí)期. 所以我們說 prev 是可靠的, 而 next 有時(shí)會為 null, 但并不一定真的就沒有后繼結(jié)點(diǎn).

在有文檔的情況下福压,不太建議生看代碼反推實(shí)現(xiàn)原理,效率很低也容易出現(xiàn)理解偏差或舞。

在并發(fā)環(huán)境下荆姆,加鎖和解鎖需要以下三個(gè)部件的協(xié)調(diào):

  1. 鎖狀態(tài)。我們要知道鎖是不是被別的線程占有了映凳,這個(gè)就是 state 的作用胆筒,它為 0 的時(shí)候代表沒有線程占有鎖,可以去爭搶這個(gè)鎖诈豌,用 CAS 將 state 設(shè)為 1仆救,如果 CAS 成功,說明搶到了鎖矫渔,這樣其他線程就搶不到了彤蔽,如果鎖重入的話,state進(jìn)行 +1 就可以庙洼,解鎖就是減 1顿痪,直到 state 又變?yōu)?0,代表釋放鎖油够,所以 lock() 和 unlock() 必須要配對啊蚁袭。然后喚醒等待隊(duì)列中的第一個(gè)線程,讓其來占有鎖叠聋。

  2. 線程的阻塞和解除阻塞撕阎。AQS 中采用了 LockSupport.park(thread) 來掛起線程受裹,用 unpark 來喚醒線程

  3. 阻塞隊(duì)列碌补。因?yàn)闋帗屾i的線程可能很多,但是只能有一個(gè)線程拿到鎖棉饶,其他的線程都必須等待厦章,這個(gè)時(shí)候就需要一個(gè) queue 來管理這些線程,AQS 用的是一個(gè) FIFO 的隊(duì)列照藻,就是一個(gè)鏈表袜啃,每個(gè) node 都持有后繼節(jié)點(diǎn)的引用

注意細(xì)節(jié)

現(xiàn)假設(shè)有線程1和線程2,線程1獲取了鎖幸缕,線程 1 沒有調(diào)用 unlock() 之前群发,線程 2 調(diào)用了 lock()晰韵,會發(fā)生什么?

線程 2 會初始化 head【new Node()】熟妓,同時(shí)線程 2 也會插入到阻塞隊(duì)列并掛起 (注意看這里是一個(gè) for 循環(huán)雪猪,而且設(shè)置 head 和 tail 的部分是不 return 的,只有入隊(duì)成功才會跳出循環(huán))

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

首先起愈,是線程 2 初始化 head 節(jié)點(diǎn)只恨,此時(shí) head==tail, waitStatus==0

image.png

然后線程 2 入隊(duì):

image.png

同時(shí)我們也要看此時(shí)節(jié)點(diǎn)的 waitStatus,我們知道 head 節(jié)點(diǎn)是線程 2 初始化的抬虽,此時(shí)的 waitStatus 沒有設(shè)置官觅, java 默認(rèn)會設(shè)置為 0,但是到 shouldParkAfterFailedAcquire 這個(gè)方法的時(shí)候阐污,線程 2 會把前驅(qū)節(jié)點(diǎn)休涤,也就是 head 的waitStatus設(shè)置為 -1。

那線程 2 節(jié)點(diǎn)此時(shí)的 waitStatus 是多少呢疤剑,由于沒有設(shè)置滑绒,所以是 0;

如果線程 3 此時(shí)再進(jìn)來隘膘,直接插到線程 2 的后面就可以了疑故,此時(shí)線程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的時(shí)候把前驅(qū)節(jié)點(diǎn)線程 2 的 waitStatus 設(shè)置為 -1弯菊。

image.png

這里可以簡單說下 waitStatus 中 SIGNAL(-1) 狀態(tài)的意思纵势,Doug Lea 注釋的是:代表后繼節(jié)點(diǎn)需要被喚醒。也就是說這個(gè) waitStatus 其實(shí)代表的不是自己的狀態(tài)管钳,而是后繼節(jié)點(diǎn)的狀態(tài)钦铁,我們知道,每個(gè) node 在入隊(duì)的時(shí)候才漆,都會把前驅(qū)節(jié)點(diǎn)的狀態(tài)改為 SIGNAL牛曹,然后阻塞,等待被前驅(qū)喚醒醇滥。

在acquireQueued方法里面黎比,第一次調(diào)用shouldParkAfterFailedAcquire(p, node)的時(shí)候,把前驅(qū)節(jié)點(diǎn)waitStatus從0改為-1鸳玩,然后返回false阅虫,回到acquireQueued方法,再嘗試拿一次鎖不跟,然后第二次調(diào)用shouldParkAfterFailedAcquire返回true颓帝,調(diào)用parkAndCheckInterrupt()掛起線程。
那么,如果在某線程B還沒有掛起之前购城,前驅(qū)節(jié)點(diǎn)的線程A發(fā)現(xiàn)自己waitStatus為-1直接unpark吕座,然后剛剛的線程B才掛起。那不就沒人能喚醒它了嗎瘪板?它是怎么保證被喚醒的米诉?

1、如果一個(gè)線程 park 了篷帅,那么調(diào)用 unpark(thread) 這個(gè)線程會被喚醒史侣;

2、如果一個(gè)線程先被調(diào)用了 unpark魏身,那么下一個(gè) park(thread) 操作不會掛起線程惊橱。

unparkSuccessor(Node node)

 Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);

首先,第一行代碼先檢測 head 的后繼節(jié)點(diǎn)箭昵,只有當(dāng)此時(shí)的后繼節(jié)點(diǎn)不存在或者這個(gè)后繼節(jié)點(diǎn)取消了才開始從后往前找税朴,所以大部分情況下,其實(shí)不會發(fā)生從后往前遍歷整個(gè)隊(duì)列的情況家制。(后繼節(jié)點(diǎn)取消很正常正林,但是某節(jié)點(diǎn)在入隊(duì)的時(shí)候,如果發(fā)現(xiàn)前驅(qū)是取消狀態(tài)颤殴,前驅(qū)節(jié)點(diǎn)是會被請出隊(duì)列的)

這里為啥倒序遍歷觅廓?
之所以要倒序是因?yàn)閏ancelAcquire方法的最后一行node.next = node; 假如按照正序遍歷,剛好遍歷到node, 由于node線程異常(acquireQueued方法的for循環(huán))進(jìn)入cancelAcquire方法,執(zhí)行了最后一行后, 那么正序遍歷就會陷入死循環(huán)!(貌似不會進(jìn)入cancelAcquire這個(gè)方法?涵但?)

公平鎖和非公平鎖

ReentrantLock 默認(rèn)采用非公平鎖杈绸,除非你在構(gòu)造方法中傳入?yún)?shù) true 。

public ReentrantLock() {
    // 默認(rèn)非公平鎖
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

公平鎖的 lock 方法:

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 1. 和非公平鎖相比矮瘟,這里多了一個(gè)判斷:是否有線程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

非公平鎖的 lock 方法:

static final class NonfairSync extends Sync {
    final void lock() {
        // 2. 和公平鎖相比瞳脓,這里會直接先進(jìn)行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 這里沒有對阻塞隊(duì)列進(jìn)行判斷
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平鎖和非公平鎖只有兩處不同:

  1. 非公平鎖在調(diào)用 lock 后澈侠,首先就會調(diào)用 CAS 進(jìn)行一次搶鎖劫侧,如果這個(gè)時(shí)候恰巧鎖沒有被占用,那么直接就獲取到鎖返回了哨啃。

  2. 非公平鎖在 CAS 失敗后烧栋,和公平鎖一樣都會進(jìn)入到 tryAcquire 方法,在 tryAcquire 方法中棘催,如果發(fā)現(xiàn)鎖這個(gè)時(shí)候被釋放了(state == 0)劲弦,非公平鎖會直接 CAS 搶鎖耳标,但是公平鎖會判斷等待隊(duì)列是否有線程處于等待狀態(tài)醇坝,如果有則不去搶鎖,乖乖排到后面.

公平鎖和非公平鎖就這兩點(diǎn)區(qū)別,如果這兩次 CAS 都不成功呼猪,那么后面非公平鎖和公平鎖是一樣的画畅,都要進(jìn)入到阻塞隊(duì)列等待喚醒。相對來說宋距,非公平鎖會有更好的性能轴踱,因?yàn)樗耐掏铝勘容^大。當(dāng)然谚赎,非公平鎖讓獲取鎖的時(shí)間變得更加不確定淫僻,可能會導(dǎo)致在阻塞隊(duì)列中的線程長期處于饑餓狀態(tài)。

Condition

我們先來看看 Condition 的使用場景壶唤,Condition 經(jīng)出椋可以用在生產(chǎn)者-消費(fèi)者的場景中,請看 Doug Lea 給出的這個(gè)例子:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // condition 依賴于 lock 來產(chǎn)生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產(chǎn)
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 隊(duì)列已滿闸盔,等待悯辙,直到 not full 才能繼續(xù)生產(chǎn)
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生產(chǎn)成功,隊(duì)列已經(jīng) not empty 了迎吵,發(fā)個(gè)通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消費(fèi)
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 隊(duì)列為空躲撰,等待,直到隊(duì)列 not empty击费,才能繼續(xù)消費(fèi)
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消費(fèi)掉一個(gè)拢蛋,隊(duì)列 not full 了,發(fā)個(gè)通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

我們可以看到蔫巩,在使用 condition 時(shí)瓤狐,必須先持有相應(yīng)的鎖。這個(gè)和 Object 類中的方法有相似的語義批幌,需要先持有某個(gè)對象的監(jiān)視器鎖才可以執(zhí)行 wait(), notify() 或 notifyAll() 方法础锐。

ArrayBlockingQueue 采用這種方式實(shí)現(xiàn)了生產(chǎn)者-消費(fèi)者,所以請只把這個(gè)例子當(dāng)做學(xué)習(xí)例子荧缘,實(shí)際生產(chǎn)中可以直接使用 ArrayBlockingQueue皆警。

我們常用 obj.wait(),obj.notify() 或 obj.notifyAll() 來實(shí)現(xiàn)相似的功能截粗,但是信姓,它們是基于對象的監(jiān)視器鎖的. 這里說的 Condition 是基于 ReentrantLock 實(shí)現(xiàn)的,而 ReentrantLock 是依賴于 AbstractQueuedSynchronizer 實(shí)現(xiàn)的绸罗。

每個(gè) ReentrantLock 實(shí)例可以通過調(diào)用多次 newCondition 產(chǎn)生多個(gè) ConditionObject 的實(shí)例:

final ConditionObject newCondition() {
    // 實(shí)例化一個(gè) ConditionObject
    return new ConditionObject();
}

我們首先來看下我們關(guān)注的 Condition 的實(shí)現(xiàn)類 AbstractQueuedSynchronizer 類中的 ConditionObject意推。

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // 條件隊(duì)列的第一個(gè)節(jié)點(diǎn)
          // 不要管這里的關(guān)鍵字 transient,是不參與序列化的意思
        private transient Node firstWaiter;
        // 條件隊(duì)列的最后一個(gè)節(jié)點(diǎn)
        private transient Node lastWaiter;
        ......

在介紹 AQS 的時(shí)候珊蟀,我們有一個(gè)阻塞隊(duì)列(或者叫同步隊(duì)列sync queue)菊值,用于保存等待獲取鎖的線程的隊(duì)列,這里我們引入另一個(gè)概念,叫條件隊(duì)列(condition queue)腻窒,我畫了一張簡單的圖用來說明這個(gè)昵宇。

image.png

這里,我們簡單回顧下 Node 的屬性:

volatile int waitStatus; // 可取值 0儿子、CANCELLED(1)瓦哎、SIGNAL(-1)、CONDITION(-2)柔逼、PROPAGATE(-3)
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

prev 和 next 用于實(shí)現(xiàn)阻塞隊(duì)列的雙向鏈表蒋譬,這里的 nextWaiter 用于實(shí)現(xiàn)條件隊(duì)列的單向鏈表。

  1. 條件隊(duì)列和阻塞隊(duì)列的節(jié)點(diǎn)愉适,都是 Node 的實(shí)例羡铲,因?yàn)闂l件隊(duì)列的節(jié)點(diǎn)是需要轉(zhuǎn)移到阻塞隊(duì)列中去的;

  2. 我們知道一個(gè) ReentrantLock 實(shí)例可以通過多次調(diào)用 newCondition() 來產(chǎn)生多個(gè) Condition 實(shí)例儡毕,這里對應(yīng) condition1 和 condition2也切。注意,ConditionObject 只有兩個(gè)屬性 firstWaiter 和 lastWaiter

  3. 每個(gè) condition 有一個(gè)關(guān)聯(lián)的條件隊(duì)列腰湾,如線程 1 調(diào)用 condition1.await() 方法即可將當(dāng)前線程 1 包裝成 Node 后加入到條件隊(duì)列中雷恃,然后阻塞在這里,不繼續(xù)往下執(zhí)行费坊,條件隊(duì)列是一個(gè)單向鏈表倒槐;

  4. 調(diào)用condition1.signal() 觸發(fā)一次喚醒,此時(shí)喚醒的是隊(duì)頭附井,會將condition1 對應(yīng)的條件隊(duì)列的 firstWaiter(隊(duì)頭) 移到阻塞隊(duì)列的隊(duì)尾讨越,等待獲取鎖,獲取鎖后 await 方法才能返回永毅,繼續(xù)往下執(zhí)行把跨。

我們先來看看 wait 方法:

// 首先,這個(gè)方法是可被中斷的沼死,不可被中斷的是另一個(gè)方法 awaitUninterruptibly()
// 這個(gè)方法會阻塞着逐,直到調(diào)用 signal 方法(指 signal() 和 signalAll(),下同)意蛀,或被中斷
public final void await() throws InterruptedException {
    // 老規(guī)矩耸别,既然該方法要響應(yīng)中斷,那么在最開始就判斷中斷狀態(tài)
    if (Thread.interrupted())
        throw new InterruptedException();

    // 添加到 condition 的條件隊(duì)列中
    Node node = addConditionWaiter();

    // 釋放鎖县钥,返回值是釋放鎖之前的 state 值
    // await() 之前秀姐,當(dāng)前線程是必須持有鎖的,這里肯定要釋放掉
    int savedState = fullyRelease(node);

    int interruptMode = 0;
    // 這里退出循環(huán)有兩種情況若贮,之后再仔細(xì)分析
    // 1. isOnSyncQueue(node) 返回 true省有,即當(dāng)前 node 已經(jīng)轉(zhuǎn)移到阻塞隊(duì)列了
    // 2. checkInterruptWhileWaiting(node) != 0 會到 break痒留,然后退出循環(huán),代表的是線程中斷
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒后锥咸,將進(jìn)入阻塞隊(duì)列,等待獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

具體細(xì)節(jié):

  1. 將節(jié)點(diǎn)加入到條件隊(duì)列
    addConditionWaiter() 是將當(dāng)前節(jié)點(diǎn)加入到條件隊(duì)列
// 將當(dāng)前線程對應(yīng)的節(jié)點(diǎn)入隊(duì)细移,插入隊(duì)尾
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果條件隊(duì)列的最后一個(gè)節(jié)點(diǎn)取消了搏予,將其清除出去
    // 為什么這里把 waitStatus 不等于 Node.CONDITION,就判定為該節(jié)點(diǎn)發(fā)生了取消排隊(duì)弧轧?
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 這個(gè)方法會遍歷整個(gè)條件隊(duì)列雪侥,然后會將已取消的所有節(jié)點(diǎn)清除出隊(duì)列
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // node 在初始化的時(shí)候,指定 waitStatus 為 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // t 此時(shí)是 lastWaiter精绎,隊(duì)尾
    // 如果隊(duì)列為空
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

上面的這塊代碼很簡單速缨,就是將當(dāng)前線程進(jìn)入到條件隊(duì)列的隊(duì)尾。

在addWaiter 方法中代乃,有一個(gè) unlinkCancelledWaiters() 方法旬牲,該方法用于清除隊(duì)列中已經(jīng)取消等待的節(jié)點(diǎn)。

/ 等待隊(duì)列是一個(gè)單向鏈表搁吓,遍歷鏈表將已經(jīng)取消等待的節(jié)點(diǎn)清除出去
// 純屬鏈表操作原茅,很好理解,看不懂多看幾遍就可以了
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 如果節(jié)點(diǎn)的狀態(tài)不是 Node.CONDITION 的話堕仔,這個(gè)節(jié)點(diǎn)就是被取消的
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

完全釋放獨(dú)占鎖

回到 wait 方法擂橘,節(jié)點(diǎn)入隊(duì)了以后,會調(diào)用 int savedState = fullyRelease(node); 方法釋放鎖摩骨,注意通贞,這里是完全釋放獨(dú)占鎖(fully release),因?yàn)?ReentrantLock 是可以重入的恼五。

考慮一下這里的 savedState昌罩。如果在 condition1.await() 之前,假設(shè)線程先執(zhí)行了 2 次 lock() 操作灾馒,那么 state 為 2峡迷,我們理解為該線程持有 2 把鎖,這里 await() 方法必須將 state 設(shè)置為 0你虹,然后再進(jìn)入掛起狀態(tài)绘搞,這樣其他線程才能持有鎖。當(dāng)它被喚醒的時(shí)候傅物,它需要重新持有 2 把鎖夯辖,才能繼續(xù)下去。

// 首先董饰,我們要先觀察到返回值 savedState 代表 release 之前的 state 值
// 對于最簡單的操作:先 lock.lock()蒿褂,然后 condition1.await()圆米。
//         那么 state 經(jīng)過這個(gè)方法由 1 變?yōu)?0,鎖釋放啄栓,此方法返回 1
//         相應(yīng)的娄帖,如果 lock 重入了 n 次,savedState == n
// 如果這個(gè)方法失敗昙楚,會將節(jié)點(diǎn)設(shè)置為"取消"狀態(tài)近速,并拋出異常 IllegalMonitorStateException
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 這里使用了當(dāng)前的 state 作為 release 的參數(shù),也就是完全釋放掉鎖堪旧,將 state 置為 0
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

如果一個(gè)線程在不持有 lock 的基礎(chǔ)上削葱,就去調(diào)用 condition1.await() 方法,它能進(jìn)入條件隊(duì)列淳梦,但是在上面的這個(gè)方法中析砸,由于它不持有鎖,release(savedState) 這個(gè)方法肯定要返回 false爆袍,進(jìn)入到異常分支首繁,然后進(jìn)入 finally 塊設(shè)置 node.waitStatus = Node.CANCELLED,這個(gè)已經(jīng)入隊(duì)的節(jié)點(diǎn)之后會被后繼的節(jié)點(diǎn)”請出去“陨囊。

等待進(jìn)入阻塞隊(duì)列
釋放掉鎖以后蛮瞄,接下來是這段,這邊會自旋谆扎,如果發(fā)現(xiàn)自己還沒到阻塞隊(duì)列挂捅,那么掛起,等待被轉(zhuǎn)移到阻塞隊(duì)列堂湖。

int interruptMode = 0;
// 如果不在阻塞隊(duì)列中闲先,注意了,是阻塞隊(duì)列
while (!isOnSyncQueue(node)) {
    // 線程掛起
    LockSupport.park(this);

    // 這里可以先不用看了无蜂,等看到它什么時(shí)候被 unpark 再說
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

isOnSyncQueue(Node node) 用于判斷節(jié)點(diǎn)是否已經(jīng)轉(zhuǎn)移到阻塞隊(duì)列了:

//  在節(jié)點(diǎn)入條件隊(duì)列的時(shí)候伺糠,初始化時(shí)設(shè)置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的時(shí)候需要將節(jié)點(diǎn)從條件隊(duì)列移到阻塞隊(duì)列斥季,
// 這個(gè)方法就是判斷 node 是否已經(jīng)移動到阻塞隊(duì)列了
final boolean isOnSyncQueue(Node node) {

    // 移動過去的時(shí)候训桶,node 的 waitStatus 會置為 0,這個(gè)之后在說 signal 方法的時(shí)候會說到
    // 如果 waitStatus 還是 Node.CONDITION酣倾,也就是 -2舵揭,那肯定就是還在條件隊(duì)列中
    // 如果 node 的前驅(qū) prev 指向還是 null,說明肯定沒有在 阻塞隊(duì)列(prev是阻塞隊(duì)列鏈表中使用的)
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果 node 已經(jīng)有后繼節(jié)點(diǎn) next 的時(shí)候躁锡,那肯定是在阻塞隊(duì)列了
    if (node.next != null) 
        return true;

    // 下面這個(gè)方法從阻塞隊(duì)列的隊(duì)尾開始從后往前遍歷找午绳,如果找到相等的,說明在阻塞隊(duì)列映之,否則就是不在阻塞隊(duì)列

    // 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞隊(duì)列嗎拦焚?答案是:不能蜡坊。
    // AQS 的入隊(duì)方法,首先設(shè)置的是 node.prev 指向 tail赎败,
    // 然后是 CAS 操作將自己設(shè)置為新的 tail秕衙,可是這次的 CAS 是可能失敗的。

    return findNodeFromTail(node);
}

// 從阻塞隊(duì)列的隊(duì)尾往前遍歷僵刮,如果找到据忘,返回 true
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

回到前面的循環(huán),isOnSyncQueue(node) 返回 false 的話妓笙,那么進(jìn)到 LockSupport.park(this); 這里線程掛起若河。

signal 喚醒線程能岩,轉(zhuǎn)移到阻塞隊(duì)列
喚醒操作通常由另一個(gè)線程來操作寞宫,就像生產(chǎn)者-消費(fèi)者模式中,如果線程因?yàn)榈却M(fèi)而掛起拉鹃,那么當(dāng)生產(chǎn)者生產(chǎn)了一個(gè)東西后辈赋,會調(diào)用 signal 喚醒正在等待的線程來消費(fèi)。

// 喚醒等待了最久的線程
// 其實(shí)就是膏燕,將這個(gè)線程對應(yīng)的 node 從條件隊(duì)列轉(zhuǎn)移到阻塞隊(duì)列
public final void signal() {
    // 調(diào)用 signal 方法的線程必須持有當(dāng)前的獨(dú)占鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// 從條件隊(duì)列隊(duì)頭往后遍歷钥屈,找出第一個(gè)需要轉(zhuǎn)移的 node
// 因?yàn)榍懊嫖覀冋f過,有些線程會取消排隊(duì)坝辫,但是可能還在隊(duì)列中
private void doSignal(Node first) {
    do {
          // 將 firstWaiter 指向 first 節(jié)點(diǎn)后面的第一個(gè)篷就,因?yàn)?first 節(jié)點(diǎn)馬上要離開了
        // 如果將 first 移除后,后面沒有節(jié)點(diǎn)在等待了近忙,那么需要將 lastWaiter 置為 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 因?yàn)?first 馬上要被移到阻塞隊(duì)列了竭业,和條件隊(duì)列的鏈接關(guān)系在這里斷掉
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
      // 這里 while 循環(huán),如果 first 轉(zhuǎn)移不成功及舍,那么選擇 first 后面的第一個(gè)節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移未辆,依此類推
}

// 將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)移到阻塞隊(duì)列
// true 代表成功轉(zhuǎn)移
// false 代表在 signal 之前,節(jié)點(diǎn)已經(jīng)取消了
final boolean transferForSignal(Node node) {

    // CAS 如果失敗锯玛,說明此 node 的 waitStatus 已不是 Node.CONDITION妙色,說明節(jié)點(diǎn)已經(jīng)取消屋灌,
    // 既然已經(jīng)取消,也就不需要轉(zhuǎn)移了,方法返回吏饿,轉(zhuǎn)移后面一個(gè)節(jié)點(diǎn)
    // 否則,將 waitStatus 置為 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // enq(node): 自旋進(jìn)入阻塞隊(duì)列的隊(duì)尾
    // 注意荐捻,這里的返回值 p 是 node 在阻塞隊(duì)列的前驅(qū)節(jié)點(diǎn)
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0 說明 node 在阻塞隊(duì)列中的前驅(qū)節(jié)點(diǎn)取消了等待鎖堡僻,直接喚醒 node 對應(yīng)的線程。喚醒之后會怎么樣实撒,后面再解釋
    // 如果 ws <= 0, 那么 compareAndSetWaitStatus 將會被調(diào)用姊途,節(jié)點(diǎn)入隊(duì)后涉瘾,需要把前驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)為 Node.SIGNAL(-1)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果前驅(qū)節(jié)點(diǎn)取消或者 CAS 失敗,會進(jìn)到這里喚醒線程捷兰,之后的操作看下一節(jié)
        LockSupport.unpark(node.thread);
    return true;
}

正常情況下立叛,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 這句中,ws <= 0贡茅,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 會返回 true秘蛇,所以一般也不會進(jìn)去 if 語句塊中喚醒 node 對應(yīng)的線程。然后這個(gè)方法返回 true顶考,也就意味著 signal 方法結(jié)束了赁还,節(jié)點(diǎn)進(jìn)入了阻塞隊(duì)列。

假設(shè)發(fā)生了阻塞隊(duì)列中的前驅(qū)節(jié)點(diǎn)取消等待驹沿,或者 CAS 失敗艘策,只要喚醒線程,讓其進(jìn)到下一步即可渊季。

喚醒后檢查中斷狀態(tài)
上一步 signal 之后朋蔫,我們的線程由條件隊(duì)列轉(zhuǎn)移到了阻塞隊(duì)列,之后就準(zhǔn)備獲取鎖了却汉。只要重新獲取到鎖了以后驯妄,繼續(xù)往下執(zhí)行。

int interruptMode = 0;
while (!isOnSyncQueue(node)) {
    // 線程掛起
    LockSupport.park(this);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

先解釋下 interruptMode合砂。interruptMode 可以取值為 REINTERRUPT(1)青扔,THROW_IE(-1),0

  • REINTERRUPT: 代表 await 返回的時(shí)候翩伪,需要重新設(shè)置中斷狀態(tài)
  • THROW_IE: 代表 await 返回的時(shí)候微猖,需要拋出 InterruptedException 異常
  • 0 :說明在 await 期間,沒有發(fā)生中斷

有以下三種情況會讓 LockSupport.park(this); 這句返回繼續(xù)往下執(zhí)行:

  1. 常規(guī)路徑幻工。signal -> 轉(zhuǎn)移節(jié)點(diǎn)到阻塞隊(duì)列 -> 獲取了鎖(unpark)
  2. 線程中斷励两。在 park 的時(shí)候,另外一個(gè)線程對這個(gè)線程進(jìn)行了中斷
  3. signal 的時(shí)候我們說過囊颅,轉(zhuǎn)移以后的前驅(qū)節(jié)點(diǎn)取消了当悔,或者對前驅(qū)節(jié)點(diǎn)的CAS操作失敗了

線程喚醒后第一步是調(diào)用 checkInterruptWhileWaiting(node) 這個(gè)方法,此方法用于判斷是否在線程掛起期間發(fā)生了中斷踢代,如果發(fā)生了中斷盲憎,是 signal 調(diào)用之前中斷的,還是 signal 之后發(fā)生的中斷胳挎。

// 1. 如果在 signal 之前已經(jīng)中斷饼疙,返回 THROW_IE
// 2. 如果是 signal 之后中斷,返回 REINTERRUPT
// 3. 沒有發(fā)生中斷慕爬,返回 0
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

Thread.interrupted():如果當(dāng)前線程已經(jīng)處于中斷狀態(tài)窑眯,那么該方法返回 true屏积,同時(shí)將中斷狀態(tài)重置為 false,所以磅甩,才有后續(xù)的 重新中斷(REINTERRUPT) 的使用炊林。

看看怎么判斷是 signal 之前還是之后發(fā)生的中斷:

// 只有線程處于中斷狀態(tài),才會調(diào)用此方法
// 如果需要的話卷要,將這個(gè)已經(jīng)取消等待的節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列
// 返回 true:如果此線程在 signal 之前被取消渣聚,
final boolean transferAfterCancelledWait(Node node) {
    // 用 CAS 將節(jié)點(diǎn)狀態(tài)設(shè)置為 0 
    // 如果這步 CAS 成功,說明是 signal 方法之前發(fā)生的中斷僧叉,因?yàn)槿绻?signal 先發(fā)生的話奕枝,signal 中會將 waitStatus 設(shè)置為 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 將節(jié)點(diǎn)放入阻塞隊(duì)列
        // 這里我們看到,即使中斷了瓶堕,依然會轉(zhuǎn)移到阻塞隊(duì)列
        enq(node);
        return true;
    }

    // 到這里是因?yàn)?CAS 失敗隘道,肯定是因?yàn)?signal 方法已經(jīng)將 waitStatus 設(shè)置為了 0
    // signal 方法會將節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列,但是可能還沒完成捞烟,這邊自旋等待其完成
    // 當(dāng)然薄声,這種事情還是比較少的吧:signal 調(diào)用之后当船,沒完成轉(zhuǎn)移之前题画,發(fā)生了中斷
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

即使發(fā)生了中斷,節(jié)點(diǎn)依然會轉(zhuǎn)移到阻塞隊(duì)列德频。

這里描繪了一個(gè)場景,本來有個(gè)線程苍息,它是排在條件隊(duì)列的后面的,但是因?yàn)樗恢袛嗔艘贾茫敲此鼤粏拘丫核迹缓笏l(fā)現(xiàn)自己不是被 signal 的那個(gè),但是它會自己主動去進(jìn)入到阻塞隊(duì)列钞护。

獲取獨(dú)占鎖
while 循環(huán)出來以后盖喷,下面是這段代碼:

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;

由于 while 出來后,我們確定節(jié)點(diǎn)已經(jīng)進(jìn)入了阻塞隊(duì)列难咕,準(zhǔn)備獲取鎖课梳。

這里的 acquireQueued(node, savedState) 的第一個(gè)參數(shù) node 之前已經(jīng)經(jīng)過 enq(node) 進(jìn)入了隊(duì)列,參數(shù) savedState 是之前釋放鎖前的 state余佃,這個(gè)方法返回的時(shí)候暮刃,代表當(dāng)前線程獲取了鎖,而且 state == savedState了爆土。

前面我們說過椭懊,不管有沒有發(fā)生中斷,都會進(jìn)入到阻塞隊(duì)列步势,而 acquireQueued(node, savedState) 的返回值就是代表線程是否被中斷氧猬。如果返回 true背犯,說明被中斷了,而且 interruptMode != THROW_IE盅抚,說明在 signal 之前就發(fā)生中斷了媳板,這里將 interruptMode 設(shè)置為 REINTERRUPT,用于待會重新中斷泉哈。

if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

這邊說說 node.nextWaiter != null 怎么滿足蛉幸。我前面也說了 signal 的時(shí)候會將節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列,有一步是 node.nextWaiter = null丛晦,將斷開節(jié)點(diǎn)和條件隊(duì)列的聯(lián)系奕纫。可是烫沙,在判斷發(fā)生中斷的情況下匹层,是 signal 之前還是之后發(fā)生的? 這部分的時(shí)候锌蓄,我也介紹了升筏,如果 signal 之前就中斷了,也需要將節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移到阻塞隊(duì)列瘸爽,這部分轉(zhuǎn)移的時(shí)候,是沒有設(shè)置 node.nextWaiter = null 的柑潦。之前我們說過享言,如果有節(jié)點(diǎn)取消,也會調(diào)用 unlinkCancelledWaiters 這個(gè)方法渗鬼,就是這里了览露。

處理中斷狀態(tài)

  • 0:什么都不做差牛,沒有被中斷過;
  • THROW_IE:await 方法拋出 InterruptedException 異常银择,因?yàn)樗碓?await() 期間發(fā)生了中斷夹孔;
  • REINTERRUPT:重新中斷當(dāng)前線程,因?yàn)樗?await() 期間沒有被中斷,而是 signal() 以后發(fā)生的中斷
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

帶超時(shí)機(jī)制的 await
經(jīng)過前面的 7 步搭伤,整個(gè) ConditionObject 類基本上都分析完了只怎,接下來簡單分析下帶超時(shí)機(jī)制的 await 方法。

public final long awaitNanos(long nanosTimeout) 
                  throws InterruptedException
public final boolean awaitUntil(Date deadline)
                throws InterruptedException
public final boolean await(long time, TimeUnit unit)
                throws InterruptedException

這三個(gè)方法都差不多怜俐,我們就挑一個(gè)出來看看吧:

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    // 等待這么多納秒
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 當(dāng)前時(shí)間 + 等待時(shí)長 = 過期時(shí)間
    final long deadline = System.nanoTime() + nanosTimeout;
    // 用于返回 await 是否超時(shí)
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 時(shí)間到啦
        if (nanosTimeout <= 0L) {
            // 這里因?yàn)橐?break 取消等待了身堡。取消等待的話一定要調(diào)用 transferAfterCancelledWait(node) 這個(gè)方法
            // 如果這個(gè)方法返回 true,在這個(gè)方法內(nèi)拍鲤,將節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列成功
            // 返回 false 的話贴谎,說明 signal 已經(jīng)發(fā)生,signal 方法將節(jié)點(diǎn)轉(zhuǎn)移了季稳。也就是說沒有超時(shí)嘛
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // spinForTimeoutThreshold 的值是 1000 納秒擅这,也就是 1 毫秒
        // 也就是說,如果不到 1 毫秒了景鼠,那就不要選擇 parkNanos 了仲翎,自旋的性能反而更好
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 得到剩余時(shí)間
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

AbstractQueuedSynchronizer 獨(dú)占鎖的取消排隊(duì)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

如果我們要取消一個(gè)線程的排隊(duì),我們需要在另外一個(gè)線程中對其進(jìn)行中斷铛漓。比如某線程調(diào)用 lock() 老久不返回溯香,我想中斷它。一旦對其進(jìn)行中斷浓恶,此線程會從 LockSupport.park(this); 中喚醒玫坛,然后 Thread.interrupted(); 返回 true。

我們發(fā)現(xiàn)一個(gè)問題问顷,即使是中斷喚醒了這個(gè)線程昂秃,也就只是設(shè)置了 interrupted = true 然后繼續(xù)下一次循環(huán)禀梳。而且杜窄,由于 Thread.interrupted(); 會清除中斷狀態(tài),第二次進(jìn) parkAndCheckInterrupt 的時(shí)候算途,返回會是 false塞耕。所以,我們要看到嘴瓤,在這個(gè)方法中扫外,interrupted 只是用來記錄是否發(fā)生了中斷,然后用于方法返回值廓脆,其他沒有做任何相關(guān)事情筛谚。

所以,我們看外層方法怎么處理 acquireQueued 返回 false 的情況停忿。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

所以說驾讲,lock() 方法處理中斷的方法就是,你中斷歸中斷,我搶鎖還是照樣搶鎖吮铭,幾乎沒關(guān)系时迫,只是我搶到鎖了以后,設(shè)置線程的中斷狀態(tài)而已谓晌,也不拋出任何異常出來掠拳。調(diào)用者獲取鎖后,可以去檢查是否發(fā)生過中斷纸肉,也可以不理會溺欧。

我們來看 ReentrantLock 的另一個(gè) lock 方法:

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 就是這里了,一旦異常柏肪,馬上結(jié)束這個(gè)方法胧奔,拋出異常。
                // 這里不再只是標(biāo)記這個(gè)方法的返回值代表中斷狀態(tài)
                // 而是直接拋出異常预吆,而且外層也不捕獲龙填,一直往外拋到 lockInterruptibly
                throw new InterruptedException();
        }
    } finally {
        // 如果通過 InterruptedException 異常出去,那么 failed 就是 true 了
        if (failed)
            cancelAcquire(node);
    }
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 就是這里了拐叉,一旦異常岩遗,馬上結(jié)束這個(gè)方法,拋出異常凤瘦。
                // 這里不再只是標(biāo)記這個(gè)方法的返回值代表中斷狀態(tài)
                // 而是直接拋出異常宿礁,而且外層也不捕獲,一直往外拋到 lockInterruptibly
                throw new InterruptedException();
        }
    } finally {
        // 如果通過 InterruptedException 異常出去蔬芥,那么 failed 就是 true 了
        if (failed)
            cancelAcquire(node);
    }
}
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    // 找一個(gè)合適的前驅(qū)梆靖。其實(shí)就是將它前面的隊(duì)列中已經(jīng)取消的節(jié)點(diǎn)都”請出去“
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

線程中斷
首先,我們要明白笔诵,中斷不是類似 linux 里面的命令 kill -9 pid返吻,不是說我們中斷某個(gè)線程,這個(gè)線程就停止運(yùn)行了乎婿。中斷代表線程狀態(tài)测僵,每個(gè)線程都關(guān)聯(lián)了一個(gè)中斷狀態(tài),是一個(gè) true 或 false 的 boolean 值谢翎,初始值為 false捍靠。

// Thread 類中的實(shí)例方法,持有線程實(shí)例引用即可檢測線程中斷狀態(tài)
public boolean isInterrupted() {}

// Thread 中的靜態(tài)方法森逮,檢測調(diào)用這個(gè)方法的線程是否已經(jīng)中斷
// 注意:這個(gè)方法返回中斷狀態(tài)的同時(shí)榨婆,會將此線程的中斷狀態(tài)重置為 false
// 所以,如果我們連續(xù)調(diào)用兩次這個(gè)方法的話褒侧,第二次的返回值肯定就是 false 了
public static boolean interrupted() {}

// Thread 類中的實(shí)例方法良风,用于設(shè)置一個(gè)線程的中斷狀態(tài)為 true
public void interrupt() {}

我們說中斷一個(gè)線程颜武,其實(shí)就是設(shè)置了線程的 interrupted status 為 true,至于說被中斷的線程怎么處理這個(gè)狀態(tài)拖吼,那是那個(gè)線程自己的事鳞上。

如果線程處于以下三種情況,那么當(dāng)線程被中斷的時(shí)候吊档,能自動感知到:
來自 Object 類的 wait()篙议、wait(long)、wait(long, int)怠硼,

來自 Thread 類的 join()鬼贱、join(long)、join(long, int)香璃、sleep(long)这难、sleep(long, int) 這幾個(gè)方法的相同之處是,方法上都有: throws InterruptedException,如果線程阻塞在這些方法上(我們知道葡秒,這些方法會讓當(dāng)前線程阻塞)姻乓,這個(gè)時(shí)候如果其他線程對這個(gè)線程進(jìn)行了中斷,那么這個(gè)線程會從這些方法中立即返回眯牧,拋出 InterruptedException 異常蹋岩,同時(shí)重置中斷狀態(tài)為 false。

  1. Selector 中的 select 方法 一旦中斷学少,方法立即返回

因?yàn)樗麄兡茏詣痈兄街袛啵ㄟ@里說自動剪个,當(dāng)然也是基于底層實(shí)現(xiàn)),并且在做出相應(yīng)的操作后都會重置中斷狀態(tài)為 false版确。

那是不是只有以上 幾種方法能自動感知到中斷呢扣囊?不是的,如果線程阻塞在 LockSupport.park(Object obj) 方法绒疗,也叫掛起侵歇,這個(gè)時(shí)候的中斷也會導(dǎo)致線程喚醒,但是喚醒后不會重置中斷狀態(tài)忌堂,所以喚醒后去檢測中斷狀態(tài)將是 true盒至。

InterruptedException
它是一個(gè)特殊的異常,不是說 JVM 對其有特殊的處理士修,而是它的使用場景比較特殊。通常樱衷,我們可以看到棋嘲,像 Object 中的 wait() 方法,ReentrantLock 中的 lockInterruptibly() 方法矩桂,Thread 中的 sleep() 方法等等沸移,這些方法都帶有 throws InterruptedException,我們通常稱這些方法為阻塞方法(blocking method)。

阻塞方法一個(gè)很明顯的特征是雹锣,它們需要花費(fèi)比較長的時(shí)間(不是絕對的网沾,只是說明時(shí)間不可控),還有它們的方法結(jié)束返回往往依賴于外部條件蕊爵,如 wait 方法依賴于其他線程的 notify辉哥,lock 方法依賴于其他線程的 unlock等等。

當(dāng)我們看到方法上帶有 throws InterruptedException 時(shí)攒射,我們就要知道醋旦,這個(gè)方法應(yīng)該是阻塞方法,我們?nèi)绻M茉琰c(diǎn)返回的話会放,我們往往可以通過中斷來實(shí)現(xiàn)饲齐。

除了幾個(gè)特殊類(如 Object,Thread等)外咧最,感知中斷并提前返回是通過輪詢中斷狀態(tài)來實(shí)現(xiàn)的捂人。我們自己需要寫可中斷的方法的時(shí)候,就是通過在合適的時(shí)機(jī)(通常在循環(huán)的開始處)去判斷線程的中斷狀態(tài)矢沿,然后做相應(yīng)的操作(通常是方法直接返回或者拋出異常)先慷。當(dāng)然,我們也要看到咨察,如果我們一次循環(huán)花的時(shí)間比較長的話论熙,那么就需要比較長的時(shí)間才能感知到線程中斷了。

處理中斷
一旦中斷發(fā)生摄狱,我們接收到了這個(gè)信息脓诡,然后怎么去處理中斷呢?

我們經(jīng)常會這么寫代碼:

try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    // ignore
}
// go on 

當(dāng) sleep 結(jié)束繼續(xù)往下執(zhí)行的時(shí)候媒役,我們往往都不知道這塊代碼是真的 sleep 了 10 秒祝谚,還是只休眠了 1 秒就被中斷了。這個(gè)代碼的問題在于酣衷,我們將這個(gè)異常信息吞掉了交惯。(對于 sleep 方法,我相信大部分情況下穿仪,我們都不在意是否是中斷了席爽,這里是舉例)

AQS 的做法很值得我們借鑒,我們知道 ReentrantLock 有兩種 lock 方法:

public void lock() {
    sync.lock();
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

前面我們提到過啊片,lock() 方法不響應(yīng)中斷只锻。如果 thread1 調(diào)用了 lock() 方法,過了很久還沒搶到鎖紫谷,這個(gè)時(shí)候 thread2 對其進(jìn)行了中斷齐饮,thread1 是不響應(yīng)這個(gè)請求的捐寥,它會繼續(xù)搶鎖,當(dāng)然它不會把“被中斷”這個(gè)信息扔掉祖驱。我們可以看以下代碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 我們看到握恳,這里也沒做任何特殊處理,就是記錄下來中斷狀態(tài)捺僻。
        // 這樣乡洼,如果外層方法需要去檢測的時(shí)候,至少我們沒有把這個(gè)信息丟了
        selfInterrupt();// Thread.currentThread().interrupt();
}

而對于 lockInterruptibly() 方法陵像,因?yàn)槠浞椒ㄉ厦嬗?throws InterruptedException 就珠,這個(gè)信號告訴我們,如果我們要取消線程搶鎖醒颖,直接中斷這個(gè)線程即可妻怎,它會立即返回,拋出 InterruptedException 異常泞歉。

在并發(fā)包中逼侦,有非常多的這種處理中斷的例子,提供兩個(gè)方法腰耙,分別為響應(yīng)中斷和不響應(yīng)中斷榛丢,對于不響應(yīng)中斷的方法,記錄中斷而不是丟失這個(gè)信息挺庞。如 Condition 中的兩個(gè)方法就是這樣的:

void await() throws InterruptedException;
void awaitUninterruptibly();

通常晰赞,如果方法會拋出 InterruptedException 異常,往往方法體的第一句就是:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
     ...... 
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末选侨,一起剝皮案震驚了整個(gè)濱河市掖鱼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌援制,老刑警劉巖戏挡,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異晨仑,居然都是意外死亡褐墅,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進(jìn)店門洪己,熙熙樓的掌柜王于貴愁眉苦臉地迎上來妥凳,“玉大人,你說我怎么就攤上這事码泛』猓” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵噪珊,是天一觀的道長晌缘。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任空繁,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己舰罚,他們只是感情好蝙搔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著挺邀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上耻煤,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機(jī)與錄音辫樱,去河邊找鬼鞋囊。 笑死止后,一個(gè)胖子當(dāng)著我的面吹牛译株,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播于游,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼筷频!你這毒婦竟也來了蚌成?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤截驮,失蹤者是張志新(化名)和其女友劉穎笑陈,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體葵袭,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡涵妥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了坡锡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蓬网。...
    茶點(diǎn)故事閱讀 39,953評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡窒所,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出帆锋,到底是詐尸還是另有隱情吵取,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布锯厢,位于F島的核電站皮官,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏实辑。R本人自食惡果不足惜捺氢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望剪撬。 院中可真熱鬧摄乒,春花似錦、人聲如沸残黑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽梨水。三九已至拭荤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冰木,已是汗流浹背穷劈。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留踊沸,地道東北人歇终。 一個(gè)月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像逼龟,于是被迫代替她去往敵國和親评凝。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容