AbstractQueuedSynchronizer源碼深度解析

總體介紹

基于隊(duì)列的抽象同步器报慕,它是jdk中所有顯示的線程同步工具的基礎(chǔ)深浮,像ReentrantLock/DelayQueue/CountdownLatch等等,都是借助AQS實(shí)現(xiàn)的眠冈。Java中已經(jīng)有了synchronized關(guān)鍵字飞苇,那么為什么還需要來(lái)這么一出呢?因?yàn)锳QS能實(shí)現(xiàn)更多維度蜗顽,更多場(chǎng)景的鎖機(jī)制布卡,例如共享鎖(讀鎖)/基于條件的線程阻塞/可以實(shí)現(xiàn)公平和非公平的鎖策略/可以實(shí)現(xiàn)鎖等待的中斷,而synchronized關(guān)鍵字由JVM實(shí)現(xiàn)雇盖,在代碼使用層面來(lái)說(shuō)忿等,如果僅僅是使用獨(dú)占鎖,那synchronized關(guān)鍵字比其它的鎖實(shí)現(xiàn)用起來(lái)方便崔挖。
下面步入正題贸街,來(lái)看看AQS都提供了哪些能力。

1虚汛、主體結(jié)構(gòu)

先來(lái)看看內(nèi)部的主體結(jié)構(gòu):

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
        

    protected AbstractQueuedSynchronizer() { }
    
    //內(nèi)部類(lèi)Node模捂,代表每一個(gè)獲取或者釋放鎖的線程
    static final class Node{}
    
    //head 和 tail構(gòu)成了同步隊(duì)列鏈表的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)舅逸。
    private transient volatile Node head;
    
    private transient volatile Node tail;
    
    //同步狀態(tài)值桶蛔,所有的同步行為都是通過(guò)state這個(gè)共享資源來(lái)實(shí)現(xiàn)的桶错。
    private volatile int state;
    
    //條件對(duì)象,用于同步在當(dāng)前鎖對(duì)象上的線程
    public class ConditionObject implements Condition, java.io.Serializable{}
    
    /******
    一系列的內(nèi)部方法:
    包括嘗試獲取鎖權(quán)力
    嘗試獲取鎖失敗后構(gòu)造節(jié)點(diǎn)放入等待隊(duì)列
    各種入隊(duì)出隊(duì)的操作
    嘗試釋放鎖等
    ********/
    
    
    //一系列的Unsafe操作
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
}

所以總結(jié)起來(lái)AQS就是通過(guò)一系列的Unsafe方法去操作一個(gè)鏈表隊(duì)列将谊,而鏈表隊(duì)列中每個(gè)節(jié)點(diǎn)需要操作的共享資源就是一個(gè)int state字段冷溶,如果要用到條件等待,則需要了解ConditionObject尊浓。

2逞频、節(jié)點(diǎn)類(lèi)的構(gòu)造

再來(lái)看看Node節(jié)點(diǎn)的內(nèi)部結(jié)構(gòu):

static final class Node {
    //常量,見(jiàn)nextWaiter屬性
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    //waitStatus 常量值栋齿。表示等待獲取鎖的線程已經(jīng)被取消(線程中斷/等待超時(shí))
    static final int CANCELLED =  1;
    //waitStatus 常量值苗胀。表示后繼線程需要unpark  
    static final int SIGNAL    = -1;
    //waitStatus 常量值襟诸。表示線程正在Condition隊(duì)列中
    static final int CONDITION = -2;
    //waitStatus 常量值。表示線程的acquireShared行為需要無(wú)條件的向隊(duì)列的下一個(gè)節(jié)點(diǎn)傳遞基协。用在共享鎖的場(chǎng)景歌亲。
    static final int PROPAGATE = -3;
    //注意,waitStatus除了以上常量值以為澜驮,由于是int類(lèi)型陷揪,則默認(rèn)是0
    volatile int waitStatus;

    //前繼節(jié)點(diǎn)
    volatile Node prev;

    //后繼節(jié)點(diǎn)
    volatile Node next;

    //節(jié)點(diǎn)所代表的線程
    volatile Thread thread;

    //如果節(jié)點(diǎn)再Condition等待隊(duì)列中,則該字段指向下一個(gè)節(jié)點(diǎn)杂穷,如果節(jié)點(diǎn)在同步隊(duì)列中悍缠,則為一個(gè)標(biāo)志位,其值為 SHARED或者null
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

所以在同步隊(duì)列中Node應(yīng)該長(zhǎng)下面這個(gè)樣子:


同步隊(duì)列

在Condition等待隊(duì)列中應(yīng)該長(zhǎng)如下這個(gè)樣子:


等待隊(duì)列

Node節(jié)點(diǎn)一共有3個(gè)構(gòu)造方法他們分別在同步隊(duì)列初始化時(shí)/加入同步隊(duì)列/加入Condition隊(duì)列時(shí)使用耐量,結(jié)合構(gòu)造方法以及AQS和Condition類(lèi)飞蚓,我們看下最終隊(duì)列會(huì)長(zhǎng)什么樣子。
首先看一下無(wú)參的構(gòu)造方法在什么地方使用:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);  //用當(dāng)前線程和mode類(lèi)型構(gòu)造節(jié)點(diǎn)廊蜒,此時(shí)waitStatus默認(rèn)為0
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter方法是將線程放入隊(duì)列的方法玷坠,開(kāi)始構(gòu)造new Node(Thread.currentThread(), mode)這樣的節(jié)點(diǎn),然后嘗試入隊(duì)列劲藐,如果入隊(duì)列失敗則進(jìn)入enq方法,在enq中樟凄,如果隊(duì)列為空(當(dāng)t==null的時(shí)候聘芜,因?yàn)槿腙?duì)操作是從隊(duì)尾進(jìn)行的),此時(shí)構(gòu)造了一個(gè)空的Node節(jié)點(diǎn)缝龄,然后將head和tail指向它汰现。我們稱這個(gè)節(jié)點(diǎn)為哨兵節(jié)點(diǎn),為什么需要這樣一個(gè)節(jié)點(diǎn)叔壤,后面會(huì)說(shuō)明瞎饲,哨兵節(jié)點(diǎn)入隊(duì)列后我們通過(guò)當(dāng)前線程構(gòu)造的的節(jié)點(diǎn)new Node(Thread.currentThread(), mode)再入隊(duì)列。由于構(gòu)造方法沒(méi)有傳入waitStatus值炼绘,所以此時(shí)waitStatus默認(rèn)為0嗅战。

所以綜合AQS來(lái)看,最終的同步隊(duì)列應(yīng)該長(zhǎng)如下這個(gè)樣子:

同步隊(duì)列中Node節(jié)點(diǎn)的存放情況

那么在Condition等待隊(duì)列中呢? Condition隊(duì)列中有firstWaiter和lastWaiter分別指向頭節(jié)點(diǎn)和尾節(jié)點(diǎn)俺亮。
先看下加入Condition隊(duì)列的代碼:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //構(gòu)造waitStatus等于CONDITION的NODE節(jié)點(diǎn)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) //如果隊(duì)列為空驮捍,則新節(jié)點(diǎn)入隊(duì)列,頭節(jié)點(diǎn)指向新節(jié)點(diǎn)
        firstWaiter = node;
    else
        t.nextWaiter = node; //如果隊(duì)列不為空脚曾,則新節(jié)點(diǎn)加入到隊(duì)列末尾
    lastWaiter = node;  //尾節(jié)點(diǎn)指向新節(jié)點(diǎn)
    return node;
}

所以东且,最終Condition隊(duì)列中的節(jié)點(diǎn)如下:


Condition隊(duì)列中的節(jié)點(diǎn)

Condition隊(duì)列中的節(jié)點(diǎn)是單向的,并且沒(méi)有哨兵節(jié)點(diǎn)本讥,在Condition隊(duì)列中珊泳,nextWaiter指向下一個(gè)節(jié)點(diǎn)鲁冯,而不是像在同步隊(duì)列中那樣指向模式(SHARED, null),其中各個(gè)節(jié)點(diǎn)的waitStatus等于CONDITION(-2)色查。

3薯演、獲取鎖

3.1 首先看寫(xiě)鎖lock的實(shí)現(xiàn)

public void lock() {
    sync.acquire(1);  //調(diào)用同步器的acquire方法 
}

這里同步器由ReentrantReadWriteLock來(lái)繼承AQS后具體實(shí)現(xiàn),不同的鎖對(duì)AQS的實(shí)現(xiàn)是不一樣的综慎。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&  //嘗試獲取鎖失敗
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //并且節(jié)點(diǎn)入隊(duì)列成功
        selfInterrupt();   //線程中斷
}

整個(gè)acquire方法其實(shí)就是3部曲涣仿,①嘗試獲取鎖 ②如果獲取鎖沒(méi)有成功,則構(gòu)造節(jié)點(diǎn)加入等待隊(duì)列中 ③如果節(jié)點(diǎn)入隊(duì)列成功示惊,則線程自我中斷讓出資源好港。當(dāng)線程進(jìn)入同步隊(duì)列中后就實(shí)現(xiàn)了獲取鎖過(guò)程中的線程阻塞功能了,因?yàn)檫@個(gè)時(shí)候線程是自我中斷狀態(tài)米罚。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

我們找到tryAcquire方法發(fā)現(xiàn)钧汹,它直接拋出了一個(gè)異常。這里其實(shí)是AQS用了模板模式录择,將tryAcquire的具體實(shí)現(xiàn)預(yù)留給各種鎖來(lái)實(shí)現(xiàn)拔莱,例如單純的寫(xiě)鎖只用判斷state是否為0,而讀寫(xiě)鎖的實(shí)現(xiàn)是要先取state的高16或者低16位來(lái)判斷隘竭。所以不同的鎖對(duì)tryAcquire的實(shí)現(xiàn)不同塘秦,但是不管如何實(shí)現(xiàn),由于state是共享變量动看,所以各個(gè)實(shí)現(xiàn)一定會(huì)保證state的線程安全尊剔。

3.1.1 線程加入同步隊(duì)列

當(dāng)獲取鎖失敗時(shí),就開(kāi)始構(gòu)造節(jié)點(diǎn)入隊(duì)列了菱皆。

//在寫(xiě)鎖的情況下须误,這里mode為Node.EXCLUSIVE,其實(shí)就是null
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //在enq之前的這段代碼是一個(gè)快讀入隊(duì)列的實(shí)現(xiàn)仇轻,如果入隊(duì)不成功則交由enq來(lái)實(shí)現(xiàn)
    Node pred = tail; //緩存tail節(jié)點(diǎn)京痢,因?yàn)閺年?duì)尾加入節(jié)點(diǎn),所以理論上tail就是前繼節(jié)點(diǎn)
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {  //當(dāng)隊(duì)尾不等于空篷店,則用CAS方法修改tail節(jié)點(diǎn)祭椰,讓其指向本次的構(gòu)造的新節(jié)點(diǎn)。
            pred.next = node; //如果CAS成功則說(shuō)明入隊(duì)成功船庇,然后將前繼節(jié)點(diǎn)的next指向新節(jié)點(diǎn)吭产。
            return node;
        }
    }
    //如果隊(duì)列為空,或者新節(jié)點(diǎn)入隊(duì)列失敗鸭轮,則交給enq處理臣淤。
    enq(node);
    return node;
}

從上面的代碼我們能看出AQS中一個(gè)很重要的邏輯:入隊(duì)列,一定是優(yōu)先保證tail的重新指向窃爷,然后才是前節(jié)點(diǎn)的next指向新節(jié)點(diǎn)邑蒋。如果拿分布式系統(tǒng)數(shù)據(jù)一致性舉例的話就是:tail是數(shù)據(jù)強(qiáng)一致性姓蜂,而next是最終一致,這一點(diǎn)在后面的邏輯判斷中很重要医吊。

//自旋的入隊(duì)列方法
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize  //隊(duì)列為空
            if (compareAndSetHead(new Node())) //構(gòu)造空節(jié)點(diǎn)钱慢,CAS改變頭節(jié)點(diǎn)
                tail = head;  //將tail指向新節(jié)點(diǎn)
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {  //隊(duì)列不為空,則將tail指向新節(jié)點(diǎn)
                t.next = node; //前節(jié)點(diǎn)的next指向新節(jié)點(diǎn)
                return t;
            }
        }
    }
}

這里隊(duì)列不為空的情況與addWaiter的快速入隊(duì)邏輯是一樣的沒(méi)什么好說(shuō)的卿堂,這里的重點(diǎn)是隊(duì)列等于空的時(shí)候束莫,我們可以看到代碼構(gòu)造了一個(gè)不包含任何線程的“空”節(jié)點(diǎn),然后將head指向它草描,這里有兩個(gè)問(wèn)題: 1.為什么要構(gòu)造這個(gè)空節(jié)點(diǎn)览绿,直接用新節(jié)點(diǎn)作為頭節(jié)點(diǎn)不好么? 2.為什么隊(duì)列為空的時(shí)候是先修改head而不是像隊(duì)列不為空的時(shí)候那樣直接修改tail呢穗慕。
這里先保留這兩個(gè)問(wèn)題饿敲,后續(xù)再來(lái)解答。
這一節(jié)主要記住的一個(gè)核心點(diǎn)就是:隊(duì)列是否為空通過(guò)tail判斷逛绵,一個(gè)節(jié)點(diǎn)加入隊(duì)列分為兩步怀各,第一步是強(qiáng)一致性的修改tail(節(jié)點(diǎn)的pre已經(jīng)指向的前節(jié)點(diǎn)),下一步再是修改前節(jié)點(diǎn)的next术浪。

3.1.2瓢对、進(jìn)入同步隊(duì)列后繼續(xù)嘗試獲取鎖或者睡眠

線程被構(gòu)造為節(jié)點(diǎn)進(jìn)入隊(duì)列后,接下來(lái)就是對(duì)隊(duì)列中的節(jié)點(diǎn)進(jìn)行獲取鎖的處理:

//獨(dú)占模式并且不可中斷的為同步隊(duì)列中的線程獲取鎖
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;  //中斷標(biāo)志
        for (;;) {
            final Node p = node.predecessor();  //節(jié)點(diǎn)的前繼節(jié)點(diǎn)
            if (p == head && tryAcquire(arg)) { //如果前節(jié)點(diǎn)等于頭節(jié)點(diǎn)胰苏,則說(shuō)明當(dāng)前節(jié)點(diǎn)應(yīng)該獲取鎖
                setHead(node); //如果獲取鎖成功沥曹,則將頭節(jié)點(diǎn)指向獲取鎖成功的節(jié)點(diǎn),并清空該節(jié)點(diǎn)的thread和pre碟联,讓該節(jié)點(diǎn)變成新的哨兵
                p.next = null; // help GC
                failed = false;
                return interrupted; //返回中斷狀態(tài)
            }
            //如果獲取鎖失敗,或者當(dāng)前節(jié)點(diǎn)不是最靠前的非哨兵節(jié)點(diǎn)僵腺,則嘗試將線程park
            if (shouldParkAfterFailedAcquire(p, node) &&  //判斷當(dāng)前節(jié)點(diǎn)是否可以park鲤孵,如果可以則嘗試park
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            cancelAcquire(node);  //如果獲取鎖失敗,取消競(jìng)爭(zhēng)鎖
    }
}

//判斷當(dāng)前節(jié)點(diǎn)是否可以睡眠(park)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //如果前繼節(jié)點(diǎn)的waitStatus等于Node.SIGNAL辰如,則前節(jié)點(diǎn)會(huì)負(fù)責(zé)喚醒當(dāng)前節(jié)點(diǎn)普监, 當(dāng)前節(jié)點(diǎn)可以park
        return true;
        
    if (ws > 0) { //如果前節(jié)點(diǎn)已經(jīng)取消(被中斷或超時(shí))
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);  //直到遍歷到非取消的前節(jié)點(diǎn)為止
        pred.next = node;//將遍歷到的前節(jié)點(diǎn)next指向node節(jié)點(diǎn)
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         //將前節(jié)點(diǎn)waitStatus更新為Node.SIGNAL,表示后繼節(jié)點(diǎn)需要unpark
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//park當(dāng)前線程琉兜,并返回當(dāng)前線程的中斷狀態(tài)
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

看到這里凯正,我們來(lái)說(shuō)一下剛才遺留的那個(gè)問(wèn)題:為什么要增加一個(gè)哨兵節(jié)點(diǎn)。如果沒(méi)有哨兵豌蟋,如上圖廊散,這個(gè)時(shí)候該節(jié)點(diǎn)獲取到鎖,這個(gè)時(shí)候需要做兩件事情梧疲,先判斷head是否等于tail允睹,然后需要將head指向當(dāng)前的next运准,其實(shí)就是指向null,tail也需要指向null缭受,這個(gè)時(shí)候胁澳,如果有另外一個(gè)線程正在入隊(duì)列,需要將head或者tail指向這個(gè)入隊(duì)列新節(jié)點(diǎn)米者,這個(gè)時(shí)候如果入隊(duì)節(jié)點(diǎn)剛完成tail以及pre的指向韭畸,還沒(méi)來(lái)及更改前節(jié)點(diǎn)的next,這時(shí)出隊(duì)列這邊則會(huì)將head指向null蔓搞,tail也會(huì)被指向null胰丁,當(dāng)然這里如果在將tail指向null之前進(jìn)行一次當(dāng)前tail與老tail的對(duì)比, 如果一致則更新為null败明,不一致則不更新能避免tail指向null隘马,但是head這邊就顯得比較麻煩了,每次獲取鎖成功要重新設(shè)置head都需要從隊(duì)尾向?qū)︻^遍歷妻顶,以防止有新的節(jié)點(diǎn)放入酸员,直到遍歷到節(jié)點(diǎn)中thead為空的(獲取鎖已出隊(duì)列)節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)為止。 這樣的處理顯得相當(dāng)?shù)穆闊?/p>

無(wú)哨兵的情況

無(wú)哨兵節(jié)點(diǎn)獲取到鎖后修改head和tail的過(guò)程

第二個(gè)問(wèn)題是為什么要先修改head而不能直接修改tail讳嘱,先看有哨兵的情況幔嗦,我改寫(xiě)了一下enq的方法:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize  //隊(duì)列為空
            Node n = new Node();
            if (compareAndSetTail(n)) //構(gòu)造空節(jié)點(diǎn),CAS改變頭節(jié)點(diǎn)
                head = n;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {  //隊(duì)列不為空沥潭,則將tail指向新節(jié)點(diǎn)
                t.next = node; //前節(jié)點(diǎn)的next指向新節(jié)點(diǎn)
                return t;
            }
        }
    }
}

這樣的處理方式我認(rèn)為它是線程安全的邀泉。在沒(méi)有哨兵的情況下就是把n換成node,邏輯是一樣的钝鸽。

3.1.3 獲取鎖或者睡眠過(guò)程中線程被中斷(取消獲取鎖)

我們?cè)賮?lái)看看取消獲取鎖做了什么事情:

//取消獲取鎖
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)  //空判斷
        return;

    node.thread = null; //取消節(jié)點(diǎn)對(duì)線程的引用

    //將node節(jié)點(diǎn)往前繼節(jié)點(diǎn)方向所有連續(xù)的取消狀態(tài)的節(jié)點(diǎn)出隊(duì)列
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;  //修改node狀態(tài)

    // If we are the tail, remove ourselves.
    //如果當(dāng)前節(jié)點(diǎn)是隊(duì)尾汇恤,則將當(dāng)前節(jié)點(diǎn)移除隊(duì)列,這個(gè)時(shí)候就不用設(shè)置前節(jié)點(diǎn)狀態(tài)
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null); //將前節(jié)點(diǎn)(這個(gè)時(shí)候已經(jīng)是隊(duì)尾節(jié)點(diǎn))的next指向null拔恰,這里不保證它一定會(huì)成功因谎,因?yàn)榭赡苡衅渌鹿?jié)點(diǎn)加入,用CAS方式避免將覆蓋其它線程的操作颜懊。
    } else { 
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&  //如果當(dāng)前節(jié)點(diǎn)不是隊(duì)尾财岔,也不是最靠前的節(jié)點(diǎn)
            ((ws = pred.waitStatus) == Node.SIGNAL ||  //前節(jié)點(diǎn)狀態(tài)已經(jīng)為Node.SIGNAL  或者 將前節(jié)點(diǎn)狀態(tài)更改為Node.SIGNAL成功
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
            pred.thread != null) { //并且前節(jié)點(diǎn)還未出隊(duì)列
            
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)  //當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn)存在并且沒(méi)有取消
                compareAndSetNext(pred, predNext, next);//則將前節(jié)點(diǎn)的next指向node節(jié)點(diǎn)的next
        } else { 
            unparkSuccessor(node);  //喚醒后繼節(jié)點(diǎn)  //node為頭節(jié)點(diǎn),或者更新前節(jié)點(diǎn)狀態(tài)為SIGNAL失敽拥(前節(jié)點(diǎn)就沒(méi)辦法自動(dòng)喚醒后節(jié)點(diǎn)了)匠璧,或者前節(jié)點(diǎn)thead==null(前節(jié)點(diǎn)已取消)
        }

        node.next = node; // help GC 后繼節(jié)點(diǎn)指向自己,去除引用咸这,幫助GC
    }
}

其主要邏輯如下圖:


當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)的情況

尾節(jié)點(diǎn)直接出隊(duì)列夷恍,tail指向pred節(jié)點(diǎn)。

當(dāng)前節(jié)點(diǎn)非首節(jié)點(diǎn)也非尾節(jié)點(diǎn)

這兩個(gè)圖相同的邏輯就在于將當(dāng)前節(jié)點(diǎn)狀態(tài)修改為取消媳维,然后thead置空裁厅,最后出隊(duì)列冰沙,但是這里沒(méi)有重新設(shè)置當(dāng)前節(jié)點(diǎn)的prev指針,如果從tail向前遍歷执虹,還是能遍歷到node節(jié)點(diǎn)拓挥。

3.1.4 需要喚醒后繼節(jié)點(diǎn)的情況

我們?cè)賮?lái)看下,當(dāng)前節(jié)點(diǎn)狀態(tài)更新為SIGNAL失敶(這是哪種情況)侥啤,或者當(dāng)前節(jié)點(diǎn)就是首節(jié)點(diǎn)時(shí),喚醒后繼節(jié)點(diǎn)是如何操作的:

//喚醒node節(jié)點(diǎn)的后繼節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
     //在喚醒后繼之前茬故,先將node節(jié)點(diǎn)狀態(tài)改為0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //如果node的后繼節(jié)點(diǎn)被取消盖灸,則從隊(duì)尾向node節(jié)點(diǎn)遍歷,找到距離node節(jié)點(diǎn)最近的waitStatus<=0的節(jié)點(diǎn)磺芭,然后喚醒s節(jié)點(diǎn)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

這里有一個(gè)問(wèn)題就是取消當(dāng)前節(jié)點(diǎn)獲取鎖赁炎,然后要喚醒后繼節(jié)點(diǎn)的時(shí)機(jī),為什么需要喚醒后繼節(jié)點(diǎn)钾腺?
我們?cè)倏聪逻@段代碼:

if (pred != head &&  //如果當(dāng)前節(jié)點(diǎn)不是隊(duì)尾徙垫,也不是最靠前的節(jié)點(diǎn)
            ((ws = pred.waitStatus) == Node.SIGNAL ||  //前節(jié)點(diǎn)狀態(tài)已經(jīng)為Node.SIGNAL  或者 將前節(jié)點(diǎn)狀態(tài)更改為Node.SIGNAL成功
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
            pred.thread != null) {
 ……..
} else { 
            unparkSuccessor(node);  
}

其中if判斷為false的情況有:

  1. pred == head : 前節(jié)點(diǎn)為head節(jié)點(diǎn),說(shuō)明當(dāng)前節(jié)點(diǎn)為第一個(gè)有效節(jié)點(diǎn)放棒,如果當(dāng)前節(jié)點(diǎn)被中斷了姻报,head節(jié)點(diǎn)在喚醒后繼節(jié)點(diǎn)時(shí)可能會(huì)找到老節(jié)點(diǎn)(當(dāng)前移除的節(jié)點(diǎn)),所以需要手動(dòng)喚醒node的后繼節(jié)點(diǎn)间螟。

  2. pred != head的情況吴旋,pred.waitStatus != Node.SIGNAL 并且ws<=0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 為false :即node的前繼節(jié)點(diǎn)非head的情況,前繼節(jié)點(diǎn)狀態(tài)不等于SIGNAL (ws>0即等于CANDLED厢破,或者ws=[0, CONDITION, PROPAGATE])但是更新?tīng)顟B(tài)為SIGNAL失敗荣瑟,這里ws>0是讓后繼節(jié)點(diǎn)醒來(lái),在acquireQueued方法中重新判斷是否獲取鎖還是睡眠摩泪。其它情況將狀態(tài)更新為SIGNAL失敗褂傀,則說(shuō)明前節(jié)點(diǎn)剛好完成鎖釋放并執(zhí)行了unparkSuccessor,在該方法中加勤,需要將當(dāng)前釋放鎖節(jié)點(diǎn)的狀態(tài)更新為0

  3. 當(dāng)pred!=head,前繼節(jié)點(diǎn)狀態(tài)也等于SIGNAL的時(shí)候,pred.thread == null, 這種情況唯有前繼節(jié)點(diǎn)為取消狀態(tài)線程才會(huì)為空同波,如果前繼節(jié)點(diǎn)已經(jīng)取消鳄梅,則應(yīng)該喚醒后繼節(jié)點(diǎn),重新處理acquireQueued未檩。

3.2 釋放寫(xiě)鎖

這里還是從ReentrantReadWriteLock可重入讀寫(xiě)鎖入手戴尸,查看它的釋放寫(xiě)鎖的實(shí)現(xiàn):

public void unlock() {
    sync.release(1);
}

可以看到,通過(guò)同步器調(diào)用release方法來(lái)達(dá)到釋放鎖的目的冤狡。
重點(diǎn)看一下AQS中release方法的實(shí)現(xiàn):

