[Java并發(fā)]-----第6章 Java并發(fā)包中鎖原理剖析

1. LockSupport工具類

? JDK 中的jr.jar包里面的LockSupport是個工具類,主要作用是掛起和喚醒線程.

? 其內(nèi)部實現(xiàn)是native方法.

? LockSupport類與每個使用它的線程都會關聯(lián)一個許可證(鎖),在默認情況下調(diào)用時是不具有許可證的.

LockSupport和wait(),notify()系列方法的區(qū)別:

? ①LockSupport不需要在同步代碼塊里勇蝙。所以線程間也不需要維護一個共享的同步對象了呛谜,實現(xiàn)了線程間的解耦诅岩。

? ②unpark函數(shù)可以優(yōu)先于park調(diào)用卜壕,所以不需要擔心線程間的執(zhí)行先后順序椅您。(線程A連續(xù)調(diào)用兩次LockSupport.unpark(B)方法喚醒線程B萎坷,然后線程B調(diào)用兩次LockSupport.park()方法嘱兼, 線程B依舊會被阻塞悦冀。因為unpark()方法是更改標志位為1,而不是加一)

(1). void park()方法

? 如果沒有許可證,掛起.

(2). void unpark(Thread thread)方法

? thread線程立即獲取許可證,如果當前狀態(tài)為被阻塞,立即喚醒.

(3). void parkNanos(long nanos)方法

? 如果沒有許可證,掛起nanos微秒

(4). park(Object blocker)方法

? 將blocker變量存放到調(diào)用park方法掛起的線程中,推薦將this放入,可以從日志中知道在那個類中的代碼發(fā)生了掛起

(5). void parkNanos(Object blocker, long nanos)方法

? 相比上個方法多了超時時間

(6). void parkUntil(Object blocker, long deadline)方法

? 阻塞到時間戳deadline.

2. AQS-----鎖的底層支持

? ==AbstractQueuedSynchronizer抽象同步隊列簡稱AQS,是實現(xiàn)同步器的基礎組件==,并發(fā)包中鎖的底層就是使用AQS實現(xiàn)的.

? ==AQS是一個FIFO的雙向隊列,隊列元素為Node. Node中的thread用來存放進入AQS隊列的線程.Node節(jié)點內(nèi)部waitStatus記錄當前線程等待狀態(tài):CANCELLED(1,線程被取消),SIGNAL(-1,線程需要被喚醒),CONDITION(-2,線程在條件隊列中等待),PROPAGATE(-3,釋放共享資源時需要通知其他節(jié)點).==

image-20200221124601535

? AQS中維持了一個單一的狀態(tài)信息state,可以通過CAS操作修改其值,并且它有get()和set()方法.

? 對于AQS來說,線程同步的關鍵對狀態(tài)值state進行操作,分為獨占方式(標記,如果標記狀態(tài)不正確不能進行操作)和共享方式(直接進行CAS修改,不需要判斷標記).

(1). 獨占不響應中斷模式下,獲取與釋放資源

AbstractQueuedSynchronizer源碼剖析(二)- 不響應中斷的獨占鎖

