Java并發(fā)-atomic原子類包源碼剖析

java.util.concurrent.atomic是jdk1.5新增的羹应,這個(gè)包下主要提供了一些原子類犬金,這些類基本特性是線程安全的昔期,保證數(shù)據(jù)的非阻塞同步(比jdk1.5之前的synchronized阻塞同步更高效)骂远,這樣就避免了阻塞同步中線程阻塞和喚醒帶來的性能問題。

下面來舉個(gè)例子說明下原子類的非阻塞同步翠订,代碼如下:

public class Test { 
    public static void main(String args[]) {
        int threadCnt = 20;
        Thread[] threads = new Thread[threadCnt];
        CountDownLatch cdt = new CountDownLatch(threadCnt);
        for (int i = 0; i < threadCnt; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000000; j++) {
                        increase();
                    }
                    cdt.countDown();
                }
            });
            threads[i].start();
        }
        for (Thread i : threads) {
            try {
                i.join();//保證前面的所有的線程執(zhí)行玩再接著執(zhí)行主線程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

//        while (Thread.activeCount() > 1) {
//            Thread.yield();
//        }
//        try {
//            cdt.await();//保證前面的所有的線程執(zhí)行玩再接著執(zhí)行主線程
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println(count + "   " + count2.get());
    }

    public static int count = 0;
    public static AtomicInteger count2 = new AtomicInteger(0);

    public static void increase() {
        count++;
        count2.incrementAndGet();
    }

上面的代碼運(yùn)行了20個(gè)線程巢音,每個(gè)線程對count變量和count2變量進(jìn)行100W次自增操作,如果上面這段代碼能夠正常并發(fā)的話尽超,最后的結(jié)果應(yīng)該都是2000W(20 * 100,0000=2000,0000)才對官撼,但實(shí)際結(jié)果卻發(fā)現(xiàn)每次運(yùn)行count的結(jié)果都不相同,都是一個(gè)小于2000W的數(shù)字似谁,而count2永遠(yuǎn)都是2000W傲绣。

要是換成volatile修飾count變量呢?volatile關(guān)鍵字很重要的兩個(gè)特性:

  1. 保證變量在線程間可見巩踏,對volatile變量所有的寫操作都能立即反應(yīng)到其他線程中斜筐,換句話說,volatile變量在各個(gè)線程中是一致的(得益于java內(nèi)存模型—"先行發(fā)生原則")蛀缝;
  2. 禁止指令的重排序優(yōu)化;

但是volatile并沒有我所期待的第三個(gè)特性:volatile修飾的變量的運(yùn)算在并發(fā)下是安全的目代。用volatile修飾count變量屈梁,重新測試上面的代碼嗤练,count每次都是輸出小于20000的數(shù)字,因?yàn)楹诵狞c(diǎn)在于java里的運(yùn)算(比如自增)并不是原子性的(比如 --i在讶、++i這兩個(gè)操作煞抬,其中包含有3個(gè)操作步驟:第一步,讀取i构哺;第二步革答,加1或減1;第三步:寫回內(nèi)存)曙强。

那么原子變量是怎樣保證數(shù)據(jù)同步的呢残拐?你可以這樣理解:當(dāng)有多個(gè)線程同時(shí)操作這些原子類的實(shí)例時(shí),這些原子實(shí)例對于線程X其實(shí)并不具有synchronized的排他性碟嘴,反而具有排己性溪食。我線程X訪問原子實(shí)例怎么會(huì)排斥自己呢?是不是覺得很難理解呢娜扇?其實(shí)排己就是為了數(shù)據(jù)的同步错沃,而且比排他性的效率更高,因?yàn)榕偶菏峭ㄟ^一個(gè)while自旋循環(huán)實(shí)現(xiàn)的雀瓢,而且這個(gè)while循環(huán)耗時(shí)一般比較短枢析。當(dāng)線程X要寫會(huì)內(nèi)存的時(shí)候,先比較內(nèi)存地址的舊值A(chǔ)(這個(gè)值是volatile的即線程間可見)是否改變了刃麸,如果變了(比如另外一個(gè)線程Y修改了內(nèi)存地址對應(yīng)的變量的值醒叁,線程X是可見的),那就就進(jìn)行while自旋嫌蚤,until直到預(yù)期的舊值A(chǔ)是對的辐益,JVM會(huì)為while自旋這樣的操作會(huì)付出一定的代價(jià),但是這樣的代價(jià)相對于synchronized的線程阻塞和喚起脱吱,可以說是比較高效了迂求。

原子類的非阻塞同步實(shí)際上是靠硬件的相關(guān)指令來實(shí)現(xiàn)的窜醉,或者說只是在硬件級(jí)別上阻塞了,可以對基本數(shù)據(jù)、數(shù)組中的基本數(shù)據(jù)舷丹、對類中的基本數(shù)據(jù)進(jìn)行操作。原子變量類相當(dāng)于一種泛化的volatile變量沈自,能夠支持原子的和有條件的讀-改-寫操作——引文毡代。

