JUC解析 抽象隊(duì)列同步器(AQS-AbstractQueuedSynchronizer)

抽象隊(duì)列同步器(AQS-AbstractQueuedSynchronizer)

從名字上來理解:

  • 抽象:是抽象類,具體由子類實(shí)現(xiàn)
  • 隊(duì)列:數(shù)據(jù)結(jié)構(gòu)是隊(duì)列,使用隊(duì)列存儲數(shù)據(jù)
  • 同步:基于它可以實(shí)現(xiàn)同步功能

我們就從這幾個(gè)方面來入手解讀,但首先,我們得先知道以下幾個(gè)它的特點(diǎn),以便于理解

AbstractQueuedSynchronizer特點(diǎn)

1.AQS可以實(shí)現(xiàn)獨(dú)占鎖和共享鎖。

2.獨(dú)占鎖exclusive是一個(gè)悲觀鎖央渣。保證只有一個(gè)線程經(jīng)過一個(gè)阻塞點(diǎn),只有一個(gè)線程可以獲得鎖拔第。

3.共享鎖shared是一個(gè)樂觀鎖蚊俺。可以允許多個(gè)線程阻塞點(diǎn)得封,可以多個(gè)線程同時(shí)獲取到鎖忙上。它允許一個(gè)資源可以被多個(gè)讀操作茬斧,或者被一個(gè)寫操作訪問项秉,但是兩個(gè)操作不能同時(shí)訪問。

4.AQS使用一個(gè)int類型的成員變量state來表示同步狀態(tài)贷屎,當(dāng)state>0時(shí)表示已經(jīng)獲取了鎖唉侄,當(dāng)state = 0無鎖属划。它提供了三個(gè)方法(getState()同眯、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態(tài)state進(jìn)行操作明肮,可以確保對state的操作是安全的。

5.AQS是通過一個(gè)CLH隊(duì)列實(shí)現(xiàn)的(CLH鎖即Craig, Landin, and Hagersten (CLH) locks秫舌,CLH鎖是一個(gè)自旋鎖舅巷,能確保無饑餓性赋元,提供先來先服務(wù)的公平性媚值。CLH鎖也是一種基于鏈表的可擴(kuò)展褥芒、高性能献酗、公平的自旋鎖,申請線程只在本地變量上自旋颜及,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋肄扎。)

抽象

我們來扒一扒源碼可以看到它繼承于AbstractOwnableSynchronizer它是一個(gè)抽象類.

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

AQS內(nèi)部使用了一個(gè)volatile的變量state來作為資源的標(biāo)識。同時(shí)定義了幾個(gè)獲取和改變state的protected方法,子類可以覆蓋這些方法來實(shí)現(xiàn)自己的邏輯.

可以看到類中為我們提供了幾個(gè)protected級別的方法,它們分別是:

//創(chuàng)建一個(gè)隊(duì)列同步器實(shí)例,初始state是0
protected AbstractQueuedSynchronizer() { }

//返回同步狀態(tài)的當(dāng)前值。
protected final int getState() {
        return state;
}

//設(shè)置同步狀態(tài)的值
protected final void setState(int newState) {
        state = newState;
    }

//獨(dú)占方式猜揪。嘗試獲取資源,成功則返回true钧萍,失敗則返回false。
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }    
    
    
//獨(dú)占方式万搔。嘗試釋放資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    
//共享方式草姻。嘗試獲取資源。負(fù)數(shù)表示失敗剧劝;0表示成功谣妻,但沒有剩余可用資源;正數(shù)表示成功染突,且有剩余資源
protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }    
    
    
//共享方式茵休。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。
protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

這些方法雖然都是protected方法,但是它們并沒有在AQS具體實(shí)現(xiàn),而是直接拋出異常,AQS實(shí)現(xiàn)了一系列主要的邏輯
由此可知,AQS是一個(gè)抽象的用于構(gòu)建鎖和同步器框架点弯,使用AQS能簡單且高效地構(gòu)造出應(yīng)用廣泛的同步器肃拜,比如我們提到的ReentrantLockSemaphore雌团,ReentrantReadWriteLock燃领,SynchronousQueueFutureTask等等皆是基于AQS的锦援。

