圖解java.util.concurrent源碼(一)AbstractQueuedSynchronizer(AQS)

引言


這個(gè)系列文章打算用圖解的方式記錄了自己閱讀concurrent包的中一些類(lèi)的大概流程辣苏,加深印象腹缩。

JDK版本


我這里依據(jù)的JDK版本如下:

java version "1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)

如果你的版本和我不同胡控,看到的源碼可能有細(xì)微的不同呜师。

什么是AbstractQueuedSynchronizer


concurrent包下的很多類(lèi)都有一個(gè)叫做Sync的內(nèi)部類(lèi)(比如ReentrantLock框杜,ThreadPoolExecutor等)蝶押,并且很多功能會(huì)委托給這個(gè)內(nèi)部類(lèi)踱蠢,而這個(gè)內(nèi)部類(lèi)實(shí)現(xiàn)了AbstractQueuedSynchronizer(下面簡(jiǎn)稱(chēng)AQS)。

AQS的功能


按照官方文檔的說(shuō)法,通過(guò)AQS可以很方便的實(shí)現(xiàn)一個(gè)自定義的同步器茎截,子類(lèi)只需要通過(guò)重寫(xiě)以下方法來(lái)控制AQS內(nèi)部的一個(gè)叫做state的同步變量:

//獨(dú)占模式獲取鎖與釋放鎖
//返回值表示獲取鎖是否成功
protected boolean tryAcquire(int arg)
//返回值表示釋放鎖是否成功
protected boolean tryRelease(int arg)
//共享模式獲取鎖與釋放鎖
//返回值表示獲得鎖后還剩余的許可數(shù)量
protected int tryAcquireShared(int arg)
//返回值表示釋放鎖是否成功
protected boolean tryReleaseShared(int arg)

獨(dú)占模式與共享模式的含義是:

  • 獨(dú)占模式:資源是互斥的苇侵,一次只能一個(gè)線程獲取鎖
  • 共享模式:資源一次可以由n個(gè)線程同時(shí)使用(n有限)

在重寫(xiě)這些方法時(shí),如果想要使用state同步變量企锌,必須使用AQS內(nèi)部提供的以下方法來(lái)控制:

protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)

一個(gè)AQS的使用示例如下(這里用AQS實(shí)現(xiàn)一個(gè)簡(jiǎn)單的不可重入鎖):

public class SimpleLock extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int unused) {
       //使用compareAndSetState控制AQS中的同步變量
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        //使用setState控制AQS中的同步變量
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    /**
    *發(fā)現(xiàn)線程是順序獲得鎖的
    * 因?yàn)锳QS是基于CLH鎖的一個(gè)變種實(shí)現(xiàn)的FIFO調(diào)度
    */
    public static void main(String[] args) throws InterruptedException {
        final SimpleLock lock = new SimpleLock();
        lock.lock();
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    System.out.println(Thread.currentThread().getId() + " acquired the lock!");
                    lock.unlock();
                }
            }).start();
            // 簡(jiǎn)單的讓線程按照f(shuō)or循環(huán)的順序阻塞在lock上
            //目的是讓線程順序啟動(dòng)
            Thread.sleep(100);
        }

        System.out.println("main thread unlock!");
        lock.unlock();
    }

其實(shí)這個(gè)基本上就是ThreadPoolExecutor的內(nèi)部類(lèi)Worker對(duì)AQS的實(shí)現(xiàn)榆浓,后面的文章我再說(shuō)。

CLH鎖


