Java 源碼分析-ConcurrentHashMap(1.8)

??最近總是在分析源碼,感覺源碼也不是想象上的那么難排霉,今天我來記錄一下我對ConcurrentHashMap的理解窍株。這里只敢說記錄,不敢說分析攻柠,因為ConcurrentHashMap的代碼確實有點難以理解球订,本文不對代碼進行死磕,也就是說瑰钮,可能不會對某一行代碼進行死磕冒滩,而是對整個ConcurrentHashMap類進行整體的理解。
??本文參考資料:

??1.ConcurrentHashMap源碼分析(JDK8版本)
??2. 死磕 Java 并發(fā):J.U.C 之 Java 并發(fā)容器:ConcurrentHashMap
??3.深入淺出ConcurrentHashMap1.8
??4.深入分析ConcurrentHashMap1.8的擴容實現
??5.方騰飛浪谴、魏鵬开睡、程曉明的《Java 并發(fā)編程的藝術》

1.為什么要使用ConcurrentHashMap?

??其實苟耻,樓主都覺得這個問題問的非常傻逼篇恒,為什么使用ConcurrentHashMap呢?當然是為了線程安全唄梁呈。其實這個回答是比較籠統(tǒng)的婚度,這里面還有很多的問題沒有涉及到,比如說,使用最多的HashMap在多線程模型下的缺點蝗茁,傳統(tǒng)線程安全的HashTable的問題等等醋虏。

(1).線程不安全的HashMap

??我們才學習HashMap的時候,就知道HashMap是線程不安全的哮翘,但是不知道HashMap由于多線程會導致什么問題颈嚼。下面,我們來看一段代碼:

public class Demo {
    public static void main(String[] args) {
        final Map<String, String> map = new HashMap<>(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 100000; i++){
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");

                        }
                    }).start();
                }
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

??HashMap在并發(fā)執(zhí)行put操作是會引起死循環(huán)饭寺,是因為多線程會導致HashMap的Entry鏈表形成數據結構阻课,一旦形成環(huán)形數據結構,Entry的next節(jié)點永遠不為空艰匙,就會產生死循環(huán)獲取Entry限煞。

(2).效率低下的HashTable

??HashTable使用synchronized關鍵字來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下员凝。因為當一個線程訪問HashTable的同步方法署驻,其他線程也訪問HashTable的同步方法,會進入阻塞或者輪詢狀態(tài)健霹。如果線程1在使用put進行元素添加旺上,線程2不但不能使用put方法添加元素也不能使用get方法來獲取元素,所以競爭越激烈效率越低糖埋。

(3).ConcurrentHashMap

??相較于笨重的HashTable宣吱,ConcurrentHashMap就顯得非常高效。ConcurrentHashMap降低了鎖的粒度瞳别,其中在1.7中征候,設置了Segment數組,來表示不同數據段洒试,在1.8中倍奢,取消了Segment數組,進一步降低了鎖的粒度垒棋。由于本文是分析1.8的ConcurrentHashMap卒煞,所以不對1.7的版本過多的解釋。

2.ConcurrentHashMap的模型圖

??1.8在1.7的基礎上進一步的優(yōu)化叼架,使得ConcurrentHashMap的鎖的粒度進一步降低畔裕。我們還是先來看看ConcurrentHashMap的結構圖:



??我們看到的是,在1.8的內部乖订,維持了一個Node數組扮饶,用來存儲數據。實際上乍构,ConcurrentHashMap結構與HashMap的結構類似甜无,基本結構都是數組+鏈表+紅黑樹。但是在ConcurrentHashMap里面,紅黑樹在Node數組內部存儲的不是一個TreeNode對象岂丘,而是一個TreeBin對象陵究,TreeBin內部維持著一個紅黑樹。

3.ConcurrentHashMap的內部類

??簡單的看過ConcurrentHashMap的結構圖之后奥帘,現在來了解一下ConcurrentHashMap的內部類铜邮,這個在后面我們理解源碼有一定的幫助。

(1).Node類

