死磕 java并發(fā)包之LongAdder源碼分析

(1)java8中為什么要新增LongAdder旬牲?

(2)LongAdder的實現(xiàn)方式躺酒?

(3)LongAdder與AtomicLong的對比?

簡介

LongAdder是java8中新增的原子類,在多線程環(huán)境中冰抢,它比AtomicLong性能要高出不少甜紫,特別是寫多的場景降宅。

它是怎么實現(xiàn)的呢?讓我們一起來學(xué)習(xí)吧囚霸。

原理

LongAdder的原理是钉鸯,在最初無競爭時,只更新base的值邮辽,當(dāng)有多線程競爭時通過分段的思想,讓不同的線程更新不同的段贸营,最后把這些段相加就得到了完整的LongAdder存儲的值吨述。


// Striped64中的內(nèi)部類,使用@sun.misc.Contended注解钞脂,說明里面的值消除偽共享 @sun.misc.Contended static final class Cell { // 存儲元素的值揣云,使用volatile修飾保證可見性 volatile long value; Cell(long x) { value = x; } // CAS更新value的值 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe實例 private static final sun.misc.Unsafe UNSAFE; // value字段的偏移量 private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }

Cell類使用@sun.misc.Contended注解,說明是要避免偽共享的冰啃。

使用Unsafe的CAS更新value的值邓夕,其中value的值使用volatile修飾,保證可見性阎毅。

關(guān)于Unsafe的介紹請查看【死磕 java魔法類之Unsafe解析】焚刚。

關(guān)于偽共享的介紹請查看【雜談 什么是偽共享(false sharing)?】扇调。

主要屬性

// 這三個屬性都在Striped64中 // cells數(shù)組矿咕,存儲各個段的值 transient volatile Cell[] cells; // 最初無競爭時使用的,也算一個特殊的段 transient volatile long base; // 標(biāo)記當(dāng)前是否有線程在創(chuàng)建或擴容cells狼钮,或者在創(chuàng)建Cell // 通過CAS更新該值碳柱,相當(dāng)于是一個鎖 transient volatile int cellsBusy;

最初無競爭或有其它線程在創(chuàng)建cells數(shù)組時使用base更新值,有過競爭時使用cells更新值熬芜。

最初無競爭是指一開始沒有線程之間的競爭莲镣,但也有可能是多線程在操作,只是這些線程沒有同時去更新base的值涎拉。

有過競爭是指只要出現(xiàn)過競爭不管后面有沒有競爭都使用cells更新值瑞侮,規(guī)則是不同的線程hash到不同的cell上去更新,減少競爭曼库。

add(x)方法

add(x)方法是LongAdder的主要方法区岗,使用它可以使LongAdder中存儲的值增加x,x可為正可為負(fù)毁枯。

