問題引入
在java體系中,為保證并發(fā)安全夹囚,我們通常會采用顯示鎖或者cas無鎖編程仁卷。使用顯示鎖(包括sychorized穴翩,lock)來保證臨界區(qū)的資源安全,是一種常用方式锦积。而cas無鎖編程則使用最原始的方式來保證臨界資源的安全芒帕。如果對lock有過了解的同學,應該知道丰介,lock鎖本質(zhì)就是通過對一個中間變量進行cas操作副签,如果cas操作成功,則表示加鎖成功基矮,如果cas失敗,則進入aqs隊列等待通知繼續(xù)cas冠场。不難發(fā)現(xiàn)家浇,java中的lock實現(xiàn)本質(zhì)上也是通過cas操作來獲取鎖,獲取鎖成功后碴裙,再操作臨界資源钢悲。那么,我們當然可以直接對臨界資源進行cas操作舔株,省去中間加鎖這一步莺琳,這就像如果我們租房子,我們可以找中介幫我們租载慈,當然惭等,我們也可以直接找到房東。那有同學就要問了办铡,既然如此辞做,jdk為啥還費這么大勁開發(fā)lock接口琳要,這就要說到cas操作本身的劣勢了。cas全名是 compare and swap秤茅,比較并交換稚补,高并發(fā)情況下,可能會產(chǎn)生如下幾個問題
1框喳、aba問題
2课幕、競爭過于激烈?guī)泶罅靠兆孕瑢е耤pu的急劇上升五垮。
問題解決思路
aba問題相對來說好解決乍惊,加一個版本號就可以。但是cas導致大量空自旋的問題拼余,就不那么好解決了污桦。解決這個問題的思路有兩個。
1匙监、為什么會大量空自旋凡橱,是因為對臨界資源的競爭太過激烈,我們可以采用分散臨界資源的方式去減少對臨界資源的競爭亭姥,從而降低空自旋稼钩,這也是計算機科學中最常用的方法論之一:以空間換時間。
2达罗、既然自旋次數(shù)太多坝撑,我們就讓其等待,不再自旋粮揉。
基于第二種想法巡李,就產(chǎn)生了Lock鎖的方式。我們對臨界資源修改的時候扶认,如果并發(fā)太高侨拦,競爭修改臨界資源的線程太多,我們就將cas失敗的線程放入到一個隊列中辐宾,等到修改成功的線程完成后再通知隊列中的后繼線程去修改狱从,而不是再無腦去不斷自旋嘗試cas。這樣叠纹,就避免了直接cas帶來的惡性自旋問題季研。但是采用這種方式,也有壞處誉察,就是高并發(fā)下處理性能會有所降低与涡,因為要去爭搶鎖,并且線程間的通信切換也會帶來一定的消耗。這就像租房子的時候找中介递沪,雖然可以省心一點豺鼻,但是你要付中介費用。
基于此款慨,我們再看第一種方式儒飒,jdk中的Atomic原子類,就可以直接通過cas操作保證臨界資源的安全檩奠,但是存在我們開頭所說的問題桩了,所以jdk在1.8中就開發(fā)了一個LongAdder 來解決這個問題,用的方式就是我們所說的第一種方式分離熱點埠戳,下面我們來具體對比一下AtomicLong 和 LongAdder 的性能表現(xiàn)井誉,然后再分析LongAdder的設計思想、源碼解析整胃。
AtomicLong 與LongAdder的對比實驗
public class CASTest {
//線程數(shù)量:10
private final int THREAD_COUNT = 10;
//累加次數(shù):分別測試 1000颗圣,10000,100000屁使,1000000在岂,10000000,100000000
private final int TURNS = 1000;
//cpu密集型線程池
ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),3, TimeUnit.SECONDS,new LinkedBlockingDeque<>());
//原子對象
private AtomicLong atomicLong = new AtomicLong(0);
//LongAdder原子對象
LongAdder longAdder = new LongAdder();
/**
* AtomicLong性能測試
*/
public void testAtomicLong() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i=0 ;i< THREAD_COUNT; i++) {
pool.submit(() -> {
for (int j=0; j<TURNS; j++) {
atomicLong.incrementAndGet();
}
doneSignal.countDown();;
});
}
doneSignal.await();
float time = (System.currentTimeMillis() - start)/1000F;
System.out.println("AtomicLong 測試累加結(jié)果: "+atomicLong.get()+ " 耗費時間:"+time );
}
/**
* LongAdder測試類
*/
public void testLongAdder() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i=0 ;i< THREAD_COUNT; i++) {
pool.submit(() -> {
for (int j=0; j<TURNS; j++) {
longAdder.increment();
}
doneSignal.countDown();;
});
}
doneSignal.await();
float time = (System.currentTimeMillis() - start)/1000F;
System.out.println("LongAdder 測試累加結(jié)果: "+longAdder.longValue()+ " 耗費時間:"+time );
}
public static void main(String[] args) throws InterruptedException {
CASTest casTest = new CASTest();
casTest.testAtomicLong();
casTest.testLongAdder();
}
}
以上代碼是對AtomicLong和LongAdder的性能比較蛮寂,在啟用10個線程的情況下蔽午,我們分別對臨界資源進行1000,1W,10w,100w,1000w,一億次累計操作酬蹋,然后比較AtomicLong和LongAdder所耗費的時間及老。通過分析下圖這個結(jié)果,我們可以明顯看出范抓,在cas競爭不大的情況下骄恶,AtomicLong性能是要高一些的,但是隨著cas競爭越來越激烈匕垫,LongAdder的優(yōu)勢會越來越明顯叠蝇,在每個線程累加1億次的情況下,LongAdder的性能接近是AtomicLong 的7倍(而且這個優(yōu)勢可能隨著硬件資源的升高會逐漸放大)
AtomicLong和LongAdder的性能比較
LongAdder的原理與分析
接下來我們來看一下LongAdder的原理
原理概述:
AtomicLong 使用內(nèi)部變量 value 保存著實際的 long 值年缎,所有的操作都是針對該 value 變量進行。也就是說铃慷,高并發(fā)環(huán)境下单芜,value 變量其實是一個熱點,也就是 N 個線程競爭一個熱點犁柜。重試線程越多洲鸠,意味著 CAS 的失敗幾率更高,CAS 失敗幾率就越高,從而進入惡性 CAS 空自旋狀態(tài)扒腕。
LongAdder 的基本思路就是分散熱點绢淀,將 value 值分散到一個數(shù)組中,不同線程會命中到數(shù)組的不同槽(元素)中瘾腰,各個線程只對自己槽中的那個值進行 CAS 操作皆的。這樣熱點就被分散了,沖突的概率就小很多蹋盆。當需要獲取元素值時费薄,只需要將所有數(shù)組元素值累加即可。
原理詳解
LongAdder 繼承于 Striped64 類栖雾,base 值和 cells 數(shù)組都在 Striped64 類定義楞抡。基類 Striped64 內(nèi)部三個重要的成員如下:
/**
* 成員一:存放 Cell 的 hash 表析藕,大小為 2 的冪召廷。
*/
transient volatile Cell[] cells;
/**
* 成員二:基礎(chǔ)值,
* 1. 在沒有競爭時會更新這個值账胧;
* 2. 在 cells 初始化時 cells 不可用竞慢,也會嘗試將通過 cas 操作值累加到 base。
*/
transient volatile long base;
/**
* 自旋鎖找爱,通過 CAS 操作加鎖梗顺,為 0 表示 cells 數(shù)組沒有處于創(chuàng)建、擴容階段
* 為 1 用于表示正在創(chuàng)建或者擴展 Cell 數(shù)組车摄,不能進行新 Cell 元素的設置操作寺谤。
*/
transient volatile int cellsBusy;
1。在cas競爭不激烈吮播,沒有發(fā)生cas失敗的情況下变屁,線程會直接更新case的值。這種情況下本質(zhì)上和AtmicLong是一樣的意狠。
2粟关、但是一旦競爭加劇,發(fā)生了cas失敗的情況情況下环戈,線程首先會嘗試修改cellsBuzy值闷板,修改成功表示可以創(chuàng)建或者擴容數(shù)組
3却邓、修改失敗的話先鱼,說明正在有其他線程去創(chuàng)建或者擴容數(shù)組,則此時去直接修改base值渣蜗。
4拦止、在創(chuàng)建了cell數(shù)組的情況下县遣,會根據(jù)線程id對數(shù)組的長度取模糜颠,然后映射到數(shù)組下標某一具體的cell,再去修改這個cell的值萧求,這樣就把原來所有線程對base值的競爭修改其兴,分散到了對數(shù)組中的每一個cell數(shù)組元素值的修改,大大減少了并發(fā)競爭夸政,從而降低cas惡性自旋元旬。
5、同時需要注意的一個點是秒梳,cell數(shù)組的擴容時機以及它的上限大小法绵。在cell數(shù)組已經(jīng)被創(chuàng)建并且對應的數(shù)組下標位置已經(jīng)有值的情況下,某個線程去修改cell元素的值酪碘,如果修改失敗的話朋譬,說明當前cell數(shù)組長度可能不夠,盡管已經(jīng)分散了兴垦,但是映射到具體某一位置后修改依然失敗徙赢,這時就需要去擴容數(shù)組的長度,擴容一次是原來的兩倍探越。當然狡赐,也不可能無限擴容,上限是達到cpu的核心數(shù)钦幔,因為可以并發(fā)的線程數(shù)最多就是cpu的核心數(shù)枕屉。
6、在獲取元素值時鲤氢,直接累加base值和cell數(shù)組中所有元素值即可搀擂。
總結(jié):
高并發(fā)情況下對臨界資源的分離熱點(分段)是最常用的手段之一,類似的案例還有很多種卷玉,比如jdk1.8之前的版本concurrentHashMap也是通過分段鎖來降低競爭,比如本地緩存組件caffeine中的StripedBuffer并發(fā)寫入隊列中的場景哨颂。具體業(yè)務應用層面,比如我們在優(yōu)化redis大key時相种,常用的方法也是將key分散威恼;比如商品庫存如果是存在redis中,在并發(fā)更新redis庫存時寝并,如果redis達到性能瓶頸箫措,我們也可以將redis庫存key分散成多個庫存key 等等...掌握這一思想,在應對高并發(fā)的場景時衬潦,大有裨益蒂破。
附錄LongAdder源碼解析
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* :cells 數(shù)組不為 null,說明存在爭用别渔;在不存在爭用的時候,cells 數(shù)組
* 一定為 null,一旦對 base 的 cas 操作失敗哎媚,才會初始化 cells 數(shù)組
*/
/**
* 如果 cells 數(shù)組為 null喇伯,表示之前不存在爭用,并且此次 casBase 執(zhí)行
* 成功拨与,則表示基于 base 成員累加成功稻据,add 方法直接返回;如果 casBase 方法執(zhí)行失敗买喧,
* 說明產(chǎn)生了第一次爭用沖突捻悯,需要對 cells 數(shù)組初始化,此時淤毛,即將進入內(nèi)層 if 塊
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/**
* cells 沒有初始化
*/
if (as == null || (m = as.length - 1) < 0 ||
/**
* 指當前線程 hash 到的 cells 數(shù)組的元素位置的 Cell 對象為空今缚,意思是
* 還沒有其他線程在同一個位置做過累加操作
*/
(a = as[getProbe() & m]) == null ||
/**
* 指當前線程在 cells 數(shù)組的被 hash 到的元素位置的 Cell 對象不為空,
* 然后在該 Cell 對象上進行 CAS 設置其值為 v+x(x 為該 Cell 需要累加的值)低淡,但是 CAS
* 操作失敗姓言,表示存在爭用
*/
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
//擴容意向,collide=true 可以擴容蔗蹋,collide=false 不可擴容
boolean collide = false; // True if last slot nonempty
//自旋何荚,一直到操作成功
for (;;) {
Cell[] as; Cell a; int n; long v;
//表示 cells 已經(jīng)初始化了,當前線程應該將數(shù)據(jù)寫入到對應的 cell 中
if ((as = cells) != null && (n = as.length) > 0) {
//true 表示下標位置的 cell 為 null猪杭,需要創(chuàng)建 new Cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
/**
* 當前下標位置的cell已經(jīng)有值餐塘,需要進行cas修改cell的值
*(如果修改失敗了,說明數(shù)組當前的容量可能太小皂吮,導致cas失敗戒傻,后續(xù)有擴容意向)
*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果數(shù)組長度大于cpu核心數(shù),則不再擴容
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
//設置擴容意向為true
collide = true;
//真正擴容邏輯涮较,容量設置為原諒的兩倍
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//cells 還未初始化(as 為 null),并且 cellsBusy 加鎖成功
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//初始化長度為2
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
/**
* CASE3:當前線程 cellsBusy 加鎖失敗稠鼻,表示其他線程正在初始化 cells
* 所以當前線程將值累加到 base,
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
/**
* LongAdder sum的值等于base值 加上所有cell中的值
* @return
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}