基于 CAS自旋實現(xiàn)的輕量級鎖在惡性自旋時會消費大量的CPU資源。解決這個問題有2種方案:分散操作熱點和使用隊列削峰她按。JUC并發(fā)包使用的是隊列削峰的方案解決CAS性能問題渠旁,并提供了一個基于雙向隊列的削峰基類——抽象基礎(chǔ)類AbstractQueuedSyncronizer(抽象同步器類风响,簡稱為AQS)。
AQS的內(nèi)部隊列
AQS是CLH隊列的一個變種羡亩,原理和CLH類似摩疑。AQS隊列內(nèi)部維護(hù)的是一個FIFO雙向鏈表,其中的每個節(jié)點其實是由線程封裝的畏铆,當(dāng)線程爭搶鎖失敗后會封裝成節(jié)點加入AQS隊列中:當(dāng)獲取鎖的線程釋放鎖以后雷袋,會從隊列中喚醒一個阻塞的節(jié)點。
AQS的內(nèi)部結(jié)構(gòu)可以參考下圖辞居。
AQS核心成員
AQS基本模板模式實現(xiàn)楷怒。AQS為鎖獲取、鎖釋放的排隊和出隊過程提供了一系列的模板方法瓦灶。由于JUC的顯式鎖種類豐富鸠删,因此AQS將不同鎖的具體操作抽取為鉤子方法,供各類鎖去實現(xiàn)贼陶。
狀態(tài)標(biāo)志位
AQS 中維持了一個單一的 volatile 修飾的狀態(tài)信息是 state刃泡,AQS使用int類型的state標(biāo)志鎖的狀態(tài)巧娱,可以理解為鎖的同步狀態(tài)。
// 同步狀態(tài)
private volatile int state;
state 因為使用了 volatile 保證了操作的可見性捅僵,AQS提供了 getState()
家卖、setState()
來獲取和設(shè)置同步狀態(tài)。
// 獲取同步狀態(tài)
protected final int getState() {
return state;
}
// 設(shè)置同步狀態(tài)
protected final void setState(int newState) {
state = neWState;
}
// 通過CAS設(shè)置同步狀態(tài)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.comapreAndSwapInt(this, stateOffset, expect, update);
}
由于 setState()
無法保證原子性庙楚,因此AQS給我們提供了 compareAndSetState()
方法利用底層 Unsafe
的 CAS機(jī)制來實現(xiàn)原子性。
AbstractQueueSynchronizer
繼承了 AbstarctOwnableSynchronizer
趴樱,這個基類只有一個變量叫 exclusiveOwnerThread
馒闷,表示當(dāng)前占用該鎖的線程,并且提供了相應(yīng)的 get()
和 set()
方法叁征。具體如下:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 表示當(dāng)前占用該鎖的線程
private transient Thread exlusiveOwnerThread;
...
}
隊列節(jié)點類
AQS是一個虛擬隊列纳账,不存在隊列實例,僅存在節(jié)點之間的前后關(guān)系捺疼。節(jié)點類型通過內(nèi)部類 Node 定義疏虫。
static final class Node {
// 節(jié)點等待狀態(tài)值:取消狀態(tài)
static final int CANCELLED = 1;
// 節(jié)點等待狀態(tài)值:標(biāo)識后繼線程處于等待狀態(tài)
static final int SIGNAL = -1;
// 節(jié)點等待狀態(tài)值:標(biāo)識當(dāng)前線程正在進(jìn)行條件等待
static final int CONDITION = -2;
// 節(jié)點等待狀態(tài)值:標(biāo)識下一次共享鎖的acquireShared操作需要無條件傳播
static final int PROPAGATE = -3;
// 節(jié)點狀態(tài)
// 普通的同步節(jié)點初始值為0,條件等待節(jié)點的初始值為CONDITION(-2)
volatile int waitState;
// 節(jié)點所對應(yīng)的線程啤呼,為搶鎖線程或者條件等待線程
volatile Thread thread;
// 前驅(qū)節(jié)點
volatile Node prev;
// 后繼節(jié)點
volatile Node next;
// 此屬性指向下一個條件等待節(jié)點
Node nextWaiter;
}
waitStatus 屬性
每個節(jié)點與等待線程關(guān)聯(lián)卧秘,每個節(jié)點維護(hù)一個狀態(tài) waitStatus,它的各個常量值具體如下:
- CANCELLED(1)官扣。1表示該線程節(jié)點已經(jīng)釋放(超時翅敌、中斷),已取消的節(jié)點不會再阻塞惕蹄,不會參與競爭蚯涮,會一直保持取消狀態(tài)。
- SIGNAL(-1)卖陵。表示其后繼的節(jié)點處于等待狀態(tài)遭顶,當(dāng)前節(jié)點對應(yīng)的線程如果釋放了同步狀態(tài)或者被取消,就會通知后繼節(jié)點泪蔫,使后繼節(jié)點的線程得以運(yùn)行棒旗。
- CONDITION(-2)。表示該線程在條件隊列中阻塞鸥滨,表示節(jié)點在等待隊列中嗦哆。當(dāng)持有鎖的線程調(diào)用了CONDITION的
signal()
方法之后,節(jié)點會從該CONDITION的等待隊列轉(zhuǎn)移到該鎖的同步隊列上婿滓,去競爭鎖老速。 - PROPAGATE(-3)。表示下一個線程獲取共享鎖后凸主,自己的共享狀態(tài)會被無條件地傳播下去橘券,因為共享鎖可能出現(xiàn)同時有N個鎖可以用。
- 0。表示當(dāng)前節(jié)點處于初始狀態(tài)旁舰。
thread成員
Node 的 thread 成員用來存放進(jìn)入AQS隊列中的線程引用锋华,Node的nextWaiter成員用來執(zhí)行自己的后繼等待節(jié)點。
搶占類型常量標(biāo)識
Node 節(jié)點還定義了兩個搶占類型常量標(biāo)識箭窜,SHARED和EXCLUSIVE毯焕,具體如下:
// 標(biāo)識節(jié)點在搶占共享鎖
static final Node SHARED = new Node();
// 表示節(jié)點在搶占獨占鎖
static final Node EXCLUSIVE = null;
FIFO雙向同步隊列
AQS通過內(nèi)置的FIFO雙向隊列來完成線程的排隊工作,內(nèi)部通過節(jié)點 head 和 tail 記錄隊首和隊尾元素磺樱,元素的節(jié)點類型為 Node 類型纳猫,具體如下:
/* 首節(jié)點的引用 */
private transient volatile Node head;
/* 尾節(jié)點的引用 */
private transient volatile Node tail;
AQS 的首尾節(jié)點都是懶加載的,需要的時候才真正創(chuàng)建竹捉。只有在線程競爭失敗的情況下芜辕,有新線程加入同步隊列時,AQS才創(chuàng)建一個head節(jié)點块差。尾節(jié)點只在有新線程阻塞時才被創(chuàng)建侵续。
JUC顯式鎖和AQS的關(guān)系
AQS是一個同步器,它實現(xiàn)了鎖的基本抽象功能憨闰,支持獨占鎖與共享鎖兩種方式状蜗,該類是使用模板模式來實現(xiàn)的,成為構(gòu)建鎖和同步器的框架起趾,使用該類可以簡單且高效地構(gòu)造出應(yīng)用廣泛的同步器诗舰。
JUC中的顯式鎖、線程同步工具等训裆,內(nèi)部都使用了AQS作為等待隊列眶根。
AQS中的模板模式
模板模式
模板模式是類的行為模式。準(zhǔn)備一個抽象類边琉,將部分的邏輯以具體方法的形式實現(xiàn)属百,然后聲明一些抽象方法來迫使子類實現(xiàn)剩余的邏輯。不同的子類提供不同的方式實現(xiàn)這些抽象方法变姨,從而對剩余的邏輯有不同的實現(xiàn)族扰。
模板模式的關(guān)鍵在于:父類提供框架性的公共邏輯,子類提供個性化的定制邏輯定欧。
模板方法(Template Method)也被稱為骨架方法渔呵,主要定義了整個方法需要實現(xiàn)的業(yè)務(wù)操作的算法框架。
鉤子方法(Hook Method)是被模板方法的算法框架所調(diào)用的砍鸠,由子類提供具體的實現(xiàn)方法扩氢。
AQS的模板流程
AQS定義了兩種資源共享方式:
- Exclusive(獨占鎖):只有一個線程能占有鎖資源。
- Share(共享鎖):多個線程可以同時占有鎖資源爷辱。
AQS 為不同的資源共享方式提供了不同的模板流程录豺。
AQS中的鉤子方法
自定義同步器時朦肘,AQS中需要重寫的鉤子方法大致如下:
- tryAcquire(int):獨占鎖鉤子,嘗試獲取資源双饥,若成功則返回true媒抠。
- tryRelease(int):獨占鎖鉤子,嘗試釋放資源咏花,若成功則返回true趴生。
- tryAcquireShared(int):共享鎖鉤子,嘗試獲取資源迟螺,負(fù)數(shù)表示失敗冲秽,0表示成功,但沒有剩余可用資源矩父,正數(shù)表示成功,且有剩余資源排霉。
- tryReleaseShared(int):共享鎖鉤子窍株,嘗試釋放資源,若成功則返回true攻柠。
- isHeldExclusively():獨占鎖鉤子球订,判斷該線程是否正在獨占資源。
以上鉤子的默認(rèn)實現(xiàn)會拋出 UnsupportOperationException 異常瑰钮。
通過AQS實現(xiàn)一把簡單的獨占鎖
本部分模擬 ReentrantLock 實現(xiàn)一個簡單的獨占鎖冒滩,真實的 ReentrantLock 要復(fù)雜很多。
public class SimpleMockLock implements Lock {
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
// 鉤子方法
protected boolean tryAcquire(int arg) {
// CAS更新狀態(tài)值為1
if(compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 鉤子方法
protected boolean tryRelease(int arg) {
if(Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
}
AQS搶占鎖的原理
下面通過AQS中的方法講解一下AQS搶占鎖的原理浪谴。
AQS模板方法:acquire
acquire 是AQS 封裝好的獲取資源的公共入口开睡,它是AQS提供的利用獨占的方式獲取資源的方法,編碼實現(xiàn)如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire 至少執(zhí)行一次 tryAcquire 鉤子方法苟耻。若調(diào)用成功篇恒,則 acquire 直接返回,表示已經(jīng)搶到鎖凶杖,若不成功胁艰,則將線程加入等待隊列。
鉤子實現(xiàn): tryAcquire
tryAcquire 是需要實現(xiàn)的鉤子方法智蝠,可以參照我們前面實現(xiàn)的 SimpleMockLock腾么。
直接入隊:addWaiter
在 acquire 模板方法中,如果鉤子方法 tryAcquire 嘗試獲取同步狀態(tài)失敗的話杈湾,就構(gòu)造同步節(jié)點解虱,通過 addWaiter 方法將該節(jié)點加入同步隊列的隊尾。
private Node addWaiter(Node mode) {
// 創(chuàng)建新節(jié)點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 加入隊列尾部毛秘,將目前的隊列tail作為自己的前驅(qū)節(jié)點pred
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 現(xiàn)場時通過AQS方式修改尾節(jié)點為最新的節(jié)點
// 如果修改陳宮饭寺,將節(jié)點加入隊列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 第一次嘗試添加尾部失敗阻课,意味著有并發(fā)搶鎖發(fā)生,需要進(jìn)行自旋
enq(node);
return node;
}
addWaiter 方法中艰匙,首先需要構(gòu)造一個 Node 對象限煞,其中有兩個參數(shù),第一個是當(dāng)前線程员凝,第二個是Node節(jié)點署驻,可能值為 SHARED 或 EXCLUSIVE。
自旋入隊:enq
addWaiter 第一次嘗試在尾部加節(jié)點失敗健霹,意味著有并發(fā)搶鎖發(fā)生旺上,需要進(jìn)行自旋,enq 方法通過 CAS 自旋將節(jié)點添加到隊列尾部糖埋。
/**
* 這里進(jìn)行了循環(huán)宣吱,如果此時存在tail,就執(zhí)行添加新隊尾的操作
* 如果依然不存在瞳别,就把當(dāng)前線程作為 head 節(jié)點
* 插入節(jié)點后征候,調(diào)用acquireQueued() 進(jìn)行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 隊列為空,初始化尾節(jié)點和頭節(jié)點為新節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS 操作head指針祟敛,僅僅被enq()調(diào)用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS 操作tail指針疤坝,僅僅被enq()調(diào)用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
自旋搶占:acquireQueued
在節(jié)點入隊之后,啟動自旋搶鎖的流程馆铁,aquireQueued方法的主要邏輯:當(dāng)前Node節(jié)點線程在死循環(huán)中不斷獲取同步狀態(tài)跑揉,并且不斷在前驅(qū)節(jié)點上自旋,只有當(dāng)前驅(qū)節(jié)點是頭節(jié)點時才能嘗試獲取鎖埠巨,原因是:
- 頭結(jié)點是成功獲取同步狀態(tài)(鎖)的節(jié)點历谍,而頭節(jié)點的線程釋放了同步狀態(tài)以后,將會喚醒其后繼節(jié)點乖订,后繼節(jié)點的線程被喚醒后要檢查自己的前驅(qū)節(jié)點是否為頭節(jié)點扮饶。
- 維護(hù)同步隊列的FIFO原則,節(jié)點進(jìn)入同步隊列之后乍构,就進(jìn)入了自旋的過程甜无,每個節(jié)點都在不斷地執(zhí)行for死循環(huán)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自選檢查當(dāng)前節(jié)點的前驅(qū)節(jié)點是否為頭節(jié)點哥遮,才能獲取鎖
for (;;) {
// 獲取節(jié)點的前驅(qū)節(jié)點
final Node p = node.predecessor();
// 節(jié)點中的線程循環(huán)地檢查自己的前驅(qū)節(jié)點是否為head節(jié)點
// 前驅(qū)節(jié)點是head時岂丘,進(jìn)一步調(diào)用子類的 tryAcquire 實現(xiàn)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 檢查前一個節(jié)點的狀態(tài),預(yù)判當(dāng)前獲取鎖失敗的線程是否要掛起
// 如果需要掛起眠饮,調(diào)用 parkAndCheckInterrupt 方法掛起當(dāng)前線程奥帘,直到被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 若兩個操作都是true,則置true
}
} finally {
// 如果等待過程中沒有成功獲取資源仪召,
// 那么取消節(jié)點在隊列中的等待
if (failed)
// 取消等待寨蹋,將當(dāng)前節(jié)點從隊列中移除
cancelAcquire(node);
}
}
為了不浪費資源松蒜,acquireQueued自旋過程中會阻塞線程,等待被前驅(qū)節(jié)點喚醒后才啟動循環(huán)已旧。如果成功就返回秸苗,否則執(zhí)行 shouldParkAfterFailedAcquire、parkAndCheckInterrupt來達(dá)到阻塞的效果运褪。
掛起預(yù)判:shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire 方法的主要功能是惊楼,將當(dāng)前節(jié)點的有效前驅(qū)節(jié)點(不是CANCELLED類型的節(jié)點)找到,并且將有效前驅(qū)節(jié)點的狀態(tài)設(shè)置為SIGNAL秸讹,之后返回true代表當(dāng)前線程可以馬上被阻塞了檀咙。
具體分為三種情況:
- 如果前驅(qū)節(jié)點的狀態(tài)為-1(SIGNAL),說明前驅(qū)的等待標(biāo)志已設(shè)好璃诀,返回true表示設(shè)置完畢弧可。
- 如果前驅(qū)節(jié)點的狀態(tài)為1(CANCELLED),說明前驅(qū)節(jié)點本身不再等待了劣欢,需要跨越這些節(jié)點侣诺,然后找到一個有效節(jié)點,再把當(dāng)前節(jié)點和這個有效節(jié)點的喚醒關(guān)系建立好氧秘,調(diào)整前驅(qū)節(jié)點的next指針為自己。
- 如果其他情況:-3趴久、-2丸相、0,那么通過CAS嘗試設(shè)置前驅(qū)節(jié)點為SIGNAL彼棍,表示只要前驅(qū)節(jié)點釋放鎖灭忠,當(dāng)前節(jié)點就可以搶占鎖了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 獲取前驅(qū)節(jié)點的狀態(tài)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
線程掛起:parkAndCheckInterrupt
parkAndCheckInterrupt 的主要任務(wù)是暫停當(dāng)前線程座硕,具體如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 調(diào)用park()使線程進(jìn)入waiting狀態(tài)
return Thread.interrupted(); // 如果被喚醒弛作,查看自己是否已經(jīng)被中斷
}
AQS 會把所有等待的線程構(gòu)成一個阻塞等待隊列,當(dāng)一個線程執(zhí)行完 lock.unlock()
华匾,會激活其后繼節(jié)點映琳,通過 LockSupport.unpark(postThread)
完成后繼線程的喚醒。
AQS 的兩個關(guān)鍵點:節(jié)點的入隊和出隊
理解AQS的一個關(guān)鍵點是掌握節(jié)點的入隊和出隊蜘拉。
節(jié)點的自旋入隊
節(jié)點在第一次入隊失敗后萨西,就會開始自旋入隊,分為以下兩種情況:
如果AQS的隊列非空旭旭,新節(jié)點入隊的插入位置在隊列的尾部谎脯,并且通過CAS方式插入,插入之后AQS的tail將指向新的尾節(jié)點持寄。
如果AQS的隊列為空源梭,新節(jié)點入隊時娱俺,AQS 通過 CAS 方法將新節(jié)點設(shè)置為頭節(jié)點 head,并將 tail 指針指向新節(jié)點废麻。
private Node enq(final Node node) {
for (;;) { // 自旋入隊
Node t = tail;
if (t == null) {
// 隊列為空荠卷,初始化尾節(jié)點和頭節(jié)點為新節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果隊列不為空,將新節(jié)點插入隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
節(jié)點的出隊
節(jié)點的出隊算法在 acquireQueued()
方法中脑溢,這是一個模板方法僵朗,acquireQueued()
方法不斷在前驅(qū)節(jié)點上自旋(for 循環(huán)),如果前驅(qū)節(jié)點是頭節(jié)點并且當(dāng)前線程使用鉤子方法 tryAcquire()
獲得了鎖屑彻,就移除頭節(jié)點验庙,將當(dāng)前節(jié)點設(shè)置為頭節(jié)點。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中斷標(biāo)記
boolean interrupted = false;
for (;;) {
// 獲取當(dāng)前線程節(jié)點的前驅(qū)節(jié)點
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點是頭節(jié)點社牲,則使當(dāng)前線程嘗試獲取鎖資源(tryAcquire方法忘了回頭看第五步)
if (p == head && tryAcquire(arg)) {
// 如果當(dāng)前程線程獲取鎖資源成功粪薛,則將當(dāng)前線程節(jié)點設(shè)置為頭節(jié)點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 根據(jù)前驅(qū)節(jié)點p的等待狀態(tài)判斷是否要將當(dāng)前線程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 生成CANCELLED狀態(tài)節(jié)點,并喚醒節(jié)點
if (failed)
cancelAcquire(node);
}
}
節(jié)點加入隊列尾部后搏恤,如果其前驅(qū)節(jié)點不是頭節(jié)點违寿,通常情況下,該新節(jié)點所綁定的線程會被無限期阻塞熟空,而不會去執(zhí)行無效循環(huán)藤巢,從而導(dǎo)致CPU資源的浪費。
對于公平鎖息罗,頭節(jié)點一般是占用鎖的節(jié)點掂咒,在釋放鎖時,會喚醒其后稷街店所綁定的線程迈喉,后繼節(jié)點的線程被喚醒后會重新執(zhí)行自旋搶鎖邏輯绍刮。
AQS 釋放鎖的原理
AQS 釋放鎖分成 3 個部分:
release 模板方法
首先會調(diào)用 AQS 的 release
模板方法,代碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
可以看到其中主要有 2 個方法挨摸,一個是 tryRelease
鉤子方法孩革,該方法會嘗試釋放當(dāng)前線程持有的資源,由子類根據(jù)具體業(yè)務(wù)提供具體實現(xiàn)得运,執(zhí)行成功后返回 true
膝蜈。
另一個方法是 unparkSucessor()
,用來喚醒后繼節(jié)點澈圈,代碼如下彬檀。
private void unparkSuccessor(Node node) {
// 獲取節(jié)點狀態(tài),釋放鎖的節(jié)點瞬女,也就是頭節(jié)點
int ws = node.waitStatus;
// 若頭節(jié)點狀態(tài)小于0窍帝,則將其置為0,表示初始狀態(tài)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到后頭的一個節(jié)點
if (s == null || s.waitStatus > 0) {
// 如果新節(jié)點已經(jīng)被取消
s = null;
// 從隊列尾部開始诽偷,往前去找醉千年的一個 waitStatus 小于0的節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 喚醒后繼節(jié)點對應(yīng)的線程
if (s != null)
LockSupport.unpark(s.thread);
}
ReentrantLock 搶鎖流程
下面分別對 ReentrantLock 的公平鎖和非公平鎖的實現(xiàn)進(jìn)行講述坤学。
ReentrantLock 非公平鎖的搶占流程
總體流程圖如下:
非公平鎖的同步器子類
ReentrantLock 為非公平鎖實現(xiàn)了一個內(nèi)部的同步器—— NofairSync
疯坤,器顯式鎖獲取方法 lock()
的源碼如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
首先用一個CAS操作判斷 state
是不是0(表示當(dāng)前鎖未被占用),如果是0就把它置為1深浮,并且設(shè)置當(dāng)前線程為該鎖的獨占線程压怠,表示獲取鎖成功。當(dāng)多個線程同時嘗試占用一個鎖時飞苇,CAS操作只能保證一個線程操作成功菌瘫,剩下的要排隊。
非公平體現(xiàn)在:如果占用鎖的線程剛剛釋放鎖布卡,state
置為0雨让,而排隊等待鎖 的線程還未喚醒,新來的線程就直接搶占了該鎖忿等,那么久“插隊”了栖忠。
非公平搶占的鉤子方法:tryAcquire
如果非公平搶占沒有成功,非公平鎖的 lock
匯之星模板方法 acquire
贸街,首先會調(diào)用鉤子方法 tryAcquire
庵寞,非公平搶占的鉤子方法實現(xiàn)如下:
static final class NonfairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先直接獲取鎖的狀態(tài)
int c = getState();
if (c == 0) {
// 如果內(nèi)部隊列首節(jié)點的線程執(zhí)行晚了,它會將鎖的state置為0
// 當(dāng)前搶鎖線程的下一步就是直接進(jìn)行搶占
// 發(fā)現(xiàn)state是空的薛匪,就直接拿來加鎖使用
if (compareAndSetState(0, acquires)) {
// 1.利用CAS自旋方式確認(rèn)當(dāng)前state確實為0捐川,然后設(shè)置acquire(1)
setExclusiveOwnerThread(current);
// 設(shè)置當(dāng)前執(zhí)行的線程,直接返回true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 2.當(dāng)前線程和執(zhí)行中的線程是同一個逸尖,也就意味著可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 表示當(dāng)前鎖被1個線程重復(fù)獲取了nextc次
return true;
}
// 否則就返回false属拾,表示沒有成功獲取當(dāng)前鎖,進(jìn)入排隊過程
return false;
}
...
}
其核心思想是當(dāng)前線程嘗試獲取鎖的時候冷溶,如果發(fā)現(xiàn)鎖的狀態(tài)為0,則直接嘗試將鎖拿過來尊浓,而不會考慮其他排隊節(jié)點逞频。
ReentrantLock 公平鎖的搶占流程
ReentrantLock 公平鎖的搶占流程如下:
公平鎖的同步器子類
ReentrantLock 為公平鎖實現(xiàn)了一個內(nèi)部的同步器——FairSync,其顯式鎖獲取方法 lock
的代碼如下:
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
...
}
其核心思想是通過 AQS 模板方法進(jìn)入隊列入隊操作栋齿。
公平搶占的鉤子方法:tryAcquire
公平鎖的 lock
會執(zhí)行模板方法 acquire
苗胀,該方法首先會調(diào)用鉤子方法 tryAcquire
,其實現(xiàn)如下:
static final class FairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 鎖狀態(tài)
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平鎖的鉤子方法中瓦堵,首先判斷是否有后繼節(jié)點基协,如果有,并且當(dāng)前線程不是鎖的占有線程菇用,鉤子方法就返回 false
澜驮,模板方法會進(jìn)入排隊的執(zhí)行流程。
是否有后繼節(jié)點的判斷
FairSync 是否有后繼節(jié)點的判斷代碼如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
執(zhí)行場景大致如下:
當(dāng)
h != t
不成立的時候惋鸥,說明h
頭節(jié)點杂穷,t
尾節(jié)點要么是同一節(jié)點悍缠。要么都是null
,此時返回false
耐量,表示沒有后繼節(jié)點飞蚓。當(dāng)
h != t
成立的時候,進(jìn)一步檢查head.next
是否為null
廊蜒,如果為null
趴拧,返回true
,這種場景在有其他線程第一次正在入隊時可能會出現(xiàn)山叮。如果
h != t
成立著榴,head.next != null
,判斷head.next
是不是當(dāng)前線程聘芜,如果是就返回false
兄渺,不是就返回true
。
AQS 條件隊列
Condition
是 JUC 用來代替?zhèn)鹘y(tǒng) Object
的 wait/notify
線程間通信與寫作機(jī)制的新組件汰现,它更加的高效挂谍。
Condition 基本原理
ConditionObject 類是實現(xiàn)條件隊列的關(guān)鍵,每個 ConditionObject 對象都維護(hù)一個單獨的條件等待隊列瞎饲。每個 ConditionObject 對應(yīng)一個條件隊列口叙,它記錄該隊列的頭結(jié)點和尾節(jié)點。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
一個 Condition 對象是一個單條件的等待隊列.
在一個顯式鎖上嗅战,我們可以創(chuàng)建多個等待任務(wù)隊列妄田,這點和內(nèi)置鎖不同,Java 內(nèi)置鎖上只有唯一的一個等待隊列驮捍。比如疟呐,我們可以使用 newCondition 創(chuàng)建兩個等待隊列,具體如下:
private Lock lock = new ReentrantLock();
//創(chuàng)建第一個等待隊列
private Condition firstCond = lock.newCondition();
//創(chuàng)建第二個等待隊列
private Condition secondCond = lock.newCondition();
await等待方法原理
當(dāng)線程調(diào)用 await
方法時东且,說明當(dāng)前線程的節(jié)點為當(dāng)前AQS隊列的頭節(jié)點启具,正好處于占有鎖的狀態(tài),await
方法需要把該線程從 AQS 隊列挪到 Condition 等待隊列里珊泳。
在 await 方法將當(dāng)前線程挪動到 Condition 等待隊列后鲁冯,還會喚醒 AQS 同步隊列中的 head 節(jié)點的下一個節(jié)點,方法代碼如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1
int savedState = fullyRelease(node); // 2
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 3
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 4
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 5
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
整體流程如下:
- 執(zhí)行
await
時色查,會新創(chuàng)建一個節(jié)點并放入 Condition 隊列尾部薯演。 - 然后釋放鎖,并喚醒AQS同步隊列中的頭節(jié)點的后一個節(jié)點秧了。
- 然后執(zhí)行 while 循環(huán)跨扮,將該節(jié)點的線程阻塞,直到該節(jié)點離開等待隊列,重新回到同步隊列成為同步節(jié)點后好港,線程才退出 while 循環(huán)愉镰。
- 退出循環(huán)后,開始調(diào)用
acquireQueued()
不斷嘗試拿鎖钧汹。 - 拿到鎖后丈探,會清空 Condition 隊列中被取消的節(jié)點。
創(chuàng)建一個新節(jié)點并放入 Condition 隊列尾部的工作由 addCondtionWaiter 方法完成拔莱。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾節(jié)點取消碗降,重新定位尾節(jié)點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 創(chuàng)建一個新Node,作為等待節(jié)點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 將新Node加入等待隊列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal喚醒方法原理
線程在某個 ConditionObject 對象上調(diào)用 signal
方法后塘秦,等待隊列中的 firstWaiter
會被加入同步隊列中锅知,等待節(jié)點被喚醒氯材,流程如下:
public final void signal() {
// 如果當(dāng)前線程不是持有該鎖的線程,就拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 喚醒節(jié)點
}
// 執(zhí)行喚醒
private void doSignal(Node first) {
do {
// 出隊的代碼寫的很巧妙
// first出隊,firstWaiter頭部指向下一個節(jié)點烂叔,自己的nextWaiter
if ( (firstWaiter = first.nextWaiter) == null)
// 如果第二個節(jié)點為空愤估,則尾部也為空
lastWaiter = null;
// 將原來頭部first的后繼置空畅哑,help for GC
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 將被喚醒的節(jié)點轉(zhuǎn)移到同步隊列
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal
方法的整體流程如下:
通過
enq()
方法自旋講條件隊列中的頭節(jié)點放入 AQS 同步隊列尾部旭绒,并獲取它在 AQS 隊列中的前驅(qū)節(jié)點。如果前驅(qū)節(jié)點的狀態(tài)是取消狀態(tài)京痢,或者設(shè)置前驅(qū)節(jié)點為 Signal 狀態(tài)失敗奶甘,就喚醒當(dāng)前節(jié)點的線程,否則節(jié)點在同步隊列的尾部祭椰,參與排隊臭家。
同步隊列中的線程被喚醒后,表示重新獲取了顯式鎖方淤,然后繼續(xù)執(zhí)行
condition.wait()
語句后面的臨界區(qū)代碼钉赁。
AQS實際應(yīng)用
JUC 的同一架構(gòu)如下圖所示。
AQS 建立在 CAS 原子操作和 volatile 可見性變量的基礎(chǔ)之上携茂,為上層的顯式鎖橄霉、同步工具類、阻塞隊列邑蒋、線程池、并發(fā)容器按厘、Future異步工具提供線程之間同步的基礎(chǔ)設(shè)施医吊。所以,AQS在JUC框架使用很廣泛逮京。