1). 獲取鎖的過程

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    //基于當前線程,節(jié)點類型(Node.EXCLUSIVE)創(chuàng)建新的節(jié)點
    //由于這里是獨占模式醉蚁,因此節(jié)點類型就是Node.EXCLUSIVE
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //這里為了提搞性能燃辖,首先執(zhí)行一次快速入隊操作,即直接嘗試將新節(jié)點加入隊尾
    if (pred != null) {
        node.prev = pred;
        //這里根據(jù)CAS的邏輯网棍,即使并發(fā)操作也只能有一個線程成功并返回黔龟,其余的都要執(zhí)行后面的入隊操作。即enq()方法
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //如果隊列還沒有初始化滥玷,則進行初始化氏身,即創(chuàng)建一個空的頭節(jié)點
        if (t == null) { 
            //同樣是CAS,只有一個線程可以初始化頭結(jié)點成功惑畴,其余的都要重復執(zhí)行循環(huán)體
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //新創(chuàng)建的節(jié)點指向隊列尾節(jié)點蛋欣,毫無疑問并發(fā)情況下這里會有多個新創(chuàng)建的節(jié)點指向隊列尾節(jié)點
            node.prev = t;
            //基于這一步的CAS,不管前一步有多少新節(jié)點都指向了尾節(jié)點如贷,這一步只有一個能真正入隊成功陷虎,其他的都必須重新執(zhí)行循環(huán)體
            if (compareAndSetTail(t, node)) {
                t.next = node;
                //該循環(huán)體唯一退出的操作到踏,就是入隊成功(否則就要無限重試)
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    //鎖資源獲取失敗標記位
    boolean failed = true;
    try {
        //等待線程被中斷標記位
        boolean interrupted = false;
        //這個循環(huán)體執(zhí)行的時機包括新節(jié)點入隊和隊列中等待節(jié)點被喚醒兩個地方
        for (;;) {
            //獲取當前節(jié)點的前置節(jié)點
            final Node p = node.predecessor();
            //如果前置節(jié)點就是頭結(jié)點,則嘗試獲取鎖資源
            if (p == head && tryAcquire(arg)) {
                //當前節(jié)點獲得鎖資源以后設置為頭節(jié)點尚猿,這里繼續(xù)理解我上面說的那句話
                //頭結(jié)點就表示當前正占有鎖資源的節(jié)點
                setHead(node);
                p.next = null; //幫助GC
                //表示鎖資源成功獲取窝稿,因此把failed置為false
                failed = false;
                //返回中斷標記,表示當前節(jié)點是被正常喚醒還是被中斷喚醒
                return interrupted;
            }
            //如果沒有獲取鎖成功凿掂,則進入掛起邏輯
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //最后會分析獲取鎖失敗處理邏輯
        if (failed)
            cancelAcquire(node);
    }
}

//首先說明一下參數(shù)伴榔,node是當前線程的節(jié)點,pred是它的前置節(jié)點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取前置節(jié)點的waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //如果前置節(jié)點的waitStatus是Node.SIGNAL則返回true庄萎,然后會執(zhí)行parkAndCheckInterrupt()方法進行掛起
        return true;
    if (ws > 0) {
        //由waitStatus的幾個取值可以判斷這里表示前置節(jié)點被取消
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //這里我們由當前節(jié)點的前置節(jié)點開始潮梯,一直向前找最近的一個沒有被取消的節(jié)點
        //注,由于頭結(jié)點head是通過new Node()創(chuàng)建惨恭,它的waitStatus為0,因此這里不會出現(xiàn)空指針問題秉馏,也就是說最多就是找到頭節(jié)點上面的循環(huán)就退出了
        pred.next = noparkAndCheckInterrupt()de;
    } else {
        //根據(jù)waitStatus的取值限定,這里waitStatus的值只能是0或者PROPAGATE脱羡,那么我們把前置節(jié)點的waitStatus設為Node.SIGNAL然后重新進入該方法進行判斷
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  1. 當前線程調(diào)用acquire()申請獲取鎖資源
    1. 先調(diào)用tryAcquire()嘗試獲取鎖()
      1. 這個方法內(nèi)部由具體的鎖實現(xiàn)
    2. 如果失敗先調(diào)用addWaiter()將當前線程入隊到AQS隊列中
      1. 如果之前沒有創(chuàng)建隊列(隊尾為null),直接調(diào)用enq進行完整的入隊操作(包括初始化)
        1. 是一個自旋循環(huán),第一次循環(huán)創(chuàng)建一個空結(jié)點,設置為隊尾和隊首,第二次循環(huán)時將傳入的node入隊
      2. 如果已經(jīng)有隊尾存在了,先將node(待插入結(jié)點)的前置設置為tail,使用t指針指向tail,使用CAS將tail指針指向node,然后將實際上的隊尾t的next設置為node.
    3. 然后在acquireQueued()方法中等待被喚醒直到獲取到資源.
      1. 進入acquireQueued方法后會進入一個無限循環(huán).每一次循環(huán)都會判斷傳入的參數(shù)是不是隊首,并且嘗試一次鎖獲取
      2. 如果獲取成功,將自己設置為隊首,并將node.thread設置為null,保證頭結(jié)點永遠是一個不帶thread的空節(jié)點.
      3. 如果獲取失敗,調(diào)用shouldParkAfterFailedAcquire()判斷自己需不需要阻塞.阻塞的大前提是前繼節(jié)點是可被喚醒的(waitStatus設置為Signal),這樣才能讓自己有機會被喚醒(隊列中按順序喚醒)
        1. 判斷傳入的前繼節(jié)點的waitStatus是否為Signal,是的話直接返回ture
        2. 根據(jù)waitStatus的值分為兩種情況
        3. 如果waitStatus大于0:也就是前繼節(jié)點被取消了.一直向前查找一個沒有被取消的節(jié)點(waitStatus>=0)的結(jié)點(頭結(jié)點waitStatus為0或-1,因此不可能出現(xiàn)空指針),并拼接成雙向隊列(==后面的隊列都是waitStatus>=0的,在狀態(tài)表中只有被取消的線程是這個狀態(tài),所以被移除出隊列,下一次GC會被清除==),返回false
        4. 如果waitStatus不大于0,也就是為0,-2或-3:修改前繼節(jié)點waitStatus為-1
      4. 如果shouldParkAfterFailedAcquire()返回true,證明node的前繼節(jié)點可以被喚醒,立即調(diào)用parkAndCheckInterrupt()阻塞掛起node
      5. 如果shouldParkAfterFailedAcquire()返回false,證明在方法內(nèi)部找到了一個可以被喚醒的節(jié)點作為前繼節(jié)點.在下一次循環(huán)中進行掛起
      6. parkAndCheckInterrupt()方法的掛起并不會被中斷打斷,是非中斷掛起,如果掛起過程中出現(xiàn)了中斷,方法在返回時會返回true,否則返回false
      7. 如果上一步返回了true,證明出現(xiàn)了中斷,在下一次循環(huán)中將中斷標記返回到上一層代碼.
    4. 線程被喚醒之后會將返回中斷標記到這里,如果出現(xiàn)過中斷,將自身線程掛起,來彌補之前的中斷.
  2. 進入臨界區(qū)(線程中運行的用戶代碼)

這個過程口語化的敘述就是:

? 先判斷能否獲取鎖資源,如果不能先進行入隊操作(所有線程一起入隊,只有一個線程能夠CAS成功,其他線程自旋,初始化操作也是放在自旋的邏輯里的.)

? 然后在隊列里面執(zhí)行掛起的邏輯:首先判斷自己是否是頭結(jié)點的next節(jié)點,如果是,再嘗試一次獲取鎖.成功則不必掛起.如果不是頭結(jié)點的后繼,那么判斷當前節(jié)點的前繼節(jié)點能否被喚醒(也就是節(jié)點沒有被取消),如果可以,那么掛起,如果不能,向前尋找第一個可以被喚醒的節(jié)點,然后拼接在其后面(拋棄中間這部分不可被喚醒的節(jié)點).

2). 鎖的釋放過程

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        //把標記為設置為0萝究,表示喚醒操作已經(jīng)開始進行,提高并發(fā)環(huán)境下性能
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    //如果當前節(jié)點的后繼節(jié)點為null锉罐,或者已經(jīng)被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        //注意這個循環(huán)沒有break帆竹,也就是說它是從后往前找,一直找到離當前節(jié)點最近的一個等待喚醒的節(jié)點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //執(zhí)行喚醒操作
    if (s != null)
        LockSupport.unpark(s.thread);
}

釋放鎖的邏輯并不復雜.

  1. 調(diào)用tryRelease()來釋放資源.(判斷當前線程是否和AQS中的線程一致,然后對state進行修改)(這里返回值是指是否已經(jīng)完全釋放資源了,可重入鎖需要將state釋放至0)
  2. 拿到head節(jié)點,判斷其狀態(tài),如果不為null,且waitStatus不為0(初始狀態(tài)下,為0,當這個節(jié)點有了后繼節(jié)點時,會被修改成-1,這里是為了防止只有一個頭節(jié)點的空隊列進行了釋放鎖),調(diào)用unparkSuccessor()喚醒頭結(jié)點的后繼節(jié)點的線程
    1. 先進行waitStatus的判斷,如果waitStatus<0,將其設置為0,避免其他線程也進入此方法,提高高并發(fā)環(huán)境下的性能.
    2. 找到頭結(jié)點的后繼節(jié)點,再次判斷是否為空或者其狀態(tài)值是否被取消
    3. 如果后繼節(jié)點為空或者狀態(tài)為被取消,從后向前找到最前面的一個等待喚醒的節(jié)點
    4. 執(zhí)行喚醒操作
  3. 返回true

? 如果喚醒的節(jié)點前有被取消的節(jié)點,那么會在喚醒后被判斷前繼節(jié)點不是頭結(jié)點,再次進行判斷前繼節(jié)點是否能被喚醒的方法調(diào)用.在這個方法調(diào)用中會將前面的被取消的節(jié)點移除,然后再次進行前繼節(jié)點是不是頭結(jié)點的判斷.這次就可以判斷成功,將此節(jié)點設置為頭結(jié)點,成功喚醒,運行其內(nèi)部用戶代碼.

整個過程就是:

? 先判斷是否能夠歸還資源(獨占鎖一般都不會失敗,可重入鎖需要判斷返回值是否大于零,因為返回值代表剩余鎖可重入次數(shù),從而決定是否傳遞地釋放后面的線程).

? 然后判斷頭結(jié)點的后繼能否可被喚醒,如果沒有,那么尋找到第一個可被喚醒的節(jié)點,進行喚醒

? 這里for循環(huán)是==從后向前==遍歷的,原因是在入隊操作時,是先將node的前繼指向tail,然后更新tail為node.后一步是cas操作,可能會失敗,并且這個過程不是原子性的,如果在這個過程中發(fā)生了從前向后遍歷隊列,就會發(fā)生找到tail,next為null,但實際上并沒有遍歷到tail節(jié)點的情況.

(2). 共享不響應中斷模式下,獲取與釋放資源

AbstractQueuedSynchronizer源碼剖析(四)- 不響應中斷的共享鎖

1). 獲取鎖的過程

public final void acquireShared(int arg) {
    //嘗試獲取共享鎖脓规,返回值小于0表示獲取失敗
    if (tryAcquireShared(arg) < 0)
        //執(zhí)行獲取鎖失敗以后的方法
        doAcquireShared(arg);
}

//參數(shù)不多說栽连,就是傳給acquireShared()的參數(shù)
private void doAcquireShared(int arg) {
    //添加等待節(jié)點的方法跟獨占鎖一樣,唯一區(qū)別就是節(jié)點類型變?yōu)榱斯蚕硇颓扔撸辉儋樖?    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //表示前面的節(jié)點已經(jīng)獲取到鎖秒紧,自己會嘗試獲取鎖
            if (p == head) {
                int r = tryAcquireShared(arg);
                //注意上面說的, 等于0表示不用喚醒后繼節(jié)點挨下,大于0需要
                if (r >= 0) {
                    //這里是重點熔恢,獲取到鎖以后的喚醒操作,后面詳細說
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    //如果是因為中斷醒來則設置中斷標記位
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //掛起邏輯跟獨占鎖一樣臭笆,不再贅述
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //獲取失敗的取消邏輯跟獨占鎖一樣叙淌,不再贅述
        if (failed)
            cancelAcquire(node);
    }
}

//兩個入?yún)ⅲ粋€是當前成功獲取共享鎖的節(jié)點愁铺,一個就是tryAcquireShared方法的返回值鹰霍,注意上面說的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; //記錄當前頭節(jié)點
    //設置新的頭節(jié)點,即把當前獲取到鎖的節(jié)點設置為頭節(jié)點
    //注:這里是獲取到鎖之后的操作,不需要并發(fā)控制
    setHead(node);
    //這里意思有兩種情況是需要執(zhí)行喚醒操作
    //1.propagate > 0 表示調(diào)用方指明了后繼節(jié)點需要被喚醒
    //2.頭節(jié)點后面的節(jié)點需要被喚醒(waitStatus<0),不論是老的頭結(jié)點還是新的頭結(jié)點
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //如果當前節(jié)點的后繼節(jié)點是共享類型或者沒有后繼節(jié)點,則進行喚醒
        //這里可以理解為除非明確指明不需要喚醒(后繼等待節(jié)點是獨占類型),否則都要喚醒
        if (s == null || s.isShared())
            //后面詳細說
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
  1. 調(diào)用acquireShared()去獲取鎖
    1. 首先會嘗試調(diào)用tryAcquireShared()共享方式獲取資源,返回值負數(shù)表示失敗,0表示成功,但沒有剩余資源可用,整數(shù)表示成功且有剩余資源
    2. 當返回負數(shù)時,調(diào)用doAcquireShared()進行線程的入隊列掛起等待
      1. 將當前線程打包成Node對象,類型為共享型,添加到隊列尾,方法與獨占式相同
      2. 如果當前節(jié)點的前繼節(jié)點是頭結(jié)點,嘗試進行獲取資源,如果返回值不為負,意味著可以獲取到資源,調(diào)用setHeadAndPropagate()將當前節(jié)點設置為頭結(jié)點,判斷是否是因為喚醒線程來到這里,對中斷標記進行判斷,是否進行中斷補充
        1. setHeadAndPropagate(Node node, int propagate)的第二個參數(shù)是剛剛嘗試獲取資源得到的剩余資源數(shù),如果大于零,代表有多余資源,那么應該去喚醒下一個可被喚醒的線程
        2. h == null 場景未知,應該不會出現(xiàn)吧.
        3. h.waitStatus意味著頭結(jié)點為可喚醒狀態(tài)
        4. 以上的集中狀況都需要進行線程喚醒的嘗試.如果當前頭結(jié)點的前繼節(jié)點不為空且為共享型,調(diào)用doReleaseShared()對頭結(jié)點后的第一個可喚醒節(jié)點進行喚醒.
      3. 如果前繼節(jié)點不是頭節(jié)點,掛起,邏輯同獨占鎖

2). 釋放鎖的過程

public final boolean releaseShared(int arg) {
    //嘗試釋放共享鎖
    if (tryReleaseShared(arg)) {
        //喚醒過程拒逮,詳情見上面分析
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        //喚醒操作由頭結(jié)點開始,注意這里的頭節(jié)點已經(jīng)是上面新設置的頭結(jié)點了
        //其實就是喚醒上面新獲取到共享鎖的節(jié)點的后繼節(jié)點
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //表示后繼節(jié)點需要被喚醒
            if (ws == Node.SIGNAL) {
                //這里需要控制并發(fā),因為入口有setHeadAndPropagate跟release兩個玷氏,避免兩次unpark
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;      
                //執(zhí)行喚醒操作      
                unparkSuccessor(h);
            }
            //如果后繼節(jié)點暫時不需要喚醒,則把當前節(jié)點狀態(tài)設置為PROPAGATE確保以后可以傳遞下去
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }unparkSuccessor
        //如果頭結(jié)點沒有發(fā)生變化腋舌,表示設置完成盏触,退出循環(huán)
        //如果頭結(jié)點發(fā)生變化,比如說其他線程獲取到了鎖块饺,為了使自己的喚醒動作可以傳遞赞辩,必須進行重試
        if (h == head)                   
            break;
    }
}
  1. 調(diào)用releaseShared()獲取資源
    1. 先調(diào)用tryReleaseShared()嘗試釋放共享鎖,由具體的鎖實現(xiàn)邏輯,如果獲取到了資源,返回true.繼而調(diào)用doReleaseShared()喚醒頭結(jié)點后的可喚醒線程
      1. 進入一個自旋循環(huán)
      2. 判斷頭結(jié)點(如果從設置頭結(jié)點調(diào)用至此方法,此時頭結(jié)點是新的頭節(jié)點)是否為空,是否只有一個頭結(jié)點組成空隊列.
      3. 如果都不是,那么頭結(jié)點有效,然后判斷頭結(jié)點狀態(tài),如果為-1,設置為0,
      4. 調(diào)用unparkSuccessor()執(zhí)行喚醒操作(別的線程在21行會判斷為true,跳過,這樣就控制了并發(fā))
        1. unparkSuccessor()與獨占鎖相同,如果頭結(jié)點的后繼節(jié)點不能喚醒,從后向前找到最前面的一個等待喚醒的節(jié)點喚醒它
      5. 然后跳入27行將狀態(tài)改為共享模式下可釋放,適應在設置頭結(jié)點處的調(diào)用.
      6. 如果頭結(jié)點沒有變化(并發(fā)情況下,其他線程可能會改變head的引用),退出方法.

