5. StampedLock

StampedLock是Java 8新增的一個(gè)讀寫(xiě)鎖贿肩,它是對(duì)ReentrantReadWriteLock的改進(jìn)。StampedLock的同步狀態(tài)包含了一個(gè)版本和模式龄寞,獲取鎖的方法返回一個(gè)stamp表示這個(gè)鎖的狀態(tài)汰规;而這些方法的 "try" 版本返回一個(gè)特殊值0表示獲取鎖失敗。鎖釋放和轉(zhuǎn)換的方法需要stamp作為參數(shù)物邑,如果stamp不符合鎖的同步狀態(tài)就會(huì)失敗溜哮。StampedLock提供了三種模式的控制:

  1. 獨(dú)占寫(xiě)模式。writeLock方法可能會(huì)在獲取共享狀態(tài)時(shí)阻塞色解,如果成功獲取鎖茂嗓,返回一個(gè)stamp,它可以作為參數(shù)被用在unlockWrite方法中以釋放寫(xiě)鎖科阎。tryWriteLock的超時(shí)與非超時(shí)版本都被提供使用述吸。當(dāng)寫(xiě)鎖被獲取,那么沒(méi)有讀鎖能夠被獲取并且所有的樂(lè)觀讀鎖驗(yàn)證都會(huì)失敗锣笨。

  2. 悲觀讀模式蝌矛。readLock方法可能會(huì)在獲取共享狀態(tài)時(shí)阻塞,如果成功獲取鎖错英,返回一個(gè)stamp入撒,它可以作為參數(shù)被用在unlockRead方法中以釋放讀鎖。tryReadLock的超時(shí)與非超時(shí)版本都被提供使用椭岩。

  3. 樂(lè)觀讀模式茅逮。tryOptimisticRead方法只有當(dāng)寫(xiě)鎖沒(méi)有被獲取時(shí)會(huì)返回一個(gè)非0的stamp。在獲取這個(gè)stamp后直到調(diào)用validate方法這段時(shí)間判哥,如果寫(xiě)鎖沒(méi)有被獲取献雅,那么validate方法將會(huì)返回true。這個(gè)模式可以被認(rèn)為是讀鎖的一個(gè)弱化版本塌计,因?yàn)樗臓顟B(tài)可能隨時(shí)被寫(xiě)鎖破壞挺身。這個(gè)樂(lè)觀模式的主要是為一些很短的只讀代碼塊的使用設(shè)計(jì),它可以降低競(jìng)爭(zhēng)并且提高吞吐量夺荒。但是瞒渠,它的使用本質(zhì)上是很脆弱的。樂(lè)觀讀的代碼區(qū)域應(yīng)當(dāng)只讀取共享數(shù)據(jù)并將它們儲(chǔ)存在局部變量中以待后來(lái)使用技扼,當(dāng)然在使用前要先驗(yàn)證這些數(shù)據(jù)是否過(guò)期伍玖,這可以使用前面提到的validate方法。在樂(lè)觀讀模式下的數(shù)據(jù)讀取可能是非常不一致的過(guò)程剿吻,因此只有當(dāng)你對(duì)數(shù)據(jù)的表示很熟悉并且重復(fù)調(diào)用validate方法來(lái)檢查數(shù)據(jù)的一致性時(shí)使用此模式窍箍。例如,當(dāng)先讀取一個(gè)對(duì)象或者數(shù)組引用,然后訪問(wèn)它的字段椰棘、元素或者方法之一時(shí)上面的步驟都是需要的纺棺。

這個(gè)類還提供了在三種模式之間轉(zhuǎn)換的輔助方法。例如邪狞,tryConvertToWriteLock方法嘗試"提升"一個(gè)模式祷蝌,如果已經(jīng)獲取了讀鎖并且此時(shí)沒(méi)有其他線程獲取讀鎖,那么這個(gè)方法返回一個(gè)合法的寫(xiě)stamp帆卓。這些方法被設(shè)計(jì)來(lái)幫助減少以“重試為主”設(shè)計(jì)時(shí)發(fā)生的代碼代碼膨脹巨朦。

示例

下面的類中描述了一些StampedLock的常用用法,它主要操作一個(gè)簡(jiǎn)單的二維點(diǎn)剑令。這個(gè)示例在沒(méi)有異常會(huì)拋出的情況下依然沿用使用try-catch塊的慣例糊啡。

class Point {

    // 成員變量
    private double x, y;

    // 鎖實(shí)例
    private final StampedLock sl = new StampedLock();

