LongAdder是JDK 1.8 新增的原子類下翎,基于Striped64實現(xiàn)。 從官方文檔看,LongAdder在高并發(fā)的場景下會比AtomicLong 具有更好的性能乒裆,代價是消耗更多的內(nèi)存空間:
This class is usually preferabl
e to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control.Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
那么LongAdder是怎么實現(xiàn)的喊衫?
先看一下LongAdder的類圖:
基類Number互纯,Number是一個抽象類其中沒有任何邏輯,該類是byte兔院、double殖卑、float、int坊萝、long孵稽、short的基類。
Striped64
設(shè)計思想
該部分翻譯自Striped64源碼注釋屹堰,可以略過肛冶,概括起來就是:
分散熱點,將value值分散到一個數(shù)組中扯键,不同線程會命中到數(shù)組的不同槽中睦袖,各個線程只對自己槽中的那個值進行CAS操作,這樣熱點就被分散了荣刑,沖突的概率就小很多馅笙。如果要獲取真正的long值伦乔,只要將各個槽中的變量值累加返回。
從Striped64類注釋可以看到:
Striped64是package內(nèi)使用的董习,對于在64位元素上動態(tài)分片提供統(tǒng)一實現(xiàn)(感覺有點像:AbstractQueuedSynchronizer)
Striped64繼承了Number類烈和,這也就是說具體實現(xiàn)的子類也必須實現(xiàn)相關(guān)的內(nèi)容
該類維護一個原子更新變量的延遲初始化表,以及一個額外的“base”字段皿淋。表的大小是2的冪招刹。索引使用掩碼下的每個線程的hash code。這個類中的幾乎所有聲明都是package私有的窝趣,由子類直接訪問疯暑。
表格內(nèi)的元素是Cell類,Cell類是一個為了減少緩存爭用而填充的AtomicLong的變種哑舒。填充對于大多數(shù)原子來說是多余的妇拯,因為它們通常不規(guī)則地分散在內(nèi)存中,因此彼此之間不會有太多的干擾洗鸵。但是越锈,駐留在數(shù)組中的原子對象往往是彼此相鄰的,因此在沒有這種預(yù)防措施的情況下膘滨,最常見的情況是共享高速緩存線(這對性能有很大的負面影響)甘凭。
在某種程度上,因為Cell類相對較大火邓,只有他們真正被需要的時候对蒲,我們才創(chuàng)建。如果沒有競爭贡翘,那么所有的更新操作將對base字段實現(xiàn)。當(dāng)發(fā)生第一次爭用(也就是說如果第一次對base字段的CAS操作失斉槁摺)鸣驱,初始化為大小是2的表格。當(dāng)進一步的爭用發(fā)生的時候,表的大小會加倍蝠咆,直到達到等于大于cpu的數(shù)量踊东。表在未使用之前一直為null。
利用一個自旋鎖(cellsBusy)來初始化和調(diào)整表的大小刚操,以及用新Cells填充slots闸翅。這個地方?jīng)]有必要使用阻塞鎖,如果鎖不可達菊霜,線程可以嘗試其他的slots坚冀,或者嘗試base字段。在這些重試期間鉴逞,競爭加劇记某,但是降低了局部性司训,這仍然比阻塞鎖來得好。
通過ThreadLocalRandom維護的Thread.probe字段用作每個線程的哈希碼液南。我們讓它們保持未初始化(為零)(如果它們以這種方式出現(xiàn))咒钟,直到它們在插槽0競爭舌劳。出現(xiàn)競爭后初始化為通常不會和其他的的值沖突的值资柔,比如線程的哈希碼纵搁。在執(zhí)行更新時發(fā)生CAS操作失敗意味著出現(xiàn)了爭用或者表碰撞眼耀,或兩者都有池凄。在發(fā)生沖突時雷蹂,如果表的大小小于容量伟端,那么它的大小將加倍,除非其他線程持有鎖匪煌。如果哈希后的slot為空责蝠,并且鎖可用,則創(chuàng)建一個新單元格萎庭。如果存在了那么會進行CAS嘗試霜医。通過雙重哈希進行重試,利用一個輔助哈希(Marsaglia XorShift隨機數(shù)算法)來嘗試尋找一個空閑的slot驳规。
表的大小是有上限的支子,因為當(dāng)線程多于CPU時,假設(shè)每個線程都綁定到一個CPU达舒,就會有一個完美的散列函數(shù)將線程映射到插槽,從而消除沖突叹侄。當(dāng)我們達到容量時巩搏,我們通過隨機改變沖突線程的哈希代碼來搜索這個映射。因為搜索是隨機的趾代,沖突只有通過CAS失敗才知道贯底,收斂可能會很慢,而且因為線程通常不會永遠綁定到CPU,所以根本不會發(fā)生禽捆。然而笙什,盡管有這些限制,在這些情況下觀察到的競爭率通常很低胚想。
Cell可能會出現(xiàn)不可用的情況琐凭,包括進行哈希的線程終止,或者由于table擴容導(dǎo)致線程哈希不正確浊服。我們不嘗試檢測或刪除這樣的單元格统屈,假設(shè)對于長時間運行的實例,爭用會再次發(fā)生牙躺,因此最終將再次需要這些單元格;而對于短時間運行的實例愁憔,花費時間去銷毀又沒有什么必要。
Cell類
Atomiclong的變體孽拷,僅支持原始訪問和CAS吨掌。
Cell類被注解@jdk.internal.vm.annotation.Contended修飾。Contended的作用(詳細信息參見:JEP 142):Define a way to specify that one or more fields in an object are likely to be highly contended across processor cores so that the VM can arrange for them not to share cache lines with other fields, or other objects, that are likely to be independently accessed.
具體實現(xiàn)
Striped64的核心方法是longAccumulate脓恕、doubleAccumulate膜宋,兩者類似,下面主要看一下longAccumulate进肯,對于這種代碼激蹲,個人建議是理解思路即可,畢竟咱們又不是過來修改JDK的江掩,如果真的要修改了或者有類似的需求了学辱,再回來細看即可。
//x? 元素? ? //fn? 更新函數(shù)环形,如果是add可以為null(這個約定避免了longadder中定義額外的變量或者函數(shù))? ? //wasUncontended 如果CAS在調(diào)用之前失敗了策泣,這個值為falsefinal void longAccumulate(long x, LongBinaryOperator fn,? ? ? ? ? ? ? ? ? ? ? ? ? ? ? boolean wasUncontended) {? ? ? ? int h;? ? ? ? //獲取當(dāng)前線程的probe值咏尝,如果為0沙郭,則需要初始化該線程的probe值if((h = getProbe()) == 0) {? ? ? ? ThreadLocalRandom.current(); // force initialization? ? ? ? ? ? h = getProbe();? ? ? ? ? ? wasUncontended =true;? ? ? ? }? ? ? ? boolean collide =false;? // Trueiflast slot nonemptydone:for(;;) {? ? ? ? ? ? Cell[] cs; Cell c; int n; long v;? ? ? ? ? ? //Cells不為空,進行操作if((cs = cells) != null && (n = cs.length) > 0) {? ? ? ? ? ? ? ? //通過(hashCode & (length - 1))這種算法來實現(xiàn)取模 有種看到HashMap代碼的感覺? ? ? ? ? ? ? ? //如果當(dāng)前位置為null說明需要初始化if((c = cs[(n - 1) & h]) == null) {? ? ? ? ? ? ? ? ? ? //判斷鎖狀態(tài)if(cellsBusy == 0) {? ? ? // Try to attach new Cell? ? ? ? ? ? ? ? ? ? ? ? Cell r = new Cell(x);? // Optimistically create? ? ? ? ? ? ? ? ? ? ? ? //再次判斷鎖狀態(tài)冀宴,同時獲取鎖if(cellsBusy == 0 && casCellsBusy()) {? ? ? ? ? ? ? ? ? ? ? ? ? ? try {? ? ? ? ? ? ? // Recheck under lock? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Cell[] rs; int m, j;if((rs = cells) != null &&? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (m = rs.length) > 0 &&? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? rs[j = (m - 1) & h] == null) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? rs[j] = r;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //創(chuàng)建成功跳出breakdone;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? ? } finally {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //釋放鎖? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? cellsBusy = 0;? ? ? ? ? ? ? ? ? ? ? ? ? ? }continue;? ? ? ? ? // Slot is now non-empty? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? collide =false;? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? //運行到此說明cell的對應(yīng)位置上已經(jīng)有相應(yīng)的Cell了火本,? ? ? ? ? ? ? ? //不需要初始化了? ? ? ? ? ? ? ? //CAS操作已經(jīng)失敗了危队,出現(xiàn)了競爭elseif(!wasUncontended)? ? ? // CAS already known to fail? ? ? ? ? ? ? ? ? ? wasUncontended =true;? ? ? // Continue afterrehash//這里嘗試將x值加到a的value上elseif(c.cas(v = c.value,? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (fn == null) ? v + x : fn.applyAsLong(v, x)))? ? ? ? ? ? ? ? ? ? //如果嘗試成功,跳出循環(huán)钙畔,方法退出break;? ? ? ? ? ? ? ? //cell數(shù)組最大為cpu的數(shù)量茫陆,? ? ? ? ? ? ? ? //cells != as表明cells數(shù)組已經(jīng)被更新了? ? ? ? ? ? ? ? //標記為最大狀態(tài)或者說是過期狀態(tài)elseif(n >= NCPU || cells != cs)? ? ? ? ? ? ? ? ? ? collide =false;? ? ? ? ? ? // At max size or staleelseif(!collide)? ? ? ? ? ? ? ? ? ? collide =true;? ? ? ? ? ? ? ? //擴容 當(dāng)前容量 * 2elseif(cellsBusy == 0 && casCellsBusy()) {? ? ? ? ? ? ? ? ? ? try {if(cells == cs)? ? ? ? // Expand table unless stale? ? ? ? ? ? ? ? ? ? ? ? ? ? cells = Arrays.copyOf(cs, n << 1);? ? ? ? ? ? ? ? ? ? } finally {? ? ? ? ? ? ? ? ? ? ? ? cellsBusy = 0;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? collide =false;continue;? ? ? ? ? ? ? ? ? // Retry with expanded table? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? h = advanceProbe(h);? ? ? ? ? ? }? ? ? ? ? ? //嘗試獲取鎖之后擴大Cellselseif(cellsBusy == 0 && cells == cs && casCellsBusy()) {? ? ? ? ? ? ? ? try {? ? ? ? ? ? ? ? ? ? ? ? ? // Initialize tableif(cells == cs) {? ? ? ? ? ? ? ? ? ? ? ? //初始化cell表,初始容量為2擎析。? ? ? ? ? ? ? ? ? ? ? ? Cell[] rs = new Cell[2];? ? ? ? ? ? ? ? ? ? ? ? rs[h & 1] = new Cell(x);? ? ? ? ? ? ? ? ? ? ? ? cells = rs;breakdone;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? } finally {? ? ? ? ? ? ? ? ? ? //釋放cellsBusy鎖? ? ? ? ? ? ? ? ? ? cellsBusy = 0;? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? ? ? //如果創(chuàng)建cell表由于競爭導(dǎo)致失敗簿盅,嘗試將x累加到base上? ? ? ? ? ? // Fall back on using baseelseif(casBase(v = base,? ? ? ? ? ? ? ? ? ? ? ? ? ? (fn == null) ? v + x : fn.applyAsLong(v, x)))breakdone;? ? ? ? }? ? }? ? /**? ? * CASes the cellsBusy field from 0 to 1 to acquire lock.? ? */? ? final booleancasCellsBusy() {returnCELLSBUSY.compareAndSet(this, 0, 1);? ? }? ? /**? ? * CASes the base field.? ? */? ? final boolean casBase(long cmp, long val) {returnBASE.compareAndSet(this, cmp, val);? ? }
這一段的核心是這樣的:
longAccumulate會根據(jù)當(dāng)前線程來計算一個哈希值,然后根據(jù)(hashCode & (length - 1))取模,以定位到該線程被分散到的Cell數(shù)組中的位置
如果Cell數(shù)組還沒有被創(chuàng)建桨醋,那么就去獲取cellBusy這個鎖(相當(dāng)于鎖棚瘟,但是更為輕量級),如果獲取成功喜最,則初始化Cell數(shù)組偎蘸,初始容量為2,初始化完成之后將x包裝成一個Cell返顺,哈希計算之后分散到相應(yīng)的index上禀苦。如果獲取cellBusy失敗,那么會試圖將x累計到base上遂鹊,更新失敗會重新嘗試直到成功振乏。
如果Cell數(shù)組已經(jīng)被初始化過了,那么就根據(jù)線程的哈希值分散到一個Cell數(shù)組元素上秉扑,獲取這個位置上的Cell并且賦值給變量a慧邮,如果a為null,說明該位置還沒有被初始化舟陆,那么就初始化误澳,當(dāng)然在初始化之前需要競爭cellBusy變量。
如果Cell數(shù)組的大小已經(jīng)最大了(大于等于CPU的數(shù)量)秦躯,那么就需要重新計算哈希忆谓,來重新分散當(dāng)前線程到另外一個Cell位置上再走一遍該方法的邏輯,否則就需要對Cell數(shù)組進行擴容踱承,然后將原來的計數(shù)內(nèi)容遷移過去倡缠。由于Cell里面保存的是計數(shù)值,所以擴容后沒有必要做其他處理茎活,直接根據(jù)index將舊的Cell數(shù)組內(nèi)容復(fù)制到新的Cell數(shù)組中昙沦。
LongAdder
LongAdder的基本思路就是分散熱點,將value值分散到一個數(shù)組中载荔,不同線程會命中到數(shù)組的不同槽中盾饮,各個線程只對自己槽中的那個值進行CAS操作,這樣熱點就被分散了懒熙,沖突的概率就小很多丘损。如果要獲取真正的long值,只要將各個槽中的變量值累加返回工扎。
說明
保持一個或者多個變量号俐,初始值設(shè)置為零用于求和。當(dāng)更新出現(xiàn)多個線程競爭時定庵,變量集合動態(tài)增長以減少爭用。最后當(dāng)需要求和的時候或者說需要這個Long型的值時,可以通過把當(dāng)前這些變量求和蔬浙,合并后得出最終的和猪落。
LongAdder在一些高并發(fā)場景下表現(xiàn)要比AtomicLong好,比如多個線程同時更新一個求和的變量畴博,比如統(tǒng)計集合的數(shù)量笨忌,但是不能用于細粒度同步控制,換句話說這個是可能有誤差的(因為更新與讀取是并行的)俱病。在低并發(fā)場景場景下LongAdder和AtomicLong的性能表現(xiàn)沒什么差別官疲,但是當(dāng)高并發(fā)競爭的時候,這個類將具備更好的吞吐性能亮隙,但是相應(yīng)的也會耗費相當(dāng)?shù)目臻g途凫。
LongAdder繼承了Number抽象類,但是并沒有實現(xiàn)一些方法例如: equals溢吻、hashCode维费、compareTo,因為LongAdder實例的預(yù)期用途是進行一些比較頻繁的變化促王,所以也不適合作為集合的key犀盟。
具體實現(xiàn)
看一下LongAdder有哪些方法:
下面主要解析LongAdder increment、sum方法蝇狼,先看一下源碼:
/** * Equivalent to {@code add(1)}. */public voidincrement() {add(1L);}/*** Adds the given value.** @param x the value to add*/public void add(long x) {? ? Cell[] cs; long b, v; int m; Cell c;if((cs = cells) != null || !casBase(b = base, b + x)) {? ? //到了這里 表明cs不為null or 線程有并發(fā)沖突阅畴,導(dǎo)致caseBase失敗? ? ? ? boolean uncontended =true;if(cs == null || // cells 為null? ? ? ? ? ? (m = cs.length - 1) < 0 || // cells 不為null 但只有一個元素? ? ? ? ? ? (c = cs[getProbe() & m]) == null || //哈希取模 對應(yīng)位置元素為null? ? ? ? ? ? !(uncontended = c.cas(v = c.value, v + x))) //cas 替換失敗(并發(fā)競爭)? ? ? ? ? ? longAccumulate(x, null, uncontended);? ? }}/*** CASes the base field (Striped64類中的方法)*/final boolean casBase(long cmp, long val) {returnBASE.compareAndSet(this, cmp, val);}//當(dāng)在sum的過程中迅耘,有可能別的線程正在操作cells(因為沒有加鎖)//sum取的值贱枣,不一定準確public longsum() {? ? Cell[] cs = cells;? ? long sum = base;if(cs != null) {for(Cell c : cs)if(c != null)? ? ? ? ? ? ? ? sum += c.value;? ? }returnsum;}
LongAdder vs AtomicLong Performance
Java 8 Performance Improvements: LongAdder vs AtomicLong
對比LongAccumulator
LongAdder類可以看做是LongAccumulator的一個特例,LongAccumulator提供了比LongAdder更強大豹障、靈活的功能冯事。
/** * Creates a new instance using the given accumulatorfunction* and identity element. * @param accumulatorFunction a side-effect-freefunctionof two arguments * @param identity identity (initial value)forthe accumulatorfunction*/public LongAccumulator(LongBinaryOperator accumulatorFunction,? ? ? ? ? ? ? ? ? ? ? ? ? long identity) {? ? this.function = accumulatorFunction;? ? base = this.identity = identity;}@FunctionalInterfacepublic interface LongBinaryOperator {/** * Applies this operator to the given operands. * * @param left the first operand * @param right the second operand * @returnthe operator result*/long applyAsLong(long left, long right);}復(fù)制代碼
構(gòu)造函數(shù)其中accumulatorFunction一個雙目運算接口,根據(jù)輸入的兩個參數(shù)返回一個計算值血公,identity則是LongAccumulator累加器的初始值昵仅。
accumulatorFunction主要用于Striped64 longAccumulate中使用,如果fn==null累魔,則默認是相加摔笤,否則會調(diào)用fn.applyAsLong(v, x)
LongAccumulator相比于LongAdder,可以為累加器提供非0的初始值垦写,而LongAdder只能提供默認的0值吕世。
另外,LongAccumulator還可以指定累加規(guī)則梯投,比如累加或者相乘命辖,只需要在構(gòu)造LongAccumulator時况毅,傳入自定義的雙目運算器即可,后者則內(nèi)置累加規(guī)則尔艇。
為了感謝支持我的朋友尔许!整理了一份Java高級架構(gòu)資料、Spring源碼分析终娃、Dubbo味廊、Redis、Netty棠耕、zookeeper余佛、Spring cloud、分布式等資料
關(guān)注公眾號:Java耕耘者(免費獲惹嫌)
這份知識尤其適合:
1.近期想跳槽辉巡,要面試的Java程序員,查漏補缺搅荞,以便盡快彌補短板红氯;
2.想了解“一線互聯(lián)網(wǎng)公司”最新招聘需求/技術(shù)要求,對比找出自身的長處和弱點所在咕痛,評估自己在現(xiàn)有市場上的競爭力如何痢甘;
3.還沒形成系統(tǒng)的Java知識體系,缺乏清晰的提升方向和學(xué)習(xí)路徑的程序員茉贡。
4.想去一線互聯(lián)網(wǎng)公司缺乏信心
本號專注Java源碼分析塞栅。喜歡底層源碼的朋友可以來交流探討。交流群:818491202 驗證:33