32.Android架構(gòu)-線程(三)

AbstractQueuedSynchronizer

隊(duì)列同步器纫版,簡(jiǎn)稱(chēng)AQS,是一個(gè)用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,他使用了一個(gè)int型的成員變量來(lái)表示同步狀態(tài)客情,內(nèi)部通過(guò)FIFO隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作

使用AQS實(shí)現(xiàn)的鎖加鎖和釋放的過(guò)程(為了簡(jiǎn)單其弊,以不可重入鎖為例說(shuō)明)

lock()方法調(diào)用時(shí),嘗試去獲取鎖膀斋,如果當(dāng)前鎖沒(méi)有被其他線程持有梭伐,則獲取成功,將state+1仰担;如果當(dāng)前鎖已經(jīng)被其他線程持有糊识,則將當(dāng)前線程加入CLH隊(duì)列,此時(shí)還存在兩種情況,1.隊(duì)列為空赂苗,創(chuàng)建一個(gè)節(jié)點(diǎn)作為頭愉耙,并將新加入的節(jié)點(diǎn)加入到隊(duì)列作為尾節(jié)點(diǎn),2.隊(duì)列非空拌滋,直接將新加入的節(jié)點(diǎn)作為尾節(jié)點(diǎn)(設(shè)置為尾節(jié)點(diǎn)過(guò)程中涉及到了CAS的操作朴沿,即如果有其他線程也同時(shí)在做相同的操作,通過(guò)cas方式設(shè)置直到尾節(jié)點(diǎn)設(shè)置成功)鸠真。加入隊(duì)列后判斷一下自己的前置節(jié)點(diǎn)是不是head悯仙,如果是再次嘗試獲取鎖,獲取成功則將此節(jié)點(diǎn)設(shè)置為新的head吠卷,獲取失敗則掛起線程锡垄,等待前置節(jié)點(diǎn)釋放鎖

unlock()方法調(diào)用時(shí),將持有鎖的線程置空祭隔,并將state設(shè)置為0货岭,找到它的next并喚醒它

AQS使用方式和其中的設(shè)計(jì)模式

AQS的主要使用方式是繼承扣汪,子類(lèi)通過(guò)繼承AQS并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài)窿凤,在AQS里由一個(gè)int型的state來(lái)代表這個(gè)狀態(tài),在抽象方法的實(shí)現(xiàn)過(guò)程中免不了要對(duì)同步狀態(tài)進(jìn)行更改送朱,這時(shí)就需要使用同步器提供的3個(gè)方法(getState()搞坝、setState(int newState)和compareAndSetState(int expect,int update))來(lái)進(jìn)行操作搔谴,因?yàn)樗鼈兡軌虮WC狀態(tài)的改變是安全的。

private volatile int state;

在實(shí)現(xiàn)上桩撮,子類(lèi)推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類(lèi)敦第,AQS自身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來(lái)供自定義同步組件使用店量,同步器既可以支持獨(dú)占式地獲取同步狀態(tài)芜果,也可以支持共享式地獲取同步狀態(tài),這樣就可以方便實(shí)現(xiàn)不同類(lèi)型的同步組件(ReentrantLock融师、ReentrantReadWriteLock和CountDownLatch等)右钾。

同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器旱爆∫ㄉ洌可以這樣理解二者之間的關(guān)系:
鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個(gè)線程并行訪問(wèn))怀伦,隱藏了實(shí)現(xiàn)細(xì)節(jié)后控;
同步器面向的是鎖的實(shí)現(xiàn)者,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式空镜,屏蔽了同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作吴攒。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域张抄。
實(shí)現(xiàn)者需要繼承同步器并重寫(xiě)指定的方法,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中洼怔,并調(diào)用同步器提供的模板方法署惯,而這些模板方法將會(huì)調(diào)用使用者重寫(xiě)的方法。

AQS基于CLH隊(duì)列鎖實(shí)現(xiàn)

CLH隊(duì)列鎖即Craig, Landin, and Hagersten (CLH) locks镣隶。
CLH隊(duì)列鎖也是一種基于鏈表的可擴(kuò)展极谊、高性能、公平的自旋鎖安岂,申請(qǐng)線程僅僅在本地變量上自旋轻猖,它不斷輪詢(xún)前驅(qū)的狀態(tài),假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋域那。

當(dāng)一個(gè)線程需要獲取鎖時(shí):
1.創(chuàng)建一個(gè)新的QNode咙边,將其中的locked設(shè)置為true表示需要獲取鎖,myPred表示對(duì)其前驅(qū)結(jié)點(diǎn)的引用


圖片1.png

2.線程A對(duì)tail域調(diào)用getAndSet方法次员,使自己成為隊(duì)列的尾部败许,同時(shí)獲取一個(gè)指向其前驅(qū)結(jié)點(diǎn)的引用myPred


圖片2.png

3.線程B需要獲得鎖,同樣的流程再來(lái)一遍
圖片3.png

