java.util.concurrent.atomic是jdk1.5新增的羹应,這個(gè)包下主要提供了一些原子類犬金,這些類基本特性是線程安全的昔期,保證數(shù)據(jù)的非阻塞同步(比jdk1.5之前的synchronized阻塞同步更高效)骂远,這樣就避免了阻塞同步中線程阻塞和喚醒帶來的性能問題。
下面來舉個(gè)例子說明下原子類的非阻塞同步翠订,代碼如下:
public class Test {
public static void main(String args[]) {
int threadCnt = 20;
Thread[] threads = new Thread[threadCnt];
CountDownLatch cdt = new CountDownLatch(threadCnt);
for (int i = 0; i < threadCnt; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
increase();
}
cdt.countDown();
}
});
threads[i].start();
}
for (Thread i : threads) {
try {
i.join();//保證前面的所有的線程執(zhí)行玩再接著執(zhí)行主線程
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// while (Thread.activeCount() > 1) {
// Thread.yield();
// }
// try {
// cdt.await();//保證前面的所有的線程執(zhí)行玩再接著執(zhí)行主線程
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println(count + " " + count2.get());
}
public static int count = 0;
public static AtomicInteger count2 = new AtomicInteger(0);
public static void increase() {
count++;
count2.incrementAndGet();
}
上面的代碼運(yùn)行了20個(gè)線程巢音,每個(gè)線程對count變量和count2變量進(jìn)行100W次自增操作,如果上面這段代碼能夠正常并發(fā)的話尽超,最后的結(jié)果應(yīng)該都是2000W()才對官撼,但實(shí)際結(jié)果卻發(fā)現(xiàn)每次運(yùn)行count的結(jié)果都不相同,都是一個(gè)小于2000W的數(shù)字似谁,而count2永遠(yuǎn)都是2000W傲绣。
要是換成volatile修飾count變量呢?volatile關(guān)鍵字很重要的兩個(gè)特性:
- 保證變量在線程間可見巩踏,對volatile變量所有的寫操作都能立即反應(yīng)到其他線程中斜筐,換句話說,volatile變量在各個(gè)線程中是一致的(得益于java內(nèi)存模型—"先行發(fā)生原則")蛀缝;
- 禁止指令的重排序優(yōu)化;
但是volatile并沒有我所期待的第三個(gè)特性:volatile修飾的變量的運(yùn)算在并發(fā)下是安全的目代。用volatile修飾count變量屈梁,重新測試上面的代碼嗤练,count每次都是輸出小于20000的數(shù)字,因?yàn)楹诵狞c(diǎn)在于java里的運(yùn)算(比如自增)并不是原子性的(比如 --i在讶、++i這兩個(gè)操作煞抬,其中包含有3個(gè)操作步驟:第一步,讀取i构哺;第二步革答,加1或減1;第三步:寫回內(nèi)存)曙强。
那么原子變量是怎樣保證數(shù)據(jù)同步的呢残拐?你可以這樣理解:當(dāng)有多個(gè)線程同時(shí)操作這些原子類的實(shí)例時(shí),這些原子實(shí)例對于線程X其實(shí)并不具有synchronized的排他性碟嘴,反而具有排己性溪食。我線程X訪問原子實(shí)例怎么會(huì)排斥自己呢?是不是覺得很難理解呢娜扇?其實(shí)排己就是為了數(shù)據(jù)的同步错沃,而且比排他性的效率更高,因?yàn)榕偶菏峭ㄟ^一個(gè)while自旋循環(huán)實(shí)現(xiàn)的雀瓢,而且這個(gè)while循環(huán)耗時(shí)一般比較短枢析。當(dāng)線程X要寫會(huì)內(nèi)存的時(shí)候,先比較內(nèi)存地址的舊值A(chǔ)(這個(gè)值是volatile的即線程間可見)是否改變了刃麸,如果變了(比如另外一個(gè)線程Y修改了內(nèi)存地址對應(yīng)的變量的值醒叁,線程X是可見的),那就就進(jìn)行while自旋嫌蚤,until直到預(yù)期的舊值A(chǔ)是對的辐益,JVM會(huì)為while自旋這樣的操作會(huì)付出一定的代價(jià),但是這樣的代價(jià)相對于synchronized的線程阻塞和喚起脱吱,可以說是比較高效了迂求。
原子類的非阻塞同步實(shí)際上是靠硬件的相關(guān)指令來實(shí)現(xiàn)的窜醉,或者說只是在硬件級(jí)別上阻塞了,可以對基本數(shù)據(jù)、數(shù)組中的基本數(shù)據(jù)舷丹、對類中的基本數(shù)據(jù)進(jìn)行操作。原子變量類相當(dāng)于一種泛化的volatile變量沈自,能夠支持原子的和有條件的讀-改-寫操作——引文毡代。
在并發(fā)環(huán)境下,某個(gè)線程對共享原子變量先進(jìn)行操作间校,如果沒有其他線程爭用共享數(shù)據(jù)那操作就成功矾克;如果存在數(shù)據(jù)的爭用沖突,那就才去補(bǔ)償措施憔足,比如不斷的重試機(jī)制胁附,直到成功為止酒繁,因?yàn)檫@種樂觀的并發(fā)策略不需要把線程掛起,操作和沖突檢測具備原子性控妻。在硬件指令集的發(fā)展驅(qū)動(dòng)下州袒,使得 "操作和沖突檢測" 這種看起來需要多次操作的行為只需要一條處理器指令便可以完成,這些指令中就包括非常著名的CAS指令(Compare-And-Swap比較并交換)弓候±煽蓿《深入理解Java虛擬機(jī)第二版.周志明》第十三章中這樣描述關(guān)于CAS機(jī)制:
CAS指令需要三個(gè)操作數(shù),分別是內(nèi)存位置(Java中可以簡單理解為變量的內(nèi)存地址菇存,用V表示)夸研、舊的預(yù)期值(A)和新值(B)。CAS指令執(zhí)行時(shí)撰筷,當(dāng)且僅當(dāng)V符合舊的預(yù)期值A(chǔ)時(shí)陈惰,處理器用新值B更新V的值,否則它就不執(zhí)行更新毕籽,但是無論是否更新了V的值抬闯,都會(huì)返回V的舊值,上述的處理過程就是一個(gè)原子操作关筒。
在JDK1.5之后溶握,Java程序中才可以使用CAS操作,該操作由sun.misc.Unsafe類里面的compareAndSwapInt()
和compareAndSwapLong()
等幾個(gè)方法包裝提供蒸播,虛擬機(jī)在內(nèi)部對這些方法做了特殊處理睡榆,即時(shí)編譯出來的結(jié)果就是一條平臺(tái)相關(guān)的處理器CAS指令,沒有方法調(diào)用的過程袍榆,或者可以認(rèn)為是無條件內(nèi)聯(lián)進(jìn)去了胀屿。
由于sun.misc.Unsafe類不是提供給用戶程序調(diào)用的類(Unsafe.getUnsafe()的代碼中限制了只有啟動(dòng)類加載器(BootStrap ClassLoader)加載的Class才能訪問它),因此包雀,如果不采用反射手段宿崭,我們只能通過其它的Java API來間接使用它,如J.U.C包里的整數(shù)原子類才写,其中的compareAndSet()
和getAndIncrement()
等方法都使用了Unsafe類的CAS操作葡兑。
我們來看一下AtomicInteger的incrementAndGet
的源碼:
private static final long valueOffset;
private static final Unsafe unsafe = Unsafe.getUnsafe();
static {
try {
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
可以看到變量value是用volatile修飾的,那么這個(gè)變量在線程之間是透明的赞草,這個(gè)方法其實(shí)就是調(diào)用Unsafe類的方法讹堤,我們再進(jìn)去看下Unsafe的getAndAddInt()
方法的源碼:
public final int getAndAddInt(Object o, long valueOffset, int modify) {
int expect;
do {
expect = this.getIntVolatile(o, valueOffset);
} while(!this.compareAndSwapInt(o, valueOffset, expect, expect + modify));
return expect;
}
可以看到是一個(gè)while循環(huán),非阻塞同步的訣竅就在于這個(gè)自旋式while循環(huán)
厨疙,循環(huán)自旋不斷嘗試將一個(gè)比當(dāng)前值大modify的新值賦給自己洲守,如果失敗則說明在執(zhí)行"獲取-設(shè)置"操作的時(shí)已經(jīng)被其它線程修改過了,于是便再次進(jìn)入循環(huán)下一次操作,直到成功為止岖沛。原子的將變量設(shè)定為新數(shù)據(jù)暑始,同時(shí)返回先前的舊數(shù)據(jù)。
java.util.concurrent.atomic中的類可以分成4組:
- 標(biāo)量類(Scalar):AtomicBoolean婴削,AtomicInteger,AtomicLong牙肝,AtomicReference
- 數(shù)組類:AtomicIntegerArray唉俗,AtomicLongArray,AtomicReferenceArray
- 更新器類:AtomicLongFieldUpdater配椭,AtomicIntegerFieldUpdater虫溜,AtomicReferenceFieldUpdater
- 復(fù)合變量類:AtomicMarkableReference,AtomicStampedReference
標(biāo)量類
第一組AtomicBoolean股缸,AtomicInteger衡楞,AtomicLong,AtomicReference這四種基本類型用來處理布爾敦姻,整數(shù)瘾境,長整數(shù),對象四種數(shù)據(jù)镰惦,其內(nèi)部實(shí)現(xiàn)不是簡單的使用synchronized迷守,而是一個(gè)更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷旺入,執(zhí)行效率大為提升兑凿。
他們的實(shí)現(xiàn)都是依靠 真正的值為volatile 類型,通過Unsafe 包中的原子操作實(shí)現(xiàn)茵瘾。最基礎(chǔ)就是CAS礼华,他是一切的基礎(chǔ)。如下 拗秘。其中valueOffset是 在內(nèi)存中 value相對于基地址的偏移量圣絮。(它的獲得也由Unsafe 本地代碼獲得)。
核心代碼如下聘殖,其他都是在compareAndSet
基礎(chǔ)上構(gòu)建的晨雳。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return prev;
}
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return next;
}
}
void set()和void lazySet():set設(shè)置為給定值,直接修改原始值奸腺;lazySet延時(shí)設(shè)置變量值餐禁,這個(gè)等價(jià)于set()方法,但是由于字段是volatile類型的突照,因此次字段的修改會(huì)比普通字段(非volatile字段)有稍微的性能延時(shí)(盡管可以忽略)帮非,所以如果不是想立即讀取設(shè)置的新值,允許在“后臺(tái)”修改值,那么此方法就很有用末盔。
數(shù)組類
第二組AtomicIntegerArray筑舅,AtomicLongArray還有AtomicReferenceArray類進(jìn)一步擴(kuò)展了原子操作,對這些類型的數(shù)組提供了支持陨舱。這些類在為其數(shù)組元素提供 volatile 訪問語義方面也引人注目翠拣,這對于普通數(shù)組來說是不受支持的。
他們內(nèi)部并不是像AtomicInteger一樣維持一個(gè)valatile變量游盲,而是全部由native方法實(shí)現(xiàn)误墓,如下
AtomicIntegerArray的實(shí)現(xiàn)片斷:
public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
private final int[] array;
static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
public AtomicIntegerArray(int length) {
array = new int[length];
}
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}
public final int length() {
return array.length;
}
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
public final void lazySet(int i, int newValue) {
unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
}
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}
public final boolean weakCompareAndSet(int i, int expect, int update) {
return compareAndSet(i, expect, update);
}
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}
public final int decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}
public final int getAndUpdate(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}
public final int updateAndGet(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}
public final int getAndAccumulate(int i, int x,
IntBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}
public final int accumulateAndGet(int i, int x,
IntBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}
}
可以看到,其實(shí)核心操作還是Unsafe類進(jìn)行處理的益缎。這個(gè)類提供兩個(gè)構(gòu)造器谜慌,一個(gè)是通過數(shù)組長度構(gòu)造,第二個(gè)是通過原有數(shù)組構(gòu)造莺奔,需要注意的是第二個(gè)構(gòu)造器是調(diào)用clone()方法將原有數(shù)組拷貝一份欣范,所以當(dāng)AtomicIntegerArray對內(nèi)部的數(shù)組元素進(jìn)行修改時(shí),不會(huì)影響傳入的數(shù)組令哟,大家都知道clone()是淺拷貝恼琼,所以如果數(shù)組內(nèi)容是對象的話會(huì)有影響。
array使用的是final修飾励饵,變成了常量數(shù)組驳癌,內(nèi)存中的偏移地址不可變,這點(diǎn)很重要役听,但是該內(nèi)存地址所指向的那個(gè)對象還是可以變的颓鲜,這個(gè)array數(shù)組就保存到了方法區(qū),同樣的可以保證多線程訪問時(shí)的可見性典予,避免使用volatile也減少了開銷甜滨。
需要注意的是static語句,arrayBaseOffset這個(gè)native方法是獲取數(shù)組首個(gè)元素的首地址偏移賦值給base瘤袖,arrayIndexScale這個(gè)native方法可以用來獲取數(shù)組元素的增量地址(即每個(gè)元素的字節(jié)大小)賦值給scale衣摩,比如元素是int類型,那么scale就是4捂敌,下標(biāo)為i的元素的內(nèi)存地址就是公式1
艾扮。數(shù)組的偏移地址和每個(gè)元素的偏移地址對于Unsafe的CAS操作十分重要,沒有內(nèi)存的偏移地址Unsafe就無法進(jìn)行CAS操作占婉。這應(yīng)該很好理解的吧泡嘴?這只是簡單的解釋,如果這個(gè)解釋你還不理解的話逆济,那么后面的詳細(xì)的解釋就更難懂了酌予。
詳細(xì)解釋的話磺箕,那就先來研究這兩個(gè)native方法:
public int arrayBaseOffset(Class clazz) {
Class<?> component = clazz.getComponentType();
if (component == null) {
throw new IllegalArgumentException("Valid for array classes only: " + clazz);
}
// TODO: make the following not specific to the object model.
int offset = 12;
if (component == long.class || component == double.class) {
offset += 4; // 4 bytes of padding.
}
return offset;
}
public int arrayIndexScale(Class clazz) {
Class<?> component = clazz.getComponentType();
if (component == null) {
throw new IllegalArgumentException("Valid for array classes only: " + clazz);
}
// TODO: make the following not specific to the object model.
if (!component.isPrimitive()) {
return 4;
} else if (component == long.class || component == double.class) {
return 8;
} else if (component == int.class || component == float.class) {
return 4;
} else if (component == char.class || component == short.class) {
return 2;
} else {
// component == byte.class || component == boolean.class.
return 1;
}
}
arrayBaseOffset就是計(jì)算數(shù)組對象在內(nèi)存中從數(shù)組的地址到首個(gè)元素的地址的偏移量offset,為什么offset會(huì)先加12呢抛虫,這里涉及到的是java內(nèi)存中對象存儲(chǔ)的知識(shí)松靡。每個(gè)類對象在內(nèi)存中存儲(chǔ)時(shí)除了數(shù)據(jù)內(nèi)容,其實(shí)還要包含一個(gè)頭部信息的建椰,對象頭包含Mark Word和指向類的指針兩部分雕欺,Mark Word在32位JVM中的長度是32bit,在64位JVM中長度是64bit广凸,指向類的指針在32位JVM中的長度是32bit阅茶,在64位JVM中長度是64bit,如果這個(gè)類是數(shù)組類型的話谅海,還需要4字節(jié)(該數(shù)據(jù)在32位和64位JVM中長度都是32bit)來存儲(chǔ)數(shù)組的大小,所以這里是12字節(jié)蹦浦。接下來又涉及到了字節(jié)對齊扭吁,在jvm中,是要以8字節(jié)為單位進(jìn)行對齊的盲镶,這里的頭部12字節(jié)肯定是無法對齊了侥袜,但是如果是long,double等8字節(jié)的類型溉贿,就是在開始存時(shí)就進(jìn)行對齊操作枫吧,這樣就能保證接下來的每一個(gè)元素都是8的倍數(shù),而如果是其他的對象比如int 4字節(jié)宇色,就在數(shù)組末尾進(jìn)行對齊九杂,這樣就能缺多少補(bǔ)多少。
而arrayIndexScale實(shí)際上是能獲取數(shù)組中每個(gè)元素在內(nèi)存中的大小賦值給scale宣蠕,有點(diǎn)像cpp里sizeof例隆。
然后接著分析static語句,if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two");
這行代碼意思是說如果scale不是2的次冪的話那么就拋出異常抢蚀,大家都知道Integer的字節(jié)長度是4镀层,long和double都是8字節(jié),這些字節(jié)長度都是2的shift次冪皿曲,下面一行代碼shift = 31 - Integer.numberOfLeadingZeros(scale);
是計(jì)算指數(shù)shift的的過程并賦值給shift唱逢,其實(shí)這行代碼可以使用一個(gè)數(shù)學(xué)公式解釋更好理解一些公式2
。Integer類的方法numberOfLeadingZeros(int)
是什么意思呢屋休?其實(shí)按照字面leading的翻譯是領(lǐng)頭的意思坞古,即領(lǐng)頭的0的個(gè)數(shù),進(jìn)一步解釋是從最高位開始直到遇到1的連續(xù)的0的個(gè)數(shù)博投,相對應(yīng)的Integer還有一個(gè)numberOfTrailingZeros(int)
方法表示尾隨的0的個(gè)數(shù)即從最低位開始直到遇到1的連續(xù)的0的個(gè)數(shù)绸贡。其實(shí)計(jì)算shift的過程主要是為了優(yōu)化公式1
,我們看一下源碼中計(jì)算的過程:
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
由于公式2
的成立,我們可以把公式1轉(zhuǎn)換成公式3:公式3
听怕。
下面我們仔細(xì)分析下Integer類的方法numberOfLeadingZeros(int)
的原理捧挺,其實(shí)是應(yīng)用了典型的二分查找,先把32位整形分為高16位和低16位查找非零數(shù)尿瞭,在對高16位進(jìn)行或低16位進(jìn)行二分
public static int numberOfLeadingZeros(int i) {
if (i == 0)
return 32;
int n = 1;
// 下面的代碼就是定位從左邊開始第一個(gè)非零值的位置闽烙,在定位過程中順便累加從左邊開始0的個(gè)數(shù)
// 將i無符號(hào)右移16位后,有二種情況声搁;
// 情況1.i=0,則第一個(gè)非零值位于低16位黑竞,i至少有16個(gè)0,同時(shí)將i左移16位(把低16位移到原高16位的位置疏旨,這樣情況1和情況2就能統(tǒng)一后續(xù)的判斷方式)
// 情況2.i!=0,則第一個(gè)非零值位于高16位很魂,后續(xù)在高16位中繼續(xù)判斷
// 這個(gè)思路就是二分查找,首先把32位的數(shù)分為高低16位檐涝,如果非零值位于高16位遏匆,后續(xù)再將高16位繼續(xù)二分為高低8位,一直二分到集合中只有1個(gè)元素
if (i >>> 16 == 0) { n += 16; i <<= 16; }
// 判斷第一個(gè)非零值是否位于高8位
if (i >>> 24 == 0) { n += 8; i <<= 8; }
// 判斷第一個(gè)非零值是否位于高4位
if (i >>> 28 == 0) { n += 4; i <<= 4; }
// 判斷第一個(gè)非零值是否位于高2位
if (i >>> 30 == 0) { n += 2; i <<= 2; }
// 判斷第一個(gè)非零值是否位于左邊第一位
n -= i >>> 31;
return n;
更新器類
第三組AtomicIntegerFieldUpdater谁榜,AtomicLongFieldUpdater幅聘,AtomicReferenceFieldUpdater基于反射的實(shí)用工具,可以對指定類的指定 volatile 字段進(jìn)行原子更新窃植。API非常簡單帝蒿,但是也是有一些約束:
- 字段必須是volatile類型的,否則get的時(shí)候會(huì)拋出
Exception in thread "main" java.lang.ExceptionInInitializerError Caused by: java.lang.IllegalArgumentException: Must be volatile type
異常 - 字段的描述類型(修飾符public/protected/default/private)是與調(diào)用者與操作對象字段的關(guān)系一致巷怜。也就是說 調(diào)用者能夠直接操作對象字段葛超,那么就可以反射進(jìn)行原子操作。但是對于父類的字段丛版,子類是不能直接操作的巩掺,盡管子類可以訪問父類的字段。(我試了下页畦,貌似只要字段不是private就行了胖替,大神解釋下)
- 只能是實(shí)例變量,不能是類變量豫缨,也就是說不能加static關(guān)鍵字独令。
- 只能是可修改變量,不能使final變量好芭,因?yàn)閒inal的語義就是不可修改燃箭。實(shí)際上final的語義和volatile是有沖突的,這兩個(gè)關(guān)鍵字不能同時(shí)存在舍败。
- 對于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long類型的字段招狸,不能修改其包裝類型(Integer/Long)敬拓。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater 。
下面舉個(gè)例子:
public class Test {
private static class User {
public volatile String name;
public volatile String name2;
public volatile int age;
public volatile int age2;
public AtomicInteger age3;
public User(String name, int age) {
this.name = name;
this.name2 = name;
this.age = age;
this.age2 = age;
this.age3 = new AtomicInteger(age);
}
}
// 創(chuàng)建原子更新器,并設(shè)置需要更新的對象類和對象的屬性
private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
private static AtomicReferenceFieldUpdater<User, String> b = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");
public static void main(String args[]) {
User user = new User("zzh", 0);
int threadCnt = 20;
Thread[] threads = new Thread[threadCnt];
CountDownLatch cdt = new CountDownLatch(threadCnt);
for (int i = 0; i < threadCnt; i++) {
final int iT = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 4000; j++) {
user.name2 += "0";
a.getAndIncrement(user);
b.accumulateAndGet(user, "0", new BinaryOperator() {
@Override
public Object apply(Object o, Object o2) {
return o.toString() + o2.toString();
}
});
user.age2++;
user.age3.incrementAndGet();
}
cdt.countDown();
}
});
threads[i].start();
}
for (Thread i : threads) {
try {
i.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// while (Thread.activeCount() > 1) {
// Thread.yield();
// }
// try {
// cdt.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println(a.get(user) + " " + user.age2 + " " + user.age3.get() + " " + b.get(user).length() + " " + user.name2.length());
}
//輸出
//80000 77295 80000 80003 41319
}
測試結(jié)果是age和age3總是80000即非阻塞同步裙戏,字符串name的長度永遠(yuǎn)都是80003(即zzh后跟8萬個(gè)0的字符串)也保持了數(shù)據(jù)的非阻塞同步乘凸,但是age2總是一個(gè)小于80000的數(shù)字即無法同步,name2的長度也總是小于80003的一個(gè)數(shù)即無法同步累榜。
復(fù)合變量類
防止ABA問題出現(xiàn)而構(gòu)造的類营勤。如什么是ABA問題呢,wiki官方解釋壹罚。假設(shè)多線程環(huán)境中葛作,一個(gè)線程修改A->B,然后又B->A猖凛,另一個(gè)線程看到的值未改變赂蠢,又繼續(xù)修改成自己的期望值。如果我們不關(guān)心中間狀態(tài)的變化辨泳,只關(guān)心最終結(jié)果客年,就無所謂ABA問題。而AtomicStampedReference和AtomicMarkableReference就是為了處理ABA問題而存在的漠吻。
看下AtomicStampedReference是怎么解決這個(gè)問題的:
/**
通過static pair保存一個(gè)引用和計(jì)數(shù)器
*/
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
/**
* 通過傳入的初始化引用和計(jì)數(shù)器來構(gòu)造函數(shù)一個(gè)pair
*
* @param initialRef 初始化用用
* @param initialStamp 初始化計(jì)數(shù)器或者叫時(shí)間戳
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
AtomicStampedReference通過一個(gè)pair來保存初始化引用和計(jì)數(shù)器,以后每次原子操作時(shí)司恳,都需要比較引用和計(jì)數(shù)器是否都正確途乃。舉個(gè)通俗點(diǎn)的例子,你倒了一杯水放桌子上扔傅,干了點(diǎn)別的事耍共,然后同事把你水喝了又給你重新倒了一杯水,你回來看水還在猎塞,拿起來就喝试读,如果你不管水中間被人喝過,只關(guān)心水還在荠耽,這就是ABA問題钩骇。如果你是一個(gè)講衛(wèi)生講文明的小伙子,不但關(guān)心水在不在铝量,還要在你離開的時(shí)候水被人動(dòng)過沒有倘屹,因?yàn)槟闶浅绦騿T,所以就想起了放了張紙?jiān)谂赃吢叮瑢懮铣跏贾?纽匙,別人喝水前麻煩先做個(gè)累加才能喝水。這就是AtomicStampedReference的解決方案拍谐。
看下AtomicStampedReference的方法:
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
AtomicMarkableReference跟AtomicStampedReference差不多烛缔,AtomicStampedReference是使用pair的int stamp作為計(jì)數(shù)器使用馏段,AtomicMarkableReference的pair使用的是boolean mark。還是那個(gè)水的例子践瓷,AtomicStampedReference可能關(guān)心的是動(dòng)過幾次院喜,AtomicMarkableReference關(guān)心的是有沒有被人動(dòng)過,方法都比較簡單当窗。
LongAdder够坐、LongAccumulator、DoubleAdder崖面、DoubleAccumulator
這四個(gè)類之前是在guava以及hystrix等中出現(xiàn)過元咙,jdk1.8在JUC包中新增了這四個(gè)類,作者是著名的Doug lea巫员。
我們來分析下LongAdder這個(gè)類庶香,其余的類的原理都相似。LongAdder主要處理高并發(fā)情況简识,因?yàn)锳tomicLong雖然能保持?jǐn)?shù)據(jù)同步但是在高并發(fā)情況下CAS失敗率會(huì)很高(即while自旋會(huì)很久)導(dǎo)致效率低下赶掖。
而LongAdder的性能比上面那種還要好很多,首先它有一個(gè)基礎(chǔ)的值base七扰,在發(fā)生競爭的情況下奢赂,會(huì)有一個(gè)Cell數(shù)組用于將不同線程的操作離散到不同的節(jié)點(diǎn)上去(會(huì)根據(jù)需要擴(kuò)容,最大為CPU核數(shù))颈走,sum()會(huì)將所有Cell數(shù)組中的value和base累加作為返回值膳灶。核心的思想就是將AtomicLong將單一value的更新壓力分擔(dān)到多個(gè)value中去,降低單個(gè)value的 “熱度”立由,分段更新T觥!
底競爭下直接更新base锐膜,類似AtomicLong毕箍,高并發(fā)下,會(huì)將每個(gè)線程的操作hash到不同的cells數(shù)組中道盏,從而將AtomicLong中更新而柑,一個(gè)value的行為優(yōu)化之后,分散到多個(gè)value中捞奕,從而降低更新熱點(diǎn)牺堰,而需要得到當(dāng)前值的時(shí)候,直接將所有cell中的value與base相加即可颅围,但是跟AtomicLong(compare and change -> xadd)的CAS不同伟葫,incrementAndGet操作及其變種可以返回更新后的值,而LongAdder返回的是void院促。
public class LongAdder extends Striped64 implements Serializable {
//...
}
LongAdder繼承自Striped64筏养,Striped64內(nèi)部維護(hù)了一個(gè)懶加載的數(shù)組以及一個(gè)額外的base實(shí)力域斧抱,數(shù)組的大小是2的N次方(方便取余操作),使用每個(gè)線程Thread內(nèi)部的哈希值訪問渐溶。
abstract class Striped64 extends Number {
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
數(shù)組的元素是Cell
類辉浦,可以看到Cell類用Contended注解修飾,這里主要是解決false sharing(偽共享的問題)茎辐,比如兩個(gè)volatile變量被分配到了同一個(gè)緩存行宪郊,但是這兩個(gè)的更新在高并發(fā)下會(huì)競爭,比如線程A去更新變量a拖陆,線程B去更新變量b疗杉,但是這兩個(gè)變量被分配到了同一個(gè)緩存行栗涂,因此會(huì)造成每個(gè)線程都去爭搶緩存行的所有權(quán)瞧掺,例如A獲取了所有權(quán)然后執(zhí)行更新這時(shí)由于volatile的語義會(huì)造成其刷新到主存案训,但是由于變量b也被緩存到同一個(gè)緩存行,因此就會(huì)造成cache miss速警,這樣就會(huì)造成極大的性能損失叹誉,因此有一些類庫的作者,例如JUC下面的闷旧、Disruptor等都利用了插入dummy 變量的方式长豁,使得緩存行被其獨(dú)占,比如下面這種代碼添加了14行緩沖行防止偽共享:
static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
但是這種方式畢竟不通用忙灼,例如32蕉斜、64位操作系統(tǒng)的緩存行大小不一樣,因此JAVA8中就增加了一個(gè)注@sun.misc.Contended解用于解決這個(gè)問題,由JVM去插入這些變量缀棍,具體可以參考openjdk.java.net/jeps/142 ,但是通常來說對象通常是不規(guī)則的分配到內(nèi)存中的机错,但是數(shù)組由于是連續(xù)的內(nèi)存爬范,因此可能會(huì)共享緩存行,因此這里加一個(gè)Contended注解以防cells數(shù)組發(fā)生偽共享的情況弱匪。
來看下核心的add
和sum
方法:
public class LongAdder {
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* 如果是第一次執(zhí)行青瀑,則直接case操作base
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/**
* as數(shù)組為空(null或者size為0)
* 或者當(dāng)前線程取模as數(shù)組大小為空
* 或者cas更新Cell失敗
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
public long sum() {
//通過累加base與cells數(shù)組中的value從而獲得sum
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
}
其中casBase操作其實(shí)就是Unsafe的CAS操作。LongAdder的分段更新操作會(huì)帶來空間上的浪費(fèi)(Cell數(shù)組導(dǎo)致)萧诫,可以空間換時(shí)間斥难,但是,能不換就不換帘饶! 所以哑诊,只有并發(fā)高到一定程度了,才會(huì)分段更新及刻,因?yàn)榈筒l(fā)時(shí)镀裤,casBase操作基本都會(huì)成功竞阐。
由于Cell相對來說比較占內(nèi)存,因此這里采用懶加載的方式暑劝,在無競爭的情況下直接更新base域骆莹,在第一次發(fā)生競爭的時(shí)候(CAS失敗)就會(huì)創(chuàng)建一個(gè)大小為2的cells數(shù)組,每次擴(kuò)容都是加倍(加倍操作由longAccumulate方法判斷并執(zhí)行)担猛,只到達(dá)到CPU核數(shù)幕垦。同時(shí)我們知道擴(kuò)容數(shù)組等行為需要只能有一個(gè)線程同時(shí)執(zhí)行,因此需要一個(gè)鎖傅联,這里通過CAS更新cellsBusy來實(shí)現(xiàn)一個(gè)簡單的spin lock先改。
數(shù)組訪問索引是通過Thread里的threadLocalRandomProbe域取模實(shí)現(xiàn)的,這個(gè)域是ThreadLocalRandom更新的纺且,cells的數(shù)組大小被限制為CPU的核數(shù)盏道,因?yàn)榧词褂谐^核數(shù)個(gè)線程去更新,但是每個(gè)線程也只會(huì)和一個(gè)CPU綁定载碌,更新的時(shí)候頂多會(huì)有cpu核數(shù)個(gè)線程猜嘱,因此我們只需要通過hash將不同線程的更新行為離散到不同的slot即可。
我們知道線程嫁艇、線程池會(huì)被關(guān)閉或銷毀朗伶,這個(gè)時(shí)候可能這個(gè)線程之前占用的slot就會(huì)變成沒人用的,但我們也不能清除掉步咪,因?yàn)橐话鉾eb應(yīng)用都是長時(shí)間運(yùn)行的论皆,線程通常也會(huì)動(dòng)態(tài)創(chuàng)建、銷毀猾漫,很可能一段時(shí)間后又會(huì)被其他線程占用点晴,而對于短時(shí)間運(yùn)行的,例如單元測試悯周,清除掉有啥意義呢粒督?
用Doug Lea說的一句話: 低并發(fā)時(shí)LongAdder和AtomicLong性能差不多,高并發(fā)時(shí)LongAdder更高效禽翼。其實(shí)LongAdder就是以空間(Cell數(shù)組)換時(shí)間(CAS操作)屠橄。
參考文獻(xiàn):