    // 排它鎖-寫(xiě)鎖(writeLock)
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 一個(gè)只讀方法
    // 其中存在樂(lè)觀讀鎖到悲觀讀鎖的轉(zhuǎn)換
    double distanceFromOrigin() {

        // 嘗試獲取樂(lè)觀讀鎖
        long stamp = sl.tryOptimisticRead();
        // 將全部變量拷貝到方法體棧內(nèi)
        double currentX = x, currentY = y;
        // 檢查在獲取到讀鎖stamp后,鎖有沒(méi)被其他寫(xiě)線程搶占
        if (!sl.validate(stamp)) {
            // 如果被搶占則獲取一個(gè)共享讀鎖(悲觀獲扔踅颉)
            stamp = sl.readLock();
            try {
                // 將全部變量拷貝到方法體棧內(nèi)
                currentX = x;
                currentY = y;
            } finally {
                // 釋放共享讀鎖
                sl.unlockRead(stamp);
            }
        }
        // 返回計(jì)算結(jié)果
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 獲取讀鎖棚蓄,并嘗試轉(zhuǎn)換為寫(xiě)鎖
    void moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.tryOptimisticRead();
        try {
            // 如果當(dāng)前點(diǎn)在原點(diǎn)則移動(dòng)
            while (x == 0.0 && y == 0.0) {
                // 嘗試將獲取的讀鎖升級(jí)為寫(xiě)鎖
                long ws = sl.tryConvertToWriteLock(stamp);
                // 升級(jí)成功,則更新stamp碍脏,并設(shè)置坐標(biāo)值梭依,然后退出循環(huán)
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    // 讀鎖升級(jí)寫(xiě)鎖失敗則釋放讀鎖,顯示獲取獨(dú)占寫(xiě)鎖潮酒,然后循環(huán)重試
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

介紹了StampedLock的基本用法后睛挚,下面開(kāi)始進(jìn)行源碼分析。

同步節(jié)點(diǎn)

StampedLock使用 long 作為同步狀態(tài)的類型急黎,它使用一個(gè)小的有限數(shù)作為讀鎖被獲取的2進(jìn)制位數(shù)(目前為7),所以當(dāng)reader的數(shù)量到達(dá)上限時(shí)侧到,使用一個(gè)額外的溢出字來(lái)表示溢出勃教。我們通過(guò)將最大的reader數(shù)量(RBITS)視作一個(gè)自旋鎖來(lái)保護(hù)同步狀態(tài)的溢出更新。

在StampedLock中使用的同步節(jié)點(diǎn)與AQS的同步節(jié)點(diǎn)有一點(diǎn)不同匠抗,下面先看它的常量代表的意義故源。

> line: 352
    /** 處理器的數(shù)量,控制自旋次數(shù) */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** 被增加到同步隊(duì)列前最大的重試次數(shù)汞贸,至少為1 */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1;

    /** 在頭節(jié)點(diǎn)處被阻塞前的最大重試次數(shù) */
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1;

    /** 在再次被阻塞前的最大重試次數(shù) */
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 1;

    /** The period for yielding when waiting for overflow spinlock */
    private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1

    /** 讀鎖被獲取的次數(shù)的二進(jìn)制位數(shù) */
    private static final int LG_READERS = 7;

    // Values for lock state and stamp operations
    private static final long RUNIT = 1L;                  // 類似讀寫(xiě)鎖的RUNIT绳军,意思是每次獲取讀鎖時(shí)
                                                           // 同步狀態(tài)應(yīng)當(dāng)增加1
    private static final long WBIT  = 1L << LG_READERS;    // 寫(xiě)狀態(tài)        10000000
    private static final long RBITS = WBIT - 1L;           // 溢出保護(hù)      01111111
    private static final long RFULL = RBITS - 1L;          // 最大reader    01111110
    private static final long ABITS = RBITS | WBIT;        // 掩碼          11111111
    private static final long SBITS = ~RBITS;              // 掩碼     24(1)10000000

    /*
     * 3種模式可以通過(guò)檢查區(qū)分 (m = stamp & ABITS):
     * 寫(xiě)模式: m == WBIT
     * 樂(lè)觀讀模式: m == 0L (即使讀鎖已經(jīng)被持有)
     * 悲觀讀模式: m > 0L && m <= RFULL (同步狀態(tài)的拷貝,,但是stamp中的
     * read hold count除了用來(lái)決定是哪個(gè)模式以外不會(huì)被使用)
     *
     * This differs slightly from the encoding of state:
     * (state & ABITS) == 0L 表示鎖沒(méi)有被獲取
     * (state & ABITS) == RBITS 這是一個(gè)特殊值矢腻,表示操作讀者bit位的自旋鎖溢出
     */

    /** 鎖狀態(tài)的初始值 */
    private static final long ORIGIN = WBIT << 1;          // 1 00000000

    // Special value from cancelled acquire methods so caller can throw IE
    private static final long INTERRUPTED = 1L;

    // 節(jié)點(diǎn)狀態(tài)值; order matters
    private static final int WAITING   = -1;
    private static final int CANCELLED =  1;

    // 節(jié)點(diǎn)模式 (使用int而不是boolean以允許運(yùn)算)
    private static final int RMODE = 0;
    private static final int WMODE = 1;

其中ABITS和SBITS是作為掩碼使用的门驾,來(lái)快速檢查當(dāng)前鎖的狀態(tài),在后面讀寫(xiě)鎖的獲取中可以看到它們的使用多柑。使用ORIGIN作為初始值也是與此相關(guān)奶是,我們?cè)诤竺嬗懻摗6x狀態(tài)正常最多只可以被獲取126(RFULL)次,如果超出這個(gè)上限聂沙,那么其他讀線程獲取鎖時(shí)需要在readreaderOverflow記錄秆麸。因?yàn)?code>readreaderOverflow不是個(gè)原子變量,所以為了保證它的同步性及汉,需要進(jìn)行同步處理沮趣。

了解了常量值的含義后,開(kāi)始對(duì)同步節(jié)點(diǎn)的分析:

> line: 406
static final class WNode {
    volatile WNode prev;      // 前驅(qū)節(jié)點(diǎn)
    volatile WNode next;      // 后繼節(jié)點(diǎn)
    volatile WNode cowait;    // 讀線程鏈表
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

/** 隊(duì)列頭節(jié)點(diǎn) */
private transient volatile WNode whead;
/** 隊(duì)列尾節(jié)點(diǎn) */
private transient volatile WNode wtail;

StampedLock中的同步節(jié)點(diǎn)和AQS的幾乎一樣坷随,只多加了一個(gè)cowait字段兔毒,同時(shí)狀態(tài)略有不同,還多了個(gè)判定是讀還是寫(xiě)的mode字段甸箱。關(guān)于狀態(tài)只有WAITINGCANCELLED兩種育叁,閱讀過(guò)AQS相信對(duì)此不會(huì)有疑惑,而cowait的的出現(xiàn)是對(duì)AQS的優(yōu)化芍殖。在StampedLock中豪嗽,讀節(jié)點(diǎn)不像AQS那樣每個(gè)讀線程都會(huì)構(gòu)造一個(gè)自己的節(jié)點(diǎn)并加入到同步隊(duì)列中,而是將許多連續(xù)的讀節(jié)點(diǎn)掛載在一個(gè)讀節(jié)點(diǎn)上豌骏,此時(shí)同步隊(duì)列中就不會(huì)出現(xiàn)多個(gè)連續(xù)的讀節(jié)點(diǎn)龟梦,當(dāng)此讀節(jié)點(diǎn)獲取到鎖時(shí),會(huì)喚醒在其上掛載的所有讀線程窃躲,此時(shí)其他需要增加到同步隊(duì)列中的線程無(wú)論讀寫(xiě)都會(huì)幫助頭節(jié)點(diǎn)喚醒计贰,如此就大大加快了讀線程的喚醒速度,具體實(shí)現(xiàn)會(huì)在后面進(jìn)行講解蒂窒。

寫(xiě)鎖的獲取與釋放

> line: 459
public long writeLock() {
    long next;
    // 當(dāng)沒(méi)有悲觀讀鎖或者寫(xiě)鎖已經(jīng)被獲取時(shí)躁倒,能夠獲取到寫(xiě)鎖
    return ((next = tryWriteLock()) != 0L) ? next : acquireWrite(false, 0L);
}

> line: 471
public long tryWriteLock() {
    long s;
    // 進(jìn)行掩碼運(yùn)算
    return (((s = state) & ABITS) == 0L) ? tryWriteLock(s) : 0L;
}

> line: 442
private long tryWriteLock(long s) {
    // assert (s & ABITS) == 0L;
    long next;
    if (casState(s, next = s | WBIT)) {
        VarHandle.storeStoreFence();
        return next;
    }
    return 0L;
}

我們先明確一件事,同步狀態(tài)的初始值為100000000(二進(jìn)制)洒琢,如果獲取到悲觀讀鎖秧秉,那么同步狀態(tài)會(huì)加一,ABITS的值為11111111衰抑,所以如果 state & ABITS不為0象迎,就表示有線程獲取了悲觀讀鎖或者由線程已經(jīng)獲取了寫(xiě)鎖,而如果state & ABITS為0呛踊,則只可能有線程獲取到了樂(lè)觀讀鎖砾淌,此時(shí)線程可以無(wú)視樂(lè)觀讀鎖然后獲取寫(xiě)鎖。

下面是獲取讀鎖的超時(shí)版本:

> line: 489
public long tryWriteLock(long time, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(time);
    // 如果在嘗試獲取鎖前線程已經(jīng)被中斷谭网,那么直接拋出異常
    if (!Thread.interrupted()) {
        long next, deadline;
        // 成功獲取到寫(xiě)鎖汪厨,直接返回一個(gè) stamp
        if ((next = tryWriteLock()) != 0L)
            return next;
        // 指定超時(shí)時(shí)間 <=0,直接返回
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

> line: 1231
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 如果當(dāng)前沒(méi)有線程獲取了悲觀讀鎖或?qū)戞i蜻底,那么嘗試獲取寫(xiě)鎖
        if ((m = (s = state) & ABITS) == 0L) {
            if ((ns = tryWriteLock(s)) != 0L)
                return ns;
        }
        else if (spins < 0)
            // 如果有線程已經(jīng)獲取了寫(xiě)鎖并且隊(duì)列還未被初始化或者為空骄崩,設(shè)置自旋次數(shù)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        // 進(jìn)行忙等待
        else if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        // 如果自旋結(jié)束還未獲取到鎖
        else if ((p = wtail) == null) { // 初始化隊(duì)列
            // 構(gòu)建一個(gè)節(jié)點(diǎn)聘鳞,并設(shè)為頭節(jié)點(diǎn)以及尾節(jié)點(diǎn)
            WNode hd = new WNode(WMODE, null);
            if (WHEAD.weakCompareAndSet(this, null, hd))
                wtail = hd;
        }
        // 如果隊(duì)列已經(jīng)被初始化,將自己增加到同步隊(duì)列中
        else if (node == null)
            node = new WNode(WMODE, p);
        // 入隊(duì)過(guò)程中如果有其他線程已經(jīng)將自己設(shè)置為尾節(jié)點(diǎn)要拂,則重新插入到尾端
        else if (node.prev != p)
            node.prev = p;
        // 嘗試將自己設(shè)置為隊(duì)列尾節(jié)點(diǎn)抠璃,成功則跳出循環(huán)
        else if (WTAIL.weakCompareAndSet(this, p, node)) {
            p.next = node;
            break;
        }
    }

    boolean wasInterrupted = false;
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)
        if ((h = whead) == p) {
            // 設(shè)置自旋次數(shù)
            if (spins < 0)
                spins = HEAD_SPINS;
            // 如果小于最大頭節(jié)點(diǎn)自旋次數(shù),則將自選次數(shù)擴(kuò)大為兩倍
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; k > 0; --k) { // 在隊(duì)列頭部一直自旋
                long s, ns;
                // 檢查當(dāng)前是否有線程獲取了寫(xiě)鎖或者悲觀讀鎖
                if (((s = state) & ABITS) == 0L) {
                    // 如果沒(méi)有脱惰,則嘗試獲取寫(xiě)鎖
                    if ((ns = tryWriteLock(s)) != 0L) {
                        // 獲取成功搏嗡,將自己設(shè)為頭節(jié)點(diǎn)
                        whead = node;
                        node.prev = null;
                        if (wasInterrupted)
                            Thread.currentThread().interrupt();
                        return ns;
                    }
                }
                // 如果當(dāng)前鎖已被占有,那么就自旋
                else
                    Thread.onSpinWait();
            }
        }
        // 如果頭節(jié)點(diǎn)不為空且不是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
        else if (h != null) { 
            WNode c; Thread w;
            // 如果頭節(jié)點(diǎn)的cowait!=null拉一,即頭節(jié)點(diǎn)是一個(gè)讀節(jié)點(diǎn)采盒,那么便幫助它釋放在此處
            // 積聚的讀線程
            while ((c = h.cowait) != null) {
                if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) &&
                    (w = c.thread) != null)
                    LockSupport.unpark(w);
            }
        }
        // 如果在執(zhí)行上面操作時(shí)頭節(jié)點(diǎn)沒(méi)有被改變
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            // 將前驅(qū)節(jié)點(diǎn)設(shè)置為WAITING
            else if ((ps = p.status) == 0)
                WSTATUS.compareAndSet(p, 0, WAITING);
            // 如果前驅(qū)節(jié)點(diǎn)被取消了,則將此節(jié)點(diǎn)移除同步隊(duì)列蔚润,與更前面的節(jié)點(diǎn)建立聯(lián)系
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                // 如果超時(shí)時(shí)間已經(jīng)到達(dá)磅氨,取消此節(jié)點(diǎn)
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                node.thread = wt;
                // 阻塞當(dāng)前線程
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p) {
                    if (time == 0L)
                        LockSupport.park(this);
                    else
                        LockSupport.parkNanos(this, time);
                }
                node.thread = null;
                if (Thread.interrupted()) {
                    if (interruptible)
                        return cancelWaiter(node, node, true);
                    wasInterrupted = true;
                }
            }
        }
    }
}