3. AQS-----條件變量的支持

? 和之前講到的notify()和wait()配合synchronized內(nèi)置鎖實現(xiàn)線程間同步一樣,AQS中的條件變量==signal()====await()==方法也是用來配合鎖(使用AQS實現(xiàn))來實現(xiàn)線程間同步的.

? 不同點在于,synchronized同時之能與一個共享變量的notify()或wait()方法實現(xiàn)同步,而AQS的一個鎖可以對應多個變量.

(1). 條件隊列阻塞的過程(功能類比wait()方法)

//條件隊列入口,參考上面的代碼片段
public final void await() throws InterruptedException {
    //如果當前線程被中斷則直接拋出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //把當前節(jié)點加入條件隊列
    Node node = addConditionWaiter();
    //釋放掉已經(jīng)獲取的獨占鎖資源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果不在同步隊列中則不斷掛起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //中斷處理授艰,另一種跳出循環(huán)的方式
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //走到這里說明節(jié)點已經(jīng)條件滿足被加入到了同步隊列中或者中斷了
    //這個方法很熟悉吧辨嗽?就跟獨占鎖調(diào)用同樣的獲取鎖方法,從這里可以看出條件隊列只能用于獨占鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //走到這里說明已經(jīng)成功獲取到了獨占鎖淮腾,接下來就做些收尾工作
    //刪除條件隊列中被取消的節(jié)點
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    //根據(jù)不同模式處理中斷
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

//注:1.與同步隊列不同糟需,條件隊列頭尾指針是firstWaiter跟lastWaiter
//注:2.條件隊列是在獲取鎖之后,也就是臨界區(qū)進行操作谷朝,因此很多地方不用考慮并發(fā)
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果最后一個節(jié)點被取消洲押,則刪除隊列中被取消的節(jié)點
    //至于為啥是最后一個節(jié)點后面會分析
    if (t != null && t.waitStatus != Node.CONDITION) {
        //刪除所有被取消的節(jié)點
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //創(chuàng)建一個類型為CONDITION的節(jié)點并加入隊列,由于在臨界區(qū)圆凰,所以這里不用并發(fā)控制
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

//刪除取消節(jié)點的邏輯雖然長杈帐,但比較簡單,就不單獨說了专钉,就是鏈表刪除
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

//入?yún)⒕褪切聞?chuàng)建的節(jié)點挑童,即當前節(jié)點
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //這里這個取值要注意,獲取當前的state并釋放跃须,這從另一個角度說明必須是獨占鎖
        //可以考慮下這個邏輯放在共享鎖下面會發(fā)生什么炮沐?
        int savedState = getState();
        //跟獨占鎖釋放鎖資源一樣,不贅述
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            //如果這里釋放失敗回怜,則拋出異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        //如果釋放鎖失敗大年,則把節(jié)點取消,由這里就能看出來上面添加節(jié)點的邏輯中只需要判斷最后一個節(jié)點是否被取消就可以了
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

//判斷節(jié)點是否在同步隊列中
final boolean isOnSyncQueue(Node node) {
    //快速判斷1:節(jié)點狀態(tài)或者節(jié)點沒有前置節(jié)點
    //注:同步隊列是有頭節(jié)點的玉雾,而條件隊列沒有
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //快速判斷2:next字段只有同步隊列才會使用翔试,條件隊列中使用的是nextWaiter字段
    if (node.next != null) 
        return true;
    //上面如果無法判斷則進入復雜判斷
    return findNodeFromTail(node);
}

//注意這里用的是tail,這是因為條件隊列中的節(jié)點是被加入到同步隊列尾部复旬,這樣查找更快
//從同步隊列尾節(jié)點開始向前查找當前節(jié)點垦缅,如果找到則說明在,否則不在
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

//這里的判斷邏輯是:
//1.如果現(xiàn)在不是中斷的驹碍,即正常被signal喚醒則返回0
//2.如果節(jié)點由中斷加入同步隊列則返回THROW_IE壁涎,由signal加入同步隊列則返回REINTERRUPT
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}

//修改節(jié)點狀態(tài)并加入同步隊列
//該方法返回true表示節(jié)點由中斷加入同步隊列凡恍,返回false表示由signal加入同步隊列
final boolean transferAfterCancelledWait(Node node) {
    //這里設置節(jié)點狀態(tài)為0,如果成功則加入同步隊列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //與獨占鎖同樣的加入隊列邏輯怔球,不贅述
        enq(node);
        return true;
    }
    //如果上面設置失敗嚼酝,說明節(jié)點已經(jīng)被signal喚醒,由于signal操作會將節(jié)點加入同步隊列竟坛,我們只需自旋等待即可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

//根據(jù)中斷時機選擇拋出異常或者設置線程中斷狀態(tài)
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
      if (interruptMode == THROW_IE)
           throw new InterruptedException();
      else if (interruptMode == REINTERRUPT)
           //實現(xiàn)代碼為:Thread.currentThread().interrupt();
           selfInterrupt();
}
  1. 調(diào)用await()方法將當前線程移入條件隊列,等待條件隊列喚醒
    1. 如果當前線程被中斷,拋出異常
    2. 調(diào)用addConditionWaiter()將當前線程加入條件隊列
      1. 如果最后一個節(jié)點被取消了,調(diào)用unlinkCancelledWaiters()清理隊列中的被取消節(jié)點
        1. 該方法的邏輯就是從頭遍歷隊列,如果狀態(tài)為被取消,刪除該節(jié)點
      2. 創(chuàng)建一個類型為條件等待的節(jié)點加入隊列尾
      3. 返回該節(jié)點
    3. 調(diào)用fullyRelease()釋放掉當前線程占用的資源,這一步之前的操作都是線程安全的
      1. 獲取當前的state值,調(diào)用release()釋放資源(內(nèi)部邏輯之前有講),如果成功則返回并標記為成功,失敗拋出異常,
      2. 最后如果沒有成功,也就是釋放鎖失敗了,將節(jié)點取消(從這里可以看出來,被取消的節(jié)點一定是結(jié)尾處的,所以上面addConditionWaiter()方法判斷的是最后一個節(jié)點是否被取消,這樣保證了隊列中間不可能出現(xiàn)被取消節(jié)點)
    4. 循環(huán)調(diào)用isOnSyncQueue()判斷當前線程是否在同步隊列中(如果不在則相當于內(nèi)置鎖中被wait()阻塞但沒有被notify()喚醒的狀態(tài)),這里是調(diào)用通過其他線程調(diào)用signal()方法將該線程(條件)喚醒
      1. 先進行快速判斷,如果該節(jié)點狀態(tài)為條件等待(肯定在條件隊列中),或者前繼節(jié)點(同步隊列中)為空,證明不在同步隊列中
      2. 如果后繼節(jié)點(同步隊列中)不為空,則證明在同步隊列中,因為只有在隊列中的節(jié)點才有可能被添加后繼節(jié)點
      3. 如果上面兩個都不能判斷,調(diào)用findNodeFromTail()進入復雜判斷
        1. 從尾節(jié)點向前查找節(jié)點,如果找到了當前節(jié)點,證明當前節(jié)點在同步隊列中
    5. 如果不在同步隊列中,進行掛起.并進行中斷處理
    6. 然后在acquireQueued()方法中等待被喚醒直到獲取到資源.(內(nèi)部邏輯之前有描述過)
    7. 如果,當前node不是尾節(jié)點,說明其他線程也對條件隊列進行了操作,調(diào)用unlinkCancelledWaiters()清理隊列中被取消的節(jié)點
    8. 根據(jù)不同模式,判斷是否調(diào)用reportInterruptAfterWait()處理中斷
      1. 根據(jù)中斷時機選擇拋出異常或者中斷補償(取決于能否響應中斷)

