jdk8中,采用多線程擴(kuò)容。整個擴(kuò)容過程,通過CAS設(shè)置sizeCtl遇革,transferIndex等變量協(xié)調(diào)多個線程進(jìn)行并發(fā)擴(kuò)容卿闹。
擴(kuò)容相關(guān)的屬性
nextTable
擴(kuò)容期間,將table數(shù)組中的元素 遷移到 nextTable萝快。
/**
* The next table to use; non-null only while resizing.
擴(kuò)容時锻霎,將table中的元素遷移至nextTable . 擴(kuò)容時非空
*/
private transient volatile Node<K,V>[] nextTable;
sizeCtl屬性
private transient volatile int sizeCtl;
多線程之間,以volatile的方式讀取sizeCtl屬性揪漩,來判斷ConcurrentHashMap當(dāng)前所處的狀態(tài)旋恼。通過cas設(shè)置sizeCtl屬性,告知其他線程ConcurrentHashMap的狀態(tài)變更奄容。
不同狀態(tài)冰更,sizeCtl所代表的含義也有所不同。
- 未初始化:
- sizeCtl=0:表示沒有指定初始容量昂勒。
- sizeCtl>0:表示初始容量蜀细。
-
初始化中:
- sizeCtl=-1,標(biāo)記作用,告知其他線程戈盈,正在初始化
-
正常狀態(tài):
- sizeCtl=0.75n ,擴(kuò)容閾值
-
擴(kuò)容中:
- sizeCtl < 0 : 表示有其他線程正在執(zhí)行擴(kuò)容
- sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此時只有一個線程在執(zhí)行擴(kuò)容
ConcurrentHashMap的狀態(tài)圖如下:
transferIndex屬性
private transient volatile int transferIndex;
/**
擴(kuò)容線程每次最少要遷移16個hash桶
*/
private static final int MIN_TRANSFER_STRIDE = 16;
擴(kuò)容索引奠衔,表示已經(jīng)分配給擴(kuò)容線程的table數(shù)組索引位置。主要用來協(xié)調(diào)多個線程塘娶,并發(fā)安全地
獲取遷移任務(wù)(hash桶)归斤。
1 在擴(kuò)容之前,transferIndex 在數(shù)組的最右邊 刁岸。此時有一個線程發(fā)現(xiàn)已經(jīng)到達(dá)擴(kuò)容閾值脏里,準(zhǔn)備開始擴(kuò)容。
2 擴(kuò)容線程虹曙,在遷移數(shù)據(jù)之前膝宁,首先要將transferIndex左移(以cas的方式修改 transferIndex=transferIndex-stride(要遷移hash桶的個數(shù))),獲取遷移任務(wù)根吁。每個擴(kuò)容線程都會通過for循環(huán)+CAS的方式設(shè)置transferIndex员淫,因此可以確保多線程擴(kuò)容的并發(fā)安全。
換個角度击敌,我們可以將待遷移的table數(shù)組介返,看成一個任務(wù)隊(duì)列,transferIndex看成任務(wù)隊(duì)列的頭指針。而擴(kuò)容線程圣蝎,就是這個隊(duì)列的消費(fèi)者刃宵。擴(kuò)容線程通過CAS設(shè)置transferIndex索引的過程,就是消費(fèi)者從任務(wù)隊(duì)列中獲取任務(wù)的過程徘公。為了性能考慮牲证,我們當(dāng)然不會每次只獲取一個任務(wù)(hash桶),因此ConcurrentHashMap規(guī)定关面,每次至少要獲取16個遷移任務(wù)(遷移16個hash桶坦袍,MIN_TRANSFER_STRIDE = 16)
cas設(shè)置transferIndex的源碼如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//計(jì)算每次遷移的node個數(shù)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 確保每次遷移的node個數(shù)不少于16個
...
for (int i = 0, bound = 0;;) {
...
//cas無鎖算法設(shè)置 transferIndex = transferIndex - stride
if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
...
...
}
...//省略遷移邏輯
}
}
ForwardingNode節(jié)點(diǎn)
標(biāo)記作用,表示其他線程正在擴(kuò)容等太,并且此節(jié)點(diǎn)已經(jīng)擴(kuò)容完畢
關(guān)聯(lián)了nextTable,擴(kuò)容期間可以通過find方法捂齐,訪問已經(jīng)遷移到了nextTable中的數(shù)據(jù)
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
//hash值為MOVED(-1)的節(jié)點(diǎn)就是ForwardingNode
super(MOVED, null, null, null);
this.nextTable = tab;
}
//通過此方法,訪問被遷移到nextTable中的數(shù)據(jù)
Node<K,V> find(int h, Object k) {
...
}
}
何時擴(kuò)容
1 當(dāng)前容量超過閾值
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
addCount(1L, binCount);
...
}
private final void addCount(long x, int check) {
...
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//s>=sizeCtl 即容量達(dá)到擴(kuò)容閾值缩抡,需要擴(kuò)容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//調(diào)用transfer()擴(kuò)容
...
}
}
}
2 當(dāng)鏈表中元素個數(shù)超過默認(rèn)設(shè)定(8個)奠宜,當(dāng)數(shù)組的大小還未超過64的時候,此時進(jìn)行數(shù)組的擴(kuò)容瞻想,如果超過則將鏈表轉(zhuǎn)化成紅黑樹
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
if (binCount != 0) {
//鏈表中元素個數(shù)超過默認(rèn)設(shè)定(8個)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
...
}
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//數(shù)組的大小還未超過64
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//擴(kuò)容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
//轉(zhuǎn)換成紅黑樹
...
}
}
}
3 當(dāng)發(fā)現(xiàn)其他線程擴(kuò)容時压真,幫其擴(kuò)容
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
//f.hash == MOVED 表示為:ForwardingNode,說明其他線程正在擴(kuò)容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...
}
擴(kuò)容過程分析
- 線程執(zhí)行put操作蘑险,發(fā)現(xiàn)容量已經(jīng)達(dá)到擴(kuò)容閾值榴都,需要進(jìn)行擴(kuò)容操作,此時transferindex=tab.length=32
- 擴(kuò)容線程A 以cas的方式修改transferindex=31-16=16 ,然后按照降序遷移table[31]--table[16]這個區(qū)間的hash桶
- 遷移hash桶時漠其,會將桶內(nèi)的鏈表或者紅黑樹嘴高,按照一定算法,拆分成2份和屎,將其插入nextTable[i]和nextTable[i+n](n是table數(shù)組的長度)拴驮。 遷移完畢的hash桶,會被設(shè)置成ForwardingNode節(jié)點(diǎn),以此告知訪問此桶的其他線程柴信,此節(jié)點(diǎn)已經(jīng)遷移完畢套啤。
相關(guān)代碼如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
...//省略無關(guān)代碼
synchronized (f) {
//將node鏈表,分成2個新的node鏈表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//將新node鏈表賦給nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
}
...//省略無關(guān)代碼
}
- 此時線程2訪問到了ForwardingNode節(jié)點(diǎn)随常,如果線程2執(zhí)行的put或remove等寫操作潜沦,那么就會先幫其擴(kuò)容。如果線程2執(zhí)行的是get等讀方法绪氛,則會調(diào)用ForwardingNode的find方法唆鸡,去nextTable里面查找相關(guān)元素。
- 線程2加入擴(kuò)容操作
- 如果準(zhǔn)備加入擴(kuò)容的線程枣察,發(fā)現(xiàn)以下情況争占,放棄擴(kuò)容燃逻,直接返回。
- 發(fā)現(xiàn)transferIndex=0,即所有node均已分配
- 發(fā)現(xiàn)擴(kuò)容線程已經(jīng)達(dá)到最大擴(kuò)容線程數(shù)
部分源碼分析
tryPresize方法
協(xié)調(diào)多個線程如何調(diào)用transfer方法進(jìn)行hash桶的遷移(addCount臂痕,helpTransfer 方法中也有類似的邏輯)
private final void tryPresize(int size) {
//計(jì)算擴(kuò)容的目標(biāo)size
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//tab沒有初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
//初始化之前伯襟,CAS設(shè)置sizeCtl=-1
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//sc=0.75n,相當(dāng)于擴(kuò)容閾值
sc = n - (n >>> 2);
}
} finally {
//此時并沒有通過CAS賦值,因?yàn)槠渌胍獔?zhí)行初始化的線程握童,發(fā)現(xiàn)sizeCtl=-1姆怪,就直接返回,從而確保任何情況澡绩,只會有一個線程執(zhí)行初始化操作稽揭。
sizeCtl = sc;
}
}
}
//目標(biāo)擴(kuò)容size小于擴(kuò)容閾值,或者容量超過最大限制時英古,不需要擴(kuò)容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//擴(kuò)容
else if (tab == table) {
int rs = resizeStamp(n);
//sc<0表示淀衣,已經(jīng)有其他線程正在擴(kuò)容
if (sc < 0) {
Node<K,V>[] nt;
/**
1 (sc >>> RESIZE_STAMP_SHIFT) != rs :擴(kuò)容線程數(shù) > MAX_RESIZERS-1
2 sc == rs + 1 和 sc == rs + MAX_RESIZERS :表示什么昙读?召调??
3 (nt = nextTable) == null :表示nextTable正在初始化
4 transferIndex <= 0 :表示所有hash桶均分配出去
*/
//如果不需要幫其擴(kuò)容蛮浑,直接返回
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//CAS設(shè)置sizeCtl=sizeCtl+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//幫其擴(kuò)容
transfer(tab, nt);
}
//第一個執(zhí)行擴(kuò)容操作的線程唠叛,將sizeCtl設(shè)置為:(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
transfer方法
負(fù)責(zé)遷移node節(jié)點(diǎn)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//計(jì)算需要遷移多少個hash桶(MIN_TRANSFER_STRIDE該值作為下限,以避免擴(kuò)容線程過多)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//擴(kuò)容一倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//1 逆序遷移已經(jīng)獲取到的hash桶集合沮稚,如果遷移完畢艺沼,則更新transferIndex,獲取下一批待遷移的hash桶
//2 如果transferIndex=0蕴掏,表示所以hash桶均被分配障般,將i置為-1,準(zhǔn)備退出transfer方法
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//更新待遷移的hash桶索引
while (advance) {
int nextIndex, nextBound;
//更新遷移索引i盛杰。
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex<=0表示已經(jīng)沒有需要遷移的hash桶挽荡,將i置為-1,線程準(zhǔn)備退出
i = -1;
advance = false;
}
//當(dāng)遷移完bound這個桶后即供,嘗試更新transferIndex定拟,,獲取下一批待遷移的hash桶
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//退出transfer
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//最后一個遷移的線程逗嫡,recheck后青自,做收尾工作,然后退出
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/**
第一個擴(kuò)容的線程驱证,執(zhí)行transfer方法之前延窜,會設(shè)置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
后續(xù)幫其擴(kuò)容的線程,執(zhí)行transfer方法之前抹锄,會設(shè)置 sizeCtl = sizeCtl+1
每一個退出transfer的方法的線程需曾,退出之前吗坚,會設(shè)置 sizeCtl = sizeCtl-1
那么最后一個線程退出時:
必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
*/
//不相等呆万,說明不到最后一個線程商源,直接退出transfer方法
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//最后退出的線程要重新check下是否全部遷移完畢
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//遷移node節(jié)點(diǎn)
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//鏈表遷移
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//將node鏈表,分成2個新的node鏈表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//將新node鏈表賦給nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//紅黑樹遷移
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
總結(jié)
多線程無鎖擴(kuò)容的關(guān)鍵就是通過CAS設(shè)置sizeCtl與transferIndex變量谋减,協(xié)調(diào)多個線程對table數(shù)組中的node進(jìn)行遷移牡彻。