java并發(fā)包底層原理

前言

JDK并發(fā)包是基于兩個基礎(chǔ)類 UnsafeLockSupport實現(xiàn)的:

  • unsafe 提供基于os系統(tǒng)調(diào)用的內(nèi)存CAS操作
  • LockSupport 提供基于os系統(tǒng)調(diào)用的線程阻塞甸赃、喚醒

在上面兩個核心類的基礎(chǔ)上,JDK并發(fā)包基于鏈表實現(xiàn)的隊列文黎,實現(xiàn)了核心類 AbstractQueuedSynchronizer, 這個同步器是并發(fā)包其他一切工具類的底層實現(xiàn)核心百炬;

AQS 源碼分析

由于JDK里AQS的源碼風(fēng)格比較像C++,易讀性也比較差,我重新整理了下代碼注釋和結(jié)構(gòu)断箫,便于分析

package zxl.org.aqs;

/**
 * AQS隊列的節(jié)點實現(xiàn)
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:34
 */
public class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /**
     * Node 屬性
     */
    //最關(guān)鍵屬性 每個Node都代表一個線程
    volatile Thread thread;

    //Node可以組成雙向鏈表
    volatile Node prev;
    volatile Node next;

    //下面兩個字段只是用來表示Node狀態(tài)的

    /**
     * 共享的node 為 shared
     * 獨享的node 為 null
     */
    Node nextWaiter;
    /**
     * 參見上面的4個狀態(tài) cancel 代表取消等待  condition 代表在等待事件發(fā)生  signal 代表事件發(fā)生了
     * propagate 是傳播汇歹,應(yīng)該是共享節(jié)點用的
     */
    volatile int waitStatus;

    public Node() {
    }

    public Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    public Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

    public final boolean isShared() {
        return nextWaiter == SHARED;
    }

    public final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}
package zxl.org.aqs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;

