Java AQS源碼解讀

1椎例、先聊點別的

說實話,關(guān)于AQS的設(shè)計理念请祖、實現(xiàn)订歪、使用,我有打算寫過一篇技術(shù)文章肆捕,但是在寫完初稿后刷晋,發(fā)現(xiàn)掌握的還是模模糊糊的,模棱兩可慎陵。
痛定思痛眼虱,腳踏實地重新再來一遍。這次以 Java 8源碼為基礎(chǔ)進(jìn)行解讀席纽。

2捏悬、AQS簡介

java.util.concurrent.locks包下,有兩個這樣的類:

  • AbstractQueuedSynchronizer
  • AbstractQueuedLongSynchronizer

這兩個類的唯一區(qū)別就是:

  • AbstractQueuedSynchronizer內(nèi)部維護(hù)的state變量是int類型
  • AbstractQueuedLongSynchronizer內(nèi)部維護(hù)的state變量是long類型

我們常說的AQS其實泛指的就是這兩個類润梯,即抽象隊列同步器过牙。

抽象隊列同步器AbstractQueuedSynchronizer (以下都簡稱AQS),是用來構(gòu)建鎖或者其他同步組件的骨架類仆救,減少了各功能組件實現(xiàn)的代碼量抒和,也解決了在實現(xiàn)同步器時涉及的大量細(xì)節(jié)問題,例如等待線程采用FIFO隊列操作的順序彤蔽。在不同的同步器中還可以定義一些靈活的標(biāo)準(zhǔn)來判斷某個線程是應(yīng)該通過還是等待摧莽。

AQS采用模板方法模式,在內(nèi)部維護(hù)了n多的模板的方法的基礎(chǔ)上顿痪,子類只需要實現(xiàn)特定的幾個方法(不是抽象方法镊辕!不是抽象方法!不是抽象方法R舷)征懈,就可以實現(xiàn)子類自己的需求。

基于AQS實現(xiàn)的組件揩悄,諸如:

  • ReentrantLock 可重入鎖(支持公平和非公平的方式獲取鎖)
  • Semaphore 計數(shù)信號量
  • ReentrantReadWriteLock 讀寫鎖
  • ...

AQS是Doug Lea的大作之一卖哎,在維基百科查關(guān)于他的資料時,偶然發(fā)現(xiàn)老爺子喜歡紅色或淡粉色襯衫?

3亏娜、AQS設(shè)計思路

AQS內(nèi)部維護(hù)了一個int成員變量來表示同步狀態(tài)焕窝,通過內(nèi)置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。

我們可以猜測出维贺,AQS其實主要做了這么幾件事情:

  • 同步狀態(tài)(state)的維護(hù)管理
  • 等待隊列的維護(hù)管理
  • 線程的阻塞與喚醒

ps: 當(dāng)然了它掂,其內(nèi)部還維護(hù)了一個ConditionObject 內(nèi)部類,主要用作線程的協(xié)作與通信溯泣,我們暫時先不講這個帥哥虐秋。

通過AQS內(nèi)部維護(hù)的int型的state,可以用于表示任意狀態(tài)垃沦!

  • ReentrantLock用它來表示鎖的持有者線程已經(jīng)重復(fù)獲取該鎖的次數(shù)客给,而對于非鎖的持有者線程來說,如果state大于0栏尚,意味著無法獲取該鎖起愈,將該線程包裝為Node,加入到同步等待隊列里译仗。
  • Semaphore用它來表示剩余的許可數(shù)量抬虽,當(dāng)許可數(shù)量為0時,對未獲取到許可但正在努力嘗試獲取許可的線程來說纵菌,會進(jìn)入同步等待隊列阐污,阻塞,直到一些線程釋放掉持有的許可(state+1)咱圆,然后爭用釋放掉的許可笛辟。
  • FutureTask用它來表示任務(wù)的狀態(tài)(未開始、運(yùn)行中序苏、完成手幢、取消)。
  • ReentrantReadWriteLock在使用時忱详,稍微有些不同围来,int型state用二進(jìn)制表示是32位,前16位(高位)表示為讀鎖匈睁,后面的16位(低位)表示為寫鎖监透。
  • CountDownLatch使用state表示計數(shù)次數(shù),state大于0航唆,表示需要加入到同步等待隊列并阻塞胀蛮,直到state等于0,才會逐一喚醒等待隊列里的線程糯钙。

3.1 偽代碼之獲取鎖:

boolean acquire() throws InterruptedException {
  while(當(dāng)前狀態(tài)不允許獲取操作) {
    if(需要阻塞獲取請求) {
      如果當(dāng)前線程不在隊列中粪狼,則將其插入隊列
      阻塞當(dāng)前線程
    }
    else
      返回失敗
  }
  可能更新同步器的狀態(tài)
  如果線程位于隊列中退腥,則將其移出隊列
  返回成功
}

3.2 偽代碼之釋放鎖:

void release() {
  更新同步器的狀態(tài)
  if (新的狀態(tài)允許某個被阻塞的線程獲取成功)
    解除隊列中一個或多個線程的阻塞狀態(tài)
}