(2). 條件隊列喚醒的過程(功能類比notify()方法)

//條件隊列喚醒入口
public final void signal() {
    //如果不是獨占鎖則拋出異常崭歧,再次說明條件隊列只適用于獨占鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //如果條件隊列不為空隅很,則進行喚醒操作
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

//該方法就是把一個有效節(jié)點從條件隊列中刪除并加入同步隊列
//如果失敗則會查找條件隊列上等待的下一個節(jié)點直到隊列為空
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&(first = firstWaiter) != null);
}

//將節(jié)點加入同步隊列
final boolean transferForSignal(Node node) {
    //修改節(jié)點狀態(tài)率碾,這里如果修改失敗只有一種可能就是該節(jié)點被取消,具體看上面await過程分析
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //該方法很熟悉了播掷,跟獨占鎖入隊方法一樣审编,不贅述
    Node p = enq(node);
    //注:這里的p節(jié)點是當前節(jié)點的前置節(jié)點
    int ws = p.waitStatus;
    //如果前置節(jié)點被取消或者修改狀態(tài)失敗則直接喚醒當前節(jié)點
    //此時當前節(jié)點已經(jīng)處于同步隊列中垒酬,喚醒會進行鎖獲取或者正確的掛起操作
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
  1. 調(diào)用signal()條件喚醒條件隊列中的一個線程,將該線程從條件隊列中移動到同步隊列中
    1. 如果不是獨占鎖,拋出異常,條件隊列只能用于獨占鎖
    2. 如果條件隊列不為空,調(diào)用doSignal()執(zhí)行喚醒操作
      1. 從傳入結(jié)點(上一步傳入的是頭結(jié)點)開始把一個有效節(jié)點從條件隊列刪除
      2. 這一步使用first保存頭結(jié)點,然后頭結(jié)點后移(循環(huán)會判斷頭結(jié)點為空退出循環(huán))
      3. 調(diào)用transferForSignal()將剛剛得到的first節(jié)點移入同步隊列(成功則退出循環(huán))
        1. 使用AQS修改該節(jié)點的狀態(tài),將CONDITION(條件等待)修改為0(如果修改失敗肯定是線程被取消了或者其他線程在這一步狀態(tài)為剛剛修改的0,因為條件隊列中的線程狀態(tài)只能是CONDITION或者被取消,這一步修改的不算)如果失敗,返回false
        2. 調(diào)用enq()將此節(jié)點入同步隊列,并返回前繼節(jié)點(邏輯不再贅述)
        3. 如果前繼節(jié)點被取消或者修改狀態(tài)失敗,掛起當前節(jié)點,喚醒后進入await()中的acquireQueued()部分自旋掛起等待喚醒(這個喚醒是鎖的競爭,而不是條件控制的).
  2. 然后會回到上層方法調(diào)用,循環(huán)判斷當前線程是否在同步隊列中.....

4. 獨占鎖ReentrantLock的原理

(1). 結(jié)構(gòu)

? ReentrantLock是可重入的獨占鎖,同時只能有一個線程可以獲取這個鎖,其他線程嘗試獲取就會被阻塞并放入AQS阻塞隊列中.

? 底層是使用AQS來實現(xiàn)的,根據(jù)參數(shù)來決定其內(nèi)部是否公平內(nèi)部類sync如果是NonfairSync類型的,就非公平,如果是FairSync,則公平.

(2). 獲取鎖

1). void lock()方法

