JUC之AbstractQueuedSynchronizer

1. 前言

AbstractQueuedSynchronizer(AQS)基于FIFO等待隊(duì)列以及CAS操作實(shí)現(xiàn)了基礎(chǔ)了同步框架,JUC中包括ReentrantLock,ReentrantReadWriteLock迂卢,CountDownLatch,Condition都是基于AbstractQueuedSynchronizer提供的基礎(chǔ)同步操作來實(shí)現(xiàn)的。

2. AbstractQueuedSynchronizer實(shí)現(xiàn)原理

2. 1 CLH隊(duì)列

CLH(代表Craig, Landin, Hagersten三人)隊(duì)列是一個(gè)FIFO的隊(duì)列瘦穆,這個(gè)隊(duì)列用來對資源競爭者(不同的線程就是一個(gè)競爭者)進(jìn)行排隊(duì)棚点。

2.1.1 隊(duì)列節(jié)點(diǎn)Node

AQS實(shí)現(xiàn)中隊(duì)列中一個(gè)節(jié)點(diǎn)即一個(gè)競爭者该镣,用類Node祠墅,Node類有如下主要成員:

成員 含義 取值
waitStatus 當(dāng)前節(jié)點(diǎn)狀態(tài) SIGNAL=-1 表示當(dāng)前節(jié)點(diǎn)釋放資源后需要喚醒他的后繼阻塞中的節(jié)點(diǎn); CANCELLED=1當(dāng)前節(jié)點(diǎn)已經(jīng)取消對資源的競爭; PROPAGATE=-3 用于競爭共享資源競爭侮穿,表示當(dāng)前節(jié)點(diǎn)獲取資源后,應(yīng)該喚醒其他排隊(duì)阻塞中獲取該共享資源的競爭者; CONDITION=-2專門用于Condition的實(shí)現(xiàn)
Node prev,next 當(dāng)前節(jié)點(diǎn)的前驅(qū)和后繼節(jié)點(diǎn) --
Thread thread 當(dāng)前排隊(duì)的線程毁嗦,也就是競爭資源的線程 --
Node nextWaiter 如果waitStatus=CONDITION則nextWaiter連接到下一個(gè)等待在同一個(gè)condition上的node亲茅;nextWaiter也可以是class NODE的靜態(tài)成員SHARED表示當(dāng)前節(jié)點(diǎn)競爭共享資源;等于null表示競爭排他性資源

1. 節(jié)點(diǎn)種類
根據(jù)上表中nextWaiter可以看出狗准,實(shí)際上node的種類有三種:

  1. exclusive node(表示當(dāng)前節(jié)點(diǎn)申請排他新資源)
  2. shared node(表示當(dāng)前節(jié)點(diǎn)申請的是共享資源)
  3. condition克锣, 當(dāng)前節(jié)點(diǎn)等待條件變量

不同的node種類獲取資源(對應(yīng)入隊(duì)操作)和釋放資源(對應(yīng)出隊(duì)操作)都有所不同

2.2.2 入隊(duì)操作

也就是去競爭資源。
這里只介紹一下exclusive和shared node的入隊(duì)操作

2.2.2.1 exclusive node入隊(duì)操作

使用AQS競爭排他性資源時(shí)腔长,需要調(diào)用如下兩個(gè)接口之一:

/*
調(diào)用acquire獲取資源袭祟,這是一個(gè)阻塞操作直到獲取到資源,且不可被中斷捞附。
*/
public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

/**
和acquire不同的是競爭者等待期間是可以被中斷
*/
 public final void acquireInterruptibly(long arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

兩者都調(diào)用了tryAcquire(), 這個(gè)方法需要使用AQS的用戶繼承并實(shí)現(xiàn)它巾乳,并且這個(gè)調(diào)用不應(yīng)該阻塞,之所以提供tryAcquire操作是因?yàn)楫?dāng)前競爭者可能已經(jīng)獲取了資源鸟召,那么在tryAcquire可以做一下判斷胆绊,避免已經(jīng)獲取了資源后還去盲目的進(jìn)去CLH隊(duì)列排隊(duì)。
典型的就是重進(jìn)入鎖ReentrantLock的實(shí)現(xiàn)欧募,獲取鎖的線程可以再次獲取鎖压状,那么就可以在tryAcquire中做一下判斷。