大概就是闡述這么個思路。

3.3 提供的方法

3.3.1 共通方法

以下三個方法再榄,均為protected final修飾阅虫,每個繼承AQS的類都可以調(diào)用這三個方法。

  • protected final int getState() 獲取同步狀態(tài)
  • protected final void setState(int newState) 設(shè)置同步狀態(tài)
  • protected final boolean compareAndSetState(int expect, int update) 如果當(dāng)前狀態(tài)值等于預(yù)期值不跟,原子性地將同步狀態(tài)設(shè)置為給定的更新值,并返回true米碰;否則返回false
3.3.2 子類需要實現(xiàn)的方法

以下五個方法窝革,在AQS內(nèi)部并未實現(xiàn),而是交由子類去實現(xiàn)吕座,然后AQS再調(diào)用子類的實現(xiàn)方法虐译,完成邏輯處理。

  • protected boolean tryAcquire(int) 嘗試以獨(dú)占模式獲取操作吴趴,應(yīng)查詢對象的狀態(tài)是否允許以獨(dú)占模式獲取它漆诽,如果允許則獲取它。
  • protected boolean tryRelease(int) 嘗試釋放同步狀態(tài)
  • protected int tryAcquireShared(int) 共享的方式嘗試獲取操作
  • protected boolean tryReleaseShared(int) 共享的方式嘗試釋放
  • protected boolean isHeldExclusively() 調(diào)用此方法的線程锣枝,是否是獨(dú)占鎖的持有者

子類無須實現(xiàn)上述的所有方法厢拭,可以選擇其中一部分進(jìn)行覆寫,但是要保持實現(xiàn)邏輯完整撇叁,不能穿插實現(xiàn)供鸠。根據(jù)實現(xiàn)方式不同,分為獨(dú)占鎖策略實現(xiàn)和共享鎖策略實現(xiàn)陨闹。

這也是為什么上述方法沒有定義為抽象方法的原因楞捂。如果定義為抽象方法,子類必須實現(xiàn)所有的五個方法趋厉,哪怕你壓根就用不到寨闹。

獨(dú)占鎖:

  • ReentrantLock
  • ReentrantReadWriteLock.WriteLock
    實現(xiàn)策略:
  • tryAcquire(int)
  • tryRelease(int)
  • isHeldExclusively()

共享鎖:

  • CountDownLatch
  • ReentrantReadWriteLock.ReadLock
  • Semaphore
    實現(xiàn)策略:
  • tryAcquireShared(int)
  • tryReleaseShared(int)

AQS還有很多內(nèi)部模板方法,就不一一舉例了君账,之后的源碼解讀繁堡,會展示一部分,并會配上騷氣的注釋杈绸。

4帖蔓、AQS內(nèi)部屬性

4.1 CLH隊列

AQS通過內(nèi)置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。CLH隊列是FIFO的雙端雙向隊列瞳脓,AQS的同步機(jī)制就是依靠這個CLH隊列完成的塑娇。隊列的每個節(jié)點,都有前驅(qū)節(jié)點指針和后繼節(jié)點指針劫侧。

頭結(jié)點并不在阻塞隊列內(nèi)埋酬!

AQS-Node.jpg

Node源碼:

static final class Node {
    // 共享模式下等待標(biāo)記
    static final Node SHARED = new Node();

    // 獨(dú)占模式下等待標(biāo)記
    static final Node EXCLUSIVE = null;

    // 表示當(dāng)前的線程被取消
    static final int CANCELLED = 1;

    // 表示當(dāng)前節(jié)點的后繼節(jié)點包含的線程需要運(yùn)行哨啃,也就是unpark
    static final int SIGNAL = -1;

    // 表示當(dāng)前節(jié)點在等待condition,也就是在condition隊列中
    static final int CONDITION = -2;

    // 表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行
    static final int PROPAGATE = -3;
    /**
     * CANCELLED =  1 // 當(dāng)前線程因為超時或者中斷被取消写妥。這是一個終結(jié)態(tài)拳球,也就是狀態(tài)到此為止。
     * SIGNAL    = -1 // 表示當(dāng)前線程的后繼線程被阻塞或即將被阻塞珍特,當(dāng)前線程釋放鎖或者取消后需要喚醒后繼線程祝峻。這個狀態(tài)一般都是后繼節(jié)點設(shè)置前驅(qū)節(jié)點的
     * CONDITION = -2 // 表示當(dāng)前線程在Condition隊列中
     * PROPAGATE = -3 // 用于將喚醒后繼線程傳遞下去,這個狀態(tài)的引入是為了完善和增強(qiáng)共享鎖的喚醒機(jī)制
     * 0              // 表示無狀態(tài)或者終結(jié)狀態(tài)扎筒!
     */
    volatile int waitStatus;

    // 前驅(qū)節(jié)點
    volatile Node prev;

    // 后繼節(jié)點
    volatile Node next;

    // 當(dāng)前節(jié)點的線程莱找,初始化使用,在使用后失效
    volatile Thread thread;

