Java中AQS基本實(shí)現(xiàn)原理

一、AQS概述

AQS全名AbstractQueuedSynchronizer,意為抽象隊(duì)列同步器,JUC(java.util.concurrent包)下面的Lock和其他一些并發(fā)工具類都是基于它來實(shí)現(xiàn)的生真。AQS維護(hù)了一個volatile的state和一個CLH(FIFO)雙向隊(duì)列。

二屋吨、分析

state

state是一個由volatile修飾的int變量针贬,它的訪問方式有三種:

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)
    /**
     * 由volatile修飾的state
     */
    private volatile int state;

    /**
     * 基于內(nèi)存可見性的讀
     */
    protected final int getState() {
        return state;
    }

    /**
     * 基于內(nèi)存可見性的寫
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * 使用CAS+volatile,基于原子性與可見性的對state進(jìn)行設(shè)值
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // 使用Unsafe類赋兵,調(diào)用JNI方法
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

資源獲取主要有兩種形式:

  • 獨(dú)占式(EXCLUSIVE)

僅有一個線程能在同一時刻獲取到資源并處理笔咽,如ReentrantLock的實(shí)現(xiàn)。

  • 共享式(SHARED)

多個線程可以同時獲取到資源并處理霹期,如Semaphore/CountDownLatch等叶组。

AQS中大部分邏輯已經(jīng)被實(shí)現(xiàn),集成類只需要重寫state的獲壤臁(acquire)與釋放(release)方法甩十,因?yàn)樵贏QS中,這些方法默認(rèn)定義的實(shí)現(xiàn)方式都是拋出不支持操作異常帕膜,所以按需實(shí)現(xiàn)即可枣氧。

其中需要繼承類重寫的方法有:

  • tryAcquire(int arg)

此方法是獨(dú)占式的獲取資源方法,成功則返回true,失敗返回false垮刹。

  • tryRelease(int arg)

此方法是獨(dú)占式的釋放資源方法达吞,成功則返回true,失敗返回false。

  • tryAcquireShared(int arg)

此方法是共享式的獲取資源方法荒典,返回負(fù)數(shù)表示失敗酪劫,0表示獲取成功吞鸭,但是沒有可用資源,正數(shù)表示獲取成功覆糟,且有可用資源刻剥。

  • tryReleaseShared(int arg)

此方法是共享式的釋放資源方法,如果允許喚醒后續(xù)等待線程則返回true,不允許則返回false滩字。

  • isHeldExclusively()

判斷當(dāng)前線程是否正在獨(dú)享資源造虏,是則返回true,否則返回false。

CLH(FIFO)隊(duì)列

AQS中是通過內(nèi)部類Node來維護(hù)一個CLH隊(duì)列的麦箍。源碼如下:

 static final class Node {
        /** 標(biāo)記共享式訪問 */
        static final Node SHARED = new Node();
        /** 標(biāo)記獨(dú)占式訪問 */
        static final Node EXCLUSIVE = null;

        /** 字段waitStatus的值漓藕,表示當(dāng)前節(jié)點(diǎn)已取消等待 */
        static final int CANCELLED =  1;
        /**字段waitStatus的值,表示當(dāng)前節(jié)點(diǎn)取消或釋放資源后挟裂,通知下一個節(jié)點(diǎn) */
        static final int SIGNAL    = -1;
        /** 表示正在等待觸發(fā)條件 */
        static final int CONDITION = -2;
        /**
         * 表示下一個共享獲取應(yīng)無條件傳播
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     表示當(dāng)前節(jié)點(diǎn)取消或釋放資源后享钞,通知下一個節(jié)點(diǎn)
         *   CANCELLED:  表示當(dāng)前節(jié)點(diǎn)已取消等待
         *   CONDITION:  表示正在等待觸發(fā)條件
         *   PROPAGATE:  表示下一個共享獲取應(yīng)無條件傳播
         */
        volatile int waitStatus;

        /**
         * 前節(jié)點(diǎn)
         */
        volatile Node prev;

        /**
         * 下一個節(jié)點(diǎn)
         */
        volatile Node next;

        /**
         * 節(jié)點(diǎn)對應(yīng)線程
         */
        volatile Thread thread;

        /**
         * 下一個等待的節(jié)點(diǎn)
         */
        Node nextWaiter;

        /**
         * 是否是共享式訪問
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前節(jié)點(diǎn)
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // 共享式訪問的構(gòu)造函數(shù)
        }

        Node(Thread thread, Node mode) {     // 用于被添加等待者使用
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // 用于Condition使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

獨(dú)占模式-獲取資源

使用AQS中的acquire(int arg)方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

該方法分為4個部分:

  • tryAcquire()

需要自己實(shí)現(xiàn)的方法,如果獲取到資源使用權(quán)诀蓉,則返回true,反之fasle栗竖。如果獲取到資源,返回true,!true為false,根據(jù)&&的短路性渠啤,則不會執(zhí)行后續(xù)方法狐肢,直接跳過程序。如果未獲取到資源沥曹,返回false,!false為true,則進(jìn)入后續(xù)方法处坪。

  • addWaiter()

如果未獲取到資源使用權(quán),則首先會調(diào)用此方法架专。上源碼:

    private Node addWaiter(Node mode) {
        // 封裝當(dāng)前線程和獨(dú)占模式
        Node node = new Node(Thread.currentThread(), mode);
        // 獲取尾部節(jié)點(diǎn)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS設(shè)置尾部節(jié)點(diǎn)
            if (compareAndSetTail(pred, node)) {
                // 將為節(jié)點(diǎn)的下一節(jié)點(diǎn)指向當(dāng)前node
                pred.next = node;
                return node;
            }
        }
        // 如果尾結(jié)點(diǎn)為空或者設(shè)置尾結(jié)點(diǎn)失敗
        enq(node);
        return node;
    }
 private Node enq(final Node node) {
        // 如果CAS設(shè)置未成功則死循環(huán)
        for (;;) {
            // 獲得尾結(jié)點(diǎn)
            Node t = tail;
            // 如果尾節(jié)點(diǎn)為空,說明CLH隊(duì)列為空玄帕,需要初始化
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 設(shè)置當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
                node.prev = t;
                // CAS設(shè)置當(dāng)前節(jié)點(diǎn)為尾結(jié)點(diǎn)
                if (compareAndSetTail(t, node)) {
                    // 設(shè)置后驅(qū)節(jié)點(diǎn)
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • acquiredQueued()
    final boolean acquireQueued(final Node node, int arg) {
        // 標(biāo)識資源獲取是否失敗
        boolean failed = true;
        try {
            // 標(biāo)識線程是否中斷
            boolean interrupted = false;
            for (;;) {
                // 獲得當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 如果前驅(qū)節(jié)點(diǎn)為頭結(jié)點(diǎn)部脚,說明快到當(dāng)前節(jié)點(diǎn)了,嘗試獲取資源
                if (p == head && tryAcquire(arg)) {
                    // 獲取資源成功
                    // 設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn)
                    setHead(node);
                    // 取消前驅(qū)節(jié)點(diǎn)(以前的頭部)的后節(jié)點(diǎn)裤纹,方便GC回收
                    p.next = null; // help GC
                    // 標(biāo)識未失敗
                    failed = false;
                    // 返回中斷標(biāo)志
                    return interrupted;
                }
                // 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭結(jié)點(diǎn)或獲取資源失敗
                // 需要用shouldParkAfterFailedAcquire函數(shù)判斷是否需要阻塞該節(jié)點(diǎn)持有的線程
                // 如果需要阻塞委刘,則執(zhí)行parkAndCheckInterrupt方法,并設(shè)置被中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果最終獲取資源失敗
            if (failed)
                // 當(dāng)前節(jié)點(diǎn)取消獲取資源
                cancelAcquire(node);
        }
    }
  • selfInterrupt()

中斷當(dāng)前線程

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

獨(dú)占模式-釋放資源

release() 釋放資源并喚醒后繼線程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 獲取頭結(jié)點(diǎn)
            Node h = head;
            // 頭結(jié)點(diǎn)不為空且等待狀態(tài)值不為0
            if (h != null && h.waitStatus != 0)
                // 喚醒后續(xù)等待線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果等待狀態(tài)值小于0
        if (ws < 0)
            // 使用CAS將waitStatus設(shè)置為0
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 如果當(dāng)前節(jié)點(diǎn)沒有后繼節(jié)點(diǎn)或者后繼節(jié)點(diǎn)放棄競爭資源
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從隊(duì)列尾部循環(huán)直到當(dāng)前節(jié)點(diǎn)鹰椒,找到最近的且等待狀態(tài)值小于0的節(jié)點(diǎn)
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果找到的后繼節(jié)點(diǎn)不為空,則喚醒其持有的線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末锡移,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子漆际,更是在濱河造成了極大的恐慌淆珊,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奸汇,死亡現(xiàn)場離奇詭異施符,居然都是意外死亡往声,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進(jìn)店門戳吝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來浩销,“玉大人,你說我怎么就攤上這事听哭÷螅” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵陆盘,是天一觀的道長普筹。 經(jīng)常有香客問我,道長礁遣,這世上最難降的妖魔是什么斑芜? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮祟霍,結(jié)果婚禮上杏头,老公的妹妹穿的比我還像新娘。我一直安慰自己沸呐,他們只是感情好醇王,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著崭添,像睡著了一般寓娩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上呼渣,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天棘伴,我揣著相機(jī)與錄音,去河邊找鬼屁置。 笑死焊夸,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蓝角。 我是一名探鬼主播阱穗,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼使鹅!你這毒婦竟也來了揪阶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤患朱,失蹤者是張志新(化名)和其女友劉穎鲁僚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蕴茴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年劝评,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片倦淀。...
    茶點(diǎn)故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡蒋畜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出撞叽,到底是詐尸還是另有隱情姻成,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布愿棋,位于F島的核電站科展,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏糠雨。R本人自食惡果不足惜才睹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望甘邀。 院中可真熱鬧琅攘,春花似錦、人聲如沸松邪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逗抑。三九已至剧辐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間邮府,已是汗流浹背荧关。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留褂傀,地道東北人羞酗。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像紊服,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子胸竞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評論 2 348