??Node類在ConcurrentHashMap的內部類是最基本的類寨蹋,其中桶里面裝的就是Node元素松蒜,還有就是TreeBin、TreeNode等都繼承于Node類已旧。我們先來看看Node的代碼:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

??我們來看一下Node成員變量:

        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

??hash用來存儲該key的hash值秸苗,但是這里面有三個狀態(tài)需要注意:

名稱 描述
MOVED -1 hash值為-1的Node,表示當前節(jié)點已經被處理過了运褪,這中情況將出現多線程擴容的情況下难述,后面會詳細的解釋
TREEBIN -2 hash值為-2的Node,表示當前的節(jié)點是TreeBin
RESERVED -3 保留的hash值吐句,用在ReservationNode上面

??key用來存儲的key值,val用來存儲value值店读,這個就不用多說嗦枢。next字段表示下一個Node,這個在出現哈希沖突時屯断,將定位在相同位置上的Node使用鏈表連接起來文虏。
??我們還需要注意一點是,我們不能夠通過調用setValue方法來改變Node的值。

        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

(2).TreeNode類和TreeBin類

??TreeNode類表示的是紅黑樹上的每個節(jié)點殖演。當一個鏈表上的節(jié)點數量超過了指定的值氧秘,會將這個鏈表變?yōu)榧t黑樹,當然每個節(jié)點就轉換為TreeNode趴久。不像HashMap丸相,ConcurrentHashMap在桶里面直接存儲的不是TreeNode,而是一個TreeBin彼棍,在TreeBin內部維護一個紅黑樹灭忠,也就是說TreeNode在TreeBin內部使用的。

(3).ForwardingNode類

??這個是輔助類座硕,通常在擴容時用到弛作。此時如果一個線程在進行擴容操作,另一個線程put一個元素進來华匾,如果看到對應的位置上面是ForwardingNode對象的話映琳,那么就參與擴容的操作隊列來。其中ForwardingNode對象來源有兩個:1.原來的位置為null,但是此時復制操作已經到當前位置的后面了萨西,會將這個原來的桶的這個位置置為ForwardingNode對象有鹿;2.原來位置不為null,但是已經操作過這個位置了原杂。
??我們來來ForwardingNode的源碼:

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //省略了find方法代碼
    }

??在ForwardingNode里面印颤,我們發(fā)現了一個nextTable對象,這個對象表示擴容之后的數組穿肄,當一個線程訪問到了一個ForwardingNode對象年局,就知道當前正在進行擴容操作,當前這個線程會幫助擴容(擴容分為兩步:1.數組容量增大2倍咸产;2.將原數組的元素拷貝到新數組里面去)矢否,將原數組的元素復制到這個nextTable里面去。

4.基本成員變量

??我們在分析ConcurrentHashMap的源碼時脑溢,還是先來看看它基本的成員變量僵朗。

變量名 描述
table 桶數組,用來存儲Node元素的屑彻。默認為null验庙,只在第一次put操作的進行初始化,該數組的長度永遠為2的n次方社牲。
nextTable 默認為null救拉,當不為null迹辐,表示當前正在進行擴容操作,這個數組就是擴容之后的數組,長度為原數組的兩倍秆吵。
baseCount 記錄map中元素個數奸披,由于是多線程操作坟漱,baseCount記錄的不準確,所以要結合counterCells 來使用保證記錄的正確性。
sizeCtl 表初始化和擴容的控制位弊添。其中,當為-1時,表示當前table數組正在被初始化;當為-N時,表示有N-1個線程在進行擴容操作飞苇;當為0(默認值)時忿等,表示當前table還未使用匾浪,此時table為null冷溶;當為其余正整數時苗胀,表示table的容量,默認是table大小的0.75倍,在代碼中使用的是(n - (n>>>2))的方式來計算0.75

5.構造方法

??在ConcurrentHashMap里面,我覺得有兩個構造方法需要我們關注拴鸵,一個是無參數的構造方法聘芜,這個構造方法非常的簡單瞎饲,是一個空的構造方法疟呐,還有一個就是帶初始容量的構造方法:

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

??在這個構造方法里面最主要的是富纸,初始化sizeCtl囤踩。在這里保證sizeCtl必須是2的n次方,所以這里調用tableSizeFor方法的目的就是調整sizeCtl大小為2的n次方。

    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

??假設給出大小為100晓褪,調用這個方法之后堵漱,會將大小調整為256。

6.table數組的初始化

??在前面涣仿,對table數組的初始化也有所提及勤庐。table不會在構造方法里面進行初始化,而是在第一次put操作時好港,進行初始化愉镰。我們來看看:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //發(fā)現table數組尚未初始化,調用initTable方法來對table數組進行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
       //省略了其余代碼
    }

??在調用put方法時钧汹,如果發(fā)現table尚未被初始化丈探,會調用initTable方法來初始化。我們來看看initTable方法的代碼:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
             //sizeCtl小于0拔莱,表示已經有線程在對table數組進行初始化碗降,現在這個線程要做的就是讓出cpu時間,
             //全力支持table初始化
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //此時有一個線程在對table數組進行是初始化塘秦,如果使用CAS算法對sizeCtl字段更新成功讼渊,
            //表示當前線程可以對table數組進行初始化。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

??在initTable方法中尊剔,我們發(fā)現table數組只能被一個線程初始化爪幻。如果一個線程在初始化table數組,會將sizeCtl字段更新為-1须误,其他線程看到sizeCtl為-1時挨稿,就知道當前已經有線程在初始化table數組,他們需要做的是京痢,全力支持那個線程初始化table數組--讓出cpu占用時間奶甘。
??同時,我們看到當table數組更新成功之后历造,sizeCtl更新為table數組大小的0.75倍:

sc = n - (n >>> 2);

7.put操作

??我們使用HashMap最多的操作就是put操作和get操作。這里先對put進行分析船庇。我們先總體看一下put的代碼:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

??這里吭产,我先將整個put方法的代碼貼出,讓大家有一個總體上概念鸭轮,然后再來一一解釋臣淤,整個put操作的流程。
??整個put操作分為4種情況:
??1.如果當前table沒有被初始化窃爷,會先初始化邑蒋,重新進行put操作(循環(huán)操作姓蜂,保證put成功)。
??2.通過hash計算医吊,定位位置钱慢,如果位置上為null,表示可以直接放進去卿堂。
??3.如果對應位置的元素已經被標記為MOVED束莫,即Node的hash為MOVED,那么表示當前table數組在進行擴容操作草描,此時览绿,當前的線程會幫助擴容。
??4.如果通過hash計算之后的位置不為null穗慕,表示出現了哈希沖突饿敲,此時會鎖住當前這個位置上的Node對象,然后將新添加的Node添加這個位置鏈表或者紅黑樹上逛绵。

(1).hash計算

??put操作的第一步就是hash計算怀各,這里的hash計算,我認為分為兩步:
??1.調用spread方法將key的hashcode再次hash暑脆。
??2.通過(n - 1) & hash方法來計算該元素定位的位置渠啤。
??我們先來看看spread方法:

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

??我先來解釋一下>>>運算符的含義:
??>>>的基本操作就是左移,然后高位補0添吗,這里的左移表示連符號位都要跟著左移沥曹;而>>只是左移數值位,不移動符號位碟联。
??h ^ (h >>> 16)與運算過程如下圖:


??異或操作完成之后妓美,然后跟HASH_BITS取與操作,我認為是為了消除符號位的影響鲤孵。
??至于(n - 1) & hash計算壶栋,其實就是hash % n的計算,具體的解釋普监,大家可以參考我的Java 源碼分析-ThreadLocal,這篇文章里面有相關的解釋贵试。

(2).元素放入數組

??由于初始化table數組在前面已經說了,所以這里直接說賦值的情況凯正。在這種情況下毙玻,還有分為兩種情況:1.對應位置為null;2.對應位置不為null廊散。這里將將兩種情況分別討論一下桑滩。

A.對應位置為null

??這種情況比較簡單,直接調用了casTabAt方法進行賦值:

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

??這里使用Unsafe類來對變量進行CAS更新允睹,具體的實現原理运准,樓主也不是很清楚幌氮,實在抱歉!

A.對應位置不為null

??這種情況下胁澳,就比較復雜了该互,我們先來看看代碼:

                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }

??這種情況下,新添加的Node肯定會添加到這個位置后面听哭,當然為了保證線程安全慢洋,肯定鎖住當前的Node,這樣將鎖的粒度降到了每個Node上陆盘,一個Node的put操作不會影響其他Node的操作普筹。
??鎖住了之后,當然就要添加Node隘马。這里分為兩種情況太防,一種原來的Node不是TreeBin,也就是說原來就是一個鏈表酸员,這樣直接添加到鏈表的內部就行了蜒车;還有就是原來的Node本身就是一個TreeBin,那么我們通過調用TreeBin內部的方法來添加一個Node幔嗦,至于紅黑樹的平衡性酿愧,讓TreeBin自己來維持,我們外部不需要關注邀泉。
??添加完成之后嬉挡,還需要一個操作,那就是如果當前鏈表達到了閾值汇恤,需要將鏈表變?yōu)榧t黑樹庞钢。這一步的解釋在后面在詳細的說。

8.更新baseCount

??在putVal方法的循環(huán)執(zhí)行完畢之后因谎,一個Node肯定放入進去了基括,此時就需要調用addCount方法來更新baseCount變量:

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //更新baseCount
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //檢測是否需要擴容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

?? addCount方法里面分為主要兩個操作:1.更新baseCount;2.檢測是否擴容财岔。這里由于在討論更新baseCount风皿,所以不對擴容操作進行詳細的解釋,之后會做詳細的解釋匠璧。
?? 更新baseCount的操作分成了兩步:1.嘗試更新baseCount變量桐款;2.如果更新失敗,或者counterCells為null會調用fullAddCount方法進行循環(huán)更新患朱。
??我們來看看fullAddCount的代碼:

    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
               // 如果counterCells數組對應位置上為null鲁僚,創(chuàng)建一個cell炊苫,
               //放在這個位置上裁厅。
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
               //如果對應位置上不為null冰沙,嘗試更新value值
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                 //如果對應不為null,并且更新失敗执虹,表示此時counterCells數組的容量過小拓挥,
                //此時需要擴容。
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //初始化counterCells數組
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

??這個方法的代碼比較長袋励,理解起來可能麻煩侥啤,我來說明整個方法的作用和執(zhí)行的過程。
??在解釋這個方法之前茬故,先解釋一下baseCount和counterCells含義盖灸。兩個都是用記錄元素個數,只是記錄的時機不同的磺芭。當要更新元素個數時赁炎,優(yōu)先更新baseCount,如果baseCount更新成功的話钾腺,表示更新元素個數的操作已經完成了徙垫;如果更新失敗的話,此時會考慮更新counterCells數組中某一個(隨機的)cell的value值放棒。因此姻报,map的元素個數 = baseCount + 所有的cell的value值
??在調用fullAddCount方法之前间螟,在addCount方法里面進行了兩次嘗試:1.嘗試更新baseCount吴旋;2.嘗試更新counterCells數組隨機位置上的一個cell的value。如果這兩次嘗試都失敗的話寒亥,則需要調用fullAddCount來保證元素個數正確的更新邮府。
??而這個fullAddCount方法的作用就是更新cell的value值。
??整個fullAddCount方法的執(zhí)行步驟:
??1.如果counterCells為null溉奕,先初始化counterCells數組褂傀,默認大小為0。
??2.如果counterCells不為null加勤,根據產生的隨機仙辟,通過(n - 1) & h找到一個位置,如果這個位置為null的話鳄梅,創(chuàng)建一個cell對象放在這個位置上面叠国。
??3.如果這個位置上面不為null,首先會嘗試更新這個cell的value值戴尸,如果更新成功的話粟焊,表示更新操作完成,否則執(zhí)行第4步。
??4.如果第3步的更新操作失敗项棠,表示此時counterCells數組的容量可能不足以應付這么多的線程了悲雳。所以此時counterCells數組需要擴容。