    // 存儲condition隊列中的后繼節(jié)點
    Node nextWaiter;

    // 如果該節(jié)點處于共享模式下等待嗜桌,返回true
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回當(dāng)前節(jié)點的前驅(qū)節(jié)點奥溺,如果為空,直接拋出空指針異常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // 指定線程和模式的構(gòu)造方法
    Node(Thread thread, Node mode) {     // Used by addWaiter
        // SHARED和EXCLUSIVE 用于表示當(dāng)前節(jié)點是共享還是獨(dú)占
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 指定線程和節(jié)點狀態(tài)的構(gòu)造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

4.2 volatile state

最為重要的屬性骨宠,這個整數(shù)可以用于表示任意狀態(tài)浮定!在上面有說過。

4.2 volatile head & volatile tail

head 頭結(jié)點层亿,但是這個頭節(jié)點只是個虛節(jié)點桦卒,只是邏輯上代表持有鎖的線程節(jié)點,且head節(jié)點是不存儲thread線程信息和前驅(qū)節(jié)點信息的匿又。

tail 尾節(jié)點闸盔,每個新節(jié)點都會進(jìn)入隊尾。不存儲后繼節(jié)點信息琳省。

  • 這兩個屬性是延遲初始化的迎吵,在第一次且第一個線程持有鎖時,第二個線程因為獲取失敗针贬,進(jìn)入同步隊列時會對head和tail進(jìn)行初始化击费,也就是說在所有線程都能獲取到鎖時,其內(nèi)部的head和tail都為null桦他,一旦head 和 tail被初始化后蔫巩,即使后來沒有線程持有鎖,其內(nèi)部的head 和 tail 依然保留最后一個持有鎖的線程節(jié)點?煅埂(head 和 tail都指向一個內(nèi)存地址)
  • 當(dāng)一個線程獲取鎖失敗而被加入到同步隊列時圆仔,會用CAS來設(shè)置尾節(jié)點tail為當(dāng)前線程對應(yīng)的Node節(jié)點。
  • AQS內(nèi)部的cas操作蔫劣,都是依賴Unsafe類的坪郭,自Java9之后的版本,Unsafe類被移除脉幢,取而代之的是VarHandle類歪沃。

這兩個屬性均為volatile所修飾(保證了變量具有有序性和可見性)

4.3 spinForTimeoutThreshold

自旋超時閥值嗦锐,在doAcquireSharedNanos()等方法中有使用到。

  • 如果用戶定義的等待時間超過這個閥值沪曙,那么線程將阻塞奕污,在阻塞期間如果能夠等到喚醒的機(jī)會并tryAcquireShared成功,則返回true液走,否則返回false碳默,超時也返回false。
  • 如果用戶定義的等待時間小于等于這個閥值缘眶,則會無限循環(huán)腻窒,線程不阻塞,直到有線程釋放同步狀態(tài)或者超時磅崭,然后返回對應(yīng)的結(jié)果。

4.4 exclusiveOwnerThread

這是AQS通過繼承AbstractOwnableSynchronizer類瓦哎,獲得的屬性砸喻,表示獨(dú)占模式下的同步器持有者。

5蒋譬、AQS具體實現(xiàn)

5.1 獨(dú)占鎖實現(xiàn)思路

5.1.1 獲取鎖 ReentrantLock.lock()
/**
 * 獲取獨(dú)占鎖割岛,忽略中斷。
 * 首先嘗試獲取鎖犯助,如果成功癣漆,則返回true;否則會把當(dāng)前線程包裝成Node插入到隊尾剂买,在隊列中會檢測是否為head的直接后繼惠爽,并嘗試獲取鎖,
 * 如果獲取失敗,則會通過LockSupport阻塞當(dāng)前線程瞬哼,直至被釋放鎖的線程喚醒或者被中斷婚肆,隨后再次嘗試獲取鎖,如此反復(fù)坐慰。被喚醒后繼續(xù)之前的代碼執(zhí)行
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
---------------------------------------------------------------------------------------
其中tryAcquire()方法需要由子類實現(xiàn)较性,ReentrantLock通過覆寫這個方法實現(xiàn)了公平鎖和非公平鎖
---------------------------------------------------------------------------------------

/**
 * 在同步等待隊列中插入節(jié)點
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 判斷尾節(jié)點是否為null
    if (pred != null) {
        node.prev = pred;
        // 通過CAS在隊尾插入當(dāng)前節(jié)點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // tail節(jié)點為null,則將新節(jié)點插入隊尾结胀,必要時進(jìn)行初始化
    enq(node);
    return node;
}

/**
 * 通過無限循環(huán)和CAS操作在隊列中插入一個節(jié)點成功后返回赞咙。
 * 將節(jié)點插入隊列,必要時進(jìn)行初始化
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 初始化head和tail
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            /*
             CAS設(shè)置tail為node
             表面上看是把老tail的next連接到node糟港。
             如果同步隊列head節(jié)點和tail節(jié)點剛剛被這個線程初始化攀操,實際上也把head的next也連接到了node,而老tail節(jié)點被node取締秸抚。
             反之則是崔赌,把老tail的next連接到node意蛀,head并沒有與node產(chǎn)生連接,這樣就形成了鏈表 head <-> old_tail <-> tail
             */
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 在隊列中的節(jié)點通過此方法獲取鎖健芭,忽略中斷县钥。
 * 這個方法很重要,如果上述沒有獲取到鎖慈迈,將線程包裝成Node節(jié)點加入到同步隊列的尾節(jié)點若贮,然后看代碼里的注釋
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /*
             * 檢測當(dāng)前節(jié)點前驅(qū)是否head,這是試獲取鎖痒留。
             * 如果是的話谴麦,則調(diào)用tryAcquire嘗試獲取鎖,
             * 成功,則將head置為當(dāng)前節(jié)點伸头。原h(huán)ead節(jié)點的next被置為null等待GC垃圾回收
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /*
             * 如果未成功獲取鎖則根據(jù)前驅(qū)節(jié)點判斷是否要阻塞匾效。
             * 如果阻塞過程中被中斷,則置interrupted標(biāo)志位為true恤磷。
             * shouldParkAfterFailedAcquire方法在前驅(qū)狀態(tài)不為SIGNAL的情況下都會循環(huán)重試獲取鎖面哼。
             * 如果shouldParkAfterFailedAcquire返回true,則會將當(dāng)前線程阻塞并檢查是否被中斷
             */
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * 根據(jù)前驅(qū)節(jié)點中的waitStatus來判斷是否需要阻塞當(dāng)前線程扫步。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前驅(qū)節(jié)點設(shè)置為SIGNAL狀態(tài)魔策,在釋放鎖的時候會喚醒后繼節(jié)點,
         * 所以后繼節(jié)點(也就是當(dāng)前節(jié)點)現(xiàn)在可以阻塞自己河胎。
         */
        return true;
    if (ws > 0) {
        /*
         * 前驅(qū)節(jié)點狀態(tài)為取消,向前遍歷闯袒,更新當(dāng)前節(jié)點的前驅(qū)為往前第一個非取消節(jié)點。
         * 當(dāng)前線程會之后會再次回到循環(huán)并嘗試獲取鎖游岳。
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         /**
          * 等待狀態(tài)為0或者PROPAGATE(-3)政敢,設(shè)置前驅(qū)的等待狀態(tài)為SIGNAL,
          * 并且之后會回到循環(huán)再次重試獲取鎖。
          */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 該方法實現(xiàn)某個node取消獲取鎖胚迫。
 */
private void cancelAcquire(Node node) {
   if (node == null)
       return;

   node.thread = null;

   // 遍歷并更新節(jié)點前驅(qū)堕仔,把node的prev指向前部第一個非取消節(jié)點。
   Node pred = node.prev;
   while (pred.waitStatus > 0)
       node.prev = pred = pred.prev;

   // 記錄pred節(jié)點的后繼為predNext晌区,后續(xù)CAS會用到摩骨。
   Node predNext = pred.next;

   // 直接把當(dāng)前節(jié)點的等待狀態(tài)置為取消,后繼節(jié)點調(diào)用cancelAcquire方法時,也可以跨過該節(jié)點
   node.waitStatus = Node.CANCELLED;

   // 如果當(dāng)前節(jié)點是尾節(jié)點朗若,則將尾節(jié)點置為當(dāng)前節(jié)點的前驅(qū)節(jié)點
   if (node == tail && compareAndSetTail(node, pred)) {
       compareAndSetNext(pred, predNext, null);
   } else {
       // 如果node還有后繼節(jié)點恼五,這種情況要做的是把pred和后繼非取消節(jié)點拼起來。
       int ws;
       if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
           Node next = node.next;
           /* 
            * 如果node的后繼節(jié)點next非取消狀態(tài)的話哭懈,則用CAS嘗試把pred的后繼置為node的后繼節(jié)點
            * 這里if條件為false或者CAS失敗都沒關(guān)系灾馒,這說明可能有多個線程在取消,總歸會有一個能成功的遣总。
            */
           if (next != null && next.waitStatus <= 0)
               compareAndSetNext(pred, predNext, next);
       } else {
           unparkSuccessor(node);
       }
       
       /*
        * 在GC層面睬罗,和設(shè)置為null具有相同的效果
        */
       node.next = node; 
   }
}

獲取獨(dú)占鎖的執(zhí)行過程大致如下:
假設(shè)當(dāng)前鎖已經(jīng)被線程A持有轨功,且持有鎖的時間足夠長(方便我們講解,也防止抬杠)容达,線程B古涧、C獲取鎖失敗。

線程B:

  • 1花盐、將線程B包裝成Node節(jié)點(簡稱BN)羡滑,加入到同步等待隊列,此時BN的waitStatus=0
  • 2算芯、將tail節(jié)點設(shè)置為BN柒昏,且與head節(jié)點相連,形成鏈表
  • 3熙揍、head節(jié)點是個虛擬節(jié)點职祷,也就是持有鎖的線程(但并不包含有線程信息),tail節(jié)點就是BN
  • 4届囚、線程B進(jìn)入"無限循環(huán)"有梆,判斷前驅(qū)節(jié)點是否為頭節(jié)點(true)并再次嘗試獲取鎖(false,獲取鎖失斀毖恰)
  • 5、線程B將進(jìn)入shouldParkAfterFailedAcquire方法析砸,在方法內(nèi)部昔字,將BN的前驅(qū)節(jié)點(也就是頭結(jié)點)的waitStatus設(shè)置為 -1,此方法返回false
  • 6首繁、因為是無限循環(huán)作郭,所以線程B再次進(jìn)入shouldParkAfterFailedAcquire方法,由于BN的前驅(qū)節(jié)點(也就是頭結(jié)點)的waitStatus為 -1弦疮,所以直接返回true
  • 7夹攒、調(diào)用parkAndCheckInterrupt,當(dāng)前線程B被阻塞胁塞,等待喚醒咏尝。

線程C:

  • 1、將線程C包裝成Node節(jié)點(簡稱CN)啸罢,加入到同步等待隊列编检,此時CN的waitStatus=0
  • 2、將tail節(jié)點設(shè)置為CN扰才,且與原tail節(jié)點(BN節(jié)點)相連
  • 3允懂、線程C進(jìn)入"無限循環(huán)",判斷前驅(qū)節(jié)點是否為頭節(jié)點(false)
  • 4衩匣、線程C將進(jìn)入shouldParkAfterFailedAcquire方法蕾总,在方法內(nèi)部粥航,將CN的前驅(qū)節(jié)點(也就是BN結(jié)點)的waitStatus設(shè)置為 -1,此方法返回false
  • 5生百、因為是無限循環(huán)递雀,所以線程C再次進(jìn)入shouldParkAfterFailedAcquire方法,由于CN的前驅(qū)節(jié)點(也就是BN結(jié)點)的waitStatus為 -1置侍,所以直接返回true
  • 6映之、調(diào)用parkAndCheckInterrupt,線程C被阻塞蜡坊,等待喚醒杠输。

最終的隊列如下:

+------+        +------+        +------+
|      |  <---  |      |  <---  |      |
| head |        |  BN  |        | tail |
|  AN  |  --->  |      |  --->  | (CN) |
+------+        +------+        +------+
5.1.2 釋放鎖 ReentrantLock.unlock()

對于釋放獨(dú)占鎖,會調(diào)用tryRelaes(int)方法秕衙,該方法由子類實現(xiàn)蠢甲,在完全釋放掉鎖后,釋放掉鎖的線程會將后繼線程喚醒据忘,后繼線程進(jìn)行鎖爭用(非公平鎖)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 頭結(jié)點不為null且后繼節(jié)點是需要被喚醒的
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

釋放獨(dú)占鎖的執(zhí)行過程大致如下(假設(shè)有后繼節(jié)點需要喚醒):

  • 將head節(jié)點的waitStatus設(shè)置為0
  • 喚醒后繼節(jié)點
  • 后繼節(jié)點線程被喚醒后鹦牛,會將后繼節(jié)點設(shè)置為head,并對后繼節(jié)點內(nèi)的prev和thread屬性設(shè)置為null
  • 對原h(huán)ead節(jié)點的next指針設(shè)置為null勇吊,等待GC回收原h(huán)ead節(jié)點曼追。
+------+        +------+        +------+
| old  |  <-X-  | new  |  <---  |      |
| head |        | head |        | tail |
|  AN  |  -X->  |  BN  |  --->  | (CN) |
+------+        +------+        +------+

如上所示,AN節(jié)點(原h(huán)ead節(jié)點)等待被GC垃圾回收汉规。

5.2 共享鎖實現(xiàn)思路

5.2.1 獲取鎖

與獲取獨(dú)占鎖不同礼殊,關(guān)鍵在于,共享鎖可以被多個線程持有针史。

如果需要AQS實現(xiàn)共享鎖晶伦,在實現(xiàn)tryAcquireShared()方法時:

  • 返回負(fù)數(shù),表示獲取失敗
  • 返回0啄枕,表示獲取成功婚陪,但是后繼爭用線程不會成功
  • 返回正數(shù),表示獲取成功频祝,表示后繼爭用線程也可能成功
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 一旦共享獲取成功泌参,設(shè)置新的頭結(jié)點,并且喚醒后繼線程
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
    
/**
 * 這個函數(shù)做的事情有兩件:
 * 1. 在獲取共享鎖成功后常空,設(shè)置head節(jié)點
 * 2. 根據(jù)調(diào)用tryAcquireShared返回的狀態(tài)以及節(jié)點本身的等待狀態(tài)來判斷是否要需要喚醒后繼線程
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // 把當(dāng)前的head封閉在方法棧上及舍,用以下面的條件檢查
    Node h = head;
    setHead(node);
    /*
     * propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據(jù)之一
     * h.waitStatus為SIGNAL或者PROPAGATE時也根據(jù)node的下一個節(jié)點共享來決定是否傳播喚醒
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
    
/**
 * 這是共享鎖中的核心喚醒函數(shù)窟绷,主要做的事情就是喚醒下一個線程或者設(shè)置傳播狀態(tài)锯玛。
 * 后繼線程被喚醒后,會嘗試獲取共享鎖,如果成功之后攘残,則又會調(diào)用setHeadAndPropagate,將喚醒傳播下去拙友。
 * 這個函數(shù)的作用是保障在acquire和release存在競爭的情況下,保證隊列中處于等待狀態(tài)的節(jié)點能夠有辦法被喚醒歼郭。
 */
private void doReleaseShared() {
    /*
     * 以下的循環(huán)做的事情就是遗契,在隊列存在后繼線程的情況下,喚醒后繼線程病曾;
     * 或者由于多線程同時釋放共享鎖由于處在中間過程牍蜂,讀到head節(jié)點等待狀態(tài)為0的情況下,
     * 雖然不能unparkSuccessor泰涂,但為了保證喚醒能夠正確穩(wěn)固傳遞下去鲫竞,設(shè)置節(jié)點狀態(tài)為PROPAGATE。
     * 這樣的話獲取鎖的線程在執(zhí)行setHeadAndPropagate時可以讀到PROPAGATE逼蒙,從而由獲取鎖的線程去釋放后繼等待線程从绘。
     */
    for (;;) {
        Node h = head;
        // 如果隊列中存在后繼線程。
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            // 如果h節(jié)點的狀態(tài)為0是牢,需要設(shè)置為PROPAGATE用以保證喚醒的傳播僵井。
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 檢查h是否仍然是head,如果不是的話需要再進(jìn)行循環(huán)驳棱。
        if (h == head)
            break;
    }
}
5.2.1 釋放鎖

釋放共享鎖與獲取共享鎖的代碼都使用了doReleaseShared(int)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // doReleaseShared的實現(xiàn)上面獲取共享鎖已經(jīng)介紹
        doReleaseShared();
        return true;
    }
    return false;
}