在并發(fā)環(huán)境下,某個(gè)線程對共享原子變量先進(jìn)行操作间校,如果沒有其他線程爭用共享數(shù)據(jù)那操作就成功矾克;如果存在數(shù)據(jù)的爭用沖突,那就才去補(bǔ)償措施憔足,比如不斷的重試機(jī)制胁附,直到成功為止酒繁,因?yàn)檫@種樂觀的并發(fā)策略不需要把線程掛起,操作和沖突檢測具備原子性控妻。在硬件指令集的發(fā)展驅(qū)動(dòng)下州袒,使得 "操作和沖突檢測" 這種看起來需要多次操作的行為只需要一條處理器指令便可以完成,這些指令中就包括非常著名的CAS指令(Compare-And-Swap比較并交換)弓候±煽蓿《深入理解Java虛擬機(jī)第二版.周志明》第十三章中這樣描述關(guān)于CAS機(jī)制:

CAS指令需要三個(gè)操作數(shù),分別是內(nèi)存位置(Java中可以簡單理解為變量的內(nèi)存地址菇存,用V表示)夸研、舊的預(yù)期值(A)和新值(B)。CAS指令執(zhí)行時(shí)撰筷,當(dāng)且僅當(dāng)V符合舊的預(yù)期值A(chǔ)時(shí)陈惰,處理器用新值B更新V的值,否則它就不執(zhí)行更新毕籽,但是無論是否更新了V的值抬闯,都會(huì)返回V的舊值,上述的處理過程就是一個(gè)原子操作关筒。
在JDK1.5之后溶握,Java程序中才可以使用CAS操作,該操作由sun.misc.Unsafe類里面的compareAndSwapInt()compareAndSwapLong()等幾個(gè)方法包裝提供蒸播,虛擬機(jī)在內(nèi)部對這些方法做了特殊處理睡榆,即時(shí)編譯出來的結(jié)果就是一條平臺(tái)相關(guān)的處理器CAS指令,沒有方法調(diào)用的過程袍榆,或者可以認(rèn)為是無條件內(nèi)聯(lián)進(jìn)去了胀屿。
由于sun.misc.Unsafe類不是提供給用戶程序調(diào)用的類(Unsafe.getUnsafe()的代碼中限制了只有啟動(dòng)類加載器(BootStrap ClassLoader)加載的Class才能訪問它),因此包雀,如果不采用反射手段宿崭,我們只能通過其它的Java API來間接使用它,如J.U.C包里的整數(shù)原子類才写,其中的compareAndSet()getAndIncrement()等方法都使用了Unsafe類的CAS操作葡兑。

我們來看一下AtomicInteger的incrementAndGet的源碼:

    private static final long valueOffset;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    static {
        try {
            valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

可以看到變量value是用volatile修飾的,那么這個(gè)變量在線程之間是透明的赞草,這個(gè)方法其實(shí)就是調(diào)用Unsafe類的方法讹堤,我們再進(jìn)去看下Unsafe的getAndAddInt()方法的源碼:

    public final int getAndAddInt(Object o, long valueOffset, int modify) {
        int expect;
        do {
            expect = this.getIntVolatile(o, valueOffset);
        } while(!this.compareAndSwapInt(o, valueOffset, expect, expect + modify));
        return expect;
    }

可以看到是一個(gè)while循環(huán),非阻塞同步的訣竅就在于這個(gè)自旋式while循環(huán)厨疙,循環(huán)自旋不斷嘗試將一個(gè)比當(dāng)前值大modify的新值賦給自己洲守,如果失敗則說明在執(zhí)行"獲取-設(shè)置"操作的時(shí)已經(jīng)被其它線程修改過了,于是便再次進(jìn)入循環(huán)下一次操作,直到成功為止岖沛。原子的將變量設(shè)定為新數(shù)據(jù)暑始,同時(shí)返回先前的舊數(shù)據(jù)。

java.util.concurrent.atomic中的類可以分成4組:

  • 標(biāo)量類(Scalar):AtomicBoolean婴削,AtomicInteger,AtomicLong牙肝,AtomicReference
  • 數(shù)組類:AtomicIntegerArray唉俗,AtomicLongArray,AtomicReferenceArray
  • 更新器類:AtomicLongFieldUpdater配椭,AtomicIntegerFieldUpdater虫溜,AtomicReferenceFieldUpdater
  • 復(fù)合變量類:AtomicMarkableReference,AtomicStampedReference

標(biāo)量類

第一組AtomicBoolean股缸,AtomicInteger衡楞,AtomicLong,AtomicReference這四種基本類型用來處理布爾敦姻,整數(shù)瘾境,長整數(shù),對象四種數(shù)據(jù)镰惦,其內(nèi)部實(shí)現(xiàn)不是簡單的使用synchronized迷守,而是一個(gè)更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷旺入,執(zhí)行效率大為提升兑凿。

他們的實(shí)現(xiàn)都是依靠 真正的值為volatile 類型,通過Unsafe 包中的原子操作實(shí)現(xiàn)茵瘾。最基礎(chǔ)就是CAS礼华,他是一切的基礎(chǔ)。如下 拗秘。其中valueOffset是 在內(nèi)存中 value相對于基地址的偏移量圣絮。(它的獲得也由Unsafe 本地代碼獲得)。

核心代碼如下聘殖,其他都是在compareAndSet基礎(chǔ)上構(gòu)建的晨雳。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }
}

