??最近總是在分析源碼,感覺源碼也不是想象上的那么難排霉,今天我來記錄一下我對ConcurrentHashMap的理解窍株。這里只敢說記錄,不敢說分析攻柠,因為ConcurrentHashMap的代碼確實有點難以理解球订,本文不對代碼進行死磕,也就是說瑰钮,可能不會對某一行代碼進行死磕冒滩,而是對整個ConcurrentHashMap類進行整體的理解。
??本文參考資料:
??1.ConcurrentHashMap源碼分析(JDK8版本)
??2. 死磕 Java 并發(fā):J.U.C 之 Java 并發(fā)容器:ConcurrentHashMap
??3.深入淺出ConcurrentHashMap1.8
??4.深入分析ConcurrentHashMap1.8的擴容實現
??5.方騰飛浪谴、魏鵬开睡、程曉明的《Java 并發(fā)編程的藝術》
1.為什么要使用ConcurrentHashMap?
??其實苟耻,樓主都覺得這個問題問的非常傻逼篇恒,為什么使用ConcurrentHashMap呢?當然是為了線程安全唄梁呈。其實這個回答是比較籠統(tǒng)的婚度,這里面還有很多的問題沒有涉及到,比如說,使用最多的HashMap在多線程模型下的缺點蝗茁,傳統(tǒng)線程安全的HashTable的問題等等醋虏。
(1).線程不安全的HashMap
??我們才學習HashMap的時候,就知道HashMap是線程不安全的哮翘,但是不知道HashMap由于多線程會導致什么問題颈嚼。下面,我們來看一段代碼:
public class Demo {
public static void main(String[] args) {
final Map<String, String> map = new HashMap<>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 100000; i++){
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}).start();
}
}
});
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
??HashMap在并發(fā)執(zhí)行put操作是會引起死循環(huán)饭寺,是因為多線程會導致HashMap的Entry鏈表形成數據結構阻课,一旦形成環(huán)形數據結構,Entry的next節(jié)點永遠不為空艰匙,就會產生死循環(huán)獲取Entry限煞。
(2).效率低下的HashTable
??HashTable使用synchronized關鍵字來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下员凝。因為當一個線程訪問HashTable的同步方法署驻,其他線程也訪問HashTable的同步方法,會進入阻塞或者輪詢狀態(tài)健霹。如果線程1在使用put進行元素添加旺上,線程2不但不能使用put方法添加元素也不能使用get方法來獲取元素,所以競爭越激烈效率越低糖埋。
(3).ConcurrentHashMap
??相較于笨重的HashTable宣吱,ConcurrentHashMap就顯得非常高效。ConcurrentHashMap降低了鎖的粒度瞳别,其中在1.7中征候,設置了Segment數組,來表示不同數據段洒试,在1.8中倍奢,取消了Segment數組,進一步降低了鎖的粒度垒棋。由于本文是分析1.8的ConcurrentHashMap卒煞,所以不對1.7的版本過多的解釋。
2.ConcurrentHashMap的模型圖
??1.8在1.7的基礎上進一步的優(yōu)化叼架,使得ConcurrentHashMap的鎖的粒度進一步降低畔裕。我們還是先來看看ConcurrentHashMap的結構圖:
??我們看到的是,在1.8的內部乖订,維持了一個Node數組扮饶,用來存儲數據。實際上乍构,ConcurrentHashMap結構與HashMap的結構類似甜无,基本結構都是數組+鏈表+紅黑樹。但是在ConcurrentHashMap里面,紅黑樹在Node數組內部存儲的不是一個TreeNode對象岂丘,而是一個TreeBin對象陵究,TreeBin內部維持著一個紅黑樹。
3.ConcurrentHashMap的內部類
??簡單的看過ConcurrentHashMap的結構圖之后奥帘,現在來了解一下ConcurrentHashMap的內部類铜邮,這個在后面我們理解源碼有一定的幫助。
(1).Node類
??Node類在ConcurrentHashMap的內部類是最基本的類寨蹋,其中桶里面裝的就是Node元素松蒜,還有就是TreeBin、TreeNode等都繼承于Node類已旧。我們先來看看Node的代碼:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
??我們來看一下Node成員變量:
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
??hash用來存儲該key的hash值秸苗,但是這里面有三個狀態(tài)需要注意:
名稱 | 值 | 描述 |
---|---|---|
MOVED | -1 | hash值為-1的Node,表示當前節(jié)點已經被處理過了运褪,這中情況將出現多線程擴容的情況下难述,后面會詳細的解釋 |
TREEBIN | -2 | hash值為-2的Node,表示當前的節(jié)點是TreeBin |
RESERVED | -3 | 保留的hash值吐句,用在ReservationNode上面 |
??key用來存儲的key值,val用來存儲value值店读,這個就不用多說嗦枢。next字段表示下一個Node,這個在出現哈希沖突時屯断,將定位在相同位置上的Node使用鏈表連接起來文虏。
??我們還需要注意一點是,我們不能夠通過調用setValue方法來改變Node的值。
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
(2).TreeNode類和TreeBin類
??TreeNode類表示的是紅黑樹上的每個節(jié)點殖演。當一個鏈表上的節(jié)點數量超過了指定的值氧秘,會將這個鏈表變?yōu)榧t黑樹,當然每個節(jié)點就轉換為TreeNode趴久。不像HashMap丸相,ConcurrentHashMap在桶里面直接存儲的不是TreeNode,而是一個TreeBin彼棍,在TreeBin內部維護一個紅黑樹灭忠,也就是說TreeNode在TreeBin內部使用的。
(3).ForwardingNode類
??這個是輔助類座硕,通常在擴容時用到弛作。此時如果一個線程在進行擴容操作,另一個線程put一個元素進來华匾,如果看到對應的位置上面是ForwardingNode對象的話映琳,那么就參與擴容的操作隊列來。其中ForwardingNode對象來源有兩個:1.原來的位置為null,但是此時復制操作已經到當前位置的后面了萨西,會將這個原來的桶的這個位置置為ForwardingNode對象有鹿;2.原來位置不為null,但是已經操作過這個位置了原杂。
??我們來來ForwardingNode的源碼:
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
//省略了find方法代碼
}
??在ForwardingNode里面印颤,我們發(fā)現了一個nextTable對象,這個對象表示擴容之后的數組穿肄,當一個線程訪問到了一個ForwardingNode對象年局,就知道當前正在進行擴容操作,當前這個線程會幫助擴容(擴容分為兩步:1.數組容量增大2倍咸产;2.將原數組的元素拷貝到新數組里面去)矢否,將原數組的元素復制到這個nextTable里面去。
4.基本成員變量
??我們在分析ConcurrentHashMap的源碼時脑溢,還是先來看看它基本的成員變量僵朗。
變量名 | 描述 |
---|---|
table | 桶數組,用來存儲Node元素的屑彻。默認為null验庙,只在第一次put操作的進行初始化,該數組的長度永遠為2的n次方社牲。 |
nextTable | 默認為null救拉,當不為null迹辐,表示當前正在進行擴容操作,這個數組就是擴容之后的數組,長度為原數組的兩倍秆吵。 |
baseCount | 記錄map中元素個數奸披,由于是多線程操作坟漱,baseCount記錄的不準確,所以要結合counterCells 來使用保證記錄的正確性。 |
sizeCtl | 表初始化和擴容的控制位弊添。其中,當為-1時,表示當前table數組正在被初始化;當為-N時,表示有N-1個線程在進行擴容操作飞苇;當為0(默認值)時忿等,表示當前table還未使用匾浪,此時table為null冷溶;當為其余正整數時苗胀,表示table的容量,默認是table大小的0.75倍,在代碼中使用的是(n - (n>>>2))的方式來計算0.75 |
5.構造方法
??在ConcurrentHashMap里面,我覺得有兩個構造方法需要我們關注拴鸵,一個是無參數的構造方法聘芜,這個構造方法非常的簡單瞎饲,是一個空的構造方法疟呐,還有一個就是帶初始容量的構造方法:
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
??在這個構造方法里面最主要的是富纸,初始化sizeCtl囤踩。在這里保證sizeCtl必須是2的n次方,所以這里調用tableSizeFor方法的目的就是調整sizeCtl大小為2的n次方。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
??假設給出大小為100晓褪,調用這個方法之后堵漱,會將大小調整為256。
6.table數組的初始化
??在前面涣仿,對table數組的初始化也有所提及勤庐。table不會在構造方法里面進行初始化,而是在第一次put操作時好港,進行初始化愉镰。我們來看看:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//發(fā)現table數組尚未初始化,調用initTable方法來對table數組進行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//省略了其余代碼
}
??在調用put方法時钧汹,如果發(fā)現table尚未被初始化丈探,會調用initTable方法來初始化。我們來看看initTable方法的代碼:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl小于0拔莱,表示已經有線程在對table數組進行初始化碗降,現在這個線程要做的就是讓出cpu時間,
//全力支持table初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//此時有一個線程在對table數組進行是初始化塘秦,如果使用CAS算法對sizeCtl字段更新成功讼渊,
//表示當前線程可以對table數組進行初始化。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
??在initTable方法中尊剔,我們發(fā)現table數組只能被一個線程初始化爪幻。如果一個線程在初始化table數組,會將sizeCtl字段更新為-1须误,其他線程看到sizeCtl為-1時挨稿,就知道當前已經有線程在初始化table數組,他們需要做的是京痢,全力支持那個線程初始化table數組--讓出cpu占用時間奶甘。
??同時,我們看到當table數組更新成功之后历造,sizeCtl更新為table數組大小的0.75倍:
sc = n - (n >>> 2);
7.put操作
??我們使用HashMap最多的操作就是put操作和get操作。這里先對put進行分析船庇。我們先總體看一下put的代碼:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
??這里吭产,我先將整個put方法的代碼貼出,讓大家有一個總體上概念鸭轮,然后再來一一解釋臣淤,整個put操作的流程。
??整個put操作分為4種情況:
??1.如果當前table沒有被初始化窃爷,會先初始化邑蒋,重新進行put操作(循環(huán)操作姓蜂,保證put成功)。
??2.通過hash計算医吊,定位位置钱慢,如果位置上為null,表示可以直接放進去卿堂。
??3.如果對應位置的元素已經被標記為MOVED束莫,即Node的hash為MOVED,那么表示當前table數組在進行擴容操作草描,此時览绿,當前的線程會幫助擴容。
??4.如果通過hash計算之后的位置不為null穗慕,表示出現了哈希沖突饿敲,此時會鎖住當前這個位置上的Node對象,然后將新添加的Node添加這個位置鏈表或者紅黑樹上逛绵。
(1).hash計算
??put操作的第一步就是hash計算怀各,這里的hash計算,我認為分為兩步:
??1.調用spread方法將key的hashcode再次hash暑脆。
??2.通過(n - 1) & hash方法來計算該元素定位的位置渠啤。
??我們先來看看spread方法:
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
??我先來解釋一下>>>運算符的含義:
??>>>的基本操作就是左移,然后高位補0添吗,這里的左移表示連符號位都要跟著左移沥曹;而>>只是左移數值位,不移動符號位碟联。
??h ^ (h >>> 16)與運算過程如下圖:
??異或操作完成之后妓美,然后跟HASH_BITS取與操作,我認為是為了消除符號位的影響鲤孵。
??至于(n - 1) & hash計算壶栋,其實就是hash % n的計算,具體的解釋普监,大家可以參考我的Java 源碼分析-ThreadLocal,這篇文章里面有相關的解釋贵试。
(2).元素放入數組
??由于初始化table數組在前面已經說了,所以這里直接說賦值的情況凯正。在這種情況下毙玻,還有分為兩種情況:1.對應位置為null;2.對應位置不為null廊散。這里將將兩種情況分別討論一下桑滩。
A.對應位置為null
??這種情況比較簡單,直接調用了casTabAt方法進行賦值:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
??這里使用Unsafe類來對變量進行CAS更新允睹,具體的實現原理运准,樓主也不是很清楚幌氮,實在抱歉!
A.對應位置不為null
??這種情況下胁澳,就比較復雜了该互,我們先來看看代碼:
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
??這種情況下,新添加的Node肯定會添加到這個位置后面听哭,當然為了保證線程安全慢洋,肯定鎖住當前的Node,這樣將鎖的粒度降到了每個Node上陆盘,一個Node的put操作不會影響其他Node的操作普筹。
??鎖住了之后,當然就要添加Node隘马。這里分為兩種情況太防,一種原來的Node不是TreeBin,也就是說原來就是一個鏈表酸员,這樣直接添加到鏈表的內部就行了蜒车;還有就是原來的Node本身就是一個TreeBin,那么我們通過調用TreeBin內部的方法來添加一個Node幔嗦,至于紅黑樹的平衡性酿愧,讓TreeBin自己來維持,我們外部不需要關注邀泉。
??添加完成之后嬉挡,還需要一個操作,那就是如果當前鏈表達到了閾值汇恤,需要將鏈表變?yōu)榧t黑樹庞钢。這一步的解釋在后面在詳細的說。
8.更新baseCount
??在putVal方法的循環(huán)執(zhí)行完畢之后因谎,一個Node肯定放入進去了基括,此時就需要調用addCount方法來更新baseCount變量:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//檢測是否需要擴容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
?? addCount方法里面分為主要兩個操作:1.更新baseCount;2.檢測是否擴容财岔。這里由于在討論更新baseCount风皿,所以不對擴容操作進行詳細的解釋,之后會做詳細的解釋匠璧。
?? 更新baseCount的操作分成了兩步:1.嘗試更新baseCount變量桐款;2.如果更新失敗,或者counterCells為null會調用fullAddCount方法進行循環(huán)更新患朱。
??我們來看看fullAddCount的代碼:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 如果counterCells數組對應位置上為null鲁僚,創(chuàng)建一個cell炊苫,
//放在這個位置上裁厅。
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果對應位置上不為null冰沙,嘗試更新value值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//如果對應不為null,并且更新失敗执虹,表示此時counterCells數組的容量過小拓挥,
//此時需要擴容。
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//初始化counterCells數組
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
??這個方法的代碼比較長袋励,理解起來可能麻煩侥啤,我來說明整個方法的作用和執(zhí)行的過程。
??在解釋這個方法之前茬故,先解釋一下baseCount和counterCells含義盖灸。兩個都是用記錄元素個數,只是記錄的時機不同的磺芭。當要更新元素個數時赁炎,優(yōu)先更新baseCount,如果baseCount更新成功的話钾腺,表示更新元素個數的操作已經完成了徙垫;如果更新失敗的話,此時會考慮更新counterCells數組中某一個(隨機的)cell的value值放棒。因此姻报,map的元素個數 = baseCount + 所有的cell的value值。
??在調用fullAddCount方法之前间螟,在addCount方法里面進行了兩次嘗試:1.嘗試更新baseCount吴旋;2.嘗試更新counterCells數組隨機位置上的一個cell的value。如果這兩次嘗試都失敗的話寒亥,則需要調用fullAddCount來保證元素個數正確的更新邮府。
??而這個fullAddCount方法的作用就是更新cell的value值。
??整個fullAddCount方法的執(zhí)行步驟:
??1.如果counterCells為null溉奕,先初始化counterCells數組褂傀,默認大小為0。
??2.如果counterCells不為null加勤,根據產生的隨機仙辟,通過(n - 1) & h找到一個位置,如果這個位置為null的話鳄梅,創(chuàng)建一個cell對象放在這個位置上面叠国。
??3.如果這個位置上面不為null,首先會嘗試更新這個cell的value值戴尸,如果更新成功的話粟焊,表示更新操作完成,否則執(zhí)行第4步。
??4.如果第3步的更新操作失敗项棠,表示此時counterCells數組的容量可能不足以應付這么多的線程了悲雳。所以此時counterCells數組需要擴容。
9.擴容操作
??當我們put一個元素之后香追,如果此時元素已經達到了擴容的閾值了合瓢,就需要擴容。我們先來看看addCount方里面的擴容部分:
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//表示已經有線程在進行擴容操作
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//當前線程是唯一的或是第一個發(fā)起擴容的線程 此時nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
??addcount方法擴容部分有兩種情況:1. sc<0情況下透典,表示當前已經有線程在進行擴容操作了晴楔,此時當前這個線程需要參與幫助擴容的任務中來;2. 另一個情況峭咒,當前線程只有一個線程準備擴容操作税弃。第一種情況為什么不調用helperTransfer方法來表示幫助擴容的含義,這個原因凑队,我也不是很懂钙皮,但是比較了第一種情況的代碼與helperTransfer方法的代碼,兩者其實比較類似顽决。
??現在我們來看看transfer方法究竟為我們做了什么短条。由于transfer方法代碼過長,所以這里不完全展出來才菠,這里使用分段形式來解釋茸时。
??首先,如果是第一個線程調用transfer方法進行擴容操作赋访,那么nextTable肯定為null可都。所以transfer方法的第一步就是創(chuàng)建新數組,新數組的容量是舊數組的兩倍:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
??擴容之后蚓耽,此時需要將原來的數組進行復制操作了渠牲。這個過程支持多線程操作。在看這個過程之前步悠,我們先來看看幾個變量的含義签杈。
變量名 | 類型 | 描述 |
---|---|---|
fwd | ForwardingNode | 用來作為占位符,表示當前位置已經被處理過 |
advance | boolean | advance為true鼎兽,表示當前這個節(jié)點已經被處理答姥,這個與fwd區(qū)別在于,advance為false表示當前節(jié)點(i位置)可以被處理 |
finishing | boolean | finishing為true谚咬,表示當前擴容操作已經操作完畢 |
??接下來鹦付,我們來分析一下transfer方法的執(zhí)行過程。大體的流程是择卦,遍歷這個原來的桶數組敲长,然后復制到新數組里面去郎嫁。
??首先,必須找到一個可以被處理的Node節(jié)點祈噪,這個節(jié)點可以為null行剂,也可以不為null,但是不能為fwd對象钳降,因為如果是fwd對象,表示當前已經被處理過腌巾,沒必要再去處理遂填。
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
??上面的代碼中,只要advance為false澈蝙,就退出while循環(huán)吓坚,表示當前嘗試著處理這個位置上的Node。這里說的是嘗試著處理灯荧,意思就是礁击,可能不會被處理,比如說逗载,已經有現在在處理了哆窿;或者這個位置已經被處理,已經為fwd了厉斟。
??首先挚躯,如果這個位置上為null的話,那么直接將這個位置設置成fwd擦秽。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
??如果當前這個節(jié)點已經被處理了码荔,直接將advance設置為true,進行下一次位置的選擇感挥。
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
??其他情況下缩搅,表示可以處理這個節(jié)點,這里處理的意思触幼,是可以將這個位置上的所有節(jié)點拷貝到新數組里面去硼瓣。這里分為兩種情況,我們來一一的分析置谦。
??首先巨双,第一種情況就是節(jié)點本身為一個鏈表結構。
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//將原來鏈表上的一部分就放在原位置
setTabAt(nextTab, i, ln);
//將原來鏈表上的另一部分放在 i + n的位置上
setTabAt(nextTab, i + n, hn);
//處理完畢霉祸,將這個位置上設置為fwd筑累,后面的線程看到之后,知道這個節(jié)點已經被處理了
setTabAt(tab, i, fwd);
advance = true;
}
??整個過程是非常簡單丝蹭,就是將原來的鏈表分為了兩部分慢宗,一部分在原來的 i 位置上,一部分在 i+ n的位置上。這里涉及到了喜聞樂見的反轉鏈表镜沽,這里不對反轉鏈表做解釋敏晤,后面會詳細的解釋反轉鏈表的原因,到時候會回來詳細的解釋這段代碼缅茉。
??現在來看看復制操作的第二種類型嘴脾。如果當前位置上的Node為TreeBin類型,表示為紅黑樹蔬墩,此時要做的操作跟鏈表的操作也是非常類似译打,將樹分為兩個部分,一個部分在原來的i位置上拇颅,另一個部分在i + n的位置上奏司。
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果此時節(jié)點數量小于閾值,那么將紅黑樹變?yōu)殒湵? ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
??最后樟插,如果所有的節(jié)點被賦值完畢之后韵洋,表示擴容操作已經完成了,此時finishing為true。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//將nextTable置為null黄锤,以便下次擴容
nextTable = null;
table = nextTab;
//將sizeCtl的值調整為長度的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//sizeCtl減一搪缨,表示當前有個線程參與擴容
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
10.反轉鏈表的原因
??樓主的思路來自于深入分析ConcurrentHashMap1.8的擴容實現,如果大家可以參考一下鸵熟。剛剛勉吻,我們對transfer方法的復制操作有了一個大概的理解,這里將對其詳細分析旅赢。
??在解釋這段代碼之前齿桃,我們先對這段代碼里面幾個變量有一個初步的理解。
變量名 | 類型 | 描述 |
---|---|---|
runBit | int | 位置為每個節(jié)點的hash值與n進行與操作煮盼,經過第一次循環(huán)短纵,runBit記錄的是最后一個hash值變化的Node的hash值〗┛兀可能這里拗口香到,待會舉一個詳細例子。 |
lastRun | Node | 默認值為當前位置第一個Node报破,經過第一次循環(huán)悠就,lastRun記錄的是最后一個hash值變化的Node。 |
ln | Node | 默認值為null充易,是放在新數組 i 位置上的鏈表頭結點梗脾。 |
hn | Node | 默認值為null,是放在新數組 i + n 位置上的鏈表頭結點 |
??在上面的表格中盹靴,提到最后一個hash值變化的Node炸茧,這句話可能比較拗口瑞妇,這里舉一個簡單的例子來表達意思。
??上圖中是一個鏈表的模型梭冠,其中辕狰,我將這個鏈表中的節(jié)點分為兩類:1.一類是節(jié)點的hash值相同的;2.第二類是跟第一類的hash值不同的控漠。所以蔓倍,從這個鏈表中我們可以得到,lastRun為節(jié)點6盐捷,runBit也是節(jié)點6與n進行與操作得到的值偶翅。為什么是節(jié)點6呢?因為在6之后毙驯,節(jié)點類型沒有再變了。
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//將原來鏈表上的一部分就放在原位置
setTabAt(nextTab, i, ln);
//將原來鏈表上的另一部分放在 i + n的位置上
setTabAt(nextTab, i + n, hn);
//處理完畢灾测,將這個位置上設置為fwd爆价,后面的線程看到之后,知道這個節(jié)點已經被處理了
setTabAt(tab, i, fwd);
advance = true;
}
??結合源碼來分析媳搪,runBit默認為節(jié)點1的hash和n與操作的值铭段。后面依次記錄與上一次runBit的不同值,最后通過runBit == 0將所有的節(jié)點分為了兩類秦爆。
??現在我們結合上面的圖做一個假設序愚。我們假設紫色節(jié)點的hash值&n為0,那么藍色節(jié)點的hash值&n就不為0等限。經過第一次循環(huán)爸吮,得出 ln = lastRun = 節(jié)點6,hn = null望门。
??此時我們來分析第二次循環(huán)形娇,先來看看紫色節(jié)點構成ln鏈表的過程。
??我看到ln鏈表只是部分反轉了筹误,默認部分是沒有反轉了桐早。其中得到的ln鏈表放在新數組的i位置上。
??然后我們再來看看hn鏈表的構成過程厨剪。
??我們發(fā)現hn鏈表是完全反轉的哄酝。從而,我們得出一個結論祷膳,哪個鏈表默認為null陶衅,那個鏈表就是完全反轉的。
??最終直晨,我們終于知道了万哪,一個反轉鏈表得出的原因侠驯,其實不是故意為之,而是加快復制操作速度奕巍,因為lastRun之后的節(jié)點進行復制操作不需要創(chuàng)建新節(jié)點吟策。
11.get操作
??前面詳細的說了put操作和擴容操作,這里簡單的分析一下get操作的止,get操作相比于前面的操作就非常的簡單檩坚。先來看看源碼:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//匹配成功
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//當節(jié)點為TreeBin或者ForwardingNode
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//遍歷鏈表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
??整個get操作分為三種情況:1.table數組指定位置的第一個節(jié)點就匹配成功,直接返回诅福;2.table數組指定位置上的hash值小于0匾委,此時當前位置可能已經被其他在擴容時處理過,或者當前位置的Node為一個TreeBin氓润,不管是那種類型的Node赂乐,調用的都是find方法來獲取Node節(jié)點;3.其余情況下咖气,直接遍歷鏈表查找挨措。
12.size操作
??size操作里面,主要有兩個方法提供崩溪,一個是傳統(tǒng)的size方法浅役,一個是ConcurrentHashMap比較提倡的mappingCount方法。我們先來看看size方法:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
??size方法里面調用sumCount方法伶唯,sumCount方法才是真正計算元素個數的方法觉既。我們來看看sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
??可能大家對這個方法計算方式不太好理解,其實這種計算方式取決于前面的設計方式乳幸。在前面我說到過瞪讼,map的元素個數 = baseCount + 所有的cell的value值。這個結論是因為多線程更新個數時粹断,如果只在baseCount里面記錄杆麸,一個原因是多線程更新可能會出錯锰蓬,另一個原因是競爭性太大了第租。所以勉痴,設計理念就變成了,當baseCount更新失敗時悬赏,表示baseCount變量當前忙碌狡汉,可以將這個增量添加到一個不忙的cell里面去,這樣競爭性就降低了闽颇。
??我們再來看看mappingCount方法:
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
??達到的效果感覺跟size都差不多盾戴,只不過是一個返回int,一個返回long而已兵多。