AQS探究

總所周知,java concurrent包的工具類是構建在AbstractQueuedSynchronizer類上的基礎上的减牺,而這個類是Doug Lea大神基于CHL隊列實現(xiàn)的同步器晚缩。這個強大的同步器是怎樣實現(xiàn)的呢浅役?我們來一探究竟。

因為AQS的代碼比較難以理解瞻凤,我們從concurrent包下的并發(fā)工具類著手開始研究憨攒。從最簡單的CountDownLatch開始,首先看它的源碼


   public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

CountDownlatch類定義了一個Sync類繼承自AQS阀参,實現(xiàn)的了AQS的tryAcquireShared和tryReleaseShared方法肝集,share顧名思義是共享鎖。首先從await方法入手:


    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await方法調用的AQS的acquireSharedInterruptibly


    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

從這個方法看蛛壳,await方法是可中斷的杏瞻,如果當前線程被中斷,則直接向上拋InterruptedException衙荐。如果正常執(zhí)行捞挥,則會調用tryAcquireShared方法,這個是在之類中實現(xiàn)的∮且鳎現(xiàn)在回到CountDownLatch砌函,看tryAcquireShared的實現(xiàn):


   protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

很簡單,如果state為0則返回1溜族,否則讹俊,返回-1。state是構造函數(shù)里傳進來的煌抒。我們都知道使用CountDownlatch時傳進來的數(shù)字表示并發(fā)執(zhí)行的線程數(shù)仍劈,由此聯(lián)想state就是持有鎖的線程數(shù)。從acquireSharedInterruptibly方法可以看到摧玫,當前state!=0耳奕,即并發(fā)任務線程還沒執(zhí)行完時绑青,會進入doAcquireSharedInterruptibly:


private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

首先看addWaiter方法


 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

也就是說在pred為null的時候會初始化隊列


    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

從上面代碼看初始化之后的隊列是這樣的:

線程隊列.jpg

head只是指向一個空節(jié)點诬像,這一點對于理解后面的代碼很重要屋群,再回到doAcquireSharedInterruptibly,p的前繼節(jié)點就是head,所以會進入下面的if分支(至于為什么有這個if判斷后面再詳解),對于CountDownLatch坏挠,在并發(fā)任務還沒完成的時候芍躏,tryAcquireShared返回值為-1,所以就不會往下走降狠。直接進入shouldParkAfterFailedAcquire


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

pred就是head对竣,初始化之后waitStatus=0,進入else分支榜配,故head的waitStatus被更新為SIGNAL否纬,再回到doAcquireSharedInterruptibly,這個時候如果線程沒有被中斷,那么會接著循環(huán)蛋褥,再次進入shouldParkAfterFailedAcquire临燃,這個是進入第一個if分支,返回true烙心,那么就是進入parkAndCheckInterrupt膜廊,將當前線程阻塞住,這就是CountDownlatch調用await后阻塞住的原因。

從上面的分析可以知道淫茵,對于CountDownlatch爪瓜,在并發(fā)任務還沒結束的時候,如果另外一個線程B再調用await方法匙瘪,那么當前線程會放到等待隊列的最后面铆铆。第一個節(jié)點park住的時候,它的waitStatus還是0丹喻,所以這次算灸,shouldParkAfterFailedAcquire會把第一個節(jié)點的waitStatus設置為SIGNAL,同時下次循環(huán)會park住線程B

AQS獲取鎖的過程已經(jīng)了解清楚了驻啤,下面來看看AQS釋放鎖的過程菲驴。還是從CountDownLatch的countdown()方法入手。countdown()是直接調用AQS的releaseShared


    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

從代碼看骑冗,tryReleaseShared是在子類中實現(xiàn)的:


        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

從tryReleaseShared方法代碼來看赊瞬,只有等所有并發(fā)任務執(zhí)行完,tryReleaseShared才會返回true贼涩,才會執(zhí)行doReleaseShared


private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

如果head節(jié)點的waitStatus為SIGNAL巧涧,則先把head節(jié)點的status設置為0,然后進入unparkSuccessor


private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