void set()和void lazySet():set設(shè)置為給定值,直接修改原始值奸腺;lazySet延時(shí)設(shè)置變量值餐禁,這個(gè)等價(jià)于set()方法,但是由于字段是volatile類型的突照,因此次字段的修改會(huì)比普通字段(非volatile字段)有稍微的性能延時(shí)(盡管可以忽略)帮非,所以如果不是想立即讀取設(shè)置的新值,允許在“后臺(tái)”修改值,那么此方法就很有用末盔。

數(shù)組類

第二組AtomicIntegerArray筑舅,AtomicLongArray還有AtomicReferenceArray類進(jìn)一步擴(kuò)展了原子操作,對這些類型的數(shù)組提供了支持陨舱。這些類在為其數(shù)組元素提供 volatile 訪問語義方面也引人注目翠拣,這對于普通數(shù)組來說是不受支持的。

他們內(nèi)部并不是像AtomicInteger一樣維持一個(gè)valatile變量游盲,而是全部由native方法實(shí)現(xiàn)误墓,如下
AtomicIntegerArray的實(shí)現(xiàn)片斷:

public class AtomicIntegerArray implements java.io.Serializable {
    private static final long serialVersionUID = 2862133569453604235L;

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final int base = unsafe.arrayBaseOffset(int[].class);
    private static final int shift;
    private final int[] array;

    static {
        int scale = unsafe.arrayIndexScale(int[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }

    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);
        return byteOffset(i);
    }

    private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }

    public AtomicIntegerArray(int length) {
        array = new int[length];
    }

    public AtomicIntegerArray(int[] array) {
        // Visibility guaranteed by final field guarantees
        this.array = array.clone();
    }

    public final int length() {
        return array.length;
    }

    public final int get(int i) {
        return getRaw(checkedByteOffset(i));
    }

    private int getRaw(long offset) {
        return unsafe.getIntVolatile(array, offset);
    }

    public final void set(int i, int newValue) {
        unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
    }

    public final void lazySet(int i, int newValue) {
        unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
    }

    public final int getAndSet(int i, int newValue) {
        return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
    }

    public final boolean compareAndSet(int i, int expect, int update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }

    private boolean compareAndSetRaw(long offset, int expect, int update) {
        return unsafe.compareAndSwapInt(array, offset, expect, update);
    }

    public final boolean weakCompareAndSet(int i, int expect, int update) {
        return compareAndSet(i, expect, update);
    }

    public final int getAndIncrement(int i) {
        return getAndAdd(i, 1);
    }

    public final int getAndDecrement(int i) {
        return getAndAdd(i, -1);
    }

    public final int getAndAdd(int i, int delta) {
        return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
    }

    public final int incrementAndGet(int i) {
        return getAndAdd(i, 1) + 1;
    }

    public final int decrementAndGet(int i) {
        return getAndAdd(i, -1) - 1;
    }

    public final int addAndGet(int i, int delta) {
        return getAndAdd(i, delta) + delta;
    }

    public final int getAndUpdate(int i, IntUnaryOperator updateFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSetRaw(offset, prev, next));
        return prev;
    }

    public final int updateAndGet(int i, IntUnaryOperator updateFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSetRaw(offset, prev, next));
        return next;
    }

    public final int getAndAccumulate(int i, int x,
                                      IntBinaryOperator accumulatorFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSetRaw(offset, prev, next));
        return prev;
    }

    public final int accumulateAndGet(int i, int x,
                                      IntBinaryOperator accumulatorFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSetRaw(offset, prev, next));
        return next;
    }
}

可以看到,其實(shí)核心操作還是Unsafe類進(jìn)行處理的益缎。這個(gè)類提供兩個(gè)構(gòu)造器谜慌,一個(gè)是通過數(shù)組長度構(gòu)造,第二個(gè)是通過原有數(shù)組構(gòu)造莺奔,需要注意的是第二個(gè)構(gòu)造器是調(diào)用clone()方法將原有數(shù)組拷貝一份欣范,所以當(dāng)AtomicIntegerArray對內(nèi)部的數(shù)組元素進(jìn)行修改時(shí),不會(huì)影響傳入的數(shù)組令哟,大家都知道clone()是淺拷貝恼琼,所以如果數(shù)組內(nèi)容是對象的話會(huì)有影響。

array使用的是final修飾励饵,變成了常量數(shù)組驳癌,內(nèi)存中的偏移地址不可變,這點(diǎn)很重要役听,但是該內(nèi)存地址所指向的那個(gè)對象還是可以變的颓鲜,這個(gè)array數(shù)組就保存到了方法區(qū),同樣的可以保證多線程訪問時(shí)的可見性典予,避免使用volatile也減少了開銷甜滨。

需要注意的是static語句,arrayBaseOffset這個(gè)native方法是獲取數(shù)組首個(gè)元素的首地址偏移賦值給base瘤袖,arrayIndexScale這個(gè)native方法可以用來獲取數(shù)組元素的增量地址(即每個(gè)元素的字節(jié)大小)賦值給scale衣摩,比如元素是int類型,那么scale就是4捂敌,下標(biāo)為i的元素的內(nèi)存地址valueOffset_i就是valueOffset_i=i* scale + base = i*4+base公式1艾扮。數(shù)組的偏移地址和每個(gè)元素的偏移地址對于Unsafe的CAS操作十分重要,沒有內(nèi)存的偏移地址Unsafe就無法進(jìn)行CAS操作占婉。這應(yīng)該很好理解的吧泡嘴?這只是簡單的解釋,如果這個(gè)解釋你還不理解的話逆济,那么后面的詳細(xì)的解釋就更難懂了酌予。

詳細(xì)解釋的話磺箕,那就先來研究這兩個(gè)native方法:

public int arrayBaseOffset(Class clazz) {
    Class<?> component = clazz.getComponentType();
    if (component == null) {
        throw new IllegalArgumentException("Valid for array classes only: " + clazz);
    }
    // TODO: make the following not specific to the object model.
    int offset = 12;
    if (component == long.class || component == double.class) {
        offset += 4;  // 4 bytes of padding.
    }
    return offset;
}

public int arrayIndexScale(Class clazz) {
    Class<?> component = clazz.getComponentType();
    if (component == null) {
        throw new IllegalArgumentException("Valid for array classes only: " + clazz);
    }
    // TODO: make the following not specific to the object model.
    if (!component.isPrimitive()) {
        return 4;
    } else  if (component == long.class || component == double.class) {
        return 8;
    } else if (component == int.class || component == float.class) {
        return 4;
    } else if (component == char.class || component == short.class) {
        return 2;
    } else {
        // component == byte.class || component == boolean.class.
        return 1;
    }
}

arrayBaseOffset就是計(jì)算數(shù)組對象在內(nèi)存中從數(shù)組的地址到首個(gè)元素的地址的偏移量offset,為什么offset會(huì)先加12呢抛虫,這里涉及到的是java內(nèi)存中對象存儲(chǔ)的知識(shí)松靡。每個(gè)類對象在內(nèi)存中存儲(chǔ)時(shí)除了數(shù)據(jù)內(nèi)容,其實(shí)還要包含一個(gè)頭部信息的建椰,對象頭包含Mark Word指向類的指針兩部分雕欺,Mark Word在32位JVM中的長度是32bit,在64位JVM中長度是64bit广凸,指向類的指針在32位JVM中的長度是32bit阅茶,在64位JVM中長度是64bit,如果這個(gè)類是數(shù)組類型的話谅海,還需要4字節(jié)(該數(shù)據(jù)在32位和64位JVM中長度都是32bit)來存儲(chǔ)數(shù)組的大小,所以這里是12字節(jié)蹦浦。接下來又涉及到了字節(jié)對齊扭吁,在jvm中,是要以8字節(jié)為單位進(jìn)行對齊的盲镶,這里的頭部12字節(jié)肯定是無法對齊了侥袜,但是如果是long,double等8字節(jié)的類型溉贿,就是在開始存時(shí)就進(jìn)行對齊操作枫吧,這樣就能保證接下來的每一個(gè)元素都是8的倍數(shù),而如果是其他的對象比如int 4字節(jié)宇色,就在數(shù)組末尾進(jìn)行對齊九杂,這樣就能缺多少補(bǔ)多少。

而arrayIndexScale實(shí)際上是能獲取數(shù)組中每個(gè)元素在內(nèi)存中的大小賦值給scale宣蠕,有點(diǎn)像cpp里sizeof例隆。

