ConcurrentSkipListMap 源碼分析 (基于Java 8)

1. ConcurrentSkipListMap 簡(jiǎn)介

ConcurrentSkipListMap 一個(gè)并發(fā)安全, 基于 skip list 實(shí)現(xiàn)有序存儲(chǔ)的Map. OK 我們回憶一下 Java 中常用 的Map HashMap, TreeMap, ConcurrentHashMap, 還有就是我們今天的主角 ConcurrentSkipListMap.
ConcurrentSkipListMap 與 ConcurrentHashMap 同在 concurrent 包下, 雖然 ConcurrentSkipListMap 比 ConcurrentHashMap 多用存儲(chǔ)空間(用空間換時(shí)間), 但它有著ConcurrentHashMap不能比擬的優(yōu)點(diǎn): 有序數(shù)據(jù)存儲(chǔ), 基于的就是 skip list.

2. 跳表(skip list)

跳躍列表(skip list) 是一種隨機(jī)話的數(shù)據(jù)結(jié)構(gòu), 基于上下左右都是鏈表的數(shù)據(jù)結(jié)構(gòu), 其效率可以比擬 二叉查找樹; 基本上 skip list 是在鏈表的上層增加多層索引鏈表(增加是隨機(jī)的)實(shí)現(xiàn)的, 所以查找中根據(jù)索引層進(jìn)行跳躍快速查找, 因此得名;

如圖:


overview1.gif

上面這張圖來自 wiki, 圖中有添加的過程, 但是結(jié)構(gòu)沒有細(xì)節(jié), 我結(jié)合自己的理解, 也畫了一張: (圖片右擊在新的窗口中看, 簡(jiǎn)書的圖片預(yù)覽不能真實(shí)展現(xiàn)圖, 暈!)

concurrentSkipListMap_overview.png

從這張圖中我們可以得出 ConcurrentSkipListMap 的幾個(gè)特點(diǎn):

  1. ConcurrentSkipListMap 的節(jié)點(diǎn)主要由 Node, Index, HeadIndex 構(gòu)成;
  2. ConcurrentSkipListMap 的數(shù)據(jù)結(jié)構(gòu)橫向縱向都是鏈表
  3. 最下面那層鏈表是Node層(數(shù)據(jù)節(jié)點(diǎn)層), 上面幾層都是Index層(索引)
  4. 從縱向鏈表來看, 最左邊的是 HeadIndex 層, 右邊的都是Index 層, 且每層的最底端都是對(duì)應(yīng)Node, 縱向上的索引都是指向最底端的Node(這一點(diǎn)圖片為了美觀就沒體現(xiàn))

了解了數(shù)據(jù)結(jié)構(gòu), 我們直接來看源碼

2. 內(nèi)部節(jié)點(diǎn)類 Node
static final class Node<K, V>{
    final K key;  // key 是 final 的, 說明節(jié)點(diǎn)一旦定下來, 除了刪除, 不然不會(huì)改動(dòng) key 了
    volatile Object value; // 對(duì)應(yīng)的 value
    volatile Node<K, V> next; // 下一個(gè)節(jié)點(diǎn)
    
    // 構(gòu)造函數(shù)
    public Node(K key, Object value, Node<K, V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }
    
    /**
     * 創(chuàng)建一個(gè)標(biāo)記節(jié)點(diǎn)(通過 this.value = this 來標(biāo)記)
     * 這個(gè)標(biāo)記節(jié)點(diǎn)非常重要: 有了它, 就能對(duì)鏈表中間節(jié)點(diǎn)進(jìn)行同時(shí)刪除了插入
     * ps: ConcurrentLinkedQueue 只能在頭上, 尾端進(jìn)行插入, 中間進(jìn)行刪除 
     */
    public Node(Node<K, V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }

    /**
     * CAS 操作設(shè)置 Value
     */
    boolean casValue(Object cmp, Object val){
        return unsafe.compareAndSwapObject(this, valueOffset, cmp, val);
    }