9.擴容操作

??當我們put一個元素之后香追,如果此時元素已經達到了擴容的閾值了合瓢,就需要擴容。我們先來看看addCount方里面的擴容部分:

        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //表示已經有線程在進行擴容操作
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //當前線程是唯一的或是第一個發(fā)起擴容的線程  此時nextTable=null
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }

??addcount方法擴容部分有兩種情況:1. sc<0情況下透典,表示當前已經有線程在進行擴容操作了晴楔,此時當前這個線程需要參與幫助擴容的任務中來;2. 另一個情況峭咒,當前線程只有一個線程準備擴容操作税弃。第一種情況為什么不調用helperTransfer方法來表示幫助擴容的含義,這個原因凑队,我也不是很懂钙皮,但是比較了第一種情況的代碼與helperTransfer方法的代碼,兩者其實比較類似顽决。
??現在我們來看看transfer方法究竟為我們做了什么短条。由于transfer方法代碼過長,所以這里不完全展出來才菠,這里使用分段形式來解釋茸时。
??首先,如果是第一個線程調用transfer方法進行擴容操作赋访,那么nextTable肯定為null可都。所以transfer方法的第一步就是創(chuàng)建新數組,新數組的容量是舊數組的兩倍:

        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }

??擴容之后蚓耽,此時需要將原來的數組進行復制操作了渠牲。這個過程支持多線程操作。在看這個過程之前步悠,我們先來看看幾個變量的含義签杈。

變量名 類型 描述
fwd ForwardingNode 用來作為占位符,表示當前位置已經被處理過
advance boolean advance為true鼎兽,表示當前這個節(jié)點已經被處理答姥,這個與fwd區(qū)別在于,advance為false表示當前節(jié)點(i位置)可以被處理
finishing boolean finishing為true谚咬,表示當前擴容操作已經操作完畢

??接下來鹦付,我們來分析一下transfer方法的執(zhí)行過程。大體的流程是择卦,遍歷這個原來的桶數組敲长,然后復制到新數組里面去郎嫁。
??首先,必須找到一個可以被處理的Node節(jié)點祈噪,這個節(jié)點可以為null行剂,也可以不為null,但是不能為fwd對象钳降,因為如果是fwd對象,表示當前已經被處理過腌巾,沒必要再去處理遂填。

           while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }

??上面的代碼中,只要advance為false澈蝙,就退出while循環(huán)吓坚,表示當前嘗試著處理這個位置上的Node。這里說的是嘗試著處理灯荧,意思就是礁击,可能不會被處理,比如說逗载,已經有現在在處理了哆窿;或者這個位置已經被處理,已經為fwd了厉斟。
??首先挚躯,如果這個位置上為null的話,那么直接將這個位置設置成fwd擦秽。

            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);

??如果當前這個節(jié)點已經被處理了码荔,直接將advance設置為true,進行下一次位置的選擇感挥。

            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed

??其他情況下缩搅,表示可以處理這個節(jié)點,這里處理的意思触幼,是可以將這個位置上的所有節(jié)點拷貝到新數組里面去硼瓣。這里分為兩種情況,我們來一一的分析置谦。
??首先巨双,第一種情況就是節(jié)點本身為一個鏈表結構。

                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //將原來鏈表上的一部分就放在原位置
                            setTabAt(nextTab, i, ln);
                            //將原來鏈表上的另一部分放在 i + n的位置上
                            setTabAt(nextTab, i + n, hn);
                            //處理完畢霉祸,將這個位置上設置為fwd筑累,后面的線程看到之后,知道這個節(jié)點已經被處理了
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

