AbstractQueuedSynchronizer 源碼分析 (基于Java 8)

1. AbstractQueuedSynchronizer 定義

AbstractQueuedSynchronizer 是JUC 中通過(guò) Sync Queue(并發(fā)安全的 CLH Queue), Condition Queue(普通的 list) , volatile 變量 state 提供的 控制線程獲取統(tǒng)一資源(state) 的 Synchronized 工具.

主要特點(diǎn):

1. 內(nèi)部含有兩條 Queue(Sync Queue, Condition Queue), 這兩條 Queue 后面會(huì)詳細(xì)說(shuō)明.
2. AQS 內(nèi)部定義獲取鎖(acquire), 釋放鎖(release)的主邏輯, 子類實(shí)現(xiàn)響應(yīng)的模版方法即可
3. 支持共享和獨(dú)占兩種模式(共享模式時(shí)只用 Sync Queue, 獨(dú)占模式有時(shí)只用 Sync Queue, 但若涉及 Condition, 則還有 Condition Queue); 獨(dú)占是排他的.
4. 支持 不響應(yīng)中斷獲取獨(dú)占鎖(acquire), 響應(yīng)中斷獲取獨(dú)占鎖(acquireInterruptibly), 超時(shí)獲取獨(dú)占鎖(tryAcquireNanos); 不響應(yīng)中斷獲取共享鎖(acquireShared), 響應(yīng)中斷獲取共享鎖(acquireSharedInterruptibly), 超時(shí)獲取共享鎖(tryAcquireSharedNanos);
5. 在子類的 tryAcquire, tryAcquireShared 中實(shí)現(xiàn)公平與非公平的區(qū)分

先看一個(gè)demo(實(shí)現(xiàn)獨(dú)占的但是非重入)

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 獨(dú)占模式 并且不支持重入的 lock
 * Created by xujiankang on 2016/12/19.
 */
public class Mutex implements Lock, java.io.Serializable {

    // The sync object does all the hard work. We just forward to it
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    // 支持中斷式的獲取 lock
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    // 嘗試獲取 lock
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    /**
     * 嘗試 帶 timeout 的獲取 lock
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    /** 釋放lock */
    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked(){
        return sync.inHeldExclusively();
    }
    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }

    // internal helper class
    static class Sync extends AbstractQueuedSynchronizer{

        // report whether in locked state
        protected boolean inHeldExclusively(){ // 判斷 lock 是否被占用
            return getState() == 1;
        }
        // 獲取 lock
        // Acquire the lock if state is zero
        public boolean tryAcquire(int acquires){
            assert acquires == 1; // Otherwise unsed
            if(compareAndSetState(0, 1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 釋放 lock
        // Releses the lock by setting state to zero
        protected boolean tryRelease(int release){
            assert release == 1; // Otherwise unused
            if(getState() == 0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition(){
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)throws IOException, ClassNotFoundException{
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

}

繼承 AQS 的子類通暢需要實(shí)現(xiàn)以下方法:

# 實(shí)現(xiàn)獨(dú)占
tryAcquire
tryRelease
isHeldExclusively
# 實(shí)現(xiàn)共享
tryAcquireShared
tryReleaseShared

而一般的lock獲取釋放流程如下


# lock 獲取
Acquire:
while(!tryAcquire(arg)){ // tryAcquire交由子類來(lái)實(shí)現(xiàn), 改變 AQS 的state的值
   1. tryAcquire 獲取lock沒成功, 則入 Sync Queue
   2. 若當(dāng)前節(jié)點(diǎn)是 head.next, 則再次嘗試獲取一下lock (tryAcquire)
   3. 獲取 lock 失敗, 則改變 前繼節(jié)點(diǎn)的 waitStatus 的值(變成SIGNAL), 進(jìn)行 blocked
}

# lock 釋放
Release:
if(tryRelease(arg)){ / tryRelease交由子類來(lái)實(shí)現(xiàn), 改變 AQS 的state的值
   1. 判斷 lock 是否釋放徹底
   2. 若自己被標(biāo)記為SIGNAL, 則喚醒后繼節(jié)點(diǎn), 通知其去獲取 AQS 中 state 的值
   3. 將自己的 waitStatus 進(jìn)行復(fù)位到 0
}

整個(gè) AQS 非為以下幾部分

  1. Node 節(jié)點(diǎn), 用于存放獲取線程的節(jié)點(diǎn), 存在于 Sync Queue, Condition Queue, 這些節(jié)點(diǎn)主要的區(qū)分在于 waitStatus 的值(下面會(huì)詳細(xì)敘述)
  2. Condition Queue, 這個(gè)隊(duì)列是用于獨(dú)占模式中, 只有用到 Condition.awaitXX 時(shí)才會(huì)將 node加到 tail 上(PS: 在使用 Condition的前提是已經(jīng)獲取 Lock)
  3. Sync Queue, 獨(dú)占 共享的模式中均會(huì)使用到的存放 Node 的 CLH queue(主要特點(diǎn)是, 隊(duì)列中總有一個(gè) dummy 節(jié)點(diǎn), 后繼節(jié)點(diǎn)獲取鎖的條件由前繼節(jié)點(diǎn)決定, 前繼節(jié)點(diǎn)在釋放 lock 時(shí)會(huì)喚醒sleep中的后繼節(jié)點(diǎn))
  4. ConditionObject, 用于獨(dú)占的模式, 主要是線程釋放lock, 加入 Condition Queue, 并進(jìn)行相應(yīng)的 signal 操作, 詳情點(diǎn)擊這里 Java 8 源碼分析 Condition
  5. 獨(dú)占的獲取lock (acquire, release), 例如 ReentrantLock 就是使用這種, 詳情點(diǎn)擊這里 Java 8 源碼分析 ReentrantLock
  6. 共享的獲取lock (acquireShared, releaseShared), 例如 ReeantrantReadWriteLock, Semaphore, CountDownLatch
    下面將一一講解
2. AbstractQueuedSynchronizer 內(nèi)部類 Node

Node 節(jié)點(diǎn)是代表獲取lock的線程, 存在于 Condition Queue, Sync Queue 里面, 而其主要的分別就是 nextWaiter (標(biāo)記共享還是獨(dú)占),
waitStatus 標(biāo)記node的狀態(tài)(PS: 這是關(guān)鍵, 理解了 waitStatus 的變化流程, 就能理解整個(gè) AQS)

下圖就是 Node

Node.png

見代碼:

/**
 * 代表 Thread 存在于 Sync Queue 與 Condition Queue 的節(jié)點(diǎn)
 */
static final class Node {
    /** marker to indicate a node is wating in shared mode */
    /** 標(biāo)識(shí)節(jié)點(diǎn)是否是 共享的節(jié)點(diǎn)(這樣的節(jié)點(diǎn)只存在于 Sync Queue 里面) */
    static final Node SHARED = new Node();
    /** marker to indicate a node is waiting in exclusive mode */
    /** 標(biāo)識(shí)節(jié)點(diǎn)是 獨(dú)占模式 */
    static final Node EXCLUSIVE = null;

    /** waitStatus value yto indicate thread has cancelled */
    /**
     *  CANCELLED 說(shuō)明節(jié)點(diǎn)已經(jīng) 取消獲取 lock 了(一般是由于 interrupt 或 timeout 導(dǎo)致的)
     *  很多時(shí)候是在 cancelAcquire 里面進(jìn)行設(shè)置這個(gè)標(biāo)識(shí)
     */
    static final int CANCELLED = 1;

    /** waitStatus value to indicate successor;s thread needs unparking */
    /**
     * SIGNAL 標(biāo)識(shí)當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)需要喚醒(PS: 這個(gè)通常是在 獨(dú)占模式下使用, 在共享模式下有時(shí)用 PROPAGATE)
     *
     */
    static final int SIGNAL = -1;

    /** waitStatus value to indicate thread is waiting on condition */
    /**
     * 當(dāng)前節(jié)點(diǎn)在 Condition Queue 里面
     */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    /**
     * 當(dāng)前節(jié)點(diǎn)獲取到 lock 或進(jìn)行 release lock 時(shí), 共享模式的最終狀態(tài)是 PROPAGATE(PS: 有可能共享模式的節(jié)點(diǎn)變成 PROPAGATE 之前就被其后繼節(jié)點(diǎn)搶占 head 節(jié)點(diǎn), 而從Sync Queue中被踢出掉)
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking only the values:
     *
     *  SIGNAL:     The successor of this node is (or will soon be)
     *              blocked (via park), so the current node must
     *              unpark its successor when is releases or
     *              cancels. To avoid races, acquire methods must
     *              first indicate they need a signal,
     *              then retry the atomic acquire, and then,
     *              on failure, block
     *  CANCELLED: This node is cancelled due to timeout or interrupt
     *              Nodes never leave this state. In particular,
     *              a thread with cancelled node never again blocks
     *  CONDITION: This node is currently on a condition queue.
     *              It will not be used as a sync queue node
     *              until transferred, at which time the status
     *              will be set to 0. (Use of this value here has
     *              nothing to do with other uses of the
     *              field, but simplifies mechanics)
     * PROPAGATE: A releaseShared should be propagated to other
     *              nodes. This is set (for head node only) in
     *              doReleaseShared to ensure propagation
     *              continues, even if other operations hava
     *              since intervened
     * 0:          None of the above(以上)
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign
     *
     * The field is initialized to 0 for narmal sync nodes, and
     * CONDITION for condition nodes. It is modified using CAS
     * (or when possible, unconditional volatile writes)
     *
     */
    volatile int waitStatus;

    /**
     * 節(jié)點(diǎn)在 Sync Queue 里面時(shí)的前繼節(jié)點(diǎn)(主要來(lái)進(jìn)行 skip CANCELLED 的節(jié)點(diǎn))
     * 注意: 根據(jù) addWaiter方法:
     *  1. prev節(jié)點(diǎn)在隊(duì)列里面, 則 prev != null 肯定成立
     *  2. prev != null 成立, 不一定 node 就在 Sync Queue 里面
     */
    volatile Node prev;

    /**
     * Node 在 Sync Queue 里面的后繼節(jié)點(diǎn), 主要是在release lock 時(shí)進(jìn)行后繼節(jié)點(diǎn)的喚醒
     * 而后繼節(jié)點(diǎn)在前繼節(jié)點(diǎn)上打上 SIGNAL 標(biāo)識(shí), 來(lái)提醒他 release lock 時(shí)需要喚醒
     */
    volatile Node next;

    /** 獲取 lock 的引用 */
    volatile Thread thread;

    /**
     * 作用分成兩種:
     *  1. 在 Sync Queue 里面, nextWaiter用來(lái)判斷節(jié)點(diǎn)是 共享模式, 還是獨(dú)占模式
     *  2. 在 Condition queue 里面, 節(jié)點(diǎn)主要是鏈接且后繼節(jié)點(diǎn) (Condition queue是一個(gè)單向的, 不支持并發(fā)的 list)
     */
    Node nextWaiter;

    /** 當(dāng)前節(jié)點(diǎn)是否是共享模式 */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 獲取 node 的前繼節(jié)點(diǎn)
     */
    final Node predecessor() throws NullPointerException{
        Node p = prev;
        if(p == null){
            throw new NullPointerException();
        }else{
            return p;
        }
    }

    Node(){
        // Used to establish initial head or SHARED marker
    }

    /**
     * 初始化 Node 用于 Sync Queue 里面
     */
    Node(Thread thread, Node mode){     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * 初始化 Node 用于 Condition Queue 里面
     */
    Node(Thread thread, int waitStatus){ // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

waitStatus的狀態(tài)變化:

1. 線程剛?cè)?Sync Queue 里面, 發(fā)現(xiàn) 獨(dú)占鎖被其他人獲取, 則將其前繼節(jié)點(diǎn)標(biāo)記為 SIGNAL, 然后再嘗試獲取一下鎖(調(diào)用 tryAcquire 方法)
2. 若 調(diào)用 tryAcquire 方法獲取失敗, 則判斷一下是否前繼節(jié)點(diǎn)被標(biāo)記為 SIGNAL, 若是的話 直接 block(block前會(huì)確保前繼節(jié)點(diǎn)被標(biāo)記為SIGNAL, 因?yàn)榍袄^節(jié)點(diǎn)在進(jìn)行釋放鎖時(shí)根據(jù)是否標(biāo)記為 SIGNAL 來(lái)決定喚醒后繼節(jié)點(diǎn)與否 <- 這是獨(dú)占的情況下)
3. 前繼節(jié)點(diǎn)使用完lock, 進(jìn)行釋放, 因?yàn)樽约罕粯?biāo)記為 SIGNAL, 所以喚醒其后繼節(jié)點(diǎn)

waitStatus 變化過(guò)程:

1. 獨(dú)占模式下:  0(初始) -> signal(被后繼節(jié)點(diǎn)標(biāo)記為release需要喚醒后繼節(jié)點(diǎn)) -> 0 (等釋放好lock, 會(huì)恢復(fù)到0)
2. 獨(dú)占模式 + 使用 Condition情況下: 0(初始) -> signal(被后繼節(jié)點(diǎn)標(biāo)記為release需要喚醒后繼節(jié)點(diǎn)) -> 0 (等釋放好lock, 會(huì)恢復(fù)到0)
   其上可能涉及 中斷與超時(shí), 只是多了一個(gè) CANCELLED, 當(dāng)節(jié)點(diǎn)變成 CANCELLED, 后就等著被清除
3. 共享模式下: 0(初始) -> PROPAGATE(獲取 lock 或release lock 時(shí)) (獲取 lock 時(shí)會(huì)調(diào)用 setHeadAndPropagate 來(lái)進(jìn)行 傳遞式的喚醒后繼節(jié)點(diǎn), 直到碰到 獨(dú)占模式的節(jié)點(diǎn))
4. 共享模式 + 獨(dú)占模式下: 0(初始) -> signal(被后繼節(jié)點(diǎn)標(biāo)記為release需要喚醒后繼節(jié)點(diǎn)) -> 0 (等釋放好lock, 會(huì)恢復(fù)到0)
其上的這些狀態(tài)變化主要在: doReleaseShared , shouldParkAfterFailedAcquire 里面
3. AbstractQueuedSynchronizer 內(nèi)部Queue Condition Queue

Condition Queue 是一個(gè)并發(fā)不安全的, 只用于獨(dú)占模式的隊(duì)列(PS: 為什么是并發(fā)不安全的呢? 主要是在操作 Condition 時(shí), 線程必需獲取 獨(dú)占的 lock, 所以不需要考慮并發(fā)的安全問題);
而當(dāng)Node存在于 Condition Queue 里面, 則其只有 waitStatus, thread, nextWaiter 有值, 其他的都是null(其中的 waitStatus 只能是 CONDITION, 0(0 代表node進(jìn)行轉(zhuǎn)移到 Sync Queue里面, 或被中斷/timeout)); 這里有個(gè)注意點(diǎn), 就是 當(dāng)線程被中斷或獲取 lock 超時(shí), 則一瞬間 node 會(huì)存在于 Condition Queue, Sync Queue 兩個(gè)隊(duì)列中.

見圖:

Condition Queue.png

節(jié)點(diǎn) Node4, Node5, Node6, Node7 都是調(diào)用 Condition.awaitXX 方法 加入 Condition Queue(PS: 加入后會(huì)將原來(lái)的 lock 釋放)

4. Condition Queue 入隊(duì)列方法 addConditionWaiter
/**
 * Adds a new waiter to wait queue
 * 將當(dāng)前線程封裝成一個(gè) Node 節(jié)點(diǎn) 放入大 Condition Queue 里面
 * 大家可以注意到, 下面對(duì) Condition Queue 的操作都沒考慮到 并發(fā)(Sync Queue 的隊(duì)列是支持并發(fā)操作的), 這是為什么呢? 因?yàn)樵谶M(jìn)行操作 Condition 是當(dāng)前的線程已經(jīng)獲取了AQS的獨(dú)占鎖, 所以不需要考慮并發(fā)的情況
 * @return
 */
private Node addConditionWaiter(){
    Node t = lastWaiter;                                // 1. Condition queue 的尾節(jié)點(diǎn)
    // If lastWaiter is cancelled, clean out              // 2.尾節(jié)點(diǎn)已經(jīng)Cancel, 直接進(jìn)行清除,
                                                          //    這里有1個(gè)問題, 1 何時(shí)出現(xiàn)t.waitStatus != Node.CONDITION -> 在對(duì)線程進(jìn)行中斷時(shí) ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 導(dǎo)致這種情況一般是 線程中斷或 await 超時(shí)
                                                          //    一個(gè)注意點(diǎn): 當(dāng)Condition進(jìn)行 awiat 超時(shí)或被中斷時(shí), Condition里面的節(jié)點(diǎn)是沒有被刪除掉的, 需要其他 await 在將線程加入 Condition Queue 時(shí)調(diào)用addConditionWaiter而進(jìn)而刪除, 或 await 操作差不多結(jié)束時(shí), 調(diào)用 "node.nextWaiter != null" 進(jìn)行判斷而刪除 (PS: 通過(guò) signal 進(jìn)行喚醒時(shí) node.nextWaiter 會(huì)被置空, 而中斷和超時(shí)時(shí)不會(huì))
    if(t != null && t.waitStatus != Node.CONDITION){
        unlinkCancelledWaiters();                        // 3. 調(diào)用 unlinkCancelledWaiters 對(duì) "waitStatus != Node.CONDITION" 的節(jié)點(diǎn)進(jìn)行刪除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
        t = lastWaiter;                                // 4. 獲取最新的 lastWaiter
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 將線程封裝成 node 準(zhǔn)備放入 Condition Queue 里面
    if(t == null){
        firstWaiter = node;                           // 6 .Condition Queue 是空的
    }else{
        t.nextWaiter = node;                          // 7. 最加到 queue 尾部
    }
    lastWaiter = node;                                // 8. 重新賦值 lastWaiter
    return node;
}
5. Condition Queue 刪除Cancelled節(jié)點(diǎn)的方法 unlinkCancelledWaiters

當(dāng)Node在Condition Queue 中, 若狀態(tài)不是 CONDITION, 則一定是 被中斷或超時(shí)

/**
 * 在 調(diào)用 addConditionWaiter 將線程放入 Condition Queue 里面時(shí) 或 awiat 方法獲取 差不多結(jié)束時(shí) 進(jìn)行清理 Condition queue 里面的因 timeout/interrupt 而還存在的節(jié)點(diǎn)
 * 這個(gè)刪除操作比較巧妙, 其中引入了 trail 節(jié)點(diǎn)挨摸, 可以理解為traverse整個(gè) Condition Queue 時(shí)遇到的最后一個(gè)有效的節(jié)點(diǎn)
 */
private void unlinkCancelledWaiters(){
    Node t = firstWaiter;
    Node trail = null;
    while(t != null){
        Node next = t.nextWaiter;               // 1. 先初始化 next 節(jié)點(diǎn)
        if(t.waitStatus != Node.CONDITION){   // 2. 節(jié)點(diǎn)不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
            t.nextWaiter = null;               // 3. Node.nextWaiter 置空
            if(trail == null){                  // 4. 一次都沒有遇到有效的節(jié)點(diǎn)
                firstWaiter = next;            // 5. 將 next 賦值給 firstWaiter(此時(shí) next 可能也是無(wú)效的, 這只是一個(gè)臨時(shí)處理)
            }else{
                trail.nextWaiter = next;       // 6. next 賦值給 trail.nextWaiter, 這一步其實(shí)就是刪除節(jié)點(diǎn) t
            }
            if(next == null){                  // 7. next == null 說(shuō)明 已經(jīng) traverse 完了 Condition Queue
                lastWaiter = trail;
            }
        }else{
            trail = t;                         // 8. 將有效節(jié)點(diǎn)賦值給 trail
        }
        t = next;
    }
}
6. Condition Queue 轉(zhuǎn)移節(jié)點(diǎn)的方法 transferForSignal

transferForSignal只有在節(jié)點(diǎn)被正常喚醒才調(diào)用的正常轉(zhuǎn)移的方法

/**
 * 將 Node 從Condition Queue 轉(zhuǎn)移到 Sync Queue 里面
 * 在調(diào)用transferForSignal之前, 會(huì) first.nextWaiter = null;
 * 而我們發(fā)現(xiàn) 若節(jié)點(diǎn)是因?yàn)?timeout / interrupt 進(jìn)行轉(zhuǎn)移, 則不會(huì)進(jìn)行這步操作; 兩種情況的轉(zhuǎn)移都會(huì)把 wautStatus 置為 0
 */
final boolean transferForSignal(Node node){
    /**
     * If cannot change waitStatus, the node has been cancelled
     */
    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已經(jīng) cancelled 則失敗
        return false;
    }

    /**
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting, If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong)
     */
    Node p = enq(node);                                 // 2. 加入 Sync Queue
    int ws = p.waitStatus;
    if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 這里的 ws > 0 指Sync Queue 中node 的前繼節(jié)點(diǎn)cancelled 了, 所以, 喚醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失敗, 則說(shuō)明 前繼節(jié)點(diǎn)已經(jīng)變成 SIGNAL 或 cancelled, 所以也要 喚醒
        LockSupport.unpark(node.thread);
    }
    return true;
}
7. Condition Queue 轉(zhuǎn)移節(jié)點(diǎn)的方法 transferAfterCancelledWait

transferAfterCancelledWait 在節(jié)點(diǎn)獲取lock時(shí)被中斷或獲取超時(shí)才調(diào)用的轉(zhuǎn)移方法

/**
 * 將 Condition Queue 中因 timeout/interrupt 而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移
 */
final boolean transferAfterCancelledWait(Node node){
    if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 沒有 node 沒有 cancelled , 直接進(jìn)行轉(zhuǎn)移 (轉(zhuǎn)移后, Sync Queue , Condition Queue 都會(huì)存在 node)
        enq(node);
        return true;
    }

    /**
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq(). Cancelling during an
     * incomplete transfer is both race and transient, so just
     * spin
     */
    while(!isOnSyncQueue(node)){                // 2.這時(shí)是其他的線程發(fā)送signal,將本線程轉(zhuǎn)移到 Sync Queue 里面的工程中(轉(zhuǎn)移的過(guò)程中 waitStatus = 0了, 所以上面的 CAS 操作失敗)
        Thread.yield();                         // 這里調(diào)用 isOnSyncQueue判斷是否已經(jīng) 入Sync Queue 了
    }
    return false;
}
8. AbstractQueuedSynchronizer 內(nèi)部Queue Sync Queue

Sync Queue 是一個(gè)類似于 CLH Queue 的并發(fā)安全, 雙向, 用于獨(dú)占和共享兩種模式下的 queue.
而當(dāng) Node 存在于 Sync Queue 時(shí), waitStatus,胡桨, prev, next, thread, nextWaiter 均可能有值; waitStatus 可能是 SIGNAL, 0, PROPAGATE, CANCELLED; 當(dāng)節(jié)點(diǎn)不是 head 時(shí)一定prev != null(而 node.prev != null 不能說(shuō)明節(jié)點(diǎn)一定存在于 Sync Queue); node.next != null 則 node一定存在于Sync Queue, 而 node存在于 Sync Queue 則 node.next 就不一定 != null; thread 則代表獲取 lock 的線程; nextWaiter 用于標(biāo)示共享還是獨(dú)占的獲取 lock

見圖:

Sync Queue.png

這個(gè)圖代表有個(gè)線程獲取lock, 而 Node1, Node2, Node3 則在Sync Queue 里面進(jìn)行等待獲取lock(PS: 注意到 dummy Node 的SINGNAL 這是叫獲取 lock 的線程在釋放lock時(shí)通知后繼節(jié)點(diǎn)的標(biāo)示)

9. Sync Queue 節(jié)點(diǎn)入Queue方法

這里有個(gè)地方需要注意, 就是初始化 head, tail 的節(jié)點(diǎn), 不一定是 head.next, 因?yàn)槠陂g可能被其他的線程進(jìn)行搶占了

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
/**
 * 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面
 */
private Node addWaiter(Node mode){
    Node node = new Node(Thread.currentThread(), mode);      // 1. 封裝 Node
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if(pred != null){                           // 2. pred != null -> 隊(duì)列中已經(jīng)有節(jié)點(diǎn), 直接 CAS 到尾節(jié)點(diǎn)
        node.prev = pred;                       // 3. 先設(shè)置 Node.pre = pred (PS: 則當(dāng)一個(gè) node在Sync Queue里面時(shí)  node.prev 一定 != null(除 dummy node), 但是 node.prev != null 不能說(shuō)明其在 Sync Queue 里面, 因?yàn)楝F(xiàn)在的CAS可能失敗 )
        if(compareAndSetTail(pred, node)){      // 4. CAS node 到 tail
            pred.next = node;                  // 5. CAS 成功, 將 pred.next = node (PS: 說(shuō)明 node.next != null -> 則 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
            return node;
        }
    }
    enq(node);                                 // 6. 隊(duì)列為空, 調(diào)用 enq 入隊(duì)列
    return node;
}


/**
 * 這個(gè)插入會(huì)檢測(cè)head tail 的初始化, 必要的話會(huì)初始化一個(gè) dummy 節(jié)點(diǎn), 這個(gè)和 ConcurrentLinkedQueue 一樣的
 * Insert node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor 返回的是前繼節(jié)點(diǎn)
 */
/**
 * 將節(jié)點(diǎn) node 加入隊(duì)列
 * 這里有個(gè)注意點(diǎn)
 * 情況:
 *      1. 首先 queue是空的
 *      2. 初始化一個(gè) dummy 節(jié)點(diǎn)
 *      3. 這時(shí)再在tail后面添加節(jié)點(diǎn)(這一步可能失敗, 可能發(fā)生競(jìng)爭(zhēng)被其他的線程搶占)
 *  這里為什么要加入一個(gè) dummy 節(jié)點(diǎn)呢?
 *      這里的 Sync Queue 是CLH lock的一個(gè)變種, 線程節(jié)點(diǎn) node 能否獲取lock的判斷通過(guò)其前繼節(jié)點(diǎn)
 *      而且這里在當(dāng)前節(jié)點(diǎn)想獲取lock時(shí)通常給前繼節(jié)點(diǎn) 打上 signal 的標(biāo)識(shí)(表示前繼節(jié)點(diǎn)釋放lock需要通知我來(lái)獲取lock)
 *      若這里不清楚的同學(xué), 請(qǐng)先看看 CLH lock的資料 (這是理解 AQS 的基礎(chǔ))
 */
private Node enq(final Node node){
    for(;;){
        Node t = tail;
        if(t == null){ // Must initialize       // 1. 隊(duì)列為空 初始化一個(gè) dummy 節(jié)點(diǎn) 其實(shí)和 ConcurrentLinkedQueue 一樣
            if(compareAndSetHead(new Node())){  // 2. 初始化 head 與 tail (這個(gè)CAS成功后, head 就有值了, 詳情將 Unsafe 操作)
                tail = head;
            }
        }else{
            node.prev = t;                      // 3. 先設(shè)置 Node.pre = pred (PS: 則當(dāng)一個(gè) node在Sync Queue里面時(shí)  node.prev 一定 != null, 但是 node.prev != null 不能說(shuō)明其在 Sync Queue 里面, 因?yàn)楝F(xiàn)在的CAS可能失敗 )
            if(compareAndSetTail(t, node)){     // 4. CAS node 到 tail
                t.next = node;                  // 5. CAS 成功, 將 pred.next = node (PS: 說(shuō)明 node.next != null -> 則 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
                return t;
            }
        }
    }
}
10. Sync Queue 節(jié)點(diǎn)出Queue方法

這里的出Queue的方法其實(shí)有兩個(gè)

  1. 新節(jié)點(diǎn)獲取lock, 調(diào)用setHead搶占head, 并且剔除原h(huán)ead
  2. 節(jié)點(diǎn)因被中斷或獲取超時(shí)而進(jìn)行 cancelled, 最后被剔除
/**
 * 設(shè)置 head 節(jié)點(diǎn)(在獨(dú)占模式?jīng)]有并發(fā)的可能, 當(dāng)共享的模式有可能)
 */
private void setHead(Node node){
    head = node;
    node.thread = null; // 清除線程引用
    node.prev = null; // 清除原來(lái) head 的引用 <- 都是 help GC
}


/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
/**
 * 清除因中斷/超時(shí)而放棄獲取lock的線程節(jié)點(diǎn)(此時(shí)節(jié)點(diǎn)在 Sync Queue 里面)
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;                 // 1. 線程引用清空

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)       // 2.  若前繼節(jié)點(diǎn)是 CANCELLED 的, 則也一并清除
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;         // 3. 這里的 predNext也是需要清除的(只不過(guò)在清除時(shí)的 CAS 操作需要 它)

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED; // 4. 標(biāo)識(shí)節(jié)點(diǎn)需要清除

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // 5. 若需要清除額節(jié)點(diǎn)是尾節(jié)點(diǎn), 則直接 CAS pred為尾節(jié)點(diǎn)
        compareAndSetNext(pred, predNext, null);    // 6. 刪除節(jié)點(diǎn)predNext
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL || // 7. 后繼節(jié)點(diǎn)需要喚醒(但這里的后繼節(jié)點(diǎn)predNext已經(jīng) CANCELLED 了)
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8. 將 pred 標(biāo)識(shí)為 SIGNAL
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) // 8. next.waitStatus <= 0 表示 next 是個(gè)一個(gè)想要獲取lock的節(jié)點(diǎn)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node); // 若 pred 是頭節(jié)點(diǎn), 則此刻可能有節(jié)點(diǎn)剛剛進(jìn)入 queue ,所以進(jìn)行一下喚醒
        }

        node.next = node; // help GC
    }
}
11. AbstractQueuedSynchronizer 獨(dú)占的獲取lock

獨(dú)占方式獲取lock主要流程:

1. 調(diào)用 tryAcquire 嘗試性的獲取鎖(一般都是由子類實(shí)現(xiàn)), 成功的話直接返回
2. tryAcquire 調(diào)用獲取失敗, 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面(調(diào)用addWaiter), 等待獲取 signal 信號(hào)
3. 調(diào)用 acquireQueued 進(jìn)行自旋的方式獲取鎖(有可能會(huì) repeatedly blocking and unblocking)
4. 根據(jù)acquireQueued的返回值判斷在獲取lock的過(guò)程中是否被中斷, 若被中斷, 則自己再中斷一下(selfInterrupt), 若是響應(yīng)中斷的則直接拋出異常

獨(dú)占方式獲取lock主要分成下面3類:

1. acquire 不響應(yīng)中斷的獲取lock, 這里的不響應(yīng)中斷指的是線程被中斷后會(huì)被喚醒, 并且繼續(xù)獲取lock,在方法返回時(shí), 根據(jù)剛才的獲取過(guò)程是否被中斷來(lái)決定是否要自己中斷一下(方法 selfInterrupt)
2. doAcquireInterruptibly 響應(yīng)中斷的獲取 lock, 這里的響應(yīng)中斷, 指在線程獲取 lock 過(guò)程中若被中斷, 則直接拋出異常
3. doAcquireNanos 響應(yīng)中斷及超時(shí)的獲取 lock, 當(dāng)線程被中斷, 或獲取超時(shí), 則直接拋出異常, 獲取失敗
12. AbstractQueuedSynchronizer 獨(dú)占的獲取lock 方法 acquire
/** acquire 是用于獲取鎖的最常用的模式
 * 步驟
 *      1. 調(diào)用 tryAcquire 嘗試性的獲取鎖(一般都是又子類實(shí)現(xiàn)), 成功的話直接返回
 *      2. tryAcquire 調(diào)用獲取失敗, 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面(調(diào)用addWaiter), 等待獲取 signal 信號(hào)
 *      3. 調(diào)用 acquireQueued 進(jìn)行自旋的方式獲取鎖(有可能會(huì) repeatedly blocking and unblocking)
 *      4. 根據(jù)acquireQueued的返回值判斷在獲取lock的過(guò)程中是否被中斷, 若被中斷, 則自己再中斷一下(selfInterrupt)
 *
 */
public final void acquire(int arg){
    if(!tryAcquire(arg)&&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }
}
13. AbstractQueuedSynchronizer 循環(huán)獲取lock 方法 acquireQueued
    /**
     * 不支持中斷的獲取鎖
     * 主邏輯:
     *  1. 當(dāng)當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)是head節(jié)點(diǎn)時(shí)先 tryAcquire獲取一下鎖, 成功的話設(shè)置新 head, 返回
     *  2. 第一步不成功, 檢測(cè)是否需要sleep, 需要的話就 sleep, 等待前繼節(jié)點(diǎn)在釋放lock時(shí)喚醒 或通過(guò)中斷來(lái)喚醒
     *  3. 整個(gè)過(guò)程可能需要blocking nonblocking 幾次
     */
    final boolean acquireQueued(final Node node, int arg){
        boolean failed = true;
        try {
            boolean interrupted = false;
            for(;;){
                final Node p = node.predecessor();      // 1. 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn) (當(dāng)一個(gè)n在 Sync Queue 里面, 并且沒有獲取 lock 的 node 的前繼節(jié)點(diǎn)不可能是 null)
                if(p == head && tryAcquire(arg)){       // 2. 判斷前繼節(jié)點(diǎn)是否是head節(jié)點(diǎn)(前繼節(jié)點(diǎn)是head, 存在兩種情況 (1) 前繼節(jié)點(diǎn)現(xiàn)在占用 lock (2)前繼節(jié)點(diǎn)是個(gè)空節(jié)點(diǎn), 已經(jīng)釋放 lock, node 現(xiàn)在有機(jī)會(huì)獲取 lock); 則再次調(diào)用 tryAcquire嘗試獲取一下
                    setHead(node);                       // 3. 獲取 lock 成功, 直接設(shè)置 新head(原來(lái)的head可能就直接被回收)
                    p.next = null; // help GC          // help gc
                    failed = false;
                    return interrupted;                // 4. 返回在整個(gè)獲取的過(guò)程中是否被中斷過(guò) ; 但這又有什么用呢? 若整個(gè)過(guò)程中被中斷過(guò), 則最后我在 自我中斷一下 (selfInterrupt), 因?yàn)橥饷娴暮瘮?shù)可能需要知道整個(gè)過(guò)程是否被中斷過(guò)
                }
                if(shouldParkAfterFailedAcquire(p, node) && // 5. 調(diào)用 shouldParkAfterFailedAcquire 判斷是否需要中斷(這里可能會(huì)一開始 返回 false, 但在此進(jìn)去后直接返回 true(主要和前繼節(jié)點(diǎn)的狀態(tài)是否是 signal))
                        parkAndCheckInterrupt()){      // 6. 現(xiàn)在lock還是被其他線程占用 那就睡一會(huì), 返回值判斷是否這次線程的喚醒是被中斷喚醒
                    interrupted = true;
                }
            }
        }finally {
            if(failed){                             // 7. 在整個(gè)獲取中出錯(cuò)
                cancelAcquire(node);                // 8. 清除 node 節(jié)點(diǎn)(清除的過(guò)程是先給 node 打上 CANCELLED標(biāo)志, 然后再刪除)
            }
        }
    }
14. AbstractQueuedSynchronizer 支持中斷獲取lock 方法 doAcquireInterruptibly
/**
 * Acquire in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg) throws InterruptedException{
    final Node node = addWaiter(Node.EXCLUSIVE);  // 1. 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面
    boolean failed = true;
    try {
        for(;;){
            final Node p = node.predecessor(); // 2. 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn) (當(dāng)一個(gè)n在 Sync Queue 里面, 并且沒有獲取 lock 的 node 的前繼節(jié)點(diǎn)不可能是 null)
            if(p == head && tryAcquire(arg)){  // 3. 判斷前繼節(jié)點(diǎn)是否是head節(jié)點(diǎn)(前繼節(jié)點(diǎn)是head, 存在兩種情況 (1) 前繼節(jié)點(diǎn)現(xiàn)在占用 lock (2)前繼節(jié)點(diǎn)是個(gè)空節(jié)點(diǎn), 已經(jīng)釋放 lock, node 現(xiàn)在有機(jī)會(huì)獲取 lock); 則再次調(diào)用 tryAcquire嘗試獲取一下
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 4. 調(diào)用 shouldParkAfterFailedAcquire 判斷是否需要中斷(這里可能會(huì)一開始 返回 false, 但在此進(jìn)去后直接返回 true(主要和前繼節(jié)點(diǎn)的狀態(tài)是否是 signal))
                    parkAndCheckInterrupt()){           // 5. 現(xiàn)在lock還是被其他線程占用 那就睡一會(huì), 返回值判斷是否這次線程的喚醒是被中斷喚醒
                throw new InterruptedException();       // 6. 線程此時(shí)喚醒是通過(guò)線程中斷, 則直接拋異常
            }
        }
    }finally {
        if(failed){                 // 7. 在整個(gè)獲取中出錯(cuò)(比如線程中斷)
            cancelAcquire(node);    // 8. 清除 node 節(jié)點(diǎn)(清除的過(guò)程是先給 node 打上 CANCELLED標(biāo)志, 然后再刪除)
        }
    }
}
15. AbstractQueuedSynchronizer 支持超時(shí)&中斷獲取lock 方法 doAcquireNanos(int arg, long nanosTimeout)
/**
 * Acquire in exclusive timed mode
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
    if(nanosTimeout <= 0L){
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout; // 0. 計(jì)算截至?xí)r間
    final Node node = addWaiter(Node.EXCLUSIVE);  // 1. 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor(); // 2. 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn) (當(dāng)一個(gè)n在 Sync Queue 里面, 并且沒有獲取 lock 的 node 的前繼節(jié)點(diǎn)不可能是 null)
            if(p == head && tryAcquire(arg)){  // 3. 判斷前繼節(jié)點(diǎn)是否是head節(jié)點(diǎn)(前繼節(jié)點(diǎn)是head, 存在兩種情況 (1) 前繼節(jié)點(diǎn)現(xiàn)在占用 lock (2)前繼節(jié)點(diǎn)是個(gè)空節(jié)點(diǎn), 已經(jīng)釋放 lock, node 現(xiàn)在有機(jī)會(huì)獲取 lock); 則再次調(diào)用 tryAcquire嘗試獲取一下
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }

            nanosTimeout = deadline - System.nanoTime(); // 4. 計(jì)算還剩余的時(shí)間
            if(nanosTimeout <= 0L){                      // 5. 時(shí)間超時(shí), 直接返回
                return false;
            }
            if(shouldParkAfterFailedAcquire(p, node) && // 6. 調(diào)用 shouldParkAfterFailedAcquire 判斷是否需要中斷(這里可能會(huì)一開始 返回 false, 但在此進(jìn)去后直接返回 true(主要和前繼節(jié)點(diǎn)的狀態(tài)是否是 signal))
                    nanosTimeout > spinForTimeoutThreshold){ // 7. 若沒超時(shí), 并且大于spinForTimeoutThreshold, 則線程 sleep(小于spinForTimeoutThreshold, 則直接自旋, 因?yàn)樾矢?調(diào)用 LockSupport 是需要開銷的)
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if(Thread.interrupted()){                           // 8. 線程此時(shí)喚醒是通過(guò)線程中斷, 則直接拋異常
                throw new InterruptedException();
            }
        }
    }finally {
        if(failed){                 // 9. 在整個(gè)獲取中出錯(cuò)(比如線程中斷/超時(shí))
            cancelAcquire(node);    // 10. 清除 node 節(jié)點(diǎn)(清除的過(guò)程是先給 node 打上 CANCELLED標(biāo)志, 然后再刪除)
        }
    }
}
16. AbstractQueuedSynchronizer 釋放lock方法

整個(gè)釋放 lock 流程

1. 調(diào)用子類的 tryRelease 方法釋放獲取的資源
2. 判斷是否完全釋放lock(這里有 lock 重復(fù)獲取的情況)
3. 判斷是否有后繼節(jié)點(diǎn)需要喚醒, 需要的話調(diào)用unparkSuccessor進(jìn)行喚醒

看代碼:

/**
 * Releasing in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease(int)} returns true.
 * This method can be used to implement method {@link "Lock#unlock}.
 *
 * @param arg the release argument. This value is conveyed to
 *            {@link #tryRelease(int)} but is otherwise uninterpreted and
 *            can represent anything you like.
 * @return the value returned from {@link #tryRelease(int)}
 */
public final boolean release(int arg){
    if(tryRelease(arg)){   // 1. 調(diào)用子類, 若完全釋放好, 則返回true(這里有l(wèi)ock重復(fù)獲取)
        Node h = head;
        if(h != null && h.waitStatus != 0){ // 2. h.waitStatus !=0 其實(shí)就是 h.waitStatus < 0 后繼節(jié)點(diǎn)需要喚醒
            unparkSuccessor(h);   // 3. 喚醒后繼節(jié)點(diǎn)
        }
        return true;
    }
    return false;
}


/**
 * Wakes up node's successor, if one exists.
 * 喚醒 node 的后繼節(jié)點(diǎn)
 * 這里有個(gè)注意點(diǎn): 喚醒時(shí)會(huì)將當(dāng)前node的標(biāo)識(shí)歸位為 0
 * 等于當(dāng)前節(jié)點(diǎn)標(biāo)識(shí)位 的流轉(zhuǎn)過(guò)程: 0(剛加入queue) -> signal (被后繼節(jié)點(diǎn)要求在釋放時(shí)需要喚醒) -> 0 (進(jìn)行喚醒后繼節(jié)點(diǎn))
 *
 */
private void unparkSuccessor(Node node) {
    logger.info("unparkSuccessor node:" + node + Thread.currentThread().getName());
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);       // 1. 清除前繼節(jié)點(diǎn)的標(biāo)識(shí)

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    logger.info("unparkSuccessor s:" + node + Thread.currentThread().getName());
    if (s == null || s.waitStatus > 0) {         // 2. 這里若在 Sync Queue 里面存在想要獲取 lock 的節(jié)點(diǎn),則一定需要喚醒一下(跳過(guò)取消的節(jié)點(diǎn))〉忍(PS: s == null發(fā)生在共享模式的競(jìng)爭(zhēng)釋放資源)
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)              // 3. 找到 queue 里面最前面想要獲取 Lock 的節(jié)點(diǎn)
                s = t;
    }
    logger.info("unparkSuccessor s:"+s);
    if (s != null)
        LockSupport.unpark(s.thread);
}
17. AbstractQueuedSynchronizer 獲取共享lock

