JAVA并發(fā)容器-ConcurrentHashMap 1.7和1.8 源碼解析

HashMap是一個(gè)線程不安全的類异袄,在并發(fā)情況下會產(chǎn)生很多問題报慕,詳情可以參考HashMap 源碼解析彤路;HashTable是線程安全的類松却,但是它使用的是synchronized來保證線程安全旭寿,線程競爭激烈的情況下效率非常低下警绩。在jdk1.5的時(shí)候引入了ConcurrentHashMap,這也是一個(gè)線程安全的類盅称,它使用了分段鎖的技術(shù)來提升并發(fā)訪問效率肩祥。

HashTable容器在競爭激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因是所有訪問HashTable的 線程都必須競爭同一把鎖,假如容器里有多把鎖微渠,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù)搭幻,那么 當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會存在鎖競爭逞盆,從而可以有效提高并 發(fā)訪問效率檀蹋,這就是ConcurrentHashMap所使用的鎖分段技術(shù)。

在jdk1.7及以前ConcurrentHashMap采用Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成云芦,之后采用的是和HashMap一樣的結(jié)構(gòu)俯逾。

ConcurrentHashMap jdk1.7

結(jié)構(gòu)圖

ConcurrentHashMap結(jié)構(gòu)圖.png

采用Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成,Segment數(shù)組的大小就是ConcurrentHashMap的并發(fā)度舅逸。Segment繼承自ReentrantLock桌肴,所以他本身就是一個(gè)鎖。Segment數(shù)組一旦初始化后就不會再進(jìn)行擴(kuò)容琉历,這也是jdk1.8去掉他的原因坠七。Segment里面又包含了一個(gè)table數(shù)組,這個(gè)數(shù)組是可以擴(kuò)容的旗笔。

如圖我們在定位數(shù)據(jù)的時(shí)候需要對key的hash值進(jìn)行兩次尋址操作彪置,第一次找到在Segment數(shù)組的位置,第二次找到在table數(shù)組中的位置蝇恶。

Segment 類

// 直接繼承自ReentrantLock拳魁,所以一個(gè)Segment本身就是一個(gè)鎖
static final class Segment<K,V> extends ReentrantLock implements Serializable { 
    ...
   // table數(shù)組  
   transient volatile HashEntry<K,V>[] table;

    // 一個(gè)Segment內(nèi)的元素個(gè)數(shù)
    transient int count;

    // 擴(kuò)容閾值
    transient int threshold;

    // 擴(kuò)容因子
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
...

我們發(fā)現(xiàn)Segment直接繼承自ReentrantLock,所以一個(gè)Segment本身就是一個(gè)鎖撮弧。所以Segment數(shù)組的長度大小直接影響了ConcurrentHashMap的并發(fā)度潘懊。還有每個(gè)Segment單獨(dú)維護(hù)了擴(kuò)容閾值姚糊,擴(kuò)容因子,所以每個(gè)Segment的擴(kuò)容操作時(shí)完全獨(dú)立互不干擾的授舟。

HashEntry 類

static final class HashEntry<K,V> {
    // 不可變
    final int hash;
    final K key;
    // volatile保證可見性救恨,這樣我們在get操作時(shí)就不用加鎖了
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
...
}

構(gòu)造函數(shù)

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //  參數(shù)校驗(yàn)
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并發(fā)度控制,最大是65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 等于ssize從1向左移位的 次數(shù)
    int sshift = 0;
    int ssize = 1;
    // 找出最接近c(diǎn)oncurrencyLevel的2的n次冪的數(shù)值
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 這里之所 以用32是因?yàn)镃oncurrentHashMap里的hash()方法輸出的最大數(shù)是32位的
    this.segmentShift = 32 - sshift;
    // 散列運(yùn)算的掩碼岂却,等于ssize減1
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 里HashEntry數(shù)組的長度度忿薇,它等于initialCapacity除以ssize的倍數(shù)c,如果c大于1躏哩,就會取大于等于c的2的N次方值署浩,所以cap不是1,就是2的N次方扫尺。
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 保證HashEntry數(shù)組大小一定是2的n次冪
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 初始化Segment數(shù)組筋栋,并實(shí)際只填充Segment數(shù)組的第0個(gè)元素。
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

通過代碼我們可以看出正驻,在構(gòu)造ConcurrentHashMap的時(shí)候我們就會完成以下件事情:

  1. 確認(rèn)ConcurrentHashMap的并發(fā)度弊攘,也就是Segment數(shù)組長度,并保證它是2的n次冪
  2. 確認(rèn)HashEntry數(shù)組的初始化長度姑曙,并保證它是2的n次冪
  3. 將Segment數(shù)組初始化好并且只填充第0個(gè)元素

put() 方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 先獲取key的hash值
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 2. 定位到Segment
        s = ensureSegment(j);
    // 3.調(diào)用Segment的put方法
    return s.put(key, hash, value, false);
}

主要流程是:

  1. 先獲取key的hash值
  2. 定位到Segment
  3. 調(diào)用Segment的put方法

hash() 方法

    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

這個(gè)方法大致思路是:先拿到key的hashCode襟交,然后對這個(gè)值進(jìn)行再散列。

