深入刨析AQS

[toc]



摘要

本文通過ReentrantLock來窺探AbstractQueuedSynchronizer(AQS)的實現(xiàn)原理拒名,在看此文之前噩咪。你需要了解一下park顾彰、unpark的功能,請移步至上一篇《深入剖析park胃碾、unpark》涨享;

AbstractQueuedSynchronizer實現(xiàn)一把鎖

根據(jù)AbstractQueuedSynchronizer的官方文檔,如果想實現(xiàn)一把鎖的仆百,需要繼承AbstractQueuedSynchronizer厕隧,并需要重寫tryAcquire、tryRelease、可選擇重寫isHeldExclusively提供locked state吁讨、因為支持序列化髓迎,所以需要重寫readObject以便反序列化時恢復(fù)原始值、newCondition提供條件建丧;官方提供的java代碼如下(官方文檔見參考連接)排龄;

public class MyLock implements Lock, java.io.Serializable {
    private static class Sync extends AbstractQueuedSynchronizer {
      
        // Acquires the lock if state is zero
        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
      
       // Reports whether in locked state
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    /**
     * The sync object does all the hard work. We just forward to it.
     */
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }


    private static volatile Integer value = 0;

    public static void main(String[] args) {

        MyLock myLock = new MyLock();
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                myLock.lock();
                value ++;
                myLock.unlock();
            }).start();
        }
        System.out.println(value);
    }
}

上面是一個不可重入的鎖,它實現(xiàn)了一個鎖基礎(chǔ)功能翎朱,目的是為了跟ReentrantLock的實現(xiàn)做對比橄维;

ReentrantLock

ReentrantLock的特點(diǎn)

ReentrantLock意思為可重入鎖,指的是一個線程能夠?qū)σ粋€臨界資源重復(fù)加鎖拴曲。ReentrantLock跟常用的Synchronized進(jìn)行比較争舞;

image-20200603140814563

Synchronized的基礎(chǔ)用法

Synchronized的分析可以參考《深入剖析synchronized關(guān)鍵詞》,ReentrantLock可以創(chuàng)建公平鎖澈灼、也可以創(chuàng)建非公平鎖竞川,接下來看一下ReentrantLock的簡單用法,非公平鎖實現(xiàn)比較簡單叁熔,今天重點(diǎn)是公平鎖流译;

public class ReentrantLockTest {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock(true);
        reentrantLock.lock();
        try {
            log.info("lock");
        } catch (Exception e) {
            log.error(e);
        } finally {
            reentrantLock.unlock();
            log.info("unlock");
        }
    }
}

ReentrantLock與AQS的關(guān)聯(lián)

先看一下加鎖方法lock

  • 非公平鎖lock方法

    compareAndSetState很好理解,通過CAS加鎖者疤,如果加鎖失敗調(diào)用acquire;

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
  • 公平鎖lock方法
final void lock() {
    acquire(1);
}
  • AQS框架的處理流程

? 線程繼續(xù)等待叠赦,仍然保留獲取鎖的可能驹马,獲取鎖流程仍在繼續(xù),分析實現(xiàn)原理

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

總結(jié):公平鎖的上鎖是必須判斷自己是不是需要排隊除秀;而非公平鎖是直接進(jìn)行CAS修改計數(shù)器看能不能加鎖成功糯累;如果加鎖不成功則乖乖排隊(調(diào)用acquire);所以不管公平還是不公平册踩;只要進(jìn)到了AQS隊列當(dāng)中那么他就會排隊;

AQS架構(gòu)圖

美團(tuán)畫的AQS的架構(gòu)圖泳姐,很詳細(xì),當(dāng)有自定義同步器接入時暂吉,只需重寫第一層所需要的部分方法即可胖秒,不需要關(guān)注底層具體的實現(xiàn)流程。當(dāng)自定義同步器進(jìn)行加鎖或者解鎖操作時慕的,先經(jīng)過第一層的API進(jìn)入AQS內(nèi)部方法阎肝,然后經(jīng)過第二層進(jìn)行鎖的獲取,接著對于獲取鎖失敗的流程肮街,進(jìn)入第三層和第四層的等待隊列處理风题,而這些處理方式均依賴于第五層的基礎(chǔ)數(shù)據(jù)提供層。