我們自己也能利用AQS非常輕松容易地構(gòu)造出自定義的同步器猛蔽,只要子類實(shí)現(xiàn)它的幾個(gè)protected方法就可以了.

隊(duì)列

AQS類本身實(shí)現(xiàn)的是具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等)。它內(nèi)部使用了一個(gè)先進(jìn)先出(FIFO)的雙端隊(duì)列(CLH),并使用了兩個(gè)指針head和tail用于標(biāo)識隊(duì)列的頭部和尾部曼库。其數(shù)據(jù)結(jié)構(gòu)如圖:


隊(duì)列并不是直接儲存線程区岗,而是儲存擁有線程的Node節(jié)點(diǎn)

我們來看看Node的結(jié)構(gòu):

static final class Node {
    // 標(biāo)記一個(gè)結(jié)點(diǎn)(對應(yīng)的線程)在共享模式下等待
    static final Node SHARED = new Node();
    // 標(biāo)記一個(gè)結(jié)點(diǎn)(對應(yīng)的線程)在獨(dú)占模式下等待
    static final Node EXCLUSIVE = null; 

    // waitStatus的值毁枯,表示該結(jié)點(diǎn)(對應(yīng)的線程)已被取消
    static final int CANCELLED = 1; 
    // waitStatus的值慈缔,表示后繼結(jié)點(diǎn)(對應(yīng)的線程)需要被喚醒
    static final int SIGNAL = -1;
    // waitStatus的值,表示該結(jié)點(diǎn)(對應(yīng)的線程)在等待某一條件
    static final int CONDITION = -2;
    
    //waitStatus的值种玛,表示有資源可用藐鹤,新head結(jié)點(diǎn)需要繼續(xù)喚醒后繼結(jié)點(diǎn)
    //(共享模式下,多線程并發(fā)釋放資源赂韵,而head喚醒其后繼結(jié)點(diǎn)后娱节,
    //需要把多出來的資源留給后面的結(jié)點(diǎn);設(shè)置新的head結(jié)點(diǎn)時(shí)祭示,會繼續(xù)喚醒其后繼結(jié)點(diǎn))
    static final int PROPAGATE = -3;

    // 等待狀態(tài)肄满,取值范圍,-3质涛,-2稠歉,-1,0蹂窖,1
    volatile int waitStatus;
    volatile Node prev; // 前驅(qū)結(jié)點(diǎn)
    volatile Node next; // 后繼結(jié)點(diǎn)
    volatile Thread thread; // 結(jié)點(diǎn)對應(yīng)的線程
    Node nextWaiter; // 等待隊(duì)列里下一個(gè)等待條件的結(jié)點(diǎn)


    // 判斷共享模式的方法
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 其它方法忽略轧抗,可以參考具體的源碼
}

// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
    // 使用了Node的這個(gè)構(gòu)造函數(shù)
    Node node = new Node(Thread.currentThread(), mode);
    // 其它代碼省略
}

過Node我們可以實(shí)現(xiàn)兩個(gè)隊(duì)列,一是通過prev和next實(shí)現(xiàn)CLH隊(duì)列(線程同步隊(duì)列,雙向隊(duì)列)瞬测,二是nextWaiter實(shí)現(xiàn)Condition條件上的等待線程隊(duì)列(單向隊(duì)列)横媚,這個(gè)Condition主要用在ReentrantLock類中

同步

兩種同步方式:

  • 獨(dú)占模式(Exclusive):資源是獨(dú)占的,一次只能一個(gè)線程獲取月趟。如ReentrantLock灯蝴。
  • 共享模式(Share):同時(shí)可以被多個(gè)線程獲取,具體的資源個(gè)數(shù)可以通過參數(shù)指定孝宗。如Semaphore/CountDownLatch穷躁。

同時(shí)實(shí)現(xiàn)兩種模式的同步類,如ReadWriteLock

獲取資源

獲取資源的入口是acquire(int arg)方法因妇。arg是要獲取的資源的個(gè)數(shù)问潭,在獨(dú)占模式下始終為1。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先調(diào)用tryAcquire(arg)嘗試去獲取資源婚被。前面提到了這個(gè)方法是在子類具體實(shí)現(xiàn)的
如果獲取資源失敗狡忙,就通過addWaiter(Node.EXCLUSIVE)方法把這個(gè)線程插入到等待隊(duì)列中。其中傳入的參數(shù)代表要插入的Node是獨(dú)占式的址芯。這個(gè)方法的具體實(shí)現(xiàn):

private Node addWaiter(Node mode) {
    // 生成該線程對應(yīng)的Node節(jié)點(diǎn)
    Node node = new Node(Thread.currentThread(), mode);
    // 將Node插入隊(duì)列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 使用CAS嘗試灾茁,如果成功就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果等待隊(duì)列為空或者上述CAS失敗窜觉,再自旋CAS插入
    enq(node);
    return node;
}

//AQS中會存在多個(gè)線程同時(shí)爭奪資源的情況,
//因此肯定會出現(xiàn)多個(gè)線程同時(shí)插入節(jié)點(diǎn)的操作北专,
//在這里是通過CAS自旋的方式保證了操作的線程安全性禀挫。

// 自旋CAS插入等待隊(duì)列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

若設(shè)置成功就代表自己獲取到了鎖,返回true拓颓。狀態(tài)為0設(shè)置1的動(dòng)作在外部就有做過一次语婴,內(nèi)部再一次做只是提升概率,而且這樣的操作相對鎖來講不占開銷驶睦。
如果狀態(tài)不是0腻格,則判定當(dāng)前線程是否為排它鎖的Owner,如果是Owner則嘗試將狀態(tài)增加acquires(也就是增加1)啥繁,如果這個(gè)狀態(tài)值越界,則會拋出異常提示青抛,若沒有越界旗闽,將狀態(tài)設(shè)置進(jìn)去后返回true(實(shí)現(xiàn)了類似于偏向的功能,可重入蜜另,但是無需進(jìn)一步征用)适室。
如果狀態(tài)不是0,且自身不是owner举瑰,則返回false捣辆。