Segment.put() 方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 1. 加鎖
    HashEntry<K,V> node = tryLock() ? null :
            // scanAndLockForPut在沒有獲取到鎖的情況下伤靠,去查詢key是否存在捣域,如果不存在就新建一個(gè)Node
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // 確定元素在tabl數(shù)組上的位置
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 如果原來位置上有值并且key相同,那么直接替換原來的value
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                // 元素總數(shù)加一
                int c = count + 1;
                // 判斷是否需要擴(kuò)容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

大致過程是:

  1. 加鎖
  2. 定位key在tabl數(shù)組上的索引位置index宴合,獲取到頭結(jié)點(diǎn)
  3. 判斷是否有hash沖突
  4. 如果沒有沖突直接將新節(jié)點(diǎn)node添加到數(shù)組index索引位
  5. 如果有沖突焕梅,先判斷是否有相同key
  6. 有相同key直接替換對應(yīng)node的value值
  7. 沒有添加新元素到鏈表尾部
  8. 解鎖

這里需要注意的是scanAndLockForPut方法,他在沒有獲取到鎖的時(shí)候不僅會通過自旋獲取鎖卦洽,還會做一些其他的查找或新增節(jié)點(diǎn)的工贞言,以此來提升put性能。

Segment.scanAndLockForPut() 方法

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    //定位HashEntry數(shù)組位置阀蒂,獲取第一個(gè)節(jié)點(diǎn)
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    //掃描次數(shù)该窗,循環(huán)標(biāo)記位
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        // 表示遍歷鏈表還沒有結(jié)束
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    //  完成新節(jié)點(diǎn)初始化
                    node = new HashEntry<K,V>(hash, key, value, null);
                // 完成鏈表的遍歷,還是沒有找到相同key的節(jié)點(diǎn)
                retries = 0;
            }
            // 有hash沖突蚤霞,開始查找是否有相同的key
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        // 斷循環(huán)次數(shù)是否大于最大掃描次數(shù)
        else if (++retries > MAX_SCAN_RETRIES) {
            // 自旋獲取鎖
            lock();
            break;
        }
        // 每間隔一次循環(huán)酗失,檢查一次first節(jié)點(diǎn)是否改變
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            // 首節(jié)點(diǎn)有變動,更新first争便,重新掃描
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

scanAndLockForPut方法在當(dāng)前線程獲取不到segment鎖的情況下,完成查找或新建節(jié)點(diǎn)的工作断医。當(dāng)獲取到鎖后直接將該節(jié)點(diǎn)加入鏈表即可滞乙,提升了put操作的性能奏纪。大致過程:

  1. 定位key在HashEntry數(shù)組的索引位,并獲取第一個(gè)節(jié)點(diǎn)
  2. 嘗試獲取鎖斩启,如果成功直接返回序调,否則進(jìn)入自旋
  3. 判斷是否有hash沖突,沒有就直接完成新節(jié)點(diǎn)的初始化
  4. 有hash沖突兔簇,開始遍歷鏈表查找是否有相同key
  5. 如果沒找到相同key发绢,那么就完成新節(jié)點(diǎn)的初始化
  6. 如果找到相同key,判斷循環(huán)次數(shù)是否大于最大掃描次數(shù)
  7. 如果循環(huán)次數(shù)是否大于最大掃描次數(shù)垄琐,就直接CAS拿鎖(阻塞式)
  8. 如果循環(huán)次數(shù)不大于最大掃描次數(shù)边酒,判斷頭結(jié)點(diǎn)是否有變化
  9. 進(jìn)入下次循環(huán)

Segment.rehash() 擴(kuò)容方法

private void rehash(HashEntry<K,V> node) {
    // 復(fù)制老數(shù)組
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // table數(shù)組擴(kuò)容2倍
    int newCapacity = oldCapacity << 1;
    // 擴(kuò)容閾值也增加兩倍
    threshold = (int)(newCapacity * loadFactor);
    // 創(chuàng)建新數(shù)組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 計(jì)算新的掩碼
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計(jì)算新的索引位
            int idx = e.hash & sizeMask;
            // 轉(zhuǎn)移數(shù)據(jù)
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新的節(jié)點(diǎn)加到對應(yīng)索引位
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

在這里我們可以發(fā)現(xiàn)每次擴(kuò)容是針對一個(gè)單獨(dú)的Segment的,在擴(kuò)容完成之前中不會對擴(kuò)容前的數(shù)組進(jìn)行修改狸窘,這樣就可以保證get()不被擴(kuò)容影響墩朦。大致過程是:

  1. 新建擴(kuò)容后的數(shù)組,容量是原來的兩倍
  2. 遍歷擴(kuò)容前的數(shù)組
  3. 通過e.hash & sizeMask;計(jì)算key新的索引位
  4. 轉(zhuǎn)移數(shù)據(jù)
  5. 將擴(kuò)容后的數(shù)組指向成員變量table

get() 方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // 計(jì)算出Segment的索引位
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 以原子的方式獲取Segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 原子方式獲取HashEntry
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // key相同
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                // value是volatile所以可以不加鎖直接取值返回
                return e.value;
        }
    }
    return null;
}

我們可以看到get方法是沒有加鎖的翻擒,因?yàn)镠ashEntry的value和next屬性是volatile的氓涣,volatile直接保證了可見性,所以讀的時(shí)候可以不加鎖陋气。Java中Unsafe類可以參考這篇博客劳吠。

