JAVA進階之AQS

1赖舟、引言

在JDK1.5之前,一般是靠synchronized關(guān)鍵字來實現(xiàn)線程對共享變量的互斥訪問硫椰。synchronized是在字節(jié)碼上加指令铜邮,依賴于底層操作系統(tǒng)的Mutex Lock實現(xiàn)。

而從JDK1.5以后java界的一位大神—— Doug Lea 開發(fā)了AbstractQueuedSynchronizer(AQS)組件酱虎,使用原生java代碼實現(xiàn)了synchronized語義雨膨。換句話說,Doug Lea沒有使用更“高級”的機器指令读串,也不依靠JDK編譯時的特殊處理聊记,僅用一個普普通通的類就完成了代碼塊的并發(fā)訪問控制撒妈,比那些費力不討好的實現(xiàn)不知高到哪里去了。

java.util.concurrent包有多重要無需多言甥雕,一言以蔽之踩身,是Doug Lea大爺對天下所有Java程序員的憐憫。

AQS定義了一套多線程訪問共享資源的同步器框架社露,是整個java.util.concurrent包的基石挟阻,LockReadWriteLock峭弟、CountDowndLatch附鸽、CyclicBarrierSemaphore瞒瘸、ThreadPoolExecutor等都是在AQS的基礎(chǔ)上實現(xiàn)的坷备。

2、實現(xiàn)原理

并發(fā)控制的核心是鎖的獲取與釋放情臭,鎖的實現(xiàn)方式有很多種省撑,AQS采用的是一種改進的CLH鎖

2.1 CLH鎖

CLH(Craig, Landin, and Hagersten locks)是一種自旋鎖俯在,能確保無饑餓性竟秫,提供先來先服務(wù)的公平性。

何謂自旋鎖跷乐?它是為實現(xiàn)保護共享資源而提出一種鎖機制肥败。其實,自旋鎖與互斥鎖比較類似愕提,它們都是為了解決對某項資源的互斥使用馒稍。無論是互斥鎖,還是自旋鎖浅侨,在任何時刻纽谒,最多只能有一個保持者,也就是說如输,在任何時刻最多只能有一個執(zhí)行單元獲得鎖佛舱。但是兩者在調(diào)度機制上略有不同。對于互斥鎖挨决,如果資源已經(jīng)被占用,資源申請者只能進入睡眠狀態(tài)订歪。但是自旋鎖不會引起調(diào)用者睡眠脖祈,如果自旋鎖已經(jīng)被別的執(zhí)行單元保持,調(diào)用者就一直循環(huán)在那里看是否該自旋鎖的保持者已經(jīng)釋放了鎖刷晋,“自旋”一詞就是因此而得名盖高。

CLH鎖是一種基于鏈表的可擴展慎陵、高性能、公平的自旋鎖喻奥,申請線程只在本地變量上自旋席纽,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋撞蚕。

CLH隊列中的節(jié)點QNode中含有一個locked字段润梯,該字段若為true表示該線程需要獲取鎖,且不釋放鎖甥厦,為false表示線程釋放了鎖纺铭。節(jié)點之間是通過隱形的鏈表相連,之所以叫隱形的鏈表是因為這些節(jié)點之間沒有明顯的next指針刀疙,而是通過myPred所指向的節(jié)點的變化情況來影響myNode的行為舶赔。

CLHLock上還有一個尾指針,始終指向隊列的最后一個節(jié)點谦秧。

1.png

當一個線程需要獲取鎖時竟纳,會創(chuàng)建一個新的QNode,將其中的locked設(shè)置為true表示需要獲取鎖疚鲤,然后使自己成為隊列的尾部锥累,同時獲取一個指向其前趨的引用myPred,然后該線程就在前趨節(jié)點的locked字段上旋轉(zhuǎn)石咬,直到前趨節(jié)點釋放鎖揩悄。當一個線程需要釋放鎖時,將當前節(jié)點的locked域設(shè)置為false鬼悠,同時回收前趨節(jié)點删性。如上圖所示,線程A需要獲取鎖焕窝,其myNode域為true蹬挺,些時tail指向線程A的節(jié)點,然后線程B也加入到線程A后面它掂,tail指向線程B的節(jié)點巴帮。然后線程A和B都在它的myPred域上旋轉(zhuǎn),一旦它的myPred節(jié)點的locked字段變?yōu)閒alse虐秋,它就可以獲取鎖榕茧。

