LongAdder簡單介紹
在原子操作類AtomicLong中,在高并發(fā)的情況,會出現(xiàn)大量的線程去爭搶更新同一個原子變量,但是同時只能有一個線程CAS操作成功,這就會出現(xiàn)大量線程爭搶失敗,然后通過自旋進行重試,這就會消耗CPU的資源;這個時候就出現(xiàn)了LongAdder原子操作類;它主要就是將這里的原子變量復(fù)制成多個,然后多個變量由線程去爭,結(jié)束的時候,再進行多個變量值的計算;
先了解一下LongAdder的結(jié)構(gòu)從上圖可以看出LongAdder是繼承了Striped64這個類;其中定義了一個Cell類型的數(shù)組,Cell的具體結(jié)構(gòu)如下:
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
1啥容、該類使用了@sun.misc.Contended注解,是為了解決偽共享問題; 2、該類內(nèi)部定義了一個volatile修飾的long類型的value變量; 3、該類內(nèi)部定義了一個cas(long cmp, long val)方法; 4、該類內(nèi)部通過sun.misc.Unsafe類獲取該類中value變量的偏移量;
因為每個Cell內(nèi)部都維護了一個變量,所以在高并發(fā)情況下,不同的線程,會訪問不同的Cell內(nèi)部的long類型變量,而且當(dāng)某一個線程在變更某一個Cell中的變量時,并不會在此Cell上進行自旋重試,而是嘗試在其他的Cell變量上重試;最后將所有的value變量值sum后再加上base返回;
接下來簡單了解一下LongAdder中比較重要的兩方法add(long x)和longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
- void add(long x)
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//(1)
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//(2)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
(1) 中判斷cells數(shù)組是否為null,CAS操作base的值是否成功;一般的在并發(fā)不高的情況下,都是通過CAS操作直接修改base的值來完成;在并發(fā)高的情況下,casBase(b = base, b + x)方法可能會出現(xiàn)失敗,這個時候就會使用cells數(shù)組;
(2) 中一共有三個判斷
情況1迎膜、as == null || (m = as.length - 1) < 0; 這里的as是對cells的引用,如果成立表示此時cells數(shù)據(jù)還未進行初始化;這里的m是cells數(shù)組長度-1,如果m<0成立,證明此時cells數(shù)組的長度小于1,則可以理解此時cells數(shù)據(jù)還未初始化,然后進入到了longAccumulate方法中;
情況2诉位、(a = as[getProbe() & m]) == null; 這里的getProbe()方法是獲取當(dāng)前線程的hash值,a的值是當(dāng)前線程的hash值在cells數(shù)組中對應(yīng)位置的值也就是Cell,如果為null,則證明當(dāng)前線程還未對cells數(shù)組中的元素進行操作,然后進入到了longAccumulate方法中;
static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
情況3鲫尊、!(uncontended = a.cas(v = a.value, v + x)); 這里的a為當(dāng)前線程對應(yīng)的Cell, 然后對應(yīng)的CAS操作當(dāng)前的Cell中的value值,如果不成功,則進入到longAccumulate方法中
- void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//(3)
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//(4)
if ((as = cells) != null && (n = as.length) > 0) {
//(4.1)
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//(4.2)
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//(4.3)
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//(4.4)
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//(4.5)
else if (!collide)
collide = true;
//(4.6)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//(5)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//(6)
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
該方法結(jié)合(2)中的三種情況來分析
情況1進入:此時cells數(shù)組還未初始化,執(zhí)行到(3)的時候h = getProbe()為當(dāng)前線程的hash值,因為當(dāng)前線程還未對threadLocalRandomProbe值做任何操作,所以它肯定是0;所以(h = getProbe()) == 0;成立! 然后調(diào)用ThreadLocalRandom.current();
public static ThreadLocalRandom current() { if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0) localInit(); return instance; }
static final void localInit() { int p = probeGenerator.addAndGet(PROBE_INCREMENT); int probe = (p == 0) ? 1 : p; // skip 0 long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT)); Thread t = Thread.currentThread(); UNSAFE.putLong(t, SEED, seed); UNSAFE.putInt(t, PROBE, probe); }
到這里才對threadLocalRandomProbe進行了操作,然后在h = getProbe();獲取最新的值; 這個時候因為as == cells == null,所以直接到(5);在(5)中cellsBusy是一個自旋鎖,當(dāng)?shù)扔?時,證明是無鎖狀態(tài),可以獲取鎖;cells == as判斷當(dāng)前的cells是不是沒有變動;然后再通過casCellsBusy()方法進行加鎖,將cellsBusy賦值為1;在(5)的內(nèi)部,先初始化一個大小為2的Cell[]數(shù)組,然后將通過位于運算將第0個或者第一個1元素的值設(shè)置為x的值,然后將這個新的數(shù)據(jù)賦值給cells數(shù)組,這個時候cells就不是空的了;最后釋放鎖,跳出循環(huán)結(jié)束;
情況二進入:此時cells數(shù)組肯定不為空,但是當(dāng)前線程在cells中還沒有設(shè)置過一個Cell元素;這個時候就到了(4)這一步,這個時候執(zhí)行a = as[(n - 1) & h]==null判斷,這里的h是當(dāng)前線程調(diào)用了ThreadLocalRandom.current()之后的hash值,在做一次判斷是否為null;
假設(shè)(4.1)成立;執(zhí)行cellsBusy==0獲取鎖,嘗試去new一個Cell對象,并且賦值為x;然后在嘗試獲取鎖,并且通過CAS修改鎖的狀態(tài),最后再次判斷當(dāng)前cells是否為空,當(dāng)前線程是否沒有對應(yīng)的Cell元素,然后將新new的元素賦值給當(dāng)前這個線程在cells數(shù)組中的對應(yīng)位置的元素;最后釋放鎖結(jié)束;
假設(shè)(4.1)不成立;證明當(dāng)前線程有對應(yīng)的Cell元素,這個時候的wasUncontended=true;所以不會進入(4.2),所以看(4.3),此時的fn是等于null的煤辨,所以(4.3)就是a.cas(v = a.value, v + x),也就是將當(dāng)前線程對應(yīng)的Cell對象中的原子變量進行CAS操作,如果CAS成功結(jié)束循環(huán);
假設(shè)(4.3)不成立;在(4.4)中設(shè)置collide = false;或者(4.5)中設(shè)置collide = true后,執(zhí)行h = advanceProbe(h);重新設(shè)置當(dāng)前線程的hash值,然后繼續(xù)下一輪循環(huán);
假設(shè)(4.6)成立;這里是是將cells數(shù)組進行擴容,然后繼續(xù)下一輪循環(huán);
情況三進入:此時對應(yīng)線程是有cell元素的,只是cas失敗了;然后進入(4.3)或者(4.6)繼續(xù)循環(huán);
代碼(6)中是直接進行CAS操作base值;