JAVA 多線程與高并發(fā)學習筆記(十)——LongAdder

轉眼就8月份了巢块,要開始努力了~~~

在爭用激烈的場景下,會導致大量的CAS空自旋燎窘。這會浪費大量的CPU資源浑厚,大大降低了程序的性能股耽。

這時候可以通過使用 LongAdder 代替 AtomicInteger 來提高CAS操作的性能。

以空間換時間:LongAdder

Java8提供了一個類 LongAdder 通過以空間換時間的方式提高并發(fā)場景下CAS操作的性能钳幅。

LongAdder 的核心思想是熱點分離物蝙,與 ConcurrentHashMap 的設計思想類似,將 value 值分離成一個數組敢艰,當多線程訪問時茬末,通過 Hash 算法將線程映射到數組的一個元素進行操作,而獲取最終的的 value 結果時盖矫,則將數組的元素求和丽惭。

最終,通過 LongAdder 將內部操作對象從單一 value 值“演變”成一些列的數組元素辈双,從而減小了內部競爭的粒度责掏。

下面看一個小例子:

public class LongAdderTest {

    final int TURNS = 1000;

    @Test
    public void testLongAdder() {
        final int TASK_AMOUNT = 10;

        ExecutorService pool = Executors.newCachedThreadPool();

        LongAdder longAdder = new LongAdder();

        CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
        long start = System.currentTimeMillis();
        for(int i = 0; i < TASK_AMOUNT; i++) {
            pool.submit(() -> {
                try {
                    for(int j = 0; j < TURNS; j++) {
                        longAdder.add(1);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        float time = (System.currentTimeMillis() - start) / 1000F;
        System.out.println("運行時長為:" + time);
        System.out.println("累加結果為: " + longAdder.longValue());
    }
}

LongAdder 的原理

AtomicLong 使用內部變量 value 保存著實際的 long 值,所有的操作都是針對該變量進行的湃望。也就是說在高并發(fā)環(huán)境下换衬,value 變量其實是一個熱點,重試線程越多证芭,CAS失敗概率更高瞳浦,從而進入惡性CAS空自旋狀態(tài)。

LongAdder 的基本實錄是分散熱點废士,將 value 值分散到一個數組中叫潦,不同線程會命中到數組的不同槽(元素)中,各個線程只對自己槽中的那個值進行CAS操作官硝。這樣熱點就被分擔了矗蕊,沖突的概率就小很多短蜕。

使用 LongAdder,即使線程數再多也不必擔心傻咖,各個線程會分配到多個元素上去更新朋魔,增加元素個數,就可以降低 value 的“熱度”卿操,惡性CAS空自旋就解決了警检。

如果要獲得完整的 LongAdder 存儲的值,只要將各個槽中的變量值累加害淤,返回最終累加之后的值即可解滓。

LongAdder 的實現(xiàn)思路與 ConcurrentHashMap 中分段鎖的基本原理非常相似,本質上都是不同的線程在不同的單元上進行操作筝家,這樣減少了線程競爭,提高了并發(fā)效率邻辉。

LongAdder 的設計體現(xiàn)了空間換時間的思想溪王,不過在實際高并發(fā)場景下,數組元素所消耗的空間可以忽略不計值骇。

LongAdder 實例的內部結構

一個 LongAdder 實例的內部結構如下所示莹菱。

longAdder.png

LongAdder 的內部成員包含一個 base 值和一個 cells 數組,在最初無競爭時吱瘩,只操作 base 的值道伟,當線程執(zhí)行CAS失敗后,才初始化 cells 數組使碾,并為線程分配所對應的元素蜜徽。

基類 Striped64 內部三個重要成員

LongAdder 繼承于 Stripped64 類,base 值和 cell 數組都在 Stripped64 類中定義票摇,它的內部三個重要的成員如下:

/**
 * 成員一:存放Cell的哈希表拘鞋,大小為2的冪
 */
transient volatile Cell[] cells;
/**
 * 成員二:基礎值
 * 1.在沒有競爭時會更新這個值
 * 2.在cells初始化時,cells不可用矢门,也會嘗試通過CAS操作值累加到base
 */
transient volatile long base;
/**
 * 自旋鎖盆色,通過CAS操作加鎖,為0表示cells數組沒有處于創(chuàng)建祟剔、擴容階段
 * 為1表示正在創(chuàng)建或者擴展cells數組隔躲,不能進行新Cell元素的設置操作
 */
transient volatile int cellsBusy;

Stripped64 內部包含一個 base 和一個 Cell[] 類型的 cells 數組,cells 數組又叫哈希表物延,在沒有競爭的情況下宣旱,要累加的數通過CAS累加到 base 上,如果有競爭的話叛薯,會將要累加的數累加到 cells 數組中的某個 Cell 元素里面响鹃。

Stripped64 的整體值 value 的獲取函數如下:

public long longValue() {
    return sum();
}

/**
 * 將多個cells數組中的值加起來的和
 */
public long sum() {
    Cell[] as = cells; 
    Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

Stripped64 的設計核心思路是通過內部的分散計算來避免競爭驾霜,以空間換取時間。沒有競爭時 cells 數組為 null买置,這時只使用 base粪糙,一旦發(fā)生競爭,cells 數組就上場了忿项。

cells 數組第一次初始化長度為2蓉冈,以后每次擴容都變?yōu)樵瓉淼膬杀叮恢钡?cells 數組的長度大于等于當前服務器的CPU核數轩触,同一時刻能持有CPU時間片去并發(fā)操作同一內存地址的最大線程數最多也就是CPU的核數寞酿。

在存在線程爭用的時候,每個線程被映射到 cells[threadLocalRandomProbe&cells.length] 位置的 Cell 元素脱柱,該線程對 value 所做的累加操作就執(zhí)行在對應的 Cell 元素的值上伐弹。

LongAdder 類的 add() 操作

這里分析以下 LongAdder 類的 add() 方法,具體的源碼如下:

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

/**
 * 自增1
 */
public void increment() {
    add(1L);
}

/**
 * 自減1
 */
public void decrement() {
    add(-1L);
}

首先介紹一下代碼的外層 if 塊的兩個條件語句:

  • cells 數組不為null榨为,說明存在爭用惨好,在不存在爭用的時候,cells 數組一定為null随闺,一旦對 base 的CAS操作失敗日川,才會初始化 cells 數組。
  • 如果 cells 數組為null矩乐,表示之前不存在爭用龄句,并且此次 casBase 執(zhí)行成功,表示基于 base 成員累加成功散罕,add 方法直接返回分歇;如果 casBase 方法執(zhí)行失敗,說明產生了第一次爭用沖突欧漱,需要對 cells 數組初始化卿樱,此時即將進入內嵌 if 塊。

casBase 方法很簡單硫椰,就是通過 UNSAFE 類的CAS設置成員變量 base 的值為 base+x繁调。casBase方法的代碼如下:

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

內嵌的if語句塊邏輯如下:

  • as == null || (m = as.length - 1)<0 代表 cells 沒有初始化。
  • 當前線程的哈希值在 cells 數組映射位置的 Cell 對象為空靶草,意思是還沒有其他線程在同一個位置做過累加操作蹄胰。
  • 當前線程的哈希值在 cells 數組映射位置的 Cell 對象不為空,然后在該 Cell 對象上進行CAS操作奕翔,設置其值為 v+x裕寨,但是CAS操作失敗,表示存在爭用。

如果以上條件滿足一個宾袜,就進入 longAccumulate 方法捻艳。

LongAdder 類中的 longAccumulate() 方法

longAccumulate()Striped64 中重要的方法,實現(xiàn)不同的線程更新各自 Cell 中的值庆猫,其實現(xiàn)邏輯類似于分段鎖认轨,具體代碼如下:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    // 擴容意向,collide=true可以擴容月培,collide=false不可擴容
    boolean collide = false;
    // 自旋嘁字,一直到操作成功
    for (;;) {
        // as 表示cells引用
        // a 表示當前線程命中的Cell
        // n表示cells數組長度
        // v 表示期望值
        Cell[] as; Cell a; int n; long v;
        // true表示下標位置的Cell為null,需要創(chuàng)建Cell
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // cells 數組沒有處于創(chuàng)建杉畜,擴容階段
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // 當前線程競爭失敗纪蜒,wasUncontended為false
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 當前線程rehash過哈希值,CAS更新cell
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                break;
            // 調整擴容意向此叠,然后進入下一輪循環(huán)
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 設置擴容意向為true纯续,但是不一定真的發(fā)生擴容
            else if (!collide)
                collide = true;
            // 真正擴容的邏輯
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0; // 釋放鎖
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h); //重置當前線程的Hash值
        }
        // cells還未初始化,并且cellsBusy加鎖成功
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 當前線程cellsBusy加鎖失敗灭袁,表示其它線程這鞥在初始化cells
        // 所以當前線程將值累加到base猬错,注意add()方法調用此方式時fn為null
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

LongAddr 類的 casCellsBusy 方法

casCellsBusy()方法的代碼很簡單,就是將 cellsBusy 成員的值改為1简卧,表示目前的cells數組在初始化或擴容中。代碼如下:

final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

casCellsBusy() 方法相當于鎖的功能烤芦,當線程需要 cells 數組初始化或擴容時举娩,需要調到 casCellsBusy() 方法,通過CAS方式將 cellsBusy 成員的值改為1构罗,如果修改失敗铜涉,就表示其它的線程正在進行數組初始化或擴容的操作。在 cells 數組初始化或擴容的操作執(zhí)行完成之后遂唧,cellsBusy 成員的值被改為0芙代,這時不需要進行CAS修改,直接修改即可盖彭,因為不存在爭用纹烹。

當 cellsBusy 值為1時,表示cells數組正在被某個線程執(zhí)行初始化或擴容操作召边,其它線程不能進行以下操作:

  1. 對cells數組初始化铺呵。
  2. 對cells數組擴容。
  3. 如果cells數組中某個元素為null隧熙,就為該元素創(chuàng)建新的Cell對象片挂,因為數組的結構正在修改,所以其他線程下面不能創(chuàng)建新的Cell對象。
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末音念,一起剝皮案震驚了整個濱河市沪饺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌闷愤,老刑警劉巖整葡,帶你破解...
    沈念sama閱讀 223,126評論 6 520
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異肝谭,居然都是意外死亡掘宪,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,421評論 3 400
  • 文/潘曉璐 我一進店門攘烛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來魏滚,“玉大人,你說我怎么就攤上這事坟漱∈蟠危” “怎么了?”我有些...
    開封第一講書人閱讀 169,941評論 0 366
  • 文/不壞的土叔 我叫張陵芋齿,是天一觀的道長腥寇。 經常有香客問我,道長觅捆,這世上最難降的妖魔是什么赦役? 我笑而不...
    開封第一講書人閱讀 60,294評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮栅炒,結果婚禮上掂摔,老公的妹妹穿的比我還像新娘。我一直安慰自己赢赊,他們只是感情好乙漓,可當我...
    茶點故事閱讀 69,295評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著释移,像睡著了一般叭披。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上玩讳,一...
    開封第一講書人閱讀 52,874評論 1 314
  • 那天涩蜘,我揣著相機與錄音,去河邊找鬼熏纯。 笑死皱坛,一個胖子當著我的面吹牛,可吹牛的內容都是我干的豆巨。 我是一名探鬼主播剩辟,決...
    沈念sama閱讀 41,285評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了贩猎?” 一聲冷哼從身側響起熊户,我...
    開封第一講書人閱讀 40,249評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎吭服,沒想到半個月后嚷堡,有當地人在樹林里發(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 46,760評論 1 321
  • 正文 獨居荒郊野嶺守林人離奇死亡艇棕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,840評論 3 343
  • 正文 我和宋清朗相戀三年蝌戒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沼琉。...
    茶點故事閱讀 40,973評論 1 354
  • 序言:一個原本活蹦亂跳的男人離奇死亡北苟,死狀恐怖,靈堂內的尸體忽然破棺而出打瘪,到底是詐尸還是另有隱情友鼻,我是刑警寧澤,帶...
    沈念sama閱讀 36,631評論 5 351
  • 正文 年R本政府宣布闺骚,位于F島的核電站挂滓,受9級特大地震影響猪瞬,放射性物質發(fā)生泄漏岩齿。R本人自食惡果不足惜呛哟,卻給世界環(huán)境...
    茶點故事閱讀 42,315評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望胸梆。 院中可真熱鬧敦捧,春花似錦、人聲如沸乳绕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,797評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽洋措。三九已至,卻和暖如春杰刽,著一層夾襖步出監(jiān)牢的瞬間菠发,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,926評論 1 275
  • 我被黑心中介騙來泰國打工贺嫂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留滓鸠,地道東北人。 一個月前我還...
    沈念sama閱讀 49,431評論 3 379
  • 正文 我出身青樓第喳,卻偏偏與公主長得像糜俗,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,982評論 2 361

推薦閱讀更多精彩內容