LongAdder源碼深入剖析

一阱飘、簡介

?在之前的《ConcurrentHashMap深入剖析(JDK8)》文章中,我們看到了CounterCell的實現(xiàn)沿用了LongAdder的分段計數(shù)的原理絮供;那么這一篇我們來重點看下LongAdder的實現(xiàn)原理墩新。
?我們先來這個類的描述:

/**
 * One or more variables that together maintain an initially zero
 * {@code long} sum.  When updates (method {@link #add}) are contended
 * across threads, the set of variables may grow dynamically to reduce
 * contention. Method {@link #sum} (or, equivalently, {@link
 * #longValue}) returns the current total combined across the
 * variables maintaining the sum.
 *
 * <p>This class is usually preferable to {@link AtomicLong} when
 * multiple threads update a common sum that is used for purposes such
 * as collecting statistics, not for fine-grained synchronization
 * control.  Under low update contention, the two classes have similar
 * characteristics. But under high contention, expected throughput of
 * this class is significantly higher, at the expense of higher space
 * consumption.
 *
 * <p>LongAdders can be used with a {@link
 * java.util.concurrent.ConcurrentHashMap} to maintain a scalable
 * frequency map (a form of histogram or multiset). For example, to
 * add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
 * initializing if not already present, you can use {@code
 * freqs.computeIfAbsent(k -> new LongAdder()).increment();}
 *
 * <p>This class extends {@link Number}, but does <em>not</em> define
 * methods such as {@code equals}, {@code hashCode} and {@code
 * compareTo} because instances are expected to be mutated, and so are
 * not useful as collection keys.
 *
 * @since 1.8
 * @author Doug Lea
 */

?第二段中很清晰的表明LongAdder在高并發(fā)的場景下會比AtomicLong 具有更好的性能倍谜,代價是消耗更多的內(nèi)存空間崩泡。那么戒洼,有如下問題:

為什么要引入LongAdder?AtomicLong在高并發(fā)的場景下有什么問題嗎允华? 如果低并發(fā)環(huán)境下,LongAdder和AtomicLong性能差不多寥掐,那LongAdder是否就可以替代AtomicLong了靴寂?

為什么要引入LongAdder?

?我們知道召耘,AtomicLong是利用底層的CAS操作來提供并發(fā)性的百炬,比如addAndGet方法:

/**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final long addAndGet(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

?上述方法調(diào)用了Unsafe類的getAndAddLong方法,該方法是一個native方法污它,它的邏輯是采用自旋的方式不斷更新目標值剖踊,直到更新成功。(也即樂觀鎖的實現(xiàn)模式)
?在并發(fā)量比較低的情況下衫贬,線程沖突的概率比較小德澈,自旋的次數(shù)不會很多。但是固惯,高并發(fā)情況下梆造,N個線程同時進行自旋操作,會出現(xiàn)大量失敗并不斷自旋的場景葬毫,此時AtomicLong的自旋會成為瓶頸镇辉。
?這就是LongAdder引入的初衷------解決高并發(fā)環(huán)境下AtomictLong的自旋瓶頸問題。

LongAdder為什么高并發(fā)性場景能會更好贴捡?

?既然忽肛,在高并發(fā)場景下LongAdder可以顯著提升性能,那么它時如何做到的烂斋?這一部分簡述下LongAdder的實現(xiàn)思路屹逛,具體原理在源碼剖析的章節(jié)會重點闡述础废。
?我們知道,AtomicLong中有個內(nèi)部變量value保存著實際的long的值煎源,所有的操作都是針對該變量進行操作的色迂。也就是說,高并發(fā)場景下手销,value變量其實就是一個熱點歇僧,也就是N個線程競爭的一個熱點。
?LongAdder的基本思路就是分散熱點锋拖,將value值分散到一個數(shù)組中诈悍,不同線程會命中到數(shù)組的不同槽中,各個線程只對自己槽中的那個值進行CAS操作兽埃,這樣熱點就被分散了侥钳,沖突的概率就小很多。如果要獲取真正的long值柄错,只要將各個槽中的變量值累加返回即可舷夺。
?這種做法其實跟JDK1.8之前的ConcurrentHashMap“分段鎖”的實現(xiàn)思路是一樣的:分散熱點。

LongAdder能否替代AtomicLong售貌?

?我們先來看看LongAdder提供的API:


LongAdder的API




AtomicLong的API

?從上面的圖中可以看到给猾,LongAdder的API和AtomicLong的API還是有比較大的差異,而且AtomicLong提供的功能更豐富颂跨,尤其是addAndGet敢伸、decrementAndGet、compareAndSet這些方法恒削。addAndGet池颈、decrementAndGet除了單純的做自增自減外,還可以立即獲取增減后的值钓丰,而LongAdder則需要做同步控制才能精確獲取增減后的值躯砰。如果業(yè)務(wù)需求需要精確的控制計數(shù),則使用AtomicLong比較合適携丁;
?另外弃揽,從空間方面考慮,LongAdder其實是一種“空間換時間”的思想则北,從這一點來講AtomicLong更合適矿微。
?低并發(fā)、一般的業(yè)務(wù)嘗盡下AtomicLong是足夠了尚揣,如果并發(fā)量很多涌矢,存在大量寫多讀少的情況,那LongAdder可能更合適快骗。


二娜庇、LongAdder原理

?上文已經(jīng)說過了塔次,AtomicLong是多個線程針對單個熱點值value進行原子操作。而LongAdder是每個線程擁有自己的槽名秀,各個線程一般只對自己槽中的那個值進行CAS操作励负。

比如有三個Thread1、Thread2匕得、Thread3继榆,每個線程對value增加10。

?對于AtomicLong汁掠,最終結(jié)果的計算值始終是下面這個形式:

value = 10 + 10 + 10 = 30

?但是對于LongAdder來說略吨,內(nèi)部有一個base變量,一個Cell[]數(shù)組考阱。

  • base變量:非競態(tài)條件下翠忠,直接累加到該變量上
  • Cell[]數(shù)組:競態(tài)條件下,累加各個線程自己的槽Cell[i]中

?最終結(jié)果的計算是下面的形式:



三乞榨、LongAdder源碼剖析

?我們先來看下LongAdder類的類簽名:

public class LongAdder extends Striped64 implements Serializable 

?LongAdder繼承了Striped64類秽之,來實現(xiàn)累加功能,它是實現(xiàn)高并發(fā)累加的工具類吃既;Striped64的設(shè)計核心思路就是通過內(nèi)部的分散計算來避免競爭政溃。Striped64內(nèi)部包含一個base和一個Cell[] cells數(shù)組,又叫hash表态秧。
?沒有競爭的情況下,要累加的數(shù)通過cas累加到base上扼鞋;如果有競爭的話,會將要累加的數(shù)累加到Cells數(shù)組中的某個cell元素里面云头。這樣Striped64的值就跟原理章節(jié)中描述的一樣捐友,滿足了求和公式昏滴;

3.1 Striped64類

?Striped64類有三個重要的成員變量:

/**
     * Table of cells. When non-null, size is a power of 2.
     * 存放Cell的hash表,大小為2的冪宜狐。
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     * 基礎(chǔ)值
     * 1. 在沒有競爭時會更新這個值;
     * 2. 在cells初始化的過程中,cells處于不可用的狀態(tài),這時候也會嘗試將通過cas操作值累加到base厘熟。
     */
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     * 自旋鎖绳姨,通過CAS操作加鎖跪削,用于保護創(chuàng)建或者擴展Cell表。
     */
    transient volatile int cellsBusy;
成員cells

?cells數(shù)組是LongAdder高性能實現(xiàn)的必殺器:
?AtomicInteger只有一個value,所以線程累加都要通過cas競爭value這一個變量叶眉,高并發(fā)下線程爭用非常嚴重。
?而LongAdder則有兩個值用于累加,一個是base,它的作用類似于AtomicInteger里面的value卑惜,在沒有競爭的情況下不會用到cells數(shù)組,它為null七婴,這時使用base做累加,有了競爭后cells數(shù)組就上場了撩笆,第一次初始化長度為2泣栈,以后每次擴容都是變?yōu)樵瓉淼膬杀渡」悖钡絚ells數(shù)組的長度大于等于當前服務(wù)器cpu的數(shù)量為止就不在擴容穗酥。那么為什么到超過cpu數(shù)量的時候就不再擴容了呜师?每個線程會通過線程對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value做累加菩混,這樣相當于將線程綁定到了cells中的某個Cell對象上。

