java并發(fā)編程實(shí)戰(zhàn)三之Lock原理

java并發(fā)編程實(shí)戰(zhàn)三之Lock原理

java locks包的核心類(lèi)是AQS(AbstractQueuedSynchronizer), AQS的核心實(shí)現(xiàn)其實(shí)是一個(gè)自旋鎖

1. 自旋鎖(Spin Lock)

自旋鎖是指當(dāng)一個(gè)線程嘗試獲取某個(gè)鎖時(shí)窿撬,如果該鎖已被其他線程占用欣尼,就一直循環(huán)檢測(cè)鎖是否被釋放翎卓,而不是進(jìn)入線程掛起或睡眠狀態(tài)
自旋鎖適用于鎖保護(hù)的臨界區(qū)很小的情況青团,臨界區(qū)很小的話(huà),鎖占用的時(shí)間就很短

1.1 簡(jiǎn)單自旋鎖

下面的代碼是自旋鎖的一種簡(jiǎn)單實(shí)現(xiàn)
它有很多的缺陷罚攀,你可以想到多少缺點(diǎn)言蛇??兽狭?

public class SpinLock {
    /**
     * 使用原子類(lèi)來(lái)標(biāo)識(shí)線程是否獲取到了鎖
     */
    private AtomicReference<Thread> owner = new AtomicReference<Thread>();

    public void lock() {
        Thread currentThread = Thread.currentThread();
        //如果鎖未被占用憾股,則設(shè)置當(dāng)前線程為鎖的擁有者
        while (!owner.compareAndSet(null, currentThread)){
        }
    }

    public void unlock(){
        Thread currentThread = Thread.currentThread();

        // 只有鎖的擁有者才能釋放鎖
        owner.compareAndSet(currentThread, null);
    }
}

1.2 Ticket Lock

為了解決公平性問(wèn)題鹿蜀,我們模仿銀行排隊(duì)的方式箕慧,設(shè)計(jì)了另一種公平鎖
Ticket Lock 雖然解決了公平性的問(wèn)題,但是多處理器系統(tǒng)上茴恰,每個(gè)進(jìn)程/線程占用的處理器都在讀寫(xiě)同一個(gè)變量serviceNum 颠焦,每次讀寫(xiě)操作都必須在多個(gè)處理器緩存之間進(jìn)行緩存同步,這會(huì)導(dǎo)致繁重的系統(tǒng)總線和內(nèi)存的流量往枣,大大降低系統(tǒng)整體的性能

public class TicketLock {

    /**
     * 服務(wù)號(hào)
     */
    private AtomicInteger serviceNum = new AtomicInteger();
    /**
     * 排隊(duì)號(hào)
     */
    private AtomicInteger ticketNum = new AtomicInteger();

    public int lock(){
        //首先原子性地獲得一個(gè)排隊(duì)號(hào)
        int myTicketNum = ticketNum.getAndIncrement();
        //只要當(dāng)前服務(wù)號(hào)不是自己的就不斷輪詢(xún)
        while (serviceNum.get() != myTicketNum){
        }
        return myTicketNum;
    }

    public void unlock(int myTicket){
        int next = myTicket + 1;
        serviceNum.compareAndSet(myTicket, next);
    }
}

1.3 CLH隊(duì)列鎖

CLH鎖是一種基于鏈表的可擴(kuò)展伐庭、高性能、公平的自旋鎖分冈,申請(qǐng)線程只在本地變量上自旋圾另,它不斷輪詢(xún)前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋
CLH在SMP系統(tǒng)結(jié)構(gòu)下該法是非常有效的雕沉。但在NUMA系統(tǒng)結(jié)構(gòu)下集乔,每個(gè)線程有自己的內(nèi)存,如果前趨結(jié)點(diǎn)的內(nèi)存位置比較遠(yuǎn)坡椒,自旋判斷前趨結(jié)點(diǎn)的locked域扰路,性能將大打折扣
當(dāng)一個(gè)線程需要獲取鎖時(shí):

a. 創(chuàng)建一個(gè)的QNode,將其中的locked設(shè)置為true表示需要獲取鎖
b. 線程對(duì)tail域調(diào)用getAndSet方法倔叼,使自己成為隊(duì)列的尾部汗唱,同時(shí)獲取一個(gè)指向其前趨結(jié)點(diǎn)的引用myPred
c. 該線程就在前趨結(jié)點(diǎn)的locked字段上旋轉(zhuǎn),直到前趨結(jié)點(diǎn)釋放鎖
d. 當(dāng)一個(gè)線程需要釋放鎖時(shí)丈攒,將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false哩罪,同時(shí)回收前趨結(jié)點(diǎn)


CLH Lock
public class CLHLock {
    /**
     * tail 指向最后線程的節(jié)點(diǎn)
     * current 每個(gè)線程的節(jié)點(diǎn)
     */
    private final AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());
    private final ThreadLocal<CLHNode> current;

    public CLHLock() {
        current =ThreadLocal.withInitial(CLHNode::new);
    }

    public void lock(){
        CLHNode own = current.get();
        own.locked = true;
        CLHNode preNode = tail.getAndSet(own);
        //輪詢(xún)前驅(qū)節(jié)點(diǎn)
        while (preNode.locked){

        }
    }

    public void unlock(){
        //當(dāng)前線程節(jié)點(diǎn)上釋放
        current.get().locked = false;
    }

    private static class CLHNode{
        private volatile boolean locked = false;
    }
}

1.4 MCS鎖

為了解決NUMA系統(tǒng)結(jié)構(gòu)下CLH自選出現(xiàn)的性能問(wèn)題,MCS隊(duì)列鎖應(yīng)運(yùn)而生
MSC與CLH最大的不同并不是鏈表是顯示還是隱式巡验,而是線程自旋的規(guī)則不同:CLH是在前趨結(jié)點(diǎn)的locked域上自旋等待识椰,而MSC是在自己的結(jié)點(diǎn)的locked域上自旋等待。正因?yàn)槿绱松罴睿鉀Q了CLH在NUMA系統(tǒng)架構(gòu)中獲取locked域狀態(tài)內(nèi)存過(guò)遠(yuǎn)的問(wèn)題腹鹉。

