Concurrent Java 01 - 線程安全性

線程安全三個必要性

確保三個必要性

原子性

Atomic 包提供了一批AtomicXXX類型,用于確保對象的獲取和操作步驟為原子性操作雨涛。


atomic包提供類
將Read,Load,Use, Assign, Store, Write 綁定在一起
package com.accat.concurrency.example.atomic;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ThreadSafe
public class AtomicExample1 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時并發(fā)執(zhí)行的線程數
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}

這里關鍵是add()中的count.incrementAndGet()枢舶,追蹤這個方法
count.incrementAndGet
-> unsafe.getAndAddInt

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

這里使用了樂觀鎖的概念懦胞,不斷地去比較var2, 和 var5,如果相同則切換值var5 + var4

-> getIntVolatile + compareAndSwapInt
追蹤鏈到這個方法祟辟,compareAndSwapInt -- CAS 代表 比較切換值同時進行医瘫,屬于java底層的代碼侣肄。

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

compareAndSet

package com.accat.concurrency.example.atomic;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicReference;

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2); // 2
        count.compareAndSet(0, 1); // no
        count.compareAndSet(1, 3); // no
        count.compareAndSet(2, 4); // 4
        count.compareAndSet(3, 5); // no
        log.info("count:{}", count.get());
    }
}

compareAndSet直接設置值旧困,由于AtomicReference<V>是個泛型類,所以設置的值為V類型稼锅。

AtomicIntegerFieldUpdater

package com.accat.concurrency.example.atomic;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@Slf4j
@ThreadSafe
public class AtomicExample5 {

    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    @Getter
    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicExample5 example5 = new AtomicExample5();

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1, {}", example5.getCount());
        }

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 2, {}", example5.getCount());
        } else {
            log.info("update failed, {}", example5.getCount());
        }
    }
}

更新某個對象中的某字段值

AtomicStampedReference

標準樂觀鎖

/**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * @param expectedReference the expected value of the reference
     * @param newReference the new value for the reference
     * @param expectedStamp the expected value of the stamp
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

AtomicStampedReference類可以說是樂觀鎖的標準實現吼具,
①該類為改變的類變量維護一個版本號,每次相信該類變量版本為最新矩距,
②如果是最新則設置值拗盒,
③如果不是則放棄操作后返回false

讓多線程代碼只執(zhí)行一次

package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    // 請求總數
    public static int clientTotal = 5000;

    // 同時并發(fā)執(zhí)行的線程數
    public static int threadTotal = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            log.info("execute");
        }
    }
}

test()中的代碼只執(zhí)行了一次锥债,如果要確保多線程執(zhí)行同一方法時陡蝇,確保方法只被執(zhí)行一次則可以參考上述代碼。

上述atomic包提供的類是存在缺陷的哮肚,因為它只提供對單一類成員變量的原子性操作登夫。
AtomicInteger只提供對單一Integer的原子性操作。
如果我有兩個類型AtomicInteger aAtomicInteger b允趟, 而操作 a + b 的過程中依然存在縫隙恼策。

鎖的分類

synchronized

synchronized更為底層,JVM層面的實現
Lock更為上層潮剪,是Java代碼實現


synchronized作用范圍
package com.accat.concurrency.example.sync;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample1 {

    // 修飾一個代碼塊
    public void test1(int j) throws InterruptedException {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
                Thread.sleep(200);
            }
        }
    }

    // 修飾一個方法
    public synchronized void test2(int j) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
            Thread.sleep(200);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                example1.test2(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            try {
                example2.test2(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
package com.accat.concurrency.example.sync;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample2 {

    // 修飾一個類
    public static void test1(int j) throws InterruptedException {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
                Thread.sleep(200);
            }
        }
    }

    // 修飾一個靜態(tài)方法
    public static synchronized void test2(int j) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
            Thread.sleep(200);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                example1.test1(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            try {
                example2.test1(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
不同關鍵字的應用場景

可見性

在Read前加上Load指令涣楷,在Write后加入Store指令
Volatile寫
Volatile讀

Volatile關鍵字保證讀取或者寫入工作內存時都事先與主內存中的數據進行同步。

package com.accat.concurrency.example.count;

import com.accat.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
@NotThreadSafe
public class CountExample4 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時并發(fā)執(zhí)行的線程數
    public static int threadTotal = 200;

    public static volatile int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count++;
        // 1抗碰、count讀取  多個線程讀取最新的結果
        // 2狮斗、+1
        // 3、count寫入  多個線程把+1的結果同時寫回主內存
    }
}

volatile并不具備線程原子性弧蝇,不能保證線程安全情龄。
它只能保證讀取的值為主內存當前值,寫入值后對其他線程立馬可見捍壤,影響的是判斷值的過程骤视。但是當操作時,可能多個操作(+1)執(zhí)行后同時寫入主內存中鹃觉。


volatile適應場景一:狀態(tài)通知

關于volatile底層細節(jié)

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末专酗,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子盗扇,更是在濱河造成了極大的恐慌祷肯,老刑警劉巖沉填,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異佑笋,居然都是意外死亡翼闹,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門蒋纬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來猎荠,“玉大人,你說我怎么就攤上這事蜀备」匾。” “怎么了?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵碾阁,是天一觀的道長输虱。 經常有香客問我,道長脂凶,這世上最難降的妖魔是什么宪睹? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮蚕钦,結果婚禮上亭病,老公的妹妹穿的比我還像新娘。我一直安慰自己冠桃,他們只是感情好命贴,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著食听,像睡著了一般胸蛛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上樱报,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天葬项,我揣著相機與錄音,去河邊找鬼迹蛤。 笑死民珍,一個胖子當著我的面吹牛,可吹牛的內容都是我干的盗飒。 我是一名探鬼主播嚷量,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼逆趣!你這毒婦竟也來了蝶溶?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎抖所,沒想到半個月后梨州,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡田轧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年暴匠,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片傻粘。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡每窖,死狀恐怖,靈堂內的尸體忽然破棺而出抹腿,到底是詐尸還是另有隱情岛请,我是刑警寧澤旭寿,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布警绩,位于F島的核電站,受9級特大地震影響盅称,放射性物質發(fā)生泄漏肩祥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一缩膝、第九天 我趴在偏房一處隱蔽的房頂上張望混狠。 院中可真熱鬧,春花似錦疾层、人聲如沸将饺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽予弧。三九已至,卻和暖如春湖饱,著一層夾襖步出監(jiān)牢的瞬間掖蛤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工井厌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蚓庭,地道東北人。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓仅仆,卻偏偏與公主長得像器赞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子墓拜,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內容