??整個過程是非常簡單丝蹭,就是將原來的鏈表分為了兩部分慢宗,一部分在原來的 i 位置上,一部分在 i+ n的位置上。這里涉及到了喜聞樂見的反轉鏈表镜沽,這里不對反轉鏈表做解釋敏晤,后面會詳細的解釋反轉鏈表的原因,到時候會回來詳細的解釋這段代碼缅茉。
??現在來看看復制操作的第二種類型嘴脾。如果當前位置上的Node為TreeBin類型,表示為紅黑樹蔬墩,此時要做的操作跟鏈表的操作也是非常類似译打,將樹分為兩個部分,一個部分在原來的i位置上拇颅,另一個部分在i + n的位置上奏司。

                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //如果此時節(jié)點數量小于閾值,那么將紅黑樹變?yōu)殒湵?                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;

                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;

??最后樟插,如果所有的節(jié)點被賦值完畢之后韵洋,表示擴容操作已經完成了,此時finishing為true。

            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    //將nextTable置為null黄锤,以便下次擴容
                    nextTable = null;
                    table = nextTab;
                   //將sizeCtl的值調整為長度的0.75
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                 //sizeCtl減一搪缨,表示當前有個線程參與擴容
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

10.反轉鏈表的原因

??樓主的思路來自于深入分析ConcurrentHashMap1.8的擴容實現,如果大家可以參考一下鸵熟。剛剛勉吻,我們對transfer方法的復制操作有了一個大概的理解,這里將對其詳細分析旅赢。
??在解釋這段代碼之前齿桃,我們先對這段代碼里面幾個變量有一個初步的理解。

變量名 類型 描述
runBit int 位置為每個節(jié)點的hash值與n進行與操作煮盼,經過第一次循環(huán)短纵,runBit記錄的是最后一個hash值變化的Node的hash值〗┛兀可能這里拗口香到,待會舉一個詳細例子。
lastRun Node 默認值為當前位置第一個Node报破,經過第一次循環(huán)悠就,lastRun記錄的是最后一個hash值變化的Node。
ln Node 默認值為null充易,是放在新數組 i 位置上的鏈表頭結點梗脾。
hn Node 默認值為null,是放在新數組 i + n 位置上的鏈表頭結點

??在上面的表格中盹靴,提到最后一個hash值變化的Node炸茧,這句話可能比較拗口瑞妇,這里舉一個簡單的例子來表達意思。



??上圖中是一個鏈表的模型梭冠,其中辕狰,我將這個鏈表中的節(jié)點分為兩類:1.一類是節(jié)點的hash值相同的;2.第二類是跟第一類的hash值不同的控漠。所以蔓倍,從這個鏈表中我們可以得到,lastRun為節(jié)點6盐捷,runBit也是節(jié)點6與n進行與操作得到的值偶翅。為什么是節(jié)點6呢?因為在6之后毙驯,節(jié)點類型沒有再變了。

                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //將原來鏈表上的一部分就放在原位置
                            setTabAt(nextTab, i, ln);
                            //將原來鏈表上的另一部分放在 i + n的位置上
                            setTabAt(nextTab, i + n, hn);
                            //處理完畢灾测,將這個位置上設置為fwd爆价,后面的線程看到之后,知道這個節(jié)點已經被處理了
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

??結合源碼來分析媳搪,runBit默認為節(jié)點1的hash和n與操作的值铭段。后面依次記錄與上一次runBit的不同值,最后通過runBit == 0將所有的節(jié)點分為了兩類秦爆。
??現在我們結合上面的圖做一個假設序愚。我們假設紫色節(jié)點的hash值&n為0,那么藍色節(jié)點的hash值&n就不為0等限。經過第一次循環(huán)爸吮,得出 ln = lastRun = 節(jié)點6,hn = null望门。
??此時我們來分析第二次循環(huán)形娇,先來看看紫色節(jié)點構成ln鏈表的過程。


??我看到ln鏈表只是部分反轉了筹误,默認部分是沒有反轉了桐早。其中得到的ln鏈表放在新數組的i位置上。
??然后我們再來看看hn鏈表的構成過程厨剪。

??我們發(fā)現hn鏈表是完全反轉的哄酝。從而,我們得出一個結論祷膳,哪個鏈表默認為null陶衅,那個鏈表就是完全反轉的
??最終直晨,我們終于知道了万哪,一個反轉鏈表得出的原因侠驯,其實不是故意為之,而是加快復制操作速度奕巍,因為lastRun之后的節(jié)點進行復制操作不需要創(chuàng)建新節(jié)點吟策。