2.2 AQS數(shù)據(jù)模型

AQS維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。

2.png

AQS的內(nèi)部隊列是CLH同步鎖的一種變形客给。其主要從以下方面進行了改造:

  • 在結(jié)構(gòu)上引入了頭節(jié)點和尾節(jié)點用押,分別指向隊列的頭和尾,嘗試獲取鎖靶剑、入隊列蜻拨、釋放鎖等實現(xiàn)都與頭尾節(jié)點相關(guān)池充,
  • 為了可以處理timeout和cancel操作,每個node維護一個指向前驅(qū)的指針缎讼。如果一個node的前驅(qū)被cancel收夸,這個node可以前向移動使用前驅(qū)的狀態(tài)字段
  • 在每個node里面使用一個狀態(tài)字段來控制阻塞/喚醒,而不是自旋
  • head節(jié)點使用的是傀儡節(jié)點

FIFO隊列中的節(jié)點有AQS的靜態(tài)內(nèi)部類Node定義:

static final class Node {

    // 共享模式
    static final Node SHARED = new Node();

    // 獨占模式
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /**
     * CANCELLED血崭,值為1卧惜,表示當前的線程被取消 
     * SIGNAL,值為-1功氨,表示當前節(jié)點的后繼節(jié)點包含的線程需要運行序苏,也就是unpark;
     * CONDITION捷凄,值為-2忱详,表示當前節(jié)點在等待condition,也就是在condition隊列中跺涤;
     * PROPAGATE匈睁,值為-3,表示當前場景下后續(xù)的acquireShared能夠得以執(zhí)行桶错; 
     * 值為0航唆,表示當前節(jié)點在sync隊列中,等待著獲取鎖院刁。
     */
    volatile int waitStatus;

    // 前驅(qū)結(jié)點
    volatile Node prev;

    // 后繼結(jié)點
    volatile Node next;

    // 與該結(jié)點綁定的線程
    volatile Thread thread;

    // 存儲condition隊列中的后繼節(jié)點
    Node nextWaiter;

    // 是否為共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 獲取前驅(qū)結(jié)點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node類中有兩個常量SHAREEXCLUSIVE糯钙,顧名思義這兩個常量用于表示這個節(jié)點支持共享模式還是獨占模式,共享模式指的是允許多個線程獲取同一個鎖而且可能獲取成功退腥,獨占模式指的是一個鎖如果被一個線程持有任岸,其他線程必須等待。多個線程讀取一個文件可以采用共享模式狡刘,而當有一個線程在寫文件時不會允許另一個線程寫這個文件享潜,這就是獨占模式的應(yīng)用場景。

2.3 CAS操作

AQS有三個重要的變量:

    // 隊頭結(jié)點
    private transient volatile Node head;

    // 隊尾結(jié)點
    private transient volatile Node tail;