我覺得大家應(yīng)該都能看懂批什,還是簡單說一下吧(手動狗頭~):

同步等待隊列中,在喚醒因為獲取共享鎖失敗而阻塞的后繼節(jié)點線程后社搅,后繼節(jié)點線程會依次喚醒其后繼節(jié)點驻债!依次類推。

再換種說法罚渐?

這種情況有可能是:寫鎖導(dǎo)致獲取讀鎖的一些線程阻塞却汉,而寫鎖釋放后驯妄,會喚醒后繼節(jié)點線程荷并,如果該后繼節(jié)點,恰好是因為獲取讀鎖失敗而阻塞的線程青扔,那么該后繼節(jié)點線程會喚醒其后繼節(jié)點...直到全部獲取讀鎖成功源织,或者某一節(jié)點獲取寫鎖成功。

6微猖、拓展

6.1 不得不說的PROPAGATE

有個關(guān)于AQS的bug谈息,真的值得大家看一看

在共享鎖獲取與釋放的操作中,我覺得有個特別的重要的waitStatus狀態(tài)值凛剥,要和大家說一說侠仇,就是PROPAGATE,這個屬性值的意思是,用于將喚醒后繼線程傳遞下去逻炊,這個狀態(tài)的引入是為了完善和增強(qiáng)共享鎖的喚醒機(jī)制互亮。