11.get操作

??前面詳細的說了put操作和擴容操作,這里簡單的分析一下get操作的止,get操作相比于前面的操作就非常的簡單檩坚。先來看看源碼:

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
             //匹配成功
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
           //當節(jié)點為TreeBin或者ForwardingNode
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
           //遍歷鏈表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

??整個get操作分為三種情況:1.table數組指定位置的第一個節(jié)點就匹配成功,直接返回诅福;2.table數組指定位置上的hash值小于0匾委,此時當前位置可能已經被其他在擴容時處理過,或者當前位置的Node為一個TreeBin氓润,不管是那種類型的Node赂乐,調用的都是find方法來獲取Node節(jié)點;3.其余情況下咖气,直接遍歷鏈表查找挨措。

12.size操作

??size操作里面,主要有兩個方法提供崩溪,一個是傳統(tǒng)的size方法浅役,一個是ConcurrentHashMap比較提倡的mappingCount方法。我們先來看看size方法:

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

??size方法里面調用sumCount方法伶唯,sumCount方法才是真正計算元素個數的方法觉既。我們來看看sumCount方法:

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

??可能大家對這個方法計算方式不太好理解,其實這種計算方式取決于前面的設計方式乳幸。在前面我說到過瞪讼,map的元素個數 = baseCount + 所有的cell的value值。這個結論是因為多線程更新個數時粹断,如果只在baseCount里面記錄杆麸,一個原因是多線程更新可能會出錯锰蓬,另一個原因是競爭性太大了第租。所以勉痴,設計理念就變成了,當baseCount更新失敗時悬赏,表示baseCount變量當前忙碌狡汉,可以將這個增量添加到一個不忙的cell里面去,這樣競爭性就降低了闽颇。
??我們再來看看mappingCount方法:

    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }

??達到的效果感覺跟size都差不多盾戴,只不過是一個返回int,一個返回long而已兵多。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末尖啡,一起剝皮案震驚了整個濱河市橄仆,隨后出現的幾起案子,更是在濱河造成了極大的恐慌衅斩,老刑警劉巖盆顾,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異畏梆,居然都是意外死亡您宪,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門奠涌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來宪巨,“玉大人,你說我怎么就攤上這事溜畅∧笞浚” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵慈格,是天一觀的道長怠晴。 經常有香客問我,道長峦椰,這世上最難降的妖魔是什么龄寞? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任汰规,我火速辦了婚禮汤功,結果婚禮上,老公的妹妹穿的比我還像新娘溜哮。我一直安慰自己滔金,他們只是感情好,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布茂嗓。 她就那樣靜靜地躺著餐茵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪述吸。 梳的紋絲不亂的頭發(fā)上忿族,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機與錄音蝌矛,去河邊找鬼道批。 笑死,一個胖子當著我的面吹牛入撒,可吹牛的內容都是我干的隆豹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼茅逮,長吁一口氣:“原來是場噩夢啊……” “哼璃赡!你這毒婦竟也來了判哥?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤碉考,失蹤者是張志新(化名)和其女友劉穎塌计,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體豆励,經...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡夺荒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了良蒸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片技扼。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖嫩痰,靈堂內的尸體忽然破棺而出剿吻,到底是詐尸還是另有隱情,我是刑警寧澤串纺,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布丽旅,位于F島的核電站,受9級特大地震影響纺棺,放射性物質發(fā)生泄漏榄笙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一祷蝌、第九天 我趴在偏房一處隱蔽的房頂上張望茅撞。 院中可真熱鬧,春花似錦巨朦、人聲如沸米丘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拄查。三九已至,卻和暖如春棚蓄,著一層夾襖步出監(jiān)牢的瞬間堕扶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工梭依, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留稍算,地道東北人。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓睛挚,卻偏偏與公主長得像邪蛔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子扎狱,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內容