    /**
     * CAS 操作設(shè)置 next
     */
    boolean casNext(Node<K, V> cmp, Node<K, V> val){
        return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * 檢測(cè)是否為標(biāo)記節(jié)點(diǎn)
     */
    boolean isMarker(){
        return value == this;
    }

    /**
     * 檢測(cè)是否為 鏈表最左下角的 BASE_HEADER 節(jié)點(diǎn)
     */
    boolean isBaseHeader(){
        return value == BASE_HEADER;
    }

    /**
     * 對(duì)節(jié)點(diǎn)追加一個(gè)標(biāo)記節(jié)點(diǎn), 為最終的刪除做準(zhǔn)備
     */
    boolean appendMarker(Node<K, V> f){
        return casNext(f, new Node<K, V>(f));
    }

    /**
     * Help out a deletion by appending marker or unlinking from
     * predecessor. This called during traversals when value
     * field seen to be null
     * 
     * helpDelete 方法, 這個(gè)方法要么追加一個(gè)標(biāo)記節(jié)點(diǎn), 要么進(jìn)行刪除操作
     */
    void helpDelete(Node<K, V> b, Node<K, V> f){
        /**
         * Rechecking links and then doing only one of the
         * help-out stages per call tends to minimize CAS
         * interference among helping threads
         */
        if(f == next && this == b.next){
            if(f == null || f.value != f){ // 還沒有對(duì)刪除的節(jié)點(diǎn)進(jìn)行節(jié)點(diǎn) marker
                casNext(f, new Node<K, V>(f));
            }else{
                b.casNext(this, f.next); // 刪除 節(jié)點(diǎn) b 與 f.next 之間的節(jié)點(diǎn)
            }
        }
    }

    /**
     * 校驗(yàn)數(shù)據(jù)
     */
    V getValidValue(){
        Object v = value;
        if(v == this || v == BASE_HEADER){
            return null;
        }
        V vv = (V)v;
        return vv;
    }

    /**
     * Creates and returns a new SimpleImmutableEntry holding current
     * mapping if this node holds a valid value, else null.
     *
     * @return new entry or null
     */
    AbstractMap.SimpleImmutableEntry<K, V> createSnapshot(){
        Object v = value;
        if(v == null || v == this || v == BASE_HEADER){
            return null;
        }
        V vv = (V) v;
        return new AbstractMap.SimpleImmutableEntry<K, V>(key, vv);
    }

    // UNSAFE mechanics
    private static final Unsafe unsafe;
    private static final long valueOffset;
    private static final long nextOffset;

    static {
        try {
            unsafe = UnSafeClass.getInstance();
            Class<?> k = Node.class;
            valueOffset = unsafe.objectFieldOffset(k.getDeclaredField("value"));
            nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
        }catch (Exception e){
            throw new Error(e);
        }
    }

}

從上面的代碼中我們大體知道 Node 有以下幾個(gè)特點(diǎn):

  1. this.value = this 的標(biāo)記節(jié)點(diǎn)
  2. 刪除分為2步, 把 non-null 變成 null, 在進(jìn)行添加標(biāo)記節(jié)點(diǎn) (helpDelete方法中)
3. 索引節(jié)點(diǎn) Index
static class Index<K, V>{

    final Node<K, V> node; // 索引指向的節(jié)點(diǎn), 縱向上所有索引指向鏈表最下面的節(jié)點(diǎn)
    final Index<K, V> down; // 下邊level層的 Index
    volatile Index<K, V> right; // 右邊的  Index

    /**
     * Creates index node with given values
     * @param node
     * @param down
     * @param right
     */
    public Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }

    /**
     * compareAndSet right field
     * @param cmp
     * @param val
     * @return
     */
    final boolean casRight(Index<K, V> cmp, Index<K, V> val){
        return unsafe.compareAndSwapObject(this, rightOffset, cmp, val);
    }

    /**
     * Returns true if the node this indexes has been deleted.
     * @return true if indexed node is known to be deleted
     */
    final boolean indexesDeletedNode(){
        return node.value == null;
    }

    /**
     * Tries to CAS newSucc as successor. To minimize races with
     * unlink that may lose this index node, if the node being
     * indexed is known to be deleted, it doesn't try to link in
     *
     * @param succ the expecteccurrent successor
     * @param newSucc the new successor
     * @return true if successful
     */
    /**
     * 在 index 本身 和 succ 之間插入一個(gè)新的節(jié)點(diǎn) newSucc
     * @param succ
     * @param newSucc
     * @return
     */
    final boolean link(Index<K, V> succ, Index<K, V> newSucc){
        Node<K, V> n = node;
        newSucc.right = succ;
        return n.value != null  && casRight(succ, newSucc);
    }

    /**
     * Tries to CAS field to skip over apparent successor
     * succ. Fails (forcing a retravesal by caller) if this node
     * is known to be deleted
     * @param succ the expected current successor
     * @return true if successful
     */
    /**
     * 將當(dāng)前的節(jié)點(diǎn) index 設(shè)置其的 right 為 succ.right 等于刪除 succ 節(jié)點(diǎn)
     * @param succ
     * @return
     */
    final boolean unlink(Index<K, V> succ){
        return node.value != null && casRight(succ, succ.right);
    }

    // Unsafe mechanics
    private static final Unsafe unsafe;
    private static final long rightOffset;