AQS的原理是CLH鎖的一個(gè)變種撕攒,具體怎么變種的后文再說(shuō)陡鹃,這里先說(shuō)一下什么是CLH鎖。
講之前先推薦一本書(shū)--《The Art of Multiprocessor Programming
》抖坪,這本書(shū)對(duì)并發(fā)的概念和各種鎖的設(shè)計(jì)思想介紹得特別清楚萍鲸,目前沒(méi)發(fā)現(xiàn)中文版,文末的參考文獻(xiàn)我附了一個(gè)英文版的下載鏈接擦俐。書(shū)的7.5.2節(jié)介紹了CLH鎖的設(shè)計(jì)思想脊阴,我這里就從書(shū)中摘抄一下只言片語(yǔ)簡(jiǎn)要介紹。
CLH鎖其實(shí)是自旋鎖的一種改良蚯瞧,與一般的自旋鎖不同嘿期,一般的自旋鎖會(huì)將并發(fā)所有的競(jìng)爭(zhēng)集中在一個(gè)標(biāo)志位里,而CLH鎖將競(jìng)爭(zhēng)資源的線程排成一個(gè)隊(duì)列埋合,每個(gè)線程只在前一個(gè)線程的標(biāo)志位上進(jìn)行自旋备徐,當(dāng)頭節(jié)點(diǎn)釋放鎖時(shí),將自己的標(biāo)志位置為false饥悴,這樣后繼線程在自旋時(shí)發(fā)現(xiàn)標(biāo)志位變?yōu)閒alse后坦喘,便能獲得鎖進(jìn)入臨界區(qū)。
CLH隊(duì)列大概看起來(lái)像下面這樣:


CLH鎖原理

CLH鎖的這種設(shè)計(jì)思想叫做Local Spin西设,它能夠最大化地減少CPU緩存的失效次數(shù)“晗常現(xiàn)在的多核CPU一般每一個(gè)物理核都會(huì)有自己的緩存,假如是普通的自旋鎖贷揽,所有CPU核都自旋在一個(gè)標(biāo)志位上棠笑,因?yàn)檫@個(gè)標(biāo)志位競(jìng)爭(zhēng)非常激烈,所以標(biāo)志位經(jīng)常會(huì)變化禽绪,每當(dāng)標(biāo)志位變化時(shí)蓖救,所有CPU的緩存就會(huì)失效,這樣顯然無(wú)法最大程度上利用CPU的緩存印屁,而在CLH鎖的設(shè)計(jì)中循捺,每個(gè)線程只需要在自己的前繼的標(biāo)志位上自旋即可,而前繼的標(biāo)志位僅僅在前繼釋放鎖的時(shí)候會(huì)發(fā)生變化雄人,這樣每個(gè)CPU核就可以一直在自己的本地緩存上自旋(所以稱(chēng)之為L(zhǎng)ocal Spin)而不會(huì)出現(xiàn)頻繁的緩存失效从橘,減少了緩存失效,鎖算法效率自然就提高了。

一個(gè)最標(biāo)準(zhǔn)的CLH鎖的實(shí)現(xiàn)如下:

public class CLHLock implements Lock {
    AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
    ThreadLocal<QNode> myPred;//代表前繼的節(jié)點(diǎn)
    ThreadLocal<QNode> myNode;//代表當(dāng)前線程的節(jié)點(diǎn)
    public CLHLock() {
        tail = new AtomicReference<QNode>(new QNode());
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
        myPred = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return null;
            }
        };
    }
    
    public void lock() {
        QNode qnode = myNode.get();
        qnode.locked = true;
        QNode pred = tail.getAndSet(qnode);
        myPred.set(pred);
        //在前繼節(jié)點(diǎn)的標(biāo)志位上自旋
        while (pred.locked) {}
    }
    
    public void unlock() {
        QNode qnode = myNode.get();
        //將當(dāng)前線程節(jié)點(diǎn)的標(biāo)志位置為false
        qnode.locked = false;
        //此時(shí)代表前繼節(jié)點(diǎn)的QNode對(duì)象已經(jīng)沒(méi)有用了,這里將其復(fù)用
        myNode.set(myPred.get());
    }
}

CLH鎖在大多數(shù)情況下表現(xiàn)都很優(yōu)異恰力,書(shū)中只給了一處例外叉谜,就是CLH鎖不適合用在NUMA體系結(jié)構(gòu)的計(jì)算機(jī)上,在NUMA體系結(jié)構(gòu)的計(jì)算機(jī)上則應(yīng)該使用另外一種同樣是基于隊(duì)列的鎖方法--MCS鎖踩萎,具體什么是MCS鎖這里就不展開(kāi)說(shuō)停局,有興趣的可以去看我推薦的那本書(shū)的7.5.3節(jié)。

至于什么是NUMA體系結(jié)構(gòu)的計(jì)算機(jī)香府,可以看一看這篇文章http://www.cnblogs.com/yubo/archive/2010/04/23/1718810.html

因?yàn)榕cAQS無(wú)關(guān)董栽,我這里就不再多說(shuō)了。

AQS原理概覽


在真正開(kāi)始閱讀源碼之前回还,我先用簡(jiǎn)要地說(shuō)明一下AQS的原理裆泳。
AQS維護(hù)著兩個(gè)隊(duì)列,一個(gè)是由AQS類(lèi)維護(hù)的CLH隊(duì)列(用于運(yùn)行CLH算法)柠硕,另一個(gè)是由AQS的內(nèi)部類(lèi)ConditionObject維護(hù)的Condition隊(duì)列(用于支持線程間的同步工禾,提供await,signal,signalAll方法)。

AQS中維護(hù)的CLH隊(duì)列看起來(lái)大概像這樣:


AQS中的CLH隊(duì)列

具體運(yùn)作看接下來(lái)的源碼詳解蝗柔。

AQS源碼圖解


有了CLH鎖相關(guān)的知識(shí)后闻葵,就可以來(lái)看一看AQS到底是怎么應(yīng)用這一優(yōu)秀的鎖算法的了。

Node內(nèi)部類(lèi)

前面說(shuō)過(guò)癣丧,CLH鎖是基于隊(duì)列的槽畔,隊(duì)列中每個(gè)節(jié)點(diǎn)對(duì)應(yīng)著一個(gè)等待資源的線程,在AQS中這個(gè)節(jié)點(diǎn)對(duì)用這一個(gè)叫做Node的內(nèi)部類(lèi)來(lái)表示胁编,我列舉一下它比較中要的幾個(gè)字段:

  • waitStatus: 等待狀態(tài)厢钧,有以下幾種取值
//代表線程已經(jīng)被取消
static final int CANCELLED = 1;

//代表后續(xù)節(jié)點(diǎn)需要喚醒
static final int SIGNAL = -1;

//代表線程在condition queue中,等待某一條件
static final int CONDITION = -2;

//代表后續(xù)結(jié)點(diǎn)會(huì)傳播喚醒的操作嬉橙,共享模式下起作用
static final int PROPAGATE = -3;
  • prev: CLH隊(duì)列的前繼
  • next: CLH隊(duì)列的后繼
  • nextWaiter: Condition隊(duì)列的后繼
  • thread: 這個(gè)節(jié)點(diǎn)所代表的線程

從這幾個(gè)字段可以看出早直,AQS中維護(hù)著兩個(gè)隊(duì)列(兩個(gè)隊(duì)列都是由Node組成),一個(gè)隊(duì)列就是CLH鎖算法中的那個(gè)隊(duì)列(我將其稱(chēng)之為CLH隊(duì)列)市框,另一個(gè)是Condition隊(duì)列(下文再講Condition隊(duì)列是用來(lái)做什么的)霞扬。

acquire方法

acquire方法是在獨(dú)占模式下用于獲取鎖的,它的主體邏輯我梳理了一下枫振,如下(只梳理了主題邏輯喻圃,所以代碼中一些我認(rèn)為不重要的細(xì)節(jié)就忽略了):


acquire方法流程圖

圖中紅色的節(jié)點(diǎn)代表開(kāi)始和終止的節(jié)點(diǎn),圖中節(jié)點(diǎn)編號(hào)對(duì)應(yīng)代碼如下粪滤。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這個(gè)節(jié)點(diǎn)對(duì)應(yīng)著acquireQueued方法的第一個(gè)參數(shù)中對(duì)addWaiter的調(diào)用斧拍。
addWaiter方法會(huì)為當(dāng)前線程建立一個(gè)Node節(jié)點(diǎn),并將節(jié)點(diǎn)加入CLH隊(duì)列后返回杖小。
從上面這段代碼我們也可以看出饮焦,在開(kāi)始真正的CLH算法(即acquireQueued方法)之前怕吴,會(huì)先嘗試一下獲得鎖(即由子類(lèi)重寫(xiě)的tryAcquire方法),這樣在競(jìng)爭(zhēng)較小的情況下能夠提升程序的性能县踢。

①結(jié)束之后,剩下的部分便全部在acquireQueued方法中進(jìn)行伟件,這個(gè)方法的代碼如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

重點(diǎn)在這個(gè)for循環(huán)
對(duì)應(yīng)著②的是for循環(huán)的前兩句:

                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {

如果發(fā)現(xiàn)了自己的前繼已經(jīng)是頭節(jié)點(diǎn)了的話硼啤,則說(shuō)明此時(shí)有獲得鎖的可能,就會(huì)調(diào)用tryAcquire進(jìn)行嘗試斧账。

③是for循環(huán)的終結(jié)狀態(tài)谴返,當(dāng)前面的tryAcquire獲取鎖成功時(shí)會(huì)執(zhí)行如下代碼(863~868行):

                //這里p是前繼節(jié)點(diǎn)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

可以看到這里把自己對(duì)應(yīng)的Node設(shè)置成了頭結(jié)點(diǎn),拋棄掉了原本的頭節(jié)點(diǎn)(即前繼出隊(duì))咧织。

當(dāng)嘗試獲取鎖失敗時(shí)嗓袱,當(dāng)前線程就要考慮一下是否要將自己阻塞了,這個(gè)邏輯位于shouldParkAfterFailedAcquire方法中习绢,代碼如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
                //waitStatus大于0的情況只有CANNCELLED
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

④就對(duì)應(yīng)這個(gè)該方法的第二行渠抹,判斷前繼的waitStatus是SIGNAL(說(shuō)明前繼已經(jīng)準(zhǔn)備好喚醒后繼節(jié)點(diǎn))。

這里先將⑦闪萄,從⑥中的代碼可以看出梧却,如果前繼的waitStatus是SIGNAL,則shouldParkAfterFailedAcquire直接放回true败去,然后參考一下上層的acquireQueued方法:

                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())

發(fā)現(xiàn)shouldParkAfterFailedAcquire返回true的話就會(huì)執(zhí)行parkAndCheckInterrupt方法放航,parkAndCheckInterrupt方法的內(nèi)容如下:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

發(fā)現(xiàn)AQS其實(shí)是調(diào)用concurrent包下的LockSupport來(lái)阻塞線程的。

如果發(fā)現(xiàn)前繼被CANCELLED了圆裕,則會(huì)跳過(guò)前繼广鳍,一直找到第一個(gè)沒(méi)有被CANCELLED的節(jié)點(diǎn)作為自己的前繼,代碼如下(shouldParkAfterFailedAcquire方法的第二個(gè)if判斷):

        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
                //waitStatus大于0的情況只有CANNCELLED
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 

上面的if語(yǔ)句對(duì)應(yīng)else就是⑤的代碼:

        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }

當(dāng)前繼節(jié)點(diǎn)的waitStatus為0, PROPAGATE或者CONDITION時(shí)(其實(shí)這里不可能為CONDITION吓妆,這個(gè)是留在Condition隊(duì)列中使用的waitStatus赊时,acquire方法不會(huì)涉及Condition隊(duì)列),則將前繼節(jié)點(diǎn)的waitStatus設(shè)置為SIGNAL耿战。

總體流程

從我畫(huà)得流程圖中可以看出蛋叼,acquire方法在幾次嘗試獲得鎖失敗后成功地將前繼線程的waitStatus設(shè)置為SIGNAL,然后阻塞自己剂陡。之后在某個(gè)時(shí)刻會(huì)被前繼線程喚醒狈涮,然后有經(jīng)過(guò)幾次爭(zhēng)搶后可能會(huì)成功地獲得鎖。

release方法

相比上面的acquire方法鸭栖,release方法可以說(shuō)是非常的簡(jiǎn)單歌馍,它做的就是如果tryRelease成功,就將頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)對(duì)應(yīng)的線程喚醒:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor方法會(huì)將h節(jié)點(diǎn)的后繼喚醒晕鹊,點(diǎn)開(kāi)這個(gè)方法會(huì)發(fā)現(xiàn)更多細(xì)節(jié)松却,比如暴浦,如果發(fā)現(xiàn)h的后繼節(jié)點(diǎn)為null或者狀態(tài)是CANCELLED時(shí),會(huì)找出離tail最遠(yuǎn)(或者說(shuō)離h節(jié)點(diǎn)最近)的一個(gè)非CANCELLED節(jié)點(diǎn)喚醒晓锻,代碼如下:

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        /*
           如果發(fā)現(xiàn)node的后繼節(jié)點(diǎn)為null或者狀態(tài)是CANCELLED時(shí)歌焦,
           會(huì)找出離tail最遠(yuǎn)(或者說(shuō)離node節(jié)點(diǎn)最近)的一個(gè)非CANCELLED節(jié)點(diǎn)喚醒
         */
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可以看出這里也是依靠LockSupport來(lái)喚醒線程的。
后繼線程被喚醒之后就會(huì)從前面我畫(huà)的acquire方法的流程圖中的節(jié)點(diǎn)⑦開(kāi)始不斷地嘗試獲得鎖砚哆,直到成功独撇。

AQS中對(duì)CLH算法的實(shí)現(xiàn)與標(biāo)準(zhǔn)的CLH算法有什么異同?

到這里已經(jīng)可以解答這個(gè)問(wèn)題了躁锁。AQS到底在哪些地方"變種"CLH鎖算法纷铣?

  1. CLH是一種自旋鎖算法(在得到鎖之前會(huì)不停地自旋),而AQS會(huì)在幾次自旋失敗后就將線程阻塞战转,這是為了避免不必要地占用CPU搜立;
  2. CLH是自旋在前繼節(jié)點(diǎn)的標(biāo)志位上的,而AQS是自旋在p == head上面(即不停地判斷前繼節(jié)點(diǎn)是否是頭節(jié)點(diǎn))槐秧,只有在發(fā)現(xiàn)前繼節(jié)點(diǎn)是頭節(jié)點(diǎn)時(shí)括荡,才會(huì)通過(guò)tryAcquire嘗試獲得鎖傻昙,這里有一個(gè)比較另我困惑的地方避凝,就是head是一個(gè)volatile的全局引用坠七,這么做的話顯然違背了CLH鎖的Local Spin的思想,具體原因未知命雀,可能是因?yàn)锳QS最初就是被設(shè)計(jì)為阻塞的同步器而不是自旋鎖吧蒜哀。

acquireShared方法

點(diǎn)進(jìn)這個(gè)方法,發(fā)現(xiàn)其邏輯和acquire基本一樣吏砂,唯獨(dú)不同的地方如下:

    private void doAcquireShared(int arg) {
        //共享模式入隊(duì)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //這里不同
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

第一行的addWaiter會(huì)將節(jié)點(diǎn)標(biāo)記為共享模式入隊(duì)(這個(gè)標(biāo)記其實(shí)就是在Node的nextWaiter屬性上添加一個(gè)Node.SHARED撵儿,在CLH隊(duì)列中,nextWaiter屬性沒(méi)有用狐血,所以這里就暫時(shí)拿來(lái)標(biāo)記淀歇,isShared方法會(huì)用這個(gè)標(biāo)記來(lái)判斷節(jié)點(diǎn)是否為共享模式)。
雖然寫(xiě)法和acquire不太一樣匈织,但是可以看出邏輯基本相同浪默,唯一值得注意的地方是,acquire方法在tryAcquire成功時(shí)缀匕,直接setHead將自己置為CLH隊(duì)列的隊(duì)頭纳决,而這里調(diào)用了一個(gè)叫做setHeadAndPropagate的方法,雖然名字看起來(lái)差不多乡小,但是邏輯卻很不相同阔加,點(diǎn)開(kāi)來(lái)看看:

    /**
     * @param node      the node
     * @param propagate 剩余許可數(shù)量
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

上面的代碼除了將自己置為頭節(jié)點(diǎn)外,還會(huì)繼續(xù)嘗試喚醒后繼節(jié)點(diǎn)(doReleaseShared)满钟,讓他們也來(lái)嘗試爭(zhēng)搶鎖胜榔。
doReleaseShared的代碼如下:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

可以看出這個(gè)方法會(huì)讓head的waitStatus產(chǎn)生如下變化:


共享模式waitStatus的變化過(guò)程

releaseShared方法

主要就是調(diào)用上面提到的doReleaseShared方法:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

從這里我們可以看出doReleaseShared方法會(huì)在線程獲得鎖和釋放鎖時(shí)分別調(diào)用一次胳喷,所以在共享模式下一個(gè)線程對(duì)應(yīng)的節(jié)點(diǎn)比較常見(jiàn)的狀態(tài)轉(zhuǎn)移大約是0(新建節(jié)點(diǎn)時(shí)) -> SIGNAL -> 0 -> PROPAGATE

ConditionObject內(nèi)部類(lèi)

雖然加鎖和釋放鎖的代碼都講完了,但是AQS遠(yuǎn)沒(méi)有那么簡(jiǎn)單夭织。
仔細(xì)看一下源碼吭露,發(fā)現(xiàn)AQS還有一個(gè)叫做ConditionObject的內(nèi)部類(lèi),這個(gè)類(lèi)是用來(lái)干嘛的呢摔癣?
在使用conncurrent包中的鎖(比如ReentrantLock)的時(shí)候奴饮,我們一般會(huì)使用lock.newCondition()方法返回一個(gè)Condition對(duì)象來(lái)對(duì)爭(zhēng)搶鎖的線程進(jìn)行同步。
看一下ReentrantLock的newCondition方法的代碼:

    public Condition newCondition() {
        return sync.newCondition();
    }

發(fā)現(xiàn)是委托給內(nèi)部類(lèi)Sync的一個(gè)實(shí)例sync的newCondition方法择浊,在點(diǎn)進(jìn)去看看Sync的newCondition方法,內(nèi)容如下:

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

這里出現(xiàn)了ConditionObject逾条,Sync繼承了AQS琢岩,這里的ConditionObject就是AQS中的內(nèi)部類(lèi)ConditionObject,這個(gè)類(lèi)幾乎不需要經(jīng)過(guò)任何修改就可以直接用來(lái)同步师脂,感覺(jué)很神奇担孔。
其實(shí)ConditionObject內(nèi)部又維護(hù)了一個(gè)隊(duì)列,我稱(chēng)之為Condition隊(duì)列吃警,這個(gè)隊(duì)列同樣是由Node類(lèi)的實(shí)例組成糕篇。
我們來(lái)看一下它的三個(gè)重要方法,分別是:

  • await
  • signal
  • signalAll

await方法

await方法的主體邏輯如下:

await方法
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //流程圖一
            Node node = addConditionWaiter();
            //二
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //三
            while (!isOnSyncQueue(node)) {
                //四
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //五
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

對(duì)應(yīng)著addConditionWaiter方法的調(diào)用酌心,這個(gè)ConditionObject實(shí)例自己的方法拌消。從addConditionWaiter方法可以看出,Node類(lèi)的nextWaiter字段其實(shí)是用來(lái)存放Condition隊(duì)列的后繼的安券,要和next字段(用來(lái)存放CLH隊(duì)列后繼)進(jìn)行區(qū)分墩崩。

字段 含義
next CLH隊(duì)列中的后繼
nextWaiter Condition隊(duì)列中的后繼,如果Node節(jié)點(diǎn)在CLH隊(duì)列中侯勉,這個(gè)字段也可能作為共享模式(Node.SHARED)的標(biāo)記
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

對(duì)應(yīng)著fullyRelease方法的調(diào)用鹦筹,fullyRelease會(huì)先保存當(dāng)前同步變量(state),然后通過(guò)之前講過(guò)的release方法將其全部釋放掉址貌,最后將其保存的同步變量(state)返回給上層方法铐拐,留復(fù)原的時(shí)候用,代碼如下:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

這一步其實(shí)就是while循環(huán)的判斷條件isOnSyncQueue方法:

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

這個(gè)方法其實(shí)就是在判斷node節(jié)點(diǎn)是否在CLH隊(duì)列中练对,這個(gè)node就是在第①步中創(chuàng)建并加入Condition隊(duì)列的節(jié)點(diǎn)遍蟋。
可能會(huì)疑惑,明明是在Condition隊(duì)列中的節(jié)點(diǎn)锹淌,怎么又突然跑到CLH隊(duì)列中呢匿值?其實(shí)是下文中的signal方法搞的鬼。

這一步很明顯就是while循環(huán)中的LockSupport.park(this);赂摆,這里也是利用LockSupport阻塞線程的挟憔。
這里需要注意的一個(gè)細(xì)節(jié)是下面兩句的中斷檢查:

                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;

點(diǎn)開(kāi)checkInterruptWhileWaiting方法:

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

英文注釋中已經(jīng)解釋了這個(gè)方法的含義:如果中斷發(fā)生在signal之前钟些,則在await最后會(huì)拋出InterruptedException異常(這里標(biāo)記為T(mén)HROW_IE);如果中斷發(fā)生在signale之后绊谭,則選擇在最后重置中斷位(標(biāo)記為REINTERRUPT)政恍。標(biāo)記的處理在await方法的最后一句reportInterruptAfterWait調(diào)用里面進(jìn)行,reportInterruptAfterWait內(nèi)容如下:

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

含義非常顯而易見(jiàn)达传。
為什么要做這兩種不同的中斷處理呢篙耗?我覺(jué)得是為了方便上層應(yīng)用區(qū)分:線程從await方法中蘇醒究竟是因?yàn)橹袛啵═HROW_IE)還是因?yàn)楸黄渌€程signal(REINTERRUPT)。

標(biāo)記 行為 含義
THROW_IE await方法的最后拋出InterruptedException異常 線程蘇醒是由中斷引起的
REINTERRUPT await方法的最后重置中斷標(biāo)志位 線程蘇醒是由其他線程調(diào)用signal方法引起的

再回到checkInterruptWhileWaiting方法宪赶,這個(gè)方法中處理判斷要怎么處理中斷以外宗弯,transferAfterCancelledWait調(diào)用還干了其他事情,代碼如下:

    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

這個(gè)方法一定會(huì)等到節(jié)點(diǎn)加入CLH隊(duì)列才會(huì)返回搂妻,而且按照英文注釋?zhuān)旅婺欠N自旋的情況只有在release方法將節(jié)點(diǎn)從Condition隊(duì)列搬運(yùn)到CLH隊(duì)列的工程中才會(huì)發(fā)生蒙保,發(fā)生的可能性是很低的,所以我們可以認(rèn)為當(dāng)線程被中斷后欲主,它就會(huì)立刻將自己加入CLH隊(duì)列邓厕。

⑤主要就是對(duì)我之前講過(guò)的acquireQueued方法的調(diào)用,里面的過(guò)程見(jiàn)我對(duì)acquire方法的講解扁瓢,線程會(huì)在這個(gè)方法里反復(fù)嘗試详恼,直到獲得鎖才會(huì)退出該方法(即便遇到了中斷也直到獲得鎖才會(huì)退出,這時(shí)acquireQueued返回true)引几。

signal方法

主體邏輯的流程圖如下:

signal方法

① ②

signal方法的代碼如下:

        public final void signal() {
            //流程圖節(jié)點(diǎn)一
            if (!isHeldExclusively())
                //二
                throw new IllegalMonitorStateException();
            //剩下的步驟(三,四,五,六,七)
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

①②兩步對(duì)應(yīng)著signal方法的前兩句昧互,非常顯然,就不多說(shuō)了她紫。

接下來(lái)的步驟的關(guān)鍵是doSignal方法:

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

第二行的firstWaiter = first.nextWaiter會(huì)獲取Condition隊(duì)列的下一個(gè)節(jié)點(diǎn)硅堆,這個(gè)節(jié)點(diǎn)被獲取過(guò)之后就會(huì)被從Condition隊(duì)列中刪除。

④其實(shí)就是while語(yǔ)句中的第一個(gè)判斷條件贿讹,第二個(gè)判斷條件很好懂渐逃,就是如果一直到隊(duì)列尾部都沒(méi)有找到合適的節(jié)點(diǎn),循環(huán)就結(jié)束民褂。
這個(gè)循環(huán)中需要注意一點(diǎn)的是茄菊,雖然每次一進(jìn)來(lái)就會(huì)獲取下一個(gè)節(jié)點(diǎn)(將Condition隊(duì)列隊(duì)頭(firstWaiter)設(shè)置為下一個(gè)節(jié)點(diǎn) ),但是傳進(jìn)while的第一個(gè)判斷條件transferForSignal方法的參數(shù)其實(shí)是前一個(gè)節(jié)點(diǎn)(first)赊堪,第二個(gè)判斷條件用的才是firstWaiter節(jié)點(diǎn)面殖。
第一個(gè)判斷條件的含義需要點(diǎn)開(kāi)transferForSignal方法才能明白:

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

只有在第一行的CAS操作失敗時(shí)才會(huì)返回false,個(gè)人認(rèn)為在這里CAS操作失敗的唯一可能就是node的waitStatus不是CONDITION哭廉,所以我認(rèn)為第一個(gè)判斷條件的含義就是判斷節(jié)點(diǎn)的waitStatus是否是CONDITION脊僚。

從transferForSignal方法中可以看出在CAS操作成功后,首先就會(huì)調(diào)用enq方法將node節(jié)點(diǎn)加入CLH隊(duì)列(node節(jié)點(diǎn)在前面的while循環(huán)中已經(jīng)從Condition隊(duì)列刪除了,所以node節(jié)點(diǎn)同時(shí)只會(huì)在一個(gè)隊(duì)列)辽幌,enq方法同時(shí)會(huì)返回node節(jié)點(diǎn)的前繼增淹。在這里我們也可以看出,之所以Condition隊(duì)列和CLH隊(duì)列都采用Node類(lèi)作為節(jié)點(diǎn)的原因就是為了方便將節(jié)點(diǎn)從Condition隊(duì)列搬運(yùn)到CLH隊(duì)列乌企。

⑥ ⑦

transferForSignal的最后幾句干的就是這件事情:

        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);

調(diào)用signal方法的線程先企圖幫助node節(jié)點(diǎn)對(duì)應(yīng)的等待線程將它的CLH隊(duì)列的前繼的waitStatus設(shè)置為SIGNAL虑润,如果不成功的話才將等待線程喚醒,讓其自行設(shè)置加酵。
之所以要進(jìn)行這么一次嘗試拳喻,是為了減少線程切換的開(kāi)銷(xiāo),盡量在當(dāng)前線程把事情都做掉猪腕,就不再麻煩等待線程了冗澈,等到有資源的時(shí)候,自然會(huì)有它CLH隊(duì)列的前繼來(lái)將其喚醒陋葡。

signalAll方法

它與signal唯一的不同就在于渗柿,將transferForSignal方法加入了循環(huán)體,并且將while循環(huán)的判斷條件改成了first != null脖岛。

            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);

也就是說(shuō)它會(huì)遍歷完Condition隊(duì)列將他們?nèi)考尤隒LH隊(duì)列。

參考文獻(xiàn)


《The Art of Multiprocessor Programming》

https://www.e-reading.club/bookreader.php/134637/Herlihy,Shavit-_The_art_of_multiprocessor_programming.pdf

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末颊亮,一起剝皮案震驚了整個(gè)濱河市柴梆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌终惑,老刑警劉巖绍在,帶你破解...
    沈念sama閱讀 212,542評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異雹有,居然都是意外死亡偿渡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)霸奕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)溜宽,“玉大人,你說(shuō)我怎么就攤上這事质帅∈嗜啵” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,021評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵煤惩,是天一觀的道長(zhǎng)嫉嘀。 經(jīng)常有香客問(wèn)我,道長(zhǎng)魄揉,這世上最難降的妖魔是什么剪侮? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,682評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮洛退,結(jié)果婚禮上瓣俯,老公的妹妹穿的比我還像新娘杰标。我一直安慰自己,他們只是感情好降铸,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布在旱。 她就那樣靜靜地躺著,像睡著了一般推掸。 火紅的嫁衣襯著肌膚如雪桶蝎。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,985評(píng)論 1 291
  • 那天谅畅,我揣著相機(jī)與錄音登渣,去河邊找鬼。 笑死毡泻,一個(gè)胖子當(dāng)著我的面吹牛胜茧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播仇味,決...
    沈念sama閱讀 39,107評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼呻顽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了丹墨?” 一聲冷哼從身側(cè)響起廊遍,我...
    開(kāi)封第一講書(shū)人閱讀 37,845評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎贩挣,沒(méi)想到半個(gè)月后喉前,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,299評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡王财,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評(píng)論 2 327
  • 正文 我和宋清朗相戀三年卵迂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绒净。...
    茶點(diǎn)故事閱讀 38,747評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡见咒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出疯溺,到底是詐尸還是另有隱情论颅,我是刑警寧澤,帶...
    沈念sama閱讀 34,441評(píng)論 4 333
  • 正文 年R本政府宣布囱嫩,位于F島的核電站恃疯,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏墨闲。R本人自食惡果不足惜今妄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧盾鳞,春花似錦犬性、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,828評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至推励,卻和暖如春鹤耍,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背验辞。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,069評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工稿黄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人跌造。 一個(gè)月前我還...
    沈念sama閱讀 46,545評(píng)論 2 362
  • 正文 我出身青樓杆怕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親壳贪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子陵珍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容