關(guān)于參數(shù)arg跟继,可以理解成嘗試獲取資源數(shù)量吧种冬,實(shí)際上這個(gè)跟AQS的一個(gè)重要的成員state有關(guān),當(dāng)用戶使用AQS時(shí)舔糖,需要設(shè)置state這個(gè)整數(shù)值娱两,關(guān)于這個(gè)state,應(yīng)該可以當(dāng)成是對資源狀態(tài)的描述吧金吗。比如ReenctrantLock中可以根據(jù)state是否等于0判斷是否加鎖了谷婆,以及state的值大小判斷重進(jìn)入鎖加了幾次鎖。

上面代碼addWaiter()其實(shí)就是不停的使用CAS操作將當(dāng)前節(jié)點(diǎn)加入CLH隊(duì)列尾部辽聊。

2. acquireQueued()

下面是acquireQueued的代碼實(shí)現(xiàn):

final boolean acquireQueued(final Node node, long arg) {
        boolean failed = true;
        try {
            // 線程中斷標(biāo)記
            boolean interrupted = false;
            for (;;) {
                // 獲得當(dāng)前節(jié)點(diǎn)的前驅(qū)
                final Node p = node.predecessor();
                // 如果前驅(qū)節(jié)點(diǎn)已經(jīng)是隊(duì)列頭,那就意味著看起來自己最優(yōu)先獲取到資源期贫,然后就調(diào)用tryAcquire嘗試獲取資源跟匆。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 當(dāng)前節(jié)點(diǎn)不是頭節(jié)點(diǎn)則排隊(duì)阻塞。
                // shouldParkAfterFailedAcquire會(huì)將前驅(qū)節(jié)點(diǎn)的waitStatus標(biāo)記成SIGNAL表示告訴前驅(qū)節(jié)點(diǎn):你釋放資源后需要喚醒我通砍,我可能阻塞了
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果排隊(duì)過程中被中斷玛臂,記下來烤蜕,不甩異常
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

以上可以看出acquireQueued其實(shí)就是在不停的做如下嘗試:

  1. 判斷當(dāng)前節(jié)點(diǎn)是不是最前面了,是則調(diào)用tryAcquire嘗試獲取資源迹冤,獲取成功就返回讽营,獲取失敗或者不是最前面節(jié)點(diǎn)則阻塞等待,然后到2
  2. 無限等待然后被喚醒泡徙,繼續(xù)1.

2. doAcquireInterruptibly()
和acquireQueued的過程是一樣的橱鹏,就是等待的過程中被中斷了立即甩異常

2.2.2.2 shared node入隊(duì)操作

和exelusive模式一樣都有可中斷和不可中斷兩個(gè),如下:

 public final void acquireShared(long arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
 }

public final void acquireSharedInterruptibly(long arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

tryAcquireShared()需要使用AQS使用者繼承實(shí)現(xiàn)堪藐,不過和tryAcquire不一樣的是它返回一個(gè)整數(shù)莉兰,小于0表示嘗試失敗了。

1. doAcquireShared

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                       // 和exclusive模式下acquireQueued整體流程是一樣的礁竞,
                       // 不同的是在這里糖荒,成功獲取到競爭資源后,調(diào)用的這個(gè)方法模捂。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
 }

關(guān)于上面setHeadAndPropagate做了什么捶朵,先通過下圖描述一下為什么會(huì)有這個(gè)方法的調(diào)用吧.

假設(shè)現(xiàn)在排隊(duì)隊(duì)列如下:


-----     ------    -----     -----
| E1 |  ->| S1 | -> | S2 | ->| S3 |
-----     ------    -----     ----

節(jié)點(diǎn)E1是頭節(jié)點(diǎn),exclusively持有資源狂男, S1, S2, S3都是在申請共享訪問資源综看,然后因?yàn)镋1已經(jīng)持有了,所以都阻塞了并淋。

然后E1釋放資源了寓搬,所以他會(huì)喚醒阻塞中的S1,S1拿到了資源县耽,但是由于S1是共享模式去拿資源句喷,S2,S3也都是去申請共享資源兔毙,所以S1應(yīng)該去同樣喚醒S2唾琼,S3一起去分享資源。 這也就是setHeadAndPropagate干的事情--喚醒S2, S2醒了喚醒S3澎剥。

2.2.3 出隊(duì)操作

也就是釋放資源.

2.2.3.1 exclusive node釋放資源

釋放資源通過如下方法實(shí)現(xiàn):

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
 }

tryRelease和tryAcquire一樣锡溯,是需要使用者繼承并實(shí)現(xiàn)的,返回true表示資源完全被釋放(也就是需要喚醒后面等待的node)哑姚,tryRelease同樣需要結(jié)合資源狀態(tài)state去判斷祭饭,且應(yīng)該是一個(gè)非阻塞的調(diào)用。

下面unparkSuccessor喚醒后繼等待者的實(shí)現(xiàn):

private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 后繼節(jié)點(diǎn)等于null并不能代表沒有后繼節(jié)點(diǎn)叙量,原因跟入隊(duì)addWaiter的實(shí)現(xiàn)有關(guān).
        // 調(diào)用addWaiter入隊(duì)newNode時(shí)先用CAS將tail設(shè)置成newNode倡蝙,然后再將之前的tail.next設(shè)置成newNode,
        // 所以這中間的時(shí)間間隔中有tail但是之前的tail還沒有完成和newNode連接绞佩,也就是之前的tail的后繼是null
        // 而s.waitStatus >0 則表示這個(gè)節(jié)點(diǎn)狀態(tài)為CACELLED取消競爭資源寺鸥,應(yīng)該再找它后面的節(jié)點(diǎn)
        // for循環(huán)里從tail往前遍歷找到最前面的沒有CANCELLED的節(jié)點(diǎn)猪钮,然后喚醒它。
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 喚醒找到的節(jié)點(diǎn)對應(yīng)的線程胆建。
            LockSupport.unpark(s.thread);
    }