上面獲取寫(xiě)狀態(tài)的方法很復(fù)雜,我們將其逐個(gè)塊分析嫡纠。首先此方法包含了兩個(gè)死循環(huán)烦租,第一個(gè)循環(huán)主要的操作就是將自己加入到同步隊(duì)列中,而第二個(gè)循環(huán)的主要目的是阻塞自己除盏。

下面先說(shuō)第一個(gè)循環(huán)叉橱,在這個(gè)循環(huán)中執(zhí)行時(shí),一直都在伴隨著自旋者蠕,在將自己加入到同步隊(duì)列時(shí)窃祝,也在嘗試能否獲取到同步狀態(tài)。

  1. 第一個(gè)if分支 if ((m = (s = state) & ABITS) == 0L)踱侣。 每次都會(huì)檢查同步狀態(tài)以查看是否有資格去獲取寫(xiě)鎖粪小,而失敗的原因只會(huì)是與其他線程競(jìng)爭(zhēng)寫(xiě)鎖時(shí)失敗了,自此以后它就沒(méi)有資格去獲取了泻仙,直到鎖再次為空糕再,這就是第一個(gè)if分支的用處。
  2. 第二個(gè)if分支 spins < 0玉转。 如果第一個(gè)if判斷失敗,那么證明鎖已經(jīng)被其他線程持有了殴蹄,不管是讀鎖還是寫(xiě)鎖究抓。此時(shí)設(shè)置自旋次數(shù),準(zhǔn)備自旋獲取鎖袭灯,這也是StampedLock相對(duì)于過(guò)去的ReentrantLock做出的改變刺下,因?yàn)楹芏鄷r(shí)候在自旋的這段時(shí)間內(nèi)其他線程會(huì)釋放鎖,所以此時(shí)就能更快的獲取到鎖稽荧,而非通過(guò)等待/通知機(jī)制等待喚醒橘茉。當(dāng)然,如果是其他線程獲取了悲觀讀鎖,那么就將自旋次數(shù)設(shè)置為0畅卓,以讓它們能夠有時(shí)間讀取共享數(shù)擅腰;或者當(dāng)前同步隊(duì)列中已經(jīng)有不止一個(gè)節(jié)點(diǎn)了,那么根據(jù)FIFO原則翁潘,需要讓前面的線程先獲取寫(xiě)鎖趁冈,所以自旋次數(shù)也為0.
  3. 第三個(gè)if分支 spins > 0。 在第二個(gè)if分支設(shè)置了自旋次數(shù)后拜马,線程便可以開(kāi)始自旋了渗勘,每自旋一次spins減1。經(jīng)過(guò)這兩步之后俩莽,主要的自旋過(guò)程已經(jīng)完成旺坠,后面在加入到同步隊(duì)列中時(shí),只會(huì)每次循環(huán)時(shí)再嘗試一次扮超,相當(dāng)于多幾次嘗試的機(jī)會(huì)取刃。
  4. 第四個(gè)if分支 (p = wtail) == null。 當(dāng)自旋次數(shù)用完后線程還沒(méi)有獲取到寫(xiě)鎖瞒津,那么就嘗試將自己增加到同步隊(duì)列中蝉衣。如果隊(duì)列還沒(méi)有被初始化,就構(gòu)造一個(gè)節(jié)點(diǎn)并將其設(shè)置為頭節(jié)點(diǎn)以及尾節(jié)點(diǎn)巷蚪。
  5. 第五個(gè)if分支 node == null病毡。 如果同步隊(duì)列已經(jīng)被初始化了,那么就構(gòu)造一個(gè)寫(xiě)節(jié)點(diǎn)屁柏,前驅(qū)節(jié)點(diǎn)指向當(dāng)前的尾節(jié)點(diǎn)啦膜。
  6. 第六個(gè)if分支 node.prev != p。在第五步后淌喻,這個(gè)節(jié)點(diǎn)并沒(méi)有被真正增加到同步隊(duì)列中僧家,因?yàn)榍膀?qū)節(jié)點(diǎn)的next字段還沒(méi)有指向它。在此期間裸删,可能有其他節(jié)點(diǎn)在它之前成功插入到了同步隊(duì)列中八拱,此時(shí)隊(duì)列的尾節(jié)點(diǎn)已經(jīng)被改變,而不是在線程堆棧中保存的尾節(jié)點(diǎn)p涯塔,所以將前驅(qū)節(jié)點(diǎn)指向新的尾節(jié)點(diǎn)肌稻,避免隊(duì)列混亂。
  7. 第七個(gè)分支 WTAIL.weakCompareAndSet(this, p, node)匕荸。 如果尾節(jié)點(diǎn)并未發(fā)生改變爹谭,那么便嘗試CAS將自己設(shè)置為隊(duì)列的尾節(jié)點(diǎn)并使前驅(qū)節(jié)點(diǎn)的next字段指向自己,如果成功了榛搔,就跳出此循環(huán)诺凡,否則就重新設(shè)置前驅(qū)節(jié)點(diǎn)然后再次嘗試东揣。至此,第一個(gè)循環(huán)結(jié)束腹泌。