    static {
        try{
            unsafe = UnSafeClass.getInstance();
            Class<?> k = Index.class;
            rightOffset = unsafe.objectFieldOffset(k.getDeclaredField("right"));
        }catch (Exception e){
            throw new Error(e);
        }
    

Index 特點(diǎn):

  1. 縱向鏈表都指向同個(gè) Node
  2. Index 有兩個(gè)表 常用的方法 link, unlink 方法, 后邊會(huì)提及到的
4. 頭索引節(jié)點(diǎn) HeadIndex
/**
 * Nodes heading each level keep track of their level.
 */
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

HeadIndex 沒什么特別, 只是增加一個(gè) level 屬性用來標(biāo)示索引層級(jí); 注意所有的 HeadIndex 都指向同一個(gè) Base_header 節(jié)點(diǎn);

OK, 下面我們來看看 ConcurrentSkipListMap 的主要方法 doPut, doGet, doRemove

5. 數(shù)據(jù)增加方法 doPut()
/**
 * Main insetion method. Adds element if not present, or
 * replaces value if present and onlyIfAbsent is false.
 *
 * @param key the key
 * @param value the values that must be associated with key
 * @param onlyIfAbstsent if should not insert if already present
 * @return the old value, or null if newly inserted
 */
private V doPut(K key, V value, boolean onlyIfAbstsent){
    Node<K, V> z; // adde node
    if(key == null){
        throw new NullPointerException();
    }
    Comparator<? super K> cmp = comparator;
    outer:
    for(;;){
        // 0.
        for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 將 key 對(duì)應(yīng)的前繼節(jié)點(diǎn)找到, b 為前繼節(jié)點(diǎn), n是前繼節(jié)點(diǎn)的next, 若沒發(fā)生 條件競(jìng)爭(zhēng), 最終 key在 b 與 n 之間 (找到的b在 base_level 上)
            if(n != null){ // 2. n = null時(shí) b 是鏈表的最后一個(gè)節(jié)點(diǎn), key 直接插到 b 之后 (調(diào)用 b.casNext(n, z))
                Object v; int c;
                Node<K, V> f = n.next; // 3 獲取 n 的右節(jié)點(diǎn)
                if(n != b.next){ // 4. 條件競(jìng)爭(zhēng)(另外一個(gè)線程在b之后插入節(jié)點(diǎn), 或直接刪除節(jié)點(diǎn)n), 則 break 到位置 0, 重新
                    break ;
                }
                if((v = n.value) == null){ // 4. 若 節(jié)點(diǎn)n已經(jīng)刪除, 則 調(diào)用 helpDelete 進(jìn)行幫助刪除 (詳情見 helpDelete), 則 break 到位置 0, 重新來
                    n.helpDelete(b, f);
                    break ;
                }

                if(b.value == null || v == n){ // 5. 節(jié)點(diǎn)b被刪除中 ,則 break 到位置 0, 調(diào)用 findPredecessor 幫助刪除 index 層的數(shù)據(jù), 至于 node 層的數(shù)據(jù) 會(huì)通過 helpDelete 方法進(jìn)行刪除
                    break ;
                }
                if((c = cpr(cmp, key, n.key)) > 0){ // 6. 若 key 真的 > n.key (在調(diào)用 findPredecessor 時(shí)是成立的), 則進(jìn)行 向后走
                    b = n;
                    n = f;
                    continue ;
                }
                if(c == 0){ // 7. 直接進(jìn)行賦值
                    if(onlyIfAbstsent || n.casValue(v, value)){
                        V vv = (V) v;
                        return vv;
                    }
                    break ; // 8. cas 競(jìng)爭(zhēng)條件失敗 重來
                }
                // else c < 0; fall through
            }
            // 9. 到這邊時(shí) n.key > key > b.key
            z = new Node<K, V> (key, value, n);
            if(!b.casNext(n, z)){
                break ; // 10. cas競(jìng)爭(zhēng)條件失敗 重來
            }
            break outer; // 11. 注意 這里 break outer 后, 上面的 for循環(huán)不會(huì)再執(zhí)行, 而后執(zhí)行下面的代碼, 這里是break 不是 continue outer, 這兩者的效果是不一樣的
        }
    }

    int rnd = KThreadLocalRandom.nextSecondarySeed();
    if((rnd & 0x80000001) == 0){ // 12. 判斷是否需要添加level
        int level = 1, max;
        while(((rnd >>>= 1) & 1) != 0){
            ++level;
        }
        // 13. 上面這段代碼是獲取 level 的, 我們這里只需要知道獲取 level 就可以 (50%的幾率返回0尉剩,25%的幾率返回1,12.5%的幾率返回2...最大返回31毅臊。)
        Index<K, V> idx = null;
        HeadIndex<K, V> h = head;
        if(level <= (max = h.level)){ // 14. 初始化 max 的值, 若 level 小于 max , 則進(jìn)入這段代碼 (level 是 1-31 之間的隨機(jī)數(shù))
            for(int i = 1; i <= level; ++i){
                idx = new Index<K, V>(z, idx, null); // 15 添加 z 對(duì)應(yīng)的 index 數(shù)據(jù), 并將它們組成一個(gè)上下的鏈表(index層是上下左右都是鏈表)
            }
        }
        else{ // 16. 若 level > max 則只增加一層 index 索引層
            level = max + 1; // 17. 跳表新的 level 產(chǎn)生
            Index<K, V>[] idxs = (Index<K, V>[])new Index<?, ?>[level + 1];
            for(int i = 1; i <= level; ++i){
                idxs[i] = idx = new Index<K, V>(z, idx, null);
            }
            for(;;){
                h = head;
                int oldLevel = h.level; // 18. 獲取老的 level 層
                if(level <= oldLevel){ // 19. 另外的線程進(jìn)行了index 層增加操作, 所以 不需要增加 HeadIndex 層數(shù)
                    break;
                }
                HeadIndex<K, V> newh = h;
                Node<K, V> oldbase = h.node; // 20. 這里的 oldbase 就是BASE_HEADER
                for(int j = oldLevel+1; j <= level; ++j){ // 21. 這里其實(shí)就是增加一層的 HeadIndex (level = max + 1)
                    newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j); // 22. idxs[j] 就是上面的 idxs中的最高層的索引
                }
                if(casHead(h, newh)){ // 23. 這只新的 headIndex
                    h = newh;  // 24. 這里的 h 變成了 new HeadIndex
                    idx = idxs[level = oldLevel];  // 25. 這里的 idx 上從上往下第二層的 index 節(jié)點(diǎn) level 也變成的 第二
                    break;
                }
            }
        }

        // find insertion points and splice in
        splice:
        for(int insertionLevel = level;;){ // 26. 這時(shí)的 level 已經(jīng)是 第二高的 level(若上面 步驟19 條件競(jìng)爭(zhēng)失敗, 則多出的 index 層其實(shí)是無用的, 因?yàn)?那是 調(diào)用 Index.right 是找不到它的)
            int j = h.level;
            for(Index<K, V> q = h, r = q.right, t = idx;;){ // 27. 初始化對(duì)應(yīng)的數(shù)據(jù)
                if(q == null || t == null){ // 28. 節(jié)點(diǎn)都被刪除 直接 break出去
                    break splice;
                }
                if(r != null){
                    Node<K, V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if(n.value == null){ // 29. 老步驟, 幫助index 的刪除
                        if(!q.unlink(r)){
                            break ;
                        }
                        r = q.right; // 30. 向右進(jìn)行遍歷
                        continue ;
                    }

                    if(c > 0){ // 31. 向右進(jìn)行遍歷
                        q = r;
                        r = r.right;
                        continue ;
                    }
                }

                // 32.
                // 代碼運(yùn)行到這里, 說明 key < n.key
                // 第一次運(yùn)行到這邊時(shí), j 是最新的 HeadIndex 的level j > insertionLevel 非常用可能, 而下面又有 --j, 所以終會(huì)到 j == insertionLevel
                if(j == insertionLevel){
                    if(!q.link(r, t)){ // 33. 將 index t 加到 q 與 r 中間, 若條件競(jìng)爭(zhēng)失敗的話就重試
                        break ; // restrt
                    }
                    if(t.node.value == null){ // 34. 若這時(shí) node 被刪除, 則開始通過 findPredecessor 清理 index 層, findNode 清理 node 層, 之后直接 break 出去, doPut調(diào)用結(jié)束
                        findNode(key);
                        break splice;
                    }
                    if(--insertionLevel == 0){ // 35. index 層添加OK理茎, --1 為下層插入 index 做準(zhǔn)備
                        break splice;
                    }
                }

                /**
                 * 下面這行代碼其實(shí)是最重要的, 理解這行代碼, 那 doPut 就差不多了
                 * 1). --j 要知道 j 是 newhead 的level, 一開始一定 > insertionLevel的, 通過 --1 來為下層操作做準(zhǔn)備 (j 是 headIndex 的level)
                 * 2). 通過 19. 21, 22 步驟, 個(gè)人認(rèn)為 --j >= insertionLevel 是橫成立, 而 --j 是必須要做的
                 * 3) j 經(jīng)過幾次--1管嬉, 當(dāng)出現(xiàn) j < level 時(shí)說明 (j+1) 層的 index已經(jīng)添加成功, 所以處理下層的 index
                 */
                if(--j >= insertionLevel && j < level){
                    t = t.down;
                }
                /** 到這里時(shí), 其實(shí)有兩種情況
                 *  1) 還沒有一次index 層的數(shù)據(jù)插入
                 *  2) 已經(jīng)進(jìn)行 index 層的數(shù)據(jù)插入, 現(xiàn)在為下一層的插入做準(zhǔn)備
                 */
                q = q.down; // 從 index 層向下進(jìn)行查找
                r = q.right;

            }
        }
    }
    return null;
}

