轉眼就8月份了巢块,要開始努力了~~~
在爭用激烈的場景下,會導致大量的CAS空自旋燎窘。這會浪費大量的CPU資源浑厚,大大降低了程序的性能股耽。
這時候可以通過使用 LongAdder
代替 AtomicInteger
來提高CAS操作的性能。
以空間換時間:LongAdder
Java8提供了一個類 LongAdder
通過以空間換時間的方式提高并發(fā)場景下CAS操作的性能钳幅。
LongAdder
的核心思想是熱點分離物蝙,與 ConcurrentHashMap
的設計思想類似,將 value
值分離成一個數組敢艰,當多線程訪問時茬末,通過 Hash 算法將線程映射到數組的一個元素進行操作,而獲取最終的的 value
結果時盖矫,則將數組的元素求和丽惭。
最終,通過 LongAdder
將內部操作對象從單一 value
值“演變”成一些列的數組元素辈双,從而減小了內部競爭的粒度责掏。
下面看一個小例子:
public class LongAdderTest {
final int TURNS = 1000;
@Test
public void testLongAdder() {
final int TASK_AMOUNT = 10;
ExecutorService pool = Executors.newCachedThreadPool();
LongAdder longAdder = new LongAdder();
CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
long start = System.currentTimeMillis();
for(int i = 0; i < TASK_AMOUNT; i++) {
pool.submit(() -> {
try {
for(int j = 0; j < TURNS; j++) {
longAdder.add(1);
}
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
System.out.println("運行時長為:" + time);
System.out.println("累加結果為: " + longAdder.longValue());
}
}
LongAdder 的原理
AtomicLong
使用內部變量 value
保存著實際的 long
值,所有的操作都是針對該變量進行的湃望。也就是說在高并發(fā)環(huán)境下换衬,value
變量其實是一個熱點,重試線程越多证芭,CAS失敗概率更高瞳浦,從而進入惡性CAS空自旋狀態(tài)。
LongAdder
的基本實錄是分散熱點废士,將 value
值分散到一個數組中叫潦,不同線程會命中到數組的不同槽(元素)中,各個線程只對自己槽中的那個值進行CAS操作官硝。這樣熱點就被分擔了矗蕊,沖突的概率就小很多短蜕。
使用 LongAdder
,即使線程數再多也不必擔心傻咖,各個線程會分配到多個元素上去更新朋魔,增加元素個數,就可以降低 value
的“熱度”卿操,惡性CAS空自旋就解決了警检。
如果要獲得完整的 LongAdder
存儲的值,只要將各個槽中的變量值累加害淤,返回最終累加之后的值即可解滓。
LongAdder
的實現(xiàn)思路與 ConcurrentHashMap
中分段鎖的基本原理非常相似,本質上都是不同的線程在不同的單元上進行操作筝家,這樣減少了線程競爭,提高了并發(fā)效率邻辉。
LongAdder
的設計體現(xiàn)了空間換時間的思想溪王,不過在實際高并發(fā)場景下,數組元素所消耗的空間可以忽略不計值骇。
LongAdder 實例的內部結構
一個 LongAdder
實例的內部結構如下所示莹菱。
LongAdder
的內部成員包含一個 base
值和一個 cells
數組,在最初無競爭時吱瘩,只操作 base
的值道伟,當線程執(zhí)行CAS失敗后,才初始化 cells
數組使碾,并為線程分配所對應的元素蜜徽。
基類 Striped64 內部三個重要成員
LongAdder
繼承于 Stripped64
類,base
值和 cell
數組都在 Stripped64
類中定義票摇,它的內部三個重要的成員如下:
/**
* 成員一:存放Cell的哈希表拘鞋,大小為2的冪
*/
transient volatile Cell[] cells;
/**
* 成員二:基礎值
* 1.在沒有競爭時會更新這個值
* 2.在cells初始化時,cells不可用矢门,也會嘗試通過CAS操作值累加到base
*/
transient volatile long base;
/**
* 自旋鎖盆色,通過CAS操作加鎖,為0表示cells數組沒有處于創(chuàng)建祟剔、擴容階段
* 為1表示正在創(chuàng)建或者擴展cells數組隔躲,不能進行新Cell元素的設置操作
*/
transient volatile int cellsBusy;
Stripped64
內部包含一個 base
和一個 Cell[]
類型的 cells
數組,cells
數組又叫哈希表物延,在沒有競爭的情況下宣旱,要累加的數通過CAS累加到 base
上,如果有競爭的話叛薯,會將要累加的數累加到 cells
數組中的某個 Cell
元素里面响鹃。
Stripped64
的整體值 value
的獲取函數如下:
public long longValue() {
return sum();
}
/**
* 將多個cells數組中的值加起來的和
*/
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
Stripped64
的設計核心思路是通過內部的分散計算來避免競爭驾霜,以空間換取時間。沒有競爭時 cells
數組為 null买置,這時只使用 base
粪糙,一旦發(fā)生競爭,cells
數組就上場了忿项。
cells
數組第一次初始化長度為2蓉冈,以后每次擴容都變?yōu)樵瓉淼膬杀叮恢钡?cells
數組的長度大于等于當前服務器的CPU核數轩触,同一時刻能持有CPU時間片去并發(fā)操作同一內存地址的最大線程數最多也就是CPU的核數寞酿。
在存在線程爭用的時候,每個線程被映射到 cells[threadLocalRandomProbe&cells.length]
位置的 Cell 元素脱柱,該線程對 value
所做的累加操作就執(zhí)行在對應的 Cell
元素的值上伐弹。
LongAdder 類的 add() 操作
這里分析以下 LongAdder
類的 add()
方法,具體的源碼如下:
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
/**
* 自增1
*/
public void increment() {
add(1L);
}
/**
* 自減1
*/
public void decrement() {
add(-1L);
}
首先介紹一下代碼的外層 if 塊的兩個條件語句:
-
cells
數組不為null榨为,說明存在爭用惨好,在不存在爭用的時候,cells
數組一定為null随闺,一旦對base
的CAS操作失敗日川,才會初始化cells
數組。 - 如果
cells
數組為null矩乐,表示之前不存在爭用龄句,并且此次casBase
執(zhí)行成功,表示基于base
成員累加成功散罕,add
方法直接返回分歇;如果casBase
方法執(zhí)行失敗,說明產生了第一次爭用沖突欧漱,需要對cells
數組初始化卿樱,此時即將進入內嵌 if 塊。
casBase
方法很簡單硫椰,就是通過 UNSAFE 類的CAS設置成員變量 base
的值為 base+x
繁调。casBase
方法的代碼如下:
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
內嵌的if語句塊邏輯如下:
-
as == null || (m = as.length - 1)<0
代表cells
沒有初始化。 - 當前線程的哈希值在
cells
數組映射位置的Cell
對象為空靶草,意思是還沒有其他線程在同一個位置做過累加操作蹄胰。 - 當前線程的哈希值在
cells
數組映射位置的Cell
對象不為空,然后在該Cell
對象上進行CAS操作奕翔,設置其值為v+x
裕寨,但是CAS操作失敗,表示存在爭用。
如果以上條件滿足一個宾袜,就進入 longAccumulate
方法捻艳。
LongAdder 類中的 longAccumulate() 方法
longAccumulate()
是 Striped64
中重要的方法,實現(xiàn)不同的線程更新各自 Cell
中的值庆猫,其實現(xiàn)邏輯類似于分段鎖认轨,具體代碼如下:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// 擴容意向,collide=true可以擴容月培,collide=false不可擴容
boolean collide = false;
// 自旋嘁字,一直到操作成功
for (;;) {
// as 表示cells引用
// a 表示當前線程命中的Cell
// n表示cells數組長度
// v 表示期望值
Cell[] as; Cell a; int n; long v;
// true表示下標位置的Cell為null,需要創(chuàng)建Cell
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // cells 數組沒有處于創(chuàng)建杉畜,擴容階段
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 當前線程競爭失敗纪蜒,wasUncontended為false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 當前線程rehash過哈希值,CAS更新cell
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 調整擴容意向此叠,然后進入下一輪循環(huán)
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 設置擴容意向為true纯续,但是不一定真的發(fā)生擴容
else if (!collide)
collide = true;
// 真正擴容的邏輯
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; // 釋放鎖
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); //重置當前線程的Hash值
}
// cells還未初始化,并且cellsBusy加鎖成功
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 當前線程cellsBusy加鎖失敗灭袁,表示其它線程這鞥在初始化cells
// 所以當前線程將值累加到base猬错,注意add()方法調用此方式時fn為null
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
LongAddr 類的 casCellsBusy 方法
casCellsBusy()
方法的代碼很簡單,就是將 cellsBusy
成員的值改為1简卧,表示目前的cells數組在初始化或擴容中。代碼如下:
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
casCellsBusy()
方法相當于鎖的功能烤芦,當線程需要 cells 數組初始化或擴容時举娩,需要調到 casCellsBusy()
方法,通過CAS方式將 cellsBusy 成員的值改為1构罗,如果修改失敗铜涉,就表示其它的線程正在進行數組初始化或擴容的操作。在 cells 數組初始化或擴容的操作執(zhí)行完成之后遂唧,cellsBusy 成員的值被改為0芙代,這時不需要進行CAS修改,直接修改即可盖彭,因為不存在爭用纹烹。
當 cellsBusy 值為1時,表示cells數組正在被某個線程執(zhí)行初始化或擴容操作召边,其它線程不能進行以下操作:
- 對cells數組初始化铺呵。
- 對cells數組擴容。
- 如果cells數組中某個元素為null隧熙,就為該元素創(chuàng)建新的Cell對象片挂,因為數組的結構正在修改,所以其他線程下面不能創(chuàng)建新的Cell對象。