5 原子類
JUC 包中提供了許多原子性操作類,這些類都是使用非阻塞算法CAS實(shí)現(xiàn)的,原子類的作用和鎖類似伟姐,都是為了保證并發(fā)情況下線程安全。相比使用鎖實(shí)現(xiàn)原子操作性能更好具有以下優(yōu)點(diǎn):
- 粒度更細(xì):原子變量可以把競爭范圍縮小到變量級(jí)別稿壁,通常鎖的粒度都要大于原子變量的粒度乐疆。
- 效率更高:CAS相比切換到內(nèi)核態(tài)掛起喚醒線程效率更高,除了高度競爭的情況原子類效率更高蒂胞。
常見 6 種原子類如下所示:
原子類型 | 舉例 | 作用 |
---|---|---|
Atomic-基本類型 | AtomicInteger,AtomicLong涮较,AtomicBoolean | |
Atomic-Array數(shù)組類型 | AtomicIntegerArray稠鼻,AtomicLongArray,AtomicBooleanArray | |
Atomic-Reference引用類型 | AtomicReference狂票、AtomicStampedReference候齿、AtomicMarkableReference | |
Atomic-FieldUpdater升級(jí) | AtomicIntegerFieldUpdater,AtomicLongFieldUpdater闺属,AtomicReferenceFieldUpdater | |
Adder 累加器 | LongAdder慌盯、DoubleAdder | |
Accumlator 累加器 | LongAccumlator、DoubleAccumlator |
5.1 AtomicInteger
AtomicInteger 的使用示例如下所示掂器,對(duì)于基本類型如果需要保證線程安全亚皂,我們可以使用 AtomicInteger 來代替 synchronized 和 Lock,使用更加簡潔優(yōu)雅国瓮,也保證了線程安全灭必。
public class AtomicintegerDemo {
public volatile int num = 0;
private AtomicInteger atomicInteger = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
AtomicintegerDemo dmeo = new AtomicintegerDemo();
// 兩個(gè)線程各執(zhí)行1000次累加,期望結(jié)果是得到2000
Thread t1 = new Thread(() -> dmeo.increment());
Thread t2 = new Thread(() -> dmeo.increment());
t1.start();
t2.start();
// 等待兩個(gè)線程執(zhí)行結(jié)束
t1.join();
t2.join();
// 打印普通變量的累加結(jié)果
System.out.println(dmeo.num);
// 打印原子變量的累加結(jié)果
System.out.println(dmeo.atomicInteger.get());
}
// 基礎(chǔ)類型+1
public void incrementBasic() {
num++;
}
// 原子類型+1
public void incrementAtomic() {
atomicInteger.getAndIncrement();
}
// 累加1000次
public void increment() {
for (int i = 0; i < 1000; i++) {
incrementBasic();
incrementAtomic();
}
}
}
AtomicInteger 常見方法如下所示乃摹,代碼示例見Github:
-
set(int value)
設(shè)置值 -
get()
獲取當(dāng)前值 -
getAndSet(int newValue)
獲取當(dāng)前值禁漓,并設(shè)置新值 -
getAndIncrement()
獲取當(dāng)前值,并自增+1 -
getAndDecrement()
獲取當(dāng)前值孵睬,并自減-1 -
getAndAdd(int value)
獲取當(dāng)前值播歼,并加上值value -
incrementAndGet()
先自增+1,再返回自增后的值 -
compareAndSet(int expect, int update)
使用CAS方式修改值掰读,修改成功返回true秘狞,修改失敗返回false
AtomicInteger 與 synchronized
只有 synchronized 中的自適應(yīng)自旋鎖,才會(huì)自旋一定次數(shù)后將線程掛起蹈集,即升級(jí)為重量級(jí)鎖烁试。而 AtomicInteger 會(huì)死循環(huán)CAS直至成功,所以高并發(fā)環(huán)境下 synchronized 效率會(huì)高于AtomicInteger拢肆,這也是LongAdder誕生的原因减响。
常見6種原子類
原子類使用
數(shù)組原子類
引用原子類
升級(jí)為原子類
LongAdder
AtomicLong 通過 CAS 提供了非阻塞的原子操作,相比使用阻塞算法的同步器性能已經(jīng)很好了善榛,但是使用AtomicLong時(shí),在高并發(fā)環(huán)境下大量線程會(huì)去競爭更新同一個(gè)原子變量呻畸,但是由于同時(shí)只會(huì)有一個(gè)線程的CAS操作會(huì)成功移盆,這就導(dǎo)致大量線程競爭失敗后,會(huì)進(jìn)行死循環(huán)不斷自旋嘗試CAS操作伤为,這樣會(huì)浪費(fèi)CPU資源咒循。
針對(duì)高并發(fā)環(huán)境下CAS操作浪費(fèi)CPU資源之外据途,AtomicLong 還有一個(gè)缺點(diǎn)就是更新數(shù)據(jù)前需要從主存獲取數(shù)據(jù),更新數(shù)據(jù)后需要刷新數(shù)據(jù)到主存叙甸。如下圖所示颖医,thread-1 運(yùn)行在 core-1 上,修改變量 ctr 后裆蒸,需要將 ctr 從本地內(nèi)存刷新flush到主存熔萧;thread-2 運(yùn)行在 core-2 上,修改變量 ctr 前僚祷,需要從主存獲取 ctr 的最新數(shù)據(jù)刷新refresh到本地內(nèi)存佛致。
(CAS涉及到預(yù)期值,主內(nèi)存值辙谜,更新值 俺榆。 當(dāng)且僅當(dāng)預(yù)期值==主內(nèi)存值時(shí)候,才會(huì)將主內(nèi)存值更新為更新值 装哆。 )
針對(duì)高并發(fā)環(huán)境下CAS操作浪費(fèi)CPU資源罐脊,和每次更新都需要刷新到主存的缺點(diǎn),JDK8中提供了一個(gè)原子自增自減類LongAdder
蜕琴。
AtomicLong的性能瓶頸是多個(gè)線程競爭一個(gè)變量的更新導(dǎo)致的萍桌,LongAdder的思路就是空間換時(shí)間,每個(gè)線程保存一份變量的副本進(jìn)行自增自減操作奸绷,這樣就避免了多個(gè)線程競爭梗夸,在最后獲取結(jié)果時(shí),再將這多個(gè)副本變量相加即可得到結(jié)果号醉。與ThreadLocal
的思路相同反症,每個(gè)線程保存自己的副本避免競爭
如下圖所示,LongAdder 會(huì)在每個(gè)線程保存一份變量 ctr 的副本畔派,就能避免多個(gè)線程CAS競爭铅碍,也不需要頻繁刷新數(shù)據(jù)到主存。
需要執(zhí)行 1w 個(gè)任務(wù)线椰,每個(gè)任務(wù)的操作是累加 1w 次胞谈,這些任務(wù)由 20 個(gè)線程執(zhí)行,
LongAdder示例代碼如下:
public class LongAdderDemo {
public static void main(String[] args) throws InterruptedException {
// 計(jì)數(shù)器counter
LongAdder counter = new LongAdder();
ExecutorService service = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
// 1w個(gè)累加任務(wù)憨愉,每個(gè)累加任務(wù)執(zhí)行1w次累加操作
for (int i = 0; i < 10000; i++) {
service.submit(new Task(counter));
}
service.shutdown();
// 等待任務(wù)執(zhí)行完畢
while (!service.isTerminated()) {
}
long end = System.currentTimeMillis();
// 打印計(jì)數(shù)器結(jié)果和耗時(shí)
System.out.println(counter.sum());
System.out.println("LongAdder耗時(shí):" + (end - start));
}
private static class Task implements Runnable {
private LongAdder counter;
public Task(LongAdder counter) {
this.counter = counter;
}
// 累加任務(wù)烦绳,任務(wù)內(nèi)容是累加1w次
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
}
}
}
上面代碼使用 LongAdder 作為計(jì)數(shù)器耗時(shí)為 AtomicLong 的十分之一。如果使用 AtomicLong配紫,每次自增 increment() 操作都需要修改值并刷新到主存径密,自增失敗的線程需要也會(huì)進(jìn)行自旋嘗試,浪費(fèi)CPU資源躺孝。而使用 LongAdder 則會(huì)修改每個(gè)線程的 counter 變量副本享扔,在最后使用sum()
方法求和即可底桂。
LongAdder 源碼分析
LongAdder#sum() 方法的源碼如下所示,是對(duì) Cell 數(shù)組的所有值求和惧眠,再與 base 相加得到LongAdder 的值籽懦。由于求和時(shí)沒有對(duì) Cell 數(shù)組進(jìn)行加鎖,所以在求和操作時(shí)可能有線程對(duì)Cell 值進(jìn)行了修改氛魁,因此在上面的示例代碼中暮顺,我們是等線程執(zhí)行完畢才進(jìn)行的求和sum()
操作。
// 返回LongAdder的值
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
// 對(duì)base和數(shù)組所有值求和
sum += a.value;
}
}
return sum;
}
// 等價(jià)于sum求和操作
public long longValue() {
return sum();
}
下面是 LongAdder#add 的源碼呆盖,
在代碼1處拖云,分析如下:
如果以下兩種條件則繼續(xù)執(zhí)行if內(nèi)的語句
- cells數(shù)組不為null(不存在爭用的時(shí)候,cells數(shù)組一定為null应又,一旦對(duì)base的cas操作失敗宙项,才會(huì)初始化cells數(shù)組)
- 如果cells數(shù)組為null,如果casBase執(zhí)行成功株扛,則直接返回尤筐;如果casBase方法執(zhí)行失敗(casBase失敗洞就,說明第一次爭用沖突產(chǎn)生盆繁,需要對(duì)cells數(shù)組初始化)進(jìn)入if內(nèi);casBase方法很簡單旬蟋,就是通過UNSAFE類的cas設(shè)置成員變量base的值為base+要累加的值油昂。casBase執(zhí)行成功的前提是無競爭,這時(shí)候cells數(shù)組還沒有用到為null倾贰,可見在無競爭的情況下是類似于AtomticInteger處理方式冕碟,使用cas做累加。
在代碼2處匆浙,分析如下:
- as == null : cells數(shù)組未被初始化安寺,成立則直接進(jìn)入if執(zhí)行cell初始化
- (m = as.length - 1) < 0: cells數(shù)組的長度為0,條件1與2都代表cells數(shù)組沒有被初始化成功首尼,初始化成功的cells數(shù)組長度為2挑庶;
- (a = as[getProbe() & m]) == null :如果cells被初始化,且它的長度不為0软能,則通過getProbe方法獲取當(dāng)前線程Thread的threadLocalRandomProbe變量的值迎捺,初始為0,然后執(zhí)行threadLocalRandomProbe&(cells.length-1 ),相當(dāng)于m%cells.length查排;如果cells[threadLocalRandomProbe%cells.length]的位置為null凳枝,這說明這個(gè)位置從來沒有線程做過累加,需要進(jìn)入if繼續(xù)執(zhí)行雹嗦,在這個(gè)位置創(chuàng)建一個(gè)新的Cell對(duì)象范舀;
- !(uncontended = a.cas(v = a.value, v + x)):嘗試對(duì)cells[threadLocalRandomProbe%cells.length]位置的Cell對(duì)象中的value值做累加操作,并返回操作結(jié)果,如果失敗了則進(jìn)入if,重新計(jì)算一個(gè)threadLocalRandomProbe了罪;
在代碼3處锭环,即進(jìn)入if語句執(zhí)行l(wèi)ongAccumulate方法,有三種情況
- 前兩個(gè)條件代表cells沒有初始化,
- 第三個(gè)條件指當(dāng)前線程hash到的cells數(shù)組中的位置還沒有其它線程做過累加操作泊藕,
- 第四個(gè)條件代表產(chǎn)生了沖突,uncontended=false
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
// 代碼1辅辩,cells不為null時(shí)使用CAS操作在base上相加,即casBase
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 如果cells不為null娃圆,或者CAS操作失敗了玫锋,則執(zhí)行下面操作
// uncontended判斷cells數(shù)組中,當(dāng)前線程要做cas累加操作的某個(gè)元素是否不存在爭用讼呢,
// 如果cas失敗則存在爭用撩鹿;false代表存在爭用,true代表不存在爭用悦屏。
boolean uncontended = true;
// 代碼2节沦,
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 代碼3
longAccumulate(x, null, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment() {
add(1L);
}
// 補(bǔ)充longAccumulate源碼分析,參考Java并發(fā)編程之美
LongAdder 與 AtomicLong 的適用場景
從上面的分析來看是不行的础爬,因?yàn)锳tomicLong提供了很多cas方法甫贯,例如getAndIncrement、getAndDecrement等看蚜,使用起來非常的靈活叫搁,而LongAdder只有add和sum,適合的是統(tǒng)計(jì)求和計(jì)數(shù)的場景供炎,場景比較單一渴逻。
優(yōu)點(diǎn):由于 JVM 會(huì)將 64位的double,long 型變量的讀操作分為兩次32位的讀操作,所以低并發(fā)保持了 AtomicLong性能,高并發(fā)下熱點(diǎn)數(shù)據(jù)被 hash 到多個(gè) Cell,有限分離,通過分散提升了并行度
但統(tǒng)計(jì)時(shí)有數(shù)據(jù)更新,也可能會(huì)出現(xiàn)數(shù)據(jù)誤差,但高并發(fā)場景有限使用此類,低時(shí)還是可以繼續(xù) AtomicLong碱茁。
LongAccumulator
LongAdder 類是 LongAccumulator 的一個(gè)特例裸卫,LongAccumulator 功能更加強(qiáng)大,可以傳入計(jì)算函數(shù)纽竣,也可以指定初始值墓贿,查看LongAccumulator示例代碼。
public class LongAccumulatorDemo {
public static void main(String[] args) {
// 累加器蜓氨,初始值為100聋袋,傳入函數(shù)是表示對(duì)傳入數(shù)值和當(dāng)前值進(jìn)行的運(yùn)算
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 100);
// 傳入值為1,根據(jù)傳入函數(shù)穴吹,是將1與當(dāng)前值相加
longAccumulator.accumulate(1);
longAccumulator.accumulate(2);
System.out.println(longAccumulator.get());
}
}
LongAdder#add 方法與 LongAccumulator#accumulate 方法最終都調(diào)用的 Striped64#longAccumulate 方法幽勒,區(qū)別是LongAdder 使用默認(rèn)的相加操作,而 LongAccumulator 會(huì)傳入自定義的計(jì)算函數(shù)港令。
下面是 LongAccumulator 的源碼:
public class LongAccumulator extends Striped64 implements Serializable {
private final LongBinaryOperator function;
private final long identity;
/**
* @param accumulatorFunction 對(duì)傳入值與當(dāng)前值做的運(yùn)算
* @param identity identity 初始值
*/
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
// 保存自定義的運(yùn)算規(guī)則
this.function = accumulatorFunction;
base = this.identity = identity;
}
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
// 最終調(diào)用啥容,與LongAdder不同的是需要傳入自定義的函數(shù)function
longAccumulate(x, function, uncontended);
}
}
下面兩行代碼實(shí)現(xiàn)的功能是一樣的锈颗,都可以實(shí)現(xiàn)線程安全的累加。
LongAdder adder = new LongAdder();
adder.increment();
LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
accumulator.accumulate(1);
6 CAS
在Java中可以通過鎖或CAS的方式來實(shí)現(xiàn)原子操作咪惠,JVM中的CAS操作是使用處理器提供的CMPXCHG指令實(shí)現(xiàn)的击吱。自旋CAS實(shí)現(xiàn)的基本思路就是循環(huán)進(jìn)行CAS操作直至成功為止。
CAS 實(shí)現(xiàn)原子操作三大問題
- ABA問題
什么是ABA問題?
因?yàn)镃AS需要在操作值得時(shí)候遥昧,檢查值有沒有發(fā)生變化覆醇,如果沒有發(fā)生變化則更新,但是如果一個(gè)值原來是A炭臭、變成了B永脓、又變成了A,那么使用CAS進(jìn)行檢查時(shí)會(huì)發(fā)現(xiàn)它的值沒有發(fā)生變化鞋仍,但實(shí)際上卻變化了常摧。
解決ABA問題
- 使用版本號(hào)
ABA問題的解決思路是使用版本號(hào),每次變量更新的時(shí)候版本號(hào)加1威创,那么A->B->A就會(huì)變成1A->2B->3A
- jdk自帶原子變量
從jdk1.5開始排宰,jdk的Atomic包里就提供了一個(gè)類AtomicStampedReference來解決ABA問題,這個(gè)類中的compareAndSet方法的作用就是首先檢查當(dāng)前引用是否等于預(yù)期引用那婉,并且檢查當(dāng)前標(biāo)志是否等于預(yù)期標(biāo)志板甘,如果全部相等,則以原子方式將該引用和該標(biāo)志的值更新為指定的新值
/**
* 如果當(dāng)前引用等于預(yù)期引用并且當(dāng)前標(biāo)志等于預(yù)期標(biāo)志
* 則以原子方式將該引用和該標(biāo)志的值設(shè)置為給定新值
*
* @param expectedReference 預(yù)期引用值
* @param newReference 新的引用值
* @param expectedStamp 預(yù)期標(biāo)記值
* @param newStamp 新標(biāo)記值
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
#預(yù)期引用==當(dāng)前引用
expectedReference == current.reference &&
#預(yù)期標(biāo)志==當(dāng)前標(biāo)志
expectedStamp == current.stamp &&
#新引用==當(dāng)前引用 并且 新標(biāo)志==當(dāng)前標(biāo)志
((newReference == current.reference &&
newStamp == current.stamp) ||
#原子更新值
casPair(current, Pair.of(newReference, newStamp)));
}
- 循環(huán)時(shí)間長開銷大
自旋CAS如果長時(shí)間不成功详炬,會(huì)給CPU帶來非常大的執(zhí)行開銷盐类。如果jvm能支持處理器提供的pause指令,那么效率會(huì)有一定的提升呛谜。pause指令有兩個(gè)作用:
第一在跳,它可以延遲流水線執(zhí)行指令(de-pipeline),使CPU不會(huì)消耗過多的執(zhí)行資源隐岛,延遲的時(shí)間取決于具體實(shí)現(xiàn)的版本猫妙,在一些處理器上延遲時(shí)間是零。
第二聚凹,它可以避免在退出循環(huán)的時(shí)候因內(nèi)存順序沖突(Memory Order Violation)而引起CPU流水線被清空(CPU Pipeline Flush)割坠,從而提高CPU的執(zhí)行效率。
- 只能保證一個(gè)共享變量的原子操作
當(dāng)對(duì)一個(gè)共享變量執(zhí)行操作時(shí)妒牙,我們可以使用循環(huán)CAS的方式來保證原子操作彼哼,但是多個(gè)共享變量操作時(shí),循環(huán)CAS就無法保證操作的原子性湘今,這個(gè)時(shí)候就可以用鎖敢朱。還有一個(gè)方法,就是把多個(gè)共享變量合并成一個(gè)共享變量來操作。比如拴签,有兩個(gè)共享變量i=2,j=a合并一下ij=2a孝常,然后用CAS來操作ij。從java1.5開始蚓哩,JDK提供了AtomicReference類來保證引用對(duì)象之間的原子性茫因,就可以把多個(gè)變量放在一個(gè)對(duì)象里來進(jìn)行CAS操作。
CAS適用場景
除了偏向鎖杖剪,JVM實(shí)現(xiàn)鎖的方式都使用了循環(huán)CAS。即當(dāng)一個(gè)線程進(jìn)入同步塊時(shí)使用循環(huán)CAS的方式來獲取鎖驰贷,退出同步塊時(shí)使用循環(huán)CAS的方式釋放鎖盛嘿。
synchronized中的輕量級(jí)鎖自旋鎖才會(huì)嘗試10次CAS然后升級(jí)為重量級(jí)鎖,而AtomicInteger 中的CAS會(huì)真的一直循環(huán)直至CAS成功括袒,所以在高并發(fā)環(huán)境下建議使用LongAdder代替AtomicInteger次兆。
CAS 的源碼實(shí)現(xiàn) http://www.reibang.com/p/c8e9bce8b3c6
LOCK cmpxchg
并發(fā)編程藝術(shù) p53