doPut方法分析:
整個(gè)doPut方法看起來有點(diǎn)嚇人, 但沒事,我們將這個(gè)方法進(jìn)行分割:

  1. 獲取前繼節(jié)點(diǎn)(步驟1 findPredecessor), 進(jìn)行節(jié)點(diǎn)的插入 (步驟 10 b.casNext(n, z))
  2. 準(zhǔn)備 Index 層的數(shù)據(jù) (idxs[i] = idx = new Index<K, V>(z, idx, null)), 準(zhǔn)備 HeadIndex的數(shù)據(jù), 進(jìn)行插入(步驟 23)
  3. 循環(huán)遍歷Index層, 進(jìn)行索引層的插入(步驟 33 q.link(r, t))
    還迷糊, 沒事, 我們繼續(xù)細(xì)分拆解講:
6. doPut() 增加數(shù)據(jù)過程

6.1. ConcurrentSkipListMap在新建時(shí)的初始狀態(tài)如圖:

state1.png

初始時(shí), 只存在 HeadIndex 和 Base_Header 節(jié)點(diǎn)

6.2. 進(jìn)行節(jié)點(diǎn)添加
6.2.1. 添加 key=1, value = A 節(jié)點(diǎn), 結(jié)果如圖:

state2.png

涉及doPut操作:

1. doPut步驟1 尋找前繼節(jié)點(diǎn), 這時(shí)返回的 b = BaseHeader, n = null, 所以直接到 doPut 步驟 9
2. doPut步驟9, 直接 CAS操作設(shè)置next節(jié)點(diǎn) 
3. 這里假設(shè) 步驟 12 中獲取的 level 是0(要知道獲得0的概率是很大的, 這個(gè)函數(shù)返回的最大值也就31, 也就是說, 最多有31層的索引)
4. 所以這時(shí) idx = null, 直接到步驟 28 break 出去皂林,操作結(jié)束

這里為了理解上的便利, 我們?cè)偬砑右粋€(gè)節(jié)點(diǎn), 最終效果圖如下:

state3.png