/**
 * AQS 的等待隊列(一個競爭隊列,可以掛載N個等待隊列)
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:43
 */
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    private static final long spinForTimeoutThreshold = 1000L;
    private static final int REINTERRUPT = 1;
    private static final int THROW_IE = -1;

    /**
     * condition 的內(nèi)部屬性
     */
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    //每個condition都必須附屬于一個 synchronizer
    private AbstractQueuedSynchronizer synchronizer;

    /**
     * 將源碼的內(nèi)部類獨立提取出來 方便理解
     *
     * @param synchronizer
     */
    public ConditionObject(AbstractQueuedSynchronizer synchronizer) {
        this.synchronizer = synchronizer;
    }


    /**
     * conditionObject 的內(nèi)部核心方法
     */


    /**
     * 添加節(jié)點邏輯
     * 添加時候會檢查最后一個Node是否取消犁罩,如果取消過則觸發(fā)整個鏈表的清理過程
     *
     * @return
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            //檢查最后一個Node 的狀態(tài)齐蔽,如果被重置為非condition,則清理一下整個鏈表
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //創(chuàng)建一個新的condition狀態(tài)的節(jié)點
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null) {
            //初始化
            lastWaiter = firstWaiter = node;
        } else {
            //移動尾部節(jié)點 這里處理方式為單向鏈表
            t.nextWaiter = node;
            lastWaiter = node;
        }
        //返回新創(chuàng)建的節(jié)點
        return node;
    }

    /**
     * 將取消等待的Node從單向鏈表中刪除
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        //循環(huán)過程中用來保存前一個condition節(jié)點
        Node trail = null;
        //簡單遍歷床估,將非condition節(jié)點從單向鏈表里刪除
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else
                trail = t;
            t = next;
        }
    }

    /**
     * 簡單的從單向等待列表里 刪除第一個condition節(jié)點
     * 這里的循環(huán)是為了保證確定有一個node是在 condition狀態(tài)肴熏, synchronizer會判斷刪除節(jié)點的狀態(tài),并加入到競爭隊列
     *
     * @param first
     */
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!synchronizer.transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

    /**
     * 同上
     * 只是循環(huán)處理所有等待節(jié)點顷窒,鍵入synchronizer的競爭隊列里
     *
     * @param first
     */
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            synchronizer.transferForSignal(first);
            first = next;
        } while (first != null);
    }


    /**
     * 針對Condition接口的實現(xiàn)
     */

    /**
     * 增加了邊界條件判斷
     * 其他邏輯參考 doSignal
     */
    public final void signal() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    /**
     * 增加了邊界條件判斷
     * 其他邏輯參考 doSignalAll
     */
    public final void signalAll() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    /**
     * 1. 加入等待隊列
     * 2. 通過block方式等待進(jìn)入競爭隊列
     * 3. 通過block方式等待變?yōu)楦偁庩犃械牡谝粋€
     */
    public final void awaitUninterruptibly() {
        //將當(dāng)前線程加入到等到隊列中
        Node node = addConditionWaiter();
        //根據(jù)具體鎖的實現(xiàn)來決定是否激活競爭隊列中的一個線程
        int savedState = synchronizer.fullyRelease(node);
        boolean interrupted = false;
        //沒進(jìn)入競爭隊列則一直等待 忽略interrupted標(biāo)志
        while (!synchronizer.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
        //需要當(dāng)前線程的node 在競爭隊列中排首位才會停止阻塞蛙吏,acquireQueued 是一個不停檢查和通過 LockSupport.park 等待的過程
        if (synchronizer.acquireQueued(node, savedState) || interrupted)
            synchronizer.selfInterrupt();
    }

    /**
     * 基本邏輯同 awaitUninterruptibly
     * <p>
     * 不同之處在于處理 Thread.interupted 狀態(tài)標(biāo)識,如果線程被設(shè)置了這個狀態(tài)位鞋吉,則不再被動等待進(jìn)入 競爭隊列鸦做, 直接主動進(jìn)入競爭隊列
     * 另外一個注意的點,不是設(shè)置了interupted就會拋異常谓着,還有一種就是signal+interupted是幾乎同時觸發(fā)的情況泼诱,不拋異常
     *
     * @throws InterruptedException
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (synchronizer.transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            synchronizer.selfInterrupt();
    }

    /**
     * 基本流程同await
     * 在等待進(jìn)入競爭隊列的過程中,加入了時間判斷赊锚,如果超過等待時間則直接進(jìn)入競爭隊列
     *
     * @param nanosTimeout
     * @return
     * @throws InterruptedException
     */
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                synchronizer.transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    /**
     * 邏輯同上
     *
     * @param deadline
     * @return
     * @throws InterruptedException
     */
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = synchronizer.transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /**
     * 邏輯同上
     * 這坨坨的冗余代碼治筒。。
     *
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = synchronizer.transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /**
     * 工具擴展
     */

    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
        return sync == synchronizer;
    }

    /**
     * 遍歷整個鏈表
     * 檢查是否有等待狀態(tài)的node
     *
     * @return
     */
    protected final boolean hasWaiters() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                return true;
        }
        return false;
    }

    /**
     * 遍歷整個鏈表
     * 計算等待狀態(tài)的node數(shù)量
     *
     * @return
     */
    protected final int getWaitQueueLength() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        int n = 0;
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                ++n;
        }
        return n;
    }

    /**
     * 遍歷整個鏈表
     * 獲取等待狀態(tài)的node線程集合
     *
     * @return
     */
    protected final Collection<Thread> getWaitingThreads() {
        if (!synchronizer.isHeldExclusively())
            throw new IllegalMonitorStateException();
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                Thread t = w.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
}
package zxl.org.aqs;

import sun.misc.Unsafe;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;

