無鎖即無障礙的運行, 所有線程都可以到達臨界區(qū), 接近于無等待.無鎖采用CAS(compare and swap)算法來處理線程沖突, 其原理如下
CAS原理
CAS包含3個參數(shù)CAS(V,E,N).V表示要更新的變量, E表示預(yù)期值, N表示新值.僅當(dāng)V值等于E值時, 才會將V的值設(shè)為N, 如果V值和E值不同, 則說明已經(jīng)有其他線程做了更新, 則當(dāng)前線程什么都不做. 最后, CAS返回當(dāng)前V的真實值. CAS操作是抱著樂觀的態(tài)度進行的, 它總是認(rèn)為自己可以成功完成操作.
當(dāng)多個線程同時使用CAS操作一個變量時, 只有一個會勝出, 并成功更新, 其余均會失敗.失敗的線程不會被掛起,僅是被告知失敗, 并且允許再次嘗試, 當(dāng)然也允許失敗的線程放棄操作.基于這樣的原理, CAS操作即時沒有鎖,也可以發(fā)現(xiàn)其他線程對當(dāng)前線程的干擾, 并進行恰當(dāng)?shù)奶幚?
CPU指令
另外, 雖然上述步驟繁多, 實際上CAS整一個操作過程是一個原子操作, 它是由一條CPU指令完成的,從指令層保證操作可靠, 不會被多線程干擾.
無鎖與volatile
當(dāng)給變量加了volatile關(guān)鍵字, 表示該變量對所有線程可見, 但不保證原子性.
AtomicInteger
// 取得當(dāng)前值
public final int get()
// 設(shè)置當(dāng)前值
public final void set(int newValue)
// 設(shè)置新值,并返回舊值
public final int getAndSet(int newValue)
// 如果當(dāng)前值為expect嫡秕,則設(shè)置為u
public final boolean compareAndSet(int expect, int u)
// 當(dāng)前值加1,返回舊值
public final int getAndIncrement()
// 當(dāng)前值減1底循,返回舊值
public final int getAndDecrement()
// 當(dāng)前值增加delta,返回舊值
public final int getAndAdd(int delta)
// 當(dāng)前值加1,返回新值
public final int incrementAndGet()
// 當(dāng)前值減1很魂,返回新值
public final int decrementAndGet()
// 當(dāng)前值增加delta讶泰,返回新值
public final int addAndGet(int delta)
// 封裝了一個int對其加減
private volatile int value;
.......
public final boolean compareAndSet(int expect, int update) {
// 通過unsafe 基于CPU的CAS指令來實現(xiàn), 可以認(rèn)為無阻塞.
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
.......
public final int getAndIncrement() {
for (;;) {
// 當(dāng)前值
int current = get();
// 預(yù)期值
int next = current + 1;
if (compareAndSet(current, next)) {
// 如果加成功了, 則返回當(dāng)前值
return current;
}
// 如果加失敗了, 說明其他線程已經(jīng)修改了數(shù)據(jù), 與期望不相符,
// 則繼續(xù)無限循環(huán), 直到成功. 這種樂觀鎖, 理論上只要等兩三個時鐘周期就可以設(shè)值成功
// 相比于直接通過synchronized獨占鎖的方式操作int, 要大大節(jié)約等待時間.
}
}
demo
使用10個線程打印0-10000, 最終得到結(jié)果10w.
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDemo {
static AtomicInteger i = new AtomicInteger();
public static class AddThread implements Runnable {
public void run() {
for (int k = 0; k < 10000; k++) {
i.incrementAndGet();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for (int k = 0; k < 10; k++) {
ts[k] = new Thread(new AddThread());
}
for (int k = 0; k < 10; k++) {
ts[k].start();
}
for (int k = 0; k < 10; k++) {
ts[k].join();
}
System.out.println(i);
}
}
Unsafe
Unsafe類是在sun.misc包下, 可以用于一些非安全的操作咏瑟,比如:
根據(jù)偏移量設(shè)置值, 線程park(), 底層的CAS操作等等.
// 獲取類實例中變量的偏移量
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
// 基于偏移量對值進行操作
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
主要接口
// 獲得給定對象偏移量上的int值
public native int getInt(Object o, long offset);
// 設(shè)置給定對象偏移量上的int值
public native void putInt(Object o, long offset, int x);
// 獲得字段在對象中的偏移量
public native long objectFieldOffset(Field f);
// 設(shè)置給定對象的int值,使用volatile語義
public native void putIntVolatile(Object o, long offset, int x);
// 獲得給定對象對象的int值痪署,使用volatile語義
public native int getIntVolatile(Object o, long offset);
// 和putIntVolatile()一樣码泞,但是它要求被操作字段就是volatile類型的
public native void putOrderedInt(Object o, long offset, int x);
AtomicReference
與AtomicInteger類似, 只是里面封裝了一個對象, 而不是int, 對引用進行修改
主要接口
1 get()
2 set(V)
3 compareAndSet()
4 getAndSet(V)
demo
使用10個線程, 同時嘗試修改AtomicReference中的String, 最終只有一個線程可以成功.
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc");
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (attxnicStr.compareAndSet("abc", "def")) {
System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get());
} else {
System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!");
}
}
}.start();
}
}
}
AtomicStampedReference
也是封裝了一個引用, 主要解決ABA問題.
ABA問題
線程一準(zhǔn)備用CAS將變量的值由A替換為B, 在此之前線程二將變量的值由A替換為C, 線程三又將C替換為A, 然后線程一執(zhí)行CAS時發(fā)現(xiàn)變量的值仍然為A, 所以線程一CAS成功.
// 比較設(shè)置 參數(shù)依次為:期望值 寫入新值 期望時間戳 新時間戳
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
// 獲得當(dāng)前對象引用
public V getReference()
// 獲得當(dāng)前時間戳
public int getStamp()
// 設(shè)置當(dāng)前對象引用和時間戳
public void set(V newReference, int newStamp)
源碼分析
// 內(nèi)部封裝了一個Pair對象, 每次對對象操作的時候, stamp + 1
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
// 進行cas操作的時候, 會對比stamp的值
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
后臺使用多個線程對用戶充值, 要求只能充值一次
public class AtomicStampedReferenceDemo {
static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);
public staticvoid main(String[] args) {
//模擬多個線程同時更新后臺數(shù)據(jù)庫,為用戶充值
for(int i = 0 ; i < 3 ; i++) {
final int timestamp=money.getStamp();
newThread() {
public void run() {
while(true){
while(true){
Integerm=money.getReference();
if(m<20){
if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){
System.out.println("余額小于20元狼犯,充值成功余寥,余額:"+money.getReference()+"元");
break;
}
}else{
//System.out.println("余額大于20元,無需充值");
break ;
}
}
}
}
}.start();
}
//用戶消費線程悯森,模擬消費行為
new Thread() {
publicvoid run() {
for(int i=0;i<100;i++){
while(true){
int timestamp=money.getStamp();
Integer m=money.getReference();
if(m>10){
System.out.println("大于10元");
if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){
System.out.println("成功消費10元宋舷,余額:"+money.getReference());
break;
}
}else{
System.out.println("沒有足夠的金額");
break;
}
}
try {Thread.sleep(100);} catch (InterruptedException e) {}
}
}
}.start();
}
}
AtomicIntegerArray
支持無鎖的數(shù)組
// 獲得數(shù)組第i個下標(biāo)的元素
public final int get(int i)
// 獲得數(shù)組的長度
public final int length()
// 將數(shù)組第i個下標(biāo)設(shè)置為newValue,并返回舊的值
public final int getAndSet(int i, int newValue)
// 進行CAS操作瓢姻,如果第i個下標(biāo)的元素等于expect祝蝠,則設(shè)置為update,設(shè)置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
// 將第i個下標(biāo)的元素加1
public final int getAndIncrement(int i)
// 將第i個下標(biāo)的元素減1
public final int getAndDecrement(int i)
// 將第i個下標(biāo)的元素增加delta(delta可以是負(fù)數(shù))
public final int getAndAdd(int i, int delta)
// 數(shù)組本身基地址
private static final int base = unsafe.arrayBaseOffset(int[].class);
// 封裝了一個數(shù)組
private final int[] array;
static {
// 數(shù)組中對象的寬度, int類型, 4個字節(jié), scale = 4;
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// 前導(dǎo)0 : 一個數(shù)字轉(zhuǎn)為二進制后, 他前面0的個數(shù)
// 對于4來講, 他就是00000000 00000000 00000000 00000100, 他的前導(dǎo)0 就是29
// 所以shift = 2
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
// 獲取第i個元素
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
// 第i個元素, 在數(shù)組中的偏移量是多少
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
// base : 數(shù)組基地址, i << shift, 其實就是i * 4, 因為這邊是int array.
private static long byteOffset(int i) {
// i * 4 + base
return ((long) i << shift) + base;
}
// 根據(jù)偏移量從數(shù)組中獲取數(shù)據(jù)
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}
Demo
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicArrayDemo {
static AtomicIntegerArray arr = new AtomicIntegerArray(10);
public static class AddThread implements Runnable {
public void run() {
for (int k = 0; k < 10000; k++) {
arr.incrementAndGet(k % arr.length());
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for (int k = 0; k < 10; k++) {
ts[k] = new Thread(new AddThread());
}
for (int k = 0; k < 10; k++) {
ts[k].start();
}
for (int k = 0; k < 10; k++) {
ts[k].join();
}
System.out.println(arr);
}
}
跟之前的AtomicInteger沒太多區(qū)別幻碱,只是需要對于傳入的i需要先判斷在數(shù)組length范圍內(nèi)续膳,再轉(zhuǎn)換成內(nèi)存地址。類中其他方法類同收班。