共享方式獲取lock流程:

1. 調(diào)用 tryAcquireShared 嘗試性的獲取鎖(一般都是由子類實(shí)現(xiàn)), 成功的話直接返回
2. tryAcquireShared 調(diào)用獲取失敗, 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面(調(diào)用addWaiter), 等待獲取 signal 信號(hào)
3. 在 Sync Queue 里面進(jìn)行自旋的方式獲取鎖(有可能會(huì) repeatedly blocking and unblocking
4. 當(dāng)獲取失敗, 則判斷是否可以 block(block的前提是前繼節(jié)點(diǎn)被打上 SIGNAL 標(biāo)示)
5. 共享與獨(dú)占獲取lock的區(qū)別主要在于 在共享方式下獲取 lock 成功會(huì)判斷是否需要繼續(xù)喚醒下面的繼續(xù)獲取共享lock的節(jié)點(diǎn)(及方法 doReleaseShared)

共享方式獲取lock主要分成下面3類:

1. acquireShared 不響應(yīng)中斷的獲取lock, 這里的不響應(yīng)中斷指的是線程被中斷后會(huì)被喚醒, 并且繼續(xù)獲取lock,在方法返回時(shí), 根據(jù)剛才的獲取過(guò)程是否被中斷來(lái)決定是否要自己中斷一下(方法 selfInterrupt)
2. doAcquireSharedInterruptibly 響應(yīng)中斷的獲取 lock, 這里的響應(yīng)中斷, 指在線程獲取 lock 過(guò)程中若被中斷, 則直接拋出異常
3. doAcquireSharedNanos 響應(yīng)中斷及超時(shí)的獲取 lock, 當(dāng)線程被中斷, 或獲取超時(shí), 則直接拋出異常, 獲取失敗
18. AbstractQueuedSynchronizer 獲取共享lock 方法 acquireShared
/**
 * 獲取 共享 lock
 */
public final void acquireShared(int arg){
    if(tryAcquireShared(arg) < 0){  // 1. 調(diào)用子類, 獲取共享 lock  返回 < 0, 表示失敗
        doAcquireShared(arg);       // 2. 調(diào)用 doAcquireShared 當(dāng)前 線程加入 Sync Queue 里面, 等待獲取 lock
    }
}
19. AbstractQueuedSynchronizer 獲取共享lock 方法 doAcquireShared
/**
 * Acquire in shared uninterruptible mode
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg){
    final Node node = addWaiter(Node.SHARED);       // 1. 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        boolean interrupted = false;
        for(;;){
            final Node p = node.predecessor();      // 2. 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn) (當(dāng)一個(gè)n在 Sync Queue 里面, 并且沒有獲取 lock 的 node 的前繼節(jié)點(diǎn)不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);      // 3. 判斷前繼節(jié)點(diǎn)是否是head節(jié)點(diǎn)(前繼節(jié)點(diǎn)是head, 存在兩種情況 (1) 前繼節(jié)點(diǎn)現(xiàn)在占用 lock (2)前繼節(jié)點(diǎn)是個(gè)空節(jié)點(diǎn), 已經(jīng)釋放 lock, node 現(xiàn)在有機(jī)會(huì)獲取 lock); 則再次調(diào)用 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);   // 4. 獲取 lock 成功, 設(shè)置新的 head, 并喚醒后繼獲取  readLock 的節(jié)點(diǎn)
                    p.next = null; // help GC
                    if(interrupted){               // 5. 在獲取 lock 時(shí), 被中斷過(guò), 則自己再自我中斷一下(外面的函數(shù)可能需要這個(gè)參數(shù))
                        selfInterrupt();
                    }
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 6. 調(diào)用 shouldParkAfterFailedAcquire 判斷是否需要中斷(這里可能會(huì)一開始 返回 false, 但在此進(jìn)去后直接返回 true(主要和前繼節(jié)點(diǎn)的狀態(tài)是否是 signal))
                    parkAndCheckInterrupt()){           // 7. 現(xiàn)在lock還是被其他線程占用 那就睡一會(huì), 返回值判斷是否這次線程的喚醒是被中斷喚醒
                interrupted = true;
            }
        }
    }finally {
        if(failed){             // 8. 在整個(gè)獲取中出錯(cuò)(比如線程中斷/超時(shí))
            cancelAcquire(node);  // 9. 清除 node 節(jié)點(diǎn)(清除的過(guò)程是先給 node 打上 CANCELLED標(biāo)志, 然后再刪除)
        }
    }
}
20. AbstractQueuedSynchronizer 獲取共享lock 方法 doAcquireSharedInterruptibly
/**
 * Acquire in shared interruptible mode
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException{
    final Node node = addWaiter(Node.SHARED);            // 1. 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor();          // 2. 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn) (當(dāng)一個(gè)n在 Sync Queue 里面, 并且沒有獲取 lock 的 node 的前繼節(jié)點(diǎn)不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);          // 3. 判斷前繼節(jié)點(diǎn)是否是head節(jié)點(diǎn)(前繼節(jié)點(diǎn)是head, 存在兩種情況 (1) 前繼節(jié)點(diǎn)現(xiàn)在占用 lock (2)前繼節(jié)點(diǎn)是個(gè)空節(jié)點(diǎn), 已經(jīng)釋放 lock, node 現(xiàn)在有機(jī)會(huì)獲取 lock); 則再次調(diào)用 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);       // 4. 獲取 lock 成功, 設(shè)置新的 head, 并喚醒后繼獲取  readLock 的節(jié)點(diǎn)
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 5. 調(diào)用 shouldParkAfterFailedAcquire 判斷是否需要中斷(這里可能會(huì)一開始 返回 false, 但在此進(jìn)去后直接返回 true(主要和前繼節(jié)點(diǎn)的狀態(tài)是否是 signal))
                    parkAndCheckInterrupt()){           // 6. 現(xiàn)在lock還是被其他線程占用 那就睡一會(huì), 返回值判斷是否這次線程的喚醒是被中斷喚醒
                throw new InterruptedException();     // 7. 若此次喚醒是 通過(guò)線程中斷, 則直接拋出異常
            }
        }
    }finally {
        if(failed){              // 8. 在整個(gè)獲取中出錯(cuò)(比如線程中斷/超時(shí))
            cancelAcquire(node); // 9. 清除 node 節(jié)點(diǎn)(清除的過(guò)程是先給 node 打上 CANCELLED標(biāo)志, 然后再刪除)
        }
    }
}
21. AbstractQueuedSynchronizer 獲取共享lock 方法 doAcquireSharedNanos
/**
 * Acquire in shared timed mode
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
    if (nanosTimeout <= 0L){
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout;  // 0. 計(jì)算超時(shí)的時(shí)間
    final Node node = addWaiter(Node.SHARED);               // 1. 將當(dāng)前的線程封裝成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor();          // 2. 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn) (當(dāng)一個(gè)n在 Sync Queue 里面, 并且沒有獲取 lock 的 node 的前繼節(jié)點(diǎn)不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);          // 3. 判斷前繼節(jié)點(diǎn)是否是head節(jié)點(diǎn)(前繼節(jié)點(diǎn)是head, 存在兩種情況 (1) 前繼節(jié)點(diǎn)現(xiàn)在占用 lock (2)前繼節(jié)點(diǎn)是個(gè)空節(jié)點(diǎn), 已經(jīng)釋放 lock, node 現(xiàn)在有機(jī)會(huì)獲取 lock); 則再次調(diào)用 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);       // 4. 獲取 lock 成功, 設(shè)置新的 head, 并喚醒后繼獲取  readLock 的節(jié)點(diǎn)
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }

            nanosTimeout = deadline - System.nanoTime(); // 5. 計(jì)算還剩余的 timeout , 若小于0 則直接return
            if(nanosTimeout <= 0L){
                return false;
            }
            if(shouldParkAfterFailedAcquire(p, node) &&         // 6. 調(diào)用 shouldParkAfterFailedAcquire 判斷是否需要中斷(這里可能會(huì)一開始 返回 false, 但在此進(jìn)去后直接返回 true(主要和前繼節(jié)點(diǎn)的狀態(tài)是否是 signal))
                    nanosTimeout > spinForTimeoutThreshold){// 7. 在timeout 小于  spinForTimeoutThreshold 時(shí) spin 的效率, 比 LockSupport 更高
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if(Thread.interrupted()){                           // 7. 若此次喚醒是 通過(guò)線程中斷, 則直接拋出異常
                throw new InterruptedException();
            }
        }
    }finally {
        if (failed){                // 8. 在整個(gè)獲取中出錯(cuò)(比如線程中斷/超時(shí))
            cancelAcquire(node);    // 10. 清除 node 節(jié)點(diǎn)(清除的過(guò)程是先給 node 打上 CANCELLED標(biāo)志, 然后再刪除)
        }
    }
}
22. AbstractQueuedSynchronizer 釋放共享lock 方法

特點(diǎn): 當(dāng) Sync Queue中存在連續(xù)多個(gè)獲取 共享lock的節(jié)點(diǎn)時(shí), 會(huì)出現(xiàn)并發(fā)的喚醒后繼節(jié)點(diǎn)(因?yàn)楣蚕砟J较芦@取lock后會(huì)喚醒近鄰的后繼節(jié)點(diǎn)來(lái)獲取lock)

流程:

1. 調(diào)用子類的 tryReleaseShared來(lái)進(jìn)行釋放 lock
2. 判斷是否需要喚醒后繼節(jié)點(diǎn)來(lái)獲取 lock

調(diào)用流分類

場(chǎng)景1: Sync Queue 里面存在 : 1(共享) -> 2(共享) -> 3(共享) -> 4(共享)
   節(jié)點(diǎn)1獲取 lock 后調(diào)用 setHeadAndPropagate -> doReleaseShared 喚醒 節(jié)點(diǎn)2 —> 接下來(lái) node 1 在 release 時(shí)再次 doReleaseShared, 而 node 2在獲取 lock 后調(diào)用 setHeadAndPropagate 時(shí)再次 doReleaseShared -> 直至到 node 4, node 4的狀態(tài)變成 PROPAGATE (期間可能有些節(jié)點(diǎn)還沒設(shè)置為 PROPAGATE 就被其他節(jié)點(diǎn)調(diào)用 setHead 而踢出 Sync Queue)
   
場(chǎng)景2: Sync Queue 里面存在 : 1(共享) -> 2(共享) -> 3(獨(dú)占) -> 4(共享)
   節(jié)點(diǎn)1獲取 lock 后調(diào)用 setHeadAndPropagate -> doReleaseShared 喚醒 節(jié)點(diǎn)2 —> 接下來(lái) node 1 在 release 時(shí)再次 doReleaseShared, 而 node 2 在獲取 lock 后
   這是發(fā)現(xiàn)后繼節(jié)點(diǎn)不是共享的, 則 Node 2 不在 setHeadAndPropagate 中調(diào)用 doReleaseShared, 而Node 3 沒有獲取lock, 將 Node 2 變成 SIGNAL, 而 node 2 在 release lock 時(shí)喚醒 node 3, 而 node 3 最終在 release lock 時(shí) 釋放 node 4倒槐, node 4在release lock后狀態(tài)還是保持 0

看代碼:

private void doReleaseShared(){
    /**
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceed in the usual
     * way of trying to unparkSuccessor of the head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for(;;){
        Node h = head;                      // 1. 獲取 head 節(jié)點(diǎn), 準(zhǔn)備 release
        if(h != null && h != tail){        // 2. Sync Queue 里面不為 空
            int ws = h.waitStatus;
            if(ws == Node.SIGNAL){         // 3. h節(jié)點(diǎn)后面可能是 獨(dú)占的節(jié)點(diǎn), 也可能是 共享的, 并且請(qǐng)求了喚醒(就是給前繼節(jié)點(diǎn)打標(biāo)記 SIGNAL)
                if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){ // 4. h 恢復(fù)  waitStatus 值置0 (為啥這里要用 CAS 呢, 因?yàn)檫@里的調(diào)用可能是在 節(jié)點(diǎn)剛剛獲取 lock, 而其他線程又對(duì)其進(jìn)行中斷, 所用cas就出現(xiàn)失敗)
                    continue; // loop to recheck cases
                }
                unparkSuccessor(h);         // 5. 喚醒后繼節(jié)點(diǎn)
            }
            else if(ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){ //6. h后面沒有節(jié)點(diǎn)需要喚醒, 則標(biāo)識(shí)為 PROPAGATE 表示需要繼續(xù)傳遞喚醒(主要是區(qū)別 獨(dú)占節(jié)點(diǎn)最終狀態(tài)0 (獨(dú)占的節(jié)點(diǎn)在沒有后繼節(jié)點(diǎn), 并且release lock 時(shí)最終 waitStatus 保存為 0))
                continue; // loop on failed CAS // 7. 同樣這里可能存在競(jìng)爭(zhēng)
            }
        }

        if(h == head){ // 8. head 節(jié)點(diǎn)沒變化, 直接 return(從這里也看出, 一個(gè)共享模式的 節(jié)點(diǎn)在其喚醒后繼節(jié)點(diǎn)時(shí), 只喚醒一個(gè), 但是 它會(huì)在 獲取 lock 時(shí)喚醒, 釋放 lock 時(shí)也進(jìn)行, 所以或?qū)е赂?jìng)爭(zhēng)的操作)
            break;           // head 變化了, 說(shuō)明其他節(jié)點(diǎn)獲取 lock 了, 自己的任務(wù)完成, 直接退出
        }

    }
}
23. AbstractQueuedSynchronizer 判斷是否阻塞線程方法 shouldParkAfterFailedAcquire
    /**
     * shouldParkAfterFailedAcquire 這個(gè)方法最終的作用:
     *  本節(jié)點(diǎn)在進(jìn)行 sleep 前一定需要給 前繼節(jié)點(diǎn)打上 SIGNAL 標(biāo)識(shí)(
     *  因?yàn)榍袄^節(jié)點(diǎn)在 release lock 時(shí)會(huì)根據(jù) 這個(gè)標(biāo)識(shí)決定是否需要喚醒后繼節(jié)點(diǎn)來(lái)獲取 lock,
     *  若釋放時(shí) 標(biāo)識(shí)是0, 則說(shuō)明 Sync queue 里面沒有等待獲取lock的線程, 或Sync queue里面的節(jié)點(diǎn)正在獲取 lock)
     *
     *  一般流程:
     *      1. 第一次進(jìn)入此方法 前繼節(jié)點(diǎn)狀態(tài)是 0, 則 CAS 為SIGNAL 返回 false(干嘛返回的是FALSE <- 主要是為了再次 tryAcquire 一下, 說(shuō)不定就能獲取鎖呢)
     *      2. 第二次進(jìn)來(lái) 前繼節(jié)點(diǎn)標(biāo)志為SIGNAL, ok, 標(biāo)識(shí)好了, 這下就可以安心睡覺, 不怕前繼節(jié)點(diǎn)在釋放lock后不進(jìn)行喚醒我了
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){
        int ws = pred.waitStatus;
        if(ws == Node.SIGNAL){                                      // 1. 判斷是否已經(jīng)給前繼節(jié)點(diǎn)打上標(biāo)識(shí)SIGNAL, 為前繼節(jié)點(diǎn)釋放 lock 時(shí)喚醒自己做準(zhǔn)備
            /**
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        }

        if(ws > 0){                                                 // 2. 遇到個(gè) CANCELLED 的節(jié)點(diǎn) (ws > 0 只可能是 CANCELLED 的節(jié)點(diǎn), 也就是 獲取中被中斷, 或超時(shí)的節(jié)點(diǎn))
            /**                                                     // 這里我們幫助刪除
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry
             */
            do{
                node.prev = pred = pred.prev;                    // 3. 跳過(guò)所有 CANCELLED 的節(jié)點(diǎn)
            }while(pred.waitStatus > 0);
            pred.next = node;                                    // 跳過(guò) CANCELLED 節(jié)點(diǎn)
        }
        else{
            /**
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     // 4. 到這里 ws 只可能是 0 或 PROPAGATE (用于 共享模式的, 所以在共享模式中的提醒前繼節(jié)點(diǎn)喚醒自己的方式,
                                                                // 也是給前繼節(jié)點(diǎn)打上 SIGNAL標(biāo)識(shí) 見 方法 "doReleaseShared" -> "!compareAndSetWaitStatus(h, Node.SIGNAL, 0)" -> unparkSuccessor)
        }

        return false;
    }
24. AbstractQueuedSynchronizer 線程自己中斷方法selfInterrupt
    /**
     * 自我中斷, 這主要是怕外面的線程不知道整個(gè)獲取的過(guò)程中是否中斷過(guò), 所以才 ....
     */
    static void selfInterrupt(){
        Thread.currentThread().interrupt();
    }
25. AbstractQueuedSynchronizer 中斷線程方法parkAndCheckInterrupt
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    /**
     * 中斷當(dāng)前線程, 并且返回此次的喚醒是否是通過(guò)中斷
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        logger.info(Thread.currentThread().getName() + " " + "parkAndCheckInterrupt , ThreadName:" + Thread.currentThread().getName());
        return Thread.interrupted(); //  Thread.interrupted() 會(huì)清除中斷標(biāo)識(shí), 并返上次的中斷標(biāo)識(shí)
    }
26. AbstractQueuedSynchronizer 一般方法
   /******************************************* Queue inspection methods ****************************/


    /**
     *  SyncQueue 里面是否有 node 節(jié)點(diǎn)
     */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

   
    /**
     * 獲取 lock 是否發(fā)生競(jìng)爭(zhēng)
     */
    public final boolean hasContented(){
        return head != null;
    }


    /**
     * Sync Queue 里面的有效的, 最前面的 node 節(jié)點(diǎn)
     */
    public final Thread getFirstQueuedThread(){
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /**
     * Sync Queue 里面的有效的, 最前面的 node 節(jié)點(diǎn)
     */
    private Thread fullGetFirstQueuedThread(){
        /**
         * The first node is normally head next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed sethead in
         * between some of our reads. we try this twice before
         * restorting to traversal
         */

        Node h, s;
        Thread st;
        /**
         * 這里兩次檢測(cè)是怕線程 timeout 或 cancelled
         */
        if((
                (h = head) != null && (s = h.next) != null &&
                        s.prev == head && (st = s.thread) != null ||
                        (
                                (h = head) != null && (s = h.next) != null &&
                                        s.prev == head && (st = s.thread) != null
                                )
                )){
            return st;
        }

        /**
         * Head's next field might not have been set yet, or may have
         * been unset after setHead, So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination
         */
        /**
         * 從 tail 開始找
         */
        Node t = tail;
        Thread firstThread = null;
        while(t != null && t != head){
            Thread tt = t.thread;
            if(tt != null){
                firstThread = tt;
            }
            t = t.prev;
        }
        return firstThread;
    }


    /**
     * 判斷線程是否在 Sync Queue 里面
     */
    public final boolean isQueued(Thread thread){
        if(thread == null){
            throw new NullPointerException();
        }
        for(Node p = tail; p != null; p = p.prev){ // 從tail 開始
            if(p.thread == thread){
                return true;
            }
        }
        return false;
    }


    /**
     * 判斷 Sync Queue 中等待獲取 lock 的第一個(gè) node 是否是 獲取 writeLock 的(head 節(jié)點(diǎn)是已經(jīng)獲取 lock 的節(jié)點(diǎn))
     */
    public final boolean apparentlyFirstQueuedIsExclusive(){
        Node h, s;
        return (h = head) != null &&
                (s = h.next) != null &&
                !s.isShared()       &&
                s.thread != null;
    }


    /**
     * 當(dāng)前節(jié)點(diǎn)之前在 Sync Queue 里面是否有等待獲取的 Node
     */
    public final boolean hasQueuedPredecessors(){
        /**
         * The correctness of this depends on head being initialized
         * before tail and on head next being accurate if the current
         * thread is first in queue
         */
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&        // h != t 表示 Sync Queu 里面至少存在 一個(gè)節(jié)點(diǎn) (這時(shí)的 h節(jié)點(diǎn)可能是 null)
                ((s = h.next) == null || s.thread != Thread.currentThread()); // (s = h.next) == null 說(shuō)明 h節(jié)點(diǎn)獲取 lock, 而后又被其他獲取 lock 的節(jié)點(diǎn)從 Sync Queue 里面剔除掉了
    }


    /********************************************* Instrumentation and monitoring methods **************************/


    /**
     * 獲取 Sync Queue 里面 等待 獲取 lock 的 長(zhǎng)度
     */
    public final int getQueueLength(){
        int n = 0;
        for(Node p = tail; p != null; p = p.prev){
            if(p.thread != null){
                ++n;
            }
        }
        return n;
    }


    /**
     * 獲取 Sync Queue 里面 等待 獲取 lock 的 thread
     */
    public final Collection<Thread> getQueuedThreads(){
        ArrayList<Thread> list = new ArrayList<>();
        for(Node p = tail; p != null; p = p.prev){
            Thread t = p.thread;
            if(t != null){
                list.add(t);
            }
        }
        return list;
    }


    /**
     * 獲取 Sync Queue 里面 等待 獲取 writeLock 的 thread
     */
    public final Collection<Thread> getExclusiveQueuedThreads(){
        ArrayList<Thread> list = new ArrayList<>();
        for(Node p = tail; p != null; p = p.prev){
            if(!p.isShared()){
                Thread t = p.thread;
                if(t != null){
                    list.add(t);
                }
            }
        }
        return list;
    }


    /**
     * 獲取 Sync Queue 里面 等待 獲取 readLock 的 thread
     */
    public final Collection<Thread> getSharedQueuedThreads(){
        ArrayList<Thread> list = new ArrayList<>();
        for(Node p = tail; p != null; p = p.prev){
            if(p.isShared()){
                Thread t = p.thread;
                if(t != null){
                    list.add(t);
                }
            }
        }
        return list;
    }


    public String toString(){
        int s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() + "[State = " + s + ", " + q + " empty queue]";
    }


    /*********************** Internal support methods for Conditions ***********************/

    /**
     * 判斷 node 是否在 Sync Queue 里面
     */
    final boolean isOnSyncQueue(Node node){
        /**
         * 這里有點(diǎn) tricky,
         * node.waitStatus == Node.CONDITION 則說(shuō)明 node 一定在 Condition 里面
         * node.prev == null 說(shuō)明 node 一定不在 Sync Queue 里面
         */
        if(node.waitStatus == Node.CONDITION || node.prev == null){
            return false;
        }
        // node.next != null 則 node 一定在 Sync Queue; 但是反過(guò)來(lái) 在Sync Queue 里面的節(jié)點(diǎn) 不一定  node.next != null
        if(node.next != null){ // If has successor, it must be on queue
            return true;
        }

        /**
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually make it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much
         */
        /**
         * 因?yàn)檫@里存在 node 開始enq Sync Queue 的情形, 所以在此查找一下
         */
        return findNodeFromTail(node);
    }
    
    /**
     * 從 tail 開始查找 node
     */
    private boolean findNodeFromTail(Node node){
        Node t = tail;
        for(;;){
            if(t == node){
                return true;
            }
            if(t == null){
                return false;
            }
            t = t.prev;
        }
    }
    
    /**
     * 將 Node 從Condition Queue 轉(zhuǎn)移到 Sync Queue 里面
     * 在調(diào)用transferForSignal之前, 會(huì) first.nextWaiter = null;
     * 而我們發(fā)現(xiàn) 若節(jié)點(diǎn)是因?yàn)?timeout / interrupt 進(jìn)行轉(zhuǎn)移, 則不會(huì)清除兩種情況的轉(zhuǎn)移都會(huì)把 wautStatus 置為 0
     */
    final boolean transferForSignal(Node node){
        /**
         * If cannot change waitStatus, the node has been cancelled
         */
        if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已經(jīng) cancelled 則失敗
            return false;
        }

        /**
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting, If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong)
         */
        Node p = enq(node);                                 // 2. 加入 Sync Queue
        int ws = p.waitStatus;
        if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 這里的 ws > 0 指Sync Queue 中node 的前繼節(jié)點(diǎn)cancelled 了, 所以, 喚醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失敗, 則說(shuō)明 前繼節(jié)點(diǎn)已經(jīng)變成 SIGNAL 或 cancelled, 所以也要 喚醒
            LockSupport.unpark(node.thread);
        }
        return true;
    }


    /**
     * 將 Condition Queue 中因 timeout/interrupt 而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移
     */
    final boolean transferAfterCancelledWait(Node node){
        if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 沒有 node 沒有 cancelled , 直接進(jìn)行轉(zhuǎn)移 (轉(zhuǎn)移后, Sync Queue , Condition Queue 都會(huì)存在 node)
            enq(node);
            return true;
        }

        /**
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both race and transient, so just
         * spin
         */
        while(!isOnSyncQueue(node)){                // 2.這時(shí)是其他的線程發(fā)送signal,將本線程轉(zhuǎn)移到 Sync Queue 里面的工程中(轉(zhuǎn)移的過(guò)程中 waitStatus = 0了, 所以上面的 CAS 操作失敗)
            Thread.yield();                         // 這里調(diào)用 isOnSyncQueue判斷是否已經(jīng) 入Sync Queue 了
        }
        return false;
    }

    /******************** Instrumentation methods for conditions ***************/

  
    /**
     * condition 是否屬于這個(gè) AQS 的
     */
    public final boolean owns(ConditionObject condition){
        return condition.isOwnedBy(this);
    }


    /**
     * 這個(gè) condition Queue 里面是否有等待的線程
     */
    public final boolean hasWaiters(ConditionObject condition){
        if(!owns(condition)){
            throw new IllegalArgumentException();
        }
        return condition.hasWaiters();
    }


    /**
     * 這個(gè) condition Queue 里面等待的線程的量
     */
    public final int getWaitQueueLength(ConditionObject condition){
        if(!owns(condition)){
            throw new IllegalArgumentException("Not owner");
        }
        return condition.getWaitQueueLength();
    }

    /**
     * 這個(gè) condition Queue 里面等待的線程
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition){
        if(!owns(condition)){
            throw new IllegalArgumentException("not owner");
        }
        return condition.getWaitingThreads();
    }
27. 總結(jié)

AQS 主要由 Node, Condition Queue, Sync Queue, 獨(dú)占獲取lock, 共享獲取lock 這幾部分組成; 它 JUC 中l(wèi)ock及l(fā)ock的工具類的基礎(chǔ)構(gòu)件, 理解了AQS, 就能很好理解 ReeantrantLock, Condition, ReeantrantReadWriteLock;

參考:
The java.util.concurrent Synchronizer Framework (這是AQS作者寫的, 強(qiáng)烈建議看一下)
Java多線程之JUC包:AbstractQueuedSynchronizer(AQS)源碼學(xué)習(xí)筆記
AbstractQueuedSynchronizer的介紹和原理分析
Java 1.6 AbstractQueuedSynchronizer源碼解析
Jdk1.6 JUC源碼解析(6)-locks-AbstractQueuedSynchronizer
JDK1.8源碼分析之AbstractQueuedSynchronizer(二)
java并發(fā)鎖ReentrantReadWriteLock讀寫鎖源碼分析
Java 理論與實(shí)踐: 非阻塞算法簡(jiǎn)介

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末热幔,一起剝皮案震驚了整個(gè)濱河市蛉谜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌姻采,老刑警劉巖雅采,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異慨亲,居然都是意外死亡婚瓜,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門刑棵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)巴刻,“玉大人,你說(shuō)我怎么就攤上這事蛉签『悖” “怎么了沥寥?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)柠座。 經(jīng)常有香客問我邑雅,道長(zhǎng),這世上最難降的妖魔是什么愚隧? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任蒂阱,我火速辦了婚禮,結(jié)果婚禮上狂塘,老公的妹妹穿的比我還像新娘录煤。我一直安慰自己,他們只是感情好荞胡,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布妈踊。 她就那樣靜靜地躺著,像睡著了一般泪漂。 火紅的嫁衣襯著肌膚如雪廊营。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天萝勤,我揣著相機(jī)與錄音露筒,去河邊找鬼。 笑死敌卓,一個(gè)胖子當(dāng)著我的面吹牛慎式,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播趟径,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼瘪吏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蜗巧?” 一聲冷哼從身側(cè)響起掌眠,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎幕屹,沒想到半個(gè)月后蓝丙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡望拖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年渺尘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片靠娱。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖掠兄,靈堂內(nèi)的尸體忽然破棺而出像云,到底是詐尸還是另有隱情锌雀,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布迅诬,位于F島的核電站腋逆,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏侈贷。R本人自食惡果不足惜惩歉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望俏蛮。 院中可真熱鬧撑蚌,春花似錦、人聲如沸搏屑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)辣恋。三九已至亮垫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間伟骨,已是汗流浹背饮潦。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留携狭,地道東北人继蜡。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像暑中,于是被迫代替她去往敵國(guó)和親壹瘟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容