public void lock() {
    sync.lock();
}

// 非公平鎖
final void lock() {
    if (compareAndSetState(0, 1))// 如果當前鎖是自由的,就直接獲取,這就是競爭鎖
        setExclusiveOwnerThread(Thread.currentThread());
    else// 如果鎖被占有,那么去排隊
        acquire(1);
}

// 公平鎖
final void lock() {// 公平鎖,就要排隊
    acquire(1);
}

// 通用
public final void acquire(int arg) {// 同之前AQS
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 非公平鎖
// 非公平性體現(xiàn)在如果一個線程釋放了鎖,在進行下一步喚醒隊列中的節(jié)點之前別的線程直接競爭到了鎖,那么隊列中的節(jié)點會喚醒失敗
protected final boolean tryAcquire(int acquires) {// 進行了一次調(diào)用,邏輯差不多
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    //先獲取線程和鎖的狀態(tài)
    final Thread current = Thread.currentThread();
    int c = getState();
    // 這整個if-else if可以讓沒有進行排隊直接獲取鎖的線程直接拿到鎖
    if (c == 0) {// 如果鎖空閑
        if (compareAndSetState(0, acquires)) {// 獲取到鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }else if (current == getExclusiveOwnerThread()) {//如果鎖不空閑,但是是自己拿到的
        // 鎖重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 鎖不空閑而且不屬于自己,就只能是別人的鎖,嘗試獲取失敗
    return false;
}

// 公平鎖
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 不同點就在這,hasQueuedPredecessors()方法中當前線程節(jié)點有前繼節(jié)點返回true,AQS隊列為空或當前線程是AQS的第一個節(jié)點返回false
        // 簡單點說就是確保當前節(jié)點是頭結(jié)點后的第一個節(jié)點,也就是排到隊首的節(jié)點,保證只有隊首節(jié)點才能被喚醒
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果當前節(jié)點已經(jīng)獲取了鎖,直接重入,這就不牽扯公平性了
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

? 非公平鎖當鎖自由時,就可以直接獲取鎖,而公平鎖需要去acquire()方法內(nèi)部判斷.

? 兩種鎖調(diào)用的acquire()方法都一樣.

? tryAcquire()方法有所區(qū)別,實現(xiàn)邏輯的區(qū)別在于公平鎖在鎖空閑的情況下,獲取鎖之前確保了當前節(jié)點是頭結(jié)點后的第一個節(jié)點,從而讓喚醒的節(jié)點一定是隊列中排過隊的.

2). void lockInterruptibly()方法

? 對中斷響應的獲取鎖.邏輯都類似,調(diào)用的是AQS中可被中斷的獲取鎖方法

3). boolean tryLock()方法

? 嘗試獲取鎖,如果沒獲取到不會阻塞

public boolean tryLock() {
    // 之前介紹過nonfairTryzaiAcquire(),就是單純的獲取鎖,之前的阻塞的邏輯是在acquire中
    return sync.nonfairTryAcquire(1);
}

4). boolean tryLock(long timeout, TimeUnit unit)方法