/**
 * AQS 的核心實現(xiàn)
 * 代碼只有幾百行
 * 但是能從幾百行代碼領(lǐng)悟多少精髓 全看悟性了
 * 理解代碼的時候舷蒲,注意里面并不是順序的邏輯耸袜,會在多線程之間跳轉(zhuǎn)
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;
    //用來判斷使用自旋或者 os mutex 單位為ns  1ms=1000us=1000,000 ns cpu指令一般在ns級別
    private static final long spinForTimeoutThreshold = 1000L;
    //sun 操作內(nèi)存的內(nèi)部工具類
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    /**
     * 下面幾個字段用來獲取 字段在對象內(nèi)存中的偏移量
     */
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /**
     * 當(dāng) node 競爭鎖失敗后(此時一般node處于鏈表的中間),是否進(jìn)入 系統(tǒng)調(diào)用 block線程
     *
     * @param pred
     * @param node
     * @return
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //前一個都在等呢 直接返回true 阻塞當(dāng)前節(jié)點的線程
            return true;
        if (ws > 0) {
            //前一個節(jié)點取消了牲平,則清理一下鏈表
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //前一個節(jié)點是 propagate 則設(shè)置為 signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        //清理或改完狀態(tài)后 返回外層調(diào)用后不要阻塞當(dāng)前線程
        return false;
    }

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * AQS 核心的三個內(nèi)部成員
     * 維護(hù)了一個雙向鏈表的 head堤框、tail
     * 維護(hù)了一個 int型的狀態(tài)標(biāo)識 32位Bit
     */
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

    protected AbstractQueuedSynchronizer() {
    }

    /**
     * 利用操作系統(tǒng)底層的 CAS 指令完成
     * 是一種樂觀鎖的實現(xiàn)方式
     *
     * @param update
     * @return
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * 同上
     *
     * @param expect
     * @param update
     * @return
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * 同上
     *
     * @param expect
     * @param update
     * @return
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * 在競爭隊列里加一個 node
     *
     * @param node
     * @return
     */
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            if (t == null) {
                //初始化過程包含兩步 設(shè)置head、設(shè)置tail (需要對其他線程有順序性纵柿、可見性蜈抓、原子性)
                //其他線程通過判斷最后賦值的 volatile 變量 tail , 來保證初始操作的原子性
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //這里掛載不是包含三個步驟昂儒,并不是原子性的
                //掛載成功的標(biāo)志是設(shè)置tail成功(但雙向鏈表可能還不滿足 prev.next == tail)
                //所以其他線程遍歷的時候沟使,有時需要從tail 反向遍歷
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * 跟enq 入隊類似
     * 添加一個 共享或獨享 節(jié)點 node
     *
     * @param mode
     * @return
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * 激活雙向鏈表中,node后面的某一個非取消節(jié)點線程
     * 如果激活的節(jié)點恰好獲取到了鎖渊跋,則會改變 競爭隊列的head
     *
     * @param node
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            //激活后續(xù)節(jié)點的時候 當(dāng)前節(jié)點的狀態(tài)被吃掉了 signal腊嗡、propagate -> 0
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //這段代碼是為了兼容其他線程正在并發(fā)修改tail的情況
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }

    /**
     * 如果head節(jié)點是 signal 狀態(tài)撤缴,則激活競爭隊列里的一個非取消節(jié)點線程
     * 如果head節(jié)點是 復(fù)位 狀態(tài), 則設(shè)置為 propagate 狀態(tài)后返回
     */
    private void doReleaseShared() {
        for (; ; ) {
            Node h = head;
            //鏈表非空
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //右邊相鄰node在park的時候 會設(shè)置head的狀態(tài)為 signal
                if (ws == Node.SIGNAL) {
                    //這里必須吃掉signal 才能激活后續(xù)節(jié)點線程
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //激活右鄰的node線程
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    //代碼執(zhí)行到這里 一般代表沒有右鄰節(jié)點在等待
                    continue;
                }
            }
            if (h == head) {
                // loop if head changed
                // 如果head節(jié)點被 激活線程給吃掉了 則繼續(xù)嘗試激活等待線程
                break;
            }
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                //共享鎖的傳播
                doReleaseShared();
        }
    }

    /**
     * 取消node的時候 需要清理鏈表
     * 后續(xù)節(jié)點要么掛到新的 pred節(jié)點叽唱,要么全部喚醒
     *
     * @param node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) {
            return;
        }
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
            return;
        }
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) {
                compareAndSetNext(pred, predNext, next);
            }
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC

    }


    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }


    /**
     * node 競爭鎖(競爭隊列的第一位)
     * 競爭成功后 會改變head
     *
     * @param node
     * @param arg
     * @return
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    //等待被激活
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 當(dāng)前線程 競爭獨占鎖
     * 1. 先將當(dāng)前線程創(chuàng)建一個 獨占node 加入到競爭隊列
     * 2. 競爭隊列頭部
     *
     * @param arg
     * @throws InterruptedException
     */
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 基本邏輯同上
     * <p>
     * 當(dāng)前線程競爭獨占鎖
     * 加入了超時時間屈呕,超時則直接返回失敗
     *
     * @param arg
     * @param nanosTimeout
     * @return
     * @throws InterruptedException
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 基本邏輯同 doAcquire
     * 不同之處在于 獲取到 共享鎖之后,調(diào)用了 setHeadAndPropagate
     * setHeadAndPropagate 內(nèi)部會繼續(xù)releaseShared
     *
     * @param arg
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //獲取共享鎖 這行代碼是關(guān)鍵  假設(shè)當(dāng)前線程為A棺亭,則A會激活 線程B(node緊鄰的后續(xù)node線程B)的執(zhí)行點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    //線程B的執(zhí)行點
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 同上
     *
     * @param arg
     * @throws InterruptedException
     */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    /**
     * 下面4個方法為主要的擴展點 用內(nèi)部的 int類型 保存狀態(tài)
     */

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 當(dāng)前線程是否獲取了鎖
     *
     * @return
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * 內(nèi)部會快速嘗試 tryAcquire 如果 成功直接返回
     * 否則 走正常的 競爭隊列排隊邏輯
     *
     * @param arg
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 同上
     * 特殊處理interupted 標(biāo)識虎眨,如果中斷標(biāo)識則拋異常
     *
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * 如果 tryRelease成功了(子類擴展點),則激活競爭鏈表中的某一個節(jié)點線程
     *
     * @param arg
     * @return
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            //注意判斷waitStatus為非零狀態(tài)
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Queue inspection methods

    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    public final boolean hasContended() {
        return head != null;
    }

    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    private Thread fullGetFirstQueuedThread() {
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
                s.prev == head && (st = s.thread) != null) ||
                ((h = head) != null && (s = h.next) != null &&
                        s.prev == head && (st = s.thread) != null))
            return st;


        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
                (s = h.next) != null &&
                !s.isShared() &&
                s.thread != null;
    }

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
    }


    // Instrumentation and monitoring methods

    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    public String toString() {
        int s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
                "[State = " + s + ", " + q + "empty queue]";
    }


    // Internal support methods for Conditions

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (; ; ) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * 更改node的狀態(tài)為非condition
     * 將node加入到競爭隊列里
     * 檢查競爭隊列的上一個節(jié)點是否取消過镶摘,取消過則激活對應(yīng)的線程
     *
     * @param node
     * @return
     */
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //將node加入到 AQS里的競爭隊列末尾 返回的是競爭隊列里的上一個tail
        Node p = enq(node);
        int ws = p.waitStatus;
        //檢查上一個tail節(jié)點是不是 cancel了嗽桩, 或者 tail節(jié)點的狀態(tài)有多線程競爭,直接激活這個節(jié)點的線程
        //在node狀態(tài)改為signal之前 如果其他線程改變了傳播狀態(tài)waitStatus凄敢,這里會直接觸發(fā)激活等待線程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * 成功了則激活一個競爭隊列的節(jié)點線程
     * 失敗了碌冶,node會被設(shè)置為cancel狀態(tài)
     *
     * @param node
     * @return
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // Instrumentation methods for conditions

    public final boolean owns(ConditionObject condition) {
        return condition.isOwnedBy(this);
    }

    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }


}