    // 代表共享資源
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

compareAndSetState方法是以樂觀鎖的方式更新共享資源嗅蔬。

獨占鎖是一種悲觀鎖剑按,synchronized就是一種獨占鎖,會導(dǎo)致其它所有需要鎖的線程掛起澜术,等待持有鎖的線程釋放鎖艺蝴。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是鸟废,每次不加鎖而是假設(shè)沒有沖突而去完成某項操作猜敢,如果因為沖突失敗就重試,直到成功為止。樂觀鎖用到的機制就是CAS锣枝,即Compare And Swap

CAS 指的是現(xiàn)代 CPU 廣泛支持的一種對內(nèi)存中的共享數(shù)據(jù)進行操作的一種特殊指令兰英。這個指令會對內(nèi)存中的共享數(shù)據(jù)做原子的讀寫操作撇叁。簡單介紹一下這個指令的操作過程:

首先,CPU 會將內(nèi)存中將要被更改的數(shù)據(jù)與期望的值做比較畦贸。然后陨闹,當這兩個值相等時,CPU 才會將內(nèi)存中的數(shù)值替換為新的值薄坏。否則便不做操作趋厉。最后,CPU 會將舊的數(shù)值返回胶坠。

這一系列的操作是原子的君账。它們雖然看似復(fù)雜,但卻是 Java 5 并發(fā)機制優(yōu)于原有鎖機制的根本沈善。簡單來說乡数,CAS 的含義是“我認為原有的值應(yīng)該是什么,如果是闻牡,則將原有的值更新為新值净赴,否則不做修改,并告訴我原來的值是多少”罩润。

CAS通過調(diào)用JNI(Java Native Interface)調(diào)用實現(xiàn)的玖翅。JNI允許java調(diào)用其他語言,而CAS就是借助C語言來調(diào)用CPU底層指令實現(xiàn)的割以。Unsafe是CAS的核心類金度,它提供了硬件級別的原子操作。

Doug Lea大神在java同步器中大量使用了CAS技術(shù)拳球,鬼斧神工的實現(xiàn)了多線程執(zhí)行的安全性审姓。CAS不僅在AQS的實現(xiàn)中隨處可見,也是整個java.util.concurrent包的基石祝峻。

可以發(fā)現(xiàn)魔吐,headtail莱找、state三個變量都是volatile的酬姆。

volatile是輕量級的synchronized,它在多處理器開發(fā)中保證了共享變量的“可見性”奥溺。可見性的意思是當一個線程修改一個共享變量時辞色,另外一個線程能讀到這個修改的值。如果一個字段被聲明成volatile浮定,Java線程內(nèi)存模型確保所有線程看到這個變量的值是一致的相满。

volatile變量也存在一些局限:不能用于構(gòu)建原子的復(fù)合操作层亿,因此當一個變量依賴舊值時就不能使用volatile變量。而CAS呢立美,恰恰可以提供對共享變量的原子的讀寫操作匿又。

volatile保證共享變量的可見性,CAS保證更新操作的原子性建蹄,簡直是絕配碌更!把這些特性整合在一起,就形成了整個concurrent包得以實現(xiàn)的基石洞慎。如果仔細分析concurrent包的源代碼實現(xiàn)痛单,會發(fā)現(xiàn)一個通用化的實現(xiàn)模式:

  • 首先,聲明共享變量為volatile劲腿;
  • 然后旭绒,使用CAS的原子條件更新來實現(xiàn)線程之間的同步;
  • 同時谆棱,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的內(nèi)存語義來實現(xiàn)線程之間的通信快压。

AQS,非阻塞數(shù)據(jù)結(jié)構(gòu)和原子變量類(java.util.concurrent.atomic包中的類)垃瞧,這些concurrent包中的基礎(chǔ)類都是使用這種模式來實現(xiàn)的蔫劣,而concurrent包中的高層類又是依賴于這些基礎(chǔ)類來實現(xiàn)的。從整體來看个从,concurrent包的實現(xiàn)示意圖如下:

3.png

3脉幢、源碼解讀

前面提到過,AQS定義兩種資源共享方式:

  • Exclusive:獨占嗦锐,只有一個線程能執(zhí)行嫌松,如ReentrantLock
  • Share:共享,多個線程可同時執(zhí)行奕污,如Semaphore/CountDownLatch

不同的自定義同步器爭用共享資源的方式也不同萎羔。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等)碳默,AQS已經(jīng)在頂層實現(xiàn)好了贾陷。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:

  • isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它嘱根。
  • tryAcquire(int):獨占方式髓废。嘗試獲取資源,成功則返回true该抒,失敗則返回false慌洪。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true冈爹,失敗則返回false涌攻。
  • tryAcquireShared(int):共享方式。嘗試獲取資源频伤。負數(shù)表示失斞⑵帷;0表示成功剂买,但沒有剩余可用資源;正數(shù)表示成功癌蓖,且有剩余資源瞬哼。
  • tryReleaseShared(int):共享方式。嘗試釋放資源租副,成功則返回true坐慰,失敗則返回false。

ReentrantLock為例用僧,state初始化為0结胀,表示未鎖定狀態(tài)。A線程lock()時责循,會調(diào)用tryAcquire()獨占該鎖并將state+1糟港。此后,其他線程再tryAcquire()時就會失敗院仿,直到A線程unlock()到state=0(即釋放鎖)為止秸抚,其它線程才有機會獲取該鎖。當然歹垫,釋放鎖之前剥汤,A線程自己是可以重復(fù)獲取此鎖的(state會累加),這就是可重入的概念排惨。但要注意吭敢,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的暮芭。