2.2.3.2 shared node釋放資源

調(diào)用如下方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
 }

tryReleaseShared需要使用AQS的繼承并實(shí)現(xiàn)它烤低,返回true表示成功釋放,意味著需要喚醒后續(xù)等待者笆载。

關(guān)于doReleaseShared就是喚醒后繼所有等待的節(jié)點(diǎn)扑馁,被喚醒的節(jié)點(diǎn)又會(huì)在前面介紹的各種doAcquireXXX中醒來去嘗試獲取資源,獲取不到的又會(huì)等待宰译。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末檐蚜,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子沿侈,更是在濱河造成了極大的恐慌闯第,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缀拭,死亡現(xiàn)場離奇詭異咳短,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)蛛淋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門咙好,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人褐荷,你說我怎么就攤上這事勾效。” “怎么了叛甫?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵层宫,是天一觀的道長。 經(jīng)常有香客問我其监,道長萌腿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任抖苦,我火速辦了婚禮毁菱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘锌历。我一直安慰自己贮庞,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布究西。 她就那樣靜靜地躺著窗慎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪怔揩。 梳的紋絲不亂的頭發(fā)上捉邢,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天,我揣著相機(jī)與錄音商膊,去河邊找鬼伏伐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛晕拆,可吹牛的內(nèi)容都是我干的藐翎。 我是一名探鬼主播,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼实幕,長吁一口氣:“原來是場噩夢啊……” “哼吝镣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起昆庇,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤末贾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后整吆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拱撵,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年表蝙,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拴测。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡府蛇,死狀恐怖集索,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情汇跨,我是刑警寧澤务荆,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站扰法,受9級(jí)特大地震影響蛹含,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜塞颁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一浦箱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧祠锣,春花似錦酷窥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至澡腾,卻和暖如春沸伏,著一層夾襖步出監(jiān)牢的瞬間糕珊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工毅糟, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留红选,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓姆另,卻偏偏與公主長得像喇肋,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子迹辐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容