size() 方法

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    // true表示size溢出32位(大于Integer.MAX_VALUE)
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // retries 如果retries等于2則對所有Segment加鎖
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            // 統(tǒng)計(jì)每個(gè)Segment元素個(gè)數(shù)
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // 解鎖
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    // 如果size大于Integer.MAX_VALUE值則直接返貨Integer.MAX_VALUE
    return overflow ? Integer.MAX_VALUE : size;
}

size的核心思想是先進(jìn)性兩次不加鎖統(tǒng)計(jì),如果兩次的值一樣則直接返回巩趁,否則第三個(gè)統(tǒng)計(jì)的時(shí)候會將所有segment全部鎖定痒玩,再進(jìn)行size統(tǒng)計(jì),所以size()盡量少用晶渠。因?yàn)檫@是在并發(fā)情況下凰荚,size其他線程也會改變size大小,所以size()的返回值只能表示當(dāng)前線程褒脯、當(dāng)時(shí)的一個(gè)狀態(tài)便瑟,可以算其實(shí)是一個(gè)預(yù)估值。

isEmpty() 方法

public boolean isEmpty() {
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            // 只要有一個(gè)Segment的元素個(gè)數(shù)不為0則表示不為null
            if (seg.count != 0)
                return false;
            // 統(tǒng)計(jì)操作總數(shù)
            sum += seg.modCount;
        }
    }
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        // 說明在統(tǒng)計(jì)過程中ConcurrentHashMap又被操作過番川,
        // 因?yàn)樯厦媾袛嗔薈oncurrentHashMap不可能會有元素到涂,所以這里如果有操作一定是新增節(jié)點(diǎn)
        if (sum != 0L)
            return false;
    }
    return true;
}
  1. 先判斷Segment里面是否有元素,如果有直接返回颁督,如果沒有則統(tǒng)計(jì)操作總數(shù)践啄;
  2. 為了保證在統(tǒng)計(jì)過程中ConcurrentHashMap里面的元素沒有發(fā)生變化,再對所有的Segment的操作數(shù)做了統(tǒng)計(jì)沉御;
  3. 最后 sum==0 表示ConcurrentHashMap里面確實(shí)沒有元素返回true屿讽,否則一定進(jìn)行過新增元素返回false。

和size方法一樣這個(gè)方法也是一個(gè)若一致方法,最后的結(jié)果也是一個(gè)預(yù)估值伐谈。


ConcurrentHashMap jdk1.8

結(jié)構(gòu)圖

ConcurrentHashMap jdk1.8 數(shù)據(jù)結(jié)構(gòu).png

這個(gè)結(jié)構(gòu)和HashMap一樣

核心屬性

//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//初始容量
private static final int DEFAULT_CAPACITY = 16;
//數(shù)組最大容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默認(rèn)并發(fā)度烂完,兼容1.7及之前版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加載/擴(kuò)容因子,實(shí)際使用n - (n >>> 2)
private static final float LOAD_FACTOR = 0.75f;
//鏈表轉(zhuǎn)紅黑樹的節(jié)點(diǎn)數(shù)閥值
static final int TREEIFY_THRESHOLD = 8;
//紅黑樹轉(zhuǎn)鏈表的節(jié)點(diǎn)數(shù)閥值
static final int UNTREEIFY_THRESHOLD = 6;
//當(dāng)數(shù)組長度還未超過64,優(yōu)先數(shù)組的擴(kuò)容,否則將鏈表轉(zhuǎn)為紅黑樹
static final int MIN_TREEIFY_CAPACITY = 64;
//擴(kuò)容時(shí)任務(wù)的最小轉(zhuǎn)移節(jié)點(diǎn)數(shù)
private static final int MIN_TRANSFER_STRIDE = 16;
//sizeCtl中記錄stamp的位數(shù)
private static int RESIZE_STAMP_BITS = 16;
//幫助擴(kuò)容的最大線程數(shù)
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//size在sizeCtl中的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// ForwardingNode標(biāo)記節(jié)點(diǎn)的hash值(表示正在擴(kuò)容)
static final int MOVED     = -1; // hash for forwarding nodes
// TreeBin節(jié)點(diǎn)的hash值诵棵,它是對應(yīng)桶的根節(jié)點(diǎn)
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

//存放Node元素的數(shù)組,在第一次插入數(shù)據(jù)時(shí)初始化
transient volatile Node<K,V>[] table;
//一個(gè)過渡的table表,只有在擴(kuò)容的時(shí)候才會使用
private transient volatile Node<K,V>[] nextTable;
//基礎(chǔ)計(jì)數(shù)器值(size = baseCount + CounterCell[i].value)
private transient volatile long baseCount;
/**
 * 控制table數(shù)組的初始化和擴(kuò)容抠蚣,不同的值有不同的含義:
 * -1:表示正在初始化
 * -n:表示正在擴(kuò)容
 * 0:表示還未初始化,默認(rèn)值
 * 大于0:表示下一次擴(kuò)容的閾值
 */
private transient volatile int sizeCtl;
//節(jié)點(diǎn)轉(zhuǎn)移時(shí)下一個(gè)需要轉(zhuǎn)移的table索引
private transient volatile int transferIndex;
//元素變化時(shí)用于控制自旋
private transient volatile int cellsBusy;
// 保存table中的每個(gè)節(jié)點(diǎn)的元素個(gè)數(shù) 長度是2的冪次方履澳,初始化是2嘶窄,每次擴(kuò)容為原來的2倍
// size = baseCount + CounterCell[i].value
private transient volatile CounterCell[] counterCells;