第一個(gè)循環(huán)完成之后嘶卧,當(dāng)前線程已經(jīng)成功插入到了同步隊(duì)列中,第二個(gè)循環(huán)可能依然會(huì)自旋獲取同步狀態(tài)真屯,如果不就將自己阻塞脸候。

  1. 第一個(gè)if分支 (h = whead) == p。 此時(shí)p保存的還是在第一個(gè)循環(huán)中記錄的上一個(gè)尾節(jié)點(diǎn)绑蔫,即當(dāng)前節(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)运沦。如果前驅(qū)結(jié)點(diǎn)為頭節(jié)點(diǎn),那么便可以再次嘗試獲取同步狀態(tài)配深,因?yàn)榇藭r(shí)有很大幾率能夠成功携添。下面的if分支建立在此if分支的基礎(chǔ)上。
  • 第一個(gè)if分支 spins < 0篓叶。 首先設(shè)置自旋次數(shù)烈掠,因?yàn)榇藭r(shí)很有可能成功獲取同步狀態(tài),所以相比第一個(gè)循環(huán) 1<<6 == 64 的次數(shù)缸托,現(xiàn)在將自旋次數(shù)設(shè)置的更多左敌,為 1<<10 == 1024 次。
  • 第二個(gè)分支 spins < MAX_HEAD_SPINS 俐镐。 如果在上一個(gè)大循環(huán)中矫限,在給定自旋次數(shù)內(nèi)仍未能夠成功獲取同步狀態(tài),那么就再次擴(kuò)大自旋次數(shù)到 1<<16 == 65536 次佩抹。

在設(shè)置了自旋次數(shù)后叼风,開(kāi)始嘗試獲取同步狀態(tài)。此處的for循環(huán)的唯一功能就是獲取同步狀態(tài)棍苹,如果鎖目前被其他線程占有无宿,那么進(jìn)入忙等待,否則執(zhí)行tryWriteLock方法進(jìn)行CAS競(jìng)爭(zhēng)枢里。

  1. 第二個(gè)if分支 h != null孽鸡。 如果前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),并且頭節(jié)點(diǎn)是讀節(jié)點(diǎn)栏豺,那么就幫助喚醒積聚的等待線程梭灿。注意,如果讀線程獲取鎖失敗冰悠,當(dāng)同步隊(duì)列的尾節(jié)點(diǎn)是讀節(jié)點(diǎn)時(shí),它便不會(huì)將自己插入到同步隊(duì)列中配乱,而是直接掛載在那個(gè)尾節(jié)點(diǎn)溉卓,當(dāng)那個(gè)節(jié)點(diǎn)成為頭節(jié)點(diǎn)時(shí)皮迟,就會(huì)喚醒在其上掛載的所有讀線程,此處就是幫助讀節(jié)點(diǎn)更快喚醒那些線程桑寨。

在完成了上面兩個(gè)分支的工作后(不管做的是哪一個(gè))伏尼,如果頭節(jié)點(diǎn)沒(méi)有發(fā)生變化片拍,那么就執(zhí)行下面的操作重抖,否則進(jìn)入下一個(gè)循環(huán)重新開(kāi)始梗搅。

  • 第一個(gè)if分支 (np = node.prev) != p铜犬。 如果前驅(qū)節(jié)點(diǎn)發(fā)生了改變栗弟,那么使當(dāng)前的前驅(qū)節(jié)點(diǎn)的next字段指向自己宴胧。
  • 第二個(gè)if分支 (ps = p.status) == 0蒂破。如果前驅(qū)節(jié)點(diǎn)未發(fā)生改變猿妈,且它的狀態(tài)為初始狀態(tài)肢藐,那么就嘗試將它的狀態(tài)設(shè)置為WATING故河,因?yàn)榇藭r(shí)前驅(qū)節(jié)點(diǎn)并不是頭節(jié)點(diǎn)并且已經(jīng)將自己阻塞或者取消了。
  • 第三個(gè)if分支 ps == CANCELLED吆豹。 如果前驅(qū)節(jié)點(diǎn)被取消了鱼的,那么就將此節(jié)點(diǎn)從同步隊(duì)列中移除出去,然后設(shè)置更前面的節(jié)點(diǎn)的next字段指向自己痘煤。
  • 第四個(gè)if分支凑阶。 根據(jù)是否超時(shí)阻塞當(dāng)前線程,如果未使用超時(shí)衷快,那么就一直阻塞直到其他線程喚醒宙橱,否則就進(jìn)行超時(shí)處理。

雖然此方法看起來(lái)比較復(fù)雜烦磁,但是邏輯清晰养匈,還是比較容易理解。其復(fù)雜性主要就是自旋優(yōu)化導(dǎo)致的都伪。