之前翻閱了很多關(guān)于AQS的文章,講到這個狀態(tài)值的少之又少余素,哪怕是《Java并發(fā)編程實戰(zhàn)》這本書豹休,也是沒有提及,最終我看到有一位博客園的作者非常詳實的闡述了這個PEOPAGATE狀態(tài)桨吊,也是給了我很大的啟發(fā)威根。

沒錯,我第一次看AQS的源碼的時候视乐,甚至直接把這個PROPAGATE狀態(tài)值忽略掉了洛搀。事實上,不僅僅閱讀源碼的人炊林,容易把這個PROPAGATE狀態(tài)值忽略掉姥卢,哪怕是Doug Lea老爺子本人,在開發(fā)時也沒有意識到渣聚,如果沒有這個狀態(tài)值會導(dǎo)致什么樣的后果独榴,直到上面鏈接的bug出現(xiàn)后,老爺子才加上了這個狀態(tài)奕枝,徹底修復(fù)了這個bug棺榔。

復(fù)現(xiàn)該bug的代碼:

import java.util.concurrent.Semaphore;

public class TestSemaphore {

    private static Semaphore sem = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            sem.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}

程序執(zhí)行時,會偶發(fā)線程hang住隘道。

我們再來看看之前的setHeadAndPropagate方法是什么樣的症歇。