82077ccf14127a87b77cefd1ccf562d3253591

AQS核心思想是,如果被請求的共享資源空閑沛硅,那么就將當(dāng)前請求資源的線程設(shè)置為有效的工作線程眼刃,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用摇肌,就需要一定的阻塞等待喚醒機(jī)制來保證鎖分配擂红。這個機(jī)制主要用的是CLH隊列的變體實現(xiàn)的,將暫時獲取不到鎖的線程加入到隊列中朦蕴。

CLH:Craig篮条、Landin and Hagersten隊列,是單向鏈表吩抓,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO)涉茧,AQS是通過將每條請求共享資源的線程封裝成一個節(jié)點(diǎn)來實現(xiàn)鎖的分配。

  • 非公平鎖的加鎖流程

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/b8b53a70984668bc68653efe9531573e78636-20200604160346262-20200604160403594.png" alt="img" style="zoom:50%;" />

  • 公平鎖的加鎖流程

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200604172945029.png" alt="image-20200604172945029" style="zoom:50%;" />

  • 解鎖公平鎖和非公平鎖邏輯一致

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200604173023011.png" alt="image-20200604173023011" style="zoom:50%;" />

加鎖:

  • 通過ReentrantLock的加鎖方法Lock進(jìn)行加鎖操作疹娶。
  • 會調(diào)用到內(nèi)部類Sync的Lock方法伴栓,由于Sync#lock是抽象方法,根據(jù)ReentrantLock初始化選擇的公平鎖和非公平鎖雨饺,執(zhí)行相關(guān)內(nèi)部類的Lock方法钳垮,本質(zhì)上都會執(zhí)行AQS的Acquire方法。
  • AQS的Acquire方法會執(zhí)行tryAcquire方法额港,但是由于tryAcquire需要自定義同步器實現(xiàn)饺窿,因此執(zhí)行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通過公平鎖和非公平鎖內(nèi)部類實現(xiàn)的tryAcquire方法移斩,因此會根據(jù)鎖類型不同肚医,執(zhí)行不同的tryAcquire。
  • tryAcquire是獲取鎖邏輯向瓷,獲取失敗后肠套,會執(zhí)行框架AQS的后續(xù)邏輯,跟ReentrantLock自定義同步器無關(guān)猖任。
  • 流程:Lock -> acquire -> tryAcquire( or nonfairTryAcquire)

解鎖:

  • 通過ReentrantLock的解鎖方法Unlock進(jìn)行解鎖你稚。
  • Unlock會調(diào)用內(nèi)部類Sync的Release方法,該方法繼承于AQS朱躺。
  • Release中會調(diào)用tryRelease方法刁赖,tryRelease需要自定義同步器實現(xiàn),tryRelease只在ReentrantLock中的Sync實現(xiàn)室琢,因此可以看出乾闰,釋放鎖的過程,并不區(qū)分是否為公平鎖盈滴。
  • 釋放成功后涯肩,所有處理由AQS框架完成轿钠,與自定義同步器無關(guān)。
  • 流程:unlock -> release -> tryRelease

acquire獲取鎖

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }
}

tryAcquire

acquire方法首先會調(diào)tryAcquire方法病苗,需要注意的是tryAcquire的結(jié)果做取反疗垛;根據(jù)前面分析,tryAcquire會調(diào)用子類的實現(xiàn)硫朦,ReentrantLock有兩個內(nèi)部類贷腕,F(xiàn)airSync,NonfairSync咬展,都繼承自Sync泽裳,Sync繼承AbstractQueuedSynchronizer;

實現(xiàn)方式差別在是否有hasQueuedPredecessors() 的判斷條件

  • 公平鎖實現(xiàn)