下面是此方法的流程圖:

loop1
loop2

下面是幾種寫(xiě)節(jié)點(diǎn)加入同步隊(duì)列的常見(jiàn)情況:

下面是獲取鎖的響應(yīng)中斷版本:

> line: 517
public long writeLockInterruptibly() throws InterruptedException {
    long next;
    if (!Thread.interrupted() &&
        (next = acquireWrite(true, 0L)) != INTERRUPTED)
        return next;
    throw new InterruptedException();
}

整體上一樣呕乎,只是多了個(gè)響應(yīng)中斷。

鎖釋放時(shí)需要將加鎖時(shí)返回的stamp作為參數(shù)陨晶。

> line: 678
public void unlockWrite(long stamp) {
    // 如果同步狀態(tài)和stamp不相符猬仁,拋出異常
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    unlockWriteInternal(stamp);
}

> line: 661
private long unlockWriteInternal(long s) {
    long next; WNode h;
    // 更改同步狀態(tài)
    STATE.setVolatile(this, next = unlockWriteState(s));
    // 如果同步隊(duì)列中有節(jié)點(diǎn)等待,并且自旋獲取同步狀態(tài)失敗被阻塞(status!=0)先誉,釋放它
    if ((h = whead) != null && h.status != 0)
        release(h);
    return next;
}

> line: 657
// 返回一個(gè)未加鎖狀態(tài)湿刽,增加一個(gè)版本并避免同步狀態(tài)為 0
private static long unlockWriteState(long s) {
    return ((s += WBIT) == 0L) ? ORIGIN : s;
}

> line: 1208
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        WSTATUS.compareAndSet(h, WAITING, 0);
        // 如果第一個(gè)等待節(jié)點(diǎn)為null或者被取消了,那么從隊(duì)尾開(kāi)始逆向?qū)ふ?        if ((q = h.next) == null || q.status == CANCELLED) {
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            LockSupport.unpark(w);
    }
}

從寫(xiě)鎖釋放的源碼中可以看出褐耳,寫(xiě)鎖釋放的時(shí)候诈闺,同步狀態(tài)會(huì)增加WBIT,相當(dāng)于記錄寫(xiě)鎖總共被獲取的次數(shù)铃芦,當(dāng)其增加到上限溢出時(shí)雅镊,重置同步狀態(tài)為ORIGIN襟雷。

讀鎖的獲取與釋放

> line: 532
public long readLock() {
    long s, next;
    // 如果同步隊(duì)列為空并且沒(méi)有線程獲取了寫(xiě)鎖并且CAS成功,返回stamp
    return (whead == wtail
            && ((s = state) & ABITS) < RFULL
            && casState(s, next = s + RUNIT))
        ? next
        : acquireRead(false, 0L);
}

> line: 1339
private long acquireRead(boolean interruptible, long deadline) {
    boolean wasInterrupted = false;
    WNode node = null, p;
    for (int spins = -1;;) {
        WNode h;
        // 如果同步隊(duì)列為空或者只有一個(gè)頭節(jié)點(diǎn)
        if ((h = whead) == (p = wtail)) {
            for (long m, s, ns;;) {
                // 如果沒(méi)有線程獲得寫(xiě)鎖并且讀鎖數(shù)量未達(dá)到上限仁烹,進(jìn)行CAS競(jìng)爭(zhēng)
                // 如果讀鎖數(shù)量達(dá)到上限耸弄,嘗試增加讀鎖數(shù)量溢出,如果成功卓缰,返回
                if ((m = (s = state) & ABITS) < RFULL ?
                    casState(s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    if (wasInterrupted)
                        Thread.currentThread().interrupt();
                    return ns;
                }
                // 如果有線程獲取了寫(xiě)鎖
                else if (m >= WBIT) {
                    // 如果自旋次數(shù)大于0计呈,進(jìn)行忙等待
                    if (spins > 0) {
                        --spins;
                        Thread.onSpinWait();
                    }
                    else {
                        // 如果自旋次數(shù)用完,并且同步隊(duì)列未發(fā)生變化征唬,跳出循環(huán)
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 設(shè)置自旋次數(shù)
                        spins = SPINS;
                    }
                }
            }
        }
        if (p == null) { // 初始化隊(duì)列
            WNode hd = new WNode(WMODE, null);
            if (WHEAD.weakCompareAndSet(this, null, hd))
                wtail = hd;
        }
        // 如果隊(duì)列已經(jīng)初始化捌显,創(chuàng)建一個(gè)讀節(jié)點(diǎn),插入到隊(duì)列尾部
        else if (node == null)
            node = new WNode(RMODE, p);
        // 如果隊(duì)列只有一個(gè)節(jié)點(diǎn)或者尾節(jié)點(diǎn)為寫(xiě)節(jié)點(diǎn)
        else if (h == p || p.mode != RMODE) {
            // 如果在自旋期間有其他節(jié)點(diǎn)成功插入到隊(duì)列尾部鳍鸵,將prev字段設(shè)置為新的尾節(jié)點(diǎn)
            if (node.prev != p)
                node.prev = p;
            // 否則苇瓣,設(shè)置當(dāng)前節(jié)點(diǎn)為尾節(jié)點(diǎn),并使前驅(qū)節(jié)點(diǎn)的next字段指向自己偿乖,跳出循環(huán)
            else if (WTAIL.weakCompareAndSet(this, p, node)) {
                p.next = node;
                break;
            }
        }
        // 如果尾節(jié)點(diǎn)為讀節(jié)點(diǎn)击罪,將其cowait設(shè)置為當(dāng)前節(jié)點(diǎn),注意這是一個(gè)類似鏈表插入的操作
        else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node))
            node.cowait = null;
        // 如果上一步CAS成功
        else {
            for (;;) {
                WNode pp, c; Thread w;
                // 如果頭節(jié)點(diǎn)為讀節(jié)點(diǎn)贪薪,嘗試釋放此節(jié)點(diǎn)上掛載的所有節(jié)點(diǎn)
                if ((h = whead) != null && (c = h.cowait) != null &&
                    WCOWAIT.compareAndSet(h, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    LockSupport.unpark(w);
                // 響應(yīng)中斷
                if (Thread.interrupted()) {
                    if (interruptible)
                        return cancelWaiter(node, p, true);
                    wasInterrupted = true;
                }
                // 如果掛載節(jié)點(diǎn)的前驅(qū)是頭節(jié)點(diǎn)或者掛載節(jié)點(diǎn)為頭節(jié)點(diǎn)媳禁,即現(xiàn)在可以嘗試獲取讀鎖
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            casState(s, ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L)) {
                            if (wasInterrupted)
                                Thread.currentThread().interrupt();
                            return ns;
                        }
                    } while (m < WBIT);
                }
                // 如果在前面執(zhí)行完隊(duì)列頭節(jié)點(diǎn)未發(fā)生變化
                if (whead == h && p.prev == pp) {
                    long time;
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 計(jì)算超時(shí)時(shí)間,并阻塞自己
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L) {
                        if (wasInterrupted)
                            Thread.currentThread().interrupt();
                        return cancelWaiter(node, p, false);
                    }
                    Thread wt = Thread.currentThread();
                    node.thread = wt;
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp) {
                        if (time == 0L)
                            LockSupport.park(this);
                        else
                            LockSupport.parkNanos(this, time);
                    }
                    node.thread = null;
                }
            }
        }
    }

    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)
        if ((h = whead) == p) {
            // 設(shè)置自旋數(shù)
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins;;) { // 在頭部自旋画切,嘗試獲取鎖
                long m, s, ns;
                if ((m = (s = state) & ABITS) < RFULL ?
                    casState(s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    while ((c = node.cowait) != null) {
                        if (WCOWAIT.compareAndSet(node, c, c.cowait) &&
                            (w = c.thread) != null)
                            LockSupport.unpark(w);
                    }
                    if (wasInterrupted)
                        Thread.currentThread().interrupt();
                    return ns;
                }
                else if (m >= WBIT && --k <= 0)
                    break;
                else
                    Thread.onSpinWait();
            }
        }
        // 如果頭節(jié)點(diǎn)不為空并且前驅(qū)節(jié)點(diǎn)不為頭節(jié)點(diǎn)竣稽,如果頭節(jié)點(diǎn)是讀節(jié)點(diǎn)
        // 幫助喚醒掛載的讀線程
        else if (h != null) {
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (WCOWAIT.compareAndSet(h, c, c.cowait) &&
                    (w = c.thread) != null)
                    LockSupport.unpark(w);
            }
        }
        // 如果頭節(jié)點(diǎn)未發(fā)生改變
        if (whead == h) {
            // 如果前驅(qū)節(jié)點(diǎn)發(fā)生了改變,更新節(jié)點(diǎn)引用
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            // 如果前驅(qū)節(jié)點(diǎn)狀態(tài)是初始狀態(tài)霍弹,將其設(shè)置為WAITING
            else if ((ps = p.status) == 0)
                WSTATUS.compareAndSet(p, 0, WAITING);
            // 如果前驅(qū)節(jié)點(diǎn)被取消毫别,移除此節(jié)點(diǎn)并更新節(jié)點(diǎn)引用
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            // 否則,阻塞自己
            else {
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p) {
                        if (time == 0L)
                            LockSupport.park(this);
                        else
                            LockSupport.parkNanos(this, time);
                }
                node.thread = null;
                if (Thread.interrupted()) {
                    if (interruptible)
                        return cancelWaiter(node, node, true);
                    wasInterrupted = true;
                }
            }
        }
    }
}