再以CountDownLatch以例鹿驼,任務(wù)分為N個子線程去執(zhí)行,state也初始化為N(注意N要與線程個數(shù)一致)谴麦。這N個子線程是并行執(zhí)行的蠢沿,每個子線程執(zhí)行完后countDown()一次,state會CAS減1匾效。等到所有子線程都執(zhí)行完后(即state=0)舷蟀,會unpark()主調(diào)用線程,然后主調(diào)用線程就會從await()函數(shù)返回,繼續(xù)后余動作野宜。

一般來說扫步,自定義同步器要么是獨占方法,要么是共享方式匈子,他們也只需實現(xiàn)tryAcquire-tryRelease河胎、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現(xiàn)獨占和共享兩種方式虎敦,如ReentrantReadWriteLock游岳。

3.1 acquire(int)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源其徙,線程直接返回胚迫,否則進入等待隊列,直到獲取到資源為止唾那,且整個過程忽略中斷的影響访锻。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了闹获。

函數(shù)流程如下:

  • tryAcquire()嘗試直接去獲取資源期犬,如果成功則直接返回;
  • addWaiter()將該線程加入等待隊列的尾部避诽,并標記為獨占模式龟虎;
  • acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源后才返回沙庐。如果在整個等待過程中被中斷過遣总,則返回true,否則返回false轨功。
  • 如果線程在等待過程中被中斷過旭斥,它是不響應(yīng)的。只是獲取資源后才再進行自我中斷selfInterrupt()古涧,將中斷補上垂券。

下面再來看看每個方法的實現(xiàn)代碼。

3.1.1 tryAcquire(int)

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

此方法嘗試去獲取獨占資源羡滑。如果獲取成功菇爪,則直接返回true,否則直接返回false柒昏。

AQS只是一個框架凳宙,在這里定義了一個接口,具體資源的獲取交由自定義同步器去實現(xiàn)了(通過state的get/set/CAS)职祷,至于能不能重入氏涩,能不能加塞届囚,那就看具體的自定義同步器怎么去設(shè)計了。當然是尖,自定義同步器在進行資源訪問時要考慮線程安全的影響意系。
這里之所以沒有定義成abstract,是因為獨占模式下只用實現(xiàn)tryAcquire-tryRelease饺汹,而共享模式下只用實現(xiàn)tryAcquireShared-tryReleaseShared蛔添。如果都定義成abstract,那么每個模式也要去實現(xiàn)另一模式下的接口兜辞。說到底迎瞧,Doug Lea還是站在咱們開發(fā)者的角度,盡量減少不必要的工作量逸吵。