? 設置了時間,如果在該時間內(nèi)沒有獲取到鎖,返回false.

? TimeUnit參數(shù)為時間粒度,直接new一個TimeUnit對象即可.

(3). 釋放鎖

1). void unlock()方法

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // 邏輯同AQS
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // c為本次重入成功后,鎖的重入次數(shù)
    int c = getState() - releases;
    // 如果不是當前線程拿到鎖,拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果這次重入之后,重入次數(shù)為零,證明鎖沒有被任何線程重入,清空鎖持有線程
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 設置重入次數(shù)
    setState(c);
    return free;
}

5. 讀寫鎖ReentrantReadWriteLock的原理

? 實際應用中,寫少讀多的情況很多,而使用ReentranLock性能太低(只有讀鎖時不需要限制其他讀鎖).ReentrantReadWriteLock應運而生,采用讀寫分離的策略,==允許多個線程可以同時獲取讀鎖==.

(1). 結(jié)構(gòu)

? ==內(nèi)部維護了一個ReadLock和一個WriteLock==,它們都是依賴Sync實現(xiàn)具體功能,而Sunc繼承自AQS.AQS中的state只維護了一個狀態(tài),而讀寫鎖有兩個鎖,所以==采用state的高16為表示讀鎖的狀態(tài),低16位表示寫鎖的狀態(tài).==

(2). 寫鎖的獲取與釋放

? 寫鎖使用WriteLock實現(xiàn).

1). void lock()

? ==寫鎖是個可重入的獨占鎖,當沒有線程獲取讀鎖且寫鎖空閑或者寫鎖屬于當前線程時,可以獲取.==

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 此方法在WriteLock(內(nèi)部類)中實現(xiàn)
protected final boolean tryAcquire(int acquires) {
    // 獲取當前線程,寫鎖的重入次數(shù)W
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    // 寫鎖或者讀鎖已經(jīng)被獲取
    if (c != 0) {
        // w==0證明寫鎖空閑,那么就是讀鎖被獲取,如果讀鎖空閑繼而判斷當前線程是否是寫鎖中的線程
        if (w == 0 || current != getExclusiveOwnerThread())
            // 進入這里說明讀鎖空閑,那么不能獲取寫鎖.或者寫鎖重入的不是當前線程
            return false;
        // 走到這證明可以獲取讀鎖沒被獲取,而且當前線程獲取了寫鎖,這一步判斷是否能繼續(xù)進行重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 設置重入值
        setState(c + acquires);
        return true;
    }
    // 第一個寫線程獲取寫鎖
    // writerShouldBlock()當有別的線程也在獲取此鎖時,是否應該阻塞(分公平和非公平兩種實現(xiàn))
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

// 非公平鎖,永遠不需要判斷,不阻塞,搶就行了
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

//公平鎖,調(diào)用hasQueuedPredecessors()判斷是否有前繼節(jié)點,如果有說明不是隊首,繼續(xù)排隊,放棄獲取鎖
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

2). void lockInterruptibly()

? 同樣是獲取鎖,此方法會對中斷進行響應.調(diào)用的是AQS中響應中斷的獲取鎖方法.

3). boolean tryLock()

? 嘗試獲取寫鎖,沒有獲取到不會阻塞,邏輯同上.

4). boolean tryLock(long timeout, TimeUnit unit)

? 獲取鎖失敗后掛起指定時間,如果時間到了之后還是沒有獲取到鎖,返回false

5). void unlock()

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 上面都是AQS的內(nèi)容,下面的方法是具體的鎖實現(xiàn)的
protected final boolean tryRelease(int releases) {
    //判斷是否是寫鎖的擁有者調(diào)用的unlock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 是當前線程獲取的寫鎖
    // 獲取可重入值,寫鎖是低16位,可以直接加減.如果寫鎖重入次數(shù)為0,則鎖應修改為空閑狀態(tài)
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

(3). 讀鎖的獲取與釋放

? 讀鎖是使用ReadLock實現(xiàn)的.

1). void lock()

