? ? 今天來(lái)講一講原子操作類耸彪,JUC包提供了一系列的原子性操作類碌上,這些操作類使用的是CAS非阻塞算法實(shí)現(xiàn)的掀鹅,相比于鎖,原子性的操作性能有更大的提升次屠。各個(gè)原子操作類的實(shí)現(xiàn)原理都大同小異园匹,今天就拿AtomicLong類進(jìn)行講解。除了講解AtomicLong類之后還會(huì)講解JDK8新增的原子操作類LongAdder.
AtomicLong
? ? 從AtomicLong類中的源碼可以看出來(lái)劫灶,AtomicLong類提供的方法都是在圍繞著兩個(gè)成員變量裸违,
private volatile long value;
private static final Unsafe unsafe = Unsafe.getUnsafe();
value,存儲(chǔ)的就是AtomicLong類增減之后的值本昏,由volatile關(guān)鍵字修飾避免的內(nèi)存可見(jiàn)性問(wèn)題供汛。
unsafe,Unsafe類實(shí)例涌穆,提供CAS非阻塞算法方法怔昨。
? ? 下面對(duì)開(kāi)發(fā)中常用到的AtomicLong類提供的方法來(lái)看看它的源碼到底是怎么實(shí)現(xiàn)的。
incrementAndGet
//調(diào)用unsafe方法宿稀,原子性設(shè)置value值為原始值+1趁舀,并返回遞增后的值
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) +1L;
}
decrementAndGet
//調(diào)用unsafe方法,原子性設(shè)置value值為原始值-1祝沸,并返回遞減后的值
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) -1L;
}
getAndAdd
//調(diào)用unsafe方法矮烹,原子性設(shè)置value值為原始值+delta。返回相加前的原始值
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
addAndGet
//調(diào)用unsafe方法罩锐,原子性設(shè)置value值為原始值+delta奉狈。返回相加后的值
public final long addAndGet(long delta) {return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
? ? 上述的四個(gè)方法都調(diào)用了unsafe.getAndAddLong方法,我們知道CAS算法是非阻塞的并且多線程并發(fā)下只能有一個(gè)線程設(shè)置成功涩惑,那么上述方法是怎么保證值設(shè)置成并返回的呢仁期?讓我們來(lái)看看getAndAddLong中的源碼吧。
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
? ? do {
var6 =this.getLongVolatile(var1, var2);????//獲得原始值
? ? }while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));????//使用CAS進(jìn)行設(shè)置值,直到設(shè)置成功
? ? return var6;
}
從上面的代碼中可以看出getAndAddLong方法通過(guò)自旋的方式保證使用CAS設(shè)置值成功后才返回蟀拷。前面鎖類型文章中有提到碰纬,自旋在高并發(fā)的時(shí)候會(huì)造成性能的損耗,這也是AtomicLong類會(huì)帶來(lái)的問(wèn)題问芬。
LongAdder
? ? 使用AtomicLong時(shí),在高并發(fā)下大量線程會(huì)同時(shí)去競(jìng)爭(zhēng)更新同一個(gè)原子變量寿桨,但是由于同時(shí)只能有一個(gè)線程通過(guò)CAS算法操作成功此衅,而其他線程通過(guò)無(wú)限自旋不斷嘗CAS操作,這會(huì)白白消耗CPU資源亭螟。這個(gè)問(wèn)題能有什么解決方案嗎挡鞍?當(dāng)然有,因?yàn)锳tomicLong只有一個(gè)共享變量value预烙,所以會(huì)造成競(jìng)爭(zhēng)壓力變大墨微,那么是不是可以嘗試增加共享變量的方式來(lái)緩解競(jìng)爭(zhēng)壓力呢?JDK8中新增的原子操作類LongAdder類就是通過(guò)增加共享變量的方式解決AtomicLong類存在的競(jìng)爭(zhēng)壓力過(guò)大造成性能下降的問(wèn)題扁掸。
下面用兩張圖來(lái)對(duì)比一下AtomicLong翘县、LongAdder競(jìng)爭(zhēng)共享變量的不同
? ? LongAdder類繼承Striped64類,Striped64類中有三個(gè)重要的成員變量谴分,
transient volatile long base;
transient volatile Cell[] cells;
transient volatile int cellsBusy;
? ??base:基數(shù)值锈麸,并發(fā)較少時(shí),所有累加操作都是對(duì)base變量操作的牺蹄。
? ? cells:共享變量數(shù)組忘伞,通過(guò)源碼可以看到Cell類添加了@sun.misc.Contended注解,不知道讀者是否還記得這個(gè)注解的作用沙兰?這個(gè)注解是為了解決偽共享而設(shè)計(jì)的氓奈,關(guān)于偽共享可以看這里。
? ??cellsBusy:由于cells數(shù)組是動(dòng)態(tài)的鼎天,cellBusy用來(lái)標(biāo)識(shí)cells數(shù)組正在擴(kuò)容或初始化舀奶,通過(guò)CAS算法對(duì)cellsBusy變量進(jìn)行操作,保證只有一個(gè)線程可以進(jìn)行cell數(shù)組的擴(kuò)容或初始化训措。
? ? 下面看看LongAdder常用的方法:
sum/longValue調(diào)用的sum方法
//返回當(dāng)前的總值
public long sum() {
Cell[] as =cells; Cell a;
? ? long sum =base;
? ? if (as !=null) {
for (int i =0; i < as.length; ++i) {
if ((a = as[i]) !=null)
sum += a.value;
? ? ? ? }
}
return sum;
}
????累加cells數(shù)組內(nèi)部的value后再累加base值伪节,由于累加過(guò)程并沒(méi)有進(jìn)行加鎖,所以在累加過(guò)程有可能別的線程已修改了值绩鸣,所有這個(gè)sum方法返回的并不是一個(gè)精準(zhǔn)的總數(shù)怀大。
reset
//重置值
public void reset() {
Cell[] as =cells; Cell a;
? ? base =0L;
? ? if (as !=null) {
for (int i =0; i < as.length; ++i) {
if ((a = as[i]) !=null)
a.value =0L;
? ? ? ? }
}
}
? ? 重置base變量,如果cells數(shù)組不為空則重置cell中的value值呀闻。
add
//增加指定的值
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//判斷cells數(shù)組是否為null化借,為null則在base變量上累加x
? ? if ((as =cells) !=null || !casBase(b =base, b + x)) {
boolean uncontended =true;
? ? ? ? if (as ==null || (m = as.length -1) <0 ||
//通過(guò)當(dāng)前線程的threadLocalRandomProbe變量進(jìn)行計(jì)算要訪問(wèn)cells數(shù)組中的哪一個(gè)cell。
(a = as[getProbe() & m]) ==null ||
!(uncontended = a.cas(v = a.value, v + x)))
//如果計(jì)算得到的cell為null(需要對(duì)cells數(shù)組初始化)捡多,或cell不為null但是通過(guò)CAS算法設(shè)置失敱涂怠(已經(jīng)有其他的線程操作成功铐炫,則需要對(duì)cells進(jìn)行擴(kuò)容或重新計(jì)算cell)下面這行代碼就是對(duì)cells數(shù)組初始化或擴(kuò)容的。
longAccumulate(x, null, uncontended);
? ? }
}
longAccumulate
//對(duì)cells數(shù)組進(jìn)行初始化或擴(kuò)容操作蒜焊。
final void longAccumulate(long x, LongBinaryOperator fn,
? ? ? ? ? ? ? ? ? ? ? ? ? boolean wasUncontended) {
//初始化當(dāng)前線程的threadLocalRandomProbe變量
int h;
? ? if ((h =getProbe()) ==0) {
ThreadLocalRandom.current(); // force initialization
? ? ? ? h =getProbe();
? ? ? ? wasUncontended =true;
? ? }
boolean collide =false;? ? ? ? ? ? ? ? // True if last slot nonempty
? ? for (;;) {
Cell[] as; Cell a; int n; long v;
? ? ? ? if ((as =cells) !=null && (n = as.length) >0) {
//根據(jù)當(dāng)前線程的threadLocalRandomProbe和cells元素個(gè)數(shù)計(jì)算需要訪問(wèn)的cell倒信,如果cell為null執(zhí)行if中代碼
if ((a = as[(n -1) & h]) ==null) {
if (cellsBusy ==0) {// Try to attach new Cell
//為計(jì)算等到為null的cell賦值一個(gè)新創(chuàng)建的cell
? ? ? ? ? ? ? ? ? ? Cell r =new Cell(x);? // Optimistically create
? ? ? ? ? ? ? ? ? ? if (cellsBusy ==0 && casCellsBusy()) {
boolean created =false;
? ? ? ? ? ? ? ? ? ? ? ? try {// Recheck under lock
? ? ? ? ? ? ? ? ? ? ? ? ? ? Cell[] rs; int m, j;
? ? ? ? ? ? ? ? ? ? ? ? ? ? if ((rs =cells) !=null &&
(m = rs.length) >0 &&
rs[j = (m -1) & h] ==null) {
rs[j] = r;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? created =true;
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
}finally {
cellsBusy =0;
? ? ? ? ? ? ? ? ? ? ? ? }
if (created)
break;
? ? ? ? ? ? ? ? ? ? ? ? continue;? ? ? ? ? // Slot is now non-empty
? ? ? ? ? ? ? ? ? ? }
}
collide =false;
? ? ? ? ? ? }
else if (!wasUncontended)// CAS already known to fail
? ? ? ? ? ? ? ? wasUncontended =true;? ? ? // Continue after rehash
//計(jì)算等到的cell存在,則執(zhí)行CAS進(jìn)行設(shè)置
? ? ? ? ? ? else if (a.cas(v = a.value, ((fn ==null) ? v + x :
fn.applyAsLong(v, x))))
break;
//當(dāng)前cells數(shù)組元素個(gè)數(shù)大于CPU個(gè)數(shù)
? ? ? ? ? ? else if (n >=NCPU ||cells != as)
collide =false;? ? ? ? ? ? // At max size or stale
//是否有沖突
? ? ? ? ? ? else if (!collide)
collide =true;
//如果當(dāng)前cells數(shù)組元素個(gè)數(shù)沒(méi)有達(dá)到cpu個(gè)數(shù)泳梆,并且存在沖突則進(jìn)行cells數(shù)組擴(kuò)容
? ? ? ? ? ? else if (cellsBusy ==0 && casCellsBusy()) {
try {
if (cells == as) {// Expand table unless stale
? ? ? ? ? ? ? ? ? ? ? ? Cell[] rs =new Cell[n <<1];
? ? ? ? ? ? ? ? ? ? ? ? for (int i =0; i < n; ++i)
rs[i] = as[i];
? ? ? ? ? ? ? ? ? ? ? ? cells = rs;
? ? ? ? ? ? ? ? ? ? }
}finally {
cellsBusy =0;
? ? ? ? ? ? ? ? }
collide =false;
? ? ? ? ? ? ? ? continue;? ? ? ? ? ? ? ? ? // Retry with expanded table
? ? ? ? ? ? }
//為了能夠找到一個(gè)空閑的cell鳖悠,重新計(jì)算hash值。
h =advanceProbe(h);
? ? ? ? }
//初始化cells數(shù)組
else if (cellsBusy ==0 &&cells == as && casCellsBusy()) {
boolean init =false;
? ? ? ? ? ? try {// Initialize table
? ? ? ? ? ? ? ? if (cells == as) {
Cell[] rs =new Cell[2];
? ? ? ? ? ? ? ? ? ? rs[h &1] =new Cell(x);
? ? ? ? ? ? ? ? ? ? cells = rs;
? ? ? ? ? ? ? ? ? ? init =true;
? ? ? ? ? ? ? ? }
}finally {
cellsBusy =0;
? ? ? ? ? ? }
if (init)
break;
? ? ? ? }
else if (casBase(v =base, ((fn ==null) ? v + x :
fn.applyAsLong(v, x))))
break;? ? ? ? ? ? ? ? ? ? ? ? ? // Fall back on using base
? ? }
}
? ? 代碼中的注釋不知道我標(biāo)注的能不能讓讀者明白优妙。下面用幾個(gè)問(wèn)題來(lái)加深一下多LongAdder原理乘综。
問(wèn)題1:當(dāng)前線程如何知道需要訪問(wèn)cells數(shù)組中的哪個(gè)元素?
解答1:每個(gè)線程都有一個(gè)threadLocalRandomProbe變量套硼,通過(guò)threadLocalRandomProbe與cells數(shù)組大小進(jìn)行計(jì)算得到當(dāng)前線程需要訪問(wèn)的cell卡辰。
問(wèn)題2:如何初始化cells數(shù)組?
解答2:成功使用CAS算法修改cellsBusy變量為1之后進(jìn)行初始化邪意,初始化之后再講cellsBusy變量設(shè)置為0九妈。
問(wèn)題3:cells數(shù)組擴(kuò)容條件?
解答3:cells數(shù)組擴(kuò)容的條件是當(dāng)前cells數(shù)組的元素個(gè)數(shù)小于CPU個(gè)數(shù)抄罕,并且當(dāng)前線程計(jì)算得到的cell已經(jīng)有其他線程對(duì)其進(jìn)行操作允蚣,則觸發(fā)擴(kuò)容。
問(wèn)題4:線程訪問(wèn)分配的cell元素有沖突后如何處理呆贿?
解決方案4:如果當(dāng)前cells元素個(gè)數(shù)小于CPU個(gè)數(shù)嚷兔,沖突則進(jìn)行擴(kuò)容。如果當(dāng)前cells元素個(gè)數(shù)不小于CPU個(gè)數(shù)做入,則重新計(jì)算當(dāng)前線程的threadLocalRandomProbe變量冒晰,然后重新計(jì)算需要訪問(wèn)的cell,知道獲得的cell沒(méi)有沖突竟块。
?今天的分享就到這壶运,有看不明白的地方一定是我寫(xiě)的不夠清楚,所有歡迎提任何問(wèn)題以及改善方法浪秘。