成員變量cellsBusy

?cellsBusy,它有兩個值0或1扁藕,它的作用是當要修改cells數(shù)組時加鎖沮峡,防止多線程同時修改cells數(shù)組(也稱cells表),0為無鎖亿柑,1位加鎖邢疙,加鎖的狀況有三種:

  1. cells數(shù)組初始化的時候;
  2. cells數(shù)組擴容的時候望薄;
  3. 如果cells數(shù)組中某個元素為null疟游,給這個位置創(chuàng)建新的Cell對象的時候;
成員變量base

?它有兩個作用:

    1. 在開始沒有競爭的情況下痕支,將累加值累加到base颁虐;
    1. 在cells初始化的過程中,cells不可用采转,這時會嘗試將值累加到base上聪廉;

3.2 Cell內(nèi)部類

/**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     *  //為提高性能,使用注解@sun.misc.Contended故慈,用來避免偽共享
     */
    @sun.misc.Contended static final class Cell {
        //用來保存要累加的值
        volatile long value;
        Cell(long x) { value = x; }
        //使用UNSAFE類的cas來更新value值
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        //value在Cell類中存儲位置的偏移量板熊;
        private static final long valueOffset;
        //這個靜態(tài)方法用于獲取偏移量
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

?這個類相對來說比較簡單,volatile類型的value用來保存要累加的值察绷;final類型的valueOffset表示value在Cell類中存儲位置的偏移量干签;Cell類重點需要關(guān)注注解@sun.misc.Contended;這個注解是JDK1.8中用來解決偽共享的問題拆撼,具體偽共享可以參見《偽共享容劳、緩存行填充和CPU緩存詳述》這一篇文章,這里不再詳述闸度;

3.3 LongAdder

?LongAdder我們關(guān)注下重點方法竭贩,首先看下add方法。

add方法

?add方法是LongAdder累加的方法莺禁,傳入的參數(shù)x為要累加的值留量;

/**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        /**
         * 如果一下兩種條件則繼續(xù)執(zhí)行if內(nèi)的語句
         * 1. cells數(shù)組不為null(不存在爭用的時候,cells數(shù)組一定為null,一旦對base的cas操作失敗楼熄,才會初始化cells數(shù)組)
         * 2. 如果cells數(shù)組為null忆绰,如果casBase執(zhí)行成功,則直接返回可岂,如果casBase方法執(zhí)行失敶砀摇(casBase失敗,說明第一次爭用沖突產(chǎn)生缕粹,需要對cells數(shù)組初始化)進入if內(nèi)稚茅;
         * casBase方法很簡單,就是通過UNSAFE類的cas設(shè)置成員變量base的值為base+要累加的值
         * casBase執(zhí)行成功的前提是無競爭平斩,這時候cells數(shù)組還沒有用到為null峰锁,可見在無競爭的情況下是類似于AtomticInteger處理方式,使用cas做累加双戳。
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //uncontended判斷cells數(shù)組中虹蒋,當前線程要做cas累加操作的某個元素是否不存在爭用,如果cas失敗則存在爭用飒货;uncontended=false代表存在爭用魄衅,uncontended=true代表不存在爭用。
            boolean uncontended = true;
            /**
             *1. as == null : cells數(shù)組未被初始化塘辅,成立則直接進入if執(zhí)行cell初始化
             *2. (m = as.length - 1) < 0: cells數(shù)組的長度為0
             *條件1與2都代表cells數(shù)組沒有被初始化成功晃虫,初始化成功的cells數(shù)組長度為2;
             *3. (a = as[getProbe() & m]) == null :如果cells被初始化扣墩,且它的長度不為0哲银,則通過getProbe方法獲取當前線程Thread的threadLocalRandomProbe變量的值,初始為0呻惕,然后執(zhí)行threadLocalRandomProbe&(cells.length-1 ),相當于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置為null荆责,這說明這個位置從來沒有線程做過累加,需要進入if繼續(xù)執(zhí)行亚脆,在這個位置創(chuàng)建一個新的Cell對象做院;
             *4. !(uncontended = a.cas(v = a.value, v + x)):嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作,并返回操作結(jié)果,如果失敗了則進入if,重新計算一個threadLocalRandomProbe濒持;

             如果進入if語句執(zhí)行l(wèi)ongAccumulate方法,有三種情況
             1. 前兩個條件代表cells沒有初始化键耕,
             2. 第三個條件指當前線程hash到的cells數(shù)組中的位置還沒有其它線程做過累加操作,
             3. 第四個條件代表產(chǎn)生了沖突,uncontended=false
             **/
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

?如上所示柑营,只有從未出現(xiàn)過并發(fā)沖突的時候屈雄,base基數(shù)才會使用到,一旦出現(xiàn)了并發(fā)沖突官套,之后所有的操作都只是針對cell[]數(shù)組中的單元Cell酒奶。如果cell[]數(shù)組未初始化蓖议,會調(diào)用父類的longAccumulate去初始化cell[],如果cell[]已經(jīng)初始化但沖突發(fā)生在Cell單元內(nèi)讥蟆,則也是調(diào)用父類的longAccumulate,此時可能需要對cell[]擴容了纺阔。接下來我們來看下longAccumulate方法瘸彤。

longAccumulate方法

?三個參數(shù)第一個為要累加的值,第二個為null笛钝,第三個為wasUncontended表示調(diào)用方法之前的add方法是否未發(fā)生競爭;

/**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention
     * avoids the need for an extra field or function in LongAdder).
     * @param wasUncontended false if CAS failed before call
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //獲取當前線程的threadLocalRandomProbe值作為hash值,如果當前線程的threadLocalRandomProbe為0质况,說明當前線程是第一次進入該方法,則強制設(shè)置線程的threadLocalRandomProbe為ThreadLocalRandom類的成員靜態(tài)私有變量probeGenerator的值玻靡,后面會詳細將hash值的生成;
        //另外需要注意结榄,如果threadLocalRandomProbe=0,代表新的線程開始參與cell爭用的情況
        //1.當前線程之前還沒有參與過cells爭用(也許cells數(shù)組還沒初始化囤捻,進到當前方法來就是為了初始化cells數(shù)組后爭用的),是第一次執(zhí)行base的cas累加操作失斁世省;
        //2.或者是在執(zhí)行add方法時蝎土,對cells某個位置的Cell的cas操作第一次失敗视哑,則將wasUncontended設(shè)置為false,那么這里會將其重新置為true誊涯;第一次執(zhí)行操作失數惨恪;
        //凡是參與了cell爭用操作的線程threadLocalRandomProbe都不為0暴构;
        int h;
        if ((h = getProbe()) == 0) {
            //初始化ThreadLocalRandom;
            ThreadLocalRandom.current(); // force initialization
            //將h設(shè)置為0x9e3779b9
            h = getProbe();
            //設(shè)置未競爭標記為true
            wasUncontended = true;
        }
        //cas沖突標志跪呈,表示當前線程hash到的Cells數(shù)組的位置,做cas累加操作時與其它線程發(fā)生了沖突取逾,cas失敽穆獭;collide=true代表有沖突砾隅,collide=false代表無沖突
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //這個主干if有三個分支
            //1.CASE1:處理cells數(shù)組已經(jīng)正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4)
            //2.CASE2:處理cells數(shù)組沒有初始化或者長度為0的情況缭乘;(這個分支處理add方法的四個條件中的1和2)
            //3.CASE3:處理如果cell數(shù)組沒有初始化,并且其它線程正在執(zhí)行對cells數(shù)組初始化的操作琉用,及cellbusy=1堕绩;則嘗試將累加值通過cas累加到base上
            //先看主分支一
            if ((as = cells) != null && (n = as.length) > 0) {
                /**
                 *CASE1:這個是處理add方法內(nèi)部if分支的條件3:如果被hash到的位置為null,說明沒有線程在這個位置設(shè)置過值邑时,沒有競爭奴紧,可以直接使用,則用x值作為初始值創(chuàng)建一個新的Cell對象晶丘,對cells數(shù)組使用cellsBusy加鎖黍氮,然后將這個Cell對象放到cells[m%cells.length]位置上
                 */
                if ((a = as[(n - 1) & h]) == null) {
                    //cellsBusy == 0 代表當前沒有線程cells數(shù)組做修改
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        //將要累加的x值作為初始值創(chuàng)建一個新的Cell對象唐含,
                        Cell r = new Cell(x);   // Optimistically create
                        //如果cellsBusy=0無鎖,則通過cas將cellsBusy設(shè)置為1加鎖
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //標記Cell是否創(chuàng)建成功并放入到cells數(shù)組被hash的位置上
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                //再次檢查cells數(shù)組不為null沫浆,且長度不為空捷枯,且hash到的位置的Cell為null
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //將新的cell設(shè)置到該位置
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //去掉鎖
                                cellsBusy = 0;
                            }
                            //生成成功,跳出循環(huán)
                            if (created)
                                break;
                            //如果created為false专执,說明上面指定的cells數(shù)組的位置cells[m%cells.length]已經(jīng)有其它線程設(shè)置了cell了淮捆,繼續(xù)執(zhí)行循環(huán)。
                            continue;           // Slot is now non-empty
                        }
                    }
                    //如果執(zhí)行的當前行本股,代表cellsBusy=1攀痊,有線程正在更改cells數(shù)組,代表產(chǎn)生了沖突拄显,將collide設(shè)置為false
                    collide = false;
                }
                /**
                 * case2:如果add方法中條件4的通過cas設(shè)置cells[m%cells.length]位置的Cell對象中的value值設(shè)置為v+x失敗,說明已經(jīng)發(fā)生競爭苟径,將wasUncontended設(shè)置為true,跳出內(nèi)部的if判斷躬审,最后重新計算一個新的probe棘街,然后重新執(zhí)行循環(huán);
                 */
                else if (!wasUncontended)       // CAS already known to fail
                    //設(shè)置未競爭標志位true,繼續(xù)執(zhí)行承边,后面會算一個新的probe值蹬碧,然后重新執(zhí)行循環(huán)。
                    wasUncontended = true;      // Continue after rehash
                /**
                 * case3:新的爭用線程參與爭用的情況:處理剛進入當前方法時threadLocalRandomProbe=0的情況炒刁,也就是當前線程第一次參與cell爭用的cas失敗恩沽,這里會嘗試將x值加到cells[m%cells.length]的value ,如果成功直接退出
                 */
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                /**
                 *  case4:case3處理新的線程爭用執(zhí)行失敗了翔始,這時如果cells數(shù)組的長度已經(jīng)到了最大值(大于等于cup數(shù)量)罗心,或者是當前cells已經(jīng)做了擴容,則將collide設(shè)置為false城瞎,后面重新計算prob的值
                 */
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                /**
                 * case5:如果發(fā)生了沖突collide=false渤闷,則設(shè)置其為true;會在最后重新計算hash值后脖镀,進入下一次for循環(huán)
                 */
                else if (!collide)
                    //設(shè)置沖突標志飒箭,表示發(fā)生了沖突,需要再次生成hash蜒灰,重試锈拨。 如果下次重試任然走到了改分支此時collide=true澜术,!collide條件不成立纳决,則走后一個分支
                    collide = true;
                /**
                 * case6:擴容cells數(shù)組游沿,新參與cell爭用的線程兩次均失敗,且符合庫容條件翅溺,會執(zhí)行該分支
                 */
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //檢查cells是否已經(jīng)被擴容
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //為當前線程重新計算hash值
                h = advanceProbe(h);
            }
            //這個大的分支處理add方法中的條件1與條件2成立的情況脑漫,如果cell表還未初始化或者長度為0髓抑,先嘗試獲取cellsBusy鎖。
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    //初始化cells數(shù)組优幸,初始容量為2,并將x值通過hash&1吨拍,放到0個或第1個位置上
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //解鎖
                    cellsBusy = 0;
                }
                //如果init為true說明初始化成功,跳出循環(huán)
                if (init)
                    break;
            }
            /**
             *如果以上操作都失敗了网杆,則嘗試將值累加到base上羹饰;
             */
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
Striped64類longAccumulate方法原理
getProbe()方法hash的生成