> line: 1157
private long tryIncReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    // 如果剛好到達(dá)上限典格,則將同步狀態(tài)的最后七位設(shè)置為RBITS岛宦,用作屏蔽其他讀線程以原子更新readOverflow
    if ((s & ABITS) == RFULL) {
        if (casState(s, s | RBITS)) {
            // readerOverflow記錄當(dāng)讀線程數(shù)量飽和時(shí)的溢出數(shù)
            ++readerOverflow;
            // 更新完readerOverflow后將state重置回RFULL
            // 此時(shí)可以讓下一個(gè)線程來(lái)更新readerOverflow
            STATE.setVolatile(this, s);
            return s;
        }
    }
    // 否則,如果隨機(jī)數(shù)&111==0耍缴,調(diào)度線程
    else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    // 否則進(jìn)行忙等待
    else
        Thread.onSpinWait();
    return 0L;
}

讀狀態(tài)的獲取相比寫(xiě)狀態(tài)更加復(fù)雜砾肺,主要原因便是StampedLock的讀節(jié)點(diǎn)不像ReentrantReadWriteLock那樣每個(gè)線程構(gòu)造一個(gè)節(jié)點(diǎn),而是如果當(dāng)前尾節(jié)點(diǎn)是寫(xiě)節(jié)點(diǎn)防嗡,那么就構(gòu)造一個(gè)讀節(jié)點(diǎn)加入隊(duì)列尾端变汪,如果尾節(jié)點(diǎn)是讀節(jié)點(diǎn),就不會(huì)再將新構(gòu)造的讀節(jié)點(diǎn)加入到隊(duì)列尾部了蚁趁,而是直接掛載在前面那個(gè)讀節(jié)點(diǎn)上裙盾,當(dāng)那個(gè)節(jié)點(diǎn)成為頭節(jié)點(diǎn)時(shí),喚醒掛載在上面的所有讀線程。

整個(gè)方法依然被分為兩個(gè)大循環(huán)闷煤,不過(guò)這兩個(gè)循環(huán)的功能交錯(cuò)童芹,下面一個(gè)一個(gè)進(jìn)行分析,首先說(shuō)第一個(gè)大循環(huán)鲤拿。

  1. 第一個(gè)if (h = whead) == (p = wtail)。 如果隊(duì)列還沒(méi)有被初始化或者只有一個(gè)頭節(jié)點(diǎn)署咽,那么嘗試獲取讀狀態(tài)近顷。因?yàn)榇藭r(shí)很有可能獲取到讀鎖,所以即使現(xiàn)在讀線程數(shù)量已經(jīng)到達(dá)上限宁否,依然繼續(xù)嘗試窒升。但如果已經(jīng)有線程獲取了寫(xiě)鎖,那么就自旋 1<<6 == 64 次進(jìn)行嘗試慕匠。如果自旋結(jié)束還沒(méi)有獲取到鎖饱须,那么退出獲取鎖的循環(huán)。
  2. 第一個(gè)if分支 p == null台谊。 如果當(dāng)前隊(duì)列沒(méi)有被初始化蓉媳,那么就初始化隊(duì)列,此步寫(xiě)鎖獲取中已經(jīng)說(shuō)過(guò)锅铅,不再贅述酪呻,后面與寫(xiě)鎖獲取類似的功能都不在贅述。
  3. 第二個(gè)if分支 node == null盐须。 構(gòu)造一個(gè)讀節(jié)點(diǎn)玩荠,設(shè)置prev字段為隊(duì)尾。
  4. 第三個(gè)if分支 h == p || p.mode != RMODE贼邓。 如果前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)或者前驅(qū)節(jié)點(diǎn)的寫(xiě)模式阶冈,如果當(dāng)前節(jié)點(diǎn)的prev字段被改變,那么就重新設(shè)置prev字段塑径,否則將此節(jié)點(diǎn)設(shè)置為隊(duì)尾女坑,并跳出第一個(gè)大循環(huán)。
  5. 第四個(gè)if分支 !WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node)晓勇。 如果前驅(qū)結(jié)點(diǎn)不是頭節(jié)點(diǎn)并且是一個(gè)讀節(jié)點(diǎn)堂飞,那么嘗試將自己掛載到此節(jié)點(diǎn)上。
  6. 第五個(gè)if分支绑咱。 如果第五步CAS成功绰筛,則進(jìn)入最后一個(gè)分支,否則循環(huán)CAS設(shè)置描融。這個(gè)if分支由多個(gè)if語(yǔ)句組成铝噩,下面逐個(gè)分析。
  • 第一個(gè)if (h = whead) != null && (c = h.cowait) != null && WCOWAIT.compareAndSet(h, c, c.cowait) && (w = c.thread) != null窿克。 如果頭節(jié)點(diǎn)是一個(gè)讀節(jié)點(diǎn)骏庸,那么幫助它喚醒掛載在其上等待的讀線程毛甲。
  • 第二個(gè)if Thread.interrupted()。 如果響應(yīng)中斷具被,取消自己玻募,否則只記錄被中斷了。
  • 第三個(gè)if h == (pp = p.prev) || h == p || pp == null一姿。 如果節(jié)點(diǎn)的前驅(qū)(當(dāng)前節(jié)點(diǎn)掛載在一個(gè)讀節(jié)點(diǎn)上七咧,相當(dāng)于是一個(gè)節(jié)點(diǎn))是頭節(jié)點(diǎn)或者節(jié)點(diǎn)是頭節(jié)點(diǎn),再次嘗試獲取鎖直到有線程獲取了寫(xiě)鎖叮叹。
  • 第四個(gè)if whead == h && p.prev == pp艾栋。 如果節(jié)點(diǎn)引用未發(fā)生變化,那么嘗試阻塞自己蛉顽,因?yàn)橐呀?jīng)給予了足夠的機(jī)會(huì)嘗試獲取寫(xiě)鎖蝗砾,或者離能夠獲取到鎖還很遙遠(yuǎn),阻塞防止浪費(fèi)cpu資源携冤。如果被掛載的節(jié)點(diǎn)被取消了悼粮,那么丟棄自己,重新開(kāi)始噪叙,否則掛載節(jié)點(diǎn)將會(huì)一直阻塞下去矮锈。

