源碼修煉筆記之AQS(AbstractQueuedSynchronizer)源碼解析

AbstractQueuedSynchronizer被稱為隊列同步器,簡稱為大家熟知的AQS,這個類可以稱作concurrent包的基礎疯汁,該類提供了同步的基本功能忌卤。該類包括如下幾個核心要素:

  • AQS內(nèi)部維護一個volatile修飾的state變量,state用于標記鎖的狀態(tài)允坚;
  • AQS通過內(nèi)部類Node記錄當前是哪個線程持有鎖;
  • AQS通過LockSupport的park和unPark方法來阻塞和喚醒線程;
  • AQS通過node來維護一個隊列吐限,用于保存所有阻塞的線程。

下面通過剖析源碼來看看AQS是如何工作的褂始。

AQS概要

AQS通過內(nèi)部類Node記錄當前是哪個線程持有鎖诸典,Node中有一個前驅(qū)節(jié)點和一個后繼節(jié)點,形成一個雙向鏈表崎苗,這個鏈表是一種CLH隊列狐粱,其中waitStatus表示當前線程的狀態(tài),其可能的取值包括以下幾種:

  • SIGNAL(-1)胆数,表示后繼線程已經(jīng)或者即將被阻塞肌蜻,當前線程釋放鎖或者獲取鎖失敗后需要喚醒后繼線程;
  • CANCELLED(1)必尼,表示當前線程因為超時或者中斷被取消蒋搜,這個狀態(tài)不可以被修改篡撵;
  • CONDITION(-2),當前線程為條件等待豆挽,其狀態(tài)設置0之后才能去競爭鎖育谬;
  • PROPAGATE(-3),表示共享鎖釋放之后需要傳遞給后繼節(jié)點帮哈,只有頭結(jié)點的才會有該狀態(tài)膛檀;
  • 0,該狀態(tài)為初始值娘侍,不屬于上面任意一種狀態(tài)咖刃。

Node對象中還有一個nextWaiter變量,指向下一個條件等待節(jié)點憾筏,相當于在CLH隊列的基礎上維護了一個簡單的單鏈表來關聯(lián)條件等待的節(jié)點嚎杨。

    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;

        static final int SIGNAL    = -1;

        static final int CONDITION = -2;
  
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        ...
        構(gòu)造方法
        ...
    }

Node提供了兩種入隊列的方法,即enq和addWaiter踩叭,enq方法如下所示磕潮,當尾節(jié)點tail為null時,表明阻塞隊列還沒有被初始化容贝,通過CAS操作來設置頭結(jié)點自脯,頭結(jié)點為new Node(),實際上頭結(jié)點中沒有阻塞的線程斤富,算得上是一個空的節(jié)點(注意空節(jié)點和null是不一樣的)膏潮,然后進行tail=head操作,這也說明當head=tail的時候满力,隊列中實際上是不存在阻塞線程的焕参,然后將需要入隊列的node放入隊列尾部,將tail指向node油额。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果tail為空叠纷,說明CLH隊列沒有被初始化,
            if (t == null) {
                //初始化CLH隊列潦嘶,將head和tail指向一個new Node()涩嚣,
                //此時雖然CLH有一個節(jié)點,但是并沒有真正意義的阻塞線程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //將node放入隊列尾部掂僵,并通過cas將tail指向node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter通常表示添加一個條件等待的節(jié)點入隊列航厚,該方法首先嘗試通過CAS操作快速入隊列,如果失敗則通過調(diào)用enq來入隊列锰蓬。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        //嘗試快速入隊列
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //快速入隊列失敗則采用enq方入隊列
        enq(node);
        return node;
    }


Node還提供了喚醒后繼節(jié)點線程的功能幔睬,主要是通過LockSupport來實現(xiàn)的,源碼如下所示芹扭,

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

排他獲取鎖

\color{green}{不支持中斷的獲取鎖}

    //不可中斷的獲取鎖
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //對中斷做補償麻顶,中斷當前線程
            selfInterrupt();
    }