其他的參考HashMap 源碼解析

Node 類

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
...

鏈表節(jié)點(diǎn),保存著key和value的值距贷。

TreeNode類

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
...

紅黑樹節(jié)點(diǎn)柄冲,包含了樹的信息。

TreeNode類

TreeBins中使用的節(jié)點(diǎn)

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    // 鎖的持有者
    volatile Thread waiter;
    // 鎖狀態(tài)
    volatile int lockState;
    // values for lockState
    // 表示持有寫鎖
    static final int WRITER = 1; // set while holding write lock
    // 表示等待
    static final int WAITER = 2; // set when waiting for write lock
    // 表示讀鎖的增量值
    static final int READER = 4; // increment value for setting read lock
...

與HashMap有點(diǎn)區(qū)別的是储耐,他不直接使用TreeNode作為數(shù)的根節(jié)點(diǎn)羊初,而是使用TreeBins對其做了裝飾后成為了根節(jié)點(diǎn);同時(shí)它還記錄了鎖的狀態(tài)什湘;需要注意的是:

  1. TreeBins節(jié)點(diǎn)的hash值是 -2
  2. 我們對紅黑樹添加節(jié)點(diǎn)后长赞,紅黑樹的根節(jié)點(diǎn)有可能會因?yàn)樾D(zhuǎn)而發(fā)生變化,所以我們在添加樹節(jié)點(diǎn)的時(shí)候在putTreeVal()方法里面我們使用cas在加了一次鎖闽撤。

ForwardingNode 類

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            // 1. 判斷新的數(shù)組是否是null得哆,
            // 2. 如果不為NULL給那就找到對應(yīng)索引位上的頭結(jié)點(diǎn)
            // 3. 判斷頭節(jié)點(diǎn)是否為NULL
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            // 自旋找節(jié)點(diǎn)
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    // 如果又變成了ForwardingNode標(biāo)記節(jié)點(diǎn),那說明有發(fā)生了擴(kuò)容哟旗,需要跳出循環(huán)從新查找
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}

ForwardingNode 節(jié)點(diǎn)是一個(gè)擴(kuò)容標(biāo)記節(jié)點(diǎn)贩据,只要在數(shù)組上發(fā)現(xiàn)對應(yīng)索引位上是ForwardingNode 節(jié)點(diǎn)時(shí),表示正在擴(kuò)容闸餐。當(dāng)get方法調(diào)用時(shí)饱亮,如果遇到ForwardingNode 節(jié)點(diǎn),那么它將會到擴(kuò)容后的數(shù)據(jù)上查找數(shù)據(jù)舍沙,否則還是在擴(kuò)容前的數(shù)組上查找數(shù)據(jù)近上。這個(gè)要注意兩點(diǎn):

  1. 這個(gè)節(jié)點(diǎn)的hash值是 -1
  2. 這個(gè)節(jié)點(diǎn)的find方法是在對擴(kuò)容后的數(shù)組進(jìn)行查找

構(gòu)造函數(shù)

public ConcurrentHashMap18() {
}

與HashMap一樣,構(gòu)造函數(shù)啥都沒干拂铡,初始化操作是在第一次put完成的壹无。

put() 方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

啥都么有

spread() 方法

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

計(jì)算key.hashCode()并將更高位的散列擴(kuò)展(XOR)降低。采用位運(yùn)算主要是是加快計(jì)算速度感帅。

putVal() 方法

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 計(jì)算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 判斷是否需要初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 找出key對應(yīng)的索引位上的第一個(gè)節(jié)點(diǎn)
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果該索引位為null斗锭,則直接將數(shù)據(jù)放到該索引位
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 正在擴(kuò)容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 加內(nèi)置鎖鎖定一個(gè)數(shù)組的索引位,并添加節(jié)點(diǎn)
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 表示鏈表節(jié)點(diǎn)
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // key相同直接替換value值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 將新節(jié)點(diǎn)添加到鏈表尾部
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 表示樹節(jié)點(diǎn)
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 添加數(shù)節(jié)點(diǎn)
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 嘗試將鏈表轉(zhuǎn)換成紅黑樹
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

主要流程:

  1. 計(jì)算key的hash值
  2. 判斷是否需要初始化失球,如果是則調(diào)用initTable()方法完成初始化
  3. 判斷是否有hash沖突岖是,如沒有直接設(shè)置新節(jié)點(diǎn)到對飲索引位,如果有獲取頭結(jié)點(diǎn)
  4. 根據(jù)頭結(jié)點(diǎn)的hash值判斷是否正在擴(kuò)容,如果是則幫助擴(kuò)容
  5. 如果沒有擴(kuò)容則對頭結(jié)點(diǎn)加鎖豺撑,添加新節(jié)點(diǎn)
  6. fh >= 0根據(jù)頭結(jié)點(diǎn)hash值判斷是否是鏈表節(jié)點(diǎn)作箍,如果是新增鏈表節(jié)點(diǎn),否則新增樹節(jié)點(diǎn)
  7. 新增樹節(jié)點(diǎn)putTreeVal()需要注意前硫,紅黑樹的根節(jié)點(diǎn)有可能會因?yàn)樾D(zhuǎn)而發(fā)生變化,所以我們在添加節(jié)點(diǎn)的時(shí)候還需要對根節(jié)點(diǎn)使用cas在加了一次鎖荧止。
  8. 判斷是否需要嘗試由鏈表轉(zhuǎn)換成樹結(jié)構(gòu)
  9. addCount(1L, binCount);新增count數(shù)屹电,并判斷是否需要擴(kuò)容或者幫助擴(kuò)容

