1. LockSupport工具類
? JDK 中的jr.jar包里面的LockSupport是個工具類,主要作用是掛起和喚醒線程.
? 其內(nèi)部實現(xiàn)是native方法.
? LockSupport類與每個使用它的線程都會關聯(lián)一個許可證(鎖),在默認情況下調(diào)用時是不具有許可證的.
LockSupport和wait(),notify()系列方法的區(qū)別:
? ①LockSupport不需要在同步代碼塊里勇蝙。所以線程間也不需要維護一個共享的同步對象了呛谜,實現(xiàn)了線程間的解耦诅岩。
? ②unpark函數(shù)可以優(yōu)先于park調(diào)用卜壕,所以不需要擔心線程間的執(zhí)行先后順序椅您。(線程A連續(xù)調(diào)用兩次LockSupport.unpark(B)方法喚醒線程B萎坷,然后線程B調(diào)用兩次LockSupport.park()方法嘱兼, 線程B依舊會被阻塞悦冀。因為unpark()方法是更改標志位為1,而不是加一)
(1). void park()方法
? 如果沒有許可證,掛起.
(2). void unpark(Thread thread)方法
? thread線程立即獲取許可證,如果當前狀態(tài)為被阻塞,立即喚醒.
(3). void parkNanos(long nanos)方法
? 如果沒有許可證,掛起nanos微秒
(4). park(Object blocker)方法
? 將blocker變量存放到調(diào)用park方法掛起的線程中,推薦將this放入,可以從日志中知道在那個類中的代碼發(fā)生了掛起
(5). void parkNanos(Object blocker, long nanos)方法
? 相比上個方法多了超時時間
(6). void parkUntil(Object blocker, long deadline)方法
? 阻塞到時間戳deadline.
2. AQS-----鎖的底層支持
? ==AbstractQueuedSynchronizer抽象同步隊列簡稱AQS,是實現(xiàn)同步器的基礎組件==,并發(fā)包中鎖的底層就是使用AQS實現(xiàn)的.
? ==AQS是一個FIFO的雙向隊列,隊列元素為Node. Node中的thread用來存放進入AQS隊列的線程.Node節(jié)點內(nèi)部waitStatus記錄當前線程等待狀態(tài):CANCELLED(1,線程被取消),SIGNAL(-1,線程需要被喚醒),CONDITION(-2,線程在條件隊列中等待),PROPAGATE(-3,釋放共享資源時需要通知其他節(jié)點).==
? AQS中維持了一個單一的狀態(tài)信息state,可以通過CAS操作修改其值,并且它有get()和set()方法.
? 對于AQS來說,線程同步的關鍵對狀態(tài)值state進行操作,分為獨占方式(標記,如果標記狀態(tài)不正確不能進行操作)和共享方式(直接進行CAS修改,不需要判斷標記).
(1). 獨占不響應中斷模式下,獲取與釋放資源
AbstractQueuedSynchronizer源碼剖析(二)- 不響應中斷的獨占鎖
1). 獲取鎖的過程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
//基于當前線程,節(jié)點類型(Node.EXCLUSIVE)創(chuàng)建新的節(jié)點
//由于這里是獨占模式醉蚁,因此節(jié)點類型就是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//這里為了提搞性能燃辖,首先執(zhí)行一次快速入隊操作,即直接嘗試將新節(jié)點加入隊尾
if (pred != null) {
node.prev = pred;
//這里根據(jù)CAS的邏輯网棍,即使并發(fā)操作也只能有一個線程成功并返回黔龟,其余的都要執(zhí)行后面的入隊操作。即enq()方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果隊列還沒有初始化滥玷,則進行初始化氏身,即創(chuàng)建一個空的頭節(jié)點
if (t == null) {
//同樣是CAS,只有一個線程可以初始化頭結(jié)點成功惑畴,其余的都要重復執(zhí)行循環(huán)體
if (compareAndSetHead(new Node()))
tail = head;
} else {
//新創(chuàng)建的節(jié)點指向隊列尾節(jié)點蛋欣,毫無疑問并發(fā)情況下這里會有多個新創(chuàng)建的節(jié)點指向隊列尾節(jié)點
node.prev = t;
//基于這一步的CAS,不管前一步有多少新節(jié)點都指向了尾節(jié)點如贷,這一步只有一個能真正入隊成功陷虎,其他的都必須重新執(zhí)行循環(huán)體
if (compareAndSetTail(t, node)) {
t.next = node;
//該循環(huán)體唯一退出的操作到踏,就是入隊成功(否則就要無限重試)
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
//鎖資源獲取失敗標記位
boolean failed = true;
try {
//等待線程被中斷標記位
boolean interrupted = false;
//這個循環(huán)體執(zhí)行的時機包括新節(jié)點入隊和隊列中等待節(jié)點被喚醒兩個地方
for (;;) {
//獲取當前節(jié)點的前置節(jié)點
final Node p = node.predecessor();
//如果前置節(jié)點就是頭結(jié)點,則嘗試獲取鎖資源
if (p == head && tryAcquire(arg)) {
//當前節(jié)點獲得鎖資源以后設置為頭節(jié)點尚猿,這里繼續(xù)理解我上面說的那句話
//頭結(jié)點就表示當前正占有鎖資源的節(jié)點
setHead(node);
p.next = null; //幫助GC
//表示鎖資源成功獲取窝稿,因此把failed置為false
failed = false;
//返回中斷標記,表示當前節(jié)點是被正常喚醒還是被中斷喚醒
return interrupted;
}
//如果沒有獲取鎖成功凿掂,則進入掛起邏輯
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//最后會分析獲取鎖失敗處理邏輯
if (failed)
cancelAcquire(node);
}
}
//首先說明一下參數(shù)伴榔,node是當前線程的節(jié)點,pred是它的前置節(jié)點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前置節(jié)點的waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置節(jié)點的waitStatus是Node.SIGNAL則返回true庄萎,然后會執(zhí)行parkAndCheckInterrupt()方法進行掛起
return true;
if (ws > 0) {
//由waitStatus的幾個取值可以判斷這里表示前置節(jié)點被取消
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//這里我們由當前節(jié)點的前置節(jié)點開始潮梯,一直向前找最近的一個沒有被取消的節(jié)點
//注,由于頭結(jié)點head是通過new Node()創(chuàng)建惨恭,它的waitStatus為0,因此這里不會出現(xiàn)空指針問題秉馏,也就是說最多就是找到頭節(jié)點上面的循環(huán)就退出了
pred.next = noparkAndCheckInterrupt()de;
} else {
//根據(jù)waitStatus的取值限定,這里waitStatus的值只能是0或者PROPAGATE脱羡,那么我們把前置節(jié)點的waitStatus設為Node.SIGNAL然后重新進入該方法進行判斷
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 當前線程調(diào)用acquire()申請獲取鎖資源
- 先調(diào)用tryAcquire()嘗試獲取鎖()
- 這個方法內(nèi)部由具體的鎖實現(xiàn)
- 如果失敗先調(diào)用addWaiter()將當前線程入隊到AQS隊列中
- 如果之前沒有創(chuàng)建隊列(隊尾為null),直接調(diào)用enq進行完整的入隊操作(包括初始化)
- 是一個自旋循環(huán),第一次循環(huán)創(chuàng)建一個空結(jié)點,設置為隊尾和隊首,第二次循環(huán)時將傳入的node入隊
- 如果已經(jīng)有隊尾存在了,先將node(待插入結(jié)點)的前置設置為tail,使用t指針指向tail,使用CAS將tail指針指向node,然后將實際上的隊尾t的next設置為node.
- 如果之前沒有創(chuàng)建隊列(隊尾為null),直接調(diào)用enq進行完整的入隊操作(包括初始化)
- 然后在acquireQueued()方法中等待被喚醒直到獲取到資源.
- 進入acquireQueued方法后會進入一個無限循環(huán).每一次循環(huán)都會判斷傳入的參數(shù)是不是隊首,并且嘗試一次鎖獲取
- 如果獲取成功,將自己設置為隊首,并將node.thread設置為null,保證頭結(jié)點永遠是一個不帶thread的空節(jié)點.
- 如果獲取失敗,調(diào)用shouldParkAfterFailedAcquire()判斷自己需不需要阻塞.阻塞的大前提是前繼節(jié)點是可被喚醒的(waitStatus設置為Signal),這樣才能讓自己有機會被喚醒(隊列中按順序喚醒)
- 判斷傳入的前繼節(jié)點的waitStatus是否為Signal,是的話直接返回ture
- 根據(jù)waitStatus的值分為兩種情況
- 如果waitStatus大于0:也就是前繼節(jié)點被取消了.一直向前查找一個沒有被取消的節(jié)點(waitStatus>=0)的結(jié)點(頭結(jié)點waitStatus為0或-1,因此不可能出現(xiàn)空指針),并拼接成雙向隊列(==后面的隊列都是waitStatus>=0的,在狀態(tài)表中只有被取消的線程是這個狀態(tài),所以被移除出隊列,下一次GC會被清除==),返回false
- 如果waitStatus不大于0,也就是為0,-2或-3:修改前繼節(jié)點waitStatus為-1
- 如果shouldParkAfterFailedAcquire()返回true,證明node的前繼節(jié)點可以被喚醒,立即調(diào)用parkAndCheckInterrupt()阻塞掛起node
- 如果shouldParkAfterFailedAcquire()返回false,證明在方法內(nèi)部找到了一個可以被喚醒的節(jié)點作為前繼節(jié)點.在下一次循環(huán)中進行掛起
- parkAndCheckInterrupt()方法的掛起并不會被中斷打斷,是非中斷掛起,如果掛起過程中出現(xiàn)了中斷,方法在返回時會返回true,否則返回false
- 如果上一步返回了true,證明出現(xiàn)了中斷,在下一次循環(huán)中將中斷標記返回到上一層代碼.
- 線程被喚醒之后會將返回中斷標記到這里,如果出現(xiàn)過中斷,將自身線程掛起,來彌補之前的中斷.
- 先調(diào)用tryAcquire()嘗試獲取鎖()
- 進入臨界區(qū)(線程中運行的用戶代碼)
這個過程口語化的敘述就是:
? 先判斷能否獲取鎖資源,如果不能先進行入隊操作(所有線程一起入隊,只有一個線程能夠CAS成功,其他線程自旋,初始化操作也是放在自旋的邏輯里的.)
? 然后在隊列里面執(zhí)行掛起的邏輯:首先判斷自己是否是頭結(jié)點的next節(jié)點,如果是,再嘗試一次獲取鎖.成功則不必掛起.如果不是頭結(jié)點的后繼,那么判斷當前節(jié)點的前繼節(jié)點能否被喚醒(也就是節(jié)點沒有被取消),如果可以,那么掛起,如果不能,向前尋找第一個可以被喚醒的節(jié)點,然后拼接在其后面(拋棄中間這部分不可被喚醒的節(jié)點).
2). 鎖的釋放過程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//把標記為設置為0萝究,表示喚醒操作已經(jīng)開始進行,提高并發(fā)環(huán)境下性能
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果當前節(jié)點的后繼節(jié)點為null锉罐,或者已經(jīng)被取消
if (s == null || s.waitStatus > 0) {
s = null;
//注意這個循環(huán)沒有break帆竹,也就是說它是從后往前找,一直找到離當前節(jié)點最近的一個等待喚醒的節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//執(zhí)行喚醒操作
if (s != null)
LockSupport.unpark(s.thread);
}
釋放鎖的邏輯并不復雜.
- 調(diào)用tryRelease()來釋放資源.(判斷當前線程是否和AQS中的線程一致,然后對state進行修改)(這里返回值是指是否已經(jīng)完全釋放資源了,可重入鎖需要將state釋放至0)
- 拿到head節(jié)點,判斷其狀態(tài),如果不為null,且waitStatus不為0(初始狀態(tài)下,為0,當這個節(jié)點有了后繼節(jié)點時,會被修改成-1,這里是為了防止只有一個頭節(jié)點的空隊列進行了釋放鎖),調(diào)用unparkSuccessor()喚醒頭結(jié)點的后繼節(jié)點的線程
- 先進行waitStatus的判斷,如果waitStatus<0,將其設置為0,避免其他線程也進入此方法,提高高并發(fā)環(huán)境下的性能.
- 找到頭結(jié)點的后繼節(jié)點,再次判斷是否為空或者其狀態(tài)值是否被取消
- 如果后繼節(jié)點為空或者狀態(tài)為被取消,從后向前找到最前面的一個等待喚醒的節(jié)點
- 執(zhí)行喚醒操作
- 返回true
? 如果喚醒的節(jié)點前有被取消的節(jié)點,那么會在喚醒后被判斷前繼節(jié)點不是頭結(jié)點,再次進行判斷前繼節(jié)點是否能被喚醒的方法調(diào)用.在這個方法調(diào)用中會將前面的被取消的節(jié)點移除,然后再次進行前繼節(jié)點是不是頭結(jié)點的判斷.這次就可以判斷成功,將此節(jié)點設置為頭結(jié)點,成功喚醒,運行其內(nèi)部用戶代碼.
整個過程就是:
? 先判斷是否能夠歸還資源(獨占鎖一般都不會失敗,可重入鎖需要判斷返回值是否大于零,因為返回值代表剩余鎖可重入次數(shù),從而決定是否傳遞地釋放后面的線程).
? 然后判斷頭結(jié)點的后繼能否可被喚醒,如果沒有,那么尋找到第一個可被喚醒的節(jié)點,進行喚醒
? 這里for循環(huán)是==從后向前==遍歷的,原因是在入隊操作時,是先將node的前繼指向tail,然后更新tail為node.后一步是cas操作,可能會失敗,并且這個過程不是原子性的,如果在這個過程中發(fā)生了從前向后遍歷隊列,就會發(fā)生找到tail,next為null,但實際上并沒有遍歷到tail節(jié)點的情況.
(2). 共享不響應中斷模式下,獲取與釋放資源
AbstractQueuedSynchronizer源碼剖析(四)- 不響應中斷的共享鎖
1). 獲取鎖的過程
public final void acquireShared(int arg) {
//嘗試獲取共享鎖脓规,返回值小于0表示獲取失敗
if (tryAcquireShared(arg) < 0)
//執(zhí)行獲取鎖失敗以后的方法
doAcquireShared(arg);
}
//參數(shù)不多說栽连,就是傳給acquireShared()的參數(shù)
private void doAcquireShared(int arg) {
//添加等待節(jié)點的方法跟獨占鎖一樣,唯一區(qū)別就是節(jié)點類型變?yōu)榱斯蚕硇颓扔撸辉儋樖? final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//表示前面的節(jié)點已經(jīng)獲取到鎖秒紧,自己會嘗試獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
//注意上面說的, 等于0表示不用喚醒后繼節(jié)點挨下,大于0需要
if (r >= 0) {
//這里是重點熔恢,獲取到鎖以后的喚醒操作,后面詳細說
setHeadAndPropagate(node, r);
p.next = null;
//如果是因為中斷醒來則設置中斷標記位
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//掛起邏輯跟獨占鎖一樣臭笆,不再贅述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//獲取失敗的取消邏輯跟獨占鎖一樣叙淌,不再贅述
if (failed)
cancelAcquire(node);
}
}
//兩個入?yún)ⅲ粋€是當前成功獲取共享鎖的節(jié)點愁铺,一個就是tryAcquireShared方法的返回值鹰霍,注意上面說的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //記錄當前頭節(jié)點
//設置新的頭節(jié)點,即把當前獲取到鎖的節(jié)點設置為頭節(jié)點
//注:這里是獲取到鎖之后的操作,不需要并發(fā)控制
setHead(node);
//這里意思有兩種情況是需要執(zhí)行喚醒操作
//1.propagate > 0 表示調(diào)用方指明了后繼節(jié)點需要被喚醒
//2.頭節(jié)點后面的節(jié)點需要被喚醒(waitStatus<0),不論是老的頭結(jié)點還是新的頭結(jié)點
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果當前節(jié)點的后繼節(jié)點是共享類型或者沒有后繼節(jié)點,則進行喚醒
//這里可以理解為除非明確指明不需要喚醒(后繼等待節(jié)點是獨占類型),否則都要喚醒
if (s == null || s.isShared())
//后面詳細說
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- 調(diào)用acquireShared()去獲取鎖
- 首先會嘗試調(diào)用tryAcquireShared()共享方式獲取資源,返回值負數(shù)表示失敗,0表示成功,但沒有剩余資源可用,整數(shù)表示成功且有剩余資源
- 當返回負數(shù)時,調(diào)用doAcquireShared()進行線程的入隊列掛起等待
- 將當前線程打包成Node對象,類型為共享型,添加到隊列尾,方法與獨占式相同
- 如果當前節(jié)點的前繼節(jié)點是頭結(jié)點,嘗試進行獲取資源,如果返回值不為負,意味著可以獲取到資源,調(diào)用setHeadAndPropagate()將當前節(jié)點設置為頭結(jié)點,判斷是否是因為喚醒線程來到這里,對中斷標記進行判斷,是否進行中斷補充
- setHeadAndPropagate(Node node, int propagate)的第二個參數(shù)是剛剛嘗試獲取資源得到的剩余資源數(shù),如果大于零,代表有多余資源,那么應該去喚醒下一個可被喚醒的線程
- h == null 場景未知,應該不會出現(xiàn)吧.
- h.waitStatus意味著頭結(jié)點為可喚醒狀態(tài)
- 以上的集中狀況都需要進行線程喚醒的嘗試.如果當前頭結(jié)點的前繼節(jié)點不為空且為共享型,調(diào)用doReleaseShared()對頭結(jié)點后的第一個可喚醒節(jié)點進行喚醒.
- 如果前繼節(jié)點不是頭節(jié)點,掛起,邏輯同獨占鎖
2). 釋放鎖的過程
public final boolean releaseShared(int arg) {
//嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
//喚醒過程拒逮,詳情見上面分析
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
//喚醒操作由頭結(jié)點開始,注意這里的頭節(jié)點已經(jīng)是上面新設置的頭結(jié)點了
//其實就是喚醒上面新獲取到共享鎖的節(jié)點的后繼節(jié)點
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后繼節(jié)點需要被喚醒
if (ws == Node.SIGNAL) {
//這里需要控制并發(fā),因為入口有setHeadAndPropagate跟release兩個玷氏,避免兩次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//執(zhí)行喚醒操作
unparkSuccessor(h);
}
//如果后繼節(jié)點暫時不需要喚醒,則把當前節(jié)點狀態(tài)設置為PROPAGATE確保以后可以傳遞下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}unparkSuccessor
//如果頭結(jié)點沒有發(fā)生變化腋舌,表示設置完成盏触,退出循環(huán)
//如果頭結(jié)點發(fā)生變化,比如說其他線程獲取到了鎖块饺,為了使自己的喚醒動作可以傳遞赞辩,必須進行重試
if (h == head)
break;
}
}
- 調(diào)用releaseShared()獲取資源
- 先調(diào)用tryReleaseShared()嘗試釋放共享鎖,由具體的鎖實現(xiàn)邏輯,如果獲取到了資源,返回true.繼而調(diào)用doReleaseShared()喚醒頭結(jié)點后的可喚醒線程
- 進入一個自旋循環(huán)
- 判斷頭結(jié)點(如果從設置頭結(jié)點調(diào)用至此方法,此時頭結(jié)點是新的頭節(jié)點)是否為空,是否只有一個頭結(jié)點組成空隊列.
- 如果都不是,那么頭結(jié)點有效,然后判斷頭結(jié)點狀態(tài),如果為-1,設置為0,
- 調(diào)用unparkSuccessor()執(zhí)行喚醒操作(別的線程在21行會判斷為true,跳過,這樣就控制了并發(fā))
- unparkSuccessor()與獨占鎖相同,如果頭結(jié)點的后繼節(jié)點不能喚醒,從后向前找到最前面的一個等待喚醒的節(jié)點喚醒它
- 然后跳入27行將狀態(tài)改為共享模式下可釋放,適應在設置頭結(jié)點處的調(diào)用.
- 如果頭結(jié)點沒有變化(并發(fā)情況下,其他線程可能會改變head的引用),退出方法.
- 先調(diào)用tryReleaseShared()嘗試釋放共享鎖,由具體的鎖實現(xiàn)邏輯,如果獲取到了資源,返回true.繼而調(diào)用doReleaseShared()喚醒頭結(jié)點后的可喚醒線程
3. AQS-----條件變量的支持
? 和之前講到的notify()和wait()配合synchronized內(nèi)置鎖實現(xiàn)線程間同步一樣,AQS中的條件變量==signal()==和==await()==方法也是用來配合鎖(使用AQS實現(xiàn))來實現(xiàn)線程間同步的.
? 不同點在于,synchronized同時之能與一個共享變量的notify()或wait()方法實現(xiàn)同步,而AQS的一個鎖可以對應多個變量.
(1). 條件隊列阻塞的過程(功能類比wait()方法)
//條件隊列入口,參考上面的代碼片段
public final void await() throws InterruptedException {
//如果當前線程被中斷則直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//把當前節(jié)點加入條件隊列
Node node = addConditionWaiter();
//釋放掉已經(jīng)獲取的獨占鎖資源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在同步隊列中則不斷掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//中斷處理授艰,另一種跳出循環(huán)的方式
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//走到這里說明節(jié)點已經(jīng)條件滿足被加入到了同步隊列中或者中斷了
//這個方法很熟悉吧辨嗽?就跟獨占鎖調(diào)用同樣的獲取鎖方法,從這里可以看出條件隊列只能用于獨占鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//走到這里說明已經(jīng)成功獲取到了獨占鎖淮腾,接下來就做些收尾工作
//刪除條件隊列中被取消的節(jié)點
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//根據(jù)不同模式處理中斷
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//注:1.與同步隊列不同糟需,條件隊列頭尾指針是firstWaiter跟lastWaiter
//注:2.條件隊列是在獲取鎖之后,也就是臨界區(qū)進行操作谷朝,因此很多地方不用考慮并發(fā)
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果最后一個節(jié)點被取消洲押,則刪除隊列中被取消的節(jié)點
//至于為啥是最后一個節(jié)點后面會分析
if (t != null && t.waitStatus != Node.CONDITION) {
//刪除所有被取消的節(jié)點
unlinkCancelledWaiters();
t = lastWaiter;
}
//創(chuàng)建一個類型為CONDITION的節(jié)點并加入隊列,由于在臨界區(qū)圆凰,所以這里不用并發(fā)控制
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//刪除取消節(jié)點的邏輯雖然長杈帐,但比較簡單,就不單獨說了专钉,就是鏈表刪除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
//入?yún)⒕褪切聞?chuàng)建的節(jié)點挑童,即當前節(jié)點
final int fullyRelease(Node node) {
boolean failed = true;
try {
//這里這個取值要注意,獲取當前的state并釋放跃须,這從另一個角度說明必須是獨占鎖
//可以考慮下這個邏輯放在共享鎖下面會發(fā)生什么炮沐?
int savedState = getState();
//跟獨占鎖釋放鎖資源一樣,不贅述
if (release(savedState)) {
failed = false;
return savedState;
} else {
//如果這里釋放失敗回怜,則拋出異常
throw new IllegalMonitorStateException();
}
} finally {
//如果釋放鎖失敗大年,則把節(jié)點取消,由這里就能看出來上面添加節(jié)點的邏輯中只需要判斷最后一個節(jié)點是否被取消就可以了
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//判斷節(jié)點是否在同步隊列中
final boolean isOnSyncQueue(Node node) {
//快速判斷1:節(jié)點狀態(tài)或者節(jié)點沒有前置節(jié)點
//注:同步隊列是有頭節(jié)點的玉雾,而條件隊列沒有
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//快速判斷2:next字段只有同步隊列才會使用翔试,條件隊列中使用的是nextWaiter字段
if (node.next != null)
return true;
//上面如果無法判斷則進入復雜判斷
return findNodeFromTail(node);
}
//注意這里用的是tail,這是因為條件隊列中的節(jié)點是被加入到同步隊列尾部复旬,這樣查找更快
//從同步隊列尾節(jié)點開始向前查找當前節(jié)點垦缅,如果找到則說明在,否則不在
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
//這里的判斷邏輯是:
//1.如果現(xiàn)在不是中斷的驹碍,即正常被signal喚醒則返回0
//2.如果節(jié)點由中斷加入同步隊列則返回THROW_IE壁涎,由signal加入同步隊列則返回REINTERRUPT
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//修改節(jié)點狀態(tài)并加入同步隊列
//該方法返回true表示節(jié)點由中斷加入同步隊列凡恍,返回false表示由signal加入同步隊列
final boolean transferAfterCancelledWait(Node node) {
//這里設置節(jié)點狀態(tài)為0,如果成功則加入同步隊列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//與獨占鎖同樣的加入隊列邏輯怔球,不贅述
enq(node);
return true;
}
//如果上面設置失敗嚼酝,說明節(jié)點已經(jīng)被signal喚醒,由于signal操作會將節(jié)點加入同步隊列竟坛,我們只需自旋等待即可
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
//根據(jù)中斷時機選擇拋出異常或者設置線程中斷狀態(tài)
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//實現(xiàn)代碼為:Thread.currentThread().interrupt();
selfInterrupt();
}
- 調(diào)用await()方法將當前線程移入條件隊列,等待條件隊列喚醒
- 如果當前線程被中斷,拋出異常
- 調(diào)用addConditionWaiter()將當前線程加入條件隊列
- 如果最后一個節(jié)點被取消了,調(diào)用unlinkCancelledWaiters()清理隊列中的被取消節(jié)點
- 該方法的邏輯就是從頭遍歷隊列,如果狀態(tài)為被取消,刪除該節(jié)點
- 創(chuàng)建一個類型為條件等待的節(jié)點加入隊列尾
- 返回該節(jié)點
- 如果最后一個節(jié)點被取消了,調(diào)用unlinkCancelledWaiters()清理隊列中的被取消節(jié)點
- 調(diào)用fullyRelease()釋放掉當前線程占用的資源,這一步之前的操作都是線程安全的
- 獲取當前的state值,調(diào)用release()釋放資源(內(nèi)部邏輯之前有講),如果成功則返回并標記為成功,失敗拋出異常,
- 最后如果沒有成功,也就是釋放鎖失敗了,將節(jié)點取消(從這里可以看出來,被取消的節(jié)點一定是結(jié)尾處的,所以上面addConditionWaiter()方法判斷的是最后一個節(jié)點是否被取消,這樣保證了隊列中間不可能出現(xiàn)被取消節(jié)點)
- 循環(huán)調(diào)用isOnSyncQueue()判斷當前線程是否在同步隊列中(如果不在則相當于內(nèi)置鎖中被wait()阻塞但沒有被notify()喚醒的狀態(tài)),這里是調(diào)用通過其他線程調(diào)用signal()方法將該線程(條件)喚醒
- 先進行快速判斷,如果該節(jié)點狀態(tài)為條件等待(肯定在條件隊列中),或者前繼節(jié)點(同步隊列中)為空,證明不在同步隊列中
- 如果后繼節(jié)點(同步隊列中)不為空,則證明在同步隊列中,因為只有在隊列中的節(jié)點才有可能被添加后繼節(jié)點
- 如果上面兩個都不能判斷,調(diào)用findNodeFromTail()進入復雜判斷
- 從尾節(jié)點向前查找節(jié)點,如果找到了當前節(jié)點,證明當前節(jié)點在同步隊列中
- 如果不在同步隊列中,進行掛起.并進行中斷處理
- 然后在acquireQueued()方法中等待被喚醒直到獲取到資源.(內(nèi)部邏輯之前有描述過)
- 如果,當前node不是尾節(jié)點,說明其他線程也對條件隊列進行了操作,調(diào)用unlinkCancelledWaiters()清理隊列中被取消的節(jié)點
- 根據(jù)不同模式,判斷是否調(diào)用reportInterruptAfterWait()處理中斷
- 根據(jù)中斷時機選擇拋出異常或者中斷補償(取決于能否響應中斷)
(2). 條件隊列喚醒的過程(功能類比notify()方法)
//條件隊列喚醒入口
public final void signal() {
//如果不是獨占鎖則拋出異常崭歧,再次說明條件隊列只適用于獨占鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//如果條件隊列不為空隅很,則進行喚醒操作
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//該方法就是把一個有效節(jié)點從條件隊列中刪除并加入同步隊列
//如果失敗則會查找條件隊列上等待的下一個節(jié)點直到隊列為空
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
//將節(jié)點加入同步隊列
final boolean transferForSignal(Node node) {
//修改節(jié)點狀態(tài)率碾,這里如果修改失敗只有一種可能就是該節(jié)點被取消,具體看上面await過程分析
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//該方法很熟悉了播掷,跟獨占鎖入隊方法一樣审编,不贅述
Node p = enq(node);
//注:這里的p節(jié)點是當前節(jié)點的前置節(jié)點
int ws = p.waitStatus;
//如果前置節(jié)點被取消或者修改狀態(tài)失敗則直接喚醒當前節(jié)點
//此時當前節(jié)點已經(jīng)處于同步隊列中垒酬,喚醒會進行鎖獲取或者正確的掛起操作
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 調(diào)用signal()條件喚醒條件隊列中的一個線程,將該線程從條件隊列中移動到同步隊列中
- 如果不是獨占鎖,拋出異常,條件隊列只能用于獨占鎖
- 如果條件隊列不為空,調(diào)用doSignal()執(zhí)行喚醒操作
- 從傳入結(jié)點(上一步傳入的是頭結(jié)點)開始把一個有效節(jié)點從條件隊列刪除
- 這一步使用first保存頭結(jié)點,然后頭結(jié)點后移(循環(huán)會判斷頭結(jié)點為空退出循環(huán))
- 調(diào)用transferForSignal()將剛剛得到的first節(jié)點移入同步隊列(成功則退出循環(huán))
- 使用AQS修改該節(jié)點的狀態(tài),將CONDITION(條件等待)修改為0(如果修改失敗肯定是線程被取消了或者其他線程在這一步狀態(tài)為剛剛修改的0,因為條件隊列中的線程狀態(tài)只能是CONDITION或者被取消,這一步修改的不算)如果失敗,返回false
- 調(diào)用enq()將此節(jié)點入同步隊列,并返回前繼節(jié)點(邏輯不再贅述)
- 如果前繼節(jié)點被取消或者修改狀態(tài)失敗,掛起當前節(jié)點,喚醒后進入await()中的acquireQueued()部分自旋掛起等待喚醒(這個喚醒是鎖的競爭,而不是條件控制的).
- 然后會回到上層方法調(diào)用,循環(huán)判斷當前線程是否在同步隊列中.....
4. 獨占鎖ReentrantLock的原理
(1). 結(jié)構(gòu)
? ReentrantLock是可重入的獨占鎖,同時只能有一個線程可以獲取這個鎖,其他線程嘗試獲取就會被阻塞并放入AQS阻塞隊列中.
? 底層是使用AQS來實現(xiàn)的,根據(jù)參數(shù)來決定其內(nèi)部是否公平內(nèi)部類sync如果是NonfairSync類型的,就非公平,如果是FairSync,則公平.
(2). 獲取鎖
1). void lock()方法
public void lock() {
sync.lock();
}
// 非公平鎖
final void lock() {
if (compareAndSetState(0, 1))// 如果當前鎖是自由的,就直接獲取,這就是競爭鎖
setExclusiveOwnerThread(Thread.currentThread());
else// 如果鎖被占有,那么去排隊
acquire(1);
}
// 公平鎖
final void lock() {// 公平鎖,就要排隊
acquire(1);
}
// 通用
public final void acquire(int arg) {// 同之前AQS
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 非公平鎖
// 非公平性體現(xiàn)在如果一個線程釋放了鎖,在進行下一步喚醒隊列中的節(jié)點之前別的線程直接競爭到了鎖,那么隊列中的節(jié)點會喚醒失敗
protected final boolean tryAcquire(int acquires) {// 進行了一次調(diào)用,邏輯差不多
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//先獲取線程和鎖的狀態(tài)
final Thread current = Thread.currentThread();
int c = getState();
// 這整個if-else if可以讓沒有進行排隊直接獲取鎖的線程直接拿到鎖
if (c == 0) {// 如果鎖空閑
if (compareAndSetState(0, acquires)) {// 獲取到鎖
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {//如果鎖不空閑,但是是自己拿到的
// 鎖重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 鎖不空閑而且不屬于自己,就只能是別人的鎖,嘗試獲取失敗
return false;
}
// 公平鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 不同點就在這,hasQueuedPredecessors()方法中當前線程節(jié)點有前繼節(jié)點返回true,AQS隊列為空或當前線程是AQS的第一個節(jié)點返回false
// 簡單點說就是確保當前節(jié)點是頭結(jié)點后的第一個節(jié)點,也就是排到隊首的節(jié)點,保證只有隊首節(jié)點才能被喚醒
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當前節(jié)點已經(jīng)獲取了鎖,直接重入,這就不牽扯公平性了
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
? 非公平鎖當鎖自由時,就可以直接獲取鎖,而公平鎖需要去acquire()方法內(nèi)部判斷.
? 兩種鎖調(diào)用的acquire()方法都一樣.
? tryAcquire()方法有所區(qū)別,實現(xiàn)邏輯的區(qū)別在于公平鎖在鎖空閑的情況下,獲取鎖之前確保了當前節(jié)點是頭結(jié)點后的第一個節(jié)點,從而讓喚醒的節(jié)點一定是隊列中排過隊的.
2). void lockInterruptibly()方法
? 對中斷響應的獲取鎖.邏輯都類似,調(diào)用的是AQS中可被中斷的獲取鎖方法
3). boolean tryLock()方法
? 嘗試獲取鎖,如果沒獲取到不會阻塞
public boolean tryLock() {
// 之前介紹過nonfairTryzaiAcquire(),就是單純的獲取鎖,之前的阻塞的邏輯是在acquire中
return sync.nonfairTryAcquire(1);
}
4). boolean tryLock(long timeout, TimeUnit unit)方法
? 設置了時間,如果在該時間內(nèi)沒有獲取到鎖,返回false.
? TimeUnit參數(shù)為時間粒度,直接new一個TimeUnit對象即可.
(3). 釋放鎖
1). void unlock()方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 邏輯同AQS
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// c為本次重入成功后,鎖的重入次數(shù)
int c = getState() - releases;
// 如果不是當前線程拿到鎖,拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果這次重入之后,重入次數(shù)為零,證明鎖沒有被任何線程重入,清空鎖持有線程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 設置重入次數(shù)
setState(c);
return free;
}
5. 讀寫鎖ReentrantReadWriteLock的原理
? 實際應用中,寫少讀多的情況很多,而使用ReentranLock性能太低(只有讀鎖時不需要限制其他讀鎖).ReentrantReadWriteLock應運而生,采用讀寫分離的策略,==允許多個線程可以同時獲取讀鎖==.
(1). 結(jié)構(gòu)
? ==內(nèi)部維護了一個ReadLock和一個WriteLock==,它們都是依賴Sync實現(xiàn)具體功能,而Sunc繼承自AQS.AQS中的state只維護了一個狀態(tài),而讀寫鎖有兩個鎖,所以==采用state的高16為表示讀鎖的狀態(tài),低16位表示寫鎖的狀態(tài).==
(2). 寫鎖的獲取與釋放
? 寫鎖使用WriteLock實現(xiàn).
1). void lock()
? ==寫鎖是個可重入的獨占鎖,當沒有線程獲取讀鎖且寫鎖空閑或者寫鎖屬于當前線程時,可以獲取.==
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 此方法在WriteLock(內(nèi)部類)中實現(xiàn)
protected final boolean tryAcquire(int acquires) {
// 獲取當前線程,寫鎖的重入次數(shù)W
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 寫鎖或者讀鎖已經(jīng)被獲取
if (c != 0) {
// w==0證明寫鎖空閑,那么就是讀鎖被獲取,如果讀鎖空閑繼而判斷當前線程是否是寫鎖中的線程
if (w == 0 || current != getExclusiveOwnerThread())
// 進入這里說明讀鎖空閑,那么不能獲取寫鎖.或者寫鎖重入的不是當前線程
return false;
// 走到這證明可以獲取讀鎖沒被獲取,而且當前線程獲取了寫鎖,這一步判斷是否能繼續(xù)進行重入
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 設置重入值
setState(c + acquires);
return true;
}
// 第一個寫線程獲取寫鎖
// writerShouldBlock()當有別的線程也在獲取此鎖時,是否應該阻塞(分公平和非公平兩種實現(xiàn))
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 非公平鎖,永遠不需要判斷,不阻塞,搶就行了
final boolean writerShouldBlock() {
return false; // writers can always barge
}
//公平鎖,調(diào)用hasQueuedPredecessors()判斷是否有前繼節(jié)點,如果有說明不是隊首,繼續(xù)排隊,放棄獲取鎖
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
2). void lockInterruptibly()
? 同樣是獲取鎖,此方法會對中斷進行響應.調(diào)用的是AQS中響應中斷的獲取鎖方法.
3). boolean tryLock()
? 嘗試獲取寫鎖,沒有獲取到不會阻塞,邏輯同上.
4). boolean tryLock(long timeout, TimeUnit unit)
? 獲取鎖失敗后掛起指定時間,如果時間到了之后還是沒有獲取到鎖,返回false
5). void unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 上面都是AQS的內(nèi)容,下面的方法是具體的鎖實現(xiàn)的
protected final boolean tryRelease(int releases) {
//判斷是否是寫鎖的擁有者調(diào)用的unlock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 是當前線程獲取的寫鎖
// 獲取可重入值,寫鎖是低16位,可以直接加減.如果寫鎖重入次數(shù)為0,則鎖應修改為空閑狀態(tài)
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
(3). 讀鎖的獲取與釋放
? 讀鎖是使用ReadLock實現(xiàn)的.
1). void lock()
? ==獲取讀鎖,如果寫鎖是空閑的,其他線程都可以獲取讀鎖.讀鎖是共享的==
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 具體的鎖的實現(xiàn)
protected final int tryAcquireShared(int unused) {
// 獲取當前線程,讀鎖標記
Thread current = Thread.currentThread();
int c = getState();
// 判斷寫鎖是否被占用,且占用者不是自己(一個線程擁有了寫鎖,也是可以擁有讀鎖的,加鎖主要是避免不同線程之間的不同步,在一個線程內(nèi)一定是安全的)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 獲取讀鎖的計數(shù)
int r = sharedCount(c);
// 嘗試獲取鎖,只有一個線程可以成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARxianchengED_UNIT)) {
// 獲取之前讀鎖是空閑的,也就是說,這個線程是第一個獲取讀鎖的線程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {// 此線程是之前第一個獲取鎖的線程
firstReaderHoldCount++;
} else {// 如果不是第一個獲取多鎖的線程,將該線程持有鎖的次數(shù)信息景描,放入線程本地變量中秀撇,方便在整個請求上下文(請求鎖呵燕、釋放鎖等過程中)使用持有鎖次數(shù)信息。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 沒有獲取成功的線程進入fullTryAcquireShared,邏輯與tryAcquireShared()類似,但是是自旋的
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 寫鎖被占用
if (exclusiveCount(c) != 0) {
// 當前線程沒有占有讀鎖
if (getExclusiveOwnerThread() != current)
return -1;
}
// 競爭時應不應該掛起自己
else if (readerShouldBlock()) {
// 此線程是第一個得到讀鎖的線程
if (firstReader == current) {
// doNothing
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 是否達到最大共享值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 獲取鎖
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 當前鎖空閑
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
// 如果firstReader是當前線程夜矗,或者當前線程的cachedHoldCounter變量的count不為0(表示當前線程已經(jīng)持有了該共享鎖)让虐,均說明當前線程已經(jīng)持有共享鎖,此次獲取共享鎖是重入,這也是允許的麸俘,可以通過判斷惧笛。
// 此處將重入次數(shù)分為fistReader的重入次數(shù)和其他所有線程的重入次數(shù)之和
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
2). void lockInterruptibly()
? 可以對中斷進行響應的獲取鎖.
3). boolean tryLock()
? 嘗試獲取鎖,沒有成功不會阻塞.
4). boolean tryLock(long timeout, TimeUnit unit)
? 如果超時時間內(nèi)(掛起了)還沒有獲取鎖,返回false.
5). void unlock()
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 具體鎖實現(xiàn)的邏輯
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 對第一個獲取鎖的線程的重入次數(shù)進行更新
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
// 其他線程重入次數(shù)的更新
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋的釋放鎖
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// CAS成功則說明當前線程沒有被其他線程打擾
if (compareAndSetState(c, nextc))
// 返回值為讀鎖是否空閑
return nextc == 0;
}
}
6. JDK 8 中新增的StampedLock鎖探究
(1). 概述
? StampedLock鎖是并發(fā)包中JDK 8版本新增的一個鎖,提供了三種模式的讀寫控制.當調(diào)用try系列方法嘗試獲取鎖時,會返回一個long型的變量stamp(戳記).這個戳記代表了鎖的狀態(tài),可以理解為樂觀鎖中的版本.釋放鎖的轉(zhuǎn)換鎖時都需要提供這個標記來判斷權限.
特點:
- 所有獲取鎖的方法都會返回一個stamp戳記,0為失敗,其他為獲取成功
- 所有釋放鎖的方法都需要一個stamp戳記,必須和之前得到鎖時的stamp一致
- 不可重入
- 有三種訪問模式
- 樂觀讀
- 悲觀讀
- 寫鎖
- 寫鎖可以降級為讀鎖
- 都不支持條件隊列
/** 等待鏈表隊列拜效,每一個WNode標識一個等待線程 */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // 讀模式使用該節(jié)點形成棧
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** 鎖隊列狀態(tài)各谚, 當處于寫模式時第8位為1,讀模式時前7為為1-126(讀鎖是共享鎖,附加的readerOverflow用于當讀者超過126時) */
private transient volatile long state;
/** 將state超過 RFULL=126的值放到readerOverflow字段中 */
private transient int readerOverflow;
(2). 寫鎖writeLock
? 獨占鎖,不可重入.請求該鎖成功后會返回一個stamp戳記變量來表示該鎖的版本.
- writeLock():當完全沒有加鎖時赴穗,繞過acquireWrite,否則調(diào)用acquireWrite入隊列獲取鎖資源,
- acquireWrite():入隊自旋,并放到隊列尾部,如果隊列中只剩下一個結(jié)點,則在隊頭進一步自旋,最后會進入阻塞
- unlockWrite():如果鎖的狀態(tài)與stamp相同,調(diào)用release()釋放鎖
- release():喚醒傳入節(jié)點的后繼節(jié)點
(3). 悲觀讀readLock
? 共享鎖,不可重入
- readLock():(隊列為空&&沒有寫鎖同時讀鎖數(shù)小于126&&CAS修改狀態(tài)成功)則狀態(tài)加1并返回,否則調(diào)用acquireRead()自旋獲取讀鎖
- acquireRead():首先是入隊自旋般眉,如果隊尾不是讀模式則放到隊列尾部潜支,如果是讀模式冗酿,則放到隊尾的cowait中。如果隊列中只剩下一個結(jié)點鸠窗,則在隊頭進一步自旋.如果最終依然失敗胯究,則Unsafe().park()掛起當前線程。
- unlockRead():如果state匹配stamp,判斷當前的共享次數(shù),修改state或者readerOverflow
(4). 樂觀讀OptimisticRead()
? 在操作數(shù)據(jù)前并沒有通過 CAS 設置鎖的狀態(tài)臣嚣,僅僅是通過位運算測試
? 如果當前沒有線程持有寫鎖硅则,則簡單的返回一個非 0 的 stamp 版本信息,獲取該 stamp 后在具體操作數(shù)據(jù)前還需要調(diào)用 validate 驗證下該 stamp 是否已經(jīng)不可用暑认,也就是看當調(diào)用 tryOptimisticRead 返回 stamp 后大审,到當前時間是否有其它線程持有了寫鎖徒扶,如果是那么 validate 會返回 0,否則就可以使用該 stamp 版本的鎖對數(shù)據(jù)進行操作导坟。
? 由于 tryOptimisticRead 并沒有使用 CAS 設置鎖狀態(tài)圈澈,所以不需要顯示的釋放該鎖康栈。該鎖的一個特點是適用于讀多寫少的場景,因為獲取讀鎖只是使用位操作進行檢驗漾狼,不涉及 CAS 操作饥臂,所以效率會高很多隅熙,但是同時由于沒有使用真正的鎖,在保證數(shù)據(jù)一致性上需要拷貝一份要操作的變量到方法棧酵熙,并且在操作數(shù)據(jù)時候可能其它寫線程已經(jīng)修改了數(shù)據(jù)匾二,而我們操作的是方法棧里面的數(shù)據(jù),也就是一個快照皮璧,所以最多返回的不是最新的數(shù)據(jù)悴务,但是一致性還是得到保障的譬猫。
/**
* 獲取樂觀讀鎖染服,返回郵票stamp
*/
public long tryOptimisticRead() {
long s; //有寫鎖返回0. 否則返回256
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
/**
* 驗證從調(diào)用tryOptimisticRead開始到現(xiàn)在這段時間內(nèi)有無寫鎖占用過鎖資源,有寫鎖獲得過鎖資源則返回false. stamp為0返回false.
* @return 從返回stamp開始蕉拢,沒有寫鎖獲得過鎖資源返回true,否則返回false
*/
public boolean validate(long stamp) {
//強制讀取操作和驗證操作在一些情況下的內(nèi)存排序問題
U.loadFence();
//當持有寫鎖后再釋放寫鎖午乓,該校驗也不成立益愈,返回false
return (stamp & SBITS) == (state & SBITS);
}
(5). 鎖轉(zhuǎn)換
/**
* state匹配stamp時, 執(zhí)行下列操作之一.
* 1、stamp 已經(jīng)持有寫鎖敏释,直接返回.
* 2钥顽、讀模式靠汁,但是沒有更多的讀取者蝶怔,并返回一個寫鎖stamp.
* 3、有一個樂觀讀鎖澳叉,只在即時可用的前提下返回一個寫鎖stamp
* 4耳高、其他情況都返回0
*/
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
//state匹配stamp
while (((s = state) & SBITS) == (stamp & SBITS)) {
//沒有鎖
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
//CAS修改狀態(tài)為持有寫鎖,并返回
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
//持有寫鎖
else if (m == WBIT) {
if (a != m)
//其他線程持有寫鎖
break;
//當前線程已經(jīng)持有寫鎖
return stamp;
}
//有一個讀鎖
else if (m == RUNIT && a != 0L) {
//釋放讀鎖概荷,并嘗試持有寫鎖
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
return next;
}else
break;
}
return 0L;
}
/**
* state匹配stamp時, 執(zhí)行下列操作之一.
1误证、stamp 表示持有寫鎖愈捅,釋放寫鎖慈鸠,并持有讀鎖
2 stamp 表示持有讀鎖 青团,返回該讀鎖
3 有一個樂觀讀鎖,只在即時可用的前提下返回一個讀鎖stamp
4芦昔、其他情況都返回0娃肿,表示失敗
*
*/
public long tryConvertToReadLock(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
//state匹配stamp
while (((s = state) & SBITS) == (stamp & SBITS)) {
//沒有鎖
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
//寫鎖
else if (m == WBIT) {
//非當前線程持有寫鎖
if (a != m)
break;
//釋放寫鎖持有讀鎖
state = next = s + (WBIT + RUNIT);
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
//持有讀鎖
else if (a != 0L && a < WBIT)
return stamp;
else
break;
}
return 0L;
}
(6). 使用樂觀讀鎖
使用樂觀讀要保證以下順序:
// 獲取版本信息(樂觀鎖)
long stamp = lock.tryOptimisticRead();
// 復制變量到本地堆棧
copyVaraibale2ThreadMemory();
// 校驗,如果校驗失敗,說明此處樂觀鎖使用失敗,申請悲觀讀鎖
if(!lock.validate(stamp)){
// 申請悲觀讀鎖
long stamp = lock.readLock();
try{
// 復制變量到本地堆棧
copyVaraibale2ThreadMemory();
}finally{
// 使用完之后釋放鎖
lock.unlock();
}
}