JAVA 多線程與高并發(fā)學(xué)習(xí)筆記(十四)——AQS核心原理

基于 CAS自旋實現(xiàn)的輕量級鎖在惡性自旋時會消費大量的CPU資源。解決這個問題有2種方案:分散操作熱點和使用隊列削峰她按。JUC并發(fā)包使用的是隊列削峰的方案解決CAS性能問題渠旁,并提供了一個基于雙向隊列的削峰基類——抽象基礎(chǔ)類AbstractQueuedSyncronizer(抽象同步器類风响,簡稱為AQS)。

AQS的內(nèi)部隊列

AQS是CLH隊列的一個變種羡亩,原理和CLH類似摩疑。AQS隊列內(nèi)部維護(hù)的是一個FIFO雙向鏈表,其中的每個節(jié)點其實是由線程封裝的畏铆,當(dāng)線程爭搶鎖失敗后會封裝成節(jié)點加入AQS隊列中:當(dāng)獲取鎖的線程釋放鎖以后雷袋,會從隊列中喚醒一個阻塞的節(jié)點。

AQS的內(nèi)部結(jié)構(gòu)可以參考下圖辞居。

AQS內(nèi)部結(jié)構(gòu).png

AQS核心成員

AQS基本模板模式實現(xiàn)楷怒。AQS為鎖獲取、鎖釋放的排隊和出隊過程提供了一系列的模板方法瓦灶。由于JUC的顯式鎖種類豐富鸠删,因此AQS將不同鎖的具體操作抽取為鉤子方法,供各類鎖去實現(xiàn)贼陶。

狀態(tài)標(biāo)志位

AQS 中維持了一個單一的 volatile 修飾的狀態(tài)信息是 state刃泡,AQS使用int類型的state標(biāo)志鎖的狀態(tài)巧娱,可以理解為鎖的同步狀態(tài)。

// 同步狀態(tài)
private volatile int state;

state 因為使用了 volatile 保證了操作的可見性捅僵,AQS提供了 getState()家卖、setState() 來獲取和設(shè)置同步狀態(tài)。

// 獲取同步狀態(tài)
protected final int getState() {
    return state;
}

// 設(shè)置同步狀態(tài)
protected final void setState(int newState) {
    state = neWState;
}

// 通過CAS設(shè)置同步狀態(tài)
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.comapreAndSwapInt(this, stateOffset, expect, update);
}

由于 setState() 無法保證原子性庙楚,因此AQS給我們提供了 compareAndSetState() 方法利用底層 Unsafe 的 CAS機(jī)制來實現(xiàn)原子性。

AbstractQueueSynchronizer 繼承了 AbstarctOwnableSynchronizer趴樱,這個基類只有一個變量叫 exclusiveOwnerThread馒闷,表示當(dāng)前占用該鎖的線程,并且提供了相應(yīng)的 get()set() 方法叁征。具體如下:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

    // 表示當(dāng)前占用該鎖的線程
    private transient Thread exlusiveOwnerThread;

    ...
}

隊列節(jié)點類

AQS是一個虛擬隊列纳账,不存在隊列實例,僅存在節(jié)點之間的前后關(guān)系捺疼。節(jié)點類型通過內(nèi)部類 Node 定義疏虫。

static final class Node {

    // 節(jié)點等待狀態(tài)值:取消狀態(tài)
    static final int CANCELLED = 1;
    // 節(jié)點等待狀態(tài)值:標(biāo)識后繼線程處于等待狀態(tài)
    static final int SIGNAL = -1;
    // 節(jié)點等待狀態(tài)值:標(biāo)識當(dāng)前線程正在進(jìn)行條件等待
    static final int CONDITION = -2;
    // 節(jié)點等待狀態(tài)值:標(biāo)識下一次共享鎖的acquireShared操作需要無條件傳播
    static final int PROPAGATE = -3;
    // 節(jié)點狀態(tài)
    // 普通的同步節(jié)點初始值為0,條件等待節(jié)點的初始值為CONDITION(-2)
    volatile int waitState;
    // 節(jié)點所對應(yīng)的線程啤呼,為搶鎖線程或者條件等待線程
    volatile Thread thread;
    // 前驅(qū)節(jié)點
    volatile Node prev;
    // 后繼節(jié)點
    volatile Node next;
    // 此屬性指向下一個條件等待節(jié)點
    Node nextWaiter;
}