acquire方法首先會調(diào)用tryAcquire方法嘗試獲取鎖赦抖,如果獲取鎖失敗,首先通過addWaiter將當前線程放入CLH隊列中,然后通過acquireQueued方法獲取鎖澈蚌,acquireQueued方法源碼如下所示:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //記錄中斷狀態(tài)
            boolean interrupted = false;
            //自旋式的獲取鎖
            for (;;) {
                final Node p = node.predecessor();
                //當前線程為CLH中的第一個阻塞線程才會嘗試去獲取鎖
                if (p == head && tryAcquire(arg)) {
                    //獲取成功則更新head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //返回中斷狀態(tài)
                    return interrupted;
                }
                //判斷中斷信息
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果獲取鎖失敗摹芙,則取消獲取鎖的操作
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued方法是無中斷的獲取鎖灼狰,該方法有一個布爾類型的返回值宛瞄,該值不是表示是否成功獲取鎖,而是標示當前線程的中斷狀態(tài)交胚,因為acquireQueued方法是無法響應中斷的份汗,需要對中斷進行補償,這個補償體現(xiàn)在acquire方法中蝴簇。

    //模板方法tryAcquire需要子類進行具體實現(xiàn)
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

\color{green}{支持中斷的獲取鎖}

    //可中斷的獲取鎖
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

acquireInterruptibly方法獲取鎖的過程中能夠響應中斷杯活,主要體現(xiàn)在獲取鎖之前會判斷一下當前線程的中斷中斷狀態(tài),若中斷則拋出InterruptedException熬词,然后通過tryAcquire獲取鎖旁钧,獲取成功直接返回,獲取失敗則通過doAcquireInterruptibly獲取鎖互拾,該方法和acquireQueued最大的區(qū)別就是在判斷parkAndCheckInterrupt后歪今,acquireQueued僅僅記錄中斷狀態(tài),parkAndCheckInterrupt則會拋出異常颜矿。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //拋出異常寄猩,響應中斷
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

\color{green}{支持超時時間的獲取鎖}

支持超時時間的獲取鎖功能

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //響應中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        //首先通過tryAcquire快速獲取鎖,若失敗則調(diào)用doAcquireNanos方法
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

從方法tryAcquireNanos的源碼可以看出骑疆,該方法也是響應中斷的田篇,該方法首先調(diào)用模板方法tryAcquire快速的獲取鎖,如果失敗則通過doAcquireNanos獲取鎖箍铭,doAcquireNanos中支持超時機制泊柬,其源碼如下所示:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //判斷如果超時則直接返回false,代表獲取鎖失敗
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireNanos方法與acquireQueued方法的區(qū)別是每次循環(huán)獲取鎖過程中都會計算deadline和當前時間的差值诈火,如果這個差值小于0兽赁,則表示獲取鎖的操作已經(jīng)超時,則直接返回false表示獲取鎖失敗柄瑰。

共享鎖獲取

AQS中不僅支持排他鎖的獲取闸氮,即acquire、acquireInterruptibly和tryAcquireNanos教沾,還提供了共享鎖的獲取操作方法蒲跨,包括acquireShared、acquireSharedInterruptibly和tryAcquireSharedNanos授翻,這三個方法源碼如下所示:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

共享鎖的獲取和排他鎖的獲取方法類似或悲,共享鎖調(diào)用了不同的模板方法tryAcquireShared孙咪,這里介紹一下doAcquireShared方法,其他方法變化的套路和共享鎖的使用套路一樣巡语,doAcquireShared方法源碼如下所示:

    private void doAcquireShared(int arg) {
        //當前線程入隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋式的獲取鎖
            for (;;) {
                final Node p = node.predecessor();
                //只有隊列中的第一個阻塞線程才能獲取鎖    
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //獲取鎖成功翎蹈,補償中斷
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //通過interrupted記錄中斷信息
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireShared方法沒有返回值,與acquireQueued不同的是:

  • doAcquireShared沒有返回值男公,該方法的中斷補償是在方法內(nèi)完成的荤堪,獲取鎖成功之后,會判斷中斷信息interrupted的狀態(tài)枢赔,如果為true則調(diào)用selfInterrupt()方法中斷當前線程澄阳;
  • 獲取鎖成功之后不是簡單的設置head,而是通過setHeadAndPropagate方法來設置頭結(jié)點和并且判斷后繼節(jié)點的信息踏拜,對后繼節(jié)點中的線程進行喚醒操作等碎赢,setHeadAndPropagate方法源碼如下所示:
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        //設置新的頭結(jié)點
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果后繼節(jié)點為空或者為SHARED類型的節(jié)點,執(zhí)行doReleaseShared方法
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    //狀態(tài)為SIGNAL,則喚醒后繼節(jié)點中的線程
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h);
                }
                //若狀態(tài)為0速梗,則設置狀態(tài)為PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            if (h == head)                  
                break;
        }
    }