然后接著分析static語句,if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two");這行代碼意思是說如果scale不是2的次冪的話那么就拋出異常抢蚀,大家都知道Integer的字節(jié)長度是4镀层,long和double都是8字節(jié),這些字節(jié)長度都是2的shift次冪皿曲,下面一行代碼shift = 31 - Integer.numberOfLeadingZeros(scale);是計(jì)算指數(shù)shift的的過程并賦值給shift唱逢,其實(shí)這行代碼可以使用一個(gè)數(shù)學(xué)公式解釋更好理解一些2^{shift}=scale公式2。Integer類的方法numberOfLeadingZeros(int)是什么意思呢屋休?其實(shí)按照字面leading的翻譯是領(lǐng)頭的意思坞古,即領(lǐng)頭的0的個(gè)數(shù),進(jìn)一步解釋是從最高位開始直到遇到1的連續(xù)的0的個(gè)數(shù)博投,相對應(yīng)的Integer還有一個(gè)numberOfTrailingZeros(int)方法表示尾隨的0的個(gè)數(shù)即從最低位開始直到遇到1的連續(xù)的0的個(gè)數(shù)绸贡。其實(shí)計(jì)算shift的過程主要是為了優(yōu)化公式1,我們看一下源碼中計(jì)算valueOffset_i的過程:

    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);
        return byteOffset(i);
    }

    private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }

由于公式2的成立,我們可以把公式1轉(zhuǎn)換成公式3:valueOffset_i=i << shift+base公式3听怕。

下面我們仔細(xì)分析下Integer類的方法numberOfLeadingZeros(int)的原理捧挺,其實(shí)是應(yīng)用了典型的二分查找,先把32位整形分為高16位和低16位查找非零數(shù)尿瞭,在對高16位進(jìn)行或低16位進(jìn)行二分

    public static int numberOfLeadingZeros(int i) {
        if (i == 0)                                                                       
                return 32;
        int n = 1;
        // 下面的代碼就是定位從左邊開始第一個(gè)非零值的位置闽烙,在定位過程中順便累加從左邊開始0的個(gè)數(shù)
        // 將i無符號(hào)右移16位后,有二種情況声搁;
        //   情況1.i=0,則第一個(gè)非零值位于低16位黑竞,i至少有16個(gè)0,同時(shí)將i左移16位(把低16位移到原高16位的位置疏旨,這樣情況1和情況2就能統(tǒng)一后續(xù)的判斷方式)
        //   情況2.i!=0,則第一個(gè)非零值位于高16位很魂,后續(xù)在高16位中繼續(xù)判斷
        // 這個(gè)思路就是二分查找,首先把32位的數(shù)分為高低16位檐涝,如果非零值位于高16位遏匆,后續(xù)再將高16位繼續(xù)二分為高低8位,一直二分到集合中只有1個(gè)元素
        if (i >>> 16 == 0) { n += 16; i <<= 16; }
        // 判斷第一個(gè)非零值是否位于高8位
        if (i >>> 24 == 0) { n +=  8; i <<=  8; }  
        // 判斷第一個(gè)非零值是否位于高4位
        if (i >>> 28 == 0) { n +=  4; i <<=  4; }
        // 判斷第一個(gè)非零值是否位于高2位
        if (i >>> 30 == 0) { n +=  2; i <<=  2; }       
        // 判斷第一個(gè)非零值是否位于左邊第一位
        n -= i >>> 31;                                                                
        return n;

更新器類

第三組AtomicIntegerFieldUpdater谁榜,AtomicLongFieldUpdater幅聘,AtomicReferenceFieldUpdater基于反射的實(shí)用工具,可以對指定類的指定 volatile 字段進(jìn)行原子更新窃植。API非常簡單帝蒿,但是也是有一些約束:

  1. 字段必須是volatile類型的,否則get的時(shí)候會(huì)拋出Exception in thread "main" java.lang.ExceptionInInitializerError Caused by: java.lang.IllegalArgumentException: Must be volatile type異常
  2. 字段的描述類型(修飾符public/protected/default/private)是與調(diào)用者與操作對象字段的關(guān)系一致巷怜。也就是說 調(diào)用者能夠直接操作對象字段葛超,那么就可以反射進(jìn)行原子操作。但是對于父類的字段丛版,子類是不能直接操作的巩掺,盡管子類可以訪問父類的字段。(我試了下页畦,貌似只要字段不是private就行了胖替,大神解釋下)
  3. 只能是實(shí)例變量,不能是類變量豫缨,也就是說不能加static關(guān)鍵字独令。
  4. 只能是可修改變量,不能使final變量好芭,因?yàn)閒inal的語義就是不可修改燃箭。實(shí)際上final的語義和volatile是有沖突的,這兩個(gè)關(guān)鍵字不能同時(shí)存在舍败。
  5. 對于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long類型的字段招狸,不能修改其包裝類型(Integer/Long)敬拓。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater 。

下面舉個(gè)例子:

public class Test {
    private static class User {
        public volatile String name;
        public volatile String name2;
        public volatile int age;
        public volatile int age2;
        public AtomicInteger age3;

        public User(String name, int age) {
            this.name = name;
            this.name2 = name;
            this.age = age;
            this.age2 = age;
            this.age3 = new AtomicInteger(age);
        }
    }
    // 創(chuàng)建原子更新器,并設(shè)置需要更新的對象類和對象的屬性
    private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
    private static AtomicReferenceFieldUpdater<User, String> b = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");
    public static void main(String args[]) {
        User user = new User("zzh", 0);
        int threadCnt = 20;
        Thread[] threads = new Thread[threadCnt];
        CountDownLatch cdt = new CountDownLatch(threadCnt);
        for (int i = 0; i < threadCnt; i++) {
            final int iT = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 4000; j++) {
                        user.name2 += "0";
                        a.getAndIncrement(user);
                        b.accumulateAndGet(user, "0", new BinaryOperator() {
                            @Override
                            public Object apply(Object o, Object o2) {
                                return o.toString() + o2.toString();
                            }
                        });
                        user.age2++;
                        user.age3.incrementAndGet();
                    }
                    cdt.countDown();
                }
            });
            threads[i].start();
        }
        for (Thread i : threads) {
            try {
                i.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        while (Thread.activeCount() > 1) {
//            Thread.yield();
//        }
//        try {
//            cdt.await();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println(a.get(user) + "   " + user.age2 + "   " + user.age3.get() + "  " + b.get(user).length() + "  " + user.name2.length());
    }
        //輸出
        //80000   77295   80000  80003  41319
}

測試結(jié)果是age和age3總是80000即非阻塞同步裙戏,字符串name的長度永遠(yuǎn)都是80003(即zzh后跟8萬個(gè)0的字符串)也保持了數(shù)據(jù)的非阻塞同步乘凸,但是age2總是一個(gè)小于80000的數(shù)字即無法同步,name2的長度也總是小于80003的一個(gè)數(shù)即無法同步累榜。

復(fù)合變量類

防止ABA問題出現(xiàn)而構(gòu)造的類营勤。如什么是ABA問題呢,wiki官方解釋壹罚。假設(shè)多線程環(huán)境中葛作,一個(gè)線程修改A->B,然后又B->A猖凛,另一個(gè)線程看到的值未改變赂蠢,又繼續(xù)修改成自己的期望值。如果我們不關(guān)心中間狀態(tài)的變化辨泳,只關(guān)心最終結(jié)果客年,就無所謂ABA問題。而AtomicStampedReference和AtomicMarkableReference就是為了處理ABA問題而存在的漠吻。

看下AtomicStampedReference是怎么解決這個(gè)問題的:


/**
通過static pair保存一個(gè)引用和計(jì)數(shù)器
*/
private static class Pair<T> {
    final T reference;
    final int stamp;
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}
 
private volatile Pair<V> pair;
 
/**
 * 通過傳入的初始化引用和計(jì)數(shù)器來構(gòu)造函數(shù)一個(gè)pair
 *
 * @param initialRef 初始化用用
 * @param initialStamp 初始化計(jì)數(shù)器或者叫時(shí)間戳
 */
public AtomicStampedReference(V initialRef, int initialStamp) {
    pair = Pair.of(initialRef, initialStamp);
}

AtomicStampedReference通過一個(gè)pair來保存初始化引用和計(jì)數(shù)器,以后每次原子操作時(shí)司恳,都需要比較引用和計(jì)數(shù)器是否都正確途乃。舉個(gè)通俗點(diǎn)的例子,你倒了一杯水放桌子上扔傅,干了點(diǎn)別的事耍共,然后同事把你水喝了又給你重新倒了一杯水,你回來看水還在猎塞,拿起來就喝试读,如果你不管水中間被人喝過,只關(guān)心水還在荠耽,這就是ABA問題钩骇。如果你是一個(gè)講衛(wèi)生講文明的小伙子,不但關(guān)心水在不在铝量,還要在你離開的時(shí)候水被人動(dòng)過沒有倘屹,因?yàn)槟闶浅绦騿T,所以就想起了放了張紙?jiān)谂赃吢叮瑢懮铣跏贾?纽匙,別人喝水前麻煩先做個(gè)累加才能喝水。這就是AtomicStampedReference的解決方案拍谐。

看下AtomicStampedReference的方法:

    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

AtomicMarkableReference跟AtomicStampedReference差不多烛缔,AtomicStampedReference是使用pair的int stamp作為計(jì)數(shù)器使用馏段,AtomicMarkableReference的pair使用的是boolean mark。還是那個(gè)水的例子践瓷,AtomicStampedReference可能關(guān)心的是動(dòng)過幾次院喜,AtomicMarkableReference關(guān)心的是有沒有被人動(dòng)過,方法都比較簡單当窗。

LongAdder够坐、LongAccumulator、DoubleAdder崖面、DoubleAccumulator

這四個(gè)類之前是在guava以及hystrix等中出現(xiàn)過元咙,jdk1.8在JUC包中新增了這四個(gè)類,作者是著名的Doug lea巫员。

我們來分析下LongAdder這個(gè)類庶香,其余的類的原理都相似。LongAdder主要處理高并發(fā)情況简识,因?yàn)锳tomicLong雖然能保持?jǐn)?shù)據(jù)同步但是在高并發(fā)情況下CAS失敗率會(huì)很高(即while自旋會(huì)很久)導(dǎo)致效率低下赶掖。

而LongAdder的性能比上面那種還要好很多,首先它有一個(gè)基礎(chǔ)的值base七扰,在發(fā)生競爭的情況下奢赂,會(huì)有一個(gè)Cell數(shù)組用于將不同線程的操作離散到不同的節(jié)點(diǎn)上去(會(huì)根據(jù)需要擴(kuò)容,最大為CPU核數(shù))颈走,sum()會(huì)將所有Cell數(shù)組中的value和base累加作為返回值膳灶。核心的思想就是將AtomicLong將單一value的更新壓力分擔(dān)到多個(gè)value中去,降低單個(gè)value的 “熱度”立由,分段更新T觥!

底競爭下直接更新base锐膜,類似AtomicLong毕箍,高并發(fā)下,會(huì)將每個(gè)線程的操作hash到不同的cells數(shù)組中道盏,從而將AtomicLong中更新而柑,一個(gè)value的行為優(yōu)化之后,分散到多個(gè)value中捞奕,從而降低更新熱點(diǎn)牺堰,而需要得到當(dāng)前值的時(shí)候,直接將所有cell中的value與base相加即可颅围,但是跟AtomicLong(compare and change -> xadd)的CAS不同伟葫,incrementAndGet操作及其變種可以返回更新后的值,而LongAdder返回的是void院促。

public class LongAdder extends Striped64 implements Serializable {
//...
}

LongAdder繼承自Striped64筏养,Striped64內(nèi)部維護(hù)了一個(gè)懶加載的數(shù)組以及一個(gè)額外的base實(shí)力域斧抱,數(shù)組的大小是2的N次方(方便取余操作),使用每個(gè)線程Thread內(nèi)部的哈希值訪問渐溶。

abstract class Striped64 extends Number {
/** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;
     
@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
}

數(shù)組的元素是Cell類辉浦,可以看到Cell類用Contended注解修飾,這里主要是解決false sharing(偽共享的問題)茎辐,比如兩個(gè)volatile變量被分配到了同一個(gè)緩存行宪郊,但是這兩個(gè)的更新在高并發(fā)下會(huì)競爭,比如線程A去更新變量a拖陆,線程B去更新變量b疗杉,但是這兩個(gè)變量被分配到了同一個(gè)緩存行栗涂,因此會(huì)造成每個(gè)線程都去爭搶緩存行的所有權(quán)瞧掺,例如A獲取了所有權(quán)然后執(zhí)行更新這時(shí)由于volatile的語義會(huì)造成其刷新到主存案训,但是由于變量b也被緩存到同一個(gè)緩存行,因此就會(huì)造成cache miss速警,這樣就會(huì)造成極大的性能損失叹誉,因此有一些類庫的作者,例如JUC下面的闷旧、Disruptor等都利用了插入dummy 變量的方式长豁,使得緩存行被其獨(dú)占,比如下面這種代碼添加了14行緩沖行防止偽共享:

static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }

        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
 }

但是這種方式畢竟不通用忙灼,例如32蕉斜、64位操作系統(tǒng)的緩存行大小不一樣,因此JAVA8中就增加了一個(gè)注@sun.misc.Contended解用于解決這個(gè)問題,由JVM去插入這些變量缀棍,具體可以參考openjdk.java.net/jeps/142 ,但是通常來說對象通常是不規(guī)則的分配到內(nèi)存中的机错,但是數(shù)組由于是連續(xù)的內(nèi)存爬范,因此可能會(huì)共享緩存行,因此這里加一個(gè)Contended注解以防cells數(shù)組發(fā)生偽共享的情況弱匪。

來看下核心的addsum方法:

public class LongAdder {
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        /**
         *  如果是第一次執(zhí)行青瀑,則直接case操作base
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            /**
             * as數(shù)組為空(null或者size為0)
             * 或者當(dāng)前線程取模as數(shù)組大小為空
             * 或者cas更新Cell失敗
             */
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