waitStatus 屬性

每個節(jié)點與等待線程關(guān)聯(lián)卧秘,每個節(jié)點維護(hù)一個狀態(tài) waitStatus,它的各個常量值具體如下:

  • CANCELLED(1)官扣。1表示該線程節(jié)點已經(jīng)釋放(超時翅敌、中斷),已取消的節(jié)點不會再阻塞惕蹄,不會參與競爭蚯涮,會一直保持取消狀態(tài)。
  • SIGNAL(-1)卖陵。表示其后繼的節(jié)點處于等待狀態(tài)遭顶,當(dāng)前節(jié)點對應(yīng)的線程如果釋放了同步狀態(tài)或者被取消,就會通知后繼節(jié)點泪蔫,使后繼節(jié)點的線程得以運(yùn)行棒旗。
  • CONDITION(-2)。表示該線程在條件隊列中阻塞鸥滨,表示節(jié)點在等待隊列中嗦哆。當(dāng)持有鎖的線程調(diào)用了CONDITION的signal() 方法之后,節(jié)點會從該CONDITION的等待隊列轉(zhuǎn)移到該鎖的同步隊列上婿滓,去競爭鎖老速。
  • PROPAGATE(-3)。表示下一個線程獲取共享鎖后凸主,自己的共享狀態(tài)會被無條件地傳播下去橘券,因為共享鎖可能出現(xiàn)同時有N個鎖可以用。
  • 0。表示當(dāng)前節(jié)點處于初始狀態(tài)旁舰。

thread成員

Node 的 thread 成員用來存放進(jìn)入AQS隊列中的線程引用锋华,Node的nextWaiter成員用來執(zhí)行自己的后繼等待節(jié)點。

搶占類型常量標(biāo)識

Node 節(jié)點還定義了兩個搶占類型常量標(biāo)識箭窜,SHARED和EXCLUSIVE毯焕,具體如下:

// 標(biāo)識節(jié)點在搶占共享鎖
static final Node SHARED = new Node();

// 表示節(jié)點在搶占獨占鎖
static final Node EXCLUSIVE = null;

FIFO雙向同步隊列

AQS通過內(nèi)置的FIFO雙向隊列來完成線程的排隊工作,內(nèi)部通過節(jié)點 head 和 tail 記錄隊首和隊尾元素磺樱,元素的節(jié)點類型為 Node 類型纳猫,具體如下:

/* 首節(jié)點的引用 */
private transient volatile Node head;

/* 尾節(jié)點的引用 */
private transient volatile Node tail;

AQS 的首尾節(jié)點都是懶加載的,需要的時候才真正創(chuàng)建竹捉。只有在線程競爭失敗的情況下芜辕,有新線程加入同步隊列時,AQS才創(chuàng)建一個head節(jié)點块差。尾節(jié)點只在有新線程阻塞時才被創(chuàng)建侵续。

JUC顯式鎖和AQS的關(guān)系

AQS是一個同步器,它實現(xiàn)了鎖的基本抽象功能憨闰,支持獨占鎖與共享鎖兩種方式状蜗,該類是使用模板模式來實現(xiàn)的,成為構(gòu)建鎖和同步器的框架起趾,使用該類可以簡單且高效地構(gòu)造出應(yīng)用廣泛的同步器诗舰。

JUC中的顯式鎖、線程同步工具等训裆,內(nèi)部都使用了AQS作為等待隊列眶根。

AQS中的模板模式

模板模式

模板模式是類的行為模式。準(zhǔn)備一個抽象類边琉,將部分的邏輯以具體方法的形式實現(xiàn)属百,然后聲明一些抽象方法來迫使子類實現(xiàn)剩余的邏輯。不同的子類提供不同的方式實現(xiàn)這些抽象方法变姨,從而對剩余的邏輯有不同的實現(xiàn)族扰。

模板模式的關(guān)鍵在于:父類提供框架性的公共邏輯,子類提供個性化的定制邏輯定欧。

模板方法(Template Method)也被稱為骨架方法渔呵,主要定義了整個方法需要實現(xiàn)的業(yè)務(wù)操作的算法框架。

