線程安全三個必要性
原子性
Atomic 包提供了一批AtomicXXX類型,用于確保對象的獲取和操作步驟為原子性操作雨涛。
package com.accat.concurrency.example.atomic;
import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ThreadSafe
public class AtomicExample1 {
// 請求總數
public static int clientTotal = 5000;
// 同時并發(fā)執(zhí)行的線程數
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}
這里關鍵是add()
中的count.incrementAndGet()
枢舶,追蹤這個方法
count.incrementAndGet
-> unsafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
這里使用了樂觀鎖的概念懦胞,不斷地去比較var2
, 和 var5
,如果相同則切換值var5 + var4
-> getIntVolatile + compareAndSwapInt
追蹤鏈到這個方法祟辟,compareAndSwapInt
-- CAS 代表 比較和切換值同時進行医瘫,屬于java底層的代碼侣肄。
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
compareAndSet
package com.accat.concurrency.example.atomic;
import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@ThreadSafe
public class AtomicExample4 {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
count.compareAndSet(0, 2); // 2
count.compareAndSet(0, 1); // no
count.compareAndSet(1, 3); // no
count.compareAndSet(2, 4); // 4
count.compareAndSet(3, 5); // no
log.info("count:{}", count.get());
}
}
compareAndSet
直接設置值旧困,由于AtomicReference<V>
是個泛型類,所以設置的值為V類型稼锅。
AtomicIntegerFieldUpdater
package com.accat.concurrency.example.atomic;
import com.accat.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@Slf4j
@ThreadSafe
public class AtomicExample5 {
private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
@Getter
public volatile int count = 100;
public static void main(String[] args) {
AtomicExample5 example5 = new AtomicExample5();
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 1, {}", example5.getCount());
}
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 2, {}", example5.getCount());
} else {
log.info("update failed, {}", example5.getCount());
}
}
}
更新某個對象中的某字段值
AtomicStampedReference
標準樂觀鎖
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
AtomicStampedReference
類可以說是樂觀鎖的標準實現吼具,
①該類為改變的類變量維護一個版本號,每次相信該類變量版本為最新矩距,
②如果是最新則設置值拗盒,
③如果不是則放棄操作后返回false
。
讓多線程代碼只執(zhí)行一次
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@ThreadSafe
public class AtomicExample6 {
private static AtomicBoolean isHappened = new AtomicBoolean(false);
// 請求總數
public static int clientTotal = 5000;
// 同時并發(fā)執(zhí)行的線程數
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
test();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappened:{}", isHappened.get());
}
private static void test() {
if (isHappened.compareAndSet(false, true)) {
log.info("execute");
}
}
}
test()
中的代碼只執(zhí)行了一次锥债,如果要確保多線程執(zhí)行同一方法時陡蝇,確保方法只被執(zhí)行一次則可以參考上述代碼。
鎖
上述atomic包提供的類是存在缺陷的哮肚,因為它只提供對單一類成員變量的原子性操作登夫。
如AtomicInteger
只提供對單一Integer
的原子性操作。
如果我有兩個類型AtomicInteger a
和AtomicInteger b
允趟, 而操作 a + b
的過程中依然存在縫隙恼策。
synchronized
synchronized更為底層,JVM層面的實現
Lock更為上層潮剪,是Java代碼實現
package com.accat.concurrency.example.sync;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class SynchronizedExample1 {
// 修飾一個代碼塊
public void test1(int j) throws InterruptedException {
synchronized (this) {
for (int i = 0; i < 10; i++) {
log.info("test1 {} - {}", j, i);
Thread.sleep(200);
}
}
}
// 修飾一個方法
public synchronized void test2(int j) throws InterruptedException {
for (int i = 0; i < 10; i++) {
log.info("test2 {} - {}", j, i);
Thread.sleep(200);
}
}
public static void main(String[] args) {
SynchronizedExample1 example1 = new SynchronizedExample1();
SynchronizedExample1 example2 = new SynchronizedExample1();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
example1.test2(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
try {
example2.test2(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
package com.accat.concurrency.example.sync;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class SynchronizedExample2 {
// 修飾一個類
public static void test1(int j) throws InterruptedException {
synchronized (SynchronizedExample2.class) {
for (int i = 0; i < 10; i++) {
log.info("test1 {} - {}", j, i);
Thread.sleep(200);
}
}
}
// 修飾一個靜態(tài)方法
public static synchronized void test2(int j) throws InterruptedException {
for (int i = 0; i < 10; i++) {
log.info("test2 {} - {}", j, i);
Thread.sleep(200);
}
}
public static void main(String[] args) {
SynchronizedExample2 example1 = new SynchronizedExample2();
SynchronizedExample2 example2 = new SynchronizedExample2();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
example1.test1(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
try {
example2.test1(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
可見性
Volatile關鍵字保證讀取或者寫入工作內存時都事先與主內存中的數據進行同步。
package com.accat.concurrency.example.count;
import com.accat.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
@NotThreadSafe
public class CountExample4 {
// 請求總數
public static int clientTotal = 5000;
// 同時并發(fā)執(zhí)行的線程數
public static int threadTotal = 200;
public static volatile int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
count++;
// 1抗碰、count讀取 多個線程讀取最新的結果
// 2狮斗、+1
// 3、count寫入 多個線程把+1的結果同時寫回主內存
}
}
volatile并不具備線程原子性弧蝇,不能保證線程安全情龄。
它只能保證讀取的值為主內存當前值,寫入值后對其他線程立馬可見捍壤,影響的是判斷值的過程骤视。但是當操作時,可能多個操作(+1)執(zhí)行后同時寫入主內存中鹃觉。