簡書 占小狼
轉(zhuǎn)載請注明原創(chuàng)出處讼溺,謝謝!
關于文章中的疑問:為什么要構造一個反序鏈表,放在nextTable的i+n的位置上呢,在《深入分析ConcurrentHashMap1.8的擴容實現(xiàn) 》一文中進行了詳細分析皂林。
前言
HashMap是我們平時開發(fā)過程中用的比較多的集合,但它是非線程安全的蚯撩,在涉及到多線程并發(fā)的情況础倍,進行get操作有可能會引起死循環(huán),導致CPU利用率接近100%胎挎。
final HashMap<String, String> map = new HashMap<String, String>(2);
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}).start();
}
解決方案有Hashtable和Collections.synchronizedMap(hashMap)沟启,不過這兩個方案基本上是對讀寫進行加鎖操作,一個線程在讀寫元素犹菇,其余線程必須等待德迹,性能可想而知。
所以揭芍,Doug Lea給我們帶來了并發(fā)安全的ConcurrentHashMap胳搞,它的實現(xiàn)是依賴于 Java 內(nèi)存模型,所以我們在了解 ConcurrentHashMap 的之前必須了解一些底層的知識:
本文源碼是JDK8的版本称杨,與之前的版本有較大差異肌毅。
JDK1.7分析
ConcurrentHashMap采用 分段鎖的機制,實現(xiàn)并發(fā)的更新操作姑原,底層采用數(shù)組+鏈表的存儲結構芽腾。
其包含兩個核心靜態(tài)內(nèi)部類 Segment和HashEntry。
- Segment繼承ReentrantLock用來充當鎖的角色页衙,每個 Segment 對象守護每個散列映射表的若干個桶摊滔。
- HashEntry 用來封裝映射表的鍵 / 值對;
- 每個桶是由若干個 HashEntry 對象鏈接起來的鏈表店乐。
一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數(shù)組艰躺,下面我們通過一個圖來演示一下 ConcurrentHashMap 的結構:
JDK1.8分析
1.8的實現(xiàn)已經(jīng)拋棄了Segment分段鎖機制,利用CAS+Synchronized來保證并發(fā)更新的安全眨八,底層采用數(shù)組+鏈表+紅黑樹的存儲結構腺兴。
重要概念
在開始之前,有些重要的概念需要介紹一下:
- table:默認為null廉侧,初始化發(fā)生在第一次插入操作页响,默認大小為16的數(shù)組篓足,用來存儲Node節(jié)點數(shù)據(jù),擴容時大小總是2的冪次方闰蚕。
- nextTable:默認為null栈拖,擴容時新生成的數(shù)組,其大小為原數(shù)組的兩倍没陡。
- sizeCtl :默認為0涩哟,用來控制table的初始化和擴容操作,具體應用在后續(xù)會體現(xiàn)出來盼玄。
- **-1 **代表table正在初始化
- **-N **表示有N-1個線程正在進行擴容操作
- 其余情況:
1贴彼、如果table未初始化,表示table需要初始化的大小埃儿。
2器仗、如果table初始化完成,表示table的容量童番,默認是table大小的0.75倍精钮,居然用這個公式算0.75(n - (n >>> 2))。 - Node:保存key妓盲,value及key的hash值的數(shù)據(jù)結構。
class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
... 省略部分代碼
}
其中value和next都用volatile修飾专普,保證并發(fā)的可見性悯衬。
- ForwardingNode:一個特殊的Node節(jié)點,hash值為-1檀夹,其中存儲nextTable的引用筋粗。
final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
只有table發(fā)生擴容的時候,F(xiàn)orwardingNode才會發(fā)揮作用炸渡,作為一個占位符放在table中表示當前節(jié)點為null或則已經(jīng)被移動娜亿。
實例初始化
實例化ConcurrentHashMap時帶參數(shù)時,會根據(jù)參數(shù)調(diào)整table的大小蚌堵,假設參數(shù)為100买决,最終會調(diào)整成256,確保table的大小總是2的冪次方吼畏,算法如下:
ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>(100);
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
注意督赤,ConcurrentHashMap在構造函數(shù)中只會初始化sizeCtl值,并不會直接初始化table泻蚊,而是延緩到第一次put操作躲舌。
table初始化
前面已經(jīng)提到過,table初始化操作會延緩到第一次put行為性雄。但是put是可以并發(fā)執(zhí)行的没卸,Doug Lea是如何實現(xiàn)table只初始化一次的羹奉?讓我們來看看源碼的實現(xiàn)。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一個線程發(fā)現(xiàn)sizeCtl<0约计,意味著另外的線程執(zhí)行CAS操作成功诀拭,當前線程只需要讓出cpu時間片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
sizeCtl默認為0,如果ConcurrentHashMap實例化時有傳參數(shù)病蛉,sizeCtl會是一個2的冪次方的值炫加。所以執(zhí)行第一次put操作的線程會執(zhí)行Unsafe.compareAndSwapInt方法修改sizeCtl為-1,有且只有一個線程能夠修改成功铺然,其它線程通過Thread.yield()讓出CPU時間片等待table初始化完成俗孝。
put操作
假設table已經(jīng)初始化完成,put操作采用CAS+synchronized實現(xiàn)并發(fā)插入或更新操作魄健,具體實現(xiàn)如下赋铝。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...省略部分代碼
}
addCount(1L, binCount);
return null;
}
- hash算法
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
- table中定位索引位置,n是table的大小
int index = (n - 1) & hash
- 獲取table中對應索引的元素f沽瘦。
Doug Lea采用Unsafe.getObjectVolatile來獲取革骨,也許有人質(zhì)疑,直接table[index]不可以么析恋,為什么要這么復雜良哲?
在java內(nèi)存模型中,我們已經(jīng)知道每個線程都有一個工作內(nèi)存助隧,里面存儲著table的副本筑凫,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素并村,Unsafe.getObjectVolatile可以直接獲取指定內(nèi)存的數(shù)據(jù)巍实,保證了每次拿到數(shù)據(jù)都是最新的。 - 如果f為null哩牍,說明table中這個位置第一次插入元素棚潦,利用Unsafe.compareAndSwapObject方法插入Node節(jié)點。
- 如果CAS成功膝昆,說明Node節(jié)點已經(jīng)插入丸边,隨后addCount(1L, binCount)方法會檢查當前容量是否需要進行擴容。
- 如果CAS失敗荚孵,說明有其它線程提前插入了節(jié)點原环,自旋重新嘗試在這個位置插入節(jié)點。
- 如果f的hash值為-1处窥,說明當前f是ForwardingNode節(jié)點嘱吗,意味有其它線程正在擴容,則一起進行擴容操作。
- 其余情況把新的Node節(jié)點按鏈表或紅黑樹的方式插入到合適的位置谒麦,這個過程采用同步內(nèi)置鎖實現(xiàn)并發(fā)俄讹,代碼如下:
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
在節(jié)點f上進行同步,節(jié)點插入之前绕德,再次利用tabAt(tab, i) == f判斷患膛,防止被其它線程修改。
- 如果f.hash >= 0耻蛇,說明f是鏈表結構的頭結點踪蹬,遍歷鏈表,如果找到對應的node節(jié)點臣咖,則修改value跃捣,否則在鏈表尾部加入節(jié)點。
- 如果f是TreeBin類型節(jié)點夺蛇,說明f是紅黑樹根節(jié)點疚漆,則在樹結構上遍歷元素,更新或增加節(jié)點刁赦。
- 如果鏈表中節(jié)點數(shù)binCount >= TREEIFY_THRESHOLD(默認是8)娶聘,則把鏈表轉(zhuǎn)化為紅黑樹結構。
table擴容
當table容量不足的時候甚脉,即table的元素數(shù)量達到容量閾值sizeCtl丸升,需要對table進行擴容。
整個擴容分為兩部分:
- 構建一個nextTable牺氨,大小為table的兩倍狡耻。
- 把table的數(shù)據(jù)復制到nextTable中。
這兩個過程在單線程下實現(xiàn)很簡單波闹,但是ConcurrentHashMap是支持并發(fā)插入的酝豪,擴容操作自然也會有并發(fā)的出現(xiàn)涛碑,這種情況下精堕,第二步可以支持節(jié)點的并發(fā)復制,這樣性能自然提升不少蒲障,但實現(xiàn)的復雜度也上升了一個臺階歹篓。
先看第一步,構建nextTable揉阎,毫無疑問庄撮,這個過程只能只有單個線程進行nextTable的初始化,具體實現(xiàn)如下:
private final void addCount(long x, int check) {
... 省略部分代碼
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
通過Unsafe.compareAndSwapInt修改sizeCtl值毙籽,保證只有一個線程能夠初始化nextTable洞斯,擴容后的數(shù)組長度為原來的兩倍,但是容量是原來的1.5。
節(jié)點從table移動到nextTable烙如,大體思想是遍歷么抗、復制的過程。
- 首先根據(jù)運算得到需要遍歷的次數(shù)i亚铁,然后利用tabAt方法獲得i位置的元素f蝇刀,初始化一個forwardNode實例fwd。
- 如果f == null徘溢,則在table中的i位置放入fwd吞琐,這個過程是采用Unsafe.compareAndSwapObjectf方法實現(xiàn)的,很巧妙的實現(xiàn)了節(jié)點的并發(fā)移動然爆。
- 如果f是鏈表的頭節(jié)點站粟,就構造一個反序鏈表,把他們分別放在nextTable的i和i+n的位置上施蜜,移動完成卒蘸,采用Unsafe.putObjectVolatile方法給table原位置賦值fwd。
- 如果f是TreeBin節(jié)點翻默,也做一個反序處理缸沃,并判斷是否需要untreeify,把處理的結果分別放在nextTable的i和i+n的位置上修械,移動完成趾牧,同樣采用Unsafe.putObjectVolatile方法給table原位置賦值fwd。
遍歷過所有的節(jié)點以后就完成了復制工作肯污,把table指向nextTable翘单,并更新sizeCtl為新數(shù)組大小的0.75倍 ,擴容完成蹦渣。
紅黑樹構造
注意:如果鏈表結構中元素超過TREEIFY_THRESHOLD閾值哄芜,默認為8個,則把鏈表轉(zhuǎn)化為紅黑樹柬唯,提高遍歷查詢效率认臊。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
接下來我們看看如何構造樹結構,代碼如下:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
可以看出锄奢,生成樹節(jié)點的代碼塊是同步的失晴,進入同步代碼塊之后,再次驗證table中index位置元素是否被修改過拘央。
1涂屁、根據(jù)table中index位置Node鏈表,重新生成一個hd為頭結點的TreeNode鏈表灰伟。
2拆又、根據(jù)hd頭結點,生成TreeBin樹結構,并把樹結構的root節(jié)點寫到table的index位置的內(nèi)存中帖族,具體實現(xiàn)如下:
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
主要根據(jù)Node節(jié)點的hash值大小構建二叉樹义矛。這個紅黑樹的構造過程實在有點復雜,感興趣的同學可以看看源碼盟萨。
get操作
get操作和put操作相比凉翻,顯得簡單了許多。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 判斷table是否為空捻激,如果為空制轰,直接返回null。
- 計算key的hash值胞谭,并獲取指定table中指定位置的Node節(jié)點垃杖,通過遍歷鏈表或則樹結構找到對應的節(jié)點,返回value值丈屹。
總結
ConcurrentHashMap 是一個并發(fā)散列映射表的實現(xiàn)调俘,它允許完全并發(fā)的讀取,并且支持給定數(shù)量的并發(fā)更新旺垒。相比于 HashTable 和同步包裝器包裝的 HashMap彩库,使用一個全局的鎖來同步不同線程間的并發(fā)訪問,同一時間點先蒋,只能有一個線程持有鎖骇钦,也就是說在同一時間點,只能有一個線程能訪問容器竞漾,這雖然保證多線程間的安全并發(fā)訪問眯搭,但同時也導致對容器的訪問變成串行化的了。
1.6中采用ReentrantLock 分段鎖的方式业岁,使多個線程在不同的segment上進行寫操作不會發(fā)現(xiàn)阻塞行為;1.8中直接采用了內(nèi)置鎖synchronized鳞仙,難道是因為1.8的虛擬機對內(nèi)置鎖已經(jīng)優(yōu)化的足夠快了?
END笔时。
我是占小狼棍好。
在魔都艱苦奮斗,白天是上班族梳玫,晚上是知識服務工作者爹梁。
讀完我的文章有收獲,記得關注和點贊哦姚垃,如果非要打賞,我也是不會拒絕的啦!