3.1.2 addWaiter(Node)

    private Node addWaiter(Node mode) {
        // 使用當前線程構(gòu)造結(jié)點
        Node node = new Node(Thread.currentThread(), mode);

        Node pred = tail;
        if (pred != null) { // 如果隊尾結(jié)點不為空夹攒,將當前節(jié)點插入隊尾
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 隊尾結(jié)點為空(隊列還沒有初始化),則轉(zhuǎn)調(diào)enq入隊
        enq(node);
        return node;
    }

其中胁塞,compareAndSetTail方法也是調(diào)用Unsafe類實現(xiàn)CAS操作,更新隊尾压语。

3.1.3 enq(Node)

    private Node enq(final Node node) {
        for (;;) { // CAS自旋啸罢,直到插入成功
            Node t = tail;
            if (t == null) { // 隊尾為空,則先初始化隊列胎食,new一個傀儡節(jié)點
                if (compareAndSetHead(new Node()))
                    tail = head; // 頭尾指針都指向傀儡節(jié)點
            } else { // 插入隊尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

這段代碼的精髓就在于CAS自旋volatile變量扰才,也是AtomicIntegerAtomicBoolean等原子量的靈魂厕怜。

3.1.4 acquireQueued(Node, int)

通過tryAcquire()addWaiter()衩匣,如果線程獲取資源失敗,已經(jīng)被放入等待隊列尾部了粥航。但是琅捏,后面還有一項重要的事沒干,就是讓線程進入阻塞狀態(tài)递雀,直到其他線程釋放資源后喚醒自己。過程跟在銀行辦理業(yè)務(wù)時排隊拿號有點相似,acquireQueued()就是干這件事:在等待隊列中排隊拿號(中間沒其它事干可以休息)陈哑,直到拿到號后再返回合瓢。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 是否獲取到了資源
        try {
            boolean interrupted = false; // 等待過程中有沒有被中斷
            for (;;) { // 自旋,直到
                final Node p = node.predecessor();
                // 前驅(qū)是head杨凑,則有資格去嘗試獲取資源
                if (p == head && tryAcquire(arg)) {
                    // 獲取資源成功滤奈,將自己置為隊頭,并回收其前驅(qū)(舊的隊頭)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 獲取資源失敗撩满,
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

如果獲取資源失敗后蜒程,會調(diào)用兩個函數(shù)绅你,shouldParkAfterFailedAcquireparkAndCheckInterrupt,下面來看看它倆是干什么的搞糕。

3.1.5 shouldParkAfterFailedAcquire(Node, Node)

從名字可以猜出來勇吊,該函數(shù)的作用是“在獲取資源失敗后是否需要阻塞”:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 前驅(qū)狀態(tài)
        if (ws == Node.SIGNAL)
            // Node.SIGNAL,代表前驅(qū)釋放資源后會通知后繼結(jié)點
            return true;
        if (ws > 0) { // 代表前驅(qū)已取消任務(wù)窍仰,相當于退出了等待隊列
            do { // 一個個往前找汉规,找到最近一個正常等待的前驅(qū),排在它的后面
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 前驅(qū)狀態(tài)正常驹吮,則將其狀態(tài)置為SIGNAL针史,意為,釋放資源后通知后繼結(jié)點
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

整個流程中碟狞,如果前驅(qū)節(jié)點的狀態(tài)不是SIGNAL啄枕,那么自己就不能安心去休息,需要去找個安心的休息點族沃,同時可以再嘗試下看有沒有機會輪到自己拿號频祝。

3.1.6 parkAndCheckInterrupt()

如果線程找好安全休息點后,那就可以安心去休息了脆淹。此方法就是讓線程去休息常空,真正進入等待狀態(tài)。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 使線程進入waiting狀態(tài)
        return Thread.interrupted();
    }

park()會讓當前線程進入waiting狀態(tài)盖溺。在此狀態(tài)下漓糙,有兩種途徑可以喚醒該線程:被unpark()或被interrupt()

3.1.7 小結(jié)

總結(jié)下acquire的流程:

  • 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源烘嘱,如果成功則直接返回昆禽;
  • 沒成功,則addWaiter()將該線程加入等待隊列的尾部蝇庭,并標記為獨占模式醉鳖;
  • acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己哮内,會被unpark())會去嘗試獲取資源辐棒。獲取到資源后才返回。如果在整個等待過程中被中斷過牍蜂,則返回true漾根,否則返回false。
  • 如果線程在等待過程中被中斷過鲫竞,它是不響應(yīng)的辐怕。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上从绘。

3.2 release(int)

release()是acquire()的逆操作寄疏,是獨占模式下線程釋放共享資源的頂層入口是牢。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源陕截。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) // 狀態(tài)不為0驳棱,證明需要喚醒后繼結(jié)點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3.2.1 tryRelease(int)

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

tryAcquire()一樣,這個方法是需要自定義同步器去實現(xiàn)的农曲。正常來說社搅,tryRelease()都會成功的,因為這是獨占模式乳规,該線程來釋放資源形葬,那么它肯定已經(jīng)拿到獨占資源了,直接減掉相應(yīng)量的資源即可暮的,也不需要考慮線程安全的問題笙以。

3.2.2 unparkSuccessor(Node)

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0) // 將當前結(jié)點狀態(tài)置零
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) { // 后繼結(jié)點為空或者已取消
            s = null;
            // 從隊尾開始向前尋找,找到第一個正常的后繼結(jié)點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 喚醒該結(jié)點上的線程
    }

邏輯并不復(fù)雜冻辩,一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程猖腕。

3.3 acquireShared(int)

此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源恨闪,獲取成功則直接返回倘感,獲取失敗則進入等待隊列,直到獲取到資源為止凛剥,整個過程忽略中斷。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    protected int tryAcquireShared(int arg) { // 留給子類實現(xiàn)
        throw new UnsupportedOperationException();
    }

這里tryAcquireShared()依然需要自定義同步器去實現(xiàn)轻姿。但是AQS已經(jīng)把其返回值的語義定義好了:負值代表獲取失斃缰椤;0代表獲取成功互亮,但沒有剩余資源犁享;正數(shù)表示獲取成功,還有剩余資源豹休,其他線程還可以去獲取炊昆。

3.3.1 doAcquireShared(int)

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); // 以共享模式加入隊尾
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { // 前驅(qū)是隊頭(隊頭肯定是已經(jīng)拿到資源的結(jié)點)
                    int r = tryAcquireShared(arg); // 嘗試獲取資源
                    if (r >= 0) { // 獲取資源成功
                        setHeadAndPropagate(node, r); // 將自己置為隊頭,若還有剩余資源威根,向后傳播
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt(); // 如果等待過程中被打斷過凤巨,此時將中斷補上。
                        failed = false;
                        return;
                    }
                }
                // 判斷狀態(tài)洛搀,尋找合適的前驅(qū)敢茁,進入waiting狀態(tài),等著被unpark()或interrupt()
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

該函數(shù)的功能類似于獨占模式下的acquireQueued()留美。

跟獨占模式比彰檬,有一點需要注意的是伸刃,這里只有線程是head.next時(“老二”),才會去嘗試獲取資源逢倍,有剩余的話還會喚醒之后的隊友捧颅。那么問題就來了,假如老大用完后釋放了5個資源较雕,而老二需要6個碉哑,老三需要1個,老四需要2個郎笆。因為老大先喚醒老二谭梗,老二一看資源不夠自己用繼續(xù)park(),也更不會去喚醒老三和老四了宛蚓。獨占模式激捏,同一時刻只有一個線程去執(zhí)行,這樣做未嘗不可凄吏;但共享模式下远舅,多個線程是可以同時執(zhí)行的,現(xiàn)在因為老二的資源需求量大痕钢,而把后面量小的老三和老四也都卡住了图柏。

3.3.2 setHeadAndPropagate(Node, int)

   private void setHeadAndPropagate(Node node, int propagate) {
       Node h = head;
       setHead(node); // 將自己置為隊頭

       if (propagate > 0 || h == null || h.waitStatus < 0) {
           Node s = node.next;
           if (s == null || s.isShared()) // 后繼結(jié)點也為共享模式,則觸發(fā)釋放資源函數(shù)
               doReleaseShared();
       }
   }

此方法在setHead()的基礎(chǔ)上多了一步任连,就是自己蘇醒的同時蚤吹,如果條件符合(比如還有剩余資源),還會去喚醒后繼節(jié)點随抠,畢竟是共享模式裁着。

3.4 releaseShared(int)

此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源拱她,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源二驰。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 嘗試釋放資源
            doReleaseShared(); // 釋放成功,繼續(xù)喚醒后繼結(jié)點
            return true;
        }
        return false;
    }

    protected boolean tryReleaseShared(int arg) { // 留給子類實現(xiàn)
        throw new UnsupportedOperationException();
    }

跟獨占模式下的release()相似秉沼,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后桶雀,才會返回true去喚醒其他線程,這主要是基于可重入的考量唬复;而共享模式下的releaseShared()則沒有這種要求矗积,多線程可并發(fā)執(zhí)行,不適用于可重入敞咧。

3.4.1 doReleaseShared()

    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) { // 頭結(jié)點不為空且有后繼結(jié)點
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 頭結(jié)點狀態(tài)漠魏,SIGNAL——>0
                        continue; // 狀態(tài)更新失敗則循環(huán)進行,直到成功
                    unparkSuccessor(h); // 喚醒后繼結(jié)點
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
          // 頭結(jié)點狀態(tài)妄均,0——>PROPAGATE
                    continue; // 持續(xù)循環(huán)柱锹,直到狀態(tài)更新成功
            }
            if (h == head) // 頭結(jié)點沒變哪自,則結(jié)束循環(huán);否則繼續(xù)
                break;
        }
    }

