并發(fā)編程之:AQS源碼解析

大家好后室,我是小黑,一個在互聯(lián)網(wǎng)茍且偷生的農(nóng)民工独撇。

在Java并發(fā)編程中屑墨,經(jīng)常會用到鎖,除了Synchronized這個JDK關(guān)鍵字以外纷铣,還有Lock接口下面的各種鎖實現(xiàn)卵史,如重入鎖ReentrantLock,還有讀寫鎖ReadWriteLock等关炼,他們在實現(xiàn)鎖的過程中都是依賴與AQS來完成核心的加解鎖邏輯的。那么AQS具體是什么呢匣吊?

提供一個框架儒拂,用于實現(xiàn)依賴先進先出(FIFO)等待隊列的阻塞鎖和相關(guān)同步器(信號量,事件等)色鸳。 該類被設(shè)計為大多數(shù)類型的同步器的有用依據(jù)社痛,這些同步器依賴于單個原子int值來表示狀態(tài)。 子類必須定義改變此狀態(tài)的受保護方法命雀,以及根據(jù)該對象被獲取或釋放來定義該狀態(tài)的含義蒜哀。 給定這些,這個類中的其他方法執(zhí)行所有排隊和阻塞機制吏砂。 子類可以保持其他狀態(tài)字段撵儿,但只以原子方式更新int使用方法操縱值getState() 乘客, setState(int)和compareAndSetState(int, int)被跟蹤相對于同步。

上述內(nèi)容來自JDK官方文檔淀歇。

簡單來說易核,AQS是一個先進先出(FIFO)的等待隊列,主要用在一些線程同步場景浪默,需要通過一個int類型的值來表示同步狀態(tài)牡直。提供了排隊和阻塞機制。

類圖結(jié)構(gòu)

image-20210904200925904

從類圖可以看出纳决,在ReentrantLock中定義了AQS的子類Sync碰逸,可以通過Sync實現(xiàn)對于可重入鎖的加鎖,解鎖阔加。

AQS通過int類型的狀態(tài)state來表示同步狀態(tài)饵史。

AQS中主要提供的方法:

acquire(int) 獨占方式獲取鎖

acquireShared(int) 共享方式獲取鎖

release(int) 獨占方式釋放鎖

releaseShared(int) 共享方式釋放鎖

獨占鎖和共享鎖

關(guān)于獨占鎖和共享鎖先給大家普及一下這個概念。

獨占鎖指該鎖只能同時被一個線程持有掸哑;

共享鎖指該鎖可以被多個線程同時持有约急。

舉個生活中的例子,比如我們使用打車軟件打車苗分,獨占鎖就好比我們打快車或者專車厌蔽,一輛車只能讓一個客戶打到,不能兩個客戶同時打到一輛車摔癣;共享鎖就好比打拼車奴饮,可以有多個客戶一起打到同一輛車。

AQS內(nèi)部結(jié)構(gòu)

我們簡單通過一張圖先來了解下AQS的內(nèi)部結(jié)構(gòu)择浊。其實就是有一個隊列戴卜,這個隊列的頭結(jié)點head代表當(dāng)前正在持有鎖的線程,后續(xù)的其他節(jié)點代表當(dāng)前正在等待的線程琢岩。

image-20210904201020029

接下來我們通過源碼來看看AQS的加鎖和解鎖過程投剥。先來看看獨占鎖是如何進行加解鎖的。

獨占鎖加鎖過程

ReentrantLock lock = new ReentrantLock();
lock.lock();
public void lock() {
    // 調(diào)用sync的lock方法
    sync.lock();
}

可以看到在ReentrantLock的lock方法中担孔,直接調(diào)用了sync這個AQS子類的lock方法江锨。

