LongAdder其實(shí)是AtomicLong的升級(jí)版,AtomicLong在多線程下會(huì)導(dǎo)致較多的自旋重試瓷产,主要原因還是因?yàn)槎嗑€程同時(shí)cas同一個(gè)變量的時(shí)候失敗次數(shù)較多宙搬,那LongAdder的出現(xiàn)就是為了解決AtomicLong在多線程壞境下的痛點(diǎn)氛改。
??首先分析源代碼之前我們先拋出以下幾個(gè)問題:
- LongAdder如何解決AtomicLong的痛點(diǎn)誊锭?
- LongAdder內(nèi)部維護(hù)的cell數(shù)組在什么情況下初始化?
- LongAdder怎么確定當(dāng)前線程訪問內(nèi)部cell數(shù)組的那個(gè)元素校读?
- LongAdder如何擴(kuò)容沼侣,在什么情況下擴(kuò)容?
- LongAdder訪問cell沖突后怎么解決歉秫?
先來看下LongAdder類的繼承關(guān)系
LongAdder繼承自Striped64蛾洛,Striped64內(nèi)部維護(hù)的三個(gè)變量cells、base雁芙、cellsBusy雅潭,base其實(shí)是基礎(chǔ)值,cells是一個(gè)Cell數(shù)組却特,LongAdder的值其實(shí)就等于base加上cells數(shù)組內(nèi)的value值。cellsBusy是cells初始化筛圆,擴(kuò)容以及創(chuàng)建新的Cell的標(biāo)志位裂明。
接下來我們分析源代碼,LongAdder的源碼我們主要看add方法就行太援。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { //(1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null
|| !(uncontended = a.cas(v = a.value, v + x))) //(2)
longAccumulate(x, null, uncontended); //(3)
}
}
代碼1:先看cells數(shù)組是否為空闽晦,如果為空進(jìn)行cas原子操作,將新增的值賦值給原變量提岔,如果cells數(shù)組不為空或者是cas失敗進(jìn)行下一步仙蛉。
問題1:AtomicLong的痛點(diǎn)就是多線程同時(shí)cas一個(gè)變量時(shí),導(dǎo)致失敗后自旋重試次數(shù)特別多碱蒙,所以LongAdder內(nèi)部特意擴(kuò)充了一個(gè)cell數(shù)組去減少線程之間變量的競(jìng)爭(zhēng)關(guān)系荠瘪,從而提高了效率夯巷。
代碼2:別看條件比較多,其實(shí)總結(jié)起來就一句話哀墓,查找當(dāng)前線程所屬的cell元素趁餐,如果有進(jìn)行cas值替換,如果cell為空且替換失敗的話進(jìn)行下一步篮绰,這里注意下getProbe方法后雷,該方法是獲取當(dāng)前線程下的threadLocalRandomProbe的值,這個(gè)值一開始為0吠各,初始化的邏輯在代碼3中臀突。
1、2失敗都會(huì)進(jìn)入3贾漏,3代碼業(yè)務(wù)相對(duì)復(fù)雜我們單獨(dú)擰出來分析
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); //這里進(jìn)行ThreadLocalRandom初始化
h = getProbe();//獲取當(dāng)前線程下的threadLocalRandomProbe值
wasUncontended = true;//該值為無競(jìng)爭(zhēng)標(biāo)志位置位候学,如果存在競(jìng)爭(zhēng)該值則為false,無競(jìng)爭(zhēng)則為true
}
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) { (3) //當(dāng)前線程訪問的cell元素為空進(jìn)行cell初始化
if (cellsBusy == 0) {//判斷標(biāo)志位磕瓷,如果為零表示可以進(jìn)行初始化操作
Cell r = new Cell(x); //新建一個(gè)Cell類
if (cellsBusy == 0 && casCellsBusy()) { //再次判斷并且對(duì)cellsBusy進(jìn)行cas值替換
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;//清除初始化標(biāo)志位
}
if (created)//如果創(chuàng)建Cell并且賦值成功直接結(jié)束本次add操作
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)//
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {(4)
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];//進(jìn)行cell數(shù)組擴(kuò)容盒齿,每次都擴(kuò)充為原來的二倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];//將老數(shù)據(jù)轉(zhuǎn)移至新cell數(shù)組中
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h); (5) //重新計(jì)算當(dāng)前線程的threadLocalRandomProbe值,減少?zèng)_突的機(jī)會(huì)困食。
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { ----(2)
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];//初始化cell數(shù)組長度為2
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
上述代碼邏輯比較復(fù)雜边翁,這里我們帶著剩下的問題去找答案會(huì)更好一些,下面我們逐個(gè)分析硕盹。
問題2:LongAdder在什么情況下會(huì)進(jìn)行數(shù)組初始化呢符匾?當(dāng)在對(duì)base值進(jìn)行cas失敗后,且cell數(shù)組為空的時(shí)候就會(huì)進(jìn)行cell數(shù)組初始化操作瘩例,在源碼中我標(biāo)注了下啊胶,在代碼2中對(duì)cell數(shù)組進(jìn)行初始化,并且初始化的數(shù)組大小長度為2垛贤,并且新創(chuàng)建一個(gè)cell類并賦值在h & 1位置上焰坪。
問題3:在代碼3中可以看到“getProbe()&n-1”與運(yùn)算邏輯,由此我們知道是由線程的threadLocalRandomProbe值與數(shù)組最大下標(biāo)進(jìn)行與運(yùn)算得出的聘惦,這個(gè)邏輯有點(diǎn)類似hashMap尋找散列桶下標(biāo)的邏輯某饰。
問題4:代碼4邏輯里面可以看到對(duì)cell數(shù)組進(jìn)行了擴(kuò)容操作,那么進(jìn)入代碼4的條件結(jié)合前面if判斷可以得出(當(dāng)前線程所訪問的cell元素不為空 & 對(duì)當(dāng)前訪問的cell類中的value值cas替換失敗 & 數(shù)組長度不大于cpu的個(gè)數(shù)
)善绎,每次擴(kuò)容都將數(shù)組擴(kuò)充為原來的兩倍黔漂,并且將老數(shù)據(jù)循環(huán)賦值至新數(shù)組中。
問題5:在代碼5中我們可以看到對(duì)線程的threadLocalRandomProbe進(jìn)行了重新計(jì)算禀酱,采用的是xorshift隨機(jī)算法炬守,以此來減小下次訪問沖突的機(jī)會(huì)。
這里我給一個(gè)我對(duì)AtomicLong和LongAdder性能對(duì)比的小測(cè)試
@State(Scope.Benchmark)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(1)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
public class TestMain {
private LongAdder longAdder = new LongAdder();
private AtomicLong atomicLong = new AtomicLong();
public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder().include(TestMain.class.getName()).build();
new Runner(options).run();
}
@Benchmark
@Threads(8)
public void runLongAdder() {
for (int i = 0; i < 1000; i++) {
longAdder.add(i);
}
}
@Benchmark
@Threads(8)
public void runAtomicLong() {
for (int i = 0; i < 1000; i++) {
atomicLong.addAndGet(i);
}
}
}
運(yùn)行結(jié)果:
最后給一個(gè)問題:既然有了高效率的LongAdder那AtomicLong是不是可以廢棄剂跟?
參考文獻(xiàn):