在通常情況下遥倦,s!=null并且s.waitStatus為SIGNAL谤绳,所以head節(jié)點的后繼節(jié)點會被喚醒占锯。就是說每次調用releaseShared只會喚醒等待隊列中head節(jié)點之后的線程。

分析到這里缩筛,試想這個使用CountDownLatch場景消略,線程A和線程B,都調用await方法等待線程B瞎抛、線程C完成任務艺演。那么在線程B、線>程C完成任務的時候桐臊,主線程調用releaseShared進入doReleaseShared喚醒head節(jié)點之后的節(jié)點線程胎撤。因為原來的線程是在doAcquireSharedInterruptibly里的for循環(huán)最后park住,現(xiàn)在仍然回到該處断凶,繼續(xù)下次循環(huán)伤提。這個時候會進入上面提到的if分支,進入setHeadAndPropagate认烁。


    private void setHeadAndPropagate(Node node, long propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

從代碼來看肿男,setHeadAndPropagate就是把當前當前head節(jié)點remove掉,設置當前線程節(jié)點為head節(jié)點(也就是第二個節(jié)點)砚著。同時在共享鎖的模式下次伶,會調用doReleaseShared,喚醒當前節(jié)點的后繼節(jié)點,這就是propagate的概念稽穆。同理后續(xù)節(jié)點又會再喚醒它后面的節(jié)點冠王,直到整個隊列都被喚醒。

至此舌镶,已基本了解AQS的工作原理的柱彻,為了加深印象,我們來看下面的線程隊列的變化過程圖餐胀。

線程thread1調用acquireSharedInterruptibly之后哟楷,線程隊列如下圖,同時thread1被park住


另外一個線程thead2再次調用acquireSharedInterruptibly之后否灾,線程隊列如下圖卖擅,同時thread2被park住


這個時候,另一個線程觸發(fā)releaseShared墨技,線程隊列如下圖惩阶,同時thread1被unpark

thread1被unpark之后,會進入setHeadAndPropagate扣汪,setHead之后断楷,線程隊列如下圖


thread1調用doReleaseShared喚醒thread2后,線程隊列如下圖


thread2 進入setHeadAndPropagate崭别,setHead之后冬筒,線程隊列如下圖


最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末恐锣,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子舞痰,更是在濱河造成了極大的恐慌土榴,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件匀奏,死亡現(xiàn)場離奇詭異鞭衩,居然都是意外死亡学搜,警方通過查閱死者的電腦和手機娃善,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瑞佩,“玉大人聚磺,你說我怎么就攤上這事【嫱瑁” “怎么了瘫寝?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長稠炬。 經(jīng)常有香客問我焕阿,道長,這世上最難降的妖魔是什么首启? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任暮屡,我火速辦了婚禮,結果婚禮上毅桃,老公的妹妹穿的比我還像新娘褒纲。我一直安慰自己,他們只是感情好钥飞,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布莺掠。 她就那樣靜靜地躺著,像睡著了一般读宙。 火紅的嫁衣襯著肌膚如雪彻秆。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天结闸,我揣著相機與錄音唇兑,去河邊找鬼。 笑死膀估,一個胖子當著我的面吹牛幔亥,可吹牛的內容都是我干的。 我是一名探鬼主播察纯,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼帕棉,長吁一口氣:“原來是場噩夢啊……” “哼针肥!你這毒婦竟也來了?” 一聲冷哼從身側響起香伴,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤慰枕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后即纲,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體具帮,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年低斋,在試婚紗的時候發(fā)現(xiàn)自己被綠了蜂厅。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡膊畴,死狀恐怖掘猿,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情唇跨,我是刑警寧澤稠通,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站买猖,受9級特大地震影響改橘,放射性物質發(fā)生泄漏。R本人自食惡果不足惜玉控,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一飞主、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧奸远,春花似錦既棺、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至薛窥,卻和暖如春胖烛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背诅迷。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工佩番, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人罢杉。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓趟畏,卻偏偏與公主長得像,于是被迫代替她去往敵國和親滩租。 傳聞我的和親對象是個殘疾皇子赋秀,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內容