[TOC]
CAS
全稱(Compare And Swap),比較交換
Unsafe類是CAS的核心類,提供硬件級(jí)別的原子操作殴俱。
// 對(duì)象政冻、對(duì)象的地址、預(yù)期值线欲、修改值
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
缺點(diǎn):
- 開銷大:在并發(fā)量比較高的情況下明场,如果反復(fù)嘗試更新某個(gè)變量,卻又一直更新不成功李丰,會(huì)給CPU帶來(lái)較大的壓力
-
ABA問題:當(dāng)變量從A修改為B在修改回A時(shí)苦锨,變量值等于期望值A(chǔ),但是無(wú)法判斷是否修改,CAS操作在ABA修改后依然成功舟舒。
- 如何避免:Java提供了AtomicStampedReference和AtomicMarkableReference來(lái)解決拉庶。AtomicStampedReference通過(guò)包裝[E,Integer]的元組來(lái)對(duì)對(duì)象標(biāo)記版本戳stamp,對(duì)于ABA問題其解決方案是加上版本號(hào)魏蔗,即在每個(gè)變量都加上一個(gè)版本號(hào)砍的,每次改變時(shí)加1,即A —> B —> A莺治,變成1A —> 2B —> 3A廓鞠。
- 不能保證代碼塊的原子性:CAS機(jī)制所保證的只是一個(gè)變量的原子性操作,而不能保證整個(gè)代碼塊的原子性谣旁。
public class Test {
private static AtomicInteger atomicInteger = new AtomicInteger(100);
private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);
public static void main(String[] args) throws InterruptedException {
//AtomicInteger
Thread at1 = new Thread(new Runnable() {
@Override
public void run() {
atomicInteger.compareAndSet(100,110);
atomicInteger.compareAndSet(110,100);
}
});
Thread at2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2); // at1,執(zhí)行完
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120));
}
});
at1.start();
at2.start();
at1.join();
at2.join();
//AtomicStampedReference
Thread tsf1 = new Thread(new Runnable() {
@Override
public void run() {
try {
//讓 tsf2先獲取stamp床佳,導(dǎo)致預(yù)期時(shí)間戳不一致
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 預(yù)期引用:100,更新后的引用:110榄审,預(yù)期標(biāo)識(shí)getStamp() 更新后的標(biāo)識(shí)getStamp() + 1
atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
}
});
Thread tsf2 = new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedReference.getStamp();
try {
TimeUnit.SECONDS.sleep(2); //線程tsf1執(zhí)行完
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));
}
});
tsf1.start();
tsf2.start();
}
}
AQS(AbstractQueuedSynchronizer)
維護(hù)一個(gè)volatile int state(代表共享資源狀態(tài))和一個(gè)FIFO線程等待隊(duì)列砌们。
模板方法基本分為三類:
- 獨(dú)占鎖
- 共享鎖
- 釋放鎖
資源共享的方式
- Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行搁进,如ReentrantLock)
- Share(共享浪感,多個(gè)線程可以同時(shí)執(zhí)行,如Semaphore/CountDownLatch)
同步隊(duì)列
AQS依靠同步隊(duì)列(一個(gè)FIFO的雙向隊(duì)列)來(lái)完成同步狀態(tài)的管理饼问。當(dāng)當(dāng)前線程獲取狀態(tài)失敗后影兽,同步器會(huì)將當(dāng)前線程以及等待信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node),并嘗試將他加入到同步隊(duì)列莱革。Head節(jié)點(diǎn)不保存等待的線程信息峻堰,僅通過(guò)next指向隊(duì)列中第一個(gè)保存等待線程信息的Node。
Node類
源碼(中字注釋)
static final class Node {
/** 代表共享模式 */
static final Node SHARED = new Node();
/** 代表獨(dú)占模式 */
static final Node EXCLUSIVE = null;
/** 以下四個(gè)狀態(tài)解釋見下文等待狀態(tài) */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 標(biāo)識(shí)等待狀態(tài)盅视,通過(guò)CAS操作更新捐名,原子操作不會(huì)被打斷*/
volatile int waitStatus;
/** 當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn) */
volatile Node prev;
/** 當(dāng)前節(jié)點(diǎn)的后置節(jié)點(diǎn) */
volatile Node next;
/** 該節(jié)點(diǎn)關(guān)聯(lián)的線程(未能獲取鎖,進(jìn)入等待的線程) */
volatile Thread thread;
/** 指向下一個(gè)在某個(gè)條件上等待的節(jié)點(diǎn)闹击,或者指向 SHARE 節(jié)點(diǎn)镶蹋,表明當(dāng)前處于共享模式*/
Node nextWaiter;
/**
* 判斷是否處于共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
* 會(huì)做對(duì)前置節(jié)點(diǎn)空值判斷
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
等待狀態(tài):
等待狀態(tài)的修改是CAS原子操作
- CANCELED: 1,因?yàn)榈却瑫r(shí) (timeout)或者中斷(interrupt)赏半,節(jié)點(diǎn)會(huì)被置為取消狀態(tài)贺归。處于取消狀態(tài)的節(jié)點(diǎn)不會(huì)再去競(jìng)爭(zhēng)鎖,也就是說(shuō)不會(huì)再被阻塞除破。節(jié)點(diǎn)會(huì)一直保持取消狀態(tài),而不會(huì)轉(zhuǎn)換為其他狀態(tài)琼腔。處于 CANCELED 的節(jié)點(diǎn)會(huì)被移出隊(duì)列瑰枫,被 GC 回收。
- SIGNAL: -1,表明當(dāng)前的后繼結(jié)點(diǎn)正在或者將要被阻塞(通過(guò)使用 LockSupport.pack 方法)光坝,因此當(dāng)前的節(jié)點(diǎn)被釋放(release)或者被取消時(shí)(cancel)時(shí)尸诽,要喚醒它的后繼結(jié)點(diǎn)(通過(guò) LockSupport.unpark 方法)。
- CONDITION: -2盯另,表明當(dāng)前節(jié)點(diǎn)在條件隊(duì)列中性含,因?yàn)榈却硞€(gè)條件而被阻塞。
- PROPAGATE: -3鸳惯,在共享模式下商蕴,可以認(rèn)為資源有多個(gè),因此當(dāng)前線程被喚醒之后芝发,可能還有剩余的資源可以喚醒其他線程绪商。該狀態(tài)用來(lái)表明后續(xù)節(jié)點(diǎn)會(huì)傳播喚醒的操作。需要注意的是只有頭節(jié)點(diǎn)才可以設(shè)置為該狀態(tài)(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)辅鲸。
- 0:新創(chuàng)建的節(jié)點(diǎn)會(huì)處于這種狀態(tài)
鎖的獲取與釋放:
獲取獨(dú)占鎖
- acquire方法
public final void acquire(int arg) {
// 首先嘗試獲取鎖格郁,如果獲取失敗,會(huì)先調(diào)用 addWaiter 方法創(chuàng)建節(jié)點(diǎn)并追加到隊(duì)列尾部
// 然后調(diào)用 acquireQueued 阻塞或者循環(huán)嘗試獲取鎖
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
// 在 acquireQueued 中独悴,如果線程是因?yàn)橹袛喽顺龅淖枞麪顟B(tài)會(huì)返回 true
// 這里的 selfInterrupt 主要是為了恢復(fù)線程的中斷狀態(tài)
selfInterrupt();
}
}
釋放獨(dú)占鎖
? 在獨(dú)占模式中例书,鎖的釋放由于沒有其他線程競(jìng)爭(zhēng),相對(duì)簡(jiǎn)單刻炒。鎖釋放失敗的原因是由于該線程本身不擁有鎖决采,而非多線程競(jìng)爭(zhēng)。鎖釋放成功后會(huì)檢查后置節(jié)點(diǎn)的狀態(tài)落蝙,找到合適的節(jié)點(diǎn)织狐,調(diào)用unparkSuccessor方法喚醒該節(jié)點(diǎn)所關(guān)聯(lián)的線程。
- release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// waitStatus 為 0筏勒,證明是初始化的空隊(duì)列或者后繼結(jié)點(diǎn)已經(jīng)被喚醒了
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
獲取共享鎖
-
acquireShared方法
通過(guò)該方法可以申請(qǐng)鎖
public final void acquireShared(int arg) {
// 如果返回結(jié)果小于0移迫,證明沒有獲取到共享資源
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
- doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}