public final boolean release(int arg) {
    if (tryRelease(arg)) {//嘗試釋放鎖(共享變量)
        Node h = head;
        if (h != null && h.waitStatus != 0) //釋放鎖成功后孙蒙,喚醒同步隊(duì)列中下一個(gè)需要獲取鎖的節(jié)點(diǎn)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這里tryRelease由具體的鎖來(lái)實(shí)現(xiàn)项棠。

4、獲取讀鎖(共享鎖)

還是以ReentrantReadWriteLock鎖的實(shí)現(xiàn)為例挎峦,首先看下獲取讀鎖的代碼入口:

public void lock() {
    sync.acquireShared(1);  
}

很明顯也是調(diào)用同步器來(lái)實(shí)現(xiàn)的香追,這里主要關(guān)注acquireShared。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

整體邏輯與獲取寫(xiě)鎖類(lèi)似坦胶,首先tryAcquireShared透典,該方法的具體實(shí)現(xiàn)邏輯有鎖實(shí)現(xiàn)來(lái)決定,在tryAcquireShared中顿苇,鎖的實(shí)現(xiàn)會(huì)根據(jù)state來(lái)判斷峭咒,如果當(dāng)前已經(jīng)加了寫(xiě)鎖,則不能加讀鎖纪岁,如果當(dāng)前有其它線程獲取的讀鎖凑队,則本次同樣能加讀鎖,并且會(huì)判斷是否同一個(gè)線程幔翰,如果是漩氨,則重入次數(shù)加1。 如果獲取鎖不成功导匣,則進(jìn)入doAcquireShared方法才菠。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); //構(gòu)造同步隊(duì)列節(jié)點(diǎn)并加入隊(duì)列
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); //如果前節(jié)點(diǎn)已經(jīng)是head節(jié)點(diǎn),則繼續(xù)嘗試tryAcquireShared
                if (r >= 0) { //獲取讀鎖成功
                    setHeadAndPropagate(node, r);  //獲取鎖成功贡定,并且喚醒后續(xù)需要獲取共享鎖的節(jié)點(diǎn)
                    p.next = null; // help GC
                    if (interrupted)  //線程unpark后赋访,會(huì)判斷線程的中斷狀態(tài),如果線程已經(jīng)被中斷缓待,這里繼承中斷狀態(tài)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);  //獲取共享鎖過(guò)程中的任何異常都需要取消該節(jié)點(diǎn)繼續(xù)獲取鎖蚓耽,詳情建寫(xiě)鎖的取消邏輯。
    }
}

共享鎖的核心邏輯在于setHeadAndPropagate方法旋炒,該方法中實(shí)現(xiàn)了共享鎖獲取鎖的冒泡步悠。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);  //把當(dāng)前節(jié)點(diǎn)變成頭節(jié)點(diǎn), 并且將thread引用置空瘫镇,prev置空

    if (propagate > 0 || 
            h == null || 
     h.waitStatus < 0 ||
   (h = head) == null ||
        h.waitStatus < 0) { //喚醒后繼節(jié)點(diǎn)中所有的共享模式的等待者
        
        Node s = node.next;
        if (s == null || s.isShared())  //后繼節(jié)點(diǎn)有可能正在加入隊(duì)列(在addWaiter中是cas保證tail成功鼎兽,然后再設(shè)置的next引用),或者后繼節(jié)點(diǎn)是共享模式铣除,都需要嘗試喚醒后繼節(jié)點(diǎn)谚咬。
            doReleaseShared();  //這里不是釋放鎖, 是喚醒后繼節(jié)點(diǎn)尚粘。
    //對(duì)于共享模式而言择卦,前者獲取到共享鎖后,需要喚醒后繼的共享鎖等待者;這與前繼節(jié)點(diǎn)釋放鎖后需要喚醒后繼節(jié)點(diǎn)邏輯一致秉继,所以作者把通通的共享模式下喚醒后繼節(jié)點(diǎn)的行為封裝為了一個(gè)方法祈噪。
    }
}

從上面代碼可以看出,判斷是否需要繼續(xù)往后傳遞獲取鎖的行為取決于緊鄰的后繼節(jié)點(diǎn)的模式是否為Shared尚辑。如果為Shared模式辑鲤,則需要向后傳遞獲取鎖的行為,具體邏輯看doReleaseShared的實(shí)現(xiàn)腌巾。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        
        if (h != null && h != tail) { // 有至少一個(gè)后繼節(jié)點(diǎn)
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { //后繼節(jié)點(diǎn)需要喚醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  //這里為什么一定需要CAS成功才能喚醒后繼節(jié)點(diǎn)遂填,因?yàn)樵诠蚕砟J较拢罄^節(jié)點(diǎn)可能被喚醒后很快釋放鎖澈蝙,如果前節(jié)點(diǎn)狀態(tài)還是為SIGNAL,則等它釋放鎖時(shí)需要去喚醒后繼節(jié)點(diǎn)吓坚,此時(shí)它的后繼已經(jīng)釋放鎖了,這里就會(huì)有問(wèn)題灯荧,其實(shí)就是并發(fā)的問(wèn)題礁击。
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果節(jié)點(diǎn)狀態(tài)已經(jīng)為0,則說(shuō)明要么節(jié)點(diǎn)已經(jīng)執(zhí)行了unparkSuccessor逗载,或者沒(méi)有后繼節(jié)點(diǎn)了哆窿。
                continue;                // loop on failed CAS
        }
        
        //如果在執(zhí)行以上動(dòng)作中,有新節(jié)點(diǎn)獲取到鎖(誰(shuí)獲取到鎖厉斟,head指向誰(shuí))挚躯,從新嘗試喚醒新節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
        if (h == head)                   // loop if head changed  
            break;
    }
}

doReleaseShared方法是一個(gè)自旋方法擦秽,首先判斷是否還有后繼節(jié)點(diǎn)if (h != null && h != tail)码荔,如果有后繼節(jié)點(diǎn),拿到當(dāng)前頭節(jié)點(diǎn)狀態(tài)感挥,如果頭節(jié)點(diǎn)狀態(tài)為SIGNAL缩搅,則需要喚醒后繼節(jié)點(diǎn),這里后繼節(jié)點(diǎn)可能是共享節(jié)點(diǎn)触幼,也可能是一個(gè)獨(dú)占節(jié)點(diǎn)(才掛上來(lái)的)硼瓣,我們注意到代碼在執(zhí)行喚醒后繼節(jié)點(diǎn)unparkSuccessor之前,先執(zhí)行了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))置谦,這里的邏輯顯然于獨(dú)占模式釋放鎖喚醒后繼節(jié)點(diǎn)或者共享模式釋放鎖的邏輯不太一樣堂鲤,這里是強(qiáng)制優(yōu)先講頭節(jié)點(diǎn)狀態(tài)設(shè)置0,只有設(shè)置為0成功的線程才能執(zhí)行喚醒下一個(gè)節(jié)點(diǎn)的操作媒峡。這里之所以要這樣設(shè)計(jì)瘟栖,是因?yàn)楣蚕砟J较拢淮吾尫沛i的線程可能有多個(gè)丝蹭,甚至存在有的共享節(jié)點(diǎn)在釋放鎖,有的共享節(jié)點(diǎn)卻在獲取鎖,但是不管怎么樣奔穿,他們都會(huì)并發(fā)的調(diào)用doReleaseShared方法镜沽,如果這里不加以控制,則會(huì)重復(fù)的調(diào)用unparkSuccessor贱田,最后會(huì)重復(fù)的調(diào)用unpark方法(這里我還沒(méi)明白如果重復(fù)調(diào)用unpark會(huì)有什么問(wèn)題)缅茉。
那么剛才沒(méi)有compareAndSetWaitStatus(h, Node.SIGNAL, 0)成功的節(jié)點(diǎn)會(huì)再次循環(huán),再次進(jìn)來(lái)時(shí)發(fā)現(xiàn)此時(shí)頭節(jié)點(diǎn)狀態(tài)為0(因?yàn)槠渌€程執(zhí)行了喚醒操作)男摧,則講狀態(tài)更新為PROPAGATE蔬墩,這里重點(diǎn)說(shuō)下這個(gè)邏輯,我翻遍了所有的代碼都沒(méi)有發(fā)現(xiàn)哪里有指定將PROPAGATE變更為其它狀態(tài)的邏輯耗拓,所以我不指定作者這里設(shè)計(jì)一個(gè)這個(gè)狀態(tài)有何用拇颅,我認(rèn)為更不需要compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這段邏輯,當(dāng)其線程再次進(jìn)入判斷發(fā)現(xiàn)頭節(jié)點(diǎn)狀態(tài)不為SIGNAL時(shí)乔询,直接結(jié)束樟插。這種情況下再來(lái)看剛才的喚醒操作,當(dāng)喚醒后的節(jié)點(diǎn)重新嘗試獲取鎖失敗時(shí)竿刁,它又會(huì)執(zhí)行shouldParkAfterFailedAcquire方法黄锤,然后將前節(jié)點(diǎn)更改為SIGNAL,如果喚醒后的節(jié)點(diǎn)獲取鎖成功食拜,則head節(jié)點(diǎn)指向該節(jié)點(diǎn)鸵熟,后續(xù)的喚醒操作只與該節(jié)點(diǎn)有關(guān)。所以真心不明白為什么要有段設(shè)置為PROPAGATE的邏輯负甸,主要是代碼中任何地方看不到使用了PROPAGATE的這個(gè)邏輯流强。
這里再說(shuō)下自旋,退出自旋的條件是if (h == head)惑惶,即在處理過(guò)程中沒(méi)有新的節(jié)點(diǎn)獲取到鎖煮盼,用反證法,假設(shè)有新的節(jié)點(diǎn)獲取到鎖带污,這種情況下如果退出了方法會(huì)有問(wèn)題么 僵控? 我認(rèn)為沒(méi)有問(wèn)題,因?yàn)楂@取到鎖的節(jié)點(diǎn)最終會(huì)釋放鎖鱼冀,釋放鎖的動(dòng)作又會(huì)喚起后繼節(jié)點(diǎn)报破,所以為什么要自旋呢?
這一段如果有看明白作者意圖的還請(qǐng)指點(diǎn)一二千绪。

這里我們可以來(lái)看下整個(gè)node節(jié)點(diǎn)的狀態(tài)變化:

node節(jié)點(diǎn)狀態(tài)變化

通過(guò)上圖觀察充易,所有從0變到PROPAGATE的,最后都會(huì)因?yàn)樾录尤牍?jié)點(diǎn)或者節(jié)點(diǎn)取消而變稱SIGNAL荸型,而SIGNAL最終又都會(huì)因?yàn)樾枰獑酒鸷罄^節(jié)點(diǎn)而將當(dāng)前節(jié)點(diǎn)更新為0盹靴,這形成了一個(gè)完整的閉環(huán),那為什么不是直接0變?yōu)镾IGNAL,不知道PROPAGATE在中間起到什么作用稿静。

5梭冠、釋放讀鎖

相較于獲取讀鎖,釋放的過(guò)程就比較簡(jiǎn)單了

public void unlock() {
    sync.releaseShared(1); //調(diào)用同步器的releaseShared
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

可以看到改备, 在嘗試獲取鎖成功后控漠,直接調(diào)用doReleaseShared喚醒后繼節(jié)點(diǎn)。這里的邏輯就與共享鎖獲取鎖后需要繼續(xù)喚醒緊鄰的后繼共享節(jié)點(diǎn)一樣悬钳。
還有從這里就可以看出盐捷,doReleaseShared方法是被多個(gè)線程同時(shí)調(diào)用的,所以在代碼里面unparkSuccessor處需要進(jìn)行并發(fā)的控制默勾。

6碉渡、帶中斷的獲取鎖

不管是讀鎖還是寫(xiě)鎖,在獲取鎖的過(guò)程中還有一類(lèi)獲取方法是在等待鎖的過(guò)程中允許線程被中斷的灾测,方法會(huì)拋出InterruptedException異常爆价。
這里用共享鎖的lockInterruptibly方法舉例:

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1); //調(diào)用同步器的帶中斷的獲取共享鎖方法
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) //首先檢查線程的中斷狀態(tài),如果線程已經(jīng)被中斷媳搪,則拋出異常铭段,終止程序
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

接下來(lái)看下doAcquireSharedInterruptibly的實(shí)現(xiàn):

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();  //線程在等待過(guò)程中被中斷,當(dāng)線程醒來(lái)后判斷到中斷狀態(tài)秦爆,直接拋出中斷異常終止程序
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedInterruptibly與doAcquireShared方法相比序愚,唯一的區(qū)別就在于線程被unpark后如何處理中斷的,在doAcquireShared中等限,是將中斷狀態(tài)設(shè)置為一個(gè)boolean標(biāo)志返回給調(diào)用者爸吮,讓調(diào)用者自行決定該如何處理,而在doAcquireSharedInterruptibly中望门,如果發(fā)現(xiàn)線程被中斷形娇,直接拋出中斷異常終止程序。

7筹误、公平/非公平同步器