private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}

然后Semaphore.release()調(diào)用的是AQS的releaseShared,看看當(dāng)時的releaseShared長什么樣:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

再看看當(dāng)時的Node:

static final class Node {
    // 忽略掉無關(guān)的代碼谭梗,只展示waitStatus的狀態(tài)值

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
}

setHeadAndPropagate方法和releaseShared方法忘晤,設(shè)計的也是很簡單。

當(dāng)時源碼里激捏,Node的waitStatus是沒有PROPAGATE=-3這個狀態(tài)值的设塔。

為了方便大家對照,我把當(dāng)時unparkSuccessor方法的源碼远舅,也一并展示出來:

private void unparkSuccessor(Node node) {

    // 將node的waitStatus設(shè)置為0
    compareAndSetWaitStatus(node, Node.SIGNAL, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

接下來闰蛔,我們慢慢聊~

ps: 說真的,現(xiàn)在老板的位置離我的位置不遠(yuǎn)图柏,雖然我的工作已經(jīng)提前很多天完成了序六,但,還是有點慌~蚤吹,冒著風(fēng)險還要繼續(xù)寫例诀!

在AQS獲取共享鎖的操作中,進(jìn)入同步等待的線程(被阻塞掉),有兩種途徑可以被喚醒:

  • 其他線程釋放信號量后繁涂,調(diào)用unparkSuccessor(releaseShared方法中)
  • 其他線程獲取共享鎖成功后暮刃,會通過傳播機(jī)制來喚醒后繼節(jié)點(也就是在setHeadAndPropagate方法中)。

bug重現(xiàn)的例子爆土,很簡單椭懊,就是在循環(huán)中重復(fù)不斷的實例化4個線程,前兩個線程獲取信號量步势,兩個線程釋放信號量氧猬,主線程等待4個線程全都執(zhí)行完畢再執(zhí)行打印。

在后兩個線程沒有進(jìn)行釋放信號量的操作時坏瘩,AQS內(nèi)部的同步等待隊列是下面這種情況:

+------+        +------+        +------+
|      |  <---  |      |  <---  |      |
| head |        |  t1  |        |  t2  |
|      |  --->  |      |  --->  |      |
+------+        +------+        +------+
  • 1盅抚、t3釋放信號量,調(diào)用releaseShared倔矾,喚醒后繼節(jié)點里的線程t1妄均,同時,head的waitStatus變?yōu)?
  • 2哪自、t1被喚醒丰包,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回0
  • 3壤巷、t4釋放信號量邑彪,調(diào)用releaseShared,在releaseShared方法中讀到的head還是原h(huán)ead胧华,但是此時head的waitStatus已經(jīng)變?yōu)?寄症,所以不會調(diào)用unparkSuccessor方法
  • 4、t1被喚醒了矩动,由于在步驟2里有巧,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回的是0悲没,所以它也不會調(diào)用unparkSuccessor方法