鉤子方法(Hook Method)是被模板方法的算法框架所調(diào)用的砍鸠,由子類提供具體的實現(xiàn)方法扩氢。

AQS的模板流程

AQS定義了兩種資源共享方式:

  • Exclusive(獨占鎖):只有一個線程能占有鎖資源。
  • Share(共享鎖):多個線程可以同時占有鎖資源爷辱。

AQS 為不同的資源共享方式提供了不同的模板流程录豺。

AQS中的鉤子方法

自定義同步器時朦肘,AQS中需要重寫的鉤子方法大致如下:

  • tryAcquire(int):獨占鎖鉤子,嘗試獲取資源双饥,若成功則返回true媒抠。
  • tryRelease(int):獨占鎖鉤子,嘗試釋放資源咏花,若成功則返回true趴生。
  • tryAcquireShared(int):共享鎖鉤子,嘗試獲取資源迟螺,負(fù)數(shù)表示失敗冲秽,0表示成功,但沒有剩余可用資源矩父,正數(shù)表示成功,且有剩余資源排霉。
  • tryReleaseShared(int):共享鎖鉤子窍株,嘗試釋放資源,若成功則返回true攻柠。
  • isHeldExclusively():獨占鎖鉤子球订,判斷該線程是否正在獨占資源。

以上鉤子的默認(rèn)實現(xiàn)會拋出 UnsupportOperationException 異常瑰钮。

通過AQS實現(xiàn)一把簡單的獨占鎖

本部分模擬 ReentrantLock 實現(xiàn)一個簡單的獨占鎖冒滩,真實的 ReentrantLock 要復(fù)雜很多。