4.線程就在前驅(qū)結(jié)點(diǎn)的locked字段上旋轉(zhuǎn)淑蔚,直到前驅(qū)結(jié)點(diǎn)釋放鎖(前驅(qū)節(jié)點(diǎn)的鎖值 locked == false)

5.當(dāng)一個(gè)線程需要釋放鎖時(shí)市殷,將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false,同時(shí)回收前驅(qū)結(jié)點(diǎn)


圖片4.png

如上圖所示刹衫,前驅(qū)結(jié)點(diǎn)釋放鎖醋寝,線程A的myPred所指向的前驅(qū)結(jié)點(diǎn)的locked字段變?yōu)閒alse,線程A就可以獲取到鎖绪妹。
CLH隊(duì)列鎖的優(yōu)點(diǎn)是空間復(fù)雜度低(如果有n個(gè)線程甥桂,L個(gè)鎖,每個(gè)線程每次只獲取一個(gè)鎖邮旷,那么需要的存儲(chǔ)空間是O(L+n)黄选,n個(gè)線程有n個(gè)myNode,L個(gè)鎖有L個(gè)tail)婶肩。CLH隊(duì)列鎖常用在SMP體系結(jié)構(gòu)下办陷。
Java中的AQS是CLH隊(duì)列鎖的一種變體實(shí)現(xiàn)。

AQS中的方法

acquire():獨(dú)占式獲取同步狀態(tài)律歼,如果當(dāng)前線程獲取同步狀態(tài)成功民镜,則由該方法返回,否則將會(huì)進(jìn)入同步隊(duì)列等待险毁,該方法將會(huì)調(diào)用重寫(xiě)的tryAcquire(int arg)方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquireInterruptibly:與acquire相同制圈,但是該方法會(huì)響應(yīng)中斷们童,當(dāng)線程未獲取到同步狀態(tài)而進(jìn)入同步隊(duì)列中,如果當(dāng)前線程被中斷鲸鹦,則該方法會(huì)拋出InterruptException并返回

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

tryAcquireNanos:在acquireInterruptibly基礎(chǔ)上增加了超時(shí)限制慧库,如果當(dāng)前線程在超時(shí)時(shí)間內(nèi)沒(méi)有獲取到同步鎖狀態(tài),那么就會(huì)返回false,如果獲取到了就返回true

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

acquireShared:共享式的獲取同步狀態(tài)馋嗜,如果當(dāng)前線程為獲取到同步狀態(tài)齐板,將會(huì)進(jìn)入同步隊(duì)列等待,與獨(dú)占式獲取的主要區(qū)別是在同一時(shí)刻可以有過(guò)個(gè)線程獲取到同步狀態(tài)

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

acquireSharedInterruptibly:與acquireShared相同葛菇,該方法會(huì)響應(yīng)中斷

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireSharedNanos:在acquireSharedInterruptibly基礎(chǔ)上增加了超時(shí)限制

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

release:獨(dú)占式的釋放同步狀態(tài)甘磨,該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中第一個(gè)節(jié)點(diǎn)包含的線程喚醒

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

releaseShared:共享式的釋放同步狀態(tài)

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

getQueuedThreads:獲取等待在同步隊(duì)列上的線程集合

public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

其中可以被重寫(xiě)的方法:

tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively

既然AQS是實(shí)現(xiàn)同步器的基礎(chǔ)框架眯停,那么我們?cè)趺从盟麃?lái)實(shí)現(xiàn)一個(gè)我們自己的類(lèi)似ReentrantLock的工具呢济舆?
如下,定義一個(gè)類(lèi)實(shí)現(xiàn)Lock接口(每個(gè)鎖都要實(shí)現(xiàn)的接口)庵朝,然后定義一個(gè)內(nèi)部類(lèi)吗冤,我們這里命名為MySync繼承自AbstractQueuedSynchronizer,然后我們自定義的鎖的所有操作都是交給這個(gè)內(nèi)部類(lèi)MySync實(shí)現(xiàn)的

public class MyLock implements Lock {
    MySync sync = new MySync();

    @Override
    public void lock() {
        System.out.println(Thread.currentThread().getName()+" ready get lock");
        sync.acquire(1);
        System.out.println(Thread.currentThread().getName()+" already got lock");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName()+" ready release lock");
        sync.release(1);
        System.out.println(Thread.currentThread().getName()+" already released lock");
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    static class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0){
                throw new IllegalMonitorStateException("already release lock");
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        protected Condition newCondition(){
            return new ConditionObject();
        }
    }
}

測(cè)試類(lèi)

public class TestMyLock {

    public void test() {
        final Lock lock = new MyLock();
        for (int i = 0; i < 4; i++) {
            Worker w = new Worker(lock);
            w.start();
        }
    }

    public static void main(String[] args) {
        TestMyLock lock = new TestMyLock();
        lock.test();
    }

    class Worker extends Thread {
        private final Lock lock;

        public Worker(Lock lock) {
            this.lock = lock;
        }