final void lock() {
    // 獲取鎖
    acquire(1);
}
public final void acquire(int arg) {
    // 1.先嘗試獲取,如果獲取成功糕篇,則直接返回啄育,代表加鎖成功
    if (!tryAcquire(arg) &&
        // 2.如果獲取失敗,則調(diào)用addWaiter在等待隊列中增加一個節(jié)點
        // 3. 調(diào)用acquireQueued告訴前一個節(jié)點拌消,在解鎖之后喚醒自己,然后線程進入等待狀態(tài)
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果在等待過程中被中斷挑豌,則當(dāng)前線程中斷
        selfInterrupt();
}

在獲取鎖時,基本可以分為3步:

  1. 嘗試獲取,如果成功則返回氓英,如果失敗侯勉,執(zhí)行下一步;
  2. 將當(dāng)前線程放入等待隊列尾部债蓝;
  3. 標(biāo)記前面等待的線程執(zhí)行完之后喚醒當(dāng)前線程壳鹤。
/**
 * 嘗試獲取鎖(公平鎖實現(xiàn))
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 獲取state,初始值為0饰迹,每次加鎖成功會+1芳誓,解鎖成功-1
    int c = getState();
    // 當(dāng)前沒有線程占用
    if (c == 0) { 
        // 判斷是否有其他線程排隊在本線程之前
        if (!hasQueuedPredecessors() &&
            // 如果沒有,通過CAS進行加鎖
            compareAndSetState(0, acquires)) {
            // 將當(dāng)前線程設(shè)置為AQS的獨占線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果當(dāng)前線程是正在獨占的線程(已持有鎖啊鸭,重入)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;  
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // state+1
        setState(nextc);
        return true;
    }
    return false;
}
private Node addWaiter(Node mode) {
    // 創(chuàng)建一個當(dāng)前線程的Node節(jié)點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 如果等待隊列的尾節(jié)點!=null
    if (pred != null) {
        // 將本線程對應(yīng)節(jié)點的前置節(jié)點設(shè)置為原來的尾節(jié)點
        node.prev = pred;
        // 通過CAS將本線程節(jié)點設(shè)置為尾節(jié)點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //尾節(jié)點為空锹淌,或者在CAS時失敗,則通過enq方法重新加入到尾部赠制。(本方法內(nèi)部采用自旋)
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾節(jié)點為空赂摆,代表等待隊列還沒有被初始化過
        if (t == null) { 
            // 創(chuàng)建一個空的Node對象,通過CAS賦值給Head節(jié)點钟些,如果失敗烟号,則重新自旋一次,如果成功,將Head節(jié)點賦值給尾節(jié)點
            if (compareAndSetHead(new Node()))
                tail = head; 
        } else {
            // 尾節(jié)點不為空的情況,說明等待隊列已經(jīng)被初始化過蟆豫,將當(dāng)前節(jié)點的前置節(jié)點指向尾節(jié)點
            node.prev = t;
            // 將當(dāng)前節(jié)點CAS賦值給尾節(jié)點
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
final boolean acquireQueued(final Node node, int arg) {
    // 標(biāo)識是否加鎖失敗
    boolean failed = true;
    try {
        // 是否被中斷
        boolean interrupted = false;
        for (;;) {
            // 取出來當(dāng)前節(jié)點的前一個節(jié)點
            final Node p = node.predecessor();
            // 如果前一個節(jié)點是head節(jié)點,那么自己就是老二迫筑,這個時候再嘗試獲取一次鎖
            if (p == head && tryAcquire(arg)) {
                // 如果獲取成功,把當(dāng)前節(jié)點設(shè)置為head節(jié)點
                setHead(node);
                p.next = null; // help GC
                failed = false; // 標(biāo)識加鎖成功
                return interrupted;
            }
            // shouldParkAfterFailedAcquire 檢查并更新前置節(jié)點p的狀態(tài)宗弯,如果node節(jié)點應(yīng)該阻塞就返回true
            // 如果返回false脯燃,則自旋一次。
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 當(dāng)前線程阻塞蒙保,在阻塞被喚醒時辕棚,判斷是否被中斷
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) // 如果加鎖成功,則取消獲取鎖
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // ws == -1
        /*
         * 這個節(jié)點已經(jīng)設(shè)置了請求釋放的狀態(tài)邓厕,所以它可以在這里安全park.
         */
        return true;
    if (ws > 0) {
        /*
         * 前置節(jié)點被取消了逝嚎,跳過前置節(jié)點重試
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 將前置節(jié)點的狀態(tài)設(shè)置為請求釋放
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

在整個加鎖過程可以通過下圖更清晰的理解。

image-20210904201038010

獨占鎖解鎖過程

public void unlock() {
    sync.release(1);
}

同樣解鎖時也是直接調(diào)用AQS子類sync的release方法邑狸。

public final boolean release(int arg) {
    // 嘗試解鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 解鎖成功懈糯,如果head!=null并且head.ws不等0涤妒,代表有其他線程排隊
        if (h != null && h.waitStatus != 0)
            // 喚醒后續(xù)等待的節(jié)點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

解鎖過程如下:

  1. 先嘗試解鎖单雾,解鎖失敗則直接返回false。(理論上不會解鎖失敗,因為正在執(zhí)行解鎖的線程一定是持有鎖的線程)
  2. 解鎖成功之后硅堆,如果有head節(jié)點并且狀態(tài)不是0屿储,代表有線程被阻塞等待,則喚醒下一個等待的線程渐逃。
protected final boolean tryRelease(int releases) {
    // state - 1
    int c = getState() - releases;
    // 如果當(dāng)前線程不是獨占AQS的線程够掠,但是這時候又來解鎖,這種情況肯定是非法的茄菊。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // 如果狀態(tài)歸零疯潭,代表鎖釋放了,將獨占線程設(shè)置為null
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 將減1之后的狀態(tài)設(shè)置為state
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    /*
     * 如果節(jié)點的ws小于0面殖,將ws設(shè)置為0
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 從等待隊列的尾部往前找竖哩,直到第二個節(jié)點,ws<=0的節(jié)點脊僚。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果存在符合條件的節(jié)點相叁,unpark喚醒這個節(jié)點的線程。
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享鎖加鎖過程

為了實現(xiàn)共享鎖辽幌,AQS中專門有一套和排他鎖不同的實現(xiàn)增淹,我們來看一下源碼具體是怎么做的。

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    // tryAcquireShared 嘗試獲取共享鎖許可乌企,如果返回負(fù)數(shù)標(biāo)識獲取失敗
    // 返回0表示成功虑润,但是已經(jīng)沒有多余的許可可用,后續(xù)不能再成功逛犹,返回正數(shù)表示后續(xù)請求也可以成功
    if (tryAcquireShared(arg) < 0)
       //  申請失敗端辱,則加入到共享等待隊列
        doAcquireShared(arg);
}

tryAcquireShared嘗試獲取共享許可,本方法需要在子類中進行實現(xiàn)虽画。不同的實現(xiàn)類實現(xiàn)方式不一樣舞蔽。

下面的代碼是ReentrentReadWriteLock中的實現(xiàn)。

 protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 當(dāng)前有獨占線程正在持有許可码撰,并且獨占線程不是當(dāng)前線程渗柿,則返回失敗(-1)
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 沒有獨占線程脖岛,或者獨占線程是當(dāng)前線程朵栖。
    // 獲取已使用讀鎖的個數(shù)
    int r = sharedCount(c);
    // 判斷當(dāng)前讀鎖是否應(yīng)該阻塞 
    if (!readerShouldBlock() &&
        // 已使用讀鎖小于最大數(shù)量
        r < MAX_COUNT &&
        // CAS設(shè)置state,每次加SHARED_UNIT標(biāo)識共享鎖+1
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) { // 標(biāo)識第一次加讀鎖
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 重入加讀鎖
            firstReaderHoldCount++;
        } else {
            // 并發(fā)加讀鎖柴梆,記錄當(dāng)前線程的讀的次數(shù)陨溅,HoldCounter中是一個ThreadLocal。
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 否則自旋嘗試獲取共享鎖
    return fullTryAcquireShared(current);
}

本方法可以總結(jié)為三步:

  1. 如果有寫線程獨占绍在,則失敗门扇,返回-1
  2. 沒有寫線程或者當(dāng)前線程就是寫線程重入雹有,則判斷是否讀線程阻塞,如果不用阻塞則CAS將已使用讀鎖個數(shù)+1
  3. 如果第2步失敗臼寄,失敗原因可能是讀線程應(yīng)該阻塞霸奕,或者讀鎖達(dá)到上限,或者CAS失敗吉拳,則調(diào)用fullTryAcquireShared方法质帅。
private void doAcquireShared(int arg) {
    // 加入同步等待隊列,指定是SHARED類型
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 取到當(dāng)前節(jié)點的前一個節(jié)點
            final Node p = node.predecessor();
            // 如果前一個節(jié)點是頭節(jié)點留攒,則當(dāng)前節(jié)點是第二個節(jié)點煤惩。
            if (p == head) {
                // 因為是FIFO隊列,所以當(dāng)前節(jié)點這時可以再嘗試獲取一次。
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取成功炼邀,把當(dāng)前節(jié)點設(shè)置為頭節(jié)點盟庞。并且判斷是否需要喚醒后面的等待節(jié)點。
                    // 如果條件允許汤善,就會喚醒后面的節(jié)點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 如果前置節(jié)點不是頭結(jié)點什猖,說明當(dāng)前節(jié)點線程需要阻塞等待,并告知前一個節(jié)點喚醒
            // 檢查并更新前置節(jié)點p的狀態(tài)红淡,如果node節(jié)點應(yīng)該阻塞就返回true
            // 當(dāng)前線程被喚醒之后不狮,會從parkAndCheckInterrupt()執(zhí)行
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            cancelAcquire(node);
    }
}

共享鎖釋放過程

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //tryReleaseShared()嘗試釋放許可,這個方法在AQS中默認(rèn)拋出一個異常在旱,需要在子類中實現(xiàn)
    if (tryReleaseShared(arg)) {
        // 喚醒線程摇零,設(shè)置傳播狀態(tài) WS
        doReleaseShared();
        return true;
    }
    return false;
}

AQS是很多并發(fā)場景下同步控制的基石,其中的實現(xiàn)相對要復(fù)雜很多桶蝎,還需要多看多琢磨才能完全理解驻仅。本文也是和大家做一個初探,給大家展示了核心的代碼邏輯登渣,希望能有所幫助噪服。


好的,本期內(nèi)容就到這里胜茧,我們下期見粘优。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市呻顽,隨后出現(xiàn)的幾起案子雹顺,更是在濱河造成了極大的恐慌,老刑警劉巖廊遍,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嬉愧,死亡現(xiàn)場離奇詭異,居然都是意外死亡喉前,警方通過查閱死者的電腦和手機没酣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門揽惹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人四康,你說我怎么就攤上這事∠廖眨” “怎么了闪金?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長论颅。 經(jīng)常有香客問我哎垦,道長,這世上最難降的妖魔是什么恃疯? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任漏设,我火速辦了婚禮,結(jié)果婚禮上今妄,老公的妹妹穿的比我還像新娘郑口。我一直安慰自己,他們只是感情好盾鳞,可當(dāng)我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布犬性。 她就那樣靜靜地躺著,像睡著了一般腾仅。 火紅的嫁衣襯著肌膚如雪乒裆。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天推励,我揣著相機與錄音鹤耍,去河邊找鬼。 笑死验辞,一個胖子當(dāng)著我的面吹牛稿黄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播跌造,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼抛猖,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了鼻听?” 一聲冷哼從身側(cè)響起财著,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎撑碴,沒想到半個月后撑教,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡醉拓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年伟姐,在試婚紗的時候發(fā)現(xiàn)自己被綠了收苏。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡愤兵,死狀恐怖鹿霸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情秆乳,我是刑警寧澤懦鼠,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站屹堰,受9級特大地震影響肛冶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜扯键,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一睦袖、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧荣刑,春花似錦馅笙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至叶堆,卻和暖如春阱飘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背虱颗。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工沥匈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人忘渔。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓高帖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親畦粮。 傳聞我的和親對象是個殘疾皇子散址,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容