sizeCtl值含義:

  • -1:表示正在初始化
  • -n:表示正在擴(kuò)容
  • 0:表示還未初始化,默認(rèn)值
  • 大于0:表示下一次擴(kuò)容的閾值

initTable() 初始化方法

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 正在初始化
        if ((sc = sizeCtl) < 0)
            // 讓出CPU執(zhí)行權(quán)跃巡,然后自旋
            Thread.yield(); // lost initialization race; just spin
        // CAS替換標(biāo)志位(相當(dāng)于獲取鎖)
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 二次判斷
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 相當(dāng)于sc=n*3/4
                    sc = n - (n >>> 2); 
                }
            } finally {
                // 擴(kuò)容閾值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

主要過程:

  1. 根據(jù)sizeCtl判斷是否正在初始化
  2. 如果其他線程正在初始化就讓出CPU執(zhí)行權(quán)危号,進(jìn)入下一次CPU執(zhí)行權(quán)的競爭Thread.yield();
  3. 如果沒有進(jìn)行初始化的線程則,CAS替換sizeCtl標(biāo)志位(相當(dāng)于獲取鎖)
  4. 獲取到鎖后再次判斷是否初始化
  5. 如果沒有則初始化Node數(shù)組,并設(shè)置sizeCtl值為下一次擴(kuò)容閾值

helpTransfer()幫助擴(kuò)容

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // ForwardingNode標(biāo)記節(jié)點(diǎn)素邪,表示正在擴(kuò)容
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

判斷是否正在擴(kuò)容外莲,如果是就幫助擴(kuò)容。

transfer() 擴(kuò)容方法

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {\
    // n原來數(shù)組長度
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 判斷是發(fā)起擴(kuò)容的線程還是幫助擴(kuò)容的線程兔朦,如果是發(fā)起擴(kuò)容的需要初始化新數(shù)組
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 擴(kuò)容期間的數(shù)據(jù)節(jié)點(diǎn)(用于標(biāo)志位偷线,hash值是-1)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 當(dāng)advance == true時(shí),表明該節(jié)點(diǎn)已經(jīng)處理過了
    boolean advance = true;
    // 在擴(kuò)容完成之前保證get不被影響
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 1. 從右往左找到第一個(gè)有數(shù)據(jù)的索引位節(jié)點(diǎn)(有hash沖突的桶)
    // 2. 如果找到的節(jié)點(diǎn)是NULL節(jié)點(diǎn)(沒有hash沖突的節(jié)點(diǎn))沽甥,那么將該索引位的NULL替換成ForwardingNode標(biāo)記節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)的hash是-1
    // 3. 如果找到不為NULL的節(jié)點(diǎn)(有hash沖突的桶),則對這個(gè)節(jié)點(diǎn)進(jìn)行加鎖
    // 4. 開始進(jìn)進(jìn)移動節(jié)點(diǎn)數(shù)據(jù)
    for (int i = 0, bound = 0;;) {
        //f:當(dāng)前處理i位置的node(頭結(jié)點(diǎn)或者根節(jié)點(diǎn));
        Node<K,V> f; int fh;
        // 通過while循環(huán)獲取本次需要移動的節(jié)點(diǎn)索引i
        while (advance) {
            // nextIndex:下一個(gè)要處理的節(jié)點(diǎn)索引; nextBound:下一個(gè)需要處理的節(jié)點(diǎn)的索引邊界
            int nextIndex, nextBound;
            // i是老數(shù)組索引位谱煤,通過--i來講索引位往前一個(gè)索引位移動噪服,直到0索引位
            if (--i >= bound || finishing)
                advance = false;
            // 節(jié)點(diǎn)已全部轉(zhuǎn)移
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // transferIndex(初值為最后一個(gè)節(jié)點(diǎn)的索引),表示從transferIndex開始后面所有的節(jié)點(diǎn)都已分配恨诱,
            // 每次線程領(lǐng)取擴(kuò)容任務(wù)后媳瞪,需要更新transferIndex的值(transferIndex-stride)。
            // CAS修改transferIndex照宝,并更新索引邊界
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                // 老數(shù)組最后一個(gè)索引位置
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 已經(jīng)完成所有節(jié)點(diǎn)復(fù)制了
            if (finishing) {
                nextTable = null;
                table = nextTab;
                // sizeCtl閾值為原來的1.5倍
                sizeCtl = (n << 1) - (n >>> 1);
                // 結(jié)束自旋
                return;
            }
            // CAS 更新擴(kuò)容閾值蛇受,在這里面sizectl值減一,說明新加入一個(gè)線程參與到擴(kuò)容操作
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 將以前老數(shù)組上為NULL的節(jié)點(diǎn)(還沒有元素的桶或者說成沒有hash沖突的數(shù)據(jù)節(jié)點(diǎn))硫豆,用ForwardingNode標(biāo)記節(jié)點(diǎn)補(bǔ)齊
        // 主要作用是:其他線程在put元素龙巨,發(fā)現(xiàn)找到的索引位是fwd節(jié)點(diǎn)則表示正在擴(kuò)容,那么該線程會來幫助擴(kuò)通熊响,而不是在那里等待
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 表示處理過該節(jié)點(diǎn)了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 對應(yīng)索引位加鎖
            synchronized (f) {
                // 再次校驗(yàn)一下老數(shù)組對應(yīng)索引位節(jié)點(diǎn)是否是我們找到的節(jié)點(diǎn)f
                if (tabAt(tab, i) == f) {
                    // 低索引位頭節(jié)點(diǎn)(i位)旨别, 高位索引位頭節(jié)點(diǎn)(i+tab.length)
                    Node<K,V> ln, hn;
                    // fh >=0 表示鏈表節(jié)點(diǎn),TreeBin節(jié)點(diǎn)的hash值-2
                    if (fh >= 0) {
                        // fh & n算法可以算出新的節(jié)點(diǎn)該分配到那個(gè)索引位(runBit要么為0放低位ln汗茄,要么為n放高位hn)秸弛,
                        // runBit表示鏈表中最后一個(gè)元素的hash值&n的值
                        int runBit = fh & n;
                        // lastRun表示鏈表中最后一個(gè)元素
                        Node<K,V> lastRun = f;
                        // 找到鏈表中最后一個(gè)節(jié)點(diǎn),并賦值給lastRun
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        // 判斷原來的最后一個(gè)節(jié)點(diǎn)應(yīng)該添加到高位還是低位
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // f表示頭結(jié)點(diǎn),如果p不是尾節(jié)點(diǎn)递览,則轉(zhuǎn)移節(jié)點(diǎn)
                        // 如果以前節(jié)點(diǎn)順序是 1 2 3 4 轉(zhuǎn)移后就是 3 2 1 4 
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                // 轉(zhuǎn)移節(jié)點(diǎn)時(shí)都是新建節(jié)點(diǎn),以免破壞原來數(shù)組結(jié)構(gòu)影響get方法
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 設(shè)置新數(shù)組低索引位頭節(jié)點(diǎn)(i位)
                        setTabAt(nextTab, i, ln);
                        // 設(shè)置新數(shù)組高位索引位頭節(jié)點(diǎn)(i+tab.length)
                        setTabAt(nextTab, i + n, hn);
                        // 設(shè)置老數(shù)組i位為標(biāo)記節(jié)點(diǎn)叼屠,表示已經(jīng)處理過了
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 設(shè)置新數(shù)組低索引位頭節(jié)點(diǎn)(i位)
                        setTabAt(nextTab, i, ln);
                        // 設(shè)置新數(shù)組高位索引位頭節(jié)點(diǎn)(i+tab.length)
                        setTabAt(nextTab, i + n, hn);
                        // 設(shè)置老數(shù)組i位為標(biāo)記節(jié)點(diǎn),表示已經(jīng)處理過了
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

主要過程:

  1. tab為擴(kuò)容前的數(shù)組
  2. 判斷是否是第一個(gè)發(fā)起擴(kuò)容的線程绞铃,如果是需要初始化擴(kuò)容后的數(shù)組nextTable
  3. fwd = new ForwardingNode<K,V>(nextTab)初始化擴(kuò)容標(biāo)記節(jié)點(diǎn)
  4. 進(jìn)入擴(kuò)容循環(huán)
  5. 在擴(kuò)容前的數(shù)組tab上從右往左(從高索引位到低索引位)遍歷所有頭結(jié)點(diǎn)镜雨,索引位為i
  6. 如果找到的頭結(jié)點(diǎn)是NULL(沒有hash沖突)則tab[i]=fwd
  7. 找到的頭結(jié)點(diǎn)不為NULL則(有hash沖突)則鎖定頭結(jié)點(diǎn)synchronized (f)
  8. 再次校驗(yàn)頭結(jié)點(diǎn)是否發(fā)生改變儿捧,如果改變直接結(jié)束
  9. 初始化高索引位和第索引位的頭結(jié)點(diǎn)
  10. 移動節(jié)點(diǎn)到相應(yīng)索引位
  11. 設(shè)置擴(kuò)容后的數(shù)組低索引位頭節(jié)點(diǎn)(i位)
  12. 設(shè)置擴(kuò)容后的數(shù)組高位索引位頭節(jié)點(diǎn)(i+tab.length)
  13. 設(shè)置擴(kuò)容前的數(shù)組i位為標(biāo)記節(jié)點(diǎn)(tab[i]=fwd)荚坞,表示已經(jīng)處理過了
  14. 進(jìn)入第3步直到完成

注意:

  • 第5點(diǎn)有tab[i]=fwd有兩層含義:1,表示對應(yīng)索引位已經(jīng)處理過了;2,當(dāng)其他線程拿到該頭結(jié)點(diǎn)的時(shí)候能知曉正在擴(kuò)容菲盾,這時(shí)在put的時(shí)候幫助擴(kuò)容颓影,在get的時(shí)候去擴(kuò)容后的數(shù)組上找相應(yīng)的key
  • int runBit = fh & n;算法可以算出新的節(jié)點(diǎn)該分配到那個(gè)索引位(runBit要么為0放低位ln,要么為n放高位hn)
  • 如果是鏈表節(jié)點(diǎn)懒鉴,以前節(jié)點(diǎn)順序是 1 2 3 4 擴(kuò)容后會變成 3 2 1 4

擴(kuò)容的大致過程圖解:

  1. 發(fā)起擴(kuò)容诡挂,擴(kuò)容前數(shù)組tab

    ConcurrentHashMap擴(kuò)容1.jpg

  2. 在擴(kuò)容前的數(shù)組tab上從右往左(從高索引位到低索引位)遍歷所有頭結(jié)點(diǎn),索引位為i临谱,如果找到的頭結(jié)點(diǎn)是NULL則直接賦值成````fwd``` 標(biāo)記節(jié)點(diǎn)璃俗。

    ConcurrentHashMap擴(kuò)容2.jpg

  3. 擴(kuò)容前的數(shù)組上找到不為NULL的節(jié)點(diǎn),則還是移動節(jié)點(diǎn)到擴(kuò)容后的額數(shù)組


    ConcurrentHashMap擴(kuò)容3.jpg

addCount() 方法

private final void addCount(long x, int check) {
    // CounterCell[] as;使用計(jì)數(shù)器數(shù)組因該是為了提升并發(fā)量悉默,減小沖突概率
    CounterCell[] as; long b, s;
    // 計(jì)數(shù)器表不為NULL(counterCells當(dāng)修改baseCount有沖突時(shí)旧找,需要將size增量放到這個(gè)計(jì)數(shù)器數(shù)組里面)
    if ((as = counterCells) != null ||
        // 使用CAS更新baseCount的值(+1)如果失敗說明存在競爭
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        // CounterCell是否存在競爭的標(biāo)記位
        boolean uncontended = true;
        // CounterCell[] as為NULL表示as沒有競爭
        if (as == null || (m = as.length - 1) < 0 ||
            // 隨機(jī)一個(gè)數(shù)組位置來驗(yàn)證是否為NULL,如果a是null表示沒有競爭麦牺,隨機(jī)也是為了減小沖突概率
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            // CAS替換a的value钮蛛,如果失敗表示存在競爭
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 將size增量值存到as上
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 統(tǒng)計(jì)size
        s = sumCount();
    }
    // 檢查是否需要擴(kuò)容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // size大于閾值sizeCtl,tab數(shù)組長度小于最大值1<<30
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // 表示正在擴(kuò)容
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 幫助擴(kuò)容
                    transfer(tab, nt);
            }
            // sc = (rs << RESIZE_STAMP_SHIFT) + 2剖膳,移位后是負(fù)數(shù)
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 發(fā)起擴(kuò)容魏颓,此時(shí)nextTable=null
                transfer(tab, null);
            s = sumCount();
        }
    }
}

put()方法執(zhí)行最后會對當(dāng)前Map的size+1,ConcurrentHashMap中size由baseCountCounterCell[] as組成吱晒,size=baseCount+as[i].value甸饱。addCount的大致過程如下:

  1. CAS替換baseCount值,如果失敗說明對size的增量(size++)存在競爭
  2. 如果存在競爭仑濒,我們會使用到CounterCell[] as數(shù)組
  3. as[ThreadLocalRandom.getProbe() & m]隨機(jī)取一個(gè)索引位叹话,使用CAS完成size++
  4. 如果as[i]也存在競爭會調(diào)用fullAddCount(x, uncontended);方法完成size++
  5. size++完成后通過size=baseCount+as[i].value公式計(jì)算出元素總數(shù)
  6. 判斷是否需要擴(kuò)容
  7. 如果需要擴(kuò)容,在判斷一下是幫助擴(kuò)容還是發(fā)起擴(kuò)容

注意:

  • CounterCell[] as:這個(gè)的只要目的是分散對baseCount的單一競爭墩瞳,提示size++的并發(fā)率驼壶,這里和table數(shù)組一樣使用了鎖分離技術(shù),as的長度也是2的n次方喉酌,初始長度是2
  • 在第三步中使用隨機(jī)數(shù)也是為了提升并發(fā)效率热凹,ThreadLocalRandom類是JDK7在JUC包下新增的隨機(jī)數(shù)生成器泵喘,它解決了Random類在多線程下,多個(gè)線程競爭內(nèi)部唯一的原子性種子變量般妙,而導(dǎo)致大量線程自旋重試的不足
  • fullAddCount(x, uncontended);方法里面完成了as的初始化和擴(kuò)容
  • 元素總數(shù)的計(jì)算公式是size=baseCount+as[i].value

sumCount() 方法

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

元素總數(shù)的計(jì)算公式是size=baseCount+as[i].value

get() 方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    // table 不為NULL并且對飲索引位不為NULL
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            // 頭節(jié)點(diǎn)key相同
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 樹節(jié)點(diǎn)或者ForwardingNode標(biāo)記節(jié)點(diǎn)
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 鏈表節(jié)點(diǎn)
        while ((e = e.next) != null) {
            // key相同
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

主要流程:

  1. 判斷table和key對應(yīng)索引位是否為NULL
  2. 判斷頭節(jié)點(diǎn)是否是要找的節(jié)點(diǎn)
  3. eh < 0表示是樹節(jié)點(diǎn)或ForwardingNode標(biāo)記節(jié)點(diǎn)纪铺,直接通過find方法找對應(yīng)的key
  4. 否則是鏈表節(jié)點(diǎn),挨個(gè)鏈表節(jié)點(diǎn)找相應(yīng)的key
  5. 返回結(jié)果

注意:

  • get 方法沒有加鎖碟渺,原因是節(jié)點(diǎn)的value是volatile的鲜锚,已經(jīng)保證了可見性,只要value有更新苫拍,那么我們一定能讀到最新數(shù)據(jù)烹棉。
  • e.find(h, key)這里:如果對應(yīng)索引位頭結(jié)點(diǎn)是ForwardingNode節(jié)點(diǎn),那么會直接去擴(kuò)通后的數(shù)組找對應(yīng)的key怯疤,可以參見上面ForwardingNode.find()方法

size()方法

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

弱一致性

get方法和containsKey方法都是遍歷對應(yīng)索引位上所有節(jié)點(diǎn),來判斷是否存在key相同的節(jié)點(diǎn)以及獲得該節(jié)點(diǎn)的value催束。但由于遍歷過程中其他線程可能對鏈表結(jié)構(gòu)做了調(diào)整集峦,因此get和containsKey返回的可能是過時(shí)的數(shù)據(jù),這一點(diǎn)是ConcurrentHashMap在弱一致性上的體現(xiàn)抠刺。

JDK1.8與JDK1.7的不同點(diǎn)

  • 去掉了Segment 數(shù)組:這樣做鎖的粒度更小塔淤,減少了并發(fā)沖突的概率;查找數(shù)據(jù)時(shí)不用計(jì)算兩次hash速妖;
  • 存儲數(shù)據(jù)是采用了鏈表+紅黑樹的形式:當(dāng)一個(gè)桶內(nèi)數(shù)據(jù)量很大的時(shí)候高蜂,紅黑樹的查詢效率遠(yuǎn)高于鏈表。
  • 1.8直接使用了內(nèi)置鎖synchronized:簡化了加鎖操作
  • 1.8的初始化是在第一次put時(shí)完成的罕容,1.7的時(shí)候再構(gòu)造的時(shí)候完成的
  • 在put過程中當(dāng)發(fā)現(xiàn)正在擴(kuò)容备恤,1.8的線程會幫助擴(kuò)容,1.7的只是會檢查key是否存在或者完成新節(jié)點(diǎn)的初始化工作
  • 1.8的hash值計(jì)算更簡單了
  • 1.8擴(kuò)容過程中會修改擴(kuò)容前的數(shù)組锦秒,1.7擴(kuò)容過程中不會修改原來數(shù)組
  • 1.8在get()時(shí)如果判斷到當(dāng)前索引位正在擴(kuò)容露泊,那么直接在擴(kuò)容后的數(shù)組中去找對應(yīng)的key
  • 1.7的size計(jì)算使用的三次計(jì)算的方式,1.8使用了鎖分離技術(shù)

測試代碼

package com.xiaolyuh;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author yuhao.wang3
 * @since 2019/7/26 17:58
 */
public class HashMapTest {
    public static void main(String[] args) {
        AtomicInteger integer = new AtomicInteger();
        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(50);
        Map<User, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 10000; i++) {
            cachedThreadPool.execute(() -> {
                User user = new User(1);
                map.put(user, 1);
            });
        }
    }

    static class User {
        int age;

        public User(int age) {
            this.age = age;
        }

        @Override
        public int hashCode() {
            return age;
        }

        @Override
        public String toString() {
            return "" + age;
        }
    }
}

layering-cache

為監(jiān)控而生的多級緩存框架 layering-cache這是我開源的一個(gè)多級緩存框架的實(shí)現(xiàn)旅择,如果有興趣可以看一下

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末惭笑,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子生真,更是在濱河造成了極大的恐慌沉噩,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件柱蟀,死亡現(xiàn)場離奇詭異川蒙,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)长已,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門派歌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來弯囊,“玉大人,你說我怎么就攤上這事胶果∝抑觯” “怎么了?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵早抠,是天一觀的道長霎烙。 經(jīng)常有香客問我,道長蕊连,這世上最難降的妖魔是什么悬垃? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮甘苍,結(jié)果婚禮上尝蠕,老公的妹妹穿的比我還像新娘。我一直安慰自己载庭,他們只是感情好看彼,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著囚聚,像睡著了一般靖榕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上顽铸,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天茁计,我揣著相機(jī)與錄音,去河邊找鬼谓松。 笑死星压,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的鬼譬。 我是一名探鬼主播租幕,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼拧簸!你這毒婦竟也來了劲绪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤盆赤,失蹤者是張志新(化名)和其女友劉穎贾富,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體牺六,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡颤枪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了淑际。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片畏纲。...
    茶點(diǎn)故事閱讀 38,654評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扇住,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出盗胀,到底是詐尸還是另有隱情艘蹋,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布票灰,位于F島的核電站女阀,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏屑迂。R本人自食惡果不足惜浸策,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望惹盼。 院中可真熱鬧庸汗,春花似錦、人聲如沸手报。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽昧诱。三九已至,卻和暖如春所袁,著一層夾襖步出監(jiān)牢的瞬間盏档,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工燥爷, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蜈亩,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓前翎,卻偏偏與公主長得像稚配,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子港华,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容