鎖的釋放

鎖的釋放也分為釋放排他鎖和釋放共享鎖肮塞,分別為release方法和releaseShared方法,源碼如下所示姻锁,

    //釋放排他鎖
    public final boolean release(int arg) {
        //釋放鎖枕赵,然后喚醒后繼節(jié)點的線程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


    //釋放共享鎖
    public final boolean releaseShared(int arg) {
        //釋放鎖,然后調(diào)用doReleaseShared方法
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

release方法和releaseShared方法分別調(diào)用模板方法tryRelease和tryReleaseShared來釋放鎖屋摔,release方法中直接通過調(diào)用unparkSuccessor喚醒后繼線程烁设,而releaseShared的喚醒操作在doReleaseShared方法中進行。

取消獲取鎖

當獲取鎖失敗時钓试,需要進行一些狀態(tài)清理和變化装黑,cancelAcquire方法就是用來實現(xiàn)這些功能的,其源碼如下所示弓熏,

    private void cancelAcquire(Node node) {
       
        if (node == null)
            return;
        //節(jié)點線程置為null
        node.thread = null;

        //從CLH隊列中清除已經(jīng)取消的節(jié)點(CANCELLED)
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        //判斷如果node是尾部節(jié)點恋谭,則設置尾部節(jié)點
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            //若不是頭節(jié)點則直接從CLH隊列中清除當前節(jié)點
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            //若為頭結(jié)點,則喚醒后繼節(jié)點中的線程
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

取消獲取鎖的操作首先將隊列中處于CANCELLED狀態(tài)的節(jié)點剔除挽鞠,然后根據(jù)當前節(jié)點在CLH隊列中的位置進行不同的操作:

  • node在隊列尾部疚颊,則重新設置CLH隊列的尾部節(jié)點;
  • node為頭結(jié)點信认,喚醒后繼節(jié)點中的線程材义;
  • node既不是頭結(jié)點也不是尾節(jié)點,則在CLH中剔除node嫁赏。

總結(jié)
AQS是整個concurrent包的基礎其掂,可重入鎖、線程池潦蝇、信號量(Semaphore)等同步工具類都需要借助AQS來完成款熬,了解AQS是深入學習concurrent包的前提深寥。

寫在最后
源碼路上,共勉O团!M锒臁!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末殉簸,一起剝皮案震驚了整個濱河市闰集,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌喂链,老刑警劉巖返十,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件妥泉,死亡現(xiàn)場離奇詭異椭微,居然都是意外死亡,警方通過查閱死者的電腦和手機盲链,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門蝇率,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人刽沾,你說我怎么就攤上這事本慕。” “怎么了侧漓?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵锅尘,是天一觀的道長。 經(jīng)常有香客問我布蔗,道長藤违,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任纵揍,我火速辦了婚禮顿乒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘泽谨。我一直安慰自己璧榄,他們只是感情好,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布吧雹。 她就那樣靜靜地躺著骨杂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪雄卷。 梳的紋絲不亂的頭發(fā)上搓蚪,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機與錄音龙亲,去河邊找鬼陕凹。 笑死悍抑,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的杜耙。 我是一名探鬼主播搜骡,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼佑女!你這毒婦竟也來了记靡?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤团驱,失蹤者是張志新(化名)和其女友劉穎摸吠,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嚎花,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡寸痢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了紊选。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片啼止。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖兵罢,靈堂內(nèi)的尸體忽然破棺而出献烦,到底是詐尸還是另有隱情,我是刑警寧澤卖词,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布巩那,位于F島的核電站,受9級特大地震影響此蜈,放射性物質(zhì)發(fā)生泄漏即横。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一舶替、第九天 我趴在偏房一處隱蔽的房頂上張望令境。 院中可真熱鬧,春花似錦顾瞪、人聲如沸舔庶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惕橙。三九已至,卻和暖如春钉跷,著一層夾襖步出監(jiān)牢的瞬間弥鹦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留彬坏,地道東北人朦促。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像栓始,于是被迫代替她去往敵國和親务冕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354