? ==獲取讀鎖,如果寫鎖是空閑的,其他線程都可以獲取讀鎖.讀鎖是共享的==

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 具體的鎖的實現(xiàn)
protected final int tryAcquireShared(int unused) {
    // 獲取當前線程,讀鎖標記
    Thread current = Thread.currentThread();
    int c = getState();
    // 判斷寫鎖是否被占用,且占用者不是自己(一個線程擁有了寫鎖,也是可以擁有讀鎖的,加鎖主要是避免不同線程之間的不同步,在一個線程內(nèi)一定是安全的)
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 獲取讀鎖的計數(shù)
    int r = sharedCount(c);
    // 嘗試獲取鎖,只有一個線程可以成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARxianchengED_UNIT)) {
        // 獲取之前讀鎖是空閑的,也就是說,這個線程是第一個獲取讀鎖的線程
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {// 此線程是之前第一個獲取鎖的線程
            firstReaderHoldCount++;
        } else {// 如果不是第一個獲取多鎖的線程,將該線程持有鎖的次數(shù)信息景描,放入線程本地變量中秀撇,方便在整個請求上下文(請求鎖呵燕、釋放鎖等過程中)使用持有鎖次數(shù)信息。
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 沒有獲取成功的線程進入fullTryAcquireShared,邏輯與tryAcquireShared()類似,但是是自旋的
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 寫鎖被占用
        if (exclusiveCount(c) != 0) {
            // 當前線程沒有占有讀鎖
            if (getExclusiveOwnerThread() != current)
                return -1;
        }
        // 競爭時應不應該掛起自己
        else if (readerShouldBlock()) {
            // 此線程是第一個得到讀鎖的線程
            if (firstReader == current) {
                // doNothing
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 是否達到最大共享值
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 獲取鎖
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 當前鎖空閑
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            }
            // 如果firstReader是當前線程夜矗,或者當前線程的cachedHoldCounter變量的count不為0(表示當前線程已經(jīng)持有了該共享鎖)让虐,均說明當前線程已經(jīng)持有共享鎖,此次獲取共享鎖是重入,這也是允許的麸俘,可以通過判斷惧笛。
            // 此處將重入次數(shù)分為fistReader的重入次數(shù)和其他所有線程的重入次數(shù)之和
            else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

2). void lockInterruptibly()

? 可以對中斷進行響應的獲取鎖.

3). boolean tryLock()

? 嘗試獲取鎖,沒有成功不會阻塞.

4). boolean tryLock(long timeout, TimeUnit unit)

? 如果超時時間內(nèi)(掛起了)還沒有獲取鎖,返回false.

5). void unlock()

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// 具體鎖實現(xiàn)的邏輯
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 對第一個獲取鎖的線程的重入次數(shù)進行更新
    if (firstReader == current) {
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    }
    // 其他線程重入次數(shù)的更新
    else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 自旋的釋放鎖
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        // CAS成功則說明當前線程沒有被其他線程打擾
        if (compareAndSetState(c, nextc))
            // 返回值為讀鎖是否空閑
            return nextc == 0;
    }
}

6. JDK 8 中新增的StampedLock鎖探究

(1). 概述

? StampedLock鎖是并發(fā)包中JDK 8版本新增的一個鎖,提供了三種模式的讀寫控制.當調(diào)用try系列方法嘗試獲取鎖時,會返回一個long型的變量stamp(戳記).這個戳記代表了鎖的狀態(tài),可以理解為樂觀鎖中的版本.釋放鎖的轉(zhuǎn)換鎖時都需要提供這個標記來判斷權限.

特點:

  • 所有獲取鎖的方法都會返回一個stamp戳記,0為失敗,其他為獲取成功
  • 所有釋放鎖的方法都需要一個stamp戳記,必須和之前得到鎖時的stamp一致
  • 不可重入
  • 有三種訪問模式
    • 樂觀讀
    • 悲觀讀
    • 寫鎖
  • 寫鎖可以降級為讀鎖
  • 都不支持條件隊列
/** 等待鏈表隊列拜效,每一個WNode標識一個等待線程 */
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait;    // 讀模式使用該節(jié)點形成棧
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}


/** 鎖隊列狀態(tài)各谚, 當處于寫模式時第8位為1,讀模式時前7為為1-126(讀鎖是共享鎖,附加的readerOverflow用于當讀者超過126時) */
private transient volatile long state;
/** 將state超過 RFULL=126的值放到readerOverflow字段中 */
private transient int readerOverflow;

(2). 寫鎖writeLock

? 獨占鎖,不可重入.請求該鎖成功后會返回一個stamp戳記變量來表示該鎖的版本.

  • writeLock():當完全沒有加鎖時赴穗,繞過acquireWrite,否則調(diào)用acquireWrite入隊列獲取鎖資源,
    • acquireWrite():入隊自旋,并放到隊列尾部,如果隊列中只剩下一個結(jié)點,則在隊頭進一步自旋,最后會進入阻塞
  • unlockWrite():如果鎖的狀態(tài)與stamp相同,調(diào)用release()釋放鎖
    • release():喚醒傳入節(jié)點的后繼節(jié)點

(3). 悲觀讀readLock

? 共享鎖,不可重入

  • readLock():(隊列為空&&沒有寫鎖同時讀鎖數(shù)小于126&&CAS修改狀態(tài)成功)則狀態(tài)加1并返回,否則調(diào)用acquireRead()自旋獲取讀鎖
    • acquireRead():首先是入隊自旋般眉,如果隊尾不是讀模式則放到隊列尾部潜支,如果是讀模式冗酿,則放到隊尾的cowait中。如果隊列中只剩下一個結(jié)點鸠窗,則在隊頭進一步自旋.如果最終依然失敗胯究,則Unsafe().park()掛起當前線程。
  • unlockRead():如果state匹配stamp,判斷當前的共享次數(shù),修改state或者readerOverflow

(4). 樂觀讀OptimisticRead()

? 在操作數(shù)據(jù)前并沒有通過 CAS 設置鎖的狀態(tài)臣嚣,僅僅是通過位運算測試

? 如果當前沒有線程持有寫鎖硅则,則簡單的返回一個非 0 的 stamp 版本信息,獲取該 stamp 后在具體操作數(shù)據(jù)前還需要調(diào)用 validate 驗證下該 stamp 是否已經(jīng)不可用暑认,也就是看當調(diào)用 tryOptimisticRead 返回 stamp 后大审,到當前時間是否有其它線程持有了寫鎖徒扶,如果是那么 validate 會返回 0,否則就可以使用該 stamp 版本的鎖對數(shù)據(jù)進行操作导坟。

