一激涤、原理介紹
關(guān)于LongAdder這個(gè)類(lèi)可能很多朋友不太熟悉拟糕,我簡(jiǎn)單的對(duì)LongAdder介紹一下。
LongAdder被設(shè)計(jì)出來(lái)是為了用于高并發(fā)下的自增倦踢,說(shuō)到自增操作送滞,為了保證線(xiàn)程安全,可能很多朋友用過(guò)AtomicInteger或者AtomicLong這兩個(gè)類(lèi)辱挥,那為什么不用這兩個(gè)類(lèi)來(lái)完成自增而要用LongAdder呢犁嗅?這是因?yàn)锳tomicInteger和AtomicLong的底層都是直接使用CAS來(lái)修改類(lèi)中的value值,假如有N個(gè)線(xiàn)程同時(shí)修改晤碘,每次只能有一個(gè)線(xiàn)程修改成功褂微,另外N-1個(gè)線(xiàn)程會(huì)修改失敗然后進(jìn)行自旋功蜓,問(wèn)題的關(guān)鍵就在這里,自旋是需要消耗CPU時(shí)間片的宠蚂,所以當(dāng)并發(fā)量較大時(shí)式撼,自旋所占用CPU的時(shí)間就會(huì)很長(zhǎng)湃鹊。
為了解決AtomicInteger和AtomicLong的缺點(diǎn)才有了LongAdder類(lèi)。LongAdder的思想是化整為零盆耽,如果在沒(méi)有線(xiàn)程競(jìng)爭(zhēng)的情況下進(jìn)行自增寂嘉,那么與AtomicInteger和AtomicLong沒(méi)有什么區(qū)別,直接累加即可塞茅。如果在高并發(fā)的環(huán)境中,LongAdder會(huì)創(chuàng)建一個(gè)cell數(shù)組,給每個(gè)線(xiàn)程分配一個(gè)桶位十艾,一個(gè)線(xiàn)程對(duì)應(yīng)一個(gè)cell,讓每個(gè)線(xiàn)程在各自的桶位進(jìn)行累加腾节,最后把cell數(shù)組中的所有值加起來(lái)就是最終結(jié)果忘嫉。
二、Striped64類(lèi)
進(jìn)入LongAdder源碼中可以看見(jiàn)LongAdder繼承了Striped64案腺,先來(lái)看看Striped64中的核心屬性庆冕。
Cell是一個(gè)內(nèi)部類(lèi),類(lèi)中有value值和cas修改value的方法劈榨,關(guān)于@sun.misc.Contended注解可以看一下https://www.cnblogs.com/eycuii/p/11525164.html访递。
@sun.misc.Contended
static final class Cell { // 這里的cell就是用來(lái)讓每個(gè)線(xiàn)程來(lái)累加的對(duì)象
// 實(shí)際累加的變量
volatile long value;
Cell(long x) { value = x; }
// 封裝cas操作來(lái)修改value
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
// value在內(nèi)存中的偏移量
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/**
* 獲得當(dāng)前物理機(jī)的CPU數(shù)量,會(huì)被用于限制cell[]的長(zhǎng)度
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* cell數(shù)組同辣,數(shù)組長(zhǎng)度一定為2的整數(shù)次冪
*/
transient volatile Cell[] cells;
/**
* base主要在沒(méi)有爭(zhēng)用時(shí)使用拷姿,通過(guò) CAS 更新
*/
transient volatile long base;
/**
* cellBusy就是一把鎖,需要拿到鎖才能進(jìn)行初始化或者擴(kuò)容
* 0表示為加鎖旱函,1表示已加鎖
*/
transient volatile int cellsBusy;
// cas修改base
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 加鎖响巢,初始化或者擴(kuò)容時(shí)鎖住cell[]
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 獲得線(xiàn)程的探針hash值
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// 設(shè)置線(xiàn)程的探針hash值,其實(shí)就是rehash操作
static final int advanceProbe(int probe) {
probe ^= probe << 13;
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
三棒妨、add()
add方法是LongAdder的最基本方法踪古,其他的方法均以add來(lái)展開(kāi),我會(huì)逐行分析券腔,請(qǐng)看我的注釋伏穆。
/**
* 進(jìn)行累加
* x:需要累加的值
*/
public void add(long x) {
/**
* as:cell[],用于多線(xiàn)程的累加
* b:base值
* v:cell進(jìn)行cas操作時(shí)的期望值
* m:cell[]長(zhǎng)度 - 1
* a:當(dāng)前線(xiàn)程對(duì)應(yīng)的cell
*/
Cell[] as; long b, v; int m; Cell a;
/**
* case1:cell[]非空(產(chǎn)生競(jìng)爭(zhēng)時(shí)才會(huì)初始化颅眶,非空表示已經(jīng)初始化了)蜈出,
* 如果已初始化,那么線(xiàn)程需要把x累加到對(duì)應(yīng)的cell中涛酗。
* case2:前置條件:cell[]未初始化铡原,casBase是Striped64封裝的cas操作修改base的一個(gè)方法偷厦,
* case1不滿(mǎn)足說(shuō)明之前未產(chǎn)生線(xiàn)程競(jìng)爭(zhēng),這里直接嘗試修改base燕刻,若修改成功說(shuō)明沒(méi)有競(jìng)爭(zhēng)只泼,
* 若失敗說(shuō)明修改base產(chǎn)生了競(jìng)爭(zhēng),需要?jiǎng)?chuàng)建cell[]進(jìn)入數(shù)組累加卵洗。
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true表示產(chǎn)生競(jìng)爭(zhēng)请唱,false表示產(chǎn)生了競(jìng)爭(zhēng)
boolean uncontended = true;
/**
* case1:cell[]是否初始化,true表示未初始化过蹂,直接進(jìn)入longAccumulate()
* case2:cell[]長(zhǎng)度是否為0十绑,防止某些異常情況出現(xiàn)cell[]不為null但是長(zhǎng)度為0
* case3:線(xiàn)程桶位中cell對(duì)象是否被初始化,getProbe()表示獲取線(xiàn)程的探測(cè)hash值酷勺,
* 可以理解為hashCode一樣的東西本橙,hash & m的作用就是計(jì)算線(xiàn)程對(duì)應(yīng)的桶位
* (這個(gè)原理在HashMap中講過(guò)),true表示cell未初始化脆诉。
* case4:前置條件:cell[]和線(xiàn)程對(duì)應(yīng)的cell對(duì)象已初始化甚亭。嘗試修改cell中的value,
* 若修改成功說(shuō)明沒(méi)有競(jìng)爭(zhēng)击胜,若修改失敗說(shuō)明修改value產(chǎn)生了競(jìng)爭(zhēng)亏狰。
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 進(jìn)入的條件:cell[]未初始化 ||
// 線(xiàn)程對(duì)應(yīng)的cell未初始化 ||
// 修改value失敗(有競(jìng)爭(zhēng))
longAccumulate(x, null, uncontended);
}
}
四偶摔、longAccumulate()
longAccumulate()方法中非常復(fù)雜暇唾,有大量的if/else,在看的時(shí)候一定要慢慢捋清楚啰挪,最好配合IDEA把if/else塊縮起來(lái)一個(gè)一個(gè)看信不,閱讀if/else內(nèi)部邏輯時(shí)需要明白進(jìn)入這個(gè)if/else塊的條件和前置條件,我大致列舉一下邏輯順序:
- cell[]已初始化亡呵,線(xiàn)程對(duì)應(yīng)的cell未初始化
- 創(chuàng)建cell對(duì)象
- 嘗試加鎖抽活,加鎖成功 -> 關(guān)聯(lián)cell對(duì)象至桶位,加鎖失敗 -> 進(jìn)行rehash -> 自旋
- cell[]已初始化锰什,線(xiàn)程對(duì)應(yīng)的cell已初始化
- 判斷是否有競(jìng)爭(zhēng)下硕,有 -> rehash之后自旋 -> cas修改對(duì)應(yīng)的cell,無(wú) -> cas修改對(duì)應(yīng)的cell
- 修改成功 -> 退出汁胆,修改失敗 -> 擴(kuò)容(因?yàn)槭〉拇螖?shù)太多梭姓,所以考慮擴(kuò)容)
- 判斷當(dāng)前cell[]長(zhǎng)度是否大于等于 CPU核心數(shù) && 檢查當(dāng)前是否有其他線(xiàn)程已經(jīng)完成了擴(kuò)容操作
- 是 -> 本次循環(huán)無(wú)法擴(kuò)容 -> rehash -> 自旋,否 -> 修改擴(kuò)容意向?yàn)閠rue -> 自旋(擴(kuò)容之前再次嘗試修改cell嫩码,修改失敗則擴(kuò)容)
- 擴(kuò)容結(jié)束 -> 自旋
- cell[]未初始化
- 直接進(jìn)行初始化(需要加鎖以及檢查是否已被其他線(xiàn)程初始化)
整體上來(lái)看是按照這三個(gè)大塊的邏輯來(lái)走誉尖,其中內(nèi)部又會(huì)有很多細(xì)節(jié),比如擴(kuò)容和初始化需要double check铸题,以及擴(kuò)容前的重新自旋一次等等铡恕。代碼的邏輯設(shè)計(jì)的很巧妙琢感,也很?chē)?yán)謹(jǐn),一定要慢慢讀懂探熔。
/**
* 進(jìn)入的前置條件: cell[]未初始化 || 線(xiàn)程對(duì)應(yīng)的cell未初始化 || 修改value失斁哉搿(有競(jìng)爭(zhēng))
* x:需要累加的值
* fn:函數(shù)式接口,可以對(duì)方法進(jìn)行擴(kuò)展诀艰,這里沒(méi)有用上不用管
* wasUncontended:是否產(chǎn)生競(jìng)爭(zhēng)柬甥,true:沒(méi)有競(jìng)爭(zhēng),false:有競(jìng)爭(zhēng)
* 根據(jù)上面的分析其垄,只有修改value失敗時(shí)為false
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 當(dāng)前線(xiàn)程探針hash
int h;
// h為0表示線(xiàn)程的探針hash為初始化苛蒲,則需要先進(jìn)行初始化。
if ((h = getProbe()) == 0) {
// 初始化線(xiàn)程中的探針hash
ThreadLocalRandom.current();
h = getProbe();
/**
* 當(dāng)getProbe()為0的時(shí)候绿满,剛剛在a = as[getProbe() & m]會(huì)計(jì)算出as[0]撤防,
* 所以0號(hào)桶位產(chǎn)生的競(jìng)爭(zhēng)是因?yàn)榫€(xiàn)程未初始化探針hash導(dǎo)致在0號(hào)桶位產(chǎn)生競(jìng)爭(zhēng),
* 正常情況下線(xiàn)程可能對(duì)應(yīng)別的桶位棒口,所以不是真正的競(jìng)爭(zhēng),這里才會(huì)重新設(shè)置true
* 表示沒(méi)有發(fā)送競(jìng)爭(zhēng)辜膝。
*/
wasUncontended = true;
}
// collide表示擴(kuò)容意向无牵,true:可能擴(kuò)容,false:一定不會(huì)擴(kuò)容
boolean collide = false;
// 自旋
for (;;) {
/**
* as:當(dāng)前的cell[]
* a:當(dāng)前線(xiàn)程對(duì)應(yīng)的cell
* n:cell[]長(zhǎng)度
* v:cas修改時(shí)的期望值
*/
Cell[] as; Cell a; int n; long v;
// 判斷cell[]是否初始化
if ((as = cells) != null && (n = as.length) > 0) {
// 前置條件:cell[]已經(jīng)初始化
// 拿到當(dāng)前線(xiàn)程對(duì)應(yīng)的cell判斷是否初始化
if ((a = as[(n - 1) & h]) == null) {
// 前置條件:當(dāng)前線(xiàn)程對(duì)應(yīng)的cell未初始化
// 判斷當(dāng)前是否加鎖
if (cellsBusy == 0) {
// 前置條件:未加鎖
// 創(chuàng)建線(xiàn)程對(duì)應(yīng)的cell對(duì)象并賦值厂抖,PS:這里還沒(méi)有關(guān)聯(lián)到具體的桶位
Cell r = new Cell(x);
/**
* 需要獲取到鎖才能進(jìn)行初始化
* case1:判斷是否加鎖(防止有其他線(xiàn)程在這個(gè)時(shí)候加鎖了)
* case2:嘗試加鎖
*/
if (cellsBusy == 0 && casCellsBusy()) {
// 前置條件:當(dāng)前線(xiàn)程加鎖成功
// 用于表示是否初始化完畢茎毁,true:完成,false未完成
boolean created = false;
// 進(jìn)行初始化操作
try {
/**
* rs:當(dāng)前cell[]
* m:cell[]長(zhǎng)度
* j:線(xiàn)程對(duì)應(yīng)桶位下標(biāo)
*/
Cell[] rs; int m, j;
/**
* case1忱辅、case2:判斷cell[]是否初始化
* case3:判斷線(xiàn)程對(duì)應(yīng)的cell是否初始化
* 這里之所以再次判斷七蜘,是因?yàn)榭赡艽嬖诰€(xiàn)程安全問(wèn)題,
* 例如線(xiàn)程1執(zhí)行剛完初始化墙懂,線(xiàn)程2剛好執(zhí)行到這個(gè)if語(yǔ)句橡卤,
* 如果線(xiàn)程1和線(xiàn)程2的對(duì)應(yīng)的桶位相同,則線(xiàn)程2會(huì)覆蓋線(xiàn)程1
* 創(chuàng)建的cell對(duì)象损搬。
*/
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 讓桶位指向剛剛創(chuàng)建的cell
rs[j] = r;
// 表示創(chuàng)建成功
created = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 創(chuàng)建成功碧库,退出自旋
if (created)
break;
// 未創(chuàng)建成功則再次自旋
continue;
}
}
// 前置條件:線(xiàn)程對(duì)應(yīng)cell未初始化 && 未獲得鎖
// 因?yàn)橛衅渌€(xiàn)程鎖住了cell[],所以無(wú)法創(chuàng)建cell對(duì)象
// 在下面進(jìn)行rehash然后自旋巧勤,
// 這樣有可能重新分配到一個(gè)已經(jīng)初始化的cell對(duì)象
collide = false;
}
// 前置條件:cell[]已初始化 && 線(xiàn)程對(duì)應(yīng)的cell已初始化
// cas競(jìng)爭(zhēng)失敗走到這里
else if (!wasUncontended)
// 設(shè)置為非競(jìng)爭(zhēng)狀態(tài)嵌灰,在下面進(jìn)行rehash然后自旋
wasUncontended = true;
// 前置條件:cell[]已初始化 && 線(xiàn)程對(duì)應(yīng)的cell已初始化
// cas修改線(xiàn)程對(duì)應(yīng)的cell,修改成功直接退出颅悉,失敗則繼續(xù)自旋
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// case1:數(shù)組長(zhǎng)度大于等于CPU核心數(shù)
// case2:cell[]已經(jīng)發(fā)生擴(kuò)容沽瞭,不再是之前的對(duì)象了
else if (n >= NCPU || cells != as)
// 當(dāng)case1或case2為true進(jìn)入
// case1表示n >= NCPU,則一定不會(huì)發(fā)生擴(kuò)容
// case2表示cells剛剛完成了一次擴(kuò)容剩瓶,所以設(shè)置為false
collide = false;
// 下面是擴(kuò)容邏輯驹溃,所以這里需要判斷是否能夠擴(kuò)容
// 當(dāng)不能擴(kuò)容時(shí)在下面進(jìn)行rehash然后自旋
else if (!collide)
collide = true;
// 判斷是否加鎖并且嘗試獲得鎖
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 再次檢查城丧,防止進(jìn)入之前有其他線(xiàn)程進(jìn)行了擴(kuò)容導(dǎo)致cells和as不同
if (cells == as) {
// 創(chuàng)建新的數(shù)組, n << 1等效于n*2
Cell[] rs = new Cell[n << 1];
// 遷移元素到新數(shù)組
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 把成員變量cells指向新的數(shù)組
// 其實(shí)在上面的幾次判斷cells == as就是因?yàn)檫@里
cells = rs;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 擴(kuò)容完成之后設(shè)置為false
collide = false;
// 重新走一遍循環(huán)吠架,
continue;
}
// rehash操作芙贫,重新再分配一次hash和桶位
// 能夠進(jìn)行rehash的情況:
// 產(chǎn)生競(jìng)爭(zhēng)(wasUncontended == false) ||
h = advanceProbe(h);
}
// 前置條件:cell[]未初始化
// 判斷是否加鎖以及確定as和cells所指向同一個(gè)對(duì)象
// 獲得鎖后執(zhí)行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 是否初始化完成
boolean init = false;
try {
// 再次檢查之后開(kāi)始初始化cell[]和線(xiàn)程對(duì)應(yīng)的cell
if (cells == as) {
// 初始化cell[]長(zhǎng)度為2
Cell[] rs = new Cell[2];
// 初始化線(xiàn)程對(duì)應(yīng)的cell并賦值x
rs[h & 1] = new Cell(x);
// 讓cells指向新數(shù)組
cells = rs;
// 初始化完成
init = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 初始化完成后退出自旋
if (init)
break;
}
// 前置條件:cell[]未初始化 && (未獲取到鎖 || cells != as)
// 線(xiàn)程走到這里大概率是因?yàn)橛衅渌€(xiàn)程正在初始化cell[]或者已經(jīng)初始化完成
// 所以這里直接對(duì)base嘗試?yán)奂? else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}