一筑公、認(rèn)識(shí)相關(guān)字段
相關(guān)字段颓鲜,
// 兩種情況
// 1. counterCells 數(shù)組未初始化,在沒有線程爭用時(shí)陕贮,將 size 的變化寫入此字段
// 2. 初始化 counterCells 數(shù)組時(shí)堕油,沒有獲取到 cellsBusy 鎖,會(huì)再次嘗試將 size 的變化寫入此字段
private transient volatile long baseCount;
// 用于同步 counterCells 數(shù)組結(jié)構(gòu)修改的樂觀鎖資源
private transient volatile int cellsBusy;
// counterCells 數(shù)組一旦初始化肮之,size 的變化將不再嘗試寫入 baseCount
// 可以將 size 的變化寫入數(shù)組中的任意元素
// 可擴(kuò)容掉缺,長度保持為 2 的冪
private transient volatile CounterCell[] counterCells;
其中,CounterCell 是 ConcurrentHashMap 的一個(gè)靜態(tài)內(nèi)部類戈擒。
/* ---------------- Counter support -------------- */
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
二眶明、計(jì)算 size 的源碼分析
計(jì)算 size 的方法調(diào)用鏈:size() -> sumCount(),
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n); // 將 n 裁剪到 [0, Integer.MAX_VALUE] 內(nèi)
}
// 計(jì)算 baseCount 字段與所有 counterCells 數(shù)組的非空元素的和
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看到筐高,map 中鍵值對(duì)的個(gè)數(shù)通過求 baseCount 與 counterCells 非空元素的和得到搜囱。
那么現(xiàn)在的問題就是 baseCount 和 counterCells 里的值都是什么時(shí)候計(jì)算的呢丑瞧?
ConcurrentHashMap 內(nèi)部所有改變鍵值對(duì)個(gè)數(shù)的方法都會(huì)調(diào)用 addCount 方法更新鍵值對(duì)的計(jì)數(shù)。
addCount 方法源碼蜀肘,
// 參數(shù) x 表示鍵值對(duì)個(gè)數(shù)的變化值嗦篱,如果為正,表示新增了元素幌缝,如果為負(fù)灸促,表示刪除了元素
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果 counterCells 為空,則直接嘗試通過 CAS 將 x 累加到 baseCount 中
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// counterCells 非空
// 或 counterCells 為空涵卵,但 CAS baseCount 失敗都會(huì)來到這里
CounterCell a; long v; int m;
boolean uncontended = true; // CAS 數(shù)組元素時(shí)浴栽,有沒有發(fā)生線程爭用的標(biāo)志
// 如果當(dāng)前線程探針哈希到的數(shù)組元素非空,則嘗試將 x 累加到對(duì)應(yīng)數(shù)組元素
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// counterCells 為空轿偎,或其長度小于1
// 或當(dāng)前線程探針哈希到的數(shù)組元素為空
// 或當(dāng)前線程探針哈希到的數(shù)組元素非空典鸡,但 CAS 數(shù)組元素失敗
// 都會(huì)調(diào)用 fullAddCount 方法來完成 x 的寫入
fullAddCount(x, uncontended);
return; // 如果調(diào)用過 fullAddCount,則當(dāng)前線程一定不會(huì)協(xié)助擴(kuò)容
}
// 走到這說明坏晦,CAS 數(shù)組元素成功
// 此時(shí)如果 check <= 1萝玷,也不協(xié)助可能會(huì)發(fā)生的擴(kuò)容
if (check <= 1)
return;
// 如果 check 大于 1,則計(jì)算當(dāng)前 map 的 size昆婿,為判斷是否需要擴(kuò)容做準(zhǔn)備
s = sumCount();
}
// size 的變化已經(jīng)寫入完成
// 后面如果 check >= 0球碉,則判斷當(dāng)前的 size 是否會(huì)觸發(fā)擴(kuò)容
if (check >= 0) {
// 擴(kuò)容相關(guān)的邏輯
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
如果不管其中與擴(kuò)容有關(guān)的邏輯,addCount 方法記錄 size 變化的過程可以分為兩類情況仓蛆,
counterCells 數(shù)組未初始化
a. CAS 一次 baseCount
b. 如果 CAS 失敗睁冬,則調(diào)用 fullAddCount 方法counterCells 數(shù)組已初始化
a. CAS 一次當(dāng)前線程探針哈希到的數(shù)組元素
b. 如果 CAS 失敗,則調(diào)用 fullAddCount 方法
fullAddCount 方法的源碼看疙,
// 只被 addCount 方法調(diào)用
// 如果 counterCells 數(shù)組未初始化
// 或者線程哈希到的 counterCells 數(shù)組元素未初始化
// 或者 CAS 數(shù)組元素失敗豆拨,都會(huì)調(diào)用此方法
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 判斷線程探針哈希值是否初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true; // 重新假設(shè)未發(fā)生爭用
}
boolean collide = false; // 是否要給 counterCells 擴(kuò)容的標(biāo)志
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 數(shù)組不為空且長度大于 0
if ((a = as[(n - 1) & h]) == null) {
// 嘗試初始化線程探針哈希到的數(shù)組元素
if (cellsBusy == 0) { // Try to attach new Cell
// 注意待讳,這里已經(jīng)把 x 放入對(duì)象
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 && // 準(zhǔn)備初始化數(shù)組元素本讥,要求 cellsBusy 為 0湖蜕,并嘗試將其置 1
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 獲得 cellsBusy 鎖
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 判斷有沒有被其它線程初始化
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 釋放 cellsBusy 鎖
}
if (created) // 初始化元素成功蔗候,直接退出循環(huán)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash(指的是更改當(dāng)前線程的探針哈希值)
// wasUncontended 為 true 執(zhí)行到這
// 嘗試將 x 累加進(jìn)數(shù)組元素
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// CAS 失敗
// 判斷 counterCells 是否正在擴(kuò)容碧浊,或數(shù)組長度是否大于等于處理器數(shù)
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 如果數(shù)組沒有在擴(kuò)容块促,且數(shù)組長度小于處理器數(shù)
// 此時(shí)抹蚀,如果 collide 為 false植影,則把它變成 true
// 在下一輪循環(huán)中丰涉,如果 CAS 數(shù)組元素繼續(xù)失敗拓巧,就會(huì)觸發(fā) counterCells 擴(kuò)容
else if (!collide)
collide = true;
// 如果 collide 為 true,則嘗試給 counterCells 數(shù)組擴(kuò)容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h); // 更改當(dāng)前線程的探針哈希值
}
// counterCells 數(shù)組為空或長度為 0
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 獲取 cellsBusy 鎖
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2]; // 初始長度為 2
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// counterCells 數(shù)組為空或長度為 0一死,并且獲取 cellsBusy 鎖失敗
// 則會(huì)再次嘗試將 x 累加到 baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
} // end for
}
這個(gè)方法細(xì)節(jié)較多肛度,比較復(fù)雜。
細(xì)節(jié)方面請參考上面的源碼和注釋投慈。
下面承耿,我們只從整體上看它實(shí)現(xiàn)了哪些功能冠骄,
- 線程探針哈希值的初始化。
- counterCells 數(shù)組的初始化和擴(kuò)容加袋。
- counterCells 元素的初始化凛辣。
- 將 size 的變化,寫入 counterCells 中的某一個(gè)元素职烧。(如果 counterCells 初始化時(shí)扁誓,獲取鎖失敗,則還會(huì)嘗試將 size 的變化蚀之,寫入 baseCount蝗敢。)
三、小小總結(jié)
代碼雖然看起來很復(fù)雜足删,但 Doug Lea 計(jì)算 size 的思想很明確寿谴,也很巧妙。
指導(dǎo)思想: 盡量降低線程沖突失受,以最快的速度寫入 size 的變化讶泰。
如何降低沖突?
如果沒有沖突發(fā)生拂到,只將 size 的變化寫入 baseCount痪署。
一旦發(fā)生沖突,就用一個(gè)數(shù)組(counterCells)來存儲(chǔ)后續(xù)所有 size 的變化谆焊。這樣惠桃,線程只要對(duì)任意一個(gè)數(shù)組元素寫入 size 變化成功即可浦夷,數(shù)組長度越長辖试,線程發(fā)生沖突的可能性就越小。
關(guān)于 counterCells 擴(kuò)容:
如果 CAS 數(shù)組元素連續(xù)失敗兩次劈狐,就會(huì)進(jìn)行 counterCells 數(shù)組的擴(kuò)容罐孝,直到達(dá)到機(jī)器的處理器數(shù)為止。
比如我的機(jī)器是雙核四線程肥缔,真正能并行的線程數(shù)是 4莲兢,所以在我機(jī)器上 counterCells 初始化后,最多擴(kuò)容一次续膳。
關(guān)于線程的探針哈希值是如何初始化和更改的改艇,可以參考:關(guān)于 ConcurrentHashMap 1.8 中的線程探針哈希