a. 隊(duì)列初始化時(shí)沒(méi)有結(jié)點(diǎn),tail=null
b. 線程A想要獲取鎖敷硅,于是將自己置于隊(duì)尾功咒,由于它是第一個(gè)結(jié)點(diǎn),它的locked域?yàn)閒alse
c. 線程B和C相繼加入隊(duì)列绞蹦,a->next=b,b->next=c力奋。且B和C現(xiàn)在沒(méi)有獲取鎖,處于等待狀態(tài)幽七,所以它們的locked域?yàn)閠rue景殷,
尾指針指向線程C對(duì)應(yīng)的結(jié)點(diǎn)
d. 線程A釋放鎖后,順著它的next指針找到了線程B,并把B的locked域設(shè)置為false猿挚。這一動(dòng)作會(huì)觸發(fā)線程B獲取鎖


MCS Lock
public class MCSLock {
    /**
     * 隊(duì)列初始化時(shí)沒(méi)有結(jié)點(diǎn)咐旧,tail指向null
     * 
     */
    private final AtomicReference<MCSNode> tail = new AtomicReference<>(null);
    private ThreadLocal<MCSNode> current;

    public MCSLock() {
        current = ThreadLocal.withInitial(MCSNode::new);
    }

    public void lock(){
        // 線程A想要獲取鎖,于是將自己置于隊(duì)尾绩蜻,由于它是第一個(gè)結(jié)點(diǎn)铣墨,它的locked域?yàn)閒alse
        MCSNode own = current.get();
        MCSNode preNode = tail.getAndSet(own);
        // 線程B和C相繼加入隊(duì)列,a->next=b,b->next=c。
        // 且B和C現(xiàn)在沒(méi)有獲取鎖,處于等待狀態(tài),所以它們的locked域?yàn)閠rue,尾指針指向線程C對(duì)應(yīng)的結(jié)點(diǎn)
        if(preNode != null){
            own.locked = true;
            preNode.next = own;
            // 在自己的結(jié)點(diǎn)的locked域上自旋等待
            while (own.locked){}
        }
    }

    public void unlock(){
        MCSNode own = current.get();
        if(!own.locked){
            return;
        }
        // 最后一個(gè)獲取鎖的線程
        if(own.next == null){
            if(tail.compareAndSet(own, null)){
                return;
            }
            // 過(guò)程中又有線程獲得鎖
            while (own.next == null){}
        }
        // 線程A釋放鎖后办绝,順著它的next指針找到了線程B,并把B的locked域設(shè)置為false.這一動(dòng)作會(huì)觸發(fā)線程B獲取鎖
        own.next.locked = false;
        own.next = null;
    }

    private static class MCSNode{
        private MCSNode next;
        private volatile boolean locked = false;
    }
}

2. Unsafe和LockSupport

2.1 Unsafe類(lèi)簡(jiǎn)介

首先伊约,我們的代碼中不應(yīng)該使用這個(gè)類(lèi),雖然可以通過(guò)反射的方式獲取到它的示例并使用
個(gè)人覺(jué)得孕蝉,我們也不需要詳細(xì)了解這個(gè)類(lèi)的使用
簡(jiǎn)單了解:

  1. put,get方法
    // l offset變量相對(duì)偏移量屡律,可用objectFieldOffset(java.lang.reflect.Field field)獲取
    putObject(java.lang.Object o, long l, java.lang.Object o1)
    getObject(java.lang.Object o, long l)
  2. 變量偏移量
    objectFieldOffset(java.lang.reflect.Field field)
    staticFieldOffset(java.lang.reflect.Field field)
  3. 內(nèi)存管理
    allocateMemory(long l)
    reallocateMemory(long l, long l1)
    setMemory(java.lang.Object o, long l, long l1, byte b)
    copyMemory(java.lang.Object o, long l, java.lang.Object o1, long l1, long l2)
    freeMemory(long l)
  4. CAS操作
    compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2)
  5. 實(shí)例化
    allocateInstance(java.lang.Class<?> aClass)
  6. 數(shù)組
    arrayIndexScale(java.lang.Class<?> aClass)
    arrayBaseOffset(java.lang.Class<?> aClass)
  7. 阻塞和喚醒
    park(boolean b, long l)
    unpark(java.lang.Object o)

2.2 LockSupport

  1. 喚醒線程或者歸還令牌
    unpark(Thread thread)
  2. 阻塞線程,附加額外信息
    park(Object blocker)
  3. 阻塞線程一段時(shí)間降淮,附加額外信息
    parkNanos(Object blocker, long nanos)
  4. 阻塞線程直到某時(shí)間疹尾,附加額外信息
    parkUntil(Object blocker, long deadline)
  5. 阻塞線程
    park()
  6. 阻塞線程一段時(shí)間
    阻塞線程一段時(shí)間
  7. 阻塞線程直到某時(shí)間
    parkUntil(long deadline)
  8. Thread添加額外信息
    setBlocker(Thread t, Object arg)
    getBlocker(Thread t)

這里獲取的許可永遠(yuǎn)只有一個(gè),與底層c++實(shí)現(xiàn)有關(guān)骤肛。HotSpot里Parker有一個(gè)私有_counter變量纳本,無(wú)論調(diào)用多少次unpark,_counter都被設(shè)為1

3. AbstractQueuedSynchronizer

AQS改進(jìn)了CLH隊(duì)列自旋鎖,結(jié)合了自旋和睡眠/喚醒兩種方法的優(yōu)點(diǎn)

