ConcurrentHashMap
在多線(xiàn)程環(huán)境下经磅,使用HashMap
進(jìn)行put
操作時(shí)存在丟失數(shù)據(jù)的情況抛猫,為了避免這種bug的隱患驰怎,強(qiáng)烈建議使用ConcurrentHashMap
代替HashMap
滤灯,為了對(duì)ConcurrentHashMap
有更深入的了解巍杈,本文將對(duì)ConcurrentHashMap
1.7和1.8的不同實(shí)現(xiàn)進(jìn)行分析忧饭。
1.7實(shí)現(xiàn)
數(shù)據(jù)結(jié)構(gòu)
jdk1.7中采用Segment
+ HashEntry
的方式進(jìn)行實(shí)現(xiàn),結(jié)構(gòu)如下:
ConcurrentHashMap
初始化時(shí)筷畦,計(jì)算出Segment
數(shù)組的大小ssize
和每個(gè)Segment
中HashEntry
數(shù)組的大小cap
词裤,并初始化Segment
數(shù)組的第一個(gè)元素;其中ssize
大小為2的冪次方鳖宾,默認(rèn)為16吼砂,cap
大小也是2的冪次方,最小值為2鼎文,最終結(jié)果根據(jù)根據(jù)初始化容量initialCapacity
進(jìn)行計(jì)算渔肩,計(jì)算過(guò)程如下:
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
其中Segment
在實(shí)現(xiàn)上繼承了ReentrantLock
,這樣就自帶了鎖的功能拇惋。
put實(shí)現(xiàn)
當(dāng)執(zhí)行put
方法插入數(shù)據(jù)時(shí)周偎,根據(jù)key的hash值,在Segment
數(shù)組中找到相應(yīng)的位置蚤假,如果相應(yīng)位置的Segment
還未初始化栏饮,則通過(guò)CAS進(jìn)行賦值吧兔,接著執(zhí)行Segment
對(duì)象的put
方法通過(guò)加鎖機(jī)制插入數(shù)據(jù)磷仰,實(shí)現(xiàn)如下:
場(chǎng)景:線(xiàn)程A和線(xiàn)程B同時(shí)執(zhí)行相同Segment
對(duì)象的put
方法
1、線(xiàn)程A執(zhí)行tryLock()
方法成功獲取鎖境蔼,則把HashEntry
對(duì)象插入到相應(yīng)的位置灶平;
2、線(xiàn)程B獲取鎖失敗箍土,則執(zhí)行scanAndLockForPut()
方法逢享,在scanAndLockForPut
方法中,會(huì)通過(guò)重復(fù)執(zhí)行tryLock()
方法嘗試獲取鎖吴藻,在多處理器環(huán)境下瞒爬,重復(fù)次數(shù)為64,單處理器重復(fù)次數(shù)為1,當(dāng)執(zhí)行tryLock()
方法的次數(shù)超過(guò)上限時(shí)侧但,則執(zhí)行lock()
方法掛起線(xiàn)程B矢空;
3、當(dāng)線(xiàn)程A執(zhí)行完插入操作時(shí)禀横,會(huì)通過(guò)unlock()
方法釋放鎖屁药,接著喚醒線(xiàn)程B繼續(xù)執(zhí)行;
size實(shí)現(xiàn)
因?yàn)?code>ConcurrentHashMap是可以并發(fā)插入數(shù)據(jù)的柏锄,所以在準(zhǔn)確計(jì)算元素時(shí)存在一定的難度酿箭,一般的思路是統(tǒng)計(jì)每個(gè)Segment
對(duì)象中的元素個(gè)數(shù),然后進(jìn)行累加趾娃,但是這種方式計(jì)算出來(lái)的結(jié)果并不一樣的準(zhǔn)確的缭嫡,因?yàn)樵谟?jì)算后面幾個(gè)Segment
的元素個(gè)數(shù)時(shí),已經(jīng)計(jì)算過(guò)的Segment
同時(shí)可能有數(shù)據(jù)的插入或則刪除茫舶,在1.7的實(shí)現(xiàn)中械巡,采用了如下方式:
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
先采用不加鎖的方式,連續(xù)計(jì)算元素的個(gè)數(shù)饶氏,最多計(jì)算3次:
1讥耗、如果前后兩次計(jì)算結(jié)果相同,則說(shuō)明計(jì)算出來(lái)的元素個(gè)數(shù)是準(zhǔn)確的疹启;
2古程、如果前后兩次計(jì)算結(jié)果都不同,則給每個(gè)Segment
進(jìn)行加鎖喊崖,再計(jì)算一次元素的個(gè)數(shù)挣磨;
1.8實(shí)現(xiàn)
數(shù)據(jù)結(jié)構(gòu)
1.8中放棄了Segment
臃腫的設(shè)計(jì),取而代之的是采用Node
+ CAS
+ Synchronized
來(lái)保證并發(fā)安全進(jìn)行實(shí)現(xiàn)荤懂,結(jié)構(gòu)如下:
只有在執(zhí)行第一次put
方法時(shí)才會(huì)調(diào)用initTable()
初始化Node
數(shù)組茁裙,實(shí)現(xiàn)如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put實(shí)現(xiàn)
當(dāng)執(zhí)行put
方法插入數(shù)據(jù)時(shí),根據(jù)key的hash值节仿,在Node
數(shù)組中找到相應(yīng)的位置晤锥,實(shí)現(xiàn)如下:
1、如果相應(yīng)位置的Node
還未初始化廊宪,則通過(guò)CAS插入相應(yīng)的數(shù)據(jù)矾瘾;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
2、如果相應(yīng)位置的Node
不為空箭启,且當(dāng)前該節(jié)點(diǎn)不處于移動(dòng)狀態(tài)壕翩,則對(duì)該節(jié)點(diǎn)加synchronized
鎖,如果該節(jié)點(diǎn)的hash
不小于0傅寡,則遍歷鏈表更新節(jié)點(diǎn)或插入新節(jié)點(diǎn)放妈;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
3北救、如果該節(jié)點(diǎn)是TreeBin
類(lèi)型的節(jié)點(diǎn),說(shuō)明是紅黑樹(shù)結(jié)構(gòu)芜抒,則通過(guò)putTreeVal
方法往紅黑樹(shù)中插入節(jié)點(diǎn)扭倾;
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
4、如果binCount
不為0挽绩,說(shuō)明put
操作對(duì)數(shù)據(jù)產(chǎn)生了影響膛壹,如果當(dāng)前鏈表的個(gè)數(shù)達(dá)到8個(gè),則通過(guò)treeifyBin
方法轉(zhuǎn)化為紅黑樹(shù)唉堪,如果oldVal
不為空模聋,說(shuō)明是一次更新操作,沒(méi)有對(duì)元素個(gè)數(shù)產(chǎn)生影響唠亚,則直接返回舊值链方;
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
5、如果插入的是一個(gè)新節(jié)點(diǎn)灶搜,則執(zhí)行addCount()
方法嘗試更新元素個(gè)數(shù)baseCount
祟蚀;
size實(shí)現(xiàn)
1.8中使用一個(gè)volatile
類(lèi)型的變量baseCount
記錄元素的個(gè)數(shù),當(dāng)插入新數(shù)據(jù)或則刪除數(shù)據(jù)時(shí)割卖,會(huì)通過(guò)addCount()
方法更新baseCount
前酿,實(shí)現(xiàn)如下:
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
1、初始化時(shí)counterCells
為空鹏溯,在并發(fā)量很高時(shí)罢维,如果存在兩個(gè)線(xiàn)程同時(shí)執(zhí)行CAS
修改baseCount
值,則失敗的線(xiàn)程會(huì)繼續(xù)執(zhí)行方法體中的邏輯丙挽,使用CounterCell
記錄元素個(gè)數(shù)的變化肺孵;
2、如果CounterCell
數(shù)組counterCells
為空颜阐,調(diào)用fullAddCount()
方法進(jìn)行初始化平窘,并插入對(duì)應(yīng)的記錄數(shù),通過(guò)CAS
設(shè)置cellsBusy字段凳怨,只有設(shè)置成功的線(xiàn)程才能初始化CounterCell
數(shù)組瑰艘,實(shí)現(xiàn)如下:
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
3、如果通過(guò)CAS
設(shè)置cellsBusy字段失敗的話(huà)猿棉,則繼續(xù)嘗試通過(guò)CAS
修改baseCount
字段磅叛,如果修改baseCount
字段成功的話(huà)屑咳,就退出循環(huán)萨赁,否則繼續(xù)循環(huán)插入CounterCell
對(duì)象;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
所以在1.8中的size
實(shí)現(xiàn)比1.7簡(jiǎn)單多兆龙,因?yàn)樵貍€(gè)數(shù)保存baseCount
中杖爽,部分元素的變化個(gè)數(shù)保存在CounterCell
數(shù)組中敲董,實(shí)現(xiàn)如下:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
通過(guò)累加baseCount
和CounterCell
數(shù)組中的數(shù)量,即可得到元素的總個(gè)數(shù)慰安;