6.2.2. 再次添加 key=3, value = C 節(jié)點(diǎn), 結(jié)果如圖:

state4.png

這次增加了索引層 index 1
涉及doPut操作:

1. doPut步驟1 尋找前繼節(jié)點(diǎn), 這時(shí)返回的 b = node2, n = null, 所以直接到 doPut 步驟 9
2. doPut步驟9, 直接 CAS操作設(shè)置next節(jié)點(diǎn)
3. 進(jìn)入步驟 12, 假設(shè)我們獲取到 level = 1, 則步驟14 中 level <= max(max = 1)成立, 初始化一個(gè) idx
4. 最終找到要插入index位置, 進(jìn)行l(wèi)ink操作, 步驟 33

ps: 這時(shí)再put節(jié)點(diǎn) key=4 value = D (情形和 Node1, Node2 一樣), 最終結(jié)果:

state5.png

6.2.3. 再次添加 key=5, value = E 節(jié)點(diǎn), 結(jié)果如圖:

state6.png

這時(shí)發(fā)現(xiàn)索引層增加了一層
我們來看 doPut 操作:

1. doPut步驟1 尋找前繼節(jié)點(diǎn), 這時(shí)返回的 b = node2, n = null, 所以直接到 doPut 步驟 9
2. doPut步驟9, 直接 CAS操作設(shè)置next節(jié)點(diǎn)
3. 進(jìn)入步驟 12, 假設(shè)我們獲取到 level = 25, 則步驟14 中 level <= max(max = 1)不成立, 則進(jìn)入 步驟16, 這里我們發(fā)現(xiàn), 只要 level > max, 只是在原來的 max + 1, 就是指增加一層的索引
4. 進(jìn)行 indx 鏈表的初始化, 一共兩個(gè)鏈表節(jié)點(diǎn) (index是縱向的鏈表)
5. 步驟21, 其實(shí)這時(shí)只是在原來的 HeadIndex 的縱向鏈表上增加一個(gè)新節(jié)點(diǎn)
6. 步驟 23 CAS 新的 HeadIndex, 這里有個(gè)注意點(diǎn) h = newh, 而index是第二高的 Index, 為什么呢? 因?yàn)椴襟E 22中已經(jīng)將最高層的 HadeIndex與Index橫向連接起來了(奇妙吧, 一會(huì)橫向一會(huì)縱向)
7. 步驟 26, 這里的 insertionLevel 是第二高的 level, 而下面的 j 則是最高層的 level
8. 最后就是步驟30中的link操作(j > insertionLevel, 所以先 j-- , 再for loop, 接著就滿足了 )

我們?cè)诓迦雮€(gè) key = 11, value = w (步驟和 node1, node2 一樣, 這里省略了)
最終如圖:

state7.png

6.2.4. 再次添加 key=8, value = W 節(jié)點(diǎn), 結(jié)果如圖:

state8.png

這里和添加 key= 5 差不多, 唯一的區(qū)別就是 獲取的前繼節(jié)點(diǎn).right != null 而已, 所以不說了

總結(jié): 其實(shí)doPut方法就是獲取key對(duì)應(yīng)的前繼節(jié)點(diǎn), 然后cas設(shè)置next值, 隨后 生成隨機(jī) level(0-31之間), 若新的 level <= oldMaxLevel 則增加對(duì)應(yīng)的索引層, 若level > oldMaxLevel, 則 HeadIndex 也會(huì)隨之增加索引層;

上面doPut時(shí)每次都會(huì)調(diào)用 findPredecessor 來獲取前繼節(jié)點(diǎn), 那我們就看這個(gè)方法

7. findPredecessor() 尋找前繼節(jié)點(diǎn)

總體思路是: 從矩形鏈表的左上角的 HeadIndex 索引開始, 先向右, 遇到 null, 或 > key 時(shí)向下, 重復(fù)向右向下找, 一直找到 對(duì)應(yīng)的前繼節(jié)點(diǎn)(前繼節(jié)點(diǎn)就是小于 key 的最大節(jié)點(diǎn))


/**
 * Returns a base-level node with key strictly less than given key,
 * or the base-level header if there is no such node. Also
 * unlinks indexes to deleted nodes found along the way. Callers
 * rely on this side-effect of clearing indices to deleted nodes
 * @param key the key
 * @return a predecessor of the key
 */