至此篮迎,兩種途徑全部被封死,沒有任何線程去喚醒t2了檀训,線程被hang住...

ps:Doug Lea 黑人問號臉柑潦,哈哈~

老爺子為了修復(fù)這個bug享言,做出了如下改進(jìn):

  • 1峻凫、增加一個waitStatus的狀態(tài),即PROPAGATE
  • 2览露、在releaseShared方法中抽取提煉出了doReleaseShared()(上面有展示)在doReleaseShared方法中荧琼,如果head節(jié)點的狀態(tài)為0,需要設(shè)置為PROPAGATE用以保證喚醒的傳播。
  • 3命锄、在setHeadAndPropagate方法中也多了一些判斷堰乔,其中就有head節(jié)點的waitStatus如果小于0,就喚醒后繼節(jié)點(PROPAGATE = -3)脐恩。

通過改進(jìn)之后的代碼镐侯,我們再來復(fù)盤一下:

  • 1、t3釋放信號量驶冒,調(diào)用releaseShared苟翻,喚醒后繼節(jié)點里的線程t1,同時骗污,head的waitStatus變?yōu)?
  • 2、t1被喚醒,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法让虐,返回0
  • 3氓润、此步驟和2和同一時刻發(fā)生,t4釋放信號量屋厘,調(diào)用releaseShared涕烧,在doReleaseShared方法中讀到的head還是原h(huán)ead,但是此時head的waitStatus已經(jīng)變?yōu)?汗洒,將head的waitStatus設(shè)置為PROPAGATE(-3)
  • 4澈魄、t1被喚醒了,調(diào)用setHeadAndPropagate方法仲翎,將t1設(shè)置為head痹扇,符合條件判斷,進(jìn)入分支語句溯香,調(diào)用doReleaseShared方法鲫构,繼而喚醒t2節(jié)點線程。

6.2 unparkSuccessor的一點思考

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 通常情況下玫坛,要喚醒的線程都是當(dāng)前節(jié)點的后繼線程
     * 但是结笨,如果當(dāng)前節(jié)點的后繼節(jié)點被取消了,則從隊列尾部向前遍歷湿镀,直到找到未被取消的后繼節(jié)點
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor方法中炕吸,如果當(dāng)前節(jié)點的后繼節(jié)點被取消了,則從隊列尾部向前遍歷勉痴,直到找到未被取消的后繼節(jié)點赫模。

這個問題,大家也可以自己思考一下蒸矛,為什么要從tail節(jié)點開始向前遍歷瀑罗?

假設(shè)胸嘴,CLH隊列如下圖所示:

+------+        +------+        +------+
|      |  <---  |      |  <---  |      |
| head |        |  t1  |        | tail |
|      |  --->  |      |  --->  |      |
+------+        +------+        +------+

t1.waitStatus = 1 且 tail.waitStatus = 1

head嘗試喚醒后繼節(jié)點t1,發(fā)現(xiàn)t1是被取消狀態(tài)斩祭,遂找出t1的后繼節(jié)點tail劣像,發(fā)現(xiàn)tail也是被取消狀態(tài),但是tail.next == null摧玫。

與此同時耳奕,有個新節(jié)點加入到隊列尾部,但是還沒有將原tail.next指向新節(jié)點诬像。

也就是說吮铭,tail.next 如果恰好處在步驟1和步驟2中間的話,遍歷就會中斷颅停。

摘錄addWaiter部分代碼:

node.prev = pred;
// 通過CAS在隊尾插入當(dāng)前節(jié)點
if (compareAndSetTail(pred, node)) { // 步驟1
    pred.next = node; // 步驟2
    return node;
}

6.3 acquireQueued 方法里谓晌,為什么還要再tryAcquire?

以獨(dú)占模式來說癞揉,對于這個問題纸肉,我是這么想的:

時刻1:線程B嘗試獲取鎖,但是喊熟,由于鎖被線程A持有柏肪,所以,線程B準(zhǔn)備調(diào)用addWaiter芥牌,將自己入到隊列(但還沒有和head節(jié)點產(chǎn)生指針連接)

時刻1:同一時刻烦味,線程A嘗試釋放鎖,進(jìn)入release方法壁拉,調(diào)用子類的tryRelease()谬俄,將代表鎖持有次數(shù)的state置為0(代表鎖沒有被任何線程持有),進(jìn)入unparkSuccessor方法弃理,發(fā)現(xiàn)并沒有后繼節(jié)點(因為新節(jié)點還未入隊)溃论,所以不會喚醒任何線程,到這里痘昌,線程A釋放鎖操作完成钥勋。

時刻2:線程B調(diào)用addWaiter方法完畢,已經(jīng)入隊辆苔,并和head節(jié)點產(chǎn)生指針連接

時刻3:線程B調(diào)用acquireQueued方法(如下方代碼展示)算灸,如果在這個方法里面不調(diào)用tryAcquire,就會發(fā)生這樣的情況:明明可以獲取鎖驻啤,但是線程卻被休眠了菲驴,進(jìn)而導(dǎo)致整個同步隊列不可用

所以,再次調(diào)用tryAcquire是為了防止新節(jié)點還未入隊街佑,但是頭結(jié)點已經(jīng)釋放了鎖谢翎,導(dǎo)致整個同步隊列癱瘓的情況發(fā)生。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 防止新節(jié)點還未入隊沐旨,但是頭結(jié)點已經(jīng)釋放了鎖森逮,導(dǎo)致整個同步隊列中斷癱瘓的情況發(fā)生
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

結(jié)束

通過閱讀AQS的源碼,對于我們學(xué)習(xí)和掌握基于AQS實現(xiàn)的組件磁携,是有很大幫助的褒侧。

尤其是它的設(shè)計理念和思想,更是我們學(xué)習(xí)的重點谊迄!

Doug Lea的AQS論文闷供,英語較好的朋友,不妨去讀一讀

我在掘金的此文章鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末统诺,一起剝皮案震驚了整個濱河市歪脏,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌粮呢,老刑警劉巖婿失,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異啄寡,居然都是意外死亡豪硅,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進(jìn)店門挺物,熙熙樓的掌柜王于貴愁眉苦臉地迎上來懒浮,“玉大人,你說我怎么就攤上這事识藤⊙庵” “怎么了?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵痴昧,是天一觀的道長赖草。 經(jīng)常有香客問我,道長剪个,這世上最難降的妖魔是什么秧骑? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮扣囊,結(jié)果婚禮上乎折,老公的妹妹穿的比我還像新娘。我一直安慰自己侵歇,他們只是感情好骂澄,可當(dāng)我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著惕虑,像睡著了一般坟冲。 火紅的嫁衣襯著肌膚如雪磨镶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天健提,我揣著相機(jī)與錄音琳猫,去河邊找鬼。 笑死私痹,一個胖子當(dāng)著我的面吹牛脐嫂,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播紊遵,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼账千,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了暗膜?” 一聲冷哼從身側(cè)響起匀奏,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎学搜,沒想到半個月后攒射,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡恒水,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年会放,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片钉凌。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡咧最,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出御雕,到底是詐尸還是另有隱情矢沿,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布酸纲,位于F島的核電站捣鲸,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏闽坡。R本人自食惡果不足惜栽惶,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望疾嗅。 院中可真熱鬧外厂,春花似錦、人聲如沸代承。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽论悴。三九已至掖棉,卻和暖如春墓律,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背幔亥。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工耻讽, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人紫谷。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓齐饮,卻偏偏與公主長得像捐寥,于是被迫代替她去往敵國和親笤昨。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,527評論 2 349