? 由于 tryOptimisticRead 并沒有使用 CAS 設置鎖狀態(tài)圈澈,所以不需要顯示的釋放該鎖康栈。該鎖的一個特點是適用于讀多寫少的場景,因為獲取讀鎖只是使用位操作進行檢驗漾狼,不涉及 CAS 操作饥臂,所以效率會高很多隅熙,但是同時由于沒有使用真正的鎖,在保證數(shù)據(jù)一致性上需要拷貝一份要操作的變量到方法棧酵熙,并且在操作數(shù)據(jù)時候可能其它寫線程已經(jīng)修改了數(shù)據(jù)匾二,而我們操作的是方法棧里面的數(shù)據(jù),也就是一個快照皮璧,所以最多返回的不是最新的數(shù)據(jù)悴务,但是一致性還是得到保障的譬猫。

/**
 * 獲取樂觀讀鎖染服,返回郵票stamp
 */
public long tryOptimisticRead() {
    long s;  //有寫鎖返回0.   否則返回256
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

/**
 * 驗證從調(diào)用tryOptimisticRead開始到現(xiàn)在這段時間內(nèi)有無寫鎖占用過鎖資源,有寫鎖獲得過鎖資源則返回false. stamp為0返回false.
 * @return 從返回stamp開始蕉拢,沒有寫鎖獲得過鎖資源返回true,否則返回false
 */
public boolean validate(long stamp) {
    //強制讀取操作和驗證操作在一些情況下的內(nèi)存排序問題
    U.loadFence();
    //當持有寫鎖后再釋放寫鎖午乓,該校驗也不成立益愈,返回false
    return (stamp & SBITS) == (state & SBITS);
}

(5). 鎖轉(zhuǎn)換

/**
 * state匹配stamp時, 執(zhí)行下列操作之一. 
 *   1、stamp 已經(jīng)持有寫鎖敏释,直接返回.  
 *   2钥顽、讀模式靠汁,但是沒有更多的讀取者蝶怔,并返回一個寫鎖stamp.
 *   3、有一個樂觀讀鎖澳叉,只在即時可用的前提下返回一個寫鎖stamp
 *   4耳高、其他情況都返回0
 */
public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    //state匹配stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        //沒有鎖
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            //CAS修改狀態(tài)為持有寫鎖,并返回
            if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                return next;
        }
        //持有寫鎖
        else if (m == WBIT) {
            if (a != m)
                //其他線程持有寫鎖
                break;
            //當前線程已經(jīng)持有寫鎖
            return stamp;
        }
        //有一個讀鎖
        else if (m == RUNIT && a != 0L) {
            //釋放讀鎖概荷,并嘗試持有寫鎖
            if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
                return next;
        }else
            break;
    }
    return 0L;
}

/**
 * state匹配stamp時, 執(zhí)行下列操作之一.
      1误证、stamp 表示持有寫鎖愈捅,釋放寫鎖慈鸠,并持有讀鎖
      2 stamp 表示持有讀鎖 青团,返回該讀鎖
      3 有一個樂觀讀鎖,只在即時可用的前提下返回一個讀鎖stamp
      4芦昔、其他情況都返回0娃肿,表示失敗
 *
 */
public long tryConvertToReadLock(long stamp) {
    long a = stamp & ABITS, m, s, next; WNode h;
    //state匹配stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        //沒有鎖
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        //寫鎖
        else if (m == WBIT) {
            //非當前線程持有寫鎖
            if (a != m)
                break;
            //釋放寫鎖持有讀鎖
            state = next = s + (WBIT + RUNIT);
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        //持有讀鎖
        else if (a != 0L && a < WBIT)
            return stamp;
        else
            break;
    }
    return 0L;
}

(6). 使用樂觀讀鎖

使用樂觀讀要保證以下順序:

// 獲取版本信息(樂觀鎖)
long stamp = lock.tryOptimisticRead();
// 復制變量到本地堆棧
copyVaraibale2ThreadMemory();
// 校驗,如果校驗失敗,說明此處樂觀鎖使用失敗,申請悲觀讀鎖
if(!lock.validate(stamp)){
    // 申請悲觀讀鎖
    long stamp = lock.readLock();
    try{
        // 復制變量到本地堆棧
        copyVaraibale2ThreadMemory();
    }finally{
        // 使用完之后釋放鎖
        lock.unlock();
    }
}
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凭豪,一起剝皮案震驚了整個濱河市嫂伞,隨后出現(xiàn)的幾起案子桐智,更是在濱河造成了極大的恐慌说庭,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件姿搜,死亡現(xiàn)場離奇詭異舅柜,居然都是意外死亡,警方通過查閱死者的電腦和手機变抽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門绍载,熙熙樓的掌柜王于貴愁眉苦臉地迎上來击儡,“玉大人蝠引,你說我怎么就攤上這事螃概。” “怎么了茧痒?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長超燃。 經(jīng)常有香客問我拘领,道長约素,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮送悔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘屋灌。我一直安慰自己应狱,他們只是感情好疾呻,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布罐韩。 她就那樣靜靜地躺著,像睡著了一般龙考。 火紅的嫁衣襯著肌膚如雪晦款。 梳的紋絲不亂的頭發(fā)上枚冗,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天赁温,我揣著相機與錄音股囊,去河邊找鬼。 笑死居灯,一個胖子當著我的面吹牛怪嫌,可吹牛的內(nèi)容都是我干的柳沙。 我是一名探鬼主播偎行,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼膨更!你這毒婦竟也來了缴允?” 一聲冷哼從身側(cè)響起练般,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤薄料,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后誊役,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛔垢,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡鹏漆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年艺玲,在試婚紗的時候發(fā)現(xiàn)自己被綠了验烧。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碍拆。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡感混,死狀恐怖礼烈,靈堂內(nèi)的尸體忽然破棺而出此熬,到底是詐尸還是另有隱情滑进,我是刑警寧澤扶关,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布节槐,位于F島的核電站拐纱,受9級特大地震影響秸架,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜币绩,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一缆镣、第九天 我趴在偏房一處隱蔽的房頂上張望董瞻。 院中可真熱鬧田巴,春花似錦壹哺、人聲如沸管宵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽钱床。三九已至埠居,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間刑赶,已是汗流浹背懂衩。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工浊洞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留法希,地道東北人。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像屋剑,于是被迫代替她去往敵國和親唉匾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容