private Node<K, V> findPredecessor(Object key, Comparator<? super K> cmp){
    if(key == null)
        throw new NullPointerException(); // don't postpone errors
    for(;;){
        for(Index<K, V> q = head, r = q.right, d;;){ // 1. 初始化數(shù)據(jù) q 是head, r 是 最頂層 h 的右Index節(jié)點(diǎn)
            if(r != null){ // 2. 對(duì)應(yīng)的 r =  null, 則進(jìn)行向下查找
                Node<K, V> n = r.node;
                K k = n.key;
                if(n.value == null){ // 3. n.value = null 說明 節(jié)點(diǎn)n 正在刪除的過程中
                    if(!q.unlink(r)){ // 4. 在 index 層直接刪除 r 節(jié)點(diǎn), 若條件競(jìng)爭(zhēng)發(fā)生直接進(jìn)行break 到步驟1 , 重新從 head 節(jié)點(diǎn)開始查找
                        break; // restart
                    }
                    r = q.right; //reread r // 5. 刪除 節(jié)點(diǎn)r 成功, 獲取新的 r 節(jié)點(diǎn), 回到步驟 2 (還是從這層索引開始向右遍歷, 直到 r == null)
                    continue;
                }

                if(cpr(cmp, key, k) > 0){ // 6. 若 r.node.key < 參數(shù)key, 則繼續(xù)向右遍歷, continue 到 步驟 2處, 若 r.node.key >  參數(shù)key 直接跳到 步驟 7
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            if((d = q.down) == null){ // 7. 到這邊時(shí), 已經(jīng)到跳表的數(shù)據(jù)層, q.node < key < r的 或q.node < key 且 r == null; 所以直接返回 q.node
                return q.node;
            }

            q = d; // 8 未到數(shù)據(jù)層, 進(jìn)行重新賦值向下走 (為什么向下走呢? 回過頭看看 跳表, 原來 上層的index 一般都是比下層的 index 個(gè)數(shù)少的)
            r = d.right;
        }
    }
}

若尋找的過程不是很清楚, 直接看一下最上邊的那張圖

7. doGet() 獲取節(jié)點(diǎn)對(duì)應(yīng)的值

整個(gè)過程:

  1. 尋找 key 的前繼節(jié)點(diǎn) b (這時(shí)b.next = null || b.next > key, 則說明不存key對(duì)應(yīng)的 Node)
  2. 接著就判斷 b, b.next 與 key之間的關(guān)系(其中有些 helpDelete操作)

直接看代碼:

/**
 * Gets value for key. Almost the same as findNode, but returns
 * the found value (to avoid retires during ret-reads)
 *
 *  這個(gè) doGet 方法比較簡(jiǎn)單
 * @param key the key
 * @return the value, or null if absent
 */
private V doGet(Object key){
    if(key == null){
        throw new NullPointerException();
    }
    Comparator<? super K> cmp = comparator;
    outer:
    for(;;){
        for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 獲取 key 的前繼節(jié)點(diǎn) b, 其實(shí)這時(shí) n.key >= key
            Object v; int c;
            if(n == null){ // 2. n == null 說明 key 對(duì)應(yīng)的 node 不存在 所以直接 return null
                break outer;
            }
            Node<K, V> f = n.next;
            if(n != b.next){ // 3. 有另外的線程修改數(shù)據(jù), 重新來
                break ;
            }
            if((v = n.value) == null){ // 4. n 是被刪除了的節(jié)點(diǎn), 進(jìn)行helpDelete 后重新再來
                n.helpDelete(b, f);
                break ;
            }
            if(b.value == null || v == n){ // 5. b已經(jīng)是刪除了的節(jié)點(diǎn), 則 break 后再來
                break ;
            }
            if((c = cpr(cmp, key, n.key)) == 0){ // 6. 若 n.key = key 直接返回結(jié)果, 這里返回的結(jié)果有可能是 null
                V vv = (V) v;
                return vv;
            }
            if(c < 0){ // 7. c < 0說明不存在 key 的node 節(jié)點(diǎn)
                break outer;
            }
            // 8. 運(yùn)行到這一步時(shí), 其實(shí)是 在調(diào)用 findPredecessor 后又有節(jié)點(diǎn)添加到 節(jié)點(diǎn)b的后面所致
            b = n;
            n = f;
        }
    }

    return null;
}
8. doRemove() 刪除節(jié)點(diǎn)

整個(gè)刪除個(gè) ConcurrentSkipListMap 里面 nonBlockingLinkedList 實(shí)現(xiàn)的一大亮點(diǎn), 為什么呢? 因?yàn)檫@個(gè) nonBlockingLinkedList 同時(shí)支持并發(fā)安全的從鏈表中間添加/刪除操作, 而 ConcurrentLinkedQueue 只支持并發(fā)安全的從鏈表中間刪除;
刪除操作:

  1. 尋找對(duì)應(yīng)的節(jié)點(diǎn)
  2. 給節(jié)點(diǎn)的 value 至 null, node.value = null
  3. 將 node 有增加一個(gè)標(biāo)記節(jié)點(diǎn) (this.value = this 還記得哇, 不記得的直接看 node 類)
  4. 通過 CAS 直接將 K對(duì)應(yīng)的Node和標(biāo)記節(jié)點(diǎn)一同刪除

直接來看代碼:

/**
 * Main deletion method. Locates node, nulls value, appends a
 * deletion marker, unlinks predecessor, removes associated index
 * nodes, and possibly reduces head index level
 *
 * Index nodes are cleared out simply by calling findPredecessor.
 * which unlinks indexes to deleted nodes found along path to key,
 * which will include the indexes to this node. This is node
 * unconditionally. We can't check beforehand whether there are
 * indexes hadn't been inserted yet for this node during initial
 * search for it, and we'd like to ensure lack of garbage
 * retention, so must call to be sure
 *
 * @param key the key
 * @param value if non-null, the value that must be
 *              associated with key
 * @return the node, or null if not found
 */
