JAVA并發(fā)(9)——AQS介紹

AQS概述
AbstractQueuedSynchronizer(AQS)是一個提供基礎(chǔ)框架婿崭,JDK提供的Lock是通過AQS框架完成铡原,程序員也可以利用AQS實(shí)現(xiàn)自己的鎖绣溜。以JDK提高的ReentrantLock為例鞭达,如果創(chuàng)建了一個ReentrantLock類的對象lock话肖,lock對象中就包含了AQS的一個子類的實(shí)例sync疲酌。
AQS的大致邏輯是:客戶端代碼在執(zhí)行l(wèi)ock.lock()的方法的時候蜡峰,當(dāng)前線程會去檢測AQS的鎖標(biāo)志(同步狀態(tài))來判斷鎖是否可以被持有,如果鎖被其他線程占用朗恳,那么當(dāng)前線程會被放到AQS的等待隊(duì)列中湿颅。檢測AQS所標(biāo)志位,其實(shí)就是檢查lock對象中sync變量中的鎖標(biāo)志位粥诫,AQS的等待隊(duì)列也就是sync中的等待隊(duì)列油航。
程序員可以使用AQS提供的getState(),setState()和compareAndSetState()方法來檢查和修改AQS中鎖標(biāo)志。程序員只需要重寫如下方法:
下面方法是排它鎖

  • tryAcquire(int)
  • tryRelease(int)
    下面方法是共享鎖
  • tryAcquireShared(int)
  • tryReleaseShared(int)
  • isHeldExclusively()
    AQS的其他方法都是final怀浆,不需要改變谊囚,AQS本質(zhì)上不是個抽象類。

AQS實(shí)現(xiàn)原理

AQS保護(hù)了一個FIFO隊(duì)列执赡,該隊(duì)列是一個雙向列表镰踏,實(shí)現(xiàn)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)是定義在AQS中的Node類。該隊(duì)列主要保存獲取鎖失敗的線程沙合。AQS中的基本字段:

//指向FIFO列表的頭結(jié)點(diǎn)
private transient volatile Node head;
//指向FIFO列表的尾結(jié)點(diǎn)
private transient volatile Node tail;
/**同步狀態(tài) 0表示鎖可以被線程獲取 1表示鎖被其他線程持有
*線程在執(zhí)行l(wèi)ock方法的時候奠伪,就是通過判斷和修改該值來申請鎖
/*
private volatile int state;
//該屬性是AQS父類的字段,記錄了獨(dú)占鎖的線程
private transient Thread exclusiveOwnerThread;

介紹AQS的實(shí)現(xiàn)原理首懈,需要從它的某個子類開始绊率,這里以ReentrantLock為例。首先定義一個需要同步的測試類

class LockTest{
    Lock lock = new ReentrantLock();
    public void p(){
        try {
            lock.lock();
            while (true){
            }
        }
        finally {
            lock.unlock();
        }
    }
}

然后定義訪問同步代碼的線程類:

class LockA extends Thread{
    LockTest lockTest;
    public LockA(LockTest l,String name){
        super(name);
        lockTest = l;
    }    
    @Override
    public void run() {
        lockTest.p();
    }
}

下面是測試函數(shù):

public static void main(String[] args) throws Exception{
        LockTest lt = new LockTest();
        LockA lockA = new LockA(lt,"A");
        LockA lockB = new LockA(lt,"B");
        LockA lockC = new LockA(lt,"C");
        LockA lockD = new LockA(lt,"D");       
        lockA.start();
        //讓線程A先啟動
        Thread.sleep(500);
        lockB.start();
        lockC.start();
        lockD.start();
}

用idea進(jìn)行斷點(diǎn)調(diào)試究履,首先調(diào)用lockA.start(),讓lockA線程進(jìn)入臨界區(qū)滤否,再進(jìn)入臨界區(qū)之前,會調(diào)用lock.lock()方法先獲取鎖最仑。lock.lock()的實(shí)現(xiàn)如下:

final void lock() {
            //CAS更新state狀態(tài)值
            if (compareAndSetState(0, 1))
               //由于ReenterLock是獨(dú)占鎖,下面的是調(diào)用AQS父類的方法去設(shè)置當(dāng)前擁有獨(dú)占鎖的線程  setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
}

compareAndSetState是CAS操作藐俺,來判斷AQS中state的狀態(tài)是否為0,如果是0則更新為1泥彤,因?yàn)閘ockA第一個進(jìn)入臨界區(qū)的線程紊搪,那么它執(zhí)行compareAndSetState會成功,然后就會去執(zhí)行if語句的其他操作:擁有獨(dú)占鎖的線程全景。如果compareAndSetState()操作失敗,那么將會去執(zhí)行acquire(1)牵囤,這個方法的邏輯等一下lockB線程的時候再分析爸黄。
lockA執(zhí)行完setExclusiveOwnerThread方法后滞伟,獲取鎖完成,開始執(zhí)行臨界區(qū)代碼炕贵。
下面lockB線程啟動梆奈,同樣調(diào)用lock.lock()方法來獲取鎖,由于lockA正在執(zhí)行臨界區(qū)代碼称开,鎖沒有釋放亩钟,因此lockB線程在執(zhí)行compareAndSetState操作時就會失敗,轉(zhuǎn)而去執(zhí)行acquire(1);下面是acquire方法的代碼:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

該方法執(zhí)行邏輯:
1.如果tryAcquire方法返回false鳖轰,然后去執(zhí)行2;返回true,方法退出

  • addWaiter方法是向FIFO列表中增加一個等待節(jié)點(diǎn)清酥,然后執(zhí)行3
  • acqd如果返回true,那么執(zhí)行4,否則退出

tryAcquire方法邏輯

tryAcquire方法是AQS提供的需要實(shí)現(xiàn)者重新的方法蕴侣,用于以獨(dú)占式獲取鎖焰轻,在ReenterLock中,tryAcquire的實(shí)現(xiàn)如下

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
}

nonfairTryAcquire(acquires)方法實(shí)現(xiàn)如下:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //再次獲取AQS鎖狀態(tài)昆雀,如果此時之前的線程已經(jīng)執(zhí)行完成,lockB可以嘗試調(diào)用
            //compareAndSetState方法獲取鎖,入股返回true那么lockB線程就不需要加入到FIFO隊(duì)列
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }//如果獲取鎖狀態(tài)不為0,然后檢查當(dāng)前持有鎖的線程是不是當(dāng)前執(zhí)行的線程,
            //如果是同一個線程求妹,那么就把AQS的鎖狀態(tài)值加上acquires
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //如果compareAndSetState執(zhí)行失敗或者current != getExclusiveOwnerThread()
            //那么tryAcquire方法返回false止吁,然后去執(zhí)行把當(dāng)前線程加入到FIFO列表中的邏輯
            return false;
        }

addWaiter方法邏輯

當(dāng)tryAcquire方法返回false,然后去執(zhí)行addWaiter(Node.EXCLUSIVE)方法,Node.EXCLUSIVE值為null

/** 該值表示創(chuàng)建的節(jié)點(diǎn)是獨(dú)占式的 */
static final Node EXCLUSIVE = null;

