Java并發(fā)編程實(shí)戰(zhàn): AQS 源碼 史上最詳盡圖解+逐行注釋
引言: 學(xué)習(xí)一個(gè)java并發(fā)編程工具的時(shí)候,我們首先要抓住這三點(diǎn):
狀態(tài)
一般是一個(gè)state屬性政钟,它基本是整個(gè)工具的核心肄满,通常整個(gè)工具都是在設(shè)置和修改狀態(tài)凹髓,很多方法的操作都依賴于當(dāng)前狀態(tài)是什么凌盯。
由于狀態(tài)是全局共享的,一般會(huì)被設(shè)置成volatile類型滞时,以保證其修改的可見性;
隊(duì)列
隊(duì)列通常是一個(gè)等待對(duì)象 Node 的集合滤灯,大多數(shù)以鏈表的形式實(shí)現(xiàn)坪稽。隊(duì)列采用的是悲觀鎖的思想,表示當(dāng)前所等待的資源鳞骤,狀態(tài)或者條件短時(shí)間內(nèi)可能無法滿足窒百。因此,它會(huì)將當(dāng)前線程包裝成某種類型的數(shù)據(jù)結(jié)構(gòu) Node 豫尽,放入一個(gè)等待隊(duì)列中篙梢,當(dāng)一定條件滿足后,再從等待隊(duì)列中取出美旧。
CAS
CAS操作是最輕量的并發(fā)處理渤滞,通常我們對(duì)于狀態(tài)的修改都會(huì)用到CAS操作贬墩,因?yàn)闋顟B(tài)可能被多個(gè)線程同時(shí)修改,CAS操作保證了同一個(gè)時(shí)刻妄呕,只有一個(gè)線程能修改成功陶舞,從而保證了線程安全,CAS操作基本是由Unsafe工具類的compareAndSwapXXX來實(shí)現(xiàn)的绪励;CAS采用的是樂觀鎖的思想肿孵,因此常常伴隨著自旋,如果發(fā)現(xiàn)當(dāng)前無法成功地執(zhí)行CAS优炬,則不斷重試颁井,直到成功為止,自旋的的表現(xiàn)形式通常是一個(gè)死循環(huán)for(;;)蠢护。
[ ref: https://segmentfault.com/a/1190000015739343 ]
AbstractQueuedSynchronizer
雙向 CLH 鏈表
節(jié)點(diǎn)模型
節(jié)點(diǎn)狀態(tài)
簡(jiǎn)介
- AbstractQueuedSynchronizer是JDK實(shí)現(xiàn)其他同步工具的基礎(chǔ)雅宾。
AQS內(nèi)部封裝了一個(gè)狀態(tài)volatile int state用來表示資源,提供了獨(dú)占以及共享兩種操作:acquire(acquireShare)/release(releaseShare)葵硕。
acquire的語義是:獲取資源眉抬,如果當(dāng)前資源滿足條件,則直接返回懈凹,否則掛起當(dāng)前線程
release的語義是:釋放資源蜀变,喚醒掛起線程
特征
first-in-first-out (FIFO) wait queues
blocking locks and related synchronizers (semaphores, events, etc)
-
樂觀鎖
- 共享鎖shared是一個(gè)樂觀鎖〗槠溃可以允許多個(gè)線程阻塞點(diǎn)库北,可以多個(gè)線程同時(shí)獲取到鎖。它允許一個(gè)資源可以被多個(gè)讀操作们陆,或者被一個(gè)寫操作訪問寒瓦,但是兩個(gè)操作不能同時(shí)訪問。
- Java中的樂觀鎖基本都是通過CAS操作實(shí)現(xiàn)的坪仇,CAS是一種更新的原子操作杂腰,比較當(dāng)前值跟傳入值版本號(hào)是否一樣,一樣的更新椅文,否則失敗喂很。
-
悲觀鎖
- 獨(dú)占鎖exclusive是一個(gè)悲觀鎖。保證只有一個(gè)線程經(jīng)過一個(gè)阻塞點(diǎn)皆刺,只有一個(gè)線程可以獲得鎖少辣。
- Java中的悲觀鎖就是synchronized,AQS框架下的鎖則是先嘗試CAS樂觀鎖去獲取芹橡,獲取不到才會(huì)轉(zhuǎn)為悲觀鎖毒坛,如ReentrantLock
大量使用了CAS操作, 并且在沖突時(shí),采用自旋方式重試煎殷,以實(shí)現(xiàn)輕量級(jí)和高效地獲取鎖屯伞。
AQS可以實(shí)現(xiàn)獨(dú)占鎖和共享鎖
通過一個(gè)CLH隊(duì)列實(shí)現(xiàn)的(CLH鎖即Craig, Landin, and Hagersten (CLH) locks,CLH鎖是一個(gè)自旋鎖豪直,能確保無饑餓性劣摇,提供先來先服務(wù)的公平性。CLH鎖也是一種基于鏈表的可擴(kuò)展弓乙、高性能末融、公平的自旋鎖,申請(qǐng)線程只在本地變量上自旋暇韧,它不斷輪詢前驅(qū)的狀態(tài)勾习,如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。)
核心業(yè)務(wù)邏輯
- 1.AQS中用state屬性表示鎖同步狀態(tài)懈玻,如果能成功將state屬性通過CAS操作從0設(shè)置成1即獲取了鎖. 當(dāng)state>0時(shí)表示已經(jīng)獲取了鎖巧婶,當(dāng)state = 0無鎖。
- 2.獲取了鎖的線程才能將exclusiveOwnerThread設(shè)置成自己
- 3.addWaiter負(fù)責(zé)將當(dāng)前等待鎖的線程包裝成Node,并成功地添加到隊(duì)列的末尾涂乌,這一點(diǎn)是由它調(diào)用的enq方法保證的艺栈,enq方法同時(shí)還負(fù)責(zé)在隊(duì)列為空時(shí)初始化隊(duì)列。
- 4.acquireQueued方法用于在Node成功入隊(duì)后湾盒,繼續(xù)嘗試獲取鎖(取決于Node的前驅(qū)節(jié)點(diǎn)是不是head)湿右,或者將線程掛起
- 5.shouldParkAfterFailedAcquire方法用于保證當(dāng)前線程的前驅(qū)節(jié)點(diǎn)的waitStatus屬性值為SIGNAL,從而保證了自己掛起后,前驅(qū)節(jié)點(diǎn)會(huì)負(fù)責(zé)在合適的時(shí)候喚醒自己罚勾。
- 6.parkAndCheckInterrupt方法用于掛起當(dāng)前線程毅人,并檢查中斷狀態(tài)。
- 7.如果最終成功獲取了鎖尖殃,線程會(huì)從lock()方法返回堰塌,繼續(xù)往下執(zhí)行;否則分衫,線程會(huì)阻塞等待。
三板斧
狀態(tài)
-
volatile state屬性
- private volatile int state;
- 該屬性的值即表示了鎖的狀態(tài)般此,state為0表示鎖沒有被占用蚪战,state大于0表示當(dāng)前已經(jīng)有線程持有該鎖,這里之所以說大于0而不說等于1是因?yàn)榭赡艽嬖诳芍厝氲那闆r铐懊。你可以把state變量當(dāng)做是當(dāng)前持有該鎖的線程數(shù)量邀桑。
- CAS 操作用來改變狀態(tài)
-
waitStatus 的狀態(tài)值
-
static final int CANCELLED = 1;
- 表示Node所代表的當(dāng)前線程已經(jīng)取消了排隊(duì),即放棄獲取鎖了科乎。
-
static final int SIGNAL = -1;
- 它不是表征當(dāng)前節(jié)點(diǎn)的狀態(tài)壁畸,而是當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)的狀態(tài)。
- 當(dāng)一個(gè)節(jié)點(diǎn)的waitStatus被置為SIGNAL,就說明它的下一個(gè)節(jié)點(diǎn)(即它的后繼節(jié)點(diǎn))已經(jīng)被掛起了(或者馬上就要被掛起了)捏萍,因此在當(dāng)前節(jié)點(diǎn)釋放了鎖或者放棄獲取鎖時(shí)太抓,如果它的waitStatus屬性為SIGNAL,它還要完成一個(gè)額外的操作——喚醒它的后繼節(jié)點(diǎn)令杈。
static final int CONDITION = -2;
static final int PROPAGATE = -3;
-
-
CAS操作
-
CAS操作主要針對(duì)5個(gè)屬性
- AQS的3個(gè)屬性state,head和tail
- Node對(duì)象的兩個(gè)屬性waitStatus,next
-
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
CAS操作代碼
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
隊(duì)列
- AQS中走敌,隊(duì)列的實(shí)現(xiàn)是一個(gè)雙向鏈表,被稱為sync queue逗噩,它表示所有等待鎖的線程的集合
- AQS中的隊(duì)列是一個(gè)CLH隊(duì)列掉丽,它的head節(jié)點(diǎn)永遠(yuǎn)是一個(gè)啞結(jié)點(diǎn)(dummy node), 它不代表任何線程(某些情況下可以看做是代表了當(dāng)前持有鎖的線程),因此head所指向的Node的thread屬性永遠(yuǎn)是null异雁。只有從次頭節(jié)點(diǎn)往后的所有節(jié)點(diǎn)才代表了所有等待鎖的線程捶障。也就是說,在當(dāng)前線程沒有搶到鎖被包裝成Node扔到隊(duì)列中時(shí)纲刀,即使隊(duì)列是空的项炼,它也會(huì)排在第二個(gè),我們會(huì)在它的前面新建一個(gè)dummy節(jié)點(diǎn)
- 在并發(fā)編程中使用隊(duì)列通常是將當(dāng)前線程包裝成某種類型的數(shù)據(jù)結(jié)構(gòu)扔到等待隊(duì)列中.
- 隊(duì)列中的節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)
static final class Node {
// 共享
static final Node SHARED = new Node();
// 獨(dú)占
static final Node EXCLUSIVE = null;
/**
* 因?yàn)槌瑫r(shí)或者中斷柑蛇,節(jié)點(diǎn)會(huì)被設(shè)置為取消狀態(tài)芥挣,被取消的節(jié)點(diǎn)時(shí)不會(huì)參與到競(jìng)爭(zhēng)中的,他會(huì)一直保持取消狀態(tài)不會(huì)轉(zhuǎn)變?yōu)槠渌麪顟B(tài)
*/
static final int CANCELLED = 1;
/**
* 后繼節(jié)點(diǎn)的線程處于等待狀態(tài)耻台,而當(dāng)前節(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者被取消空免,將會(huì)通知后繼節(jié)點(diǎn),使后繼節(jié)點(diǎn)的線程得以運(yùn)行
* (說白了就是處于等待被喚醒的線程(或是節(jié)點(diǎn))只要前繼結(jié)點(diǎn)釋放鎖盆耽,就會(huì)通知標(biāo)識(shí)為SIGNAL狀態(tài)的后繼結(jié)點(diǎn)的線程執(zhí)行)
*/
static final int SIGNAL = -1;
/**
* 節(jié)點(diǎn)在等待隊(duì)列中蹋砚,節(jié)點(diǎn)線程等待在Condition上,當(dāng)其他線程對(duì)Condition調(diào)用了signal()后摄杂,該節(jié)點(diǎn)將會(huì)從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中坝咐,加入到同步狀態(tài)的獲取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步狀態(tài)獲取,將會(huì)無條件地傳播下去
*/
static final int PROPAGATE = -3;
/** 等待狀態(tài) */
volatile int waitStatus;
/** 前驅(qū)節(jié)點(diǎn)析恢,當(dāng)節(jié)點(diǎn)添加到同步隊(duì)列時(shí)被設(shè)置(尾部添加) */
volatile Node prev;
/** 后繼節(jié)點(diǎn) */
volatile Node next;
/** 等待隊(duì)列中的后續(xù)節(jié)點(diǎn)墨坚。如果當(dāng)前節(jié)點(diǎn)是共享的,那么字段將是一個(gè) SHARED 常量映挂,也就是說節(jié)點(diǎn)類型(獨(dú)占和共享)和等待隊(duì)列中的后續(xù)節(jié)點(diǎn)共用同一個(gè)字段 */
Node nextWaiter;
/** 獲取同步狀態(tài)的線程 */
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
狀態(tài)變量waitStatus
表示當(dāng)前Node所代表的線程的等待鎖的狀態(tài)泽篮,在獨(dú)占鎖模式下,我們只需要關(guān)注CANCELLED SIGNAL兩種狀態(tài)即可柑船。nextWaiter屬性
在獨(dú)占鎖模式下永遠(yuǎn)為null帽撑,僅僅起到一個(gè)標(biāo)記作用,沒有實(shí)際意義鞍时。
AQS核心屬性
鎖相關(guān)的屬性有兩個(gè)
- private volatile int state; //鎖的狀態(tài)
- private transient Thread exclusiveOwnerThread; // 當(dāng)前持有鎖的線程亏拉,注意這個(gè)屬性是從AbstractOwnableSynchronizer繼承而來
sync queue相關(guān)的屬性有兩個(gè)
- private transient volatile Node head; // 隊(duì)頭扣蜻,為dummy node
- private transient volatile Node tail; // 隊(duì)尾,新入隊(duì)的節(jié)點(diǎn)
隊(duì)列中的Node的屬性
// 節(jié)點(diǎn)所代表的線程
volatile Thread thread;
// 雙向鏈表及塘,每個(gè)節(jié)點(diǎn)需要保存自己的前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)的引用
volatile Node prev;
volatile Node next;
// 線程所處的等待鎖的狀態(tài)莽使,初始化時(shí),該值為0
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
acquire分析
tryAcquire()嘗試直接去獲取資源磷蛹,如果成功則直接返回吮旅;
addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式味咳;
acquireQueued()使線程在等待隊(duì)列中獲取資源庇勃,一直獲取到資源后才返回。如果在整個(gè)等待過程中被中斷過槽驶,則返回true责嚷,否則返回false。
如果線程在等待過程中被中斷過掂铐,先不響應(yīng)的罕拂。在獲取資源后才再進(jìn)行自我中斷selfInterrupt()
tryAcquire(arg) : 獲取鎖的業(yè)務(wù)邏輯
- 判斷當(dāng)前鎖有沒有被占用:
1.如果鎖沒有被占用, 嘗試以公平的方式獲取鎖
2.如果鎖已經(jīng)被占用, 檢查是不是鎖重入
獲取鎖成功返回true, 失敗則返回false
addWaiter(Node mode)
當(dāng)tryAcquire失敗后,才會(huì)調(diào)用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)全陨,addWaiter方法用于將當(dāng)前線程加入到等待隊(duì)列的隊(duì)尾爆班,并返回當(dāng)前線程所在的結(jié)點(diǎn)。
使用了自旋保證插入隊(duì)尾成功辱姨。
在獲取鎖失敗后調(diào)用, 將當(dāng)前請(qǐng)求鎖的線程包裝成Node扔到sync queue中去柿菩,并返回這個(gè)Node。
源代碼
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果隊(duì)列不為空, 則用CAS方式將當(dāng)前節(jié)點(diǎn)設(shè)為尾節(jié)點(diǎn)
if (pred != null) {
node.prev = pred;
// 檢查tail的狀態(tài)雨涛,如果當(dāng)前是pred
if (compareAndSetTail(pred, node)) { // 將當(dāng)前節(jié)點(diǎn)設(shè)為尾節(jié)點(diǎn)
pred.next = node; // 把tail的next節(jié)點(diǎn)指向當(dāng)前Node
return node;
}
}
// 代碼會(huì)執(zhí)行到這里, 只有兩種情況:
// 1. 隊(duì)列為空
// 2. CAS失敗
// 注意, 這里是并發(fā)條件下, 所以什么都有可能發(fā)生, 尤其注意CAS失敗后也會(huì)來到這里. 例如: 有可能其他線程已經(jīng)成為了新的尾節(jié)點(diǎn)枢舶,導(dǎo)致尾節(jié)點(diǎn)不再是我們之前看到的那個(gè)pred了。
// 如果當(dāng)前node插入隊(duì)尾失敗替久,則通過自旋保證替換成功(自旋+CAS)
enq(node);
return node;
}
-
enq() 方法
- 在該方法中, 我們使用了死循環(huán), 即以自旋方式將節(jié)點(diǎn)插入隊(duì)列凉泄,如果失敗則不停的嘗試, 直到成功為止, 另外, 該方法也負(fù)責(zé)在隊(duì)列為空時(shí), 初始化隊(duì)列,這也說明蚯根,隊(duì)列是延時(shí)初始化的(lazily initialized):
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果是空隊(duì)列, 首先進(jìn)行初始化
// 這里也可以看出, 隊(duì)列不是在構(gòu)造的時(shí)候初始化的, 而是延遲到需要用的時(shí)候再初始化, 以提升性能
if (t == null) {
// 注意后众,初始化時(shí)使用new Node()方法新建了一個(gè)dummy節(jié)點(diǎn)
// 從這里可以看出, 在這個(gè)等待隊(duì)列中,頭結(jié)點(diǎn)是一個(gè)“啞節(jié)點(diǎn)”颅拦,它不代表任何等待的線程吼具。
// head節(jié)點(diǎn)不代表任何線程,它就是一個(gè)空節(jié)點(diǎn)矩距!
if (compareAndSetHead(new Node()))
tail = head; // 這里僅僅是將尾節(jié)點(diǎn)指向dummy節(jié)點(diǎn),并沒有返回
} else {
// 到這里說明隊(duì)列已經(jīng)不是空的了, 這個(gè)時(shí)候再繼續(xù)嘗試將節(jié)點(diǎn)加到隊(duì)尾
// 1.設(shè)置node的前驅(qū)節(jié)點(diǎn)為當(dāng)前的尾節(jié)點(diǎn)
node.prev = t;
// 2.修改tail屬性怖竭,使它指向當(dāng)前節(jié)點(diǎn); 這里的CAS保證了同一時(shí)刻只有一個(gè)節(jié)點(diǎn)能成為尾節(jié)點(diǎn)锥债,其他節(jié)點(diǎn)將失敗,失敗后將回到for循環(huán)中繼續(xù)重試。
if (compareAndSetTail(t, node)) {
// 3.修改原來的尾節(jié)點(diǎn)哮肚,使它的next指向當(dāng)前節(jié)點(diǎn)
t.next = node;
return t;
}
}
}
}
將一個(gè)節(jié)點(diǎn)node添加到sync queue的末尾需要三步:
1.設(shè)置node的前驅(qū)節(jié)點(diǎn)為當(dāng)前的尾節(jié)點(diǎn):node.prev = t
2.修改tail屬性登夫,使它指向當(dāng)前節(jié)點(diǎn)
3.修改原來的尾節(jié)點(diǎn),使它的next指向當(dāng)前節(jié)點(diǎn)
- 尾分叉
// Step1
node.prev = t;
// Step2
if (compareAndSetTail(t, node)) {
// Step3
t.next = node;
return t;
}
需要注意允趟,這里的三步并不是一個(gè)原子操作恼策,第一步很容易成功;而第二步由于是一個(gè)CAS操作潮剪,在并發(fā)條件下有可能失敗涣楷,第三步只有在第二步成功的條件下才執(zhí)行。這里的CAS保證了同一時(shí)刻只有一個(gè)節(jié)點(diǎn)能成為尾節(jié)點(diǎn)抗碰,其他節(jié)點(diǎn)將失敗狮斗,失敗后將回到for循環(huán)中繼續(xù)重試。
所以弧蝇,當(dāng)有大量的線程在同時(shí)入隊(duì)的時(shí)候碳褒,同一時(shí)刻,只有一個(gè)線程能完整地完成這三步看疗,而其他線程只能完成第一步沙峻,于是就出現(xiàn)了尾分叉.
這里第三步是在第二步執(zhí)行成功后才執(zhí)行的,這就意味著两芳,有可能即使我們已經(jīng)完成了第二步摔寨,將新的節(jié)點(diǎn)設(shè)置成了尾節(jié)點(diǎn),此時(shí)原來舊的尾節(jié)點(diǎn)的next值可能還是null(因?yàn)檫€沒有來的及執(zhí)行第三步)盗扇,所以如果此時(shí)有線程恰巧從頭節(jié)點(diǎn)開始向后遍歷整個(gè)鏈表祷肯,則它是遍歷不到新加進(jìn)來的尾節(jié)點(diǎn)的,但是這顯然是不合理的疗隶,因?yàn)楝F(xiàn)在的tail已經(jīng)指向了新的尾節(jié)點(diǎn)佑笋。
另一方面,當(dāng)我們完成了第二步之后斑鼻,第一步一定是完成了的蒋纬,所以如果我們從尾節(jié)點(diǎn)開始向前遍歷,已經(jīng)可以遍歷到所有的節(jié)點(diǎn)坚弱。
這也就是為什么我們?cè)贏QS相關(guān)的源碼中 (比如:unparkSuccessor(Node node) 中的:
for (Node t = tail; t != null && t != node; t = t.prev))
通常是從尾節(jié)點(diǎn)開始逆向遍歷鏈表——因?yàn)橐粋€(gè)節(jié)點(diǎn)要能入隊(duì)蜀备,則它的prev屬性一定是有值的,但是它的next屬性可能暫時(shí)還沒有值荒叶。
至于那些“分叉”的入隊(duì)失敗的其他節(jié)點(diǎn)碾阁,在下一輪的循環(huán)中,它們的prev屬性會(huì)重新指向新的尾節(jié)點(diǎn)些楣,繼續(xù)嘗試新的CAS操作脂凶,最終宪睹,所有節(jié)點(diǎn)都會(huì)通過自旋不斷的嘗試入隊(duì),直到成功為止蚕钦。
acquireQueued(final Node node, int arg)
addWaiter的將當(dāng)前線程加入隊(duì)列后亭病,使用acquireQueued進(jìn)行阻塞,直到獲取到資源后返回嘶居。
condition = lock.newCondition();
lock.lock();
try{
while(!條件謂詞成立){
condition.await();
}
}
finally{
lock.unlock();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 當(dāng)前節(jié)點(diǎn)的前驅(qū)是 head 節(jié)點(diǎn)時(shí), 再次嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//在獲取鎖失敗后, 判斷是否需要把當(dāng)前線程掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)
- 這個(gè)函數(shù)只有在當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的waitStatus狀態(tài)本身就是SIGNAL的時(shí)候才會(huì)返回true, 其他時(shí)候都會(huì)返回false:
// Returns true if thread should block.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 獲得前驅(qū)節(jié)點(diǎn)的ws
if (ws == Node.SIGNAL)
// 前驅(qū)節(jié)點(diǎn)的狀態(tài)已經(jīng)是SIGNAL了(This node has already set status asking a release)罪帖,說明鬧鐘已經(jīng)設(shè)了,可以直接高枕無憂地睡了(so it can safely park)
return true;
if (ws > 0) {
// 當(dāng)前節(jié)點(diǎn)的 ws > 0, 則為 Node.CANCELLED 說明前驅(qū)節(jié)點(diǎn)已經(jīng)取消了等待鎖(由于超時(shí)或者中斷等原因)
// 既然前驅(qū)節(jié)點(diǎn)不等了, 那就繼續(xù)往前找, 直到找到一個(gè)還在等待鎖的節(jié)點(diǎn)
// 然后我們跨過這些不等待鎖的節(jié)點(diǎn), 直接排在等待鎖的節(jié)點(diǎn)的后面 (是不是很開心!!!)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驅(qū)節(jié)點(diǎn)的狀態(tài)既不是SIGNAL邮屁,也不是CANCELLED
// 用CAS設(shè)置前驅(qū)節(jié)點(diǎn)的ws為 Node.SIGNAL整袁,給自己定一個(gè)鬧鐘
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 提示 waitStatus
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
parkAndCheckInterrupt()
到這個(gè)函數(shù)已經(jīng)是最后一步了, 就是將線程掛起, 等待被喚醒. Convenience method to park and then check if interrupted. return true if interrupted
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 線程被掛起,停在這里不再往下執(zhí)行了
return Thread.interrupted();
}
LockSupport.park()
public class LockSupport extends Object
用于創(chuàng)建鎖和其他同步類的基本線程阻塞原語樱报。
- 源代碼
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
- public static void park(Object blocker) :
Disables the current thread for thread scheduling purposes unless the permit is available.
If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:
Some other thread invokes unpark with the current thread as the target; or
Some other thread interrupts the current thread; or
The call spuriously (that is, for no reason) returns.
This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.
獨(dú)占鎖
同一時(shí)刻葬项,鎖只能被一個(gè)線程所持有。
通過state變量是否為0迹蛤,我們可以分辨當(dāng)前鎖是否被占用民珍,但光知道鎖是不是被占用是不夠的,我們并不知道占用鎖的線程是哪一個(gè)盗飒。
在AQS中嚷量,通過exclusiveOwnerThread (獨(dú)占鎖擁有者)屬性來保存占用鎖的線程是哪一個(gè)
package java.util.concurrent.locks;
/**
* A synchronizer that may be exclusively owned by a thread. This
* class provides a basis for creating locks and related synchronizers
* that may entail a notion of ownership. The
* {@code AbstractOwnableSynchronizer} class itself does not manage or
* use this information. However, subclasses and tools may use
* appropriately maintained values to help control and monitor access
* and provide diagnostics.
*
* @since 1.6
* @author Doug Lea
*/
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
* 當(dāng)前持有鎖的線程
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
ReentrantLock 源碼分析
ReentrantLock有 公平鎖 和 非公平鎖 兩種實(shí)現(xiàn), 默認(rèn)實(shí)現(xiàn)為非公平鎖, 這體現(xiàn)在它的構(gòu)造函數(shù)中:
public class ReentrantLock implements Lock, java.io.Serializable {
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync{
...
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
...
}
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
// 默認(rèn)構(gòu)造函數(shù), 非公平鎖
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 獲取鎖
public void lock() {
sync.lock();
}
...
}
FairLock
- 源代碼
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() { // final的lock()方法
// 獲取鎖
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 首先獲取當(dāng)前鎖的狀態(tài)
int c = getState();
// c=0 說明當(dāng)前鎖是avaiable的, 沒有被任何線程占用, 可以嘗試獲取
// 因?yàn)槭菍?shí)現(xiàn)公平鎖, 所以在搶占之前首先看看隊(duì)列中有沒有排在自己前面的Node
// 如果沒有人在排隊(duì), 則通過CAS方式獲取鎖, 就可以直接退出了
if (c == 0) {
//檢測(cè)自己是不是head節(jié)點(diǎn)的后繼節(jié)點(diǎn),即處在阻塞隊(duì)列第一位的節(jié)點(diǎn)
if (!hasQueuedPredecessors() &&
// 當(dāng)前線程還沒有獲得鎖逆趣,所以可能存在多線程同時(shí)在競(jìng)爭(zhēng)鎖的情況, 所以這里使用CAS操作設(shè)置 state
compareAndSetState(0, acquires)) {
// 將當(dāng)前線程設(shè)置為占用鎖的線程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果 c>0 說明鎖已經(jīng)被占用了
// 對(duì)于可重入鎖, 這個(gè)時(shí)候檢查占用鎖的線程是不是就是當(dāng)前線程,是的話,說明已經(jīng)拿到了鎖, 直接重入就行
else if (current == getExclusiveOwnerThread()) {
// 重入鎖, state 加1 (acquires)
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 調(diào)用setState方法時(shí)蝶溶,是在當(dāng)前線程已經(jīng)是持有鎖的情況下,因此對(duì)state的修改是安全的宣渗,只需要普通的方法就可以了抖所。
setState(nextc);
return true;
}
// 到這里說明有人占用了鎖, 并且占用鎖的不是當(dāng)前線程, 則獲取鎖失敗
return false;
}
}
獲取鎖的業(yè)務(wù)邏輯小結(jié)
1.獲取鎖其實(shí)主要就是干一件事:
將state的狀態(tài)通過CAS操作由0改寫成1.
由于是CAS操作,必然是只有一個(gè)線程能執(zhí)行成功痕囱。則執(zhí)行成功的線程即獲取了鎖田轧,在這之后,才有權(quán)利將exclusiveOwnerThread的值設(shè)成自己鞍恢,從而成為“王鎖擁有者”傻粘。
2.另外對(duì)于可重入鎖,如果當(dāng)前線程已經(jīng)是獲取了鎖的線程了帮掉,它還要注意增加鎖的重入次數(shù)弦悉。
3.值得一提的是,這里修改state狀態(tài)的操作蟆炊,一個(gè)用了CAS方法compareAndSetState稽莉,一個(gè)用了普通的setState方法。這是因?yàn)橛肅AS操作時(shí)涩搓,當(dāng)前線程還沒有獲得鎖污秆,所以可能存在多線程同時(shí)在競(jìng)爭(zhēng)鎖的情況后室;而調(diào)用setState方法時(shí),是在當(dāng)前線程已經(jīng)是持有鎖的情況下混狠,因此對(duì)state的修改是安全的,只需要普通的方法就可以了疾层。
因此将饺,在多線程條件下看源碼時(shí),我們一定要時(shí)刻在心中問自己:
這段代碼是否是線程安全的痛黎?同一時(shí)刻是否可能有多個(gè)線程在執(zhí)行這行代碼?
獲取鎖的流程 : aquire() 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// Acquires in exclusive mode
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg)
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
// 當(dāng)前運(yùn)行的線程
final Thread current = Thread.currentThread();
// 獲取當(dāng)前AQS的 synchronization state
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
hasQueuedPredecessors() 判斷是否有等待時(shí)間比當(dāng)前線程更長的線程:
返回結(jié)果:
true, 如果當(dāng)前線程之前有一個(gè)排隊(duì)的線程;
如果當(dāng)前線程在隊(duì)列的頭部或隊(duì)列為空予弧, false-
等同于:
getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- addWaiter(Node mode)
- acquireQueued(final Node node, int arg)
- selfInterrupt
Java內(nèi)存模型
8種操作
- lock(鎖定):作用于主內(nèi)存的變量,把一個(gè)變量標(biāo)識(shí)為一條線程獨(dú)占狀態(tài)湖饱。
- unlock(解鎖):作用于主內(nèi)存變量掖蛤,把一個(gè)處于鎖定狀態(tài)的變量釋放出來,釋放后的變量才可以被其他線程鎖定井厌。
- read(讀闰就ァ):作用于主內(nèi)存變量,把一個(gè)變量值從主內(nèi)存?zhèn)鬏數(shù)骄€程的工作內(nèi)存中仅仆,以便隨后的load動(dòng)作使用
- load(載入):作用于工作內(nèi)存的變量器赞,它把read操作從主內(nèi)存中得到的變量值放入工作內(nèi)存的變量副本中。
- use(使用):作用于工作內(nèi)存的變量墓拜,把工作內(nèi)存中的一個(gè)變量值傳遞給執(zhí)行引擎港柜,每當(dāng)虛擬機(jī)遇到一個(gè)需要使用變量的值的字節(jié)碼指令時(shí)將會(huì)執(zhí)行這個(gè)操作。
- assign(賦值):作用于工作內(nèi)存的變量咳榜,它把一個(gè)從執(zhí)行引擎接收到的值賦值給工作內(nèi)存的變量夏醉,每當(dāng)虛擬機(jī)遇到一個(gè)給變量賦值的字節(jié)碼指令時(shí)執(zhí)行這個(gè)操作。
- store(存儲(chǔ)):作用于工作內(nèi)存的變量涌韩,把工作內(nèi)存中的一個(gè)變量的值傳送到主內(nèi)存中镣典,以便隨后的write的操作梯浪。
- write(寫入):作用于主內(nèi)存的變量,它把store操作從工作內(nèi)存中一個(gè)變量的值傳送到主內(nèi)存的變量中。
操作規(guī)則
- 不允許read和load,store和write操作之一單獨(dú)出現(xiàn)茄袖。
- 不允許一個(gè)線程丟棄它最近的assign操作。即變量在工作內(nèi)存中改變了賬號(hào)必須把變化同步回主內(nèi)存肮砾。
- 不允許一個(gè)線程無原因地(沒有發(fā)生過任何assign操作)把數(shù)據(jù)從工作內(nèi)存同步回主內(nèi)存中饵史。
- 一個(gè)新的變量只允許在主內(nèi)存中誕生,不允許工作內(nèi)存直接使用未初始化的變量嘴拢。
- 一個(gè)變量同一時(shí)刻只允許一條線程進(jìn)行l(wèi)ock操作桩盲,但同一線程可以lock多次,lock多次之后必須執(zhí)行同樣次數(shù)的unlock操作席吴。
- 如果對(duì)一個(gè)變量執(zhí)行l(wèi)ock操作赌结,將會(huì)清空工作內(nèi)存中此變量的值捞蛋,在執(zhí)行引擎使用這個(gè)變量前需要重新執(zhí)行l(wèi)oad或assign操作初始化變量的值。
- 如果一個(gè)變量事先沒有被lock操作鎖定柬姚,則不允許對(duì)它執(zhí)行unlock操作拟杉;也不允許去unlock一個(gè)被其他線程鎖定的變量。
- 對(duì)一個(gè)變量執(zhí)行unlock操作之前量承,必須先把此變量同步到主內(nèi)存中(執(zhí)行store和write操作)搬设。
3個(gè)特征
-
原子性
- 不會(huì)被線程調(diào)度機(jī)制打斷的操作;這種操作一旦開始撕捍,就一直運(yùn)行到結(jié)束拿穴,中間不會(huì)有任何 context switch (換到另一個(gè)線程) 如: 賦值或者return。比如”a = 1;”和 “return a;”這樣的操作都具有原子性忧风。
- 如果代碼不能保證操作為原子操作默色,可以使用synchronized來保證原子操作
-
可見性
- 當(dāng)一個(gè)線程修改了共享變量的值,其他線程能夠立即得知這個(gè)修改狮腿。volatile就是干這個(gè)的腿宰。
java內(nèi)存模型是通過在變量修改后將新值同步回主內(nèi)存,在變量讀取前從主內(nèi)存刷新變量蚤霞。
普通變量與volatile變量的區(qū)別是:volatile的特殊規(guī)則保證了新值能立即同步到主內(nèi)存酗失,以及每次使用前立即從主內(nèi)存刷新。
能保證可見性還有synchronized和final
-
有序性
- MM的有序性表現(xiàn)為:如果在本線程內(nèi)觀察昧绣,所有的操作都是有序的规肴;如果在一個(gè)線程中觀察另一個(gè)線程,所有的操作都是無序的夜畴。前半句指“線程內(nèi)表現(xiàn)為串行的語義”(as-if-serial)拖刃,后半句值“指令重排序”和普通變量的”工作內(nèi)存與主內(nèi)存同步延遲“的現(xiàn)象。
volatile贪绘,與synchronized 可以保證有序性兑牡。
重排序
在執(zhí)行程序時(shí)為了提高性能,編譯器和處理器經(jīng)常會(huì)對(duì)指令進(jìn)行重排序税灌。從硬件架構(gòu)上來說均函,指令重排序是指CPU采用了允許將多條指令不按照程序規(guī)定的順序,分開發(fā)送給各個(gè)相應(yīng)電路單元處理菱涤,而不是指令任意重排苞也。重排序分成三種類型:
編譯器優(yōu)化的重排序:編譯器在不改變單線程程序語義放入前提下,可以重新安排語句的執(zhí)行順序粘秆。
指令級(jí)并行的重排序:現(xiàn)代處理器采用了指令級(jí)并行技術(shù)來將多條指令重疊執(zhí)行如迟。如果不存在數(shù)據(jù)依賴性,處理器可以改變語句對(duì)應(yīng)機(jī)器指令的執(zhí)行順序。
內(nèi)存系統(tǒng)的重排序:由于處理器使用緩存和讀寫緩沖區(qū)殷勘,這使得加載和存儲(chǔ)操作看上去可能是在亂序執(zhí)行此再。
-
內(nèi)存屏障指令
- LoadLoad、LoadStore玲销、StoreLoad和StoreStore
先行發(fā)生原則(happens-before)
如果操作A先行發(fā)生于操作B输拇,則A產(chǎn)生的影響能被操作B觀察到,“影響”包括修改了內(nèi)存中共享變量的值贤斜、發(fā)送了消息淳附、調(diào)用了方法等。如果兩個(gè)操作滿足happens-before原則蠢古,那么不需要進(jìn)行同步操作,JVM能夠保證操作具有順序性别凹,此時(shí)不能夠隨意的重排序草讶。否則,無法保證順序性炉菲,就能進(jìn)行指令的重排序堕战。
-
基本規(guī)則
- 程序次序規(guī)則(Program Order Rule):在同一個(gè)線程中,按照程序代碼順序拍霜,書寫在前面的操作先行發(fā)生于書寫在后面的操縱嘱丢。準(zhǔn)確的說是程序的控制流順序,考慮分支和循環(huán)等祠饺。
- 管理鎖定規(guī)則(Monitor Lock Rule):一個(gè)unlock操作先行發(fā)生于后面(時(shí)間上的順序)對(duì)同一個(gè)鎖的lock操作越驻。
- volatile變量規(guī)則(Volatile Variable Rule):對(duì)一個(gè)volatile變量的寫操作先行發(fā)生于后面(時(shí)間上的順序)對(duì)該變量的讀操作。
- 線程啟動(dòng)規(guī)則(Thread Start Rule):Thread對(duì)象的start()方法先行發(fā)生于此線程的每一個(gè)動(dòng)作道偷。
- 線程終止規(guī)則(Thread Termination Rule):線程的所有操作都先行發(fā)生于對(duì)此線程的終止檢測(cè)缀旁,可以通過Thread.join()方法結(jié)束、Thread.isAlive()的返回值等手段檢測(cè)到線程已經(jīng)終止執(zhí)行勺鸦。
- 線程中斷規(guī)則(Thread Interruption Rule):對(duì)線程interrupt()方法的調(diào)用先行發(fā)生于被中斷線程的代碼檢測(cè)到中斷時(shí)事件的發(fā)生并巍。Thread.interrupted()可以檢測(cè)是否有中斷發(fā)生。
- 對(duì)象終結(jié)規(guī)則(Finilizer Rule):一個(gè)對(duì)象的初始化完成(構(gòu)造函數(shù)執(zhí)行結(jié)束)先行發(fā)生于它的finalize()的開始换途。
- 傳遞性(Transitivity):如果操作A 先行發(fā)生于操作B懊渡,操作B 先行發(fā)生于操作C,那么可以得出A 先行發(fā)生于操作C军拟。
參考資料
https://docs.oracle.com/javase/8/docs/api/
https://blog.csdn.net/piaoslowly/article/details/81460002
https://segmentfault.com/a/1190000015739343
https://segmentfault.com/a/1190000015752512
Kotlin 開發(fā)者社區(qū)
國內(nèi)第一Kotlin 開發(fā)者社區(qū)公眾號(hào)剃执,主要分享、交流 Kotlin 編程語言吻谋、Spring Boot忠蝗、Android、React.js/Node.js漓拾、函數(shù)式編程阁最、編程思想等相關(guān)主題戒祠。
學(xué)習(xí)筆記思維導(dǎo)圖: