在java的集合家族中富弦, ConcurrentHashMap 當之無愧是支持并發(fā)最好的鍵值對(Map)集合茫舶。在日常編碼中因宇,出場率也相當之高翩剪。在jdk8中乳怎,集合類 ConcurrentHashMap 經(jīng) Doug Lea 大師之手,借助volatile語義以及CAS操作進行優(yōu)化前弯,使得該集合類更好地發(fā)揮出了并發(fā)的優(yōu)勢蚪缀。與jdk7中相比,在原有段鎖(Segment)的基礎(chǔ)上恕出,引入了數(shù)組+鏈表+紅黑樹的存儲模型询枚,在查詢效率上花費了不少心思。
本文作為源碼解讀筆記剃根,逐一分析關(guān)鍵函數(shù)哩盲,初探下jdk8版本的ConcurrentHashMap在并發(fā)與性能設(shè)計上的精妙之處。
基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)
ConcurrentHashMap內(nèi)存存儲結(jié)構(gòu)圖大致如下:
ConcurrentHashMap部分成員變量/常量分析如下(詳見注釋):
/**
* 真正存儲Node數(shù)據(jù)(桶首)節(jié)點的數(shù)組table
* 所有Node節(jié)點根據(jù)hash分桶存儲
* table數(shù)組中存儲的是所有桶(bin)的首節(jié)點
* hash值相同的節(jié)點以鏈表形式分裝在桶中
* 當一個桶中節(jié)點數(shù)達到8個時,轉(zhuǎn)換為紅黑樹廉油,提高查詢效率
*/
transient volatile Node<K,V>[] table;
/**
* 只在擴容時使用
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 重要控制變量
* 根據(jù)變量的數(shù)值不同惠险,類實例處于不同階段
* 1\. = -1 : 正在初始化
* 2\. < -1 : 正在擴容,數(shù)值為 -(1 + 參與擴容的線程數(shù))
* 3\. = 0 : 創(chuàng)建時初始為0
* 4\. > 0 : 下一次擴容的大小
*/
private transient volatile int sizeCtl;
/**
* table的最大容量抒线,必須為2次冪形式
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* table的默認初始容量班巩,必須為2次冪形式
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* table的負載因子,當前節(jié)點數(shù)量超過 n * LOAD_FACTOR嘶炭,執(zhí)行擴容
* 位操作表達式為 n - (n >>> 2)
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 針對每個桶(bin)抱慌,鏈表轉(zhuǎn)換為紅黑樹的節(jié)點數(shù)閾值
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 針對每個桶(bin),紅黑樹退化為鏈表的節(jié)點數(shù)閾值
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 在擴容中眨猎,參與的單個線程允許處理的最少table桶首節(jié)點個數(shù)
* 雖然適當添加線程抑进,會使得整個擴容過程變快,但需要考慮多線程內(nèi)存同時分配的問題
*/
private static final int MIN_TRANSFER_STRIDE = 16;
Node<K,V>節(jié)點是ConcurrentHashMap存儲數(shù)據(jù)的最基本結(jié)構(gòu)睡陪。一個數(shù)據(jù)mapping節(jié)點中寺渗,存儲4個變量:當前節(jié)點hash值、節(jié)點的key值兰迫、節(jié)點的value值信殊、指向下一個節(jié)點的指針next。其中在子類中的hash可以為負數(shù)汁果,具有特殊的并發(fā)處理意義涡拘,后文會解釋。除了具有特殊意義的子類据德,Node中的key和val不允許為null鳄乏。
/**
* 基礎(chǔ)的鍵值對節(jié)點
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; // 申明為volatile
volatile Node<K,V> next; // 申明為volatile
// 此處省略部分成員函數(shù)
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
/**
* 以鏈表形式查找桶中下一個Node信息
* 當轉(zhuǎn)換為subclass紅黑樹節(jié)點TreeNode
* 則使用TreeNode中的find進行查詢操作
*/
} while ((e = e.next) != null);
}
return null;
}
}
在真正閱讀 ConcurrentHashMap 源碼之前,我們簡單復(fù)習下關(guān)于volatile和CAS的概念棘利,這樣才能更好地幫助我們理解源碼中的關(guān)鍵方法汞窗。
volatile語義
java提供的關(guān)鍵字volatile是最輕量級的同步機制。當定義一個變量為volatile時赡译,它就具備了三層語義: - 可見性(Visibility):在多線程環(huán)境下,一個變量的寫操作總是對其后的讀取線程可見 - 原子性(Atomicity):volatile的讀/寫操作具有原子性 - 有序性(Ordering):禁止指令的重排序優(yōu)化不铆,JVM會通過插入內(nèi)存屏障(Memory Barrier)指令來保證
就同步性能而言蝌焚,大多數(shù)場景下volatile的總開銷是要比鎖低的。在ConcurrentHashMap的源碼中誓斥,我們能看到頻繁的volatile變量讀取與寫入只洒。
/**
* 讀取Table數(shù)組表中的Node元素
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// volatile讀
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/**
* 賦值Table數(shù)組表中的Node元素
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
// volatile寫
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
這兩個函數(shù)在ConcurrentHashMap中使用非常頻繁。對數(shù)組表中Node劳坑,最基礎(chǔ)的讀取與賦值操作毕谴,都使用了Unsafe類中附帶volatile語義的操作。這正是上層邏輯函數(shù)中能夠盡可能去鎖化的基石所在。
CAS操作
CAS:Compare and Swap
/**
* 利用CAS賦值Table數(shù)組表中的Node元素
* 與volatile的setTabAt操作不同涝开,setTabAt操作一定會成功循帐,
* 但casTabAt由于期望一個原有值expected,可能會失敗
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
CAS一般被理解為原子操作舀武。在java中拄养,正是利用了處理器的CMPXCHG(intel)指令實現(xiàn)CAS操作。CAS需要接受原有期望值expected以及想要修改的新值x银舱,只有在原有期望值與當前值相等時才會更新為x瘪匿,否則為失敗。在ConcurrentHashMap的方法中寻馏,大量使用CAS獲取/修改互斥量棋弥,以達到多線程并發(fā)環(huán)境下的正確性。
源碼分析之putVal方法
putVal是將一個新key-value mapping插入到當前ConcurrentHashMap的關(guān)鍵方法诚欠。
此方法的具體流程如下圖:
putVal源碼分析如下(詳見注釋):
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 計算新節(jié)點的hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 獲取當前table顽染,進入死循環(huán)
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 當前table為空,則進行初始化聂薪,然后重新進入死循環(huán)
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// hash值對應(yīng)table數(shù)組中的桶首節(jié)點為空家乘,則直接cas插入新節(jié)點為桶首節(jié)點
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// 直接設(shè)置為桶首節(jié)點成功,退出死循環(huán)(出口之一)
break;
}
// 當前桶首節(jié)點正在特殊的擴容狀態(tài)下藏澳,當前線程嘗試參與擴容
// 然后重新進入死循環(huán)
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 通過桶首節(jié)點仁锯,將新節(jié)點加入table
else {
V oldVal = null;
// 獲取桶首節(jié)點實例對象鎖,進入臨界區(qū)進行添加操作
synchronized (f) {
if (tabAt(tab, i) == f) {
// 桶首節(jié)點hash值>0翔悠,表示為鏈表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key已經(jīng)存在业崖,則替換
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// key不存在,則插入在鏈表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 桶首節(jié)點為Node子類型TreeBin蓄愁,表示為紅黑樹
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 調(diào)用putTreeVal方法双炕,插入新值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// key已經(jīng)存在,則替換
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 插入新節(jié)點后撮抓,達到鏈表轉(zhuǎn)換紅黑樹閾值妇斤,則執(zhí)行轉(zhuǎn)換操作
treeifyBin(tab, i);
// 退出死循環(huán)(出口之二)
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新計算count時的base和counterCells數(shù)組
// 檢查是否有擴容需求,如是丹拯,則執(zhí)行擴容
addCount(1L, binCount);
return null;
}
該流程中站超,可以細細品味的環(huán)節(jié)有: - 初始化方法 initTable - 擴容方法 transfer (在多線程擴容方法 helpTransfer 中被調(diào)用)
源碼分析之初始化initTable方法
initTable方法允許多線程同時進入,但只有一個線程可以完成table的初始化乖酬,其他線程都會通過yield方法讓出cpu死相。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 前文提及sizeCtl是重要的控制變量
// sizeCtl = -1 表示正在初始化
// 已經(jīng)有其他線程在執(zhí)行初始化,則主動讓出cpu
Thread.yield();
// 利用CAS操作設(shè)置sizeCtl為-1
// 設(shè)置成功表示當前線程為執(zhí)行初始化的唯一線程
// 此處進入臨界區(qū)
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 由于讓出cpu的線程也會后續(xù)進入該臨界區(qū)
// 需要進行再次確認table是否為null
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 真正初始化咬像,即分配Node數(shù)組
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 默認負載為0.75
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
// 退出死循環(huán)的唯一出口
break;
}
}
return tab;
}
源碼分析之擴容transfer方法
擴容transfer方法是一個設(shè)計極為精巧的方法算撮。通過互斥讀寫ForwardingNode生宛,多線程可以協(xié)同完成擴容任務(wù)。
transfer源碼分析如下(詳見注釋):
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 單個線程允許處理的最少table桶首節(jié)點個數(shù)
// 即每個線程的處理任務(wù)量
stride = MIN_TRANSFER_STRIDE;
// nextTab為擴容中的臨時table
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// 擴容后的容量為當前的2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// transferIndex為擴容復(fù)制過程中的桶首節(jié)點遍歷索引
// 所以從n開始肮柜,表示從后向前遍歷
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode是Node節(jié)點的直接子類陷舅,是擴容過程中的特殊桶首節(jié)點
// 該類中沒有key,value,next
// hash值為特定的-1
// 附加Node<K,V>[] nextTable變量指向擴容中的nextTab
// 在find方法中,將擴容中的查詢操作導(dǎo)入到nextTab上
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// transferIndex = 0表示table中所有數(shù)組元素都已經(jīng)有其他線程負責擴容
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 嘗試更新transferIndex素挽,獲取當前線程執(zhí)行擴容復(fù)制的索引區(qū)間
// 更新成功蔑赘,則當前線程負責完成索引為(nextBound,nextIndex)之間的桶首節(jié)點擴容
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
// 擴容成功预明,設(shè)置新sizeCtl缩赛,仍然為總大小的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n;
}
}
// 當前table節(jié)點為空,不需要復(fù)制撰糠,直接放入ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 當前table節(jié)點已經(jīng)是ForwardingNode
// 表示已經(jīng)被其他線程處理了酥馍,則直接往前遍歷
// 通過CAS讀寫ForwardingNode節(jié)點狀態(tài),達到多線程互斥處理
else if ((fh = f.hash) == MOVED)
advance = true;
// 此處開始執(zhí)行真正的擴容操作
else {
// 鎖住當前桶首節(jié)點
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 鏈表節(jié)點復(fù)制
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 擴容成功后阅酪,設(shè)置ForwardingNode節(jié)點
setTabAt(tab, i, fwd);
advance = true;
}
// 紅黑樹節(jié)點復(fù)制
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 擴容成功后旨袒,設(shè)置ForwardingNode節(jié)點
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
源碼分析之get方法
相比之下,獲取方法get要簡單很多术辐。唯一需要注意的是砚尽,即使在擴容情況下,get操作也能正確執(zhí)行辉词。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 計算當前key值的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
// 唯一一處volatile讀操作
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判斷是否就是桶首節(jié)點
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash為負表示是擴容中的ForwardingNode節(jié)點
// 直接調(diào)用ForwardingNode的find方法(可以是代理到擴容中的nextTable)
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 通過next指針必孤,逐一查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
總結(jié)
通過分析ConcurrentHashMap中put、get瑞躺、transfer方法敷搪,我們可以看到在jdk8下,該集合類在并發(fā)性上做出了諸多優(yōu)化幢哨。volatile語義提供更細顆粒度的輕量級鎖赡勘,使得多線程可以(幾乎)同時讀寫實例中的關(guān)鍵量,正確理解當前類所處的狀態(tài)捞镰,進入對應(yīng)if語句中執(zhí)行相關(guān)邏輯闸与。通過學(xué)習這些關(guān)鍵方法中的并發(fā)處理,我們可以體會更多大師筆下的精妙設(shè)計岸售。醍醐灌頂几迄,茅塞頓開,豈不美哉冰评?
轉(zhuǎn)自https://bentang.me/tech/2016/12/01/jdk8-concurrenthashmap-1