一阱飘、簡介
?在之前的《ConcurrentHashMap深入剖析(JDK8)》文章中,我們看到了CounterCell的實現(xiàn)沿用了LongAdder的分段計數(shù)的原理絮供;那么這一篇我們來重點看下LongAdder的實現(xiàn)原理墩新。
?我們先來這個類的描述:
/**
* One or more variables that together maintain an initially zero
* {@code long} sum. When updates (method {@link #add}) are contended
* across threads, the set of variables may grow dynamically to reduce
* contention. Method {@link #sum} (or, equivalently, {@link
* #longValue}) returns the current total combined across the
* variables maintaining the sum.
*
* <p>This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar
* characteristics. But under high contention, expected throughput of
* this class is significantly higher, at the expense of higher space
* consumption.
*
* <p>LongAdders can be used with a {@link
* java.util.concurrent.ConcurrentHashMap} to maintain a scalable
* frequency map (a form of histogram or multiset). For example, to
* add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
* initializing if not already present, you can use {@code
* freqs.computeIfAbsent(k -> new LongAdder()).increment();}
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code equals}, {@code hashCode} and {@code
* compareTo} because instances are expected to be mutated, and so are
* not useful as collection keys.
*
* @since 1.8
* @author Doug Lea
*/
?第二段中很清晰的表明LongAdder在高并發(fā)的場景下會比AtomicLong 具有更好的性能倍谜,代價是消耗更多的內(nèi)存空間崩泡。那么戒洼,有如下問題:
為什么要引入LongAdder?AtomicLong在高并發(fā)的場景下有什么問題嗎允华? 如果低并發(fā)環(huán)境下,LongAdder和AtomicLong性能差不多寥掐,那LongAdder是否就可以替代AtomicLong了靴寂?
為什么要引入LongAdder?
?我們知道召耘,AtomicLong是利用底層的CAS操作來提供并發(fā)性的百炬,比如addAndGet方法:
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
?上述方法調(diào)用了Unsafe類的getAndAddLong方法,該方法是一個native
方法污它,它的邏輯是采用自旋的方式不斷更新目標值剖踊,直到更新成功。(也即樂觀鎖的實現(xiàn)模式)
?在并發(fā)量比較低的情況下衫贬,線程沖突的概率比較小德澈,自旋的次數(shù)不會很多。但是固惯,高并發(fā)情況下梆造,N個線程同時進行自旋操作,會出現(xiàn)大量失敗并不斷自旋的場景葬毫,此時AtomicLong的自旋會成為瓶頸镇辉。
?這就是LongAdder引入的初衷------解決高并發(fā)環(huán)境下AtomictLong的自旋瓶頸問題。
LongAdder為什么高并發(fā)性場景能會更好贴捡?
?既然忽肛,在高并發(fā)場景下LongAdder可以顯著提升性能,那么它時如何做到的烂斋?這一部分簡述下LongAdder的實現(xiàn)思路屹逛,具體原理在源碼剖析的章節(jié)會重點闡述础废。
?我們知道,AtomicLong中有個內(nèi)部變量value保存著實際的long的值煎源,所有的操作都是針對該變量進行操作的色迂。也就是說,高并發(fā)場景下手销,value變量其實就是一個熱點歇僧,也就是N個線程競爭的一個熱點。
?LongAdder的基本思路就是分散熱點
锋拖,將value值分散到一個數(shù)組中诈悍,不同線程會命中到數(shù)組的不同槽中,各個線程只對自己槽中的那個值進行CAS操作兽埃,這樣熱點就被分散了侥钳,沖突的概率就小很多。如果要獲取真正的long值柄错,只要將各個槽中的變量值累加返回即可舷夺。
?這種做法其實跟JDK1.8之前的ConcurrentHashMap“分段鎖”的實現(xiàn)思路是一樣的:分散熱點。
LongAdder能否替代AtomicLong售貌?
?我們先來看看LongAdder提供的API:
?從上面的圖中可以看到给猾,LongAdder的API和AtomicLong的API還是有比較大的差異,而且AtomicLong提供的功能更豐富颂跨,尤其是addAndGet敢伸、decrementAndGet、compareAndSet這些方法恒削。addAndGet池颈、decrementAndGet除了單純的做自增自減外,還可以立即獲取增減后的值钓丰,而LongAdder則需要做同步控制才能精確獲取增減后的值躯砰。如果業(yè)務(wù)需求需要精確的控制計數(shù),則使用AtomicLong比較合適携丁;
?另外弃揽,從空間方面考慮,LongAdder其實是一種“空間換時間”的思想
则北,從這一點來講AtomicLong更合適矿微。
?低并發(fā)、一般的業(yè)務(wù)嘗盡下AtomicLong是足夠了尚揣,如果并發(fā)量很多涌矢,存在大量寫多讀少的情況,那LongAdder可能更合適快骗。
二娜庇、LongAdder原理
?上文已經(jīng)說過了塔次,AtomicLong是多個線程針對單個熱點值value進行原子操作。而LongAdder是每個線程擁有自己的槽名秀,各個線程一般只對自己槽中的那個值進行CAS操作励负。
比如有三個Thread1、Thread2匕得、Thread3继榆,每個線程對value增加10。
?對于AtomicLong汁掠,最終結(jié)果的計算值始終是下面這個形式:
value = 10 + 10 + 10 = 30
?但是對于LongAdder來說略吨,內(nèi)部有一個base變量,一個Cell[]數(shù)組考阱。
- base變量:非競態(tài)條件下翠忠,直接累加到該變量上
- Cell[]數(shù)組:競態(tài)條件下,累加各個線程自己的槽Cell[i]中
?最終結(jié)果的計算是下面的形式:
三乞榨、LongAdder源碼剖析
?我們先來看下LongAdder類的類簽名:
public class LongAdder extends Striped64 implements Serializable
?LongAdder繼承了Striped64類秽之,來實現(xiàn)累加功能,它是實現(xiàn)高并發(fā)累加的工具類吃既;Striped64的設(shè)計核心思路就是通過內(nèi)部的分散計算來避免競爭政溃。Striped64內(nèi)部包含一個base和一個Cell[] cells數(shù)組,又叫hash表态秧。
?沒有競爭的情況下,要累加的數(shù)通過cas累加到base上扼鞋;如果有競爭的話,會將要累加的數(shù)累加到Cells數(shù)組中的某個cell元素里面云头。這樣Striped64的值就跟原理章節(jié)中描述的一樣捐友,滿足了求和公式昏滴;
3.1 Striped64類
?Striped64類有三個重要的成員變量:
/**
* Table of cells. When non-null, size is a power of 2.
* 存放Cell的hash表,大小為2的冪宜狐。
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 基礎(chǔ)值
* 1. 在沒有競爭時會更新這個值;
* 2. 在cells初始化的過程中,cells處于不可用的狀態(tài),這時候也會嘗試將通過cas操作值累加到base厘熟。
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 自旋鎖绳姨,通過CAS操作加鎖跪削,用于保護創(chuàng)建或者擴展Cell表。
*/
transient volatile int cellsBusy;
成員cells
?cells數(shù)組是LongAdder高性能實現(xiàn)的必殺器:
?AtomicInteger只有一個value,所以線程累加都要通過cas競爭value這一個變量叶眉,高并發(fā)下線程爭用非常嚴重。
?而LongAdder則有兩個值用于累加,一個是base,它的作用類似于AtomicInteger里面的value卑惜,在沒有競爭的情況下不會用到cells數(shù)組,它為null七婴,這時使用base做累加,有了競爭后cells數(shù)組就上場了撩笆,第一次初始化長度為2泣栈,以后每次擴容都是變?yōu)樵瓉淼膬杀渡」悖钡絚ells數(shù)組的長度大于等于當前服務(wù)器cpu的數(shù)量為止就不在擴容穗酥。那么為什么到超過cpu數(shù)量的時候就不再擴容了呜师?每個線程會通過線程對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value做累加菩混,這樣相當于將線程綁定到了cells中的某個Cell對象上。
成員變量cellsBusy
?cellsBusy,它有兩個值0或1扁藕,它的作用是當要修改cells數(shù)組時加鎖沮峡,防止多線程同時修改cells數(shù)組(也稱cells表),0為無鎖亿柑,1位加鎖邢疙,加鎖的狀況有三種:
- cells數(shù)組初始化的時候;
- cells數(shù)組擴容的時候望薄;
- 如果cells數(shù)組中某個元素為null疟游,給這個位置創(chuàng)建新的Cell對象的時候;
成員變量base
?它有兩個作用:
- 在開始沒有競爭的情況下痕支,將累加值累加到base颁虐;
- 在cells初始化的過程中,cells不可用采转,這時會嘗試將值累加到base上聪廉;
3.2 Cell內(nèi)部類
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
* //為提高性能,使用注解@sun.misc.Contended故慈,用來避免偽共享
*/
@sun.misc.Contended static final class Cell {
//用來保存要累加的值
volatile long value;
Cell(long x) { value = x; }
//使用UNSAFE類的cas來更新value值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//value在Cell類中存儲位置的偏移量板熊;
private static final long valueOffset;
//這個靜態(tài)方法用于獲取偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
?這個類相對來說比較簡單,volatile類型的value用來保存要累加的值察绷;final類型的valueOffset表示value在Cell類中存儲位置的偏移量干签;Cell類重點需要關(guān)注注解@sun.misc.Contended;這個注解是JDK1.8中用來解決偽共享的問題拆撼,具體偽共享可以參見《偽共享容劳、緩存行填充和CPU緩存詳述》這一篇文章,這里不再詳述闸度;
3.3 LongAdder
?LongAdder我們關(guān)注下重點方法竭贩,首先看下add方法。
add方法
?add方法是LongAdder累加的方法莺禁,傳入的參數(shù)x為要累加的值留量;
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* 如果一下兩種條件則繼續(xù)執(zhí)行if內(nèi)的語句
* 1. cells數(shù)組不為null(不存在爭用的時候,cells數(shù)組一定為null,一旦對base的cas操作失敗楼熄,才會初始化cells數(shù)組)
* 2. 如果cells數(shù)組為null忆绰,如果casBase執(zhí)行成功,則直接返回可岂,如果casBase方法執(zhí)行失敶砀摇(casBase失敗,說明第一次爭用沖突產(chǎn)生缕粹,需要對cells數(shù)組初始化)進入if內(nèi)稚茅;
* casBase方法很簡單,就是通過UNSAFE類的cas設(shè)置成員變量base的值為base+要累加的值
* casBase執(zhí)行成功的前提是無競爭平斩,這時候cells數(shù)組還沒有用到為null峰锁,可見在無競爭的情況下是類似于AtomticInteger處理方式,使用cas做累加双戳。
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
//uncontended判斷cells數(shù)組中虹蒋,當前線程要做cas累加操作的某個元素是否不存在爭用,如果cas失敗則存在爭用飒货;uncontended=false代表存在爭用魄衅,uncontended=true代表不存在爭用。
boolean uncontended = true;
/**
*1. as == null : cells數(shù)組未被初始化塘辅,成立則直接進入if執(zhí)行cell初始化
*2. (m = as.length - 1) < 0: cells數(shù)組的長度為0
*條件1與2都代表cells數(shù)組沒有被初始化成功晃虫,初始化成功的cells數(shù)組長度為2;
*3. (a = as[getProbe() & m]) == null :如果cells被初始化扣墩,且它的長度不為0哲银,則通過getProbe方法獲取當前線程Thread的threadLocalRandomProbe變量的值,初始為0呻惕,然后執(zhí)行threadLocalRandomProbe&(cells.length-1 ),相當于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置為null荆责,這說明這個位置從來沒有線程做過累加,需要進入if繼續(xù)執(zhí)行亚脆,在這個位置創(chuàng)建一個新的Cell對象做院;
*4. !(uncontended = a.cas(v = a.value, v + x)):嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作,并返回操作結(jié)果,如果失敗了則進入if,重新計算一個threadLocalRandomProbe濒持;
如果進入if語句執(zhí)行l(wèi)ongAccumulate方法,有三種情況
1. 前兩個條件代表cells沒有初始化键耕,
2. 第三個條件指當前線程hash到的cells數(shù)組中的位置還沒有其它線程做過累加操作,
3. 第四個條件代表產(chǎn)生了沖突,uncontended=false
**/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
?如上所示柑营,只有從未出現(xiàn)過并發(fā)沖突的時候屈雄,base基數(shù)才會使用到,一旦出現(xiàn)了并發(fā)沖突官套,之后所有的操作都只是針對cell[]數(shù)組中的單元Cell酒奶。如果cell[]數(shù)組未初始化蓖议,會調(diào)用父類的longAccumulate去初始化cell[],如果cell[]已經(jīng)初始化但沖突發(fā)生在Cell單元內(nèi)讥蟆,則也是調(diào)用父類的longAccumulate,此時可能需要對cell[]擴容了纺阔。接下來我們來看下longAccumulate方法瘸彤。
longAccumulate方法
?三個參數(shù)第一個為要累加的值,第二個為null笛钝,第三個為wasUncontended表示調(diào)用方法之前的add方法是否未發(fā)生競爭;
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
*
* @param x the value
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//獲取當前線程的threadLocalRandomProbe值作為hash值,如果當前線程的threadLocalRandomProbe為0质况,說明當前線程是第一次進入該方法,則強制設(shè)置線程的threadLocalRandomProbe為ThreadLocalRandom類的成員靜態(tài)私有變量probeGenerator的值玻靡,后面會詳細將hash值的生成;
//另外需要注意结榄,如果threadLocalRandomProbe=0,代表新的線程開始參與cell爭用的情況
//1.當前線程之前還沒有參與過cells爭用(也許cells數(shù)組還沒初始化囤捻,進到當前方法來就是為了初始化cells數(shù)組后爭用的),是第一次執(zhí)行base的cas累加操作失斁世省;
//2.或者是在執(zhí)行add方法時蝎土,對cells某個位置的Cell的cas操作第一次失敗视哑,則將wasUncontended設(shè)置為false,那么這里會將其重新置為true誊涯;第一次執(zhí)行操作失數惨恪;
//凡是參與了cell爭用操作的線程threadLocalRandomProbe都不為0暴构;
int h;
if ((h = getProbe()) == 0) {
//初始化ThreadLocalRandom;
ThreadLocalRandom.current(); // force initialization
//將h設(shè)置為0x9e3779b9
h = getProbe();
//設(shè)置未競爭標記為true
wasUncontended = true;
}
//cas沖突標志跪呈,表示當前線程hash到的Cells數(shù)組的位置,做cas累加操作時與其它線程發(fā)生了沖突取逾,cas失敽穆獭;collide=true代表有沖突砾隅,collide=false代表無沖突
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//這個主干if有三個分支
//1.CASE1:處理cells數(shù)組已經(jīng)正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4)
//2.CASE2:處理cells數(shù)組沒有初始化或者長度為0的情況缭乘;(這個分支處理add方法的四個條件中的1和2)
//3.CASE3:處理如果cell數(shù)組沒有初始化,并且其它線程正在執(zhí)行對cells數(shù)組初始化的操作琉用,及cellbusy=1堕绩;則嘗試將累加值通過cas累加到base上
//先看主分支一
if ((as = cells) != null && (n = as.length) > 0) {
/**
*CASE1:這個是處理add方法內(nèi)部if分支的條件3:如果被hash到的位置為null,說明沒有線程在這個位置設(shè)置過值邑时,沒有競爭奴紧,可以直接使用,則用x值作為初始值創(chuàng)建一個新的Cell對象晶丘,對cells數(shù)組使用cellsBusy加鎖黍氮,然后將這個Cell對象放到cells[m%cells.length]位置上
*/
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy == 0 代表當前沒有線程cells數(shù)組做修改
if (cellsBusy == 0) { // Try to attach new Cell
//將要累加的x值作為初始值創(chuàng)建一個新的Cell對象唐含,
Cell r = new Cell(x); // Optimistically create
//如果cellsBusy=0無鎖,則通過cas將cellsBusy設(shè)置為1加鎖
if (cellsBusy == 0 && casCellsBusy()) {
//標記Cell是否創(chuàng)建成功并放入到cells數(shù)組被hash的位置上
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
//再次檢查cells數(shù)組不為null沫浆,且長度不為空捷枯,且hash到的位置的Cell為null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//將新的cell設(shè)置到該位置
rs[j] = r;
created = true;
}
} finally {
//去掉鎖
cellsBusy = 0;
}
//生成成功,跳出循環(huán)
if (created)
break;
//如果created為false专执,說明上面指定的cells數(shù)組的位置cells[m%cells.length]已經(jīng)有其它線程設(shè)置了cell了淮捆,繼續(xù)執(zhí)行循環(huán)。
continue; // Slot is now non-empty
}
}
//如果執(zhí)行的當前行本股,代表cellsBusy=1攀痊,有線程正在更改cells數(shù)組,代表產(chǎn)生了沖突拄显,將collide設(shè)置為false
collide = false;
}
/**
* case2:如果add方法中條件4的通過cas設(shè)置cells[m%cells.length]位置的Cell對象中的value值設(shè)置為v+x失敗,說明已經(jīng)發(fā)生競爭苟径,將wasUncontended設(shè)置為true,跳出內(nèi)部的if判斷躬审,最后重新計算一個新的probe棘街,然后重新執(zhí)行循環(huán);
*/
else if (!wasUncontended) // CAS already known to fail
//設(shè)置未競爭標志位true,繼續(xù)執(zhí)行承边,后面會算一個新的probe值蹬碧,然后重新執(zhí)行循環(huán)。
wasUncontended = true; // Continue after rehash
/**
* case3:新的爭用線程參與爭用的情況:處理剛進入當前方法時threadLocalRandomProbe=0的情況炒刁,也就是當前線程第一次參與cell爭用的cas失敗恩沽,這里會嘗試將x值加到cells[m%cells.length]的value ,如果成功直接退出
*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
/**
* case4:case3處理新的線程爭用執(zhí)行失敗了翔始,這時如果cells數(shù)組的長度已經(jīng)到了最大值(大于等于cup數(shù)量)罗心,或者是當前cells已經(jīng)做了擴容,則將collide設(shè)置為false城瞎,后面重新計算prob的值
*/
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
/**
* case5:如果發(fā)生了沖突collide=false渤闷,則設(shè)置其為true;會在最后重新計算hash值后脖镀,進入下一次for循環(huán)
*/
else if (!collide)
//設(shè)置沖突標志飒箭,表示發(fā)生了沖突,需要再次生成hash蜒灰,重試锈拨。 如果下次重試任然走到了改分支此時collide=true澜术,!collide條件不成立纳决,則走后一個分支
collide = true;
/**
* case6:擴容cells數(shù)組游沿,新參與cell爭用的線程兩次均失敗,且符合庫容條件翅溺,會執(zhí)行該分支
*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//檢查cells是否已經(jīng)被擴容
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//為當前線程重新計算hash值
h = advanceProbe(h);
}
//這個大的分支處理add方法中的條件1與條件2成立的情況脑漫,如果cell表還未初始化或者長度為0髓抑,先嘗試獲取cellsBusy鎖。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//初始化cells數(shù)組优幸,初始容量為2,并將x值通過hash&1吨拍,放到0個或第1個位置上
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//解鎖
cellsBusy = 0;
}
//如果init為true說明初始化成功,跳出循環(huán)
if (init)
break;
}
/**
*如果以上操作都失敗了网杆,則嘗試將值累加到base上羹饰;
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
getProbe()方法hash的生成
?hash是LongAdder定位當前線程應(yīng)該將值累加到cells表的哪個位置上的,所以hash算法是非常重要的跛璧,接下來看看它的實現(xiàn);
?在java的Thread類中有一個成員變量:
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
?threadLocalRandomProbe這個變量的值就是LongAdder用來hash定位Cells數(shù)組位置的新啼,平時線程的這個變量一般用不到追城,它的值一直都是0。在LongAdder的父類Striped64里通過getProbe方法獲取當前線程的threadLocalRandomProbe值:
/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*/
static final int getProbe() {
//PROBE是threadLocalRandomProbe變量在Thread類里面的偏移量燥撞,所以下面語句獲取的就是threadLocalRandomProbe的值座柱;
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
threadLocalRandomProbe的初始化
?線程對LongAdder的累加操作,在沒有進入longAccumulate方法前物舒,threadLocalRandomProbe一直都是0色洞,當發(fā)生爭用后才會進入longAccumulate方法中,進入該方法第一件事就是判斷threadLocalRandomProbe是否為0冠胯,如果為0火诸,則將其設(shè)置為0x9e3779b9
。
int h;
if ((h = getProbe()) == 0) { // 給當前線程生成一個非0的hash值
//初始化ThreadLocalRandom;
ThreadLocalRandom.current(); // force initialization
//將h設(shè)置為0x9e3779b9
h = getProbe();
//設(shè)置未競爭標記為true
wasUncontended = true;
}
?重點在 ThreadLocalRandom.current(); 這一行
/**
* Returns the current thread's {@code ThreadLocalRandom}.
*
* @return the current thread's {@code ThreadLocalRandom}
*/
public static ThreadLocalRandom current() {
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
?在current方法中判斷如果probe的值為0荠察,則執(zhí)行l(wèi)ocaInit()方法置蜀,將當前線程的probe設(shè)置為非0的值,該方法實現(xiàn)如下:
/**
* Initialize Thread fields for the current thread. Called only
* when Thread.threadLocalRandomProbe is zero, indicating that a
* thread local seed value needs to be generated. Note that even
* though the initialization is purely thread-local, we need to
* rely on (static) atomic generators to initialize the values.
*/
static final void localInit() {
//private static final int PROBE_INCREMENT = 0x9e3779b9;
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
//prob不能為0
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
//獲取當前線程
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
//將probe的值更新為probeGenerator的值
UNSAFE.putInt(t, PROBE, probe);
}
?probeGenerator 是static 類型的AtomicInteger類悉盆,每執(zhí)行一次localInit()方法盯荤,都會將probeGenerator 累加一次0x9e3779b9這個值;焕盟,0x9e3779b9這個數(shù)字的得來是 2^32 除以一個常數(shù)秋秤,這個常數(shù)就是傳說中的黃金比例 1.6180339887;然后將當前線程的threadLocalRandomProbe設(shè)置為probeGenerator 的值脚翘,如果probeGenerator 為0灼卢,這取1;
threadLocalRandomProbe重新生成
?就是將prob的值左右移位 来农、異或操作三次
/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
LongAdder的sum方法
?最后來看下LongAdder的sum方法芥玉,
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot; invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated.
*
* @return the sum
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
? 這個其實印證了我們原理章節(jié)中所說的一個求和公式;需要注意的是备图,這個方法只能得到某個時刻的近似值灿巧,這也就是LongAdder并不能完全替代LongAtomic的原因之一赶袄。
四、Striped64的其它子類
?JDK1.8時抠藕,java.util.concurrent.atomic包中饿肺,除了新引入LongAdder外,還有引入了它的三個兄弟類:LongAccumulator盾似、DoubleAdder敬辣、DoubleAccumulator
LongAccumulator
?LongAccumulator是LongAdder的增強版。LongAdder只能針對數(shù)值的進行加減運算零院,而LongAccumulator提供了自定義的函數(shù)操作溉跃。其構(gòu)造函數(shù)如下:
/**
* Creates a new instance using the given accumulator function
* and identity element.
* @param accumulatorFunction a side-effect-free function of two arguments
* @param identity identity (initial value) for the accumulator function
*/
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
?通過LongBinaryOperator,可以自定義對入?yún)⒌娜我獠僮鞲娉⒎祷亟Y(jié)果(LongBinaryOperator接收2個long作為參數(shù)撰茎,并返回1個long)
?LongAccumulator內(nèi)部原理和LongAdder幾乎完全一樣,都是利用了父類Striped64的longAccumulate方法打洼。
DoubleAdder和DoubleAccumulator
?從名字也可以看出龄糊,DoubleAdder和DoubleAccumulator用于操作double原始類型。
?與LongAdder的唯一區(qū)別就是募疮,其內(nèi)部會通過一些方法炫惩,將原始的double類型,轉(zhuǎn)換為long類型阿浓,其余和LongAdder完全一樣:
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(double x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null ||
!casBase(b = base,
Double.doubleToRawLongBits //Double內(nèi)部轉(zhuǎn)換成Long的方法
(Double.longBitsToDouble(b) + x))) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value,
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x))))
doubleAccumulate(x, null, uncontended);
}
}