其余函數(shù)已經(jīng)在上面分析過了禁熏。至此壤巷,AQS的獨占模式與共享模式下的實現(xiàn)原理剖析的差不多了,代碼是最好的老師瞧毙。

除了上面分析的核心方法胧华,AQS還有定義了附帶超時功能的tryAcquireNanos()/tryAcquireSharedNanos()方法,以及響應(yīng)中斷的acquireInterruptibly()/acquireSharedInterruptibly()方法宙彪,其核心流程與通用方法大同小異矩动,不再贅述。

4释漆、應(yīng)用實例

我們利用AQS來實現(xiàn)一個不可重入的互斥鎖實現(xiàn)悲没。鎖資源(AQS里的state)只有兩種狀態(tài):0表示未鎖定,1表示鎖定男图。下邊是Mutex的核心源碼:

public class Mutex {

    /**
     * 靜態(tài)內(nèi)部類示姿,自定義同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1; // 是否有資源可用
        }

        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1;
            if (compareAndSetState(0, 1)) { // state:0——>1,代表獲取鎖
                setExclusiveOwnerThread(Thread.currentThread()); // 設(shè)置當前占用資源的線程
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1;
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // state:1——>0逊笆,代表釋放鎖
            return true;
        }
    }

    private final Sync sync = new Sync();

    /**
     * 獲取鎖栈戳,可能會阻塞
     */
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 嘗試獲取鎖,無論成功或失敗难裆,立即返回
     */
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    /**
     * 釋放鎖
     */
    public void unlock() {
        sync.release(1);
    }
}