    public long sum() {
       //通過累加base與cells數(shù)組中的value從而獲得sum
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
}

其中casBase操作其實(shí)就是Unsafe的CAS操作。LongAdder的分段更新操作會(huì)帶來空間上的浪費(fèi)(Cell數(shù)組導(dǎo)致)萧诫,可以空間換時(shí)間斥难,但是,能不換就不換帘饶! 所以哑诊,只有并發(fā)高到一定程度了,才會(huì)分段更新及刻,因?yàn)榈筒l(fā)時(shí)镀裤,casBase操作基本都會(huì)成功竞阐。

由于Cell相對來說比較占內(nèi)存,因此這里采用懶加載的方式暑劝,在無競爭的情況下直接更新base域骆莹,在第一次發(fā)生競爭的時(shí)候(CAS失敗)就會(huì)創(chuàng)建一個(gè)大小為2的cells數(shù)組,每次擴(kuò)容都是加倍(加倍操作由longAccumulate方法判斷并執(zhí)行)担猛,只到達(dá)到CPU核數(shù)幕垦。同時(shí)我們知道擴(kuò)容數(shù)組等行為需要只能有一個(gè)線程同時(shí)執(zhí)行,因此需要一個(gè)鎖傅联,這里通過CAS更新cellsBusy來實(shí)現(xiàn)一個(gè)簡單的spin lock先改。

數(shù)組訪問索引是通過Thread里的threadLocalRandomProbe域取模實(shí)現(xiàn)的,這個(gè)域是ThreadLocalRandom更新的纺且,cells的數(shù)組大小被限制為CPU的核數(shù)盏道,因?yàn)榧词褂谐^核數(shù)個(gè)線程去更新,但是每個(gè)線程也只會(huì)和一個(gè)CPU綁定载碌,更新的時(shí)候頂多會(huì)有cpu核數(shù)個(gè)線程猜嘱,因此我們只需要通過hash將不同線程的更新行為離散到不同的slot即可。

我們知道線程嫁艇、線程池會(huì)被關(guān)閉或銷毀朗伶,這個(gè)時(shí)候可能這個(gè)線程之前占用的slot就會(huì)變成沒人用的,但我們也不能清除掉步咪,因?yàn)橐话鉾eb應(yīng)用都是長時(shí)間運(yùn)行的论皆,線程通常也會(huì)動(dòng)態(tài)創(chuàng)建、銷毀猾漫,很可能一段時(shí)間后又會(huì)被其他線程占用点晴,而對于短時(shí)間運(yùn)行的,例如單元測試悯周,清除掉有啥意義呢粒督?

用Doug Lea說的一句話: 低并發(fā)時(shí)LongAdder和AtomicLong性能差不多,高并發(fā)時(shí)LongAdder更高效禽翼。其實(shí)LongAdder就是以空間(Cell數(shù)組)換時(shí)間(CAS操作)屠橄。


參考文獻(xiàn):

  1. 原子操作類AtomicInteger詳解
  2. java-juc-原子類-AtomicIntegerArray初探| cruise yang
  3. Integer.numberOfLeadingZeros(int i) - u010667082 - CSDN博客
  4. JUC源碼分析4-原子變量-AtomicStampedReference/AtomicMarkableReference
  5. Java并發(fā)工具類之LongAdder原理總結(jié)- 后端- 掘金
  6. 從LONGADDER看更高效的無鎖實(shí)現(xiàn)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市闰挡,隨后出現(xiàn)的幾起案子锐墙,更是在濱河造成了極大的恐慌,老刑警劉巖长酗,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件溪北,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)刻盐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門掏膏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人敦锌,你說我怎么就攤上這事馒疹。” “怎么了乙墙?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵颖变,是天一觀的道長。 經(jīng)常有香客問我听想,道長腥刹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任汉买,我火速辦了婚禮衔峰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蛙粘。我一直安慰自己垫卤,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布出牧。 她就那樣靜靜地躺著穴肘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪舔痕。 梳的紋絲不亂的頭發(fā)上评抚,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音伯复,去河邊找鬼慨代。 笑死,一個(gè)胖子當(dāng)著我的面吹牛啸如,可吹牛的內(nèi)容都是我干的鱼响。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼组底,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了筐骇?” 一聲冷哼從身側(cè)響起债鸡,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎铛纬,沒想到半個(gè)月后厌均,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡告唆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年棺弊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了晶密。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡模她,死狀恐怖稻艰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情侈净,我是刑警寧澤尊勿,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站畜侦,受9級(jí)特大地震影響元扔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜旋膳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一澎语、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧验懊,春花似錦擅羞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至歌溉,卻和暖如春垄懂,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背痛垛。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來泰國打工草慧, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人匙头。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓漫谷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蹂析。 傳聞我的和親對象是個(gè)殘疾皇子舔示,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353