3.1 雙端鏈表Node

    static final class Node {
        /** 標(biāo)記當(dāng)前結(jié)點(diǎn)是共享模式 */
        static final Node SHARED = new Node();
        /** 標(biāo)記當(dāng)前結(jié)點(diǎn)是獨(dú)占模式 */
        static final Node EXCLUSIVE = null;
        /**
         * 結(jié)點(diǎn)的等待狀態(tài) 可能的取值包含:CANCELLED腋颠、SIGNAL繁成、CONDITION、PROPAGATE和0
         * 在同步等待隊(duì)列中的節(jié)點(diǎn)初始值為0,在條件等待隊(duì)列中的節(jié)點(diǎn)初始值為CONDITION
         * 在獨(dú)占模式下淑玫,取值為CANCELLED印颤、SIGNAL吻谋、0中之一
         * 在共享模式下,取值為CANCELLED、SIGNAL掂碱、PROPAGATE和0中之一
         * 在條件等待隊(duì)列中绵咱,取值為CANCELLED砌梆、CONDITION中之一
         */
        volatile int waitStatus;
        /** 擁有當(dāng)前結(jié)點(diǎn)的線程 */
        volatile Thread thread;

        /** 線程已經(jīng)被取消 */
        static final int CANCELLED =  1;
        /**
         *  后續(xù)節(jié)點(diǎn)需要喚醒
         *  節(jié)點(diǎn)插入隊(duì)列時(shí)濒憋,節(jié)點(diǎn)代表的線程睡眠前會(huì)將前一個(gè)節(jié)點(diǎn)的waitStatus置為SIGNAL
         *  當(dāng)前一個(gè)節(jié)點(diǎn)釋放鎖時(shí),如果其waitStatus置為SIGNAL但壮,則會(huì)喚醒其后下一個(gè)節(jié)點(diǎn)線程
         */
        static final int SIGNAL    = -1;
        /** 表示節(jié)點(diǎn)代表的線程正處于條件等待隊(duì)列中等待signal信號(hào) */
        static final int CONDITION = -2;
        /** 在共享模式下使用,表示同步狀態(tài)能夠無(wú)條件向后傳播 */
        static final int PROPAGATE = -3;

        volatile Node prev;

        volatile Node next;
        /**
         * 在條件等待隊(duì)列中冀泻,用于指向下一個(gè)節(jié)點(diǎn)
         * 在同步等待隊(duì)列中,用于標(biāo)記該節(jié)點(diǎn)所代表的線程在獨(dú)占模式下還是共享模式下獲取鎖
         */
        Node nextWaiter;

        public Node() {
        }

        /**
         *
         * @param thread 一般為當(dāng)前線程
         * @param mode 排他EXCLUSIVE  共享SHARED
         */
        public Node(Thread thread, Node mode) {
            this.thread = thread;
            this.nextWaiter = mode;
        }

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException();
            } else {
                return p;
            }
        }
    }

3.2 同步等待隊(duì)列

image

waitStatus在同步等待隊(duì)列中不同的模式下蜡饵,有不同的取值:

  • 在獨(dú)占模式下弹渔,取值為CANCELLED、SIGNAL溯祸、0中之一
  • 在共享模式下肢专,取值為CANCELLED舞肆、SIGNAL、PROPAGATE和0中之一

nextWaiter

  • 在同步等待隊(duì)列中博杖,用于標(biāo)記該節(jié)點(diǎn)所代表的線程在獨(dú)占模式下還是共享模式下獲取鎖

3.3 獨(dú)占模式下鎖獲取

這里我們只細(xì)講acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&   //1
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   //2 -> 3
        selfInterrupt();        //4
}
  1. 首先調(diào)用tryAcquire函數(shù)椿胯,線程嘗試獲取鎖,如果獲取成功則直接返回
  2. 如果獲取失敗欧募,則調(diào)用addWaiter函數(shù),將線程封裝成一個(gè)Node節(jié)點(diǎn)并插入同步等待隊(duì)列尾部仆抵;Node.EXCLUSIVE代表獨(dú)占模式
    /**
     * 此處在尾部插入node時(shí)跟继,先設(shè)置node的prev,再CAS修改隊(duì)列tail指向镣丑,修改成功再設(shè)置前一個(gè)節(jié)點(diǎn)的next域
     * 隊(duì)列中舔糖,如果某個(gè)node的prev!=null,并不一定表示node已經(jīng)成功插入隊(duì)列中,如果某個(gè)node的前一個(gè)節(jié)點(diǎn)的next!=null,則該node一定位于隊(duì)列中
     * @param mode 模式
     * @return
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            // 雙端鏈表的add操作
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //頭尾都為null 初始化以及enq
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // tail為空,初始化head和tail節(jié)點(diǎn)  head lazy initialize
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // 雙端鏈表的基本操作
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  1. 接著調(diào)用acquireQueued函數(shù)莺匠,將前驅(qū)節(jié)點(diǎn)的waitStatus標(biāo)記為SIGNAL后睡眠金吗,等待前驅(qū)節(jié)點(diǎn)釋放鎖后喚醒,被喚醒后則繼續(xù)嘗試獲取鎖
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 自旋  當(dāng)且僅當(dāng)前驅(qū)節(jié)點(diǎn)是head,嘗試獲取鎖
                // 當(dāng)前驅(qū)節(jié)點(diǎn)等于head時(shí),說(shuō)明前驅(qū)線程為當(dāng)前正擁有鎖,或者剛剛釋放鎖并且喚醒了當(dāng)前節(jié)點(diǎn)
                if (p == head && tryAcquire(arg)) {
                    // 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
                    setHead(node);
                    // help GC
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
                // 如果前驅(qū)節(jié)點(diǎn)不是隊(duì)列head或者獲取鎖失敗,則設(shè)置前驅(qū)節(jié)點(diǎn)waitStatus為SIGNAL,并睡眠
                // shouldParkAfterFailedAcquire 前置節(jié)點(diǎn)waitStatus設(shè)為-1 返回false
                // parkAndCheckInterrupt 阻塞當(dāng)前線程 返回currentThread().isInterrupted(true)
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }

    protected boolean parkAndCheckInterrupt(){
        LockSupport.park(this);
        return Thread.interrupted();
    }

    private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if(ws == Node.SIGNAL){
            return true;
        }
        // ws = 1  CANCELLED  跳過(guò)取消的節(jié)點(diǎn)
        if(ws > 0){
            do {
                node.prev = pred = pred.prev;
            }while (pred.waitStatus > 0);
            pred.next = node;
        }else {
            // 0 或者 PROPAGATE
            compareAndSetWaitStatus(pred, ws, Node.CANCELLED);
        }
        return false;
    }

    /**
     * 線程在獲取鎖過(guò)程中,可能因?yàn)槌鲥e(cuò)趣竣、被中斷或超時(shí)而取消獲取鎖
     * @param node
     */
    protected void cancelAcquire(Node node){
        if (node == null) {
            return;
        }
        //線程引用清空
        node.thread = null;
        Node pred = node.prev;
        // 若前驅(qū)節(jié)點(diǎn)是CANCELLED,則跳過(guò)繼續(xù)往前找
        while (pred.waitStatus > 0) {
            node.prev = pred = pred.prev;
        }
        // 前驅(qū)節(jié)點(diǎn)中不是CANCELLED節(jié)點(diǎn),獲取pre next指向
        Node predNext = pred.next;
        // 設(shè)置waitStatus值為CANCELLED,標(biāo)記節(jié)點(diǎn)已取消
        node.waitStatus = Node.CANCELLED;

        // 如果被取消的node是尾部節(jié)點(diǎn),則設(shè)置tail指針指向前驅(qū)節(jié)點(diǎn),并且設(shè)置前驅(qū)節(jié)點(diǎn)的next指針為null
        if(node == tail && compareAndSetTail(node, pred)){
            compareAndSetNext(pred, predNext, null);
        }else {
            int ws;
            if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred,ws,
                    Node.SIGNAL))) && pred.thread != null){
                Node next = node.next;
                if(next != null && next.waitStatus <= 0){
                    compareAndSetNext(pred, predNext, next);
                }
            }else {
                unparkSuccessor(node);
            }
            node.next = node;
        }

    }

    /**
     * 喚醒后繼節(jié)點(diǎn)
     * @param node
     */
    private void unparkSuccessor(Node node) {
        // 在獨(dú)占模式下,waitStatus<0此時(shí)為SIGNAL,將SIGNAL標(biāo)志清除
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        // node.next == null || node.next.waitStatus > 0,表示該node.next線程取消了獲取鎖,則從隊(duì)列尾部遍歷,找到隊(duì)列中第一個(gè)沒(méi)有取消的線程
        if(s == null || s.waitStatus > 0){
            s = null;
            for(Node t = tail;t != null && t != node; t = t.prev){
                if(t.waitStatus <= 0){
                    s = t;
                }
            }
        }
        if(s != null){
            LockSupport.unpark(s.thread);
        }
    }
  1. 如果線程睡眠過(guò)程中產(chǎn)生中斷摇庙,則調(diào)用selfInterrupt函數(shù)讓線程自我中斷一下,設(shè)置中斷標(biāo)志遥缕,將中斷傳遞給外層
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

3.4 獨(dú)占模式釋放鎖

    public final boolean release(int arg) {
        if(tryRelease(arg)){
            // 每個(gè)隊(duì)列中的線程在獲取到鎖以后,會(huì)將隊(duì)列中包含該線程的Node節(jié)點(diǎn)設(shè)置為head,所以head指向的Node即為當(dāng)前占有鎖的線程,也就是當(dāng)前正在進(jìn)行釋放鎖操作的線程
            Node h = head;
            if(h != null && h.waitStatus != 0){
                // 喚醒后續(xù)的結(jié)點(diǎn)
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    /**
     * 喚醒后繼節(jié)點(diǎn)
     * @param node
     */
    private void unparkSuccessor(Node node) {
        // 在獨(dú)占模式下,waitStatus<0此時(shí)為SIGNAL,將SIGNAL標(biāo)志清除
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        // node.next == null || node.next.waitStatus > 0,表示該node.next線程取消了獲取鎖,則從隊(duì)列尾部遍歷,找到隊(duì)列中第一個(gè)沒(méi)有取消的線程
        if(s == null || s.waitStatus > 0){
            s = null;
            for(Node t = tail;t != null && t != node; t = t.prev){
                if(t.waitStatus <= 0){
                    s = t;
                }
            }
        }
        if(s != null){
            LockSupport.unpark(s.thread);
        }
    }

3.5 共享模式獲取鎖

共享模式獲取鎖與獨(dú)占模式獲取鎖卫袒,僅有一點(diǎn)區(qū)別,就是setHead的同時(shí)會(huì)喚醒下一個(gè)shared節(jié)點(diǎn)

    private void doAcquireShared(int arg){
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for(;;){
                final Node p = node.predecessor();
                if(p == head){
                    int r = tryAcquireShared(arg);
                    if(r >= 0){
                        // 與獨(dú)占模式下獲取鎖唯一的區(qū)別就在下面這個(gè)setHeadAndPropagate函數(shù)
                        setHeadAndPropagate(node, r);
                        // help GC
                        p.next = null;
                        if (interrupted) {
                            selfInterrupt();
                        }
                        failed = false;
                        return;
                    }
                }
                if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
                    interrupted = true;
                }
            }
        } finally {
            if(failed){
                cancelAcquire(node);
            }
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        // 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
        setHead(node);
        /**
         *  propagate > 0 說(shuō)明許可還能夠繼續(xù)被線程acquire
         *  之前的head == null     未知狀態(tài)
         *  之前的head被設(shè)置為PROPAGAT  說(shuō)明需要往后傳遞
         *  當(dāng)前head = null   未知狀態(tài)
         *  當(dāng)前head被設(shè)置為PROPAGAT
         */
       
        if(propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0){
            Node s = node.next;
            if(s == null || s.isShared()){
                doReleaseShared();
            }
        }
    }

    private void doReleaseShared() {
        for(;;){
            Node h = head;
            if(h != null && h != tail){
                int ws = h.waitStatus;
                // SIGNAL狀態(tài)直接喚醒下一個(gè)節(jié)點(diǎn)
                if(ws == Node.SIGNAL){
                    if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
                        continue;
                    }
                    unparkSuccessor(h); 
                }else if(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
                    //首節(jié)點(diǎn)的狀態(tài)是0,則通過(guò)CAS設(shè)置為PROPAGATE,表示如果連接在當(dāng)前節(jié)點(diǎn)后的node的狀態(tài)如果是shared,則無(wú)條件獲取鎖
                    continue;
                }
            }
            if(h == head){
                break;
            }
        }
    }

3.6 共享模式釋放鎖

    public final boolean releaseShared(int arg){
        if(tryReleaseShared(arg)){
            doReleaseShared();
            return true;
        }
        return false;
    }

4. Condition的實(shí)現(xiàn)原理

AQS通過(guò)一個(gè)FIFO單向鏈表來(lái)實(shí)現(xiàn)條件等待隊(duì)列


image

Condition接口定義了一個(gè)類(lèi)似于Object對(duì)象的監(jiān)視器的接口方法单匣。await()夕凝、 awaitNanos(long nanosTimeout)、signal()户秤、signalAll()可以對(duì)應(yīng)上Object對(duì)象的wait()码秉、wait(long timeout)、notify()鸡号、notifyAll()转砖;而且Condition的控制更加靈活精確

4.1 await()

  1. 新建Condition Node包裝線程,加入Condition隊(duì)列
  2. 釋放線程占有的鎖
  3. 在同步等待隊(duì)列中則睡眠等待 直到被喚醒并加入到同步隊(duì)列中
  4. 嘗試獲取鎖 此時(shí)節(jié)點(diǎn)已經(jīng)加入到同步隊(duì)列中
  5. 處理cancled節(jié)點(diǎn)
  6. 處理異常
        /**
         * 1. 新建Condition Node包裝線程,加入Condition隊(duì)列
         * 2. 釋放線程占有的鎖
         * 3. 在同步等待隊(duì)列中則睡眠等待  直到被喚醒并加入到同步隊(duì)列中
         * 4. 嘗試獲取鎖  此時(shí)節(jié)點(diǎn)已經(jīng)加入到同步隊(duì)列中
         * 5. 處理cancled節(jié)點(diǎn)
         * 6. 處理異常
         * @throws InterruptedException
         */
        @Override
        public final void await() throws InterruptedException {
            if(Thread.interrupted()){
                throw new InterruptedException();
            }
            // 新建Condition Node包裝線程,加入Condition隊(duì)列
            Node node = addConditionWaiter();
            // 釋放線程占有的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 不在同步等待隊(duì)列中則睡眠等待  直到被喚醒并加入到同步隊(duì)列中
            while (!isOnSyncQueue(node)){
                LockSupport.park(this);
                // 檢查是否被打斷
                if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
                    break;
                }
            }
            // 嘗試獲取鎖  異常不拋出
            if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
                interruptMode = REINTERRUPT;
            }
            if(node.nextWaiter != null){
                unlinkCancelledWaiters();
            }
            if(interruptMode != 0){
                reportInterruptAfterWait(interruptMode);
            }
        }
        
        /**
         * 單向鏈表add  返回lastWaiter
         * @return
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // lastWaiter CANCELLED  移除
            if(t != null && t.waitStatus != Node.CONDITION){
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if(t == null){
                firstWaiter = node;
            }else {
                t.nextWaiter = node;
            }
            lastWaiter = node;
            return node;
        }

        /**
         * 沒(méi)有打斷  0
         * interrupted before signalled  -1
         * interrupted after signalled    1
         * @param node
         * @return
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
        }

        

isOnSyncQueue是AQS的方法鲸伴,功能是判斷節(jié)點(diǎn)是否在同步隊(duì)列中

    /**
     * 節(jié)點(diǎn)是否在同步隊(duì)列中
     * @param node
     * @return
     */
    private boolean isOnSyncQueue(Node node) {
        // 在條件隊(duì)列中
        // 或者同時(shí)別的線程調(diào)用signal,Node會(huì)從Condition隊(duì)列中移除,從移除到進(jìn)入release隊(duì)列,中間這段時(shí)間prev必然為null
        if(node.waitStatus == Node.CONDITION || node.prev == null){
            return false;
        }
        if(node.next != null){
            return true;
        }
        // 可能該Node剛剛最后一個(gè)進(jìn)入release隊(duì)列,所以是tail,其next必然是null,所以需要從隊(duì)尾向前查找
        return findNodeFromTail(node);
    }