同步類在實現(xiàn)時一般都將自定義同步器(sync)定義為內(nèi)部類子檀,供自己使用;而同步類自己(Mutex)則實現(xiàn)某個接口乃戈,對外服務(wù)褂痰。當然,接口的實現(xiàn)要直接依賴sync偏化,它們在語義上也存在某種對應(yīng)關(guān)系脐恩。而sync只用實現(xiàn)資源state的獲取-釋放方式tryAcquire-tryRelelase镐侯,至于線程的排隊侦讨、等待、喚醒等苟翻,上層的AQS都已經(jīng)實現(xiàn)好了韵卤,我們不用關(guān)心。

ReentrantLock/CountDownLatch/Semphore這些同步類的實現(xiàn)方式都差不多崇猫,不同的地方就在獲取沈条、釋放資源的方式tryAcquire-tryRelelase

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末诅炉,一起剝皮案震驚了整個濱河市蜡歹,隨后出現(xiàn)的幾起案子屋厘,更是在濱河造成了極大的恐慌,老刑警劉巖月而,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件汗洒,死亡現(xiàn)場離奇詭異,居然都是意外死亡父款,警方通過查閱死者的電腦和手機溢谤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來憨攒,“玉大人世杀,你說我怎么就攤上這事「渭” “怎么了瞻坝?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長包晰。 經(jīng)常有香客問我湿镀,道長,這世上最難降的妖魔是什么伐憾? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任勉痴,我火速辦了婚禮,結(jié)果婚禮上树肃,老公的妹妹穿的比我還像新娘蒸矛。我一直安慰自己,他們只是感情好胸嘴,可當我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布雏掠。 她就那樣靜靜地躺著,像睡著了一般劣像。 火紅的嫁衣襯著肌膚如雪乡话。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天耳奕,我揣著相機與錄音绑青,去河邊找鬼。 笑死屋群,一個胖子當著我的面吹牛闸婴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播芍躏,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼邪乍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起庇楞,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤榜配,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后吕晌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芥牌,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年聂使,在試婚紗的時候發(fā)現(xiàn)自己被綠了壁拉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡柏靶,死狀恐怖弃理,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情屎蜓,我是刑警寧澤痘昌,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站炬转,受9級特大地震影響辆苔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜扼劈,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一驻啤、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧荐吵,春花似錦骑冗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至薯蝎,卻和暖如春遥倦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背占锯。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工袒哥, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人烟央。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓统诺,卻偏偏與公主長得像歪脏,于是被迫代替她去往敵國和親疑俭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容