一些參考資料

ReentrantLock實現(xiàn)-AQS

AtomicInteger實現(xiàn)-CAS

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市涝缝,隨后出現(xiàn)的幾起案子扑庞,更是在濱河造成了極大的恐慌,老刑警劉巖拒逮,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件罐氨,死亡現(xiàn)場離奇詭異,居然都是意外死亡滩援,警方通過查閱死者的電腦和手機栅隐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來玩徊,“玉大人租悄,你說我怎么就攤上這事《鞲ぃ” “怎么了泣棋?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長憎蛤。 經(jīng)常有香客問我外傅,道長,這世上最難降的妖魔是什么俩檬? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮碾盟,結(jié)果婚禮上棚辽,老公的妹妹穿的比我還像新娘。我一直安慰自己冰肴,他們只是感情好屈藐,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布榔组。 她就那樣靜靜地躺著,像睡著了一般联逻。 火紅的嫁衣襯著肌膚如雪搓扯。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天包归,我揣著相機與錄音锨推,去河邊找鬼。 笑死公壤,一個胖子當(dāng)著我的面吹牛换可,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播厦幅,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼沾鳄,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了确憨?” 一聲冷哼從身側(cè)響起译荞,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎休弃,沒想到半個月后磁椒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡玫芦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年浆熔,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片桥帆。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡医增,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出老虫,到底是詐尸還是另有隱情叶骨,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布祈匙,位于F島的核電站忽刽,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏夺欲。R本人自食惡果不足惜跪帝,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望些阅。 院中可真熱鬧伞剑,春花似錦、人聲如沸市埋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至抒倚,卻和暖如春褐着,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背托呕。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工含蓉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人镣陕。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓谴餐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親呆抑。 傳聞我的和親對象是個殘疾皇子岂嗓,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355