公平與非公平同步器其實(shí)與AQS本身無(wú)關(guān)桐早,這都是AQS具體的實(shí)現(xiàn),不同的鎖有不同的實(shí)現(xiàn)方式厨剪,但是總體的規(guī)則是一致的哄酝。因?yàn)槭茿QS的實(shí)現(xiàn),我想有必要這里說(shuō)一下祷膳,具體的實(shí)現(xiàn)請(qǐng)參考各個(gè)鎖的實(shí)現(xiàn)陶衅。
公平同步器:
所謂公平,即遵循先到先得的原則直晨,誰(shuí)先到達(dá)隊(duì)列搀军,則誰(shuí)就優(yōu)先獲取鎖膨俐,不得在剛嘗試鎖的時(shí)候不管隊(duì)列中是否有等待中的節(jié)點(diǎn)直接競(jìng)爭(zhēng)鎖。
而所謂不公平其實(shí)就是當(dāng)一個(gè)線程嘗試獲取鎖的時(shí)候罩句,不會(huì)主動(dòng)的去判斷同步隊(duì)列中是否有等待的節(jié)點(diǎn)吟策,直接就去競(jìng)爭(zhēng)鎖,如果競(jìng)爭(zhēng)失敗則進(jìn)入同步隊(duì)列進(jìn)行等待的止。

8、ConditionObject條件等待

在AQS中還有一個(gè)比較重要的類(lèi):ConditionObject着撩,這個(gè)類(lèi)實(shí)現(xiàn)了在基于AQS鎖的情況下對(duì)獲取到鎖的線程進(jìn)行有條件的等待和喚醒诅福,其主要的方法是await和signal以及它們的變種。有很多博客都拿它與Object對(duì)象的wait和notify作比較拖叙,說(shuō)ConditionObject的await和signal運(yùn)用的地方要比wait和notify廣氓润,其實(shí)并不然。它們使用的場(chǎng)景是截然不同的薯鳍,不然的話Doug Lea也不會(huì)費(fèi)這力氣重新造一個(gè)輪子咖气。
我認(rèn)為他們的區(qū)別主要在于,Object的wait和notify其實(shí)是在任何情況下都是可以調(diào)用的挖滤,而ConditionObject的await和signal必須要在基于AQS的鎖環(huán)境下才能調(diào)用崩溪,不然就會(huì)拋出異常(這也是我認(rèn)為它們之間最大的差異);其次斩松,ConditionObject是專(zhuān)門(mén)為AQS服務(wù)的伶唯,它的節(jié)點(diǎn)的構(gòu)造,狀態(tài)的標(biāo)志等都與AQS有關(guān)惧盹,在wait操作和notify操作時(shí)都需要去操作AQS的同步隊(duì)列乳幸。
所以綜上所述,ConditionObject是專(zhuān)門(mén)為AQS服務(wù)的钧椰,而不像有的博客寫(xiě)的它的用途要比Object的實(shí)現(xiàn)要廣粹断。
接下來(lái)看看ConditionObject的具體實(shí)現(xiàn)。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }
    
    /***
    methods
    
    ***/
}

它的核心結(jié)構(gòu)就是有一個(gè)Node的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)嫡霞,然后Node中通過(guò)Node nextWaiter進(jìn)行關(guān)聯(lián)形成了一個(gè)Condition鏈表隊(duì)列瓶埋,后續(xù)所有的操作都是圍繞這個(gè)隊(duì)列和同步隊(duì)列來(lái)進(jìn)行的。

8.1 await方法

public final void await() throws InterruptedException {
    if (Thread.interrupted())  //判斷當(dāng)前線程是否被中斷
        throw new InterruptedException();
        
    Node node = addConditionWaiter();  //①將當(dāng)前線程構(gòu)造為Condition等待節(jié)點(diǎn)并加入隊(duì)列秒际,詳情見(jiàn)addConditionWaiter說(shuō)明
    int savedState = fullyRelease(node);  //②釋放鎖現(xiàn)在全部的狀態(tài)悬赏,鎖可能有重入,所以這里不是直接調(diào)用AQS的release方法娄徊,詳情見(jiàn)fullyRelease說(shuō)明
    
    int interruptMode = 0;  //標(biāo)記線程在await過(guò)程中的中斷狀態(tài)闽颇,0表示未中斷

    while (!isOnSyncQueue(node)) { //判斷node節(jié)點(diǎn)是否在同步隊(duì)列中,只有node節(jié)點(diǎn)進(jìn)入了同步隊(duì)列循環(huán)才會(huì)結(jié)束(即寄锐,被signal了)
        LockSupport.park(this);//如果不在同步隊(duì)列中兵多, 則park當(dāng)前線程
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //③線程被喚醒或者中斷后尖啡,判斷線程的中斷狀態(tài)
            break;  //如果線程被中斷過(guò),則退出循環(huán)
    }
    
//④線程被喚醒后重新獲取鎖奈懒,鎖狀態(tài)恢復(fù)到savedState
    //不管線程是:未中斷,還是signal中斷,singnal后中斷,前面的代碼 都會(huì)保證node節(jié)點(diǎn)進(jìn)入同步隊(duì)列。
//acquireQueued 方法獲取到鎖茂嗓,并且在獲取鎖park的過(guò)程中有被中斷,并且之前在await過(guò)程中入撒,不是被signal之前就中斷的情況,則標(biāo)記后續(xù)處理中斷的情況為interruptMode。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    
    //⑤重新獲取到鎖墙贱,把節(jié)點(diǎn)從condition隊(duì)列中去除窍箍,同時(shí)也會(huì)清除被取消的節(jié)點(diǎn)
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters(); 
    
    //⑥線程被中斷祷蝌,根據(jù)中斷條件選擇拋出異澈龋或者重新中斷傳遞狀態(tài)
    if (interruptMode != 0) 
        reportInterruptAfterWait(interruptMode);
}
//向Condition隊(duì)列添加等待者
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) { //如果隊(duì)尾節(jié)點(diǎn)已經(jīng)取消,則先清空隊(duì)列中的取消節(jié)點(diǎn)
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    
    Node node = new Node(Thread.currentThread(), Node.CONDITION); //加入到Condition隊(duì)列中的節(jié)點(diǎn)狀態(tài)都為CONDITION
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

從以上代碼可以看出:condition隊(duì)列中钾埂,節(jié)點(diǎn)是通過(guò)nextWaiter來(lái)形成鏈表的汞贸,隊(duì)列中所有節(jié)點(diǎn)的狀態(tài)為CONDITION

//釋放節(jié)點(diǎn)
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();  //獲取當(dāng)前鎖的狀態(tài)
        if (release(savedState)) { //全部釋放掉當(dāng)前狀態(tài)
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();  //釋放失敗拋出異常
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}


public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

從fullyRelease的實(shí)現(xiàn)可以看出:release方法調(diào)用了tryRelease楣责,如果tryRelease成功,則喚醒后繼節(jié)點(diǎn)秆麸,這里要注意的是如果release不成功初嘹,則會(huì)拋出IllegalMonitorStateException異常,其實(shí)我認(rèn)為這里的else里面拋出異常是多余的沮趣,因?yàn)榫蛅ryRelease的實(shí)現(xiàn)來(lái)看屯烦,如果不是本線程去釋放自己獲得的鎖,tryRelease本身就會(huì)拋出IllegalMonitorStateException異常的房铭,而如果是本線程在釋放鎖驻龟,那一定是在持有鎖的情況下來(lái)釋放鎖的,這種情況一定會(huì)成功的缸匪,所以根本不會(huì)release失敗迅脐,所以代碼怎么都進(jìn)不到else中去。但是可以總結(jié)出的是豪嗽,await等方法一定是要在線程獲取AQS鎖的情況下調(diào)用谴蔑,否則就會(huì)拋出異常。另外龟梦,如果在釋放過(guò)程中線程中斷隐锭,則將節(jié)點(diǎn)設(shè)置為CANCELLED。

//檢查Condition隊(duì)列中節(jié)點(diǎn)在等待過(guò)程中的中斷狀態(tài)
//THROW_IE:表示在signal之前被中斷喚醒
// REINTERRUPT:表示在signal之后有中斷计贰,在singnal之后被通斷钦睡,需要保證singnal的行為最終完成,所以中斷只用延續(xù)狀態(tài)狀態(tài)REINTERRUPT躁倒,不用拋出異常荞怒。
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?  //當(dāng)前線程中斷狀態(tài)
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :  
        0;
}


final boolean transferAfterCancelledWait(Node node) {
    //如果節(jié)點(diǎn)是被signal喚醒洒琢,則狀態(tài)會(huì)被更新為0,然后入同步隊(duì)列褐桌,最后才是被unpark衰抑,所以這里如果能CAS成功,則說(shuō)明節(jié)點(diǎn)沒(méi)有被signal荧嵌,所以線程是在await過(guò)程中被中斷的呛踊。所以在這里需要將節(jié)點(diǎn)入隊(duì)列。
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {  
        enq(node);  //節(jié)點(diǎn)入同步隊(duì)列
        return true;
    }
    
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
     //如果節(jié)點(diǎn)是被signal喚醒的啦撮,則節(jié)點(diǎn)應(yīng)該會(huì)在同步隊(duì)列中谭网,什么情況下被signal喚醒但是node節(jié)點(diǎn)不在同步隊(duì)列中,而等待一會(huì)兒就在同步隊(duì)列中了赃春,這點(diǎn)確實(shí)沒(méi)想明白愉择。
    while (!isOnSyncQueue(node))
        Thread.yield();
    
    return false;
}

8.2 signal

signal的用途是喚醒調(diào)用await方法后進(jìn)入park的線程。主要代碼如下:

public final void signal() {
    if (!isHeldExclusively())  //判斷當(dāng)前線程是否是鎖的持有者织中,如果不是則拋出異常
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null) //頭節(jié)點(diǎn)不為空則鏈表不為空
        doSignal(first);
}

下面詳細(xì)看下doSignal的實(shí)現(xiàn):

//喚醒在該條件上等待時(shí)間最長(zhǎng)的且狀態(tài)正常的節(jié)點(diǎn)
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&  //循環(huán)從first嘗試將節(jié)點(diǎn)轉(zhuǎn)換為同步隊(duì)列節(jié)點(diǎn)锥涕,直到轉(zhuǎn)換成功或者遍歷完鏈表。
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
     //如果CAS失敗抠璃,則說(shuō)明節(jié)點(diǎn)狀態(tài)不為CCONDITION,則返回false繼續(xù)嘗試下一個(gè)節(jié)點(diǎn)脱惰。
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);  //將node節(jié)點(diǎn)加入同步隊(duì)列
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果隊(duì)列中前節(jié)點(diǎn)被取消或者CAS前節(jié)點(diǎn)狀態(tài)為SIGNAL失敗搏嗡,手動(dòng)喚醒已經(jīng)進(jìn)入同步隊(duì)列的node節(jié)點(diǎn)
        LockSupport.unpark(node.thread);
    return true;
}

寫(xiě)在最后:看AQS的源碼真的是花了不少的時(shí)間,但是其中任然有一些我認(rèn)為不應(yīng)該這樣寫(xiě)的邏輯拉一,我知道大部分情況是我沒(méi)想到作者所思考的情況采盒,所以還請(qǐng)各位看客輕拍,如果覺(jué)得對(duì)大家有幫助麻煩點(diǎn)個(gè)贊。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蔚润,一起剝皮案震驚了整個(gè)濱河市磅氨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嫡纠,老刑警劉巖烦租,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異除盏,居然都是意外死亡叉橱,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)者蠕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)窃祝,“玉大人,你說(shuō)我怎么就攤上這事踱侣》嘈。” “怎么了大磺?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)探膊。 經(jīng)常有香客問(wèn)我杠愧,道長(zhǎng),這世上最難降的妖魔是什么突想? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任殴蹄,我火速辦了婚禮,結(jié)果婚禮上猾担,老公的妹妹穿的比我還像新娘袭灯。我一直安慰自己,他們只是感情好绑嘹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布稽荧。 她就那樣靜靜地躺著,像睡著了一般工腋。 火紅的嫁衣襯著肌膚如雪姨丈。 梳的紋絲不亂的頭發(fā)上趣惠,一...
    開(kāi)封第一講書(shū)人閱讀 51,443評(píng)論 1 302
  • 那天连茧,我揣著相機(jī)與錄音鸽粉,去河邊找鬼郊尝。 笑死费韭,一個(gè)胖子當(dāng)著我的面吹牛掉弛,可吹牛的內(nèi)容都是我干的铃慷。 我是一名探鬼主播考抄,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼渗勘,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼沐绒!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起旺坠,我...
    開(kāi)封第一講書(shū)人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤乔遮,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后取刃,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蹋肮,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年璧疗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了括尸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡病毡,死狀恐怖濒翻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤有送,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布淌喻,位于F島的核電站,受9級(jí)特大地震影響雀摘,放射性物質(zhì)發(fā)生泄漏裸删。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一阵赠、第九天 我趴在偏房一處隱蔽的房頂上張望涯塔。 院中可真熱鬧,春花似錦清蚀、人聲如沸匕荸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)榛搔。三九已至,卻和暖如春东揣,著一層夾襖步出監(jiān)牢的瞬間践惑,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工嘶卧, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留尔觉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓芥吟,卻偏偏與公主長(zhǎng)得像侦铜,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子运沦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354