public class SimpleMockLock implements Lock {
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }
    

    private static class Sync extends AbstractQueuedSynchronizer {

        // 鉤子方法
        protected boolean tryAcquire(int arg) {
            // CAS更新狀態(tài)值為1
            if(compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 鉤子方法
        protected boolean tryRelease(int arg) {
            if(Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            
            if(getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
}

AQS搶占鎖的原理

下面通過AQS中的方法講解一下AQS搶占鎖的原理浪谴。

AQS模板方法:acquire

acquire 是AQS 封裝好的獲取資源的公共入口开睡,它是AQS提供的利用獨占的方式獲取資源的方法,編碼實現(xiàn)如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 至少執(zhí)行一次 tryAcquire 鉤子方法苟耻。若調(diào)用成功篇恒,則 acquire 直接返回,表示已經(jīng)搶到鎖凶杖,若不成功胁艰,則將線程加入等待隊列。

鉤子實現(xiàn): tryAcquire

tryAcquire 是需要實現(xiàn)的鉤子方法智蝠,可以參照我們前面實現(xiàn)的 SimpleMockLock腾么。

直接入隊:addWaiter

在 acquire 模板方法中,如果鉤子方法 tryAcquire 嘗試獲取同步狀態(tài)失敗的話杈湾,就構(gòu)造同步節(jié)點解虱,通過 addWaiter 方法將該節(jié)點加入同步隊列的隊尾。

private Node addWaiter(Node mode) {
    // 創(chuàng)建新節(jié)點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 加入隊列尾部毛秘,將目前的隊列tail作為自己的前驅(qū)節(jié)點pred
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 現(xiàn)場時通過AQS方式修改尾節(jié)點為最新的節(jié)點
        // 如果修改陳宮饭寺,將節(jié)點加入隊列的尾部
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 第一次嘗試添加尾部失敗阻课,意味著有并發(fā)搶鎖發(fā)生,需要進(jìn)行自旋
    enq(node);
    return node;
}

addWaiter 方法中艰匙,首先需要構(gòu)造一個 Node 對象限煞,其中有兩個參數(shù),第一個是當(dāng)前線程员凝,第二個是Node節(jié)點署驻,可能值為 SHARED 或 EXCLUSIVE。

自旋入隊:enq

addWaiter 第一次嘗試在尾部加節(jié)點失敗健霹,意味著有并發(fā)搶鎖發(fā)生旺上,需要進(jìn)行自旋,enq 方法通過 CAS 自旋將節(jié)點添加到隊列尾部糖埋。

/**
 * 這里進(jìn)行了循環(huán)宣吱,如果此時存在tail,就執(zhí)行添加新隊尾的操作
 * 如果依然不存在瞳别,就把當(dāng)前線程作為 head 節(jié)點
 * 插入節(jié)點后征候,調(diào)用acquireQueued() 進(jìn)行阻塞
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 隊列為空,初始化尾節(jié)點和頭節(jié)點為新節(jié)點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * CAS 操作head指針祟敛,僅僅被enq()調(diào)用
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * CAS 操作tail指針疤坝,僅僅被enq()調(diào)用
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

自旋搶占:acquireQueued

在節(jié)點入隊之后,啟動自旋搶鎖的流程馆铁,aquireQueued方法的主要邏輯:當(dāng)前Node節(jié)點線程在死循環(huán)中不斷獲取同步狀態(tài)跑揉,并且不斷在前驅(qū)節(jié)點上自旋,只有當(dāng)前驅(qū)節(jié)點是頭節(jié)點時才能嘗試獲取鎖埠巨,原因是:

  1. 頭結(jié)點是成功獲取同步狀態(tài)(鎖)的節(jié)點历谍,而頭節(jié)點的線程釋放了同步狀態(tài)以后,將會喚醒其后繼節(jié)點乖订,后繼節(jié)點的線程被喚醒后要檢查自己的前驅(qū)節(jié)點是否為頭節(jié)點扮饶。
  2. 維護(hù)同步隊列的FIFO原則,節(jié)點進(jìn)入同步隊列之后乍构,就進(jìn)入了自旋的過程甜无,每個節(jié)點都在不斷地執(zhí)行for死循環(huán)。
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自選檢查當(dāng)前節(jié)點的前驅(qū)節(jié)點是否為頭節(jié)點哥遮,才能獲取鎖
            for (;;) {
                // 獲取節(jié)點的前驅(qū)節(jié)點
                final Node p = node.predecessor();
                // 節(jié)點中的線程循環(huán)地檢查自己的前驅(qū)節(jié)點是否為head節(jié)點
                // 前驅(qū)節(jié)點是head時岂丘,進(jìn)一步調(diào)用子類的 tryAcquire 實現(xiàn)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 檢查前一個節(jié)點的狀態(tài),預(yù)判當(dāng)前獲取鎖失敗的線程是否要掛起
                // 如果需要掛起眠饮,調(diào)用 parkAndCheckInterrupt 方法掛起當(dāng)前線程奥帘,直到被喚醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; // 若兩個操作都是true,則置true
            }
        } finally {
            // 如果等待過程中沒有成功獲取資源仪召,
            // 那么取消節(jié)點在隊列中的等待
            if (failed)
                // 取消等待寨蹋,將當(dāng)前節(jié)點從隊列中移除
                cancelAcquire(node);
        }
    }

為了不浪費資源松蒜,acquireQueued自旋過程中會阻塞線程,等待被前驅(qū)節(jié)點喚醒后才啟動循環(huán)已旧。如果成功就返回秸苗,否則執(zhí)行 shouldParkAfterFailedAcquire、parkAndCheckInterrupt來達(dá)到阻塞的效果运褪。

掛起預(yù)判:shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire 方法的主要功能是惊楼,將當(dāng)前節(jié)點的有效前驅(qū)節(jié)點(不是CANCELLED類型的節(jié)點)找到,并且將有效前驅(qū)節(jié)點的狀態(tài)設(shè)置為SIGNAL秸讹,之后返回true代表當(dāng)前線程可以馬上被阻塞了檀咙。

具體分為三種情況:

  1. 如果前驅(qū)節(jié)點的狀態(tài)為-1(SIGNAL),說明前驅(qū)的等待標(biāo)志已設(shè)好璃诀,返回true表示設(shè)置完畢弧可。
  2. 如果前驅(qū)節(jié)點的狀態(tài)為1(CANCELLED),說明前驅(qū)節(jié)點本身不再等待了劣欢,需要跨越這些節(jié)點侣诺,然后找到一個有效節(jié)點,再把當(dāng)前節(jié)點和這個有效節(jié)點的喚醒關(guān)系建立好氧秘,調(diào)整前驅(qū)節(jié)點的next指針為自己。
  3. 如果其他情況:-3趴久、-2丸相、0,那么通過CAS嘗試設(shè)置前驅(qū)節(jié)點為SIGNAL彼棍,表示只要前驅(qū)節(jié)點釋放鎖灭忠,當(dāng)前節(jié)點就可以搶占鎖了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 獲取前驅(qū)節(jié)點的狀態(tài)
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        return true;
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

線程掛起:parkAndCheckInterrupt

parkAndCheckInterrupt 的主要任務(wù)是暫停當(dāng)前線程座硕,具體如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 調(diào)用park()使線程進(jìn)入waiting狀態(tài)
    return Thread.interrupted(); // 如果被喚醒弛作,查看自己是否已經(jīng)被中斷
}

AQS 會把所有等待的線程構(gòu)成一個阻塞等待隊列,當(dāng)一個線程執(zhí)行完 lock.unlock()华匾,會激活其后繼節(jié)點映琳,通過 LockSupport.unpark(postThread) 完成后繼線程的喚醒。

AQS 的兩個關(guān)鍵點:節(jié)點的入隊和出隊

理解AQS的一個關(guān)鍵點是掌握節(jié)點的入隊和出隊蜘拉。

節(jié)點的自旋入隊

節(jié)點在第一次入隊失敗后萨西,就會開始自旋入隊,分為以下兩種情況:

  1. 如果AQS的隊列非空旭旭,新節(jié)點入隊的插入位置在隊列的尾部谎脯,并且通過CAS方式插入,插入之后AQS的tail將指向新的尾節(jié)點持寄。

  2. 如果AQS的隊列為空源梭,新節(jié)點入隊時娱俺,AQS 通過 CAS 方法將新節(jié)點設(shè)置為頭節(jié)點 head,并將 tail 指針指向新節(jié)點废麻。

private Node enq(final Node node) {
    for (;;) { // 自旋入隊
        Node t = tail;
        if (t == null) { 
            // 隊列為空荠卷,初始化尾節(jié)點和頭節(jié)點為新節(jié)點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 如果隊列不為空,將新節(jié)點插入隊列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

節(jié)點的出隊

節(jié)點的出隊算法在 acquireQueued() 方法中脑溢,這是一個模板方法僵朗,acquireQueued() 方法不斷在前驅(qū)節(jié)點上自旋(for 循環(huán)),如果前驅(qū)節(jié)點是頭節(jié)點并且當(dāng)前線程使用鉤子方法 tryAcquire() 獲得了鎖屑彻,就移除頭節(jié)點验庙,將當(dāng)前節(jié)點設(shè)置為頭節(jié)點。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 中斷標(biāo)記
        boolean interrupted = false;
        for (;;) {
            // 獲取當(dāng)前線程節(jié)點的前驅(qū)節(jié)點
            final Node p = node.predecessor();
            // 如果前驅(qū)節(jié)點是頭節(jié)點社牲,則使當(dāng)前線程嘗試獲取鎖資源(tryAcquire方法忘了回頭看第五步)
            if (p == head && tryAcquire(arg)) {
                // 如果當(dāng)前程線程獲取鎖資源成功粪薛,則將當(dāng)前線程節(jié)點設(shè)置為頭節(jié)點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 根據(jù)前驅(qū)節(jié)點p的等待狀態(tài)判斷是否要將當(dāng)前線程阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 生成CANCELLED狀態(tài)節(jié)點,并喚醒節(jié)點
        if (failed)
            cancelAcquire(node);
    }
}

節(jié)點加入隊列尾部后搏恤,如果其前驅(qū)節(jié)點不是頭節(jié)點违寿,通常情況下,該新節(jié)點所綁定的線程會被無限期阻塞熟空,而不會去執(zhí)行無效循環(huán)藤巢,從而導(dǎo)致CPU資源的浪費。

對于公平鎖息罗,頭節(jié)點一般是占用鎖的節(jié)點掂咒,在釋放鎖時,會喚醒其后稷街店所綁定的線程迈喉,后繼節(jié)點的線程被喚醒后會重新執(zhí)行自旋搶鎖邏輯绍刮。

AQS 釋放鎖的原理

AQS 釋放鎖分成 3 個部分:

release 模板方法

首先會調(diào)用 AQS 的 release 模板方法,代碼如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

可以看到其中主要有 2 個方法挨摸,一個是 tryRelease 鉤子方法孩革,該方法會嘗試釋放當(dāng)前線程持有的資源,由子類根據(jù)具體業(yè)務(wù)提供具體實現(xiàn)得运,執(zhí)行成功后返回 true膝蜈。

另一個方法是 unparkSucessor(),用來喚醒后繼節(jié)點澈圈,代碼如下彬檀。

private void unparkSuccessor(Node node) {
    // 獲取節(jié)點狀態(tài),釋放鎖的節(jié)點瞬女,也就是頭節(jié)點
    int ws = node.waitStatus;
    // 若頭節(jié)點狀態(tài)小于0窍帝,則將其置為0,表示初始狀態(tài)
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next; // 找到后頭的一個節(jié)點
    if (s == null || s.waitStatus > 0) {
        // 如果新節(jié)點已經(jīng)被取消
        s = null;
        // 從隊列尾部開始诽偷,往前去找醉千年的一個 waitStatus 小于0的節(jié)點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 喚醒后繼節(jié)點對應(yīng)的線程
    if (s != null)
        LockSupport.unpark(s.thread);
}

ReentrantLock 搶鎖流程

下面分別對 ReentrantLock 的公平鎖和非公平鎖的實現(xiàn)進(jìn)行講述坤学。

ReentrantLock 非公平鎖的搶占流程

總體流程圖如下:

reentrantlock1.png

非公平鎖的同步器子類

ReentrantLock 為非公平鎖實現(xiàn)了一個內(nèi)部的同步器—— NofairSync疯坤,器顯式鎖獲取方法 lock() 的源碼如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

首先用一個CAS操作判斷 state 是不是0(表示當(dāng)前鎖未被占用),如果是0就把它置為1深浮,并且設(shè)置當(dāng)前線程為該鎖的獨占線程压怠,表示獲取鎖成功。當(dāng)多個線程同時嘗試占用一個鎖時飞苇,CAS操作只能保證一個線程操作成功菌瘫,剩下的要排隊。

非公平體現(xiàn)在:如果占用鎖的線程剛剛釋放鎖布卡,state 置為0雨让,而排隊等待鎖 的線程還未喚醒,新來的線程就直接搶占了該鎖忿等,那么久“插隊”了栖忠。

非公平搶占的鉤子方法:tryAcquire

如果非公平搶占沒有成功,非公平鎖的 lock 匯之星模板方法 acquire贸街,首先會調(diào)用鉤子方法 tryAcquire庵寞,非公平搶占的鉤子方法實現(xiàn)如下:

static final class NonfairSync extends Sync {
    ...

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
 abstract static class Sync extends AbstractQueuedSynchronizer {
   
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 先直接獲取鎖的狀態(tài)
        int c = getState();
        if (c == 0) {
            // 如果內(nèi)部隊列首節(jié)點的線程執(zhí)行晚了,它會將鎖的state置為0
            // 當(dāng)前搶鎖線程的下一步就是直接進(jìn)行搶占
            // 發(fā)現(xiàn)state是空的薛匪,就直接拿來加鎖使用
            if (compareAndSetState(0, acquires)) {
                // 1.利用CAS自旋方式確認(rèn)當(dāng)前state確實為0捐川,然后設(shè)置acquire(1)
                setExclusiveOwnerThread(current);
                // 設(shè)置當(dāng)前執(zhí)行的線程,直接返回true
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // 2.當(dāng)前線程和執(zhí)行中的線程是同一個逸尖,也就意味著可重入操作
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            // 表示當(dāng)前鎖被1個線程重復(fù)獲取了nextc次
            return true;
        }
        // 否則就返回false属拾,表示沒有成功獲取當(dāng)前鎖,進(jìn)入排隊過程
        return false;
    }
    ...
}

其核心思想是當(dāng)前線程嘗試獲取鎖的時候冷溶,如果發(fā)現(xiàn)鎖的狀態(tài)為0,則直接嘗試將鎖拿過來尊浓,而不會考慮其他排隊節(jié)點逞频。

ReentrantLock 公平鎖的搶占流程

ReentrantLock 公平鎖的搶占流程如下:

reetrantlock2.png

公平鎖的同步器子類

ReentrantLock 為公平鎖實現(xiàn)了一個內(nèi)部的同步器——FairSync,其顯式鎖獲取方法 lock 的代碼如下:

static final class FairSync extends Sync {
        
    final void lock() {
        acquire(1);
    }
    ...
}

其核心思想是通過 AQS 模板方法進(jìn)入隊列入隊操作栋齿。

公平搶占的鉤子方法:tryAcquire

公平鎖的 lock 會執(zhí)行模板方法 acquire苗胀,該方法首先會調(diào)用鉤子方法 tryAcquire,其實現(xiàn)如下:

static final class FairSync extends Sync {
    ...

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 鎖狀態(tài)
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

公平鎖的鉤子方法中瓦堵,首先判斷是否有后繼節(jié)點基协,如果有,并且當(dāng)前線程不是鎖的占有線程菇用,鉤子方法就返回 false澜驮,模板方法會進(jìn)入排隊的執(zhí)行流程。

是否有后繼節(jié)點的判斷

FairSync 是否有后繼節(jié)點的判斷代碼如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

執(zhí)行場景大致如下:

  1. 當(dāng) h != t 不成立的時候惋鸥,說明 h 頭節(jié)點杂穷,t 尾節(jié)點要么是同一節(jié)點悍缠。要么都是 null,此時返回 false耐量,表示沒有后繼節(jié)點飞蚓。

  2. 當(dāng) h != t 成立的時候,進(jìn)一步檢查 head.next 是否為 null廊蜒,如果為 null趴拧,返回 true,這種場景在有其他線程第一次正在入隊時可能會出現(xiàn)山叮。

  3. 如果 h != t 成立著榴,head.next != null,判斷 head.next 是不是當(dāng)前線程聘芜,如果是就返回 false兄渺,不是就返回true

AQS 條件隊列

Condition 是 JUC 用來代替?zhèn)鹘y(tǒng) Objectwait/notify 線程間通信與寫作機(jī)制的新組件汰现,它更加的高效挂谍。

Condition 基本原理

ConditionObject 類是實現(xiàn)條件隊列的關(guān)鍵,每個 ConditionObject 對象都維護(hù)一個單獨的條件等待隊列瞎饲。每個 ConditionObject 對應(yīng)一個條件隊列口叙,它記錄該隊列的頭結(jié)點和尾節(jié)點。

public class ConditionObject implements Condition, java.io.Serializable {
        
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

一個 Condition 對象是一個單條件的等待隊列.

condtionObject.png

在一個顯式鎖上嗅战,我們可以創(chuàng)建多個等待任務(wù)隊列妄田,這點和內(nèi)置鎖不同,Java 內(nèi)置鎖上只有唯一的一個等待隊列驮捍。比如疟呐,我們可以使用 newCondition 創(chuàng)建兩個等待隊列,具體如下:

private Lock lock = new ReentrantLock();
//創(chuàng)建第一個等待隊列
private Condition firstCond = lock.newCondition();
//創(chuàng)建第二個等待隊列
private Condition secondCond = lock.newCondition();
condition.png

await等待方法原理

當(dāng)線程調(diào)用 await 方法時东且,說明當(dāng)前線程的節(jié)點為當(dāng)前AQS隊列的頭節(jié)點启具,正好處于占有鎖的狀態(tài),await 方法需要把該線程從 AQS 隊列挪到 Condition 等待隊列里珊泳。

在 await 方法將當(dāng)前線程挪動到 Condition 等待隊列后鲁冯,還會喚醒 AQS 同步隊列中的 head 節(jié)點的下一個節(jié)點,方法代碼如下:

 public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 1
    int savedState = fullyRelease(node); // 2
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 3
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) &&  interruptMode != THROW_IE) // 4
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 5
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

整體流程如下:

  1. 執(zhí)行 await 時色查,會新創(chuàng)建一個節(jié)點并放入 Condition 隊列尾部薯演。
  2. 然后釋放鎖,并喚醒AQS同步隊列中的頭節(jié)點的后一個節(jié)點秧了。
  3. 然后執(zhí)行 while 循環(huán)跨扮,將該節(jié)點的線程阻塞,直到該節(jié)點離開等待隊列,重新回到同步隊列成為同步節(jié)點后好港,線程才退出 while 循環(huán)愉镰。
  4. 退出循環(huán)后,開始調(diào)用 acquireQueued() 不斷嘗試拿鎖钧汹。
  5. 拿到鎖后丈探,會清空 Condition 隊列中被取消的節(jié)點。

創(chuàng)建一個新節(jié)點并放入 Condition 隊列尾部的工作由 addCondtionWaiter 方法完成拔莱。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果尾節(jié)點取消碗降,重新定位尾節(jié)點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 創(chuàng)建一個新Node,作為等待節(jié)點
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 將新Node加入等待隊列
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

signal喚醒方法原理

線程在某個 ConditionObject 對象上調(diào)用 signal 方法后塘秦,等待隊列中的 firstWaiter 會被加入同步隊列中锅知,等待節(jié)點被喚醒氯材,流程如下:

public final void signal() {
    // 如果當(dāng)前線程不是持有該鎖的線程,就拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); // 喚醒節(jié)點
}

// 執(zhí)行喚醒
private void doSignal(Node first) {
    do {
        // 出隊的代碼寫的很巧妙
        // first出隊,firstWaiter頭部指向下一個節(jié)點烂叔,自己的nextWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            // 如果第二個節(jié)點為空愤估,則尾部也為空
            lastWaiter = null;
        // 將原來頭部first的后繼置空畅哑,help for GC
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

// 將被喚醒的節(jié)點轉(zhuǎn)移到同步隊列
final boolean transferForSignal(Node node) {

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal 方法的整體流程如下:

  1. 通過 enq() 方法自旋講條件隊列中的頭節(jié)點放入 AQS 同步隊列尾部旭绒,并獲取它在 AQS 隊列中的前驅(qū)節(jié)點。

  2. 如果前驅(qū)節(jié)點的狀態(tài)是取消狀態(tài)京痢,或者設(shè)置前驅(qū)節(jié)點為 Signal 狀態(tài)失敗奶甘,就喚醒當(dāng)前節(jié)點的線程,否則節(jié)點在同步隊列的尾部祭椰,參與排隊臭家。

  3. 同步隊列中的線程被喚醒后,表示重新獲取了顯式鎖方淤,然后繼續(xù)執(zhí)行 condition.wait() 語句后面的臨界區(qū)代碼钉赁。

AQS實際應(yīng)用

JUC 的同一架構(gòu)如下圖所示。

juc總體架構(gòu).png

AQS 建立在 CAS 原子操作和 volatile 可見性變量的基礎(chǔ)之上携茂,為上層的顯式鎖橄霉、同步工具類、阻塞隊列邑蒋、線程池、并發(fā)容器按厘、Future異步工具提供線程之間同步的基礎(chǔ)設(shè)施医吊。所以,AQS在JUC框架使用很廣泛逮京。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末卿堂,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌草描,老刑警劉巖览绿,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異穗慕,居然都是意外死亡饿敲,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進(jìn)店門逛绵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來怀各,“玉大人,你說我怎么就攤上這事术浪∑岸裕” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵胰苏,是天一觀的道長硕蛹。 經(jīng)常有香客問我,道長硕并,這世上最難降的妖魔是什么法焰? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮鲤孵,結(jié)果婚禮上壶栋,老公的妹妹穿的比我還像新娘。我一直安慰自己普监,他們只是感情好贵试,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著凯正,像睡著了一般毙玻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上廊散,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天桑滩,我揣著相機(jī)與錄音,去河邊找鬼允睹。 笑死运准,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缭受。 我是一名探鬼主播胁澳,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼米者!你這毒婦竟也來了韭畸?” 一聲冷哼從身側(cè)響起宇智,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胰丁,沒想到半個月后随橘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡锦庸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年机蔗,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片酸员。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡蜒车,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出幔嗦,到底是詐尸還是另有隱情酿愧,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布邀泉,位于F島的核電站嬉挡,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏汇恤。R本人自食惡果不足惜庞钢,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望因谎。 院中可真熱鬧基括,春花似錦、人聲如沸财岔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽匠璧。三九已至桐款,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間夷恍,已是汗流浹背魔眨。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留酿雪,地道東北人遏暴。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像指黎,于是被迫代替她去往敵國和親朋凉。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容