/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 獲取lock對象的上鎖狀態(tài)破婆,如果鎖是自由狀態(tài)則=0涮总,如果被上鎖則為1,大于1表示重入  
    int c = getState();
    if (c == 0) {
        // hasQueuedPredecessors祷舀,判斷自己是否需要排隊
        // 下面我會單獨(dú)介紹瀑梗,如果不需要排隊則進(jìn)行cas嘗試加鎖
        // 如果加鎖成功則把當(dāng)前線程設(shè)置為擁有鎖的線程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果C不等于0,但是當(dāng)前線程等于擁有鎖的線程則表示這是一次重入裳扯,那么直接把狀態(tài)+1表示重入次數(shù)+1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平鎖

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • Node

先來看下AQS中最基本的數(shù)據(jù)結(jié)構(gòu)——Node抛丽,Node即為上面CLH變體隊列中的節(jié)點(diǎn)。

static final class Node {
    static final Node SHARED = new Node(); // 表示線程以共享的模式等待鎖
    static final Node EXCLUSIVE = null; // 表示線程正在以獨(dú)占的方式等待鎖
    static final int CANCELLED =  1; // 表示線程獲取鎖的請求已經(jīng)取消了
    static final int SIGNAL    = -1; // 表示線程已經(jīng)準(zhǔn)備好了饰豺,就等資源釋放了
    static final int CONDITION = -2; // 表示節(jié)點(diǎn)在等待隊列中亿鲜,節(jié)點(diǎn)線程等待喚醒
    static final int PROPAGATE = -3; // 當(dāng)前線程處在SHARED情況下,該字段才會使用
    volatile int waitStatus; // 當(dāng)前節(jié)點(diǎn)在隊列中的狀態(tài)
    volatile Node prev; // 前驅(qū)指針
    volatile Node next; // 后繼指針
    volatile Thread thread; // 表示處于該節(jié)點(diǎn)的線程
    Node nextWaiter; // 指向下一個處于CONDITION狀態(tài)的節(jié)點(diǎn)
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回前驅(qū)節(jié)點(diǎn)冤吨,沒有的話拋出npe
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

再看hasQueuedPredecessors狡门,整個方法如果最后返回false,則去加鎖锅很,如果返回true則不加鎖,因為這個方法被取反操作凤跑;hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節(jié)點(diǎn)的方法爆安。如果返回False,說明當(dāng)前線程可以爭取共享資源仔引;如果返回True扔仓,說明隊列中存在有效節(jié)點(diǎn),當(dāng)前線程必須加入到等待隊列中咖耘。

  • h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

雙向鏈表中翘簇,第一個節(jié)點(diǎn)為虛節(jié)點(diǎn),其實并不存儲任何信息儿倒,只是占位版保。真正的第一個有數(shù)據(jù)的節(jié)點(diǎn)呜笑,是在第二個節(jié)點(diǎn)開始的。

  • 當(dāng)h != t時: 如果(s = h.next) == null彻犁,等待隊列正在有線程進(jìn)行初始化叫胁,但只是進(jìn)行到了Tail指向Head,沒有將Head指向Tail汞幢,此時隊列中有元素驼鹅,需要返回True(這塊具體見下邊代碼分析)。
  • 如果(s = h.next) != null森篷,說明此時隊列中至少有一個有效節(jié)點(diǎn)输钩。
  • 如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)中的線程與當(dāng)前線程相同仲智,那么當(dāng)前線程是可以獲取資源的买乃;
  • 如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)線程與當(dāng)前線程不同坎藐,當(dāng)前線程必須加入進(jìn)等待隊列为牍。

如果這上面沒有看懂,沒有關(guān)系岩馍,先來分析一下構(gòu)建整個隊列的過程碉咆;

  • addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // tail為對尾,賦值給pred
    Node pred = tail;
    // 判斷pred是否為空蛀恩,其實就是判斷對尾是否有節(jié)點(diǎn)疫铜,其實只要隊列被初始化了對尾肯定不為空
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

用一張圖來分析一下,整個隊列構(gòu)建過程双谆;