現(xiàn)在通過addWaiter方法,已經(jīng)把一個(gè)Node放到等待隊(duì)列尾部了此迅。而處于等待隊(duì)列的結(jié)點(diǎn)是從頭結(jié)點(diǎn)一個(gè)一個(gè)去獲取資源的汽畴。具體的實(shí)現(xiàn)我們來看看acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 如果node的前驅(qū)結(jié)點(diǎn)p是head,表示node是第二個(gè)結(jié)點(diǎn)耸序,就可以嘗試去獲取資源了
            if (p == head && tryAcquire(arg)) {
                // 拿到資源后忍些,將head指向該結(jié)點(diǎn)。
                // 所以head所指的結(jié)點(diǎn)坎怪,就是當(dāng)前獲取到資源的那個(gè)結(jié)點(diǎn)或null罢坝。
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果自己可以休息了,就進(jìn)入waiting狀態(tài)搅窿,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這里parkAndCheckInterrupt方法內(nèi)部使用到了LockSupport.park(this)嘁酿,順便簡單介紹一下park。
LockSupport類是Java 6 引入的一個(gè)類男应,提供了基本的線程同步原語闹司。LockSupport實(shí)際上是調(diào)用了Unsafe類里的函數(shù),歸結(jié)到Unsafe里殉了,只有兩個(gè)函數(shù):
park(boolean isAbsolute, long time):阻塞當(dāng)前線程
unpark(Thread jthread):使給定的線程停止阻塞

所以結(jié)點(diǎn)進(jìn)入等待隊(duì)列后开仰,是調(diào)用park使它進(jìn)入阻塞狀態(tài)的。只有頭結(jié)點(diǎn)的線程是處于活躍狀態(tài)的。

acquire方法 獲取資源的流程:

當(dāng)然众弓,獲取資源的方法除了acquire外恩溅,還有以下三個(gè):

  • acquireInterruptibly:申請可中斷的資源(獨(dú)占模式)
  • acquireShared:申請共享模式的資源
  • acquireSharedInterruptibly:申請可中斷的資源(共享模式)

可中斷的意思是,在線程中斷時(shí)可能會拋出InterruptedException

釋放資源

釋放資源相比于獲取資源來說谓娃,會簡單許多脚乡。在AQS中只有一小段實(shí)現(xiàn)。

源碼:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease方法
這個(gè)動(dòng)作可以認(rèn)為就是一個(gè)設(shè)置鎖狀態(tài)的操作滨达,而且是將狀態(tài)減掉傳入的參數(shù)值(參數(shù)是1)奶稠,如果結(jié)果狀態(tài)為0,就將排它鎖的Owner設(shè)置為null捡遍,以使得其它的線程有機(jī)會進(jìn)行執(zhí)行逊桦。

在排它鎖中锅移,加鎖的時(shí)候狀態(tài)會增加1(當(dāng)然可以自己修改這個(gè)值),在解鎖的時(shí)候減掉1,同一個(gè)鎖汁果,在可以重入后雹拄,可能會被疊加為2艳狐、3聘殖、4這些值,只有unlock()的次數(shù)與lock()的次數(shù)對應(yīng)才會將Owner線程設(shè)置為空续挟,而且也只有這種情況下才會返回true紧卒。
這一點(diǎn)大家寫代碼要注意,如果是在循環(huán)體中l(wèi)ock()或故意使用兩次以上的lock(),而最終只有一次unlock()诗祸,最終可能無法釋放鎖跑芳。導(dǎo)致死鎖.


private void unparkSuccessor(Node node) {
    // 如果狀態(tài)是負(fù)數(shù),嘗試把它設(shè)置為0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 得到頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)head.next
    Node s = node.next;
    // 如果這個(gè)后繼結(jié)點(diǎn)為空或者狀態(tài)大于0
    // 通過前面的定義我們知道直颅,大于0只有一種可能聋亡,就是這個(gè)結(jié)點(diǎn)已被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 等待隊(duì)列中所有還有用的結(jié)點(diǎn),都向前移動(dòng)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后繼結(jié)點(diǎn)不為空际乘,
    if (s != null)
        LockSupport.unpark(s.thread);
}

方法unparkSuccessor(Node)坡倔,意味著真正要釋放鎖了,它傳入的是head節(jié)點(diǎn),內(nèi)部首先會發(fā)生的動(dòng)作是獲取head節(jié)點(diǎn)的next節(jié)點(diǎn)脖含,如果獲取到的節(jié)點(diǎn)不為空罪塔,則直接通過:“LockSupport.unpark()”方法來釋放對應(yīng)的被掛起的線程.

關(guān)注公眾號:java寶典

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市养葵,隨后出現(xiàn)的幾起案子征堪,更是在濱河造成了極大的恐慌,老刑警劉巖关拒,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件佃蚜,死亡現(xiàn)場離奇詭異庸娱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)谐算,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進(jìn)店門熟尉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人洲脂,你說我怎么就攤上這事斤儿。” “怎么了恐锦?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵往果,是天一觀的道長。 經(jīng)常有香客問我一铅,道長陕贮,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任潘飘,我火速辦了婚禮飘蚯,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘福也。我一直安慰自己,他們只是感情好攀圈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布暴凑。 她就那樣靜靜地躺著,像睡著了一般赘来。 火紅的嫁衣襯著肌膚如雪现喳。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天犬辰,我揣著相機(jī)與錄音嗦篱,去河邊找鬼。 笑死幌缝,一個(gè)胖子當(dāng)著我的面吹牛灸促,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播涵卵,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼浴栽,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了轿偎?” 一聲冷哼從身側(cè)響起典鸡,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎坏晦,沒想到半個(gè)月后萝玷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嫁乘,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年球碉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蜓斧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,731評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡汁尺,死狀恐怖法精,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情痴突,我是刑警寧澤搂蜓,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站辽装,受9級特大地震影響帮碰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拾积,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一殉挽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拓巧,春花似錦斯碌、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至承耿,卻和暖如春冠骄,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背加袋。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工凛辣, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人职烧。 一個(gè)月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓扁誓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蚀之。 傳聞我的和親對象是個(gè)殘疾皇子跋理,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內(nèi)容