第二個(gè)大循環(huán):

  1. (h = whead) == p。 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)睁蕾,那么再次自旋嘗試獲取同步狀態(tài)苞笨。如果獲取成功,將自己設(shè)置為頭節(jié)點(diǎn)子眶,并喚醒掛載節(jié)點(diǎn)瀑凝。
  2. h != null。 如果頭節(jié)點(diǎn)不為空且是讀節(jié)點(diǎn)臭杰,幫助喚醒線程粤咪。
  3. whead == h。如果經(jīng)過(guò)上面的步驟后頭節(jié)點(diǎn)未發(fā)生變化渴杆,那么就將自己阻塞寥枝。

下面是此方法的流程圖:

loop 1
loop 2

下面展示一下讀線程插入到同步隊(duì)列的幾種常見(jiàn)情況,注意讀節(jié)點(diǎn)與寫(xiě)節(jié)點(diǎn)的內(nèi)部構(gòu)造是一樣的磁奖,此處省略寫(xiě)節(jié)點(diǎn)的cowait字段囊拜。

StampedLock-read1.jpg
StampedLock-read2.jpg
StampedLock-read3.jpg

關(guān)于讀鎖獲取的其他版本大體差不多,不再贅述比搭,此處只貼出源碼冠跷。

> line: 549
public long tryReadLock() {
    long s, m, next;
    while ((m = (s = state) & ABITS) != WBIT) {
        if (m < RFULL) {
            if (casState(s, next = s + RUNIT))
                return next;
        }
        else if ((next = tryIncReaderOverflow(s)) != 0L)
            return next;
    }
    return 0L;
}