addWaiter方法的邏輯如下:

     /**
     * 為當(dāng)前線程創(chuàng)建了一個FIFO隊(duì)列的節(jié)點(diǎn),并且需要給節(jié)點(diǎn)設(shè)置鎖類型
     *
     * 參數(shù)mode:Node.EXCLUSIVE表示排他鎖,Node.SHARED表示共享鎖
     */
    private Node addWaiter(Node mode) {
        //用當(dāng)前線程和鎖類型創(chuàng)建一個Node對象
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Node是AQS的內(nèi)部類挽封,它主要包括:

        //FIFO列表等待線程的鎖模式:共享鎖
        static final Node SHARED = new Node();
        //FIFO列表等待線程的鎖模式:獨(dú)占鎖
        static final Node EXCLUSIVE = null;
        //線程狀態(tài) 取消執(zhí)行
        static final int CANCELLED =  1;
        //線程狀態(tài) 可以接受被喚起操作
        static final int SIGNAL    = -1;
        //線程狀態(tài) 等待CONDITION條件
        static final int CONDITION = -2;
        //線程狀態(tài) 等待CONDITION條件并且無條件傳播
        static final int PROPAGATE = -3;
        //線程狀態(tài)
        volatile int waitStatus;
        //前序節(jié)點(diǎn)
        volatile Node prev;
        //后續(xù)節(jié)點(diǎn)
        volatile Node next;
        //保存當(dāng)前線程
        volatile Thread thread;
        //節(jié)點(diǎn)鎖模式
        Node nextWaiter;

由于lockB在執(zhí)行的時候已球,head和tail都是null,因此if (pred != null) 不會執(zhí)行场仲,直接執(zhí)行enq方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
                //如果AQS尾節(jié)點(diǎn)是null和悦,說明AQS中的FIFO對里是空的,
                //需要創(chuàng)建一個空的Node對象渠缕,作為FIFO的頭結(jié)點(diǎn)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } 
            //如果tail不為null鸽素,把當(dāng)前AQS的尾節(jié)點(diǎn)賦給新創(chuàng)建的node節(jié)點(diǎn)的前序節(jié)點(diǎn),
            //然后調(diào)用CAS把新創(chuàng)建的node設(shè)置為AQS的尾節(jié)點(diǎn)
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

<font color=red>注:enq方法為AQS設(shè)置FILO的頭結(jié)點(diǎn)和尾節(jié)點(diǎn)亦鳞,使用CAS+死循環(huán)馍忽,直到設(shè)置成功。因?yàn)楫?dāng)一個線程在調(diào)用CAS設(shè)置的時候燕差,可能另外一個線程已經(jīng)調(diào)用CAS成功返回遭笋,因此會出現(xiàn)調(diào)用失敗的,如果調(diào)用失敗徒探,需要循環(huán)重新設(shè)置瓦呼。enq只有在成功設(shè)置了tail之后才會返回</font>
由于本次調(diào)試是用多線程debug模式,因此测暗,不會出現(xiàn)多個線程同時執(zhí)行CAS的情況央串,因此當(dāng)lockB執(zhí)行到compareAndSetHead會成功返回磨澡,這樣就創(chuàng)建了一個空的頭結(jié)點(diǎn),然后把頭結(jié)點(diǎn)對象賦值給tail质和,此時FIFO隊(duì)列狀態(tài)如下圖:


本次循環(huán)操作完成之后稳摄,因?yàn)槭茄h(huán),沒有退出循環(huán)饲宿,然后再執(zhí)行一次循環(huán)體厦酬,這次因?yàn)閠ail!=null,所以會執(zhí)行else操作瘫想,使用CAS把lockB代表的node設(shè)置為tail,然后返回仗阅。操作之后的FIFO如下圖:

addWaiter方法執(zhí)行完成,線程被放到FIFO列表中殿托。下面是執(zhí)行acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取當(dāng)前線程的prev節(jié)點(diǎn)
                final Node p = node.predecessor();
                //如果p是頭結(jié)點(diǎn)霹菊,那么讓當(dāng)前線程再嘗試獲取一次AQS節(jié)點(diǎn)鎖狀態(tài),如果獲成功支竹,
                //那么把當(dāng)前線程所在的node設(shè)置為head旋廷,并且把node節(jié)點(diǎn)其prev、next礼搁,thread
                //都變?yōu)閚ull
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

根據(jù)上面源碼饶碘,p是node的前序節(jié)點(diǎn),根據(jù)當(dāng)前程序運(yùn)行狀態(tài)p==head馒吴,但是tryAcquire方法執(zhí)行返回false扎运。然后執(zhí)行shouldParkAfterFailedAcquire方法,該方法有兩個邏輯饮戳,如果當(dāng)前節(jié)點(diǎn)的前序節(jié)點(diǎn)p.waitStatus=SIGNAL,那么返回true豪治;如果是大于0,那么需要把這些大于0的node從FIFO中移除(按照從后往前遍歷的方法)并返回false。如果是0扯罐,把當(dāng)前線程鎖在的node節(jié)點(diǎn)前序節(jié)點(diǎn)的waitStatus設(shè)置為SIGNAL(-1)负拟,返回false。

如果shouldParkAfterFailedAcquire返回true歹河,然后去執(zhí)行parkAndCheckInterrupt()掩浙。本測試程序,此時返回是false秸歧。此刻FIFO隊(duì)列如圖:

程序返回后厨姚,接著循環(huán)開始處執(zhí)行。此刻當(dāng)程序重新執(zhí)行到shouldParkAfterFailedAcquire方法是因?yàn)閜節(jié)點(diǎn)的waitStatus=-1键菱,因此返回true谬墙。然后程序接著執(zhí)行parkAndCheckInterrupt()方法:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

調(diào)用LockSupport.park(this),當(dāng)前線程會被所阻塞一直到調(diào)用LockSupport.uppark()方法喚起险耀。

接著執(zhí)行測試程序的lockC.start();第三個線程啟動玖喘。第三個線程經(jīng)過上面的步驟后執(zhí)行到acquireQueued()時,F(xiàn)IFO隊(duì)列的狀態(tài)如下圖:

然后執(zhí)行到shouldParkAfterFailedAcquire方法蘑志,由于當(dāng)前線程是lockC累奈,那么他的前序節(jié)點(diǎn)是lockB急但,此時lockB的waitStatus=0,因此shouldParkAfterFailedAcquire執(zhí)行修改lockB的waitStatus=-1的操作波桩,然后方法返回false戒努,然后在從循環(huán)開始處執(zhí)行。然后又會執(zhí)行到shouldParkAfterFailedAcquire方法镐躲,此時由于lockC的前序節(jié)點(diǎn)lockB的waitStatus==-1直接返回true,然后當(dāng)前線程lockC執(zhí)行parkAndCheckInterrupt()把自己阻塞萤皂。

下面分析鎖的釋放。鎖釋放是持有線程執(zhí)行:

finally {
        lock.unlock();
}

unlock方法調(diào)用的sync.release(1);此方法的具體實(shí)現(xiàn)如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

tryRelease()方法是在ReenterLock類中的Sync內(nèi)部實(shí)現(xiàn)端礼,他主要邏輯是用當(dāng)前AQS的鎖狀態(tài)-1的差值是否與0相等入录,如果不等于0,那說明還需要等其他線程調(diào)用unLock(),tryRelease()返回false僚稿,這個差值計(jì)算是為了實(shí)現(xiàn)共享鎖的使用贫奠。直到AQS的鎖狀態(tài)與1的差值為0,然后返回true

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}

當(dāng)該方法返回true之后唤崭,執(zhí)行unparkSuccessor(h)這里參數(shù)h是FIFO列表的頭結(jié)點(diǎn)谢肾,具體代碼邏輯如下:

  private void unparkSuccessor(Node node) {
        //獲取頭結(jié)點(diǎn)的鎖狀態(tài)信息
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //把頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)所存儲的線程從阻塞狀態(tài)中釋放出來
        if (s != null)
            LockSupport.unpark(s.thread);
    }

當(dāng)執(zhí)行到LockSupport.unpark(s.thread)時,F(xiàn)IFO第一個阻塞線程會從阻塞狀態(tài)中返回,也就是調(diào)用LockSupport.park()方法返回微姊,執(zhí)行后面代碼分预,然后開始執(zhí)行acquireQueued方法。此方法邏輯上面已經(jīng)分析過了配喳。
到此,AQS基本加鎖解鎖已經(jīng)分析完成凳干。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末救赐,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子泌绣,更是在濱河造成了極大的恐慌馋贤,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仿滔,死亡現(xiàn)場離奇詭異犹芹,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)腰埂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門屿笼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人休雌,你說我怎么就攤上這事肝断〕哿荩” “怎么了担扑?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長胚宦。 經(jīng)常有香客問我洁奈,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任印叁,我火速辦了婚禮军掂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蝗锥。我一直安慰自己,他們只是感情好汇竭,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布穴张。 她就那樣靜靜地躺著,像睡著了一般玻驻。 火紅的嫁衣襯著肌膚如雪偿枕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天嗤锉,我揣著相機(jī)與錄音捺萌,去河邊找鬼膘茎。 笑死酷誓,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的棒拂。 我是一名探鬼主播玫氢,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼攻旦!你這毒婦竟也來了生逸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤烙无,失蹤者是張志新(化名)和其女友劉穎遍尺,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體迂苛,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡歧蕉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年惯退,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片催跪。...
    茶點(diǎn)故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡懊蒸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出骑丸,到底是詐尸還是另有隱情妒貌,我是刑警寧澤铸豁,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布节芥,位于F島的核電站,受9級特大地震影響头镊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜颖杏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一坛芽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦机久、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽软棺。三九已至,卻和暖如春喘落,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瘦棋。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工赌朋, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留篇裁,地道東北人赡若。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像往枣,于是被迫代替她去往敵國和親粉渠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容