題
(1)java8中為什么要新增LongAdder旬牲?
(2)LongAdder的實現(xiàn)方式躺酒?
(3)LongAdder與AtomicLong的對比?
簡介
LongAdder是java8中新增的原子類,在多線程環(huán)境中冰抢,它比AtomicLong性能要高出不少甜紫,特別是寫多的場景降宅。
它是怎么實現(xiàn)的呢?讓我們一起來學(xué)習(xí)吧囚霸。
原理
LongAdder的原理是钉鸯,在最初無競爭時,只更新base的值邮辽,當(dāng)有多線程競爭時通過分段的思想,讓不同的線程更新不同的段贸营,最后把這些段相加就得到了完整的LongAdder存儲的值吨述。
// Striped64中的內(nèi)部類,使用@sun.misc.Contended注解钞脂,說明里面的值消除偽共享 @sun.misc.Contended static final class Cell { // 存儲元素的值揣云,使用volatile修飾保證可見性 volatile long value; Cell(long x) { value = x; } // CAS更新value的值 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe實例 private static final sun.misc.Unsafe UNSAFE; // value字段的偏移量 private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
Cell類使用@sun.misc.Contended注解,說明是要避免偽共享的冰啃。
使用Unsafe的CAS更新value的值邓夕,其中value的值使用volatile修飾,保證可見性阎毅。
關(guān)于Unsafe的介紹請查看【死磕 java魔法類之Unsafe解析】焚刚。
關(guān)于偽共享的介紹請查看【雜談 什么是偽共享(false sharing)?】扇调。
主要屬性
// 這三個屬性都在Striped64中 // cells數(shù)組矿咕,存儲各個段的值 transient volatile Cell[] cells; // 最初無競爭時使用的,也算一個特殊的段 transient volatile long base; // 標(biāo)記當(dāng)前是否有線程在創(chuàng)建或擴容cells狼钮,或者在創(chuàng)建Cell // 通過CAS更新該值碳柱,相當(dāng)于是一個鎖 transient volatile int cellsBusy;
最初無競爭或有其它線程在創(chuàng)建cells數(shù)組時使用base更新值,有過競爭時使用cells更新值熬芜。
最初無競爭是指一開始沒有線程之間的競爭莲镣,但也有可能是多線程在操作,只是這些線程沒有同時去更新base的值涎拉。
有過競爭是指只要出現(xiàn)過競爭不管后面有沒有競爭都使用cells更新值瑞侮,規(guī)則是不同的線程hash到不同的cell上去更新,減少競爭曼库。
add(x)方法
add(x)方法是LongAdder的主要方法区岗,使用它可以使LongAdder中存儲的值增加x,x可為正可為負(fù)毁枯。
public void add(long x) { // as是Striped64中的cells屬性 // b是Striped64中的base屬性 // v是當(dāng)前線程hash到的Cell中存儲的值 // m是cells的長度減1慈缔,hash時作為掩碼使用 // a是當(dāng)前線程hash到的Cell Cell[] as; long b, v; int m; Cell a; // 條件1:cells不為空,說明出現(xiàn)過競爭种玛,cells已經(jīng)創(chuàng)建 // 條件2:cas操作base失敗藐鹤,說明其它線程先一步修改了base瓤檐,正在出現(xiàn)競爭 if ((as = cells) != null || !casBase(b = base, b + x)) { // true表示當(dāng)前競爭還不激烈 // false表示競爭激烈,多個線程hash到同一個Cell娱节,可能要擴容 boolean uncontended = true; // 條件1:cells為空挠蛉,說明正在出現(xiàn)競爭,上面是從條件2過來的 // 條件2:應(yīng)該不會出現(xiàn) // 條件3:當(dāng)前線程所在的Cell為空肄满,說明當(dāng)前線程還沒有更新過Cell谴古,應(yīng)初始化一個Cell // 條件4:更新當(dāng)前線程所在的Cell失敗,說明現(xiàn)在競爭很激烈稠歉,多個線程hash到了同一個Cell掰担,應(yīng)擴容 if (as == null || (m = as.length - 1) < 0 || // getProbe()方法返回的是線程中的threadLocalRandomProbe字段 // 它是通過隨機數(shù)生成的一個值,對于一個確定的線程這個值是固定的 // 除非刻意修改它 (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) // 調(diào)用Striped64中的方法處理 longAccumulate(x, null, uncontended); } }
(1)最初無競爭時只更新base怒炸;
(2)直到更新base失敗時带饱,創(chuàng)建cells數(shù)組;
(3)當(dāng)多個線程競爭同一個Cell比較激烈時阅羹,可能要擴容勺疼;
longAccumulate()方法
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 存儲線程的probe值 int h; // 如果getProbe()方法返回0,說明隨機數(shù)未初始化 if ((h = getProbe()) == 0) { // 強制初始化 ThreadLocalRandom.current(); // force initialization // 重新獲取probe值 h = getProbe(); // 都未初始化捏鱼,肯定還不存在競爭激烈 wasUncontended = true; } // 是否發(fā)生碰撞 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // cells已經(jīng)初始化過 if ((as = cells) != null && (n = as.length) > 0) { // 當(dāng)前線程所在的Cell未初始化 if ((a = as[(n - 1) & h]) == null) { // 當(dāng)前無其它線程在創(chuàng)建或擴容cells执庐,也沒有線程在創(chuàng)建Cell if (cellsBusy == 0) { // Try to attach new Cell // 新建一個Cell,值為當(dāng)前需要增加的值 Cell r = new Cell(x); // Optimistically create // 再次檢測cellsBusy导梆,并嘗試更新它為1 // 相當(dāng)于當(dāng)前線程加鎖 if (cellsBusy == 0 && casCellsBusy()) { // 是否創(chuàng)建成功 boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // 重新獲取cells耕肩,并找到當(dāng)前線程hash到cells數(shù)組中的位置 // 這里一定要重新獲取cells,因為as并不在鎖定范圍內(nèi) // 有可能已經(jīng)擴容了问潭,這里要重新獲取 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 把上面新建的Cell放在cells的j位置處 rs[j] = r; // 創(chuàng)建成功 created = true; } } finally { // 相當(dāng)于釋放鎖 cellsBusy = 0; } // 創(chuàng)建成功了就返回 // 值已經(jīng)放在新建的Cell里面了 if (created) break; continue; // Slot is now non-empty } } // 標(biāo)記當(dāng)前未出現(xiàn)沖突 collide = false; } // 當(dāng)前線程所在的Cell不為空猿诸,且更新失敗了 // 這里簡單地設(shè)為true,相當(dāng)于簡單地自旋一次 // 通過下面的語句修改線程的probe再重新嘗試 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 再次嘗試CAS更新當(dāng)前線程所在Cell的值狡忙,如果成功了就返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果cells數(shù)組的長度達(dá)到了CPU核心數(shù)梳虽,或者cells擴容了 // 設(shè)置collide為false并通過下面的語句修改線程的probe再重新嘗試 else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 上上個elseif都更新失敗了,且上個條件不成立灾茁,說明出現(xiàn)沖突了 else if (!collide) collide = true; // 明確出現(xiàn)沖突了窜觉,嘗試占有鎖,并擴容 else if (cellsBusy == 0 && casCellsBusy()) { try { // 檢查是否有其它線程已經(jīng)擴容過了 if (cells == as) { // Expand table unless stale // 新數(shù)組為原數(shù)組的兩倍 Cell[] rs = new Cell[n << 1]; // 把舊數(shù)組元素拷貝到新數(shù)組中 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 重新賦值cells為新數(shù)組 cells = rs; } } finally { // 釋放鎖 cellsBusy = 0; } // 已解決沖突 collide = false; // 使用擴容后的新數(shù)組重新嘗試 continue; // Retry with expanded table } // 更新失敗或者達(dá)到了CPU核心數(shù)北专,重新生成probe禀挫,并重試 h = advanceProbe(h); } // 未初始化過cells數(shù)組,嘗試占有鎖并初始化cells數(shù)組 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 是否初始化成功 boolean init = false; try { // Initialize table // 檢測是否有其它線程初始化過 if (cells == as) { // 新建一個大小為2的Cell數(shù)組 Cell[] rs = new Cell[2]; // 找到當(dāng)前線程hash到數(shù)組中的位置并創(chuàng)建其對應(yīng)的Cell rs[h & 1] = new Cell(x); // 賦值給cells數(shù)組 cells = rs; // 初始化成功 init = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 初始化成功直接返回 // 因為增加的值已經(jīng)同時創(chuàng)建到Cell中了 if (init) break; } // 如果有其它線程在初始化cells數(shù)組中拓颓,就嘗試更新base // 如果成功了就返回 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
(1)如果cells數(shù)組未初始化语婴,當(dāng)前線程會嘗試占有cellsBusy鎖并創(chuàng)建cells數(shù)組;
(2)如果當(dāng)前線程嘗試創(chuàng)建cells數(shù)組時,發(fā)現(xiàn)有其它線程已經(jīng)在創(chuàng)建了砰左,就嘗試更新base匿醒,如果成功就返回;
(3)通過線程的probe值找到當(dāng)前線程應(yīng)該更新cells數(shù)組中的哪個Cell缠导;
(4)如果當(dāng)前線程所在的Cell未初始化廉羔,就占有占有cellsBusy鎖并在相應(yīng)的位置創(chuàng)建一個Cell;
(5)嘗試CAS更新當(dāng)前線程所在的Cell僻造,如果成功就返回憋他,如果失敗說明出現(xiàn)沖突;
(5)當(dāng)前線程更新Cell失敗后并不是立即擴容髓削,而是嘗試更新probe值后再重試一次举瑰;
(6)如果在重試的時候還是更新失敗,就擴容蔬螟;
(7)擴容時當(dāng)前線程占有cellsBusy鎖,并把數(shù)組容量擴大到兩倍汽畴,再遷移原cells數(shù)組中元素到新數(shù)組中旧巾;
(8)cellsBusy在創(chuàng)建cells數(shù)組、創(chuàng)建Cell忍些、擴容cells數(shù)組三個地方用到鲁猩;
sum()方法
sum()方法是獲取LongAdder中真正存儲的值的大小,通過把base和所有段相加得到罢坝。
public long sum() { Cell[] as = cells; Cell a; // sum初始等于base long sum = base; // 如果cells不為空 if (as != null) { // 遍歷所有的Cell for (int i = 0; i < as.length; ++i) { // 如果所在的Cell不為空廓握,就把它的value累加到sum中 if ((a = as[i]) != null) sum += a.value; } } // 返回sum return sum; }
可以看到sum()方法是把base和所有段的值相加得到,那么嘁酿,這里有一個問題隙券,如果前面已經(jīng)累加到sum上的Cell的value有修改,不是就沒法計算到了么闹司?
答案確實如此娱仔,所以LongAdder可以說不是強一致性的,它是最終一致性的游桩。
LongAdder VS AtomicLong
直接上代碼:
public class LongAdderVSAtomicLongTest { public static void main(String[] args){ testAtomicLongVSLongAdder(1, 10000000); testAtomicLongVSLongAdder(10, 10000000); testAtomicLongVSLongAdder(20, 10000000); testAtomicLongVSLongAdder(40, 10000000); testAtomicLongVSLongAdder(80, 10000000); } static void testAtomicLongVSLongAdder(final int threadCount, final int times){ try { System.out.println("threadCount:" + threadCount + ", times:" + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms"); long start2 = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } } static void testAtomicLong(final int threadCount, final int times) throws InterruptedException { AtomicLong atomicLong = new AtomicLong(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ atomicLong.incrementAndGet(); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } static void testLongAdder(final int threadCount, final int times) throws InterruptedException { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ longAdder.add(1); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } }
運行結(jié)果如下:
threadCount:1, times:10000000 LongAdder elapse:158ms AtomicLong elapse:64ms threadCount:10, times:10000000 LongAdder elapse:206ms AtomicLong elapse:2449ms threadCount:20, times:10000000 LongAdder elapse:429ms AtomicLong elapse:5142ms threadCount:40, times:10000000 LongAdder elapse:840ms AtomicLong elapse:10506ms threadCount:80, times:10000000 LongAdder elapse:1369ms AtomicLong elapse:20482ms
可以看到當(dāng)只有一個線程的時候牲迫,AtomicLong反而性能更高,隨著線程越來越多借卧,AtomicLong的性能急劇下降盹憎,而LongAdder的性能影響很小。
總結(jié)
(1)LongAdder通過base和cells數(shù)組來存儲值铐刘;
(2)不同的線程會hash到不同的cell上去更新陪每,減少了競爭;
(3)LongAdder的性能非常高,最終會達(dá)到一種無競爭的狀態(tài)奶稠;
彩蛋
在longAccumulate()方法中有個條件是n >= NCPU就不會走到擴容邏輯了俯艰,而n是2的倍數(shù),那是不是代表cells數(shù)組最大只能達(dá)到大于等于NCPU的最小2次方锌订?
答案是明確的竹握。因為同一個CPU核心同時只會運行一個線程,而更新失敗了說明有兩個不同的核心更新了同一個Cell辆飘,這時會重新設(shè)置更新失敗的那個線程的probe值啦辐,這樣下一次它所在的Cell很大概率會發(fā)生改變,如果運行的時間足夠長蜈项,最終會出現(xiàn)同一個核心的所有線程都會hash到同一個Cell(大概率芹关,但不一定全在一個Cell上)上去更新,所以紧卒,這里cells數(shù)組中長度并不需要太長侥衬,達(dá)到CPU核心數(shù)足夠了。
比如跑芳,筆者的電腦是8核的轴总,所以這里cells的數(shù)組最大只會到8,達(dá)到8就不會擴容了博个。