transferAfterCancelledWait喚醒前或后被打斷

    /**
     * 喚醒前或者后被打斷
     * @param node
     * @return
     */
    private boolean transferAfterCancelledWait(Node node) {
        // 喚醒前線程被打斷(超時(shí)等原因?qū)⒉辉俚却? 直接加入到同步隊(duì)列等待獲取鎖
        if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
            enq(node);
            return true;
        }
        // 喚醒的時(shí)候被打斷  等待加入隊(duì)列(可以獲取鎖的狀態(tài))結(jié)束
        while (!isOnSyncQueue(node)){
            Thread.yield();
        }
        return false;
    }

4.2 signal()

  1. 條件隊(duì)列中移除first節(jié)點(diǎn)
  2. 跳過(guò)cancled節(jié)點(diǎn),將first加入到同步隊(duì)列直到加入成功

這里并沒(méi)有喚醒該線程堪藐,而是在await()的第4步才可能喚醒該線程

        /**
         * 1. 條件隊(duì)列中移除first節(jié)點(diǎn)
         * 2. 跳過(guò)cancled節(jié)點(diǎn),找到一個(gè)沒(méi)有取消的first放入release隊(duì)列
         * @param first
         */
        private void doSignal(Node first) {
            do {
                if((firstWaiter = first.nextWaiter) == null){
                    lastWaiter = null;
                }
                first.nextWaiter = null;
                
            }while (!transferForSignal(first) && (first = firstWaiter) != null);
        }

transferForSignal是AQS的方法

    /**
     * condition隊(duì)列中節(jié)點(diǎn)加入到同步隊(duì)列中
     * @param first
     * @return
     */
    private boolean transferForSignal(Node first) {
        // 無(wú)法cas waitStatus, 節(jié)點(diǎn)被取消
        if(!compareAndSetWaitStatus(first, Node.CONDITION, 0)){
            return false;
        }
        // p是該Node的前驅(qū)
        Node p = enq(first);
        int ws = p.waitStatus;
        // 前驅(qū)節(jié)點(diǎn)cancled或者變更waitStatus失敗,直接喚醒當(dāng)前線程
        if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
            LockSupport.unpark(first.thread);
        }
        return true;
    }

5. 簡(jiǎn)單注釋源碼

public abstract class MyAbstractQueuedSynchronizer {

    /**
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      |      | ----> |     | ----> |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     */

    /** 雙端隊(duì)列的頭,只有setHead可以修改 */
    private transient volatile Node head;
    /** 端隊(duì)列的尾,只有enq可以修改 */
    private transient volatile Node tail;
    /** 同步狀態(tài) */
    private volatile int state;

    protected final int getState() {
        return state;
    }

    /** 節(jié)點(diǎn)自旋還是阻塞的超時(shí)時(shí)間 */
    static final long spinForTimeoutThreshold = 1000L;

    protected MyAbstractQueuedSynchronizer() {
    }