image-20200604203002973
  • (1)通過Node(Thread thread, Node mode) 方法構(gòu)建一個node節(jié)點(diǎn)(node2)壳咕,此時的nextWaiter為空,線程不為空顽馋,是當(dāng)前線程谓厘;

  • (2)如果隊尾為空,則說明隊列未建立寸谜,調(diào)用enq構(gòu)建第一個虛擬節(jié)點(diǎn)(node1)竟稳,通過compareAndSetHead方法構(gòu)建一個頭節(jié)點(diǎn),需要注意的是該頭節(jié)點(diǎn)thread是null熊痴,后續(xù)很多都是用線程是否為null來判讀是否為第一個虛擬節(jié)點(diǎn)他爸;

  • (3)將node1 cas設(shè)置為head

  • (4)將頭節(jié)點(diǎn)賦值為tail = head

  • (5)進(jìn)入下一次for循環(huán)時,會走到else分支果善,會將傳入的node的指向頭部節(jié)點(diǎn)的next辆童,此時node2的prev指向node1(tail)

  • (6)將node2 cas設(shè)置為tail赛惩;

  • (7)將node2指向node1的next;

    經(jīng)過上面的步驟焰枢,就構(gòu)建了一個長度為2的隊列;

添加第二個隊列時,走的是這段代碼,流程就簡單多了,代碼如下

if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}
image-20200604202859302

再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());因為整個構(gòu)建過程并不是原子操作止喷,所以這個條件判斷,現(xiàn)在再是不是就看明白了混聊?

  • 當(dāng)h != t時(3)步驟已經(jīng)完成: 如果(s = h.next) == null 此時步驟(4)未完成弹谁,等待隊列正在有線程進(jìn)行初始化,但只是進(jìn)行到了Tail指向Head句喜,沒有將Head指向Tail预愤,此時隊列中有元素,需要返回True
  • 如果(s = h.next) != null咳胃,說明此時隊列中至少有一個有效節(jié)點(diǎn)植康。
  • 如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)中的線程與當(dāng)前線程相同展懈,那么當(dāng)前線程是可以獲取資源的销睁;
  • 如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)線程與當(dāng)前線程不同存崖,當(dāng)前線程必須加入進(jìn)等待隊列冻记。

acquireQueued

addWaiter方法其實就是把對應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊列里,返回的是一個包含該線程的Node来惧。而這個Node會作為參數(shù)冗栗,進(jìn)入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進(jìn)行“獲鎖”操作供搀∮缇樱總的來說,一個線程獲取鎖失敗了葛虐,被放入等待隊列胎源,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲扔炱辍(中斷)乒融。

下面通過代碼從“何時出隊列?”和“如何出隊列摄悯?”兩個方向來分析一下acquireQueued源碼:

final boolean acquireQueued(final Node node, int arg) {
    // 標(biāo)記是否成功拿到資源
    boolean failed = true;
    try {
        // 標(biāo)記等待過程中是否中斷過
        boolean interrupted = false;
        for (;;) {
            // 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),有兩種情況愧捕;1奢驯、上一個節(jié)點(diǎn)為頭部;2上一個節(jié)點(diǎn)不為頭部
            final Node p = node.predecessor();
            // 如果p是頭結(jié)點(diǎn)次绘,說明當(dāng)前節(jié)點(diǎn)在真實數(shù)據(jù)隊列的首部瘪阁,就嘗試獲取鎖(頭結(jié)點(diǎn)是虛節(jié)點(diǎn))
            // 因為第一次tryAcquire判斷是否需要排隊撒遣,如果需要排隊,那么我就入隊管跺,此處再重試一次
            if (p == head && tryAcquire(arg)) {
                // 獲取鎖成功义黎,頭指針移動到當(dāng)前node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 說明p為頭節(jié)點(diǎn)且當(dāng)前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結(jié)點(diǎn),這個時候就要判斷當(dāng)前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點(diǎn)的waitStatus為-1)豁跑,防止無限循環(huán)浪費(fèi)資源廉涕。具體兩個方法下面細(xì)細(xì)分析
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
       // 成功拿到資源,準(zhǔn)備釋放
        if (failed)
            cancelAcquire(node);
    }
}

setHead

設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)艇拍,并且將node.thread為空(剛才提到判斷是否為頭部虛擬節(jié)點(diǎn)的條件就是node.thread == null狐蜕。waitStatus狀態(tài)并為修改,等下我們再分析卸夕;

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

shouldParkAfterFailedAcquire

接下來看shouldParkAfterFailedAcquire代碼层释,需要注意的是,每一個新創(chuàng)建Node的節(jié)點(diǎn)是被下一個排隊的node設(shè)置為等待狀態(tài)為SIGNAL快集, 這里比較難以理解為什么需要去改變上一個節(jié)點(diǎn)的park狀態(tài)贡羔?

每個node都有一個狀態(tài),默認(rèn)為0个初,表示無狀態(tài)乖寒,-1表示在park;當(dāng)時不能自己把自己改成-1狀態(tài)勃黍?因為你得確定你自己park了才是能改為-1宵统;所以只能先park;在改狀態(tài)覆获;但是問題你自己都park了马澈;完全釋放CPU資源了,故而沒有辦法執(zhí)行任何代碼了弄息,所以只能別人來改痊班;故而可以看到每次都是自己的后一個節(jié)點(diǎn)把自己改成-1狀態(tài);

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅(qū)節(jié)點(diǎn)的狀態(tài) 
    int ws = pred.waitStatus;
    // 說明頭結(jié)點(diǎn)處于喚醒狀態(tài)
    if (ws == Node.SIGNAL)
        return true;
    // static final int CANCELLED =  1; // 表示線程獲取鎖的請求已經(jīng)取消了
    // static final int SIGNAL    = -1; // 表示線程已經(jīng)準(zhǔn)備好了摹量,就等資源釋放了
    // static final int CONDITION = -2; // 表示節(jié)點(diǎn)在等待隊列中涤伐,節(jié)點(diǎn)線程等待喚醒
    // static final int PROPAGATE = -3; // 當(dāng)前線程處在SHARED情況下,該字段才會使用
    if (ws > 0) {
        do {
            // 把取消節(jié)點(diǎn)從隊列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 設(shè)置前任節(jié)點(diǎn)等待狀態(tài)為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

調(diào)用LockSupport.park掛起當(dāng)前線程缨称,自己已經(jīng)park凝果,無法再修改狀態(tài)了!

private final boolean parkAndCheckInterrupt() {
    // 調(diào)?用park()使線程進(jìn)?入waiting狀態(tài)
    LockSupport.park(this);
    // 如果被喚醒睦尽,查看?自?己是不不是被中斷的器净,這?里里先清除?下標(biāo)記位
    return Thread.interrupted(); 
}

shouldParkAfterFailedAcquire的整個流程還是比較清晰的,如果不清楚当凡,可以參考美團(tuán)畫的流程圖山害;

cancelAcquire

通過上面的分析纠俭,當(dāng)failed為true時,也就意味著park結(jié)束浪慌,線程被喚醒了冤荆,for循環(huán)已經(jīng)跳出,開始執(zhí)行cancelAcquire权纤,通過cancelAcquire方法钓简,將Node的狀態(tài)標(biāo)記為CANCELLED;代碼如下:

private void cancelAcquire(Node node) {
    // 將無效節(jié)點(diǎn)過濾
    if (node == null)
        return;
    // 設(shè)置該節(jié)點(diǎn)不關(guān)聯(lián)任何線程妖碉,也就是虛節(jié)點(diǎn)(上面已經(jīng)提到涌庭,node.thread = null是判讀是否是頭節(jié)點(diǎn)的條件)
    node.thread = null;
    Node pred = node.prev;
    // 通過前驅(qū)節(jié)點(diǎn),處理waitStatus > 0的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 把當(dāng)前node的狀態(tài)設(shè)置為CANCELLED欧宜,當(dāng)下一個node排隊結(jié)束時坐榆,自己就會被上一行代碼處理掉;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    // 如果當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)冗茸,將從后往前的第一個非取消狀態(tài)的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)席镀,更新失敗的話,則進(jìn)入else夏漱,如果更新成功豪诲,將tail的后繼節(jié)點(diǎn)設(shè)置為null
    if (node == tail && compareAndSetTail(node, pred)) {
        // 把自己設(shè)置為null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 如果當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn)
        // 1:判斷當(dāng)前節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)的是否為SIGNAL
        // 2:如果不是,則把前驅(qū)節(jié)點(diǎn)設(shè)置為SINGAL看是否成功
        // 如果1和2中有一個為true挂绰,再判斷當(dāng)前節(jié)點(diǎn)的線程是否為null
        // 如果上述條件都滿足屎篱,把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的后繼指針指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn) 
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 如果當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),或者上述條件不滿足葵蒂,那就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

當(dāng)前的流程:

  • 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)交播,如果前驅(qū)節(jié)點(diǎn)的狀態(tài)是CANCELLED,那就一直往前遍歷践付,找到第一個waitStatus <= 0的節(jié)點(diǎn)秦士,將找到的Pred節(jié)點(diǎn)和當(dāng)前Node關(guān)聯(lián),將當(dāng)前Node設(shè)置為CANCELLED永高。

  • 根據(jù)當(dāng)前節(jié)點(diǎn)的位置隧土,考慮以下三種情況:

    (1) 當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)。

    (2) 當(dāng)前節(jié)點(diǎn)是Head的后繼節(jié)點(diǎn)命爬。

    (3) 當(dāng)前節(jié)點(diǎn)不是Head的后繼節(jié)點(diǎn)曹傀,也不是尾節(jié)點(diǎn)。

(1)當(dāng)前節(jié)點(diǎn)時尾節(jié)點(diǎn)

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180254816.png" alt="image-20200607180254816" style="zoom:50%;" />

(2)當(dāng)前節(jié)點(diǎn)是Head的后繼節(jié)點(diǎn)饲宛。

這張圖描述的是這段代碼:unparkSuccessor

Node s = node.next;
if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180140169.png" alt="image-20200607180140169" style="zoom:50%;" />

(3)當(dāng)前節(jié)點(diǎn)不是Head的后繼節(jié)點(diǎn)皆愉,也不是尾節(jié)點(diǎn)。

這張圖描述的是這段代碼跟(2)一樣;

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180034112.png" alt="image-20200607180034112" style="zoom:50%;" />

通過上面的圖亥啦,你會發(fā)現(xiàn)所有的變化都是對Next指針進(jìn)行了操作,而沒有對Prev指針進(jìn)行操作练链,原因是執(zhí)行cancelAcquire的時候翔脱,當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)可能已經(jīng)從隊列中出去了(已經(jīng)執(zhí)行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),也就是下圖中代碼1和代碼2直接的間隙就會出現(xiàn)這種情況媒鼓,此時修改Prev指針届吁,有可能會導(dǎo)致Prev指向另一個已經(jīng)移除隊列的Node,因此這塊變化Prev指針不安全绿鸣。

<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180655529.png" alt="image-20200607180655529" />

unlock解鎖

解鎖時并不區(qū)分公平和不公平疚沐,因為ReentrantLock實現(xiàn)了鎖的可重入,可以進(jìn)一步的看一下時如何處理的潮模,上代碼:

public void unlock() {
    sync.release(1);
}

release

public final boolean release(int arg) {
    // 自定義的tryRelease如果返回true亮蛔,說明該鎖沒有被任何線程持有
    if (tryRelease(arg)) {
        // 獲取頭結(jié)點(diǎn)
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 頭結(jié)點(diǎn)不為空并且頭結(jié)點(diǎn)的waitStatus不是初始化節(jié)點(diǎn)情況,解除線程掛起狀態(tài)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這里的判斷條件為什么是h != null && h.waitStatus != 0

  1. h == null Head還沒初始化擎厢。初始情況下究流,head == null,第一個節(jié)點(diǎn)入隊动遭,Head會被初始化一個虛擬節(jié)點(diǎn)芬探。如果還沒來得及入隊,就會出現(xiàn)head == null 的情況厘惦。
  2. h != null && waitStatus == 0 表明后繼節(jié)點(diǎn)對應(yīng)的線程仍在運(yùn)行中偷仿,不需要喚醒。
  3. h != null && waitStatus < 0 表明后繼節(jié)點(diǎn)可能被阻塞了宵蕉,需要喚醒酝静,(還記得一個node是在shouldParkAfterFailedAcquire方法中被設(shè)置為SIGNAL = -1的吧?不記得翻看一下上面吧)

tryRelease

protected final boolean tryRelease(int releases) {
    // 減少可重入次數(shù)国裳,setState(c);
    int c = getState() - releases;
    // 當(dāng)前線程不是持有鎖的線程形入,拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果持有線程全部釋放,將當(dāng)前獨(dú)占鎖所有線程設(shè)置為null缝左,并更新state
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor

這個方法在cancelAcquire其實也用到了亿遂,簡單分析一下

// 如果當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),或者上述條件不滿足渺杉,就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)unparkSuccessor(node);

private void unparkSuccessor(Node node) {
    // 獲取結(jié)點(diǎn)waitStatus蛇数,CAS設(shè)置狀態(tài)state=0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn)
    Node s = node.next;
    // 如果下個節(jié)點(diǎn)是null或者下個節(jié)點(diǎn)被cancelled,就找到隊列最開始的非cancelled的節(jié)點(diǎn)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就從尾部節(jié)點(diǎn)開始找是越,到隊首耳舅,找到隊列第一個waitStatus<0的節(jié)點(diǎn)。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果當(dāng)前節(jié)點(diǎn)的下個節(jié)點(diǎn)不為空,而且狀態(tài)<=0浦徊,就把當(dāng)前節(jié)點(diǎn)unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

為什么要從后往前找第一個非Cancelled的節(jié)點(diǎn)呢馏予?

原因1:addWaiter方法并非原子,構(gòu)建鏈表結(jié)構(gòu)時如下圖中 1盔性、2間隙執(zhí)行unparkSuccessor霞丧,此時鏈表是不完整的,沒辦法從前往后找了冕香;

image-20200607184934087

原因2:還有一點(diǎn)原因蛹尝,在產(chǎn)生CANCELLED狀態(tài)節(jié)點(diǎn)的時候,先斷開的是Next指針悉尾,Prev指針并未斷開突那,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node;

中斷恢復(fù)

喚醒后构眯,會執(zhí)行return Thread.interrupted();愕难,這個函數(shù)返回的是當(dāng)前執(zhí)行線程的中斷狀態(tài),并清除鸵赖。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquireQueued代碼务漩,當(dāng)parkAndCheckInterrupt返回True或者False的時候,interrupted的值不同它褪,但都會執(zhí)行下次循環(huán)饵骨。如果這個時候獲取鎖成功,就會把當(dāng)前interrupted返回茫打。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
            }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如果acquireQueued為True居触,就會執(zhí)行selfInterrupt方法。

該方法其實是為了中斷線程老赤。但為什么獲取了鎖以后還要中斷線程呢轮洋?這部分屬于Java提供的協(xié)作式中斷知識內(nèi)容,感興趣同學(xué)可以查閱一下抬旺。這里簡單介紹一下:

  1. 當(dāng)中斷線程被喚醒時弊予,并不知道被喚醒的原因,可能是當(dāng)前線程在等待中被中斷开财,也可能是釋放了鎖以后被喚醒汉柒。因此我們通過Thread.interrupted()方法檢查中斷標(biāo)記(該方法返回了當(dāng)前線程的中斷狀態(tài),并將當(dāng)前線程的中斷標(biāo)識設(shè)置為False)责鳍,并記錄下來碾褂,如果發(fā)現(xiàn)該線程被中斷過,就再中斷一次历葛。
  2. 線程在等待資源的過程中被喚醒正塌,喚醒后還是會不斷地去嘗試獲取鎖,直到搶到鎖為止。也就是說乓诽,在整個流程中帜羊,并不響應(yīng)中斷,只是記錄中斷記錄鸠天。最后搶到鎖返回了逮壁,那么如果被中斷過的話,就需要補(bǔ)充一次中斷粮宛。

這里的處理方式主要是運(yùn)用線程池中基本運(yùn)作單元Worder中的runWorker,通過Thread.interrupted()進(jìn)行額外的判斷處理卖宠,可以看下ThreadPoolExecutor源碼的判斷條件巍杈;

image-20200607210022068

其它

AQS在JUC中有?比較?廣泛的使?用,以下是主要使?用的地?方:

  • ReentrantLock:使?用AQS保存鎖重復(fù)持有的次數(shù)扛伍。當(dāng)?一個線程獲取鎖時筷畦, ReentrantLock記錄當(dāng)
    前獲得鎖的線程標(biāo)識,?用于檢測是否重復(fù)獲取刺洒,以及錯誤線程試圖解鎖操作時異常情況的處理理鳖宾。
  • Semaphore:使?用AQS同步狀態(tài)來保存信號量量的當(dāng)前計數(shù)。 tryRelease會增加計數(shù)逆航,
    acquireShared會減少計數(shù)鼎文。
  • CountDownLatch:使?用AQS同步狀態(tài)來表示計數(shù)。計數(shù)為0時因俐,所有的Acquire操作
    (CountDownLatch的await?方法)才可以通過拇惋。
  • ReentrantReadWriteLock:使?用AQS同步狀態(tài)中的16位保存寫鎖持有的次數(shù),剩下的16位?用于保
    存讀鎖的持有次數(shù)抹剩。
  • ThreadPoolExecutor: Worker利利?用AQS同步狀態(tài)實現(xiàn)對獨(dú)占線程變量量的設(shè)置(tryAcquire和
    tryRelease)撑帖。

至此,通過ReentrantLock分析AQS的實現(xiàn)原理一家完畢澳眷,需要說明的是胡嘿,此文深度參考了美團(tuán)分析的ReentrantLock,是參考鏈接的第三個钳踊,有興趣可以對比差異衷敌,感謝!

參考

JDK API 文檔

Java的LockSupport.park()實現(xiàn)分析

[從ReentrantLock的實現(xiàn)看AQS的原理及應(yīng)用

[Thread的中斷機(jī)制(interrupt)


你的鼓勵也是我創(chuàng)作的動力

打賞地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末箍土,一起剝皮案震驚了整個濱河市逢享,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌吴藻,老刑警劉巖瞒爬,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡侧但,警方通過查閱死者的電腦和手機(jī)矢空,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來禀横,“玉大人屁药,你說我怎么就攤上這事“爻” “怎么了酿箭?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長趾娃。 經(jīng)常有香客問我缭嫡,道長,這世上最難降的妖魔是什么抬闷? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任妇蛀,我火速辦了婚禮,結(jié)果婚禮上笤成,老公的妹妹穿的比我還像新娘评架。我一直安慰自己,他們只是感情好炕泳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布纵诞。 她就那樣靜靜地躺著,像睡著了一般培遵。 火紅的嫁衣襯著肌膚如雪挣磨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天荤懂,我揣著相機(jī)與錄音茁裙,去河邊找鬼。 笑死节仿,一個胖子當(dāng)著我的面吹牛晤锥,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播廊宪,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼矾瘾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了箭启?” 一聲冷哼從身側(cè)響起壕翩,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎傅寡,沒想到半個月后放妈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體北救,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年芜抒,在試婚紗的時候發(fā)現(xiàn)自己被綠了珍策。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡宅倒,死狀恐怖攘宙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拐迁,我是刑警寧澤蹭劈,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站线召,受9級特大地震影響链方,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜灶搜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望工窍。 院中可真熱鬧割卖,春花似錦、人聲如沸患雏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽淹仑。三九已至丙挽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間匀借,已是汗流浹背颜阐。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吓肋,地道東北人凳怨。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像是鬼,于是被迫代替她去往敵國和親肤舞。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容