        public void run() {
            lock.lock();
            System.out.println(Thread.currentThread().getName());
            try {
                SleepTools.second(3);
            } finally {
                lock.unlock();
            }
        }
    }
}


打印:
Thread-0 ready get lock
Thread-0 already got lock
Thread-0
Thread-1 ready get lock
Thread-2 ready get lock
Thread-3 ready get lock

Thread-0 ready release lock
Thread-0 already released lock
Thread-1 already got lock
Thread-1

Thread-1 ready release lock
Thread-1 already released lock
Thread-2 already got lock
Thread-2

Thread-2 ready release lock
Thread-2 already released lock
Thread-3 already got lock
Thread-3

Thread-3 ready release lock
Thread-3 already released lock

Process finished with exit code 0

我們這個(gè)鎖有個(gè)缺陷九府,他目前是不可重入的椎瘟,接下來(lái)我們實(shí)現(xiàn)一個(gè)自己的可重入鎖。重入鎖和非重入鎖的差別其實(shí)并不大侄旬,關(guān)鍵就在于獲取鎖和釋放鎖的時(shí)候是否有判斷當(dāng)前線程是否是已經(jīng)持有鎖的線程這一步

public class MyReentrantLock implements Lock {
    MyReentrantSync reentrantSync = new MyReentrantSync();
    @Override
    public void lock() {
        reentrantSync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        reentrantSync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return reentrantSync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return reentrantSync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        reentrantSync.release(1);
    }

    @Override
    public Condition newCondition() {
        return reentrantSync.newCondition();
    }

    class MyReentrantSync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            if (Thread.currentThread() == getExclusiveOwnerThread()){
                setState(getState()+1);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()){
                throw new IllegalMonitorStateException();
            }
            if (getState() == 0){
                throw new IllegalMonitorStateException("already release");
            }
            setState(getState()-1);
            if (getState() == 0){
                setExclusiveOwnerThread(null);
            }
            return true;
        }
        protected Condition newCondition(){
            return new ConditionObject();
        }
    }
}

測(cè)試類(lèi)

public class TestReenterSelfLock {

    static final Lock lock = new MyReentrantLock();

    public void reenter(int x){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+":遞歸層級(jí):"+x);
            int y = x - 1;
            if (y==0) return;
            else{
                reenter(y);
            }
        } finally {
            lock.unlock();
        }
    }

    public void test() {
        for (int i = 0; i < 3; i++) {
            Worker w = new Worker();
            w.start();
        }
    }

    public static void main(String[] args) {
        TestReenterSelfLock testMyLock = new TestReenterSelfLock();
        testMyLock.test();
    }

    class Worker extends Thread {
        public void run() {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            reenter(3);
        }
    }
}

打印結(jié)果
Thread-0
Thread-1
Thread-2
Thread-0:遞歸層級(jí):3
Thread-0:遞歸層級(jí):2
Thread-0:遞歸層級(jí):1
Thread-2:遞歸層級(jí):3
Thread-2:遞歸層級(jí):2
Thread-2:遞歸層級(jí):1
Thread-1:遞歸層級(jí):3
Thread-1:遞歸層級(jí):2
Thread-1:遞歸層級(jí):1
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末肺蔚,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子儡羔,更是在濱河造成了極大的恐慌宣羊,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件汰蜘,死亡現(xiàn)場(chǎng)離奇詭異仇冯,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)族操,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)苛坚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人色难,你說(shuō)我怎么就攤上這事泼舱。” “怎么了枷莉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵娇昙,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我笤妙,道長(zhǎng)冒掌,這世上最難降的妖魔是什么噪裕? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮股毫,結(jié)果婚禮上州疾,老公的妹妹穿的比我還像新娘。我一直安慰自己皇拣,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布薄嫡。 她就那樣靜靜地躺著氧急,像睡著了一般。 火紅的嫁衣襯著肌膚如雪毫深。 梳的紋絲不亂的頭發(fā)上吩坝,一...
    開(kāi)封第一講書(shū)人閱讀 49,842評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音哑蔫,去河邊找鬼钉寝。 笑死,一個(gè)胖子當(dāng)著我的面吹牛闸迷,可吹牛的內(nèi)容都是我干的嵌纲。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼腥沽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼逮走!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起今阳,我...
    開(kāi)封第一講書(shū)人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤师溅,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后盾舌,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體墓臭,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年妖谴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了窿锉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡窖维,死狀恐怖榆综,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情铸史,我是刑警寧澤鼻疮,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站琳轿,受9級(jí)特大地震影響判沟,放射性物質(zhì)發(fā)生泄漏耿芹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一挪哄、第九天 我趴在偏房一處隱蔽的房頂上張望吧秕。 院中可真熱鬧,春花似錦迹炼、人聲如沸砸彬。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)砂碉。三九已至,卻和暖如春刻两,著一層夾襖步出監(jiān)牢的瞬間增蹭,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工磅摹, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留滋迈,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓户誓,卻偏偏與公主長(zhǎng)得像饼灿,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子厅克,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349