1. 概述
ConcurrentHashMap
是JDK提供的一種線程安全的HashMap
實現(xiàn)沃粗,JDK1.8對ConcurrentHashMap
進(jìn)行了大量優(yōu)化,除了增加了函數(shù)式編程特性键畴,還對加鎖方式進(jìn)行了優(yōu)化最盅,它拋棄了JDK1.6中分段鎖的設(shè)計,而是直接對Map中Table數(shù)組的每個節(jié)點進(jìn)行加鎖起惕,進(jìn)一步減少了鎖粒度檩禾,并且不再采用ReentrantLock
加鎖 ,直接使用synchronized
同步塊(JDK1.6開始已經(jīng)對synchronized
做了大量優(yōu)化疤祭,加入了自旋鎖盼产、偏向鎖、輕量級鎖勺馆、重量級鎖等)戏售。
為了提高查詢效率,采用了數(shù)組+鏈表+紅黑樹的設(shè)計草穆,當(dāng)鏈表中的元素個數(shù)大于64灌灾,且數(shù)組中鏈表節(jié)點長度大于8,則會自動把鏈表轉(zhuǎn)化為紅黑樹悲柱,當(dāng)兩個條件有一個不滿足時锋喜,會回退到數(shù)組+單鏈表的數(shù)據(jù)結(jié)構(gòu)。
在JDK1.8的實現(xiàn)中豌鸡,還實現(xiàn)了并發(fā)擴(kuò)容機(jī)制嘿般,也就是可以由多個線程同時幫助擴(kuò)容,加速數(shù)據(jù)轉(zhuǎn)移過程涯冠,極大提升了效率炉奴,本文嘗試著把這個擴(kuò)容過程解釋清楚。
2. 實現(xiàn)原理
2.1 基礎(chǔ)
首先介紹一下CocurrentHashMap中幾個重要變量蛇更,這些變量在原子更新瞻赶、并發(fā)擴(kuò)容控制以及統(tǒng)計元素個數(shù)方面發(fā)揮著重要作用。
// 代表Map中元素個數(shù)的的基礎(chǔ)計數(shù)器派任,當(dāng)無競爭時直接使用CAS方式更新該數(shù)值
transient volatile long baseCount;
/**
* sizeCtl用于table初始化和擴(kuò)容控制砸逊,當(dāng)為正數(shù)時表示table的擴(kuò)容閾值(n * 0.75),當(dāng)為負(fù)數(shù)時表示table正在初始化或者擴(kuò)容掌逛,
* -1表示table正在初始化师逸,其他負(fù)值代表正在擴(kuò)容,第一個擴(kuò)容的線程會把擴(kuò)容戳rs左移RESIZE_STAMP_SHIFT(默認(rèn)16)位再加2更新設(shè)置到sizeCtl中(sizeCtl = (rs << 16) + 2)颤诀,
* 每次一個新線程來擴(kuò)容時都令sizeCtl = sizeCtl + 1字旭,因此可根據(jù)sizeCtl計算出正在擴(kuò)容的線程數(shù)对湃,注釋中所
* 描述的 sizeCtl = -(1+threads)是不準(zhǔn)確的。擴(kuò)容時sizeCtl有兩部分組成遗淳,第一部分是擴(kuò)容戳拍柒,占據(jù)sizeCtl的高有效位,長度為
* RESIZE_STAMP_BITS位(默認(rèn)16)屈暗,剩下的低有效位長度為32-RESIZE_STAMP_BITS位(16)拆讯,每個新線程協(xié)助擴(kuò)容時sizeCtl+1
* ,直到所有的低有效位被占滿养叛,低有效位默認(rèn)占16位(最高位為符號位)种呐,所以擴(kuò)容線程數(shù)默認(rèn)最大為65535
*/
transient volatile int sizeCtl;
/**
* 用于控制多個線程去擴(kuò)容時領(lǐng)取擴(kuò)容子任務(wù),每個線程領(lǐng)取子任務(wù)時弃甥,要減去擴(kuò)容步長爽室,如果能減成功,
* 則成功領(lǐng)取一個擴(kuò)容子任務(wù)淆攻,`transferIndex = transferIndex - stride(擴(kuò)容步長)`阔墩,transferIndex減到0時
* 代表沒有可以領(lǐng)取的擴(kuò)容子任務(wù)。
*/
transient volatile int transferIndex;
// 擴(kuò)容或者創(chuàng)建CounterCells時使用的自旋鎖(使用CAS實現(xiàn))瓶珊;
transient volatile int cellsBusy;
/**
* 存儲Map中元素的計數(shù)器啸箫,當(dāng)并發(fā)量較高時`baseCount`競爭較為激烈,更新效率較低伞芹,所以把變化的數(shù)值
* 更新到`counterCells`中的某個節(jié)點上忘苛,計算size()時需要統(tǒng)計每個`counterCells`的大小再加上`baseCount`的數(shù)值。
*/
transient volatile CounterCell[] counterCells;
/**
* ConcurrentHashMap采用cas算法進(jìn)行更新變量(table[i]唱较,sizeCtl扎唾,transferIndex,cellsBusy等)來保證線程安全性绊汹,它其實是一種樂觀策略稽屏,
* 假設(shè)每次都不會產(chǎn)生沖突,所以能夠直接更新成功西乖,如果出現(xiàn)沖突則再重試,直到更新成功坛增。實現(xiàn)cas主要是借助了`sun.misc.Unsafe`類获雕,該類提供了
* 諸多根據(jù)內(nèi)存偏移量直接從內(nèi)存中讀取設(shè)置對象屬性的底層操作。
*/
static final sun.misc.Unsafe U;
下面是
ConcurrentHashMap
中的重要常量的含義及功能說明
// HashMap的最大容量(table數(shù)組的長度):2^30收捣,因為hashCode最高兩位用于控制目的届案,因此hashCode最大取值為2^30,
// 所以table數(shù)組長度n > hashCode罢艾,hashCode & (n-1)時無法索引到數(shù)組后面的節(jié)點上
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 負(fù)載因子楣颠,最大容量為數(shù)組長度*負(fù)載因子尽纽,元素個數(shù)超過容量則觸發(fā)擴(kuò)容
private static final float LOAD_FACTOR = 0.75f;
// 鏈表長度大于8時鏈表轉(zhuǎn)化為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
// 紅黑樹節(jié)點數(shù)小于6時紅黑樹轉(zhuǎn)化為單鏈表
static final int UNTREEIFY_THRESHOLD = 6;
// 容量大于64時轉(zhuǎn)化為紅黑樹
static final int MIN_TREEIFY_CAPACITY = 64;
// 最小轉(zhuǎn)移步長:由于在擴(kuò)容過程中念逞,會把一個待轉(zhuǎn)移的數(shù)組分為多個區(qū)間段(轉(zhuǎn)移步長)褪测,每個線程一次轉(zhuǎn)移一個區(qū)間段的數(shù)據(jù)也颤,
// 一個區(qū)間段(轉(zhuǎn)移步長)的默認(rèn)長度是16秦爆,實際運行過程中會動態(tài)計算
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* 擴(kuò)容戳有效位數(shù):每次在需要擴(kuò)容的時會根據(jù)當(dāng)前數(shù)組table的大小生成一個擴(kuò)容戳垛贤,當(dāng)一個線程需要
* 協(xié)助擴(kuò)容時需要實時計算擴(kuò)容戳來驗證是否
* 需要協(xié)助擴(kuò)容或擴(kuò)容過程是否完成妖混,生成擴(kuò)容戳的方式:Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
* 其中n表示當(dāng)前table的大小少漆,利用該常量表示擴(kuò)容戳的有效位長度厅须,默認(rèn)16位侧馅。
*/
private static int RESIZE_STAMP_BITS = 16;
// 最大并發(fā)擴(kuò)容線程數(shù):65535
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 擴(kuò)容戳移位大形D颉:sizeCtl為int類型,長度為32位馁痴,擴(kuò)容戳有效位有RESIZE_STAMP_BITS位(默認(rèn)16位)谊娇,
// 所以把擴(kuò)容戳移到sizeCtl最高有效位時需要移位的個數(shù)為: 32 - RESIZE_STAMP_BITS
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
ConcurrentHashMap對table數(shù)組中的數(shù)據(jù)節(jié)點查詢更新由Unsafe類實現(xiàn),每次查詢更新都是從內(nèi)存中直接取數(shù)據(jù)罗晕,借助了如下3個小函數(shù):
- 獲取table中index為i的節(jié)點
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
- table中index為i的節(jié)點更新為v(cas原子更新方式)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
- 把table中index為i的節(jié)點更新為v(普通更新方式)
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
2.2 實現(xiàn)過程
由于擴(kuò)容(resize)過程是在向Map中插入節(jié)點時所觸發(fā)的數(shù)據(jù)轉(zhuǎn)移過程济欢,所以直接從插入數(shù)據(jù)的內(nèi)部實現(xiàn)方法putVal()開始介紹。
CocurrentHashMap中的put()和putIfAbsent()方法內(nèi)部都是借助putVal()方法實現(xiàn)攀例,所以我們只需要看一下putVal()的實現(xiàn)過程船逮。
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//此處進(jìn)行了一次重哈希(高16位與低16位異或),減少哈希碰撞
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果插入的位置還沒有節(jié)點粤铭,則使用cas方式直接把節(jié)點設(shè)置到該位置上
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 如果要插入的位置的節(jié)點是轉(zhuǎn)移節(jié)點挖胃,說明Map正在擴(kuò)容,則協(xié)助擴(kuò)容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 對節(jié)點f加鎖
if (tabAt(tab, i) == f) { // 加鎖成功
if (fh >= 0) { // hashCode>0,代表節(jié)點為單鏈表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //key在Map中已經(jīng)存在梆惯,更新key的值
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //新節(jié)點插入到鏈表的最后一個位置
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //如果為紅黑樹節(jié)點
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // 向紅黑數(shù)中插入一個節(jié)點
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)//如果鏈表節(jié)點數(shù)量大于閾值(默認(rèn)8)酱鸭,則轉(zhuǎn)化為紅黑樹
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 節(jié)點插入完成后,更新Map中節(jié)點總數(shù)
return null;
}
從上述代碼可以總結(jié)出插入數(shù)據(jù)節(jié)點的過程:
- 首先是根據(jù)key的hashCode做一次重哈希(進(jìn)一步減少哈希碰撞)
- 先判斷table為空垛吗,空則初始化Map凹髓,否則:
- 根據(jù)hashCode取模定位到table中的某個節(jié)點f,如果f為空怯屉,則新創(chuàng)建一個節(jié)點蔚舀,使用cas方式更新到數(shù)組的f節(jié)點上,插入結(jié)束锨络,否則:
- 若f是轉(zhuǎn)移轉(zhuǎn)移節(jié)點赌躺,則調(diào)用helpTransfer協(xié)助轉(zhuǎn)移,否則:
- 鎖定節(jié)點f(通過synchronized加鎖)
- 節(jié)點f鎖定成功后判斷節(jié)點f類型羡儿,如果f是鏈表節(jié)點礼患,則直接插入到鏈表底端(key不存在的話),如果節(jié)點f是紅黑樹節(jié)點,則按照二叉搜索樹的方式插入節(jié)點缅叠,并調(diào)整樹結(jié)構(gòu)使其滿足紅黑規(guī)則
- 最后調(diào)用addCount更新Map中的節(jié)點計數(shù)悄泥。
所以接下來要講一下addCount方法:
addCount方法用于更新Map中節(jié)點計數(shù),更新節(jié)點個數(shù)時也做了對應(yīng)的優(yōu)化肤粱,其中采用了和
LongAdder
一樣的實現(xiàn)方式弹囚,具體實現(xiàn)過程可以看LongAdder
的實現(xiàn)過程;元素個數(shù)更新完成后再判斷是否需要擴(kuò)容狼犯,我主要對判斷開始擴(kuò)容或協(xié)助擴(kuò)容的各種條件進(jìn)行解釋余寥。
/**
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount(); //統(tǒng)計map中的元素個數(shù)
}
if (check >= 0) { // 檢查是否需要協(xié)助擴(kuò)容
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) { // map中的元素個數(shù)已經(jīng)大于擴(kuò)容閾值且小于最大閾值,table需要擴(kuò)容
int rs = resizeStamp(n); //計算擴(kuò)容戳
if (sc < 0) { // 表示正在擴(kuò)容或者table初始化
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || // sizeCtl無符號右移16位得到擴(kuò)容戳悯森,擴(kuò)容戳不同說明當(dāng)前線程已經(jīng)滯后其他線程宋舷,其他線程已經(jīng)開啟了新一輪擴(kuò)容任務(wù),不能再去擴(kuò)容瓢姻,sc == rs + 1 目前沒看懂
sc == rs + MAX_RESIZERS || (nt = nextTable) == null || //擴(kuò)容線程數(shù)大于最大擴(kuò)容線程數(shù)祝蝠,nextTable為空表示沒有在擴(kuò)容,不需要協(xié)助
transferIndex <= 0) // transferIndex < 0 表示其他線程已經(jīng)把擴(kuò)容子任務(wù)領(lǐng)取完畢幻碱,也不需要協(xié)助擴(kuò)容
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 使用cas方式把sizeCtl加1绎狭,代表增加一個協(xié)助擴(kuò)容的線程,并令當(dāng)前線程去協(xié)助擴(kuò)容褥傍,當(dāng)前線程協(xié)助完成后需要把sizeCtl減1儡嘶,所以sizeCtl<0時可以利用sizeCtl計算出擴(kuò)容線程的個數(shù)
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) //當(dāng)前沒有線程在擴(kuò)容,則把擴(kuò)容戳rs 左移16位加2得到一個負(fù)值恍风,用cas方式更新到sizeCtl中蹦狂,更新成功則作為第一個擴(kuò)容線程執(zhí)行擴(kuò)容任務(wù)
transfer(tab, null);
s = sumCount();
}
}
}
helpTransfer 主要用于通過計算來驗證Map是否需要協(xié)助擴(kuò)容,如果Map正在擴(kuò)容且擴(kuò)容未結(jié)束則協(xié)助擴(kuò)容朋贬,并通過transfer執(zhí)行擴(kuò)容過程凯楔。
/**
* Helps transfer if a resize is in progress.
* 當(dāng)其他線程進(jìn)來時發(fā)現(xiàn)當(dāng)前Map正在擴(kuò)容,則判斷是否需要幫助擴(kuò)容
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//當(dāng)前table數(shù)組和nextTable數(shù)組都不為空锦募,而且當(dāng)前節(jié)點為ForwardingNode摆屯,說明HashMap正在擴(kuò)容
int rs = resizeStamp(tab.length); //以當(dāng)前數(shù)組的大小通過移位的方式生成擴(kuò)容戳,保證每次擴(kuò)容過程都生成唯一的擴(kuò)容戳
while (nextTab == nextTable && table == tab && //指向table的指針和nextTab的指針沒有被其他線程更新
(sc = sizeCtl) < 0) { // sizeCtl小于0時糠亩,可以通過移位反解出正在擴(kuò)容的線程數(shù)虐骑,代表正在擴(kuò)容,大于0時代表下次擴(kuò)容的閾值
if ((sc >>> RESIZE_STAMP_SHIFT) != rs //sc右移16位若等于rs赎线,說明沒有線程在擴(kuò)容
|| sc == rs + 1 || sc == rs + MAX_RESIZERS //擴(kuò)容線程數(shù)超過限制
|| transferIndex <= 0) //transferIndex用于每次分配數(shù)據(jù)轉(zhuǎn)移任務(wù)的上界富弦,如果小于0則說明沒有可以分配的數(shù)據(jù)轉(zhuǎn)移任務(wù)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//cas方式更新sc,更新成功則擴(kuò)容線程數(shù)+1氛驮,并開始幫助轉(zhuǎn)移任務(wù)
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
transfer方法為實際的擴(kuò)容實現(xiàn),實現(xiàn)過程有些復(fù)雜济似,但如果認(rèn)真看了前面關(guān)于控制變量sizeCtl矫废、擴(kuò)容戳rs以及轉(zhuǎn)移索引transferIndex的相關(guān)注釋說明盏缤,應(yīng)該不難理解。
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//根據(jù)table的長度及cpu核數(shù)計算轉(zhuǎn)移任務(wù)步長
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 計算轉(zhuǎn)移步長并判斷是否小于最小轉(zhuǎn)移步長
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 第一個擴(kuò)容線程進(jìn)來需要初始化nextTable
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) //當(dāng)前索引已經(jīng)走到了本次擴(kuò)容子任務(wù)的下界蓖扑,子任務(wù)轉(zhuǎn)移結(jié)束
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//任務(wù)轉(zhuǎn)移完成
i = -1;
advance = false;
}
//通過cas方式嘗試獲取一個轉(zhuǎn)移任務(wù)(transferIndex - 轉(zhuǎn)移步長stride)唉铜,獲取成功后得到處理的下界及當(dāng)前索引
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound; // 更新當(dāng)前子任務(wù)的下界
i = nextIndex - 1; // 更新當(dāng)前index位置
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) { // 擴(kuò)容結(jié)束
int sc;
if (finishing) { // 最后一個出去的線程:更新table指針及sizeCtl值
nextTable = null;
table = nextTab; // 指向擴(kuò)容后的數(shù)組
sizeCtl = (n << 1) - (n >>> 1); //sizeCtl更新為最新的擴(kuò)容閾值(2n - 0.5n = 1.5n = 2n * 0.75),移位實現(xiàn)保證高效率
return;
}
// sizeCtl減1律杠,表示減少一個擴(kuò)容線程
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判斷是否是最后一個擴(kuò)容線程潭流,如果不是則直接退出,由于第一個線程進(jìn)來時把擴(kuò)容戳rs左移16位+2更新到sizeCtl柜去,所以如果是最后一個線程的話灰嫉,sizeCtl -2 應(yīng)該等于rs左移16位
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;//如果是最后一個線程,則結(jié)束標(biāo)志更新為真嗓奢,并且在重新檢查一遍數(shù)組
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) // 當(dāng)前桶節(jié)點為空讼撒,設(shè)置為轉(zhuǎn)移成轉(zhuǎn)移節(jié)點
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) // 該桶節(jié)點已經(jīng)被轉(zhuǎn)移
advance = true; // already processed
else {
synchronized (f) { // 獲取該節(jié)點的鎖
if (tabAt(tab, i) == f) {// 獲取鎖之后再次驗證是否被其他線程修改過
Node<K,V> ln, hn;
if (fh >= 0) { // 節(jié)點HasCode大于0 代表該節(jié)點為鏈表節(jié)點
// 由于數(shù)組長度n為2的冪次方,所以當(dāng)數(shù)組長度增加到2n時股耽,原來hash到table中i的數(shù)據(jù)節(jié)點在長度為2n的table中要么在低位nextTab[i]處根盒,要么在高位nextTab[n+i]處,具體在哪個位置與(fh & n)的計算結(jié)果有關(guān)
int runBit = fh & n;
Node<K,V> lastRun = f;
// 此處循環(huán)的目的是找到鏈表中最后一個從低索引位置變到高索引位置或者從高索引位置變到低索引位置的節(jié)點lastRun物蝙,從lastRun節(jié)點到鏈表的尾節(jié)點可根據(jù)runBit直接插入到新數(shù)組nextTable的節(jié)點中炎滞,其目的是盡量減少新創(chuàng)建節(jié)點數(shù)量,直接更新指針位置
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 對于lastRun之前的鏈表節(jié)點诬乞,根據(jù)hashCode&n可確定即將轉(zhuǎn)移到nextTable中的低索引位置節(jié)點(nextTab[i])還是高索引位置節(jié)點(nextTab[i + n])册赛,并形成兩個新的鏈表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 使用cas方式更新兩個鏈表到新數(shù)組nextTable中,并且把原來的table節(jié)點i中的數(shù)值變?yōu)檗D(zhuǎn)移節(jié)點
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { //該節(jié)點為二叉搜索數(shù)節(jié)點(紅黑樹)
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
transfer數(shù)據(jù)轉(zhuǎn)移過程可以分為如下幾個步驟:
- 第一個擴(kuò)容線程進(jìn)來后創(chuàng)建nextTable數(shù)組丽惭,并設(shè)置transferIndex击奶;
- 線程(第一個或其他)通過transferIndex-stride(擴(kuò)容步長)來領(lǐng)取一個擴(kuò)容子任務(wù),transferIndex減到0說明所有子任務(wù)領(lǐng)取完成责掏;
- 線程領(lǐng)取到擴(kuò)容子任務(wù)后設(shè)置當(dāng)前處理子任務(wù)的下界并更新當(dāng)前處理節(jié)點所在的索引位置柜砾;
- 對子任務(wù)中的每個節(jié)點,擴(kuò)容線程從后向前依次判斷該節(jié)點是否已經(jīng)轉(zhuǎn)移换衬,如果沒有轉(zhuǎn)移痰驱,則對該節(jié)點進(jìn)行加鎖,并且把節(jié)點對應(yīng)的鏈表或紅黑樹轉(zhuǎn)移到新數(shù)組nextTable中去瞳浦;
- 如果線程處理的節(jié)點索引已經(jīng)到達(dá)子任務(wù)的下界担映,則子任務(wù)執(zhí)行結(jié)束,并嘗試去領(lǐng)取新的子任務(wù)叫潦,若領(lǐng)取不到再判斷當(dāng)前線程是否是最后一個擴(kuò)容線程蝇完,若是則最后掃描一遍數(shù)組,執(zhí)行清理工作,否則直接退出短蜕。
3. 總結(jié)
JDK1.8對ConcurrentHashMap做了大量優(yōu)化氢架,本文只是對其中如果進(jìn)行多線程并發(fā)擴(kuò)容的過程做了詳解,其他方面比如高效更新元素個數(shù)(類似LongAdder)以及紅黑樹的調(diào)整將在其他文章中做詳細(xì)的解釋朋魔。