> line: 576
public long tryReadLock(long time, TimeUnit unit)
    throws InterruptedException {
    long s, m, next, deadline;
    long nanos = unit.toNanos(time);
    if (!Thread.interrupted()) {
        if ((m = (s = state) & ABITS) != WBIT) {
            if (m < RFULL) {
                if (casState(s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        if ((next = acquireRead(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

> line: 610
public long readLockInterruptibly() throws InterruptedException {
    long s, next;
    if (!Thread.interrupted()
        // bypass acquireRead on common uncontended case
        && ((whead == wtail
             && ((s = state) & ABITS) < RFULL
             && casState(s, next = s + RUNIT))
            ||
            (next = acquireRead(true, 0L)) != INTERRUPTED))
        return next;
    throw new InterruptedException();
}

讀鎖釋放代碼如下:

> line: 693
public void unlockRead(long stamp) {
    long s, m; WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)
           && (stamp & RBITS) > 0L
           && ((m = s & RBITS) > 0L)) {
        if (m < RFULL) {
            if (casState(s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            return;
    }
    throw new IllegalMonitorStateException();
}

> line: 1179
private long tryDecReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) {
        if (casState(s, s | RBITS)) {
            int r; long next;
            // 優(yōu)先減少溢出記錄
            if ((r = readerOverflow) > 0) {
                readerOverflow = r - 1;
                next = s;
            }
            else
                next = s - RUNIT;
            STATE.setVolatile(this, next);
            return next;
        }
    }
    else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    else
        Thread.onSpinWait();
    return 0L;
}

樂(lè)觀讀鎖的獲取

樂(lè)觀讀鎖只需要獲取,不需要釋放。只要沒(méi)有線程獲取寫(xiě)鎖蜜托,那么就能獲取到樂(lè)觀讀鎖抄囚。當(dāng)然,當(dāng)使用由樂(lè)觀讀鎖讀取的共享數(shù)據(jù)時(shí)橄务,需要先調(diào)用validate方法驗(yàn)證數(shù)據(jù)是否過(guò)期幔托,只需要判斷從獲取數(shù)據(jù)到現(xiàn)在這段時(shí)間內(nèi)是否有線程獲取讀鎖即可。從此可以看出仪糖,樂(lè)觀讀鎖可以很好的提高吞吐量柑司,但是它的使用也非常不穩(wěn)定,如果對(duì)數(shù)據(jù)的了解與控制不清晰锅劝,那么很容易出現(xiàn)臟讀問(wèn)題。

> line: 629
public long tryOptimisticRead() {
    long s;
    // 只要沒(méi)有線程獲取寫(xiě)鎖蟆湖,那么就能成功獲取樂(lè)觀讀鎖
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

> line: 646
public boolean validate(long stamp) {
    VarHandle.acquireFence();
    // 驗(yàn)證是否有線程獲取到寫(xiě)鎖
    return (stamp & SBITS) == (state & SBITS);
}

讀寫(xiě)鎖之間的轉(zhuǎn)換

除了提供了樂(lè)觀讀鎖外故爵,StampedLock相比ReentrantReadWriteLock還提供了讀寫(xiě)鎖之間的轉(zhuǎn)換,而ReentrantReadWriteLock只提供了寫(xiě)鎖到讀鎖的鎖降級(jí)特性隅津。

  1. 嘗試轉(zhuǎn)換為寫(xiě)鎖

此時(shí)有三種情況诬垂,樂(lè)觀讀鎖轉(zhuǎn)換為讀鎖,悲觀讀鎖轉(zhuǎn)換為讀鎖伦仍,寫(xiě)鎖轉(zhuǎn)換為寫(xiě)鎖结窘。

  • 樂(lè)觀讀鎖的stamp & ABITS一定是0,可通過(guò)樂(lè)觀讀鎖的獲取方法發(fā)現(xiàn)這個(gè)特點(diǎn)充蓝。當(dāng)嘗試轉(zhuǎn)換時(shí)隧枫,會(huì)通過(guò)tryConvertToWriteLock方法的第一個(gè)if語(yǔ)句進(jìn)行判斷能否成功轉(zhuǎn)換。其中的(a!=0)的作用是:現(xiàn)在state&ABITS == 0谓苟,即沒(méi)有線程獲取了寫(xiě)鎖或悲觀讀鎖官脓,但是提供的stamp不符合這個(gè)結(jié)論,所以stamp檢驗(yàn)失敗涝焙。
  • 雖然不會(huì)有人想要在獲取寫(xiě)鎖時(shí)依然調(diào)用此方法卑笨,但是為了預(yù)防這種情況,必須要是當(dāng)前獲取了寫(xiě)鎖的線程調(diào)用此方法才能成功仑撞。
  • 當(dāng)悲觀讀鎖想要轉(zhuǎn)換為寫(xiě)鎖時(shí)赤兴,必須此時(shí)只有自己一個(gè)線程獲取了悲觀讀鎖才可以,否則此線程在修改數(shù)據(jù)時(shí)隧哮,其他的讀線程是無(wú)法感知到數(shù)據(jù)的更新的桶良。
>line: 739
public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    // 驗(yàn)證stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        // 當(dāng)前沒(méi)有悲觀讀鎖或?qū)戞i
        if ((m = s & ABITS) == 0L) {
            // 但是提供的stamp記錄不相符,失敗
            if (a != 0L)
                break;
            // 嘗試獲取寫(xiě)鎖近迁,成功則返回
            if ((next = tryWriteLock(s)) != 0L)
                return next;
        }
        // 如果當(dāng)前線程是已經(jīng)獲取了寫(xiě)鎖的線程
        else if (m == WBIT) {
            if (a != m)
                break;
            return stamp;
        }
        // 如果此時(shí)只有自己一個(gè)讀線程獲取了悲觀讀鎖
        else if (m == RUNIT && a != 0L) {
            if (casState(s, next = s - RUNIT + WBIT)) {
                VarHandle.storeStoreFence();
                return next;
            }
        }
        else
            break;
    }
    return 0L;
}

  1. 嘗試轉(zhuǎn)換為悲觀讀鎖

嘗試轉(zhuǎn)換為悲觀讀鎖依然有三種情況艺普。

  • 寫(xiě)鎖轉(zhuǎn)換為悲觀讀鎖。寫(xiě)鎖有資格轉(zhuǎn)換為讀鎖,這并不會(huì)引發(fā)問(wèn)題歧譬,只要提供的stamp符合當(dāng)前的寫(xiě)鎖state即可(注意:寫(xiě)鎖每次釋放時(shí)都會(huì)更新版本岸浑,所以每個(gè)獲取寫(xiě)鎖的state都不會(huì)相同)。
  • 樂(lè)觀讀鎖轉(zhuǎn)換為悲觀讀鎖瑰步。只要當(dāng)前沒(méi)有其他線程獲取了寫(xiě)鎖矢洲,那么就能安全轉(zhuǎn)換。
  • 悲觀讀鎖轉(zhuǎn)換為悲觀讀鎖缩焦。此種情況和上面的寫(xiě)鎖轉(zhuǎn)寫(xiě)鎖一樣读虏,已經(jīng)是讀鎖了那就無(wú)需轉(zhuǎn)換,只需驗(yàn)證stamp合法性然后直接返回即可袁滥。
> line: 776
public long tryConvertToReadLock(long stamp) {
    long a, s, next; WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((a = stamp & ABITS) >= WBIT) {
            // write stamp
            if (s != stamp)
                break;
            STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT);
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        else if (a == 0L) {
            // optimistic read stamp
            if ((s & ABITS) < RFULL) {
                if (casState(s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        else {
            // already a read stamp
            if ((s & ABITS) == 0L)
                break;
            return stamp;
        }
    }
    return 0L;
}
  1. 嘗試轉(zhuǎn)換為樂(lè)觀讀鎖盖桥。
  • 寫(xiě)鎖轉(zhuǎn)換為樂(lè)觀讀鎖。只需要驗(yàn)證stamp合法性即可成功轉(zhuǎn)換题翻。
  • 樂(lè)觀讀鎖轉(zhuǎn)換為樂(lè)觀讀鎖揩徊。直接返回。
  • 悲觀讀鎖轉(zhuǎn)換為樂(lè)觀讀鎖嵌赠。第一個(gè)if語(yǔ)句(m = s & ABITS) == 0L驗(yàn)證stamp的合法性塑荒,此時(shí)stamp只會(huì)是悲觀讀鎖返回的,但是s & ABITS==0表示當(dāng)前沒(méi)有線程獲取了悲觀讀鎖姜挺,所以stamp不合法齿税。后兩個(gè)if語(yǔ)句針對(duì)當(dāng)前reader是否溢出作出不同的舉措。
> line: 817
public long tryConvertToOptimisticRead(long stamp) {
    long a, m, s, next; WNode h;
    VarHandle.acquireFence();
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((a = stamp & ABITS) >= WBIT) {
            // write stamp
            if (s != stamp)
                break;
            return unlockWriteInternal(s);
        }
        else if (a == 0L)
            // already an optimistic read stamp
            return stamp;
        else if ((m = s & ABITS) == 0L) // invalid read stamp
            break;
        else if (m < RFULL) {
            if (casState(s, next = s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return next & SBITS;
            }
        }
        else if ((next = tryDecReaderOverflow(s)) != 0L)
            return next & SBITS;
    }
    return 0L;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末炊豪,一起剝皮案震驚了整個(gè)濱河市凌箕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌溜在,老刑警劉巖陌知,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異掖肋,居然都是意外死亡仆葡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)志笼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)沿盅,“玉大人,你說(shuō)我怎么就攤上這事纫溃⊙В” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵紊浩,是天一觀的道長(zhǎng)窖铡。 經(jīng)常有香客問(wèn)我疗锐,道長(zhǎng),這世上最難降的妖魔是什么费彼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任滑臊,我火速辦了婚禮,結(jié)果婚禮上箍铲,老公的妹妹穿的比我還像新娘雇卷。我一直安慰自己,他們只是感情好颠猴,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布关划。 她就那樣靜靜地躺著,像睡著了一般翘瓮。 火紅的嫁衣襯著肌膚如雪贮折。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,837評(píng)論 1 290
  • 那天资盅,我揣著相機(jī)與錄音脱货,去河邊找鬼。 笑死律姨,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的臼疫。 我是一名探鬼主播择份,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼烫堤!你這毒婦竟也來(lái)了荣赶?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤鸽斟,失蹤者是張志新(化名)和其女友劉穎拔创,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體富蓄,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡剩燥,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了立倍。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灭红。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖口注,靈堂內(nèi)的尸體忽然破棺而出变擒,到底是詐尸還是另有隱情,我是刑警寧澤寝志,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布娇斑,位于F島的核電站策添,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏毫缆。R本人自食惡果不足惜唯竹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望悔醋。 院中可真熱鬧摩窃,春花似錦、人聲如沸芬骄。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)账阻。三九已至蒂秘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間淘太,已是汗流浹背姻僧。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蒲牧,地道東北人撇贺。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像冰抢,于是被迫代替她去往敵國(guó)和親松嘶。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容