?hash是LongAdder定位當前線程應(yīng)該將值累加到cells表的哪個位置上的,所以hash算法是非常重要的跛璧,接下來看看它的實現(xiàn);
?在java的Thread類中有一個成員變量:

/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
    @sun.misc.Contended("tlr")
    int threadLocalRandomProbe;

?threadLocalRandomProbe這個變量的值就是LongAdder用來hash定位Cells數(shù)組位置的新啼,平時線程的這個變量一般用不到追城,它的值一直都是0。在LongAdder的父類Striped64里通過getProbe方法獲取當前線程的threadLocalRandomProbe值:

/**
     * Returns the probe value for the current thread.
     * Duplicated from ThreadLocalRandom because of packaging restrictions.
     */
    static final int getProbe() {
        //PROBE是threadLocalRandomProbe變量在Thread類里面的偏移量燥撞,所以下面語句獲取的就是threadLocalRandomProbe的值座柱;
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
threadLocalRandomProbe的初始化

?線程對LongAdder的累加操作,在沒有進入longAccumulate方法前物舒,threadLocalRandomProbe一直都是0色洞,當發(fā)生爭用后才會進入longAccumulate方法中,進入該方法第一件事就是判斷threadLocalRandomProbe是否為0冠胯,如果為0火诸,則將其設(shè)置為0x9e3779b9

int h;
        if ((h = getProbe()) == 0) { // 給當前線程生成一個非0的hash值
            //初始化ThreadLocalRandom;
            ThreadLocalRandom.current(); // force initialization
            //將h設(shè)置為0x9e3779b9
            h = getProbe();
            //設(shè)置未競爭標記為true
            wasUncontended = true;
        }

?重點在 ThreadLocalRandom.current(); 這一行

/**
     * Returns the current thread's {@code ThreadLocalRandom}.
     *
     * @return the current thread's {@code ThreadLocalRandom}
     */
    public static ThreadLocalRandom current() {
        if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
            localInit();
        return instance;
    }

?在current方法中判斷如果probe的值為0荠察,則執(zhí)行l(wèi)ocaInit()方法置蜀,將當前線程的probe設(shè)置為非0的值,該方法實現(xiàn)如下:

/**
     * Initialize Thread fields for the current thread.  Called only
     * when Thread.threadLocalRandomProbe is zero, indicating that a
     * thread local seed value needs to be generated. Note that even
     * though the initialization is purely thread-local, we need to
     * rely on (static) atomic generators to initialize the values.
     */
    static final void localInit() {
        //private static final int PROBE_INCREMENT = 0x9e3779b9;
        int p = probeGenerator.addAndGet(PROBE_INCREMENT);
        //prob不能為0
        int probe = (p == 0) ? 1 : p; // skip 0
        long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
        //獲取當前線程
        Thread t = Thread.currentThread();
        UNSAFE.putLong(t, SEED, seed);
        //將probe的值更新為probeGenerator的值
        UNSAFE.putInt(t, PROBE, probe);
    }

?probeGenerator 是static 類型的AtomicInteger類悉盆,每執(zhí)行一次localInit()方法盯荤,都會將probeGenerator 累加一次0x9e3779b9這個值;焕盟,0x9e3779b9這個數(shù)字的得來是 2^32 除以一個常數(shù)秋秤,這個常數(shù)就是傳說中的黃金比例 1.6180339887;然后將當前線程的threadLocalRandomProbe設(shè)置為probeGenerator 的值脚翘,如果probeGenerator 為0灼卢,這取1;

threadLocalRandomProbe重新生成

?就是將prob的值左右移位 来农、異或操作三次

/**
     * Pseudo-randomly advances and records the given probe value for the
     * given thread.
     */
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
LongAdder的sum方法

?最后來看下LongAdder的sum方法芥玉,

 /**
     * Returns the current sum.  The returned value is <em>NOT</em> an
     * atomic snapshot; invocation in the absence of concurrent
     * updates returns an accurate result, but concurrent updates that
     * occur while the sum is being calculated might not be
     * incorporated.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

? 這個其實印證了我們原理章節(jié)中所說的一個求和公式;需要注意的是备图,這個方法只能得到某個時刻的近似值灿巧,這也就是LongAdder并不能完全替代LongAtomic的原因之一赶袄。

四、Striped64的其它子類

?JDK1.8時抠藕,java.util.concurrent.atomic包中饿肺,除了新引入LongAdder外,還有引入了它的三個兄弟類:LongAccumulator盾似、DoubleAdder敬辣、DoubleAccumulator

LongAccumulator

?LongAccumulator是LongAdder的增強版。LongAdder只能針對數(shù)值的進行加減運算零院,而LongAccumulator提供了自定義的函數(shù)操作溉跃。其構(gòu)造函數(shù)如下:

/**
     * Creates a new instance using the given accumulator function
     * and identity element.
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @param identity identity (initial value) for the accumulator function
     */
    public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }

?通過LongBinaryOperator,可以自定義對入?yún)⒌娜我獠僮鞲娉⒎祷亟Y(jié)果(LongBinaryOperator接收2個long作為參數(shù)撰茎,并返回1個long)

?LongAccumulator內(nèi)部原理和LongAdder幾乎完全一樣,都是利用了父類Striped64的longAccumulate方法打洼。

DoubleAdder和DoubleAccumulator

?從名字也可以看出龄糊,DoubleAdder和DoubleAccumulator用于操作double原始類型。

?與LongAdder的唯一區(qū)別就是募疮,其內(nèi)部會通過一些方法炫惩,將原始的double類型,轉(zhuǎn)換為long類型阿浓,其余和LongAdder完全一樣:

/**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(double x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null ||
            !casBase(b = base,
                     Double.doubleToRawLongBits //Double內(nèi)部轉(zhuǎn)換成Long的方法
                     (Double.longBitsToDouble(b) + x))) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value,
                                      Double.doubleToRawLongBits
                                      (Double.longBitsToDouble(v) + x))))
                doubleAccumulate(x, null, uncontended);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末他嚷,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子芭毙,更是在濱河造成了極大的恐慌爸舒,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件稿蹲,死亡現(xiàn)場離奇詭異扭勉,居然都是意外死亡,警方通過查閱死者的電腦和手機苛聘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門涂炎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人设哗,你說我怎么就攤上這事唱捣。” “怎么了网梢?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵震缭,是天一觀的道長。 經(jīng)常有香客問我战虏,道長拣宰,這世上最難降的妖魔是什么党涕? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮巡社,結(jié)果婚禮上膛堤,老公的妹妹穿的比我還像新娘。我一直安慰自己晌该,他們只是感情好肥荔,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著朝群,像睡著了一般燕耿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上姜胖,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天誉帅,我揣著相機與錄音,去河邊找鬼谭期。 笑死堵第,一個胖子當著我的面吹牛吧凉,可吹牛的內(nèi)容都是我干的隧出。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼阀捅,長吁一口氣:“原來是場噩夢啊……” “哼胀瞪!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起饲鄙,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤凄诞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后忍级,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體帆谍,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年轴咱,在試婚紗的時候發(fā)現(xiàn)自己被綠了汛蝙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡朴肺,死狀恐怖窖剑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情戈稿,我是刑警寧澤西土,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站鞍盗,受9級特大地震影響需了,放射性物質(zhì)發(fā)生泄漏跳昼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一援所、第九天 我趴在偏房一處隱蔽的房頂上張望庐舟。 院中可真熱鬧住拭,春花似錦、人聲如沸滔岳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽摊求。三九已至刘离,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間硫惕,已是汗流浹背茧痕。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留恼除,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓令野,卻偏偏與公主長得像徽级,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子餐抢,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容