public void add(long x) { // as是Striped64中的cells屬性 // b是Striped64中的base屬性 // v是當(dāng)前線程hash到的Cell中存儲的值 // m是cells的長度減1慈缔,hash時作為掩碼使用 // a是當(dāng)前線程hash到的Cell Cell[] as; long b, v; int m; Cell a; // 條件1:cells不為空,說明出現(xiàn)過競爭种玛,cells已經(jīng)創(chuàng)建 // 條件2:cas操作base失敗藐鹤,說明其它線程先一步修改了base瓤檐,正在出現(xiàn)競爭 if ((as = cells) != null || !casBase(b = base, b + x)) { // true表示當(dāng)前競爭還不激烈 // false表示競爭激烈,多個線程hash到同一個Cell娱节,可能要擴容 boolean uncontended = true; // 條件1:cells為空挠蛉,說明正在出現(xiàn)競爭,上面是從條件2過來的 // 條件2:應(yīng)該不會出現(xiàn) // 條件3:當(dāng)前線程所在的Cell為空肄满,說明當(dāng)前線程還沒有更新過Cell谴古,應(yīng)初始化一個Cell // 條件4:更新當(dāng)前線程所在的Cell失敗,說明現(xiàn)在競爭很激烈稠歉,多個線程hash到了同一個Cell掰担,應(yīng)擴容 if (as == null || (m = as.length - 1) < 0 || // getProbe()方法返回的是線程中的threadLocalRandomProbe字段 // 它是通過隨機數(shù)生成的一個值,對于一個確定的線程這個值是固定的 // 除非刻意修改它 (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) // 調(diào)用Striped64中的方法處理 longAccumulate(x, null, uncontended); } }

(1)最初無競爭時只更新base怒炸;

(2)直到更新base失敗時带饱,創(chuàng)建cells數(shù)組;

(3)當(dāng)多個線程競爭同一個Cell比較激烈時阅羹,可能要擴容勺疼;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 存儲線程的probe值 int h; // 如果getProbe()方法返回0,說明隨機數(shù)未初始化 if ((h = getProbe()) == 0) { // 強制初始化 ThreadLocalRandom.current(); // force initialization // 重新獲取probe值 h = getProbe(); // 都未初始化捏鱼,肯定還不存在競爭激烈 wasUncontended = true; } // 是否發(fā)生碰撞 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // cells已經(jīng)初始化過 if ((as = cells) != null && (n = as.length) > 0) { // 當(dāng)前線程所在的Cell未初始化 if ((a = as[(n - 1) & h]) == null) { // 當(dāng)前無其它線程在創(chuàng)建或擴容cells执庐,也沒有線程在創(chuàng)建Cell if (cellsBusy == 0) { // Try to attach new Cell // 新建一個Cell,值為當(dāng)前需要增加的值 Cell r = new Cell(x); // Optimistically create // 再次檢測cellsBusy导梆,并嘗試更新它為1 // 相當(dāng)于當(dāng)前線程加鎖 if (cellsBusy == 0 && casCellsBusy()) { // 是否創(chuàng)建成功 boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // 重新獲取cells耕肩,并找到當(dāng)前線程hash到cells數(shù)組中的位置 // 這里一定要重新獲取cells,因為as并不在鎖定范圍內(nèi) // 有可能已經(jīng)擴容了问潭,這里要重新獲取 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 把上面新建的Cell放在cells的j位置處 rs[j] = r; // 創(chuàng)建成功 created = true; } } finally { // 相當(dāng)于釋放鎖 cellsBusy = 0; } // 創(chuàng)建成功了就返回 // 值已經(jīng)放在新建的Cell里面了 if (created) break; continue; // Slot is now non-empty } } // 標(biāo)記當(dāng)前未出現(xiàn)沖突 collide = false; } // 當(dāng)前線程所在的Cell不為空猿诸,且更新失敗了 // 這里簡單地設(shè)為true,相當(dāng)于簡單地自旋一次 // 通過下面的語句修改線程的probe再重新嘗試 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 再次嘗試CAS更新當(dāng)前線程所在Cell的值狡忙,如果成功了就返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果cells數(shù)組的長度達(dá)到了CPU核心數(shù)梳虽,或者cells擴容了 // 設(shè)置collide為false并通過下面的語句修改線程的probe再重新嘗試 else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 上上個elseif都更新失敗了,且上個條件不成立灾茁,說明出現(xiàn)沖突了 else if (!collide) collide = true; // 明確出現(xiàn)沖突了窜觉,嘗試占有鎖,并擴容 else if (cellsBusy == 0 && casCellsBusy()) { try { // 檢查是否有其它線程已經(jīng)擴容過了 if (cells == as) { // Expand table unless stale // 新數(shù)組為原數(shù)組的兩倍 Cell[] rs = new Cell[n << 1]; // 把舊數(shù)組元素拷貝到新數(shù)組中 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 重新賦值cells為新數(shù)組 cells = rs; } } finally { // 釋放鎖 cellsBusy = 0; } // 已解決沖突 collide = false; // 使用擴容后的新數(shù)組重新嘗試 continue; // Retry with expanded table } // 更新失敗或者達(dá)到了CPU核心數(shù)北专,重新生成probe禀挫,并重試 h = advanceProbe(h); } // 未初始化過cells數(shù)組,嘗試占有鎖并初始化cells數(shù)組 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 是否初始化成功 boolean init = false; try { // Initialize table // 檢測是否有其它線程初始化過 if (cells == as) { // 新建一個大小為2的Cell數(shù)組 Cell[] rs = new Cell[2]; // 找到當(dāng)前線程hash到數(shù)組中的位置并創(chuàng)建其對應(yīng)的Cell rs[h & 1] = new Cell(x); // 賦值給cells數(shù)組 cells = rs; // 初始化成功 init = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 初始化成功直接返回 // 因為增加的值已經(jīng)同時創(chuàng)建到Cell中了 if (init) break; } // 如果有其它線程在初始化cells數(shù)組中拓颓,就嘗試更新base // 如果成功了就返回 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }

(1)如果cells數(shù)組未初始化语婴,當(dāng)前線程會嘗試占有cellsBusy鎖并創(chuàng)建cells數(shù)組;

(2)如果當(dāng)前線程嘗試創(chuàng)建cells數(shù)組時,發(fā)現(xiàn)有其它線程已經(jīng)在創(chuàng)建了砰左,就嘗試更新base匿醒,如果成功就返回;

(3)通過線程的probe值找到當(dāng)前線程應(yīng)該更新cells數(shù)組中的哪個Cell缠导;

(4)如果當(dāng)前線程所在的Cell未初始化廉羔,就占有占有cellsBusy鎖并在相應(yīng)的位置創(chuàng)建一個Cell;

(5)嘗試CAS更新當(dāng)前線程所在的Cell僻造,如果成功就返回憋他,如果失敗說明出現(xiàn)沖突;

(5)當(dāng)前線程更新Cell失敗后并不是立即擴容髓削,而是嘗試更新probe值后再重試一次举瑰;

(6)如果在重試的時候還是更新失敗,就擴容蔬螟;

(7)擴容時當(dāng)前線程占有cellsBusy鎖,并把數(shù)組容量擴大到兩倍汽畴,再遷移原cells數(shù)組中元素到新數(shù)組中旧巾;

(8)cellsBusy在創(chuàng)建cells數(shù)組、創(chuàng)建Cell忍些、擴容cells數(shù)組三個地方用到鲁猩;

sum()方法

sum()方法是獲取LongAdder中真正存儲的值的大小,通過把base和所有段相加得到罢坝。

public long sum() { Cell[] as = cells; Cell a; // sum初始等于base long sum = base; // 如果cells不為空 if (as != null) { // 遍歷所有的Cell for (int i = 0; i < as.length; ++i) { // 如果所在的Cell不為空廓握,就把它的value累加到sum中 if ((a = as[i]) != null) sum += a.value; } } // 返回sum return sum; }


可以看到sum()方法是把base和所有段的值相加得到,那么嘁酿,這里有一個問題隙券,如果前面已經(jīng)累加到sum上的Cell的value有修改,不是就沒法計算到了么闹司?

答案確實如此娱仔,所以LongAdder可以說不是強一致性的,它是最終一致性的游桩。

LongAdder VS AtomicLong

直接上代碼:

public class LongAdderVSAtomicLongTest { public static void main(String[] args){ testAtomicLongVSLongAdder(1, 10000000); testAtomicLongVSLongAdder(10, 10000000); testAtomicLongVSLongAdder(20, 10000000); testAtomicLongVSLongAdder(40, 10000000); testAtomicLongVSLongAdder(80, 10000000); } static void testAtomicLongVSLongAdder(final int threadCount, final int times){ try { System.out.println("threadCount:" + threadCount + ", times:" + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms"); long start2 = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } } static void testAtomicLong(final int threadCount, final int times) throws InterruptedException { AtomicLong atomicLong = new AtomicLong(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ atomicLong.incrementAndGet(); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } static void testLongAdder(final int threadCount, final int times) throws InterruptedException { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ longAdder.add(1); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } }

運行結(jié)果如下:

threadCount:1, times:10000000 LongAdder elapse:158ms AtomicLong elapse:64ms threadCount:10, times:10000000 LongAdder elapse:206ms AtomicLong elapse:2449ms threadCount:20, times:10000000 LongAdder elapse:429ms AtomicLong elapse:5142ms threadCount:40, times:10000000 LongAdder elapse:840ms AtomicLong elapse:10506ms threadCount:80, times:10000000 LongAdder elapse:1369ms AtomicLong elapse:20482ms

可以看到當(dāng)只有一個線程的時候牲迫,AtomicLong反而性能更高,隨著線程越來越多借卧,AtomicLong的性能急劇下降盹憎,而LongAdder的性能影響很小。

總結(jié)

(1)LongAdder通過base和cells數(shù)組來存儲值铐刘;

(2)不同的線程會hash到不同的cell上去更新陪每,減少了競爭;

(3)LongAdder的性能非常高,最終會達(dá)到一種無競爭的狀態(tài)奶稠;

彩蛋

在longAccumulate()方法中有個條件是n >= NCPU就不會走到擴容邏輯了俯艰,而n是2的倍數(shù),那是不是代表cells數(shù)組最大只能達(dá)到大于等于NCPU的最小2次方锌订?

答案是明確的竹握。因為同一個CPU核心同時只會運行一個線程,而更新失敗了說明有兩個不同的核心更新了同一個Cell辆飘,這時會重新設(shè)置更新失敗的那個線程的probe值啦辐,這樣下一次它所在的Cell很大概率會發(fā)生改變,如果運行的時間足夠長蜈项,最終會出現(xiàn)同一個核心的所有線程都會hash到同一個Cell(大概率芹关,但不一定全在一個Cell上)上去更新,所以紧卒,這里cells數(shù)組中長度并不需要太長侥衬,達(dá)到CPU核心數(shù)足夠了。

比如跑芳,筆者的電腦是8核的轴总,所以這里cells的數(shù)組最大只會到8,達(dá)到8就不會擴容了博个。



?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末怀樟,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子盆佣,更是在濱河造成了極大的恐慌往堡,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件共耍,死亡現(xiàn)場離奇詭異虑灰,居然都是意外死亡,警方通過查閱死者的電腦和手機痹兜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進(jìn)店門瘩缆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人佃蚜,你說我怎么就攤上這事庸娱。” “怎么了谐算?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵熟尉,是天一觀的道長。 經(jīng)常有香客問我洲脂,道長斤儿,這世上最難降的妖魔是什么剧包? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮往果,結(jié)果婚禮上疆液,老公的妹妹穿的比我還像新娘。我一直安慰自己陕贮,他們只是感情好堕油,可當(dāng)我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著肮之,像睡著了一般掉缺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上戈擒,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天眶明,我揣著相機與錄音,去河邊找鬼筐高。 笑死搜囱,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的柑土。 我是一名探鬼主播蜀肘,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼冰单!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起灸促,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤诫欠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后浴栽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荒叼,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年典鸡,在試婚紗的時候發(fā)現(xiàn)自己被綠了被廓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡萝玷,死狀恐怖嫁乘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情球碉,我是刑警寧澤蜓斧,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站睁冬,受9級特大地震影響挎春,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一直奋、第九天 我趴在偏房一處隱蔽的房頂上張望能庆。 院中可真熱鬧,春花似錦脚线、人聲如沸搁胆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丰涉。三九已至,卻和暖如春斯碌,著一層夾襖步出監(jiān)牢的瞬間一死,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工傻唾, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留投慈,地道東北人。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓冠骄,卻偏偏與公主長得像伪煤,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子凛辣,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容

  • 基本原理和思想 ?Java有很多并發(fā)控制機制抱既,比如說以AQS為基礎(chǔ)的鎖或者以CAS為原理的自旋鎖。一般來說扁誓,CAS...
    Java耕耘者閱讀 1,132評論 0 0
  • 一防泵、簡介 ?在之前的《ConcurrentHashMap深入剖析(JDK8)》文章中,我們看到了CounterCe...
    SunnyMore閱讀 1,838評論 1 6
  • 作者: 一字馬胡 轉(zhuǎn)載標(biāo)志 【2017-11-03】 更新日志 Java Striped64 Striped64...
    一字馬胡閱讀 8,798評論 11 22
  • 30立什么蝗敢?立身捷泞,確立自己的品格和修養(yǎng),思想修養(yǎng)寿谴,道德涵養(yǎng)锁右,能力培養(yǎng)。自強是立身之本讶泰,別把自己的需求寄托在父母的資...
    梅溪湖肖老師520閱讀 183評論 0 0
  • 管理最主要的是以人為本咏瑟,優(yōu)秀的管理者要學(xué)會帶領(lǐng)自己的團隊,做好客戶導(dǎo)向痪署、成果導(dǎo)向响蕴。管理者要做好對自己團隊的監(jiān)督...
    孫倩閱讀 121評論 0 0