    protected boolean tryAcquire(int arg){
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg){
        throw new UnsupportedOperationException();
    }

    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }


    /**
     * 獨(dú)占模式下線程獲取鎖的方法,該函數(shù)忽略中斷,即線程在aquire過(guò)程中,中斷此線程是無(wú)效的
     * @param arg
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }

    /**
     * 釋放鎖
     * @param arg
     * @return
     */
    public final boolean release(int arg) {
        if(tryRelease(arg)){
            // 每個(gè)隊(duì)列中的線程在獲取到鎖以后,會(huì)將隊(duì)列中包含該線程的Node節(jié)點(diǎn)設(shè)置為head,所以head指向的Node即為當(dāng)前占有鎖的線程,也就是當(dāng)前正在進(jìn)行釋放鎖操作的線程
            Node h = head;
            if(h != null && h.waitStatus != 0){
                // 喚醒后續(xù)的結(jié)點(diǎn)
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }


    /**
     * doAcquireShared與acquireQueued僅有一點(diǎn)區(qū)別
     * @param arg
     */
    private void doAcquireShared(int arg){
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for(;;){
                final Node p = node.predecessor();
                if(p == head){
                    int r = tryAcquireShared(arg);
                    if(r >= 0){
                        // 與獨(dú)占模式下獲取鎖唯一的區(qū)別就在下面這個(gè)setHeadAndPropagate函數(shù)
                        setHeadAndPropagate(node, r);
                        // help GC
                        p.next = null;
                        if (interrupted) {
                            selfInterrupt();
                        }
                        failed = false;
                        return;
                    }
                }
                if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
                    interrupted = true;
                }
            }
        } finally {
            if(failed){
                cancelAcquire(node);
            }
        }
    }

    public final boolean releaseShared(int arg){
        if(tryReleaseShared(arg)){
            doReleaseShared();
            return true;
        }
        return false;
    }


    /**
     *
     * @param node
     * @param propagate
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        // 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
        setHead(node);
        /**
         *  propagate > 0 說(shuō)明許可還能夠繼續(xù)被線程acquire
         *  之前的head == null     未知狀態(tài)
         *  之前的head被設(shè)置為PROPAGAT  說(shuō)明需要往后傳遞
         *  當(dāng)前head = null   未知狀態(tài)
         *  當(dāng)前head被設(shè)置為PROPAGAT
         */

        if(propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0){
            Node s = node.next;
            if(s == null || s.isShared()){
                doReleaseShared();
            }
        }
    }

    private void doReleaseShared() {
        for(;;){
            Node h = head;
            if(h != null && h != tail){
                int ws = h.waitStatus;
                // SIGNAL狀態(tài)直接喚醒下一個(gè)節(jié)點(diǎn)
                if(ws == Node.SIGNAL){
                    if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
                        continue;
                    }
                    unparkSuccessor(h);
                }else if(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
                    //首節(jié)點(diǎn)的狀態(tài)是0,則通過(guò)CAS設(shè)置為PROPAGATE,表示如果連接在當(dāng)前節(jié)點(diǎn)后的node的狀態(tài)如果是shared,則無(wú)條件獲取鎖
                    continue;
                }
            }
            if(h == head){
                break;
            }
        }
    }

    /**
     * 子類(lèi)實(shí)現(xiàn)
     * @param arg
     * @return
     */
    protected int tryAcquireShared(int arg){
        throw new UnsupportedOperationException();
    }

    private boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 自旋  當(dāng)且僅當(dāng)前驅(qū)節(jié)點(diǎn)是head,嘗試獲取鎖
                // 當(dāng)前驅(qū)節(jié)點(diǎn)等于head時(shí),說(shuō)明前驅(qū)線程為當(dāng)前正擁有鎖,或者剛剛釋放鎖并且喚醒了當(dāng)前節(jié)點(diǎn)
                if (p == head && tryAcquire(arg)) {
                    // 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
                    setHead(node);
                    // help GC
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                // 如果前驅(qū)節(jié)點(diǎn)不是隊(duì)列head或者獲取鎖失敗,則設(shè)置前驅(qū)節(jié)點(diǎn)waitStatus為SIGNAL,并睡眠
                // shouldParkAfterFailedAcquire 前置節(jié)點(diǎn)waitStatus設(shè)為-1 返回false
                // parkAndCheckInterrupt 阻塞當(dāng)前線程 返回currentThread().isInterrupted(true)
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }

    /**
     * 此處在尾部插入node時(shí),先設(shè)置node的prev挑围,再CAS修改隊(duì)列tail指向礁竞,修改成功再設(shè)置前一個(gè)節(jié)點(diǎn)的next域
     * 隊(duì)列中,如果某個(gè)node的prev!=null,并不一定表示node已經(jīng)成功插入隊(duì)列中,如果某個(gè)node的前一個(gè)節(jié)點(diǎn)的next!=null,則該node一定位于隊(duì)列中
     * @param mode 模式
     * @return
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            // 雙端鏈表的add操作
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //頭尾都為null 初始化以及enq
        enq(node);
        return node;
    }


    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // tail為空,初始化head和tail節(jié)點(diǎn)  head lazy initialize
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // 雙端鏈表的基本操作
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * 線程在獲取鎖過(guò)程中,可能因?yàn)槌鲥e(cuò)杉辙、被中斷或超時(shí)而取消獲取鎖
     * @param node
     */
    protected void cancelAcquire(Node node){
        if (node == null) {
            return;
        }
        //線程引用清空
        node.thread = null;
        Node pred = node.prev;
        // 若前驅(qū)節(jié)點(diǎn)是CANCELLED,則跳過(guò)繼續(xù)往前找
        while (pred.waitStatus > 0) {
            node.prev = pred = pred.prev;
        }
        // 前驅(qū)節(jié)點(diǎn)中不是CANCELLED節(jié)點(diǎn),獲取pre next指向
        Node predNext = pred.next;
        // 設(shè)置waitStatus值為CANCELLED,標(biāo)記節(jié)點(diǎn)已取消
        node.waitStatus = Node.CANCELLED;

        // 如果被取消的node是尾部節(jié)點(diǎn),則設(shè)置tail指針指向前驅(qū)節(jié)點(diǎn),并且設(shè)置前驅(qū)節(jié)點(diǎn)的next指針為null
        if(node == tail && compareAndSetTail(node, pred)){
            compareAndSetNext(pred, predNext, null);
        }else {
            int ws;
            if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred,ws,
                    Node.SIGNAL))) && pred.thread != null){
                Node next = node.next;
                if(next != null && next.waitStatus <= 0){
                    compareAndSetNext(pred, predNext, next);
                }
            }else {
                unparkSuccessor(node);
            }
            node.next = node;
        }

    }


    /**
     * 喚醒后繼節(jié)點(diǎn)
     * @param node
     */
    private void unparkSuccessor(Node node) {
        // 在獨(dú)占模式下,waitStatus<0此時(shí)為SIGNAL,將SIGNAL標(biāo)志清除
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        // node.next == null || node.next.waitStatus > 0,表示該node.next線程取消了獲取鎖,則從隊(duì)列尾部遍歷,找到隊(duì)列中第一個(gè)沒(méi)有取消的線程
        if(s == null || s.waitStatus > 0){
            s = null;
            for(Node t = tail;t != null && t != node; t = t.prev){
                if(t.waitStatus <= 0){
                    s = t;
                }
            }
        }
        if(s != null){
            LockSupport.unpark(s.thread);
        }
    }


    protected boolean parkAndCheckInterrupt(){
        LockSupport.park(this);
        return Thread.interrupted();
    }

    private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if(ws == Node.SIGNAL){
            return true;
        }
        // ws = 1  CANCELLED  跳過(guò)取消的節(jié)點(diǎn)
        if(ws > 0){
            do {
                node.prev = pred = pred.prev;
            }while (pred.waitStatus > 0);
            pred.next = node;
        }else {
            // 0 或者 PROPAGATE
            compareAndSetWaitStatus(pred, ws, Node.CANCELLED);
        }
        return false;
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    private int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if(failed){
                node.waitStatus = Node.CANCELLED;
            }
        }
    }

    /**
     * 節(jié)點(diǎn)是否在同步隊(duì)列中
     * @param node
     * @return
     */
    private boolean isOnSyncQueue(Node node) {
        // 在條件隊(duì)列中
        // 或者同時(shí)別的線程調(diào)用signal,Node會(huì)從Condition隊(duì)列中移除,從移除到進(jìn)入release隊(duì)列,中間這段時(shí)間prev必然為null
        if(node.waitStatus == Node.CONDITION || node.prev == null){
            return false;
        }
        if(node.next != null){
            return true;
        }
        // 可能該Node剛剛最后一個(gè)進(jìn)入release隊(duì)列,所以是tail,其next必然是null,所以需要從隊(duì)尾向前查找
        return findNodeFromTail(node);
    }

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node) {
                return true;
            }
            if (t == null) {
                return false;
            }
            t = t.prev;
        }
    }


    /**
     * 喚醒前或者后被打斷
     * @param node
     * @return
     */
    private boolean transferAfterCancelledWait(Node node) {
        // 喚醒前線程被打斷(超時(shí)等原因?qū)⒉辉俚却? 直接加入到同步隊(duì)列等待獲取鎖
        if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
            enq(node);
            return true;
        }
        // 喚醒的時(shí)候被打斷  等待加入隊(duì)列(可以獲取鎖的狀態(tài))結(jié)束
        while (!isOnSyncQueue(node)){
            Thread.yield();
        }
        return false;
    }

    /**
     * condition隊(duì)列中節(jié)點(diǎn)加入到同步隊(duì)列中
     * @param first
     * @return
     */
    private boolean transferForSignal(Node first) {
        // 無(wú)法cas waitStatus, 節(jié)點(diǎn)被取消
        if(!compareAndSetWaitStatus(first, Node.CONDITION, 0)){
            return false;
        }
        // p是該Node的前驅(qū)
        Node p = enq(first);
        int ws = p.waitStatus;
        // 前驅(qū)節(jié)點(diǎn)cancled或者變更waitStatus失敗,直接喚醒當(dāng)前線程
        if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
            LockSupport.unpark(first.thread);
        }
        return true;
    }

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /** 原子性交換state */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /** 原子性交換head   僅enq調(diào)用 */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /** 原子性交換tail  僅enq調(diào)用 */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                expect, update);
    }

    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }



    static final class Node {
        /** 標(biāo)記當(dāng)前結(jié)點(diǎn)是共享模式 */
        static final Node SHARED = new Node();
        /** 標(biāo)記當(dāng)前結(jié)點(diǎn)是獨(dú)占模式 */
        static final Node EXCLUSIVE = null;
        /**
         * 結(jié)點(diǎn)的等待狀態(tài) 可能的取值包含:CANCELLED模捂、SIGNAL、CONDITION、PROPAGATE和0
         * 在同步等待隊(duì)列中的節(jié)點(diǎn)初始值為0,在條件等待隊(duì)列中的節(jié)點(diǎn)初始值為CONDITION
         * 在獨(dú)占模式下狂男,取值為CANCELLED综看、SIGNAL、0中之一
         * 在共享模式下岖食,取值為CANCELLED红碑、SIGNAL、PROPAGATE和0中之一
         * 在條件等待隊(duì)列中泡垃,取值為CANCELLED析珊、CONDITION中之一
         */
        volatile int waitStatus;
        /** 擁有當(dāng)前結(jié)點(diǎn)的線程 */
        volatile Thread thread;

        /** 線程已經(jīng)被取消 */
        static final int CANCELLED =  1;
        /**
         *  后續(xù)節(jié)點(diǎn)需要喚醒
         *  節(jié)點(diǎn)插入隊(duì)列時(shí),節(jié)點(diǎn)代表的線程睡眠前會(huì)將前一個(gè)節(jié)點(diǎn)的waitStatus置為SIGNAL
         *  當(dāng)前一個(gè)節(jié)點(diǎn)釋放鎖時(shí)蔑穴,如果其waitStatus置為SIGNAL忠寻,則會(huì)喚醒其后下一個(gè)節(jié)點(diǎn)線程
         */
        static final int SIGNAL    = -1;
        /** 表示節(jié)點(diǎn)代表的線程正處于條件等待隊(duì)列中等待signal信號(hào) */
        static final int CONDITION = -2;
        /** 在共享模式下使用,表示同步狀態(tài)能夠無(wú)條件向后傳播 */
        static final int PROPAGATE = -3;

        volatile Node prev;

        volatile Node next;
        /**
         * 在條件等待隊(duì)列中,用于指向下一個(gè)節(jié)點(diǎn)
         * 在同步等待隊(duì)列中存和,用于標(biāo)記該節(jié)點(diǎn)所代表的線程在獨(dú)占模式下還是共享模式下獲取鎖
         */
        Node nextWaiter;

        public Node() {
        }

        /**
         *
         * @param thread 一般為當(dāng)前線程
         * @param mode 排他EXCLUSIVE  共享SHARED
         */
        public Node(Thread thread, Node mode) {
            this.thread = thread;
            this.nextWaiter = mode;
        }

        public Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException();
            } else {
                return p;
            }
        }
    }

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (MyAbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (MyAbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (MyAbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }


    public class ConditionObject implements Condition, Serializable{

        private transient Node firstWaiter;

        private transient Node lastWaiter;

        private static final int REINTERRUPT =  1;

        private static final int THROW_IE    = -1;

        public ConditionObject() {}


        /**
         * 1. 新建Condition Node包裝線程,加入Condition隊(duì)列
         * 2. 釋放線程占有的鎖
         * 3. 在同步等待隊(duì)列中則睡眠等待  直到被喚醒并加入到同步隊(duì)列中
         * 4. 嘗試獲取鎖  此時(shí)節(jié)點(diǎn)已經(jīng)加入到同步隊(duì)列中
         * 5. 處理cancled節(jié)點(diǎn)
         * 6. 處理異常
         * @throws InterruptedException
         */
        @Override
        public final void await() throws InterruptedException {
            if(Thread.interrupted()){
                throw new InterruptedException();
            }
            // 新建Condition Node包裝線程,加入Condition隊(duì)列
            Node node = addConditionWaiter();
            // 釋放線程占有的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 不在同步等待隊(duì)列中則睡眠等待  直到被喚醒并加入到同步隊(duì)列中
            while (!isOnSyncQueue(node)){
                LockSupport.park(this);
                // 檢查是否被打斷
                if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
                    break;
                }
            }
            // 嘗試獲取鎖  異常不拋出
            if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
                interruptMode = REINTERRUPT;
            }
            if(node.nextWaiter != null){
                unlinkCancelledWaiters();
            }
            if(interruptMode != 0){
                reportInterruptAfterWait(interruptMode);
            }
        }

        @Override
        public void awaitUninterruptibly() {

        }

        @Override
        public long awaitNanos(long nanosTimeout) throws InterruptedException {
            return 0;
        }

        @Override
        public boolean await(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }

        @Override
        public boolean awaitUntil(Date deadline) throws InterruptedException {
            return false;
        }

        @Override
        public void signal() {
            if(!isHeldExclusively()){
                throw new IllegalMonitorStateException();
            }
            Node first = firstWaiter;
            if(first != null){
                doSignal(first);
            }
        }


        @Override
        public void signalAll() {

        }

        /**
         * 1. 條件隊(duì)列中移除first節(jié)點(diǎn)
         * 2. 跳過(guò)cancled節(jié)點(diǎn),找到一個(gè)沒(méi)有取消的first放入release隊(duì)列
         * @param first
         */
        private void doSignal(Node first) {
            do {
                if((firstWaiter = first.nextWaiter) == null){
                    lastWaiter = null;
                }
                first.nextWaiter = null;
                
            }while (!transferForSignal(first) && (first = firstWaiter) != null);
        }


        private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
            if(interruptMode == THROW_IE){
                throw new InterruptedException();
            }else if (interruptMode == REINTERRUPT) {
                selfInterrupt();
            }
        }

        /**
         * 沒(méi)有打斷 ∞忍辍0
         * interrupted before signalled  -1
         * interrupted after signalled    1
         * @param node
         * @return
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
        }

        /**
         * 單向鏈表add  返回lastWaiter
         * @return
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // lastWaiter CANCELLED  移除
            if(t != null && t.waitStatus != Node.CONDITION){
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if(t == null){
                firstWaiter = node;
            }else {
                t.nextWaiter = node;
            }
            lastWaiter = node;
            return node;
        }

        /**
         * 單向鏈表去掉某個(gè)節(jié)點(diǎn)的過(guò)程
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null){
                Node next = t.nextWaiter;
                if(t.waitStatus != Node.CONDITION){
                    t.nextWaiter = null;
                    if(trail == null){
                        firstWaiter = next;
                    }else {
                        trail.nextWaiter = next;
                    }
                }else {
                    trail = t;
                }
                t = next;
            }
        }

    }
    
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市捐腿,隨后出現(xiàn)的幾起案子纵朋,更是在濱河造成了極大的恐慌,老刑警劉巖茄袖,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件倡蝙,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡绞佩,警方通過(guò)查閱死者的電腦和手機(jī)寺鸥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)品山,“玉大人胆建,你說(shuō)我怎么就攤上這事≈饨唬” “怎么了笆载?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)涯呻。 經(jīng)常有香客問(wèn)我凉驻,道長(zhǎng),這世上最難降的妖魔是什么复罐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任涝登,我火速辦了婚禮,結(jié)果婚禮上效诅,老公的妹妹穿的比我還像新娘胀滚。我一直安慰自己趟济,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布咽笼。 她就那樣靜靜地躺著顷编,像睡著了一般。 火紅的嫁衣襯著肌膚如雪剑刑。 梳的紋絲不亂的頭發(fā)上媳纬,一...
    開(kāi)封第一講書(shū)人閱讀 50,084評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音施掏,去河邊找鬼钮惠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛其监,可吹牛的內(nèi)容都是我干的萌腿。 我是一名探鬼主播限匣,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼抖苦,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了米死?” 一聲冷哼從身側(cè)響起锌历,我...
    開(kāi)封第一講書(shū)人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎峦筒,沒(méi)想到半個(gè)月后究西,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡物喷,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年卤材,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片峦失。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扇丛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出尉辑,到底是詐尸還是另有隱情帆精,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布隧魄,位于F島的核電站卓练,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏购啄。R本人自食惡果不足惜襟企,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望狮含。 院中可真熱鬧整吆,春花似錦拱撵、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至府蛇,卻和暖如春集索,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背汇跨。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工务荆, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人穷遂。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓函匕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蚪黑。 傳聞我的和親對(duì)象是個(gè)殘疾皇子盅惜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容