final V doRemove(Object key, Object value){
    if(key == null){
        throw new NullPointerException();
    }
    Comparator<? super K> cmp = comparator;
    outer:
    for(;;){
        for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 獲取對(duì)應(yīng)的前繼節(jié)點(diǎn) b
            Object v; int c;
            if(n == null){ // 2. 節(jié)點(diǎn) n 被刪除 直接 return null 返回 , 因?yàn)槔碚撋?b.key < key < n.key
                break outer;
            }
            Node<K, V> f = n.next;
            if(n != b.next){ // 3. 有其他線程在 節(jié)點(diǎn)b 后增加數(shù)據(jù), 重來
                break ;
            }
            if((v = n.value) == null){ // 4. 節(jié)點(diǎn) n 被刪除, 調(diào)用 helpDelete 后重來
                n.helpDelete(b, f);
                break ;
            }

            if(b.value == null || v == n){ // 5. 節(jié)點(diǎn) b 刪除, 重來 調(diào)用findPredecessor時(shí)會(huì)對(duì) b節(jié)點(diǎn)對(duì)應(yīng)的index進(jìn)行清除, 而b借點(diǎn)吧本身會(huì)通過 helpDelete 來刪除
                break ;
            }
            if((c = cpr(cmp, key, n.key)) < 0){ // 6. 若n.key < key 則說明 key 對(duì)應(yīng)的節(jié)點(diǎn)就不存在, 所以直接 return
                break outer;
            }

            if(c > 0){ // 7. c>0 出現(xiàn)在 有其他線程在本方法調(diào)用findPredecessor后又在b 后增加節(jié)點(diǎn), 所以向后遍歷
                b = n;
                n = f;
                continue ;
            }

            if(value != null && !value.equals(v)){ // 8. 若 前面的條件為真, 則不進(jìn)行刪除 (調(diào)用 doRemove 時(shí)指定一定要滿足 key value 都相同, 具體看 remove 方法)
                break outer;
            }
            if(!n.casValue(v, null)){ // 9. 進(jìn)行數(shù)據(jù)的刪除
                break ;
            }
            if(!n.appendMarker(f) || !b.casNext(n, f)){ // 10. 進(jìn)行 marker 節(jié)點(diǎn)的追加, 這里的第二個(gè) cas 不一定會(huì)成功, 但沒關(guān)系的 (第二個(gè) cas 是刪除 n節(jié)點(diǎn), 不成功會(huì)有  helpDelete 進(jìn)行刪除)
                findNode(key);  // 11. 對(duì) key 對(duì)應(yīng)的index 進(jìn)行刪除
            }
            else{
                findPredecessor(key, cmp); //12. 對(duì) key 對(duì)應(yīng)的index 進(jìn)行刪除 10進(jìn)行操作失敗后通過 findPredecessor 進(jìn)行index 的刪除
                if(head.right == null){
                    tryReduceLevel(); // 13. 進(jìn)行headIndex 對(duì)應(yīng)的index 層的刪除
                }
            }

            V vv = (V) v;
            return vv;

        }
    }

    return null;
}

整個(gè)doRemove代碼比較簡(jiǎn)單,我就不說了..... 但問題來了, 為什么要?jiǎng)h除時(shí)增加 marker 節(jié)點(diǎn), 這到底什么作用?
先給大家一個(gè)結(jié)論: 沒有這個(gè)marker節(jié)點(diǎn)會(huì)有可能多刪除節(jié)點(diǎn), marker的存在保證之間節(jié)點(diǎn)一邊刪除一邊插入數(shù)據(jù) 是安全的

9. marker 節(jié)點(diǎn)的作用

如圖 demo1(沒有maker節(jié)點(diǎn)進(jìn)行刪除插入操作):
有三個(gè)鏈接在一起的Node (node 有三個(gè)屬性 key, value, next, 且所有操作都是 cas)
* +------+ +------+ +------+
* ... | A |------>| B |----->| C | ...
* +------+ +------+ +------+

Thread 1 Thread 2
if B.value == null 判斷B的value是否為null
B.value = null
A.casNext(B, B.next)
Node D = new Node(C); B.casNext(C, D);

步驟:

1. Thread 2 準(zhǔn)備在B的后面插入一個(gè)節(jié)點(diǎn) D, 它先判斷 B.value == null, 發(fā)現(xiàn) B 沒被刪除(假設(shè) value = null 是刪除)
2. Thread1 對(duì) B 節(jié)點(diǎn)進(jìn)行刪除 B.value = null
3. Thread 1 直接設(shè)置 A的next是 B 的next (A.casNext(B, B.next)) 成功將節(jié)點(diǎn) B 刪除
4. 這時(shí) Thread 2 new 一個(gè)節(jié)點(diǎn) D, 直接設(shè)置 B.casNext(C, D) 成功
5. 最終效果, 因?yàn)楣?jié)點(diǎn)B被刪除掉, 所以節(jié)點(diǎn)D也沒有插入進(jìn)去(沒插進(jìn)去指不能從以A節(jié)點(diǎn)為head, 通過 next() 方法獲取到節(jié)點(diǎn)D)

原因: 若隊(duì)列在刪除的過程中沒有引入 maker 節(jié)點(diǎn), 有可能導(dǎo)致剛剛插入的節(jié)點(diǎn)無緣無故的消失

下面是一個(gè)例子:
原本: 有3各節(jié)點(diǎn) A, B, C 組成的隊(duì)列
A -> B -> C (A.next = B, B.next = C)
現(xiàn)在有兩個(gè)線程 (thread1, thread2) 進(jìn)行操作, thread1 進(jìn)行刪除節(jié)點(diǎn)B, thread2 在節(jié)點(diǎn)B后插入節(jié)點(diǎn)D

進(jìn)行操作前: 
+------+       +------+      +------+
|   A  |------>|   B  |----->|   C  | 
+------+       +------+      +------+

進(jìn)行操作后: 
D 插入到節(jié)點(diǎn)B之后成功了, 可是隊(duì)列的 頭結(jié)點(diǎn)是 A, 通過A.next().next().next()..... 方法
無法訪問到節(jié)點(diǎn)D(D節(jié)點(diǎn)已經(jīng)丟失了)
            +------+      +------+
            |   B  |----->|   D  | 
            +------+      +------+
                              |
                              V
+------+                   +------+
|   A  |------>----->----->|   C  | 
+------+                   +------+

如圖 demo2(有 marker 節(jié)點(diǎn)):
有三個(gè)鏈接在一起的Node (node 有三個(gè)屬性 key, value, next, 且所有操作都是 cas)
* +------+ +------+ +------+
* ... | A |------>| B |----->| C | ...
* +------+ +------+ +------+

Thread 1 Thread 2
if B.value == null 判斷B的value是否為null
B.value = null
M = new MarkerNode(C); B.caseNext(C, M);
A.casNext(B, B.next)
Node D = new Node(); B.casNext(C, D);

步驟:

1. Thread 2 準(zhǔn)備在B的后面插入一個(gè)節(jié)點(diǎn) D, 它先判斷 B.value == null, 發(fā)現(xiàn) B 沒被刪除(假設(shè) value = null 是刪除)
2. Thread1 對(duì) B 節(jié)點(diǎn)進(jìn)行刪除 B.value = null
3. Thread 1 在B的后面追加一個(gè) MarkerNode M 
4. Thread 1 將 B 與 M 一起刪除
5. 這時(shí)你會(huì)發(fā)現(xiàn) Thread 2 的 B.casNext(C, D) 發(fā)生的可能 :
  1) 在Thread 1 設(shè)置 marker 節(jié)點(diǎn)前操作, 則B.casNext(C, D) 成功, B 與 marker 也一起刪除
  2) 在Thread 1 設(shè)置maker之后, 刪除 b與marker之前, 則B.casNext(C, D) 操作失敗(b.next 變成maker了), 所以在代碼中加個(gè) loop 循環(huán)嘗試
  3) 在Thread 1 刪除 B, marker 之后, 則B.casNext(C, D) 失敗(b.next變成maker, B.casNext(C,D) 操作失敗, 在 loop 中重新進(jìn)行嘗試插入)
6. 最終結(jié)論 maker 節(jié)點(diǎn)的存在致使 非阻塞鏈表能實(shí)現(xiàn)中間節(jié)點(diǎn)的刪除和插入同時(shí)安全進(jìn)行(反過來就是若沒有marker節(jié)點(diǎn), 有可能剛剛插入的數(shù)據(jù)就丟掉了)
10. ConcurrentSkipListMap 總結(jié)

整個(gè)代碼分析過來, 發(fā)現(xiàn)了很多有意思的東西, skip list, nonblockingLinkedList 等等!
ConcurrentSkipListMap 是一個(gè)基于 Skip List 實(shí)現(xiàn)的并發(fā)安全, 非阻塞讀/寫/刪除 的 Map, 最重要的是 它的value是有序存儲(chǔ)的, 而且其內(nèi)部是由縱橫鏈表組成, 在進(jìn)行java開發(fā)中有一定的運(yùn)用場(chǎng)景!

參考:
wiki 定義 Skip List
High Performance Dynamic Lock-Free Hash Tables and List-Based Sets

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蚯撩,一起剝皮案震驚了整個(gè)濱河市础倍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌求厕,老刑警劉巖著隆,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扰楼,死亡現(xiàn)場(chǎng)離奇詭異呀癣,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)弦赖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門项栏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蹬竖,你說我怎么就攤上這事沼沈。” “怎么了币厕?”我有些...
    開封第一講書人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵列另,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我旦装,道長(zhǎng)页衙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮店乐,結(jié)果婚禮上艰躺,老公的妹妹穿的比我還像新娘。我一直安慰自己眨八,他們只是感情好腺兴,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著廉侧,像睡著了一般页响。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伏穆,一...
    開封第一講書人閱讀 51,718評(píng)論 1 305
  • 那天拘泞,我揣著相機(jī)與錄音,去河邊找鬼枕扫。 笑死陪腌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烟瞧。 我是一名探鬼主播诗鸭,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼参滴!你這毒婦竟也來了强岸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤砾赔,失蹤者是張志新(化名)和其女友劉穎蝌箍,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體暴心,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡妓盲,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了专普。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片悯衬。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖檀夹,靈堂內(nèi)的尸體忽然破棺而出筋粗,到底是詐尸還是另有隱情,我是刑警寧澤炸渡,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布娜亿,位于F島的核電站,受9級(jí)特大地震影響蚌堵,放射性物質(zhì)發(fā)生泄漏买决。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望策州。 院中可真熱鬧瘸味,春花似錦、人聲如沸够挂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)孽糖。三九已至枯冈,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間办悟,已是汗流浹背尘奏。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留病蛉,地道東北人炫加。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像铺然,于是被迫代替她去往敵國(guó)和親俗孝。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容