java并發(fā)編程實(shí)戰(zhàn)三之Lock原理
java locks包的核心類(lèi)是AQS(AbstractQueuedSynchronizer), AQS的核心實(shí)現(xiàn)其實(shí)是一個(gè)自旋鎖
1. 自旋鎖(Spin Lock)
自旋鎖是指當(dāng)一個(gè)線程嘗試獲取某個(gè)鎖時(shí)窿撬,如果該鎖已被其他線程占用欣尼,就一直循環(huán)檢測(cè)鎖是否被釋放翎卓,而不是進(jìn)入線程掛起或睡眠狀態(tài)
自旋鎖適用于鎖保護(hù)的臨界區(qū)很小的情況青团,臨界區(qū)很小的話(huà),鎖占用的時(shí)間就很短
1.1 簡(jiǎn)單自旋鎖
下面的代碼是自旋鎖的一種簡(jiǎn)單實(shí)現(xiàn)
它有很多的缺陷罚攀,你可以想到多少缺點(diǎn)言蛇??兽狭?
public class SpinLock {
/**
* 使用原子類(lèi)來(lái)標(biāo)識(shí)線程是否獲取到了鎖
*/
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
public void lock() {
Thread currentThread = Thread.currentThread();
//如果鎖未被占用憾股,則設(shè)置當(dāng)前線程為鎖的擁有者
while (!owner.compareAndSet(null, currentThread)){
}
}
public void unlock(){
Thread currentThread = Thread.currentThread();
// 只有鎖的擁有者才能釋放鎖
owner.compareAndSet(currentThread, null);
}
}
1.2 Ticket Lock
為了解決公平性問(wèn)題鹿蜀,我們模仿銀行排隊(duì)的方式箕慧,設(shè)計(jì)了另一種公平鎖
Ticket Lock 雖然解決了公平性的問(wèn)題,但是多處理器系統(tǒng)上茴恰,每個(gè)進(jìn)程/線程占用的處理器都在讀寫(xiě)同一個(gè)變量serviceNum 颠焦,每次讀寫(xiě)操作都必須在多個(gè)處理器緩存之間進(jìn)行緩存同步,這會(huì)導(dǎo)致繁重的系統(tǒng)總線和內(nèi)存的流量往枣,大大降低系統(tǒng)整體的性能
public class TicketLock {
/**
* 服務(wù)號(hào)
*/
private AtomicInteger serviceNum = new AtomicInteger();
/**
* 排隊(duì)號(hào)
*/
private AtomicInteger ticketNum = new AtomicInteger();
public int lock(){
//首先原子性地獲得一個(gè)排隊(duì)號(hào)
int myTicketNum = ticketNum.getAndIncrement();
//只要當(dāng)前服務(wù)號(hào)不是自己的就不斷輪詢(xún)
while (serviceNum.get() != myTicketNum){
}
return myTicketNum;
}
public void unlock(int myTicket){
int next = myTicket + 1;
serviceNum.compareAndSet(myTicket, next);
}
}
1.3 CLH隊(duì)列鎖
CLH鎖是一種基于鏈表的可擴(kuò)展伐庭、高性能、公平的自旋鎖分冈,申請(qǐng)線程只在本地變量上自旋圾另,它不斷輪詢(xún)前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋
CLH在SMP系統(tǒng)結(jié)構(gòu)下該法是非常有效的雕沉。但在NUMA系統(tǒng)結(jié)構(gòu)下集乔,每個(gè)線程有自己的內(nèi)存,如果前趨結(jié)點(diǎn)的內(nèi)存位置比較遠(yuǎn)坡椒,自旋判斷前趨結(jié)點(diǎn)的locked域扰路,性能將大打折扣
當(dāng)一個(gè)線程需要獲取鎖時(shí):
a. 創(chuàng)建一個(gè)的QNode,將其中的locked設(shè)置為true表示需要獲取鎖
b. 線程對(duì)tail域調(diào)用getAndSet方法倔叼,使自己成為隊(duì)列的尾部汗唱,同時(shí)獲取一個(gè)指向其前趨結(jié)點(diǎn)的引用myPred
c. 該線程就在前趨結(jié)點(diǎn)的locked字段上旋轉(zhuǎn),直到前趨結(jié)點(diǎn)釋放鎖
d. 當(dāng)一個(gè)線程需要釋放鎖時(shí)丈攒,將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false哩罪,同時(shí)回收前趨結(jié)點(diǎn)
public class CLHLock {
/**
* tail 指向最后線程的節(jié)點(diǎn)
* current 每個(gè)線程的節(jié)點(diǎn)
*/
private final AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());
private final ThreadLocal<CLHNode> current;
public CLHLock() {
current =ThreadLocal.withInitial(CLHNode::new);
}
public void lock(){
CLHNode own = current.get();
own.locked = true;
CLHNode preNode = tail.getAndSet(own);
//輪詢(xún)前驅(qū)節(jié)點(diǎn)
while (preNode.locked){
}
}
public void unlock(){
//當(dāng)前線程節(jié)點(diǎn)上釋放
current.get().locked = false;
}
private static class CLHNode{
private volatile boolean locked = false;
}
}
1.4 MCS鎖
為了解決NUMA系統(tǒng)結(jié)構(gòu)下CLH自選出現(xiàn)的性能問(wèn)題,MCS隊(duì)列鎖應(yīng)運(yùn)而生
MSC與CLH最大的不同并不是鏈表是顯示還是隱式巡验,而是線程自旋的規(guī)則不同:CLH是在前趨結(jié)點(diǎn)的locked域上自旋等待识椰,而MSC是在自己的結(jié)點(diǎn)的locked域上自旋等待。正因?yàn)槿绱松罴睿鉀Q了CLH在NUMA系統(tǒng)架構(gòu)中獲取locked域狀態(tài)內(nèi)存過(guò)遠(yuǎn)的問(wèn)題腹鹉。
a. 隊(duì)列初始化時(shí)沒(méi)有結(jié)點(diǎn),tail=null
b. 線程A想要獲取鎖敷硅,于是將自己置于隊(duì)尾功咒,由于它是第一個(gè)結(jié)點(diǎn),它的locked域?yàn)閒alse
c. 線程B和C相繼加入隊(duì)列绞蹦,a->next=b,b->next=c力奋。且B和C現(xiàn)在沒(méi)有獲取鎖,處于等待狀態(tài)幽七,所以它們的locked域?yàn)閠rue景殷,
尾指針指向線程C對(duì)應(yīng)的結(jié)點(diǎn)
d. 線程A釋放鎖后,順著它的next指針找到了線程B,并把B的locked域設(shè)置為false猿挚。這一動(dòng)作會(huì)觸發(fā)線程B獲取鎖
public class MCSLock {
/**
* 隊(duì)列初始化時(shí)沒(méi)有結(jié)點(diǎn)咐旧,tail指向null
*
*/
private final AtomicReference<MCSNode> tail = new AtomicReference<>(null);
private ThreadLocal<MCSNode> current;
public MCSLock() {
current = ThreadLocal.withInitial(MCSNode::new);
}
public void lock(){
// 線程A想要獲取鎖,于是將自己置于隊(duì)尾绩蜻,由于它是第一個(gè)結(jié)點(diǎn)铣墨,它的locked域?yàn)閒alse
MCSNode own = current.get();
MCSNode preNode = tail.getAndSet(own);
// 線程B和C相繼加入隊(duì)列,a->next=b,b->next=c。
// 且B和C現(xiàn)在沒(méi)有獲取鎖,處于等待狀態(tài),所以它們的locked域?yàn)閠rue,尾指針指向線程C對(duì)應(yīng)的結(jié)點(diǎn)
if(preNode != null){
own.locked = true;
preNode.next = own;
// 在自己的結(jié)點(diǎn)的locked域上自旋等待
while (own.locked){}
}
}
public void unlock(){
MCSNode own = current.get();
if(!own.locked){
return;
}
// 最后一個(gè)獲取鎖的線程
if(own.next == null){
if(tail.compareAndSet(own, null)){
return;
}
// 過(guò)程中又有線程獲得鎖
while (own.next == null){}
}
// 線程A釋放鎖后办绝,順著它的next指針找到了線程B,并把B的locked域設(shè)置為false.這一動(dòng)作會(huì)觸發(fā)線程B獲取鎖
own.next.locked = false;
own.next = null;
}
private static class MCSNode{
private MCSNode next;
private volatile boolean locked = false;
}
}
2. Unsafe和LockSupport
2.1 Unsafe類(lèi)簡(jiǎn)介
首先伊约,我們的代碼中不應(yīng)該使用這個(gè)類(lèi),雖然可以通過(guò)反射的方式獲取到它的示例并使用
個(gè)人覺(jué)得孕蝉,我們也不需要詳細(xì)了解這個(gè)類(lèi)的使用
簡(jiǎn)單了解:
- put,get方法
// l offset變量相對(duì)偏移量屡律,可用objectFieldOffset(java.lang.reflect.Field field)獲取
putObject(java.lang.Object o, long l, java.lang.Object o1)
getObject(java.lang.Object o, long l) - 變量偏移量
objectFieldOffset(java.lang.reflect.Field field)
staticFieldOffset(java.lang.reflect.Field field) - 內(nèi)存管理
allocateMemory(long l)
reallocateMemory(long l, long l1)
setMemory(java.lang.Object o, long l, long l1, byte b)
copyMemory(java.lang.Object o, long l, java.lang.Object o1, long l1, long l2)
freeMemory(long l) - CAS操作
compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2) - 實(shí)例化
allocateInstance(java.lang.Class<?> aClass) - 數(shù)組
arrayIndexScale(java.lang.Class<?> aClass)
arrayBaseOffset(java.lang.Class<?> aClass) - 阻塞和喚醒
park(boolean b, long l)
unpark(java.lang.Object o)
2.2 LockSupport
- 喚醒線程或者歸還令牌
unpark(Thread thread) - 阻塞線程,附加額外信息
park(Object blocker) - 阻塞線程一段時(shí)間降淮,附加額外信息
parkNanos(Object blocker, long nanos) - 阻塞線程直到某時(shí)間疹尾,附加額外信息
parkUntil(Object blocker, long deadline) - 阻塞線程
park() - 阻塞線程一段時(shí)間
阻塞線程一段時(shí)間 - 阻塞線程直到某時(shí)間
parkUntil(long deadline) - Thread添加額外信息
setBlocker(Thread t, Object arg)
getBlocker(Thread t)
這里獲取的許可永遠(yuǎn)只有一個(gè),與底層c++實(shí)現(xiàn)有關(guān)骤肛。HotSpot里Parker有一個(gè)私有_counter變量纳本,無(wú)論調(diào)用多少次unpark,_counter都被設(shè)為1
3. AbstractQueuedSynchronizer
AQS改進(jìn)了CLH隊(duì)列自旋鎖,結(jié)合了自旋和睡眠/喚醒兩種方法的優(yōu)點(diǎn)
3.1 雙端鏈表Node
static final class Node {
/** 標(biāo)記當(dāng)前結(jié)點(diǎn)是共享模式 */
static final Node SHARED = new Node();
/** 標(biāo)記當(dāng)前結(jié)點(diǎn)是獨(dú)占模式 */
static final Node EXCLUSIVE = null;
/**
* 結(jié)點(diǎn)的等待狀態(tài) 可能的取值包含:CANCELLED腋颠、SIGNAL繁成、CONDITION、PROPAGATE和0
* 在同步等待隊(duì)列中的節(jié)點(diǎn)初始值為0,在條件等待隊(duì)列中的節(jié)點(diǎn)初始值為CONDITION
* 在獨(dú)占模式下淑玫,取值為CANCELLED印颤、SIGNAL吻谋、0中之一
* 在共享模式下,取值為CANCELLED、SIGNAL掂碱、PROPAGATE和0中之一
* 在條件等待隊(duì)列中绵咱,取值為CANCELLED砌梆、CONDITION中之一
*/
volatile int waitStatus;
/** 擁有當(dāng)前結(jié)點(diǎn)的線程 */
volatile Thread thread;
/** 線程已經(jīng)被取消 */
static final int CANCELLED = 1;
/**
* 后續(xù)節(jié)點(diǎn)需要喚醒
* 節(jié)點(diǎn)插入隊(duì)列時(shí)濒憋,節(jié)點(diǎn)代表的線程睡眠前會(huì)將前一個(gè)節(jié)點(diǎn)的waitStatus置為SIGNAL
* 當(dāng)前一個(gè)節(jié)點(diǎn)釋放鎖時(shí),如果其waitStatus置為SIGNAL但壮,則會(huì)喚醒其后下一個(gè)節(jié)點(diǎn)線程
*/
static final int SIGNAL = -1;
/** 表示節(jié)點(diǎn)代表的線程正處于條件等待隊(duì)列中等待signal信號(hào) */
static final int CONDITION = -2;
/** 在共享模式下使用,表示同步狀態(tài)能夠無(wú)條件向后傳播 */
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
/**
* 在條件等待隊(duì)列中冀泻,用于指向下一個(gè)節(jié)點(diǎn)
* 在同步等待隊(duì)列中,用于標(biāo)記該節(jié)點(diǎn)所代表的線程在獨(dú)占模式下還是共享模式下獲取鎖
*/
Node nextWaiter;
public Node() {
}
/**
*
* @param thread 一般為當(dāng)前線程
* @param mode 排他EXCLUSIVE 共享SHARED
*/
public Node(Thread thread, Node mode) {
this.thread = thread;
this.nextWaiter = mode;
}
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
}
3.2 同步等待隊(duì)列
waitStatus在同步等待隊(duì)列中不同的模式下蜡饵,有不同的取值:
- 在獨(dú)占模式下弹渔,取值為CANCELLED、SIGNAL溯祸、0中之一
- 在共享模式下肢专,取值為CANCELLED舞肆、SIGNAL、PROPAGATE和0中之一
nextWaiter
- 在同步等待隊(duì)列中博杖,用于標(biāo)記該節(jié)點(diǎn)所代表的線程在獨(dú)占模式下還是共享模式下獲取鎖
3.3 獨(dú)占模式下鎖獲取
這里我們只細(xì)講acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && //1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //2 -> 3
selfInterrupt(); //4
}
- 首先調(diào)用tryAcquire函數(shù)椿胯,線程嘗試獲取鎖,如果獲取成功則直接返回
- 如果獲取失敗欧募,則調(diào)用addWaiter函數(shù),將線程封裝成一個(gè)Node節(jié)點(diǎn)并插入同步等待隊(duì)列尾部仆抵;Node.EXCLUSIVE代表獨(dú)占模式
/**
* 此處在尾部插入node時(shí)跟继,先設(shè)置node的prev,再CAS修改隊(duì)列tail指向镣丑,修改成功再設(shè)置前一個(gè)節(jié)點(diǎn)的next域
* 隊(duì)列中舔糖,如果某個(gè)node的prev!=null,并不一定表示node已經(jīng)成功插入隊(duì)列中,如果某個(gè)node的前一個(gè)節(jié)點(diǎn)的next!=null,則該node一定位于隊(duì)列中
* @param mode 模式
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 雙端鏈表的add操作
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//頭尾都為null 初始化以及enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail為空,初始化head和tail節(jié)點(diǎn) head lazy initialize
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 雙端鏈表的基本操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 接著調(diào)用acquireQueued函數(shù)莺匠,將前驅(qū)節(jié)點(diǎn)的waitStatus標(biāo)記為SIGNAL后睡眠金吗,等待前驅(qū)節(jié)點(diǎn)釋放鎖后喚醒,被喚醒后則繼續(xù)嘗試獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 自旋 當(dāng)且僅當(dāng)前驅(qū)節(jié)點(diǎn)是head,嘗試獲取鎖
// 當(dāng)前驅(qū)節(jié)點(diǎn)等于head時(shí),說(shuō)明前驅(qū)線程為當(dāng)前正擁有鎖,或者剛剛釋放鎖并且喚醒了當(dāng)前節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {
// 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
// 如果前驅(qū)節(jié)點(diǎn)不是隊(duì)列head或者獲取鎖失敗,則設(shè)置前驅(qū)節(jié)點(diǎn)waitStatus為SIGNAL,并睡眠
// shouldParkAfterFailedAcquire 前置節(jié)點(diǎn)waitStatus設(shè)為-1 返回false
// parkAndCheckInterrupt 阻塞當(dāng)前線程 返回currentThread().isInterrupted(true)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
protected boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}
private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if(ws == Node.SIGNAL){
return true;
}
// ws = 1 CANCELLED 跳過(guò)取消的節(jié)點(diǎn)
if(ws > 0){
do {
node.prev = pred = pred.prev;
}while (pred.waitStatus > 0);
pred.next = node;
}else {
// 0 或者 PROPAGATE
compareAndSetWaitStatus(pred, ws, Node.CANCELLED);
}
return false;
}
/**
* 線程在獲取鎖過(guò)程中,可能因?yàn)槌鲥e(cuò)趣竣、被中斷或超時(shí)而取消獲取鎖
* @param node
*/
protected void cancelAcquire(Node node){
if (node == null) {
return;
}
//線程引用清空
node.thread = null;
Node pred = node.prev;
// 若前驅(qū)節(jié)點(diǎn)是CANCELLED,則跳過(guò)繼續(xù)往前找
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// 前驅(qū)節(jié)點(diǎn)中不是CANCELLED節(jié)點(diǎn),獲取pre next指向
Node predNext = pred.next;
// 設(shè)置waitStatus值為CANCELLED,標(biāo)記節(jié)點(diǎn)已取消
node.waitStatus = Node.CANCELLED;
// 如果被取消的node是尾部節(jié)點(diǎn),則設(shè)置tail指針指向前驅(qū)節(jié)點(diǎn),并且設(shè)置前驅(qū)節(jié)點(diǎn)的next指針為null
if(node == tail && compareAndSetTail(node, pred)){
compareAndSetNext(pred, predNext, null);
}else {
int ws;
if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred,ws,
Node.SIGNAL))) && pred.thread != null){
Node next = node.next;
if(next != null && next.waitStatus <= 0){
compareAndSetNext(pred, predNext, next);
}
}else {
unparkSuccessor(node);
}
node.next = node;
}
}
/**
* 喚醒后繼節(jié)點(diǎn)
* @param node
*/
private void unparkSuccessor(Node node) {
// 在獨(dú)占模式下,waitStatus<0此時(shí)為SIGNAL,將SIGNAL標(biāo)志清除
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// node.next == null || node.next.waitStatus > 0,表示該node.next線程取消了獲取鎖,則從隊(duì)列尾部遍歷,找到隊(duì)列中第一個(gè)沒(méi)有取消的線程
if(s == null || s.waitStatus > 0){
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);
}
}
- 如果線程睡眠過(guò)程中產(chǎn)生中斷摇庙,則調(diào)用selfInterrupt函數(shù)讓線程自我中斷一下,設(shè)置中斷標(biāo)志遥缕,將中斷傳遞給外層
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
3.4 獨(dú)占模式釋放鎖
public final boolean release(int arg) {
if(tryRelease(arg)){
// 每個(gè)隊(duì)列中的線程在獲取到鎖以后,會(huì)將隊(duì)列中包含該線程的Node節(jié)點(diǎn)設(shè)置為head,所以head指向的Node即為當(dāng)前占有鎖的線程,也就是當(dāng)前正在進(jìn)行釋放鎖操作的線程
Node h = head;
if(h != null && h.waitStatus != 0){
// 喚醒后續(xù)的結(jié)點(diǎn)
unparkSuccessor(h);
}
return true;
}
return false;
}
/**
* 喚醒后繼節(jié)點(diǎn)
* @param node
*/
private void unparkSuccessor(Node node) {
// 在獨(dú)占模式下,waitStatus<0此時(shí)為SIGNAL,將SIGNAL標(biāo)志清除
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// node.next == null || node.next.waitStatus > 0,表示該node.next線程取消了獲取鎖,則從隊(duì)列尾部遍歷,找到隊(duì)列中第一個(gè)沒(méi)有取消的線程
if(s == null || s.waitStatus > 0){
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);
}
}
3.5 共享模式獲取鎖
共享模式獲取鎖與獨(dú)占模式獲取鎖卫袒,僅有一點(diǎn)區(qū)別,就是setHead的同時(shí)會(huì)喚醒下一個(gè)shared節(jié)點(diǎn)
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
// 與獨(dú)占模式下獲取鎖唯一的區(qū)別就在下面這個(gè)setHeadAndPropagate函數(shù)
setHeadAndPropagate(node, r);
// help GC
p.next = null;
if (interrupted) {
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if(failed){
cancelAcquire(node);
}
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
setHead(node);
/**
* propagate > 0 說(shuō)明許可還能夠繼續(xù)被線程acquire
* 之前的head == null 未知狀態(tài)
* 之前的head被設(shè)置為PROPAGAT 說(shuō)明需要往后傳遞
* 當(dāng)前head = null 未知狀態(tài)
* 當(dāng)前head被設(shè)置為PROPAGAT
*/
if(propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0){
Node s = node.next;
if(s == null || s.isShared()){
doReleaseShared();
}
}
}
private void doReleaseShared() {
for(;;){
Node h = head;
if(h != null && h != tail){
int ws = h.waitStatus;
// SIGNAL狀態(tài)直接喚醒下一個(gè)節(jié)點(diǎn)
if(ws == Node.SIGNAL){
if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue;
}
unparkSuccessor(h);
}else if(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
//首節(jié)點(diǎn)的狀態(tài)是0,則通過(guò)CAS設(shè)置為PROPAGATE,表示如果連接在當(dāng)前節(jié)點(diǎn)后的node的狀態(tài)如果是shared,則無(wú)條件獲取鎖
continue;
}
}
if(h == head){
break;
}
}
}
3.6 共享模式釋放鎖
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
4. Condition的實(shí)現(xiàn)原理
AQS通過(guò)一個(gè)FIFO單向鏈表來(lái)實(shí)現(xiàn)條件等待隊(duì)列
Condition接口定義了一個(gè)類(lèi)似于Object對(duì)象的監(jiān)視器的接口方法单匣。await()夕凝、 awaitNanos(long nanosTimeout)、signal()户秤、signalAll()可以對(duì)應(yīng)上Object對(duì)象的wait()码秉、wait(long timeout)、notify()鸡号、notifyAll()转砖;而且Condition的控制更加靈活精確
4.1 await()
- 新建Condition Node包裝線程,加入Condition隊(duì)列
- 釋放線程占有的鎖
- 在同步等待隊(duì)列中則睡眠等待 直到被喚醒并加入到同步隊(duì)列中
- 嘗試獲取鎖 此時(shí)節(jié)點(diǎn)已經(jīng)加入到同步隊(duì)列中
- 處理cancled節(jié)點(diǎn)
- 處理異常
/**
* 1. 新建Condition Node包裝線程,加入Condition隊(duì)列
* 2. 釋放線程占有的鎖
* 3. 在同步等待隊(duì)列中則睡眠等待 直到被喚醒并加入到同步隊(duì)列中
* 4. 嘗試獲取鎖 此時(shí)節(jié)點(diǎn)已經(jīng)加入到同步隊(duì)列中
* 5. 處理cancled節(jié)點(diǎn)
* 6. 處理異常
* @throws InterruptedException
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){
throw new InterruptedException();
}
// 新建Condition Node包裝線程,加入Condition隊(duì)列
Node node = addConditionWaiter();
// 釋放線程占有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步等待隊(duì)列中則睡眠等待 直到被喚醒并加入到同步隊(duì)列中
while (!isOnSyncQueue(node)){
LockSupport.park(this);
// 檢查是否被打斷
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
break;
}
}
// 嘗試獲取鎖 異常不拋出
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){
unlinkCancelledWaiters();
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}
/**
* 單向鏈表add 返回lastWaiter
* @return
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// lastWaiter CANCELLED 移除
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t == null){
firstWaiter = node;
}else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
/**
* 沒(méi)有打斷 0
* interrupted before signalled -1
* interrupted after signalled 1
* @param node
* @return
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
isOnSyncQueue是AQS的方法鲸伴,功能是判斷節(jié)點(diǎn)是否在同步隊(duì)列中
/**
* 節(jié)點(diǎn)是否在同步隊(duì)列中
* @param node
* @return
*/
private boolean isOnSyncQueue(Node node) {
// 在條件隊(duì)列中
// 或者同時(shí)別的線程調(diào)用signal,Node會(huì)從Condition隊(duì)列中移除,從移除到進(jìn)入release隊(duì)列,中間這段時(shí)間prev必然為null
if(node.waitStatus == Node.CONDITION || node.prev == null){
return false;
}
if(node.next != null){
return true;
}
// 可能該Node剛剛最后一個(gè)進(jìn)入release隊(duì)列,所以是tail,其next必然是null,所以需要從隊(duì)尾向前查找
return findNodeFromTail(node);
}
transferAfterCancelledWait喚醒前或后被打斷
/**
* 喚醒前或者后被打斷
* @param node
* @return
*/
private boolean transferAfterCancelledWait(Node node) {
// 喚醒前線程被打斷(超時(shí)等原因?qū)⒉辉俚却? 直接加入到同步隊(duì)列等待獲取鎖
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
enq(node);
return true;
}
// 喚醒的時(shí)候被打斷 等待加入隊(duì)列(可以獲取鎖的狀態(tài))結(jié)束
while (!isOnSyncQueue(node)){
Thread.yield();
}
return false;
}
4.2 signal()
- 條件隊(duì)列中移除first節(jié)點(diǎn)
- 跳過(guò)cancled節(jié)點(diǎn),將first加入到同步隊(duì)列直到加入成功
這里并沒(méi)有喚醒該線程堪藐,而是在await()的第4步才可能喚醒該線程
/**
* 1. 條件隊(duì)列中移除first節(jié)點(diǎn)
* 2. 跳過(guò)cancled節(jié)點(diǎn),找到一個(gè)沒(méi)有取消的first放入release隊(duì)列
* @param first
*/
private void doSignal(Node first) {
do {
if((firstWaiter = first.nextWaiter) == null){
lastWaiter = null;
}
first.nextWaiter = null;
}while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal是AQS的方法
/**
* condition隊(duì)列中節(jié)點(diǎn)加入到同步隊(duì)列中
* @param first
* @return
*/
private boolean transferForSignal(Node first) {
// 無(wú)法cas waitStatus, 節(jié)點(diǎn)被取消
if(!compareAndSetWaitStatus(first, Node.CONDITION, 0)){
return false;
}
// p是該Node的前驅(qū)
Node p = enq(first);
int ws = p.waitStatus;
// 前驅(qū)節(jié)點(diǎn)cancled或者變更waitStatus失敗,直接喚醒當(dāng)前線程
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(first.thread);
}
return true;
}
5. 簡(jiǎn)單注釋源碼
public abstract class MyAbstractQueuedSynchronizer {
/**
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | ----> | | tail
* +------+ +-----+ +-----+
* </pre>
*/
/** 雙端隊(duì)列的頭,只有setHead可以修改 */
private transient volatile Node head;
/** 端隊(duì)列的尾,只有enq可以修改 */
private transient volatile Node tail;
/** 同步狀態(tài) */
private volatile int state;
protected final int getState() {
return state;
}
/** 節(jié)點(diǎn)自旋還是阻塞的超時(shí)時(shí)間 */
static final long spinForTimeoutThreshold = 1000L;
protected MyAbstractQueuedSynchronizer() {
}
protected boolean tryAcquire(int arg){
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg){
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 獨(dú)占模式下線程獲取鎖的方法,該函數(shù)忽略中斷,即線程在aquire過(guò)程中,中斷此線程是無(wú)效的
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
/**
* 釋放鎖
* @param arg
* @return
*/
public final boolean release(int arg) {
if(tryRelease(arg)){
// 每個(gè)隊(duì)列中的線程在獲取到鎖以后,會(huì)將隊(duì)列中包含該線程的Node節(jié)點(diǎn)設(shè)置為head,所以head指向的Node即為當(dāng)前占有鎖的線程,也就是當(dāng)前正在進(jìn)行釋放鎖操作的線程
Node h = head;
if(h != null && h.waitStatus != 0){
// 喚醒后續(xù)的結(jié)點(diǎn)
unparkSuccessor(h);
}
return true;
}
return false;
}
/**
* doAcquireShared與acquireQueued僅有一點(diǎn)區(qū)別
* @param arg
*/
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
// 與獨(dú)占模式下獲取鎖唯一的區(qū)別就在下面這個(gè)setHeadAndPropagate函數(shù)
setHeadAndPropagate(node, r);
// help GC
p.next = null;
if (interrupted) {
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if(failed){
cancelAcquire(node);
}
}
}
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
/**
*
* @param node
* @param propagate
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
setHead(node);
/**
* propagate > 0 說(shuō)明許可還能夠繼續(xù)被線程acquire
* 之前的head == null 未知狀態(tài)
* 之前的head被設(shè)置為PROPAGAT 說(shuō)明需要往后傳遞
* 當(dāng)前head = null 未知狀態(tài)
* 當(dāng)前head被設(shè)置為PROPAGAT
*/
if(propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0){
Node s = node.next;
if(s == null || s.isShared()){
doReleaseShared();
}
}
}
private void doReleaseShared() {
for(;;){
Node h = head;
if(h != null && h != tail){
int ws = h.waitStatus;
// SIGNAL狀態(tài)直接喚醒下一個(gè)節(jié)點(diǎn)
if(ws == Node.SIGNAL){
if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue;
}
unparkSuccessor(h);
}else if(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
//首節(jié)點(diǎn)的狀態(tài)是0,則通過(guò)CAS設(shè)置為PROPAGATE,表示如果連接在當(dāng)前節(jié)點(diǎn)后的node的狀態(tài)如果是shared,則無(wú)條件獲取鎖
continue;
}
}
if(h == head){
break;
}
}
}
/**
* 子類(lèi)實(shí)現(xiàn)
* @param arg
* @return
*/
protected int tryAcquireShared(int arg){
throw new UnsupportedOperationException();
}
private boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 自旋 當(dāng)且僅當(dāng)前驅(qū)節(jié)點(diǎn)是head,嘗試獲取鎖
// 當(dāng)前驅(qū)節(jié)點(diǎn)等于head時(shí),說(shuō)明前驅(qū)線程為當(dāng)前正擁有鎖,或者剛剛釋放鎖并且喚醒了當(dāng)前節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {
// 設(shè)置成功獲取鎖的node節(jié)點(diǎn)為隊(duì)列頭head
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
// 如果前驅(qū)節(jié)點(diǎn)不是隊(duì)列head或者獲取鎖失敗,則設(shè)置前驅(qū)節(jié)點(diǎn)waitStatus為SIGNAL,并睡眠
// shouldParkAfterFailedAcquire 前置節(jié)點(diǎn)waitStatus設(shè)為-1 返回false
// parkAndCheckInterrupt 阻塞當(dāng)前線程 返回currentThread().isInterrupted(true)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
/**
* 此處在尾部插入node時(shí),先設(shè)置node的prev挑围,再CAS修改隊(duì)列tail指向礁竞,修改成功再設(shè)置前一個(gè)節(jié)點(diǎn)的next域
* 隊(duì)列中,如果某個(gè)node的prev!=null,并不一定表示node已經(jīng)成功插入隊(duì)列中,如果某個(gè)node的前一個(gè)節(jié)點(diǎn)的next!=null,則該node一定位于隊(duì)列中
* @param mode 模式
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 雙端鏈表的add操作
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//頭尾都為null 初始化以及enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail為空,初始化head和tail節(jié)點(diǎn) head lazy initialize
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 雙端鏈表的基本操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 線程在獲取鎖過(guò)程中,可能因?yàn)槌鲥e(cuò)杉辙、被中斷或超時(shí)而取消獲取鎖
* @param node
*/
protected void cancelAcquire(Node node){
if (node == null) {
return;
}
//線程引用清空
node.thread = null;
Node pred = node.prev;
// 若前驅(qū)節(jié)點(diǎn)是CANCELLED,則跳過(guò)繼續(xù)往前找
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// 前驅(qū)節(jié)點(diǎn)中不是CANCELLED節(jié)點(diǎn),獲取pre next指向
Node predNext = pred.next;
// 設(shè)置waitStatus值為CANCELLED,標(biāo)記節(jié)點(diǎn)已取消
node.waitStatus = Node.CANCELLED;
// 如果被取消的node是尾部節(jié)點(diǎn),則設(shè)置tail指針指向前驅(qū)節(jié)點(diǎn),并且設(shè)置前驅(qū)節(jié)點(diǎn)的next指針為null
if(node == tail && compareAndSetTail(node, pred)){
compareAndSetNext(pred, predNext, null);
}else {
int ws;
if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred,ws,
Node.SIGNAL))) && pred.thread != null){
Node next = node.next;
if(next != null && next.waitStatus <= 0){
compareAndSetNext(pred, predNext, next);
}
}else {
unparkSuccessor(node);
}
node.next = node;
}
}
/**
* 喚醒后繼節(jié)點(diǎn)
* @param node
*/
private void unparkSuccessor(Node node) {
// 在獨(dú)占模式下,waitStatus<0此時(shí)為SIGNAL,將SIGNAL標(biāo)志清除
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// node.next == null || node.next.waitStatus > 0,表示該node.next線程取消了獲取鎖,則從隊(duì)列尾部遍歷,找到隊(duì)列中第一個(gè)沒(méi)有取消的線程
if(s == null || s.waitStatus > 0){
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);
}
}
protected boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}
private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if(ws == Node.SIGNAL){
return true;
}
// ws = 1 CANCELLED 跳過(guò)取消的節(jié)點(diǎn)
if(ws > 0){
do {
node.prev = pred = pred.prev;
}while (pred.waitStatus > 0);
pred.next = node;
}else {
// 0 或者 PROPAGATE
compareAndSetWaitStatus(pred, ws, Node.CANCELLED);
}
return false;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if(failed){
node.waitStatus = Node.CANCELLED;
}
}
}
/**
* 節(jié)點(diǎn)是否在同步隊(duì)列中
* @param node
* @return
*/
private boolean isOnSyncQueue(Node node) {
// 在條件隊(duì)列中
// 或者同時(shí)別的線程調(diào)用signal,Node會(huì)從Condition隊(duì)列中移除,從移除到進(jìn)入release隊(duì)列,中間這段時(shí)間prev必然為null
if(node.waitStatus == Node.CONDITION || node.prev == null){
return false;
}
if(node.next != null){
return true;
}
// 可能該Node剛剛最后一個(gè)進(jìn)入release隊(duì)列,所以是tail,其next必然是null,所以需要從隊(duì)尾向前查找
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node) {
return true;
}
if (t == null) {
return false;
}
t = t.prev;
}
}
/**
* 喚醒前或者后被打斷
* @param node
* @return
*/
private boolean transferAfterCancelledWait(Node node) {
// 喚醒前線程被打斷(超時(shí)等原因?qū)⒉辉俚却? 直接加入到同步隊(duì)列等待獲取鎖
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
enq(node);
return true;
}
// 喚醒的時(shí)候被打斷 等待加入隊(duì)列(可以獲取鎖的狀態(tài))結(jié)束
while (!isOnSyncQueue(node)){
Thread.yield();
}
return false;
}
/**
* condition隊(duì)列中節(jié)點(diǎn)加入到同步隊(duì)列中
* @param first
* @return
*/
private boolean transferForSignal(Node first) {
// 無(wú)法cas waitStatus, 節(jié)點(diǎn)被取消
if(!compareAndSetWaitStatus(first, Node.CONDITION, 0)){
return false;
}
// p是該Node的前驅(qū)
Node p = enq(first);
int ws = p.waitStatus;
// 前驅(qū)節(jié)點(diǎn)cancled或者變更waitStatus失敗,直接喚醒當(dāng)前線程
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(first.thread);
}
return true;
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/** 原子性交換state */
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/** 原子性交換head 僅enq調(diào)用 */
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/** 原子性交換tail 僅enq調(diào)用 */
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
static final class Node {
/** 標(biāo)記當(dāng)前結(jié)點(diǎn)是共享模式 */
static final Node SHARED = new Node();
/** 標(biāo)記當(dāng)前結(jié)點(diǎn)是獨(dú)占模式 */
static final Node EXCLUSIVE = null;
/**
* 結(jié)點(diǎn)的等待狀態(tài) 可能的取值包含:CANCELLED模捂、SIGNAL、CONDITION、PROPAGATE和0
* 在同步等待隊(duì)列中的節(jié)點(diǎn)初始值為0,在條件等待隊(duì)列中的節(jié)點(diǎn)初始值為CONDITION
* 在獨(dú)占模式下狂男,取值為CANCELLED综看、SIGNAL、0中之一
* 在共享模式下岖食,取值為CANCELLED红碑、SIGNAL、PROPAGATE和0中之一
* 在條件等待隊(duì)列中泡垃,取值為CANCELLED析珊、CONDITION中之一
*/
volatile int waitStatus;
/** 擁有當(dāng)前結(jié)點(diǎn)的線程 */
volatile Thread thread;
/** 線程已經(jīng)被取消 */
static final int CANCELLED = 1;
/**
* 后續(xù)節(jié)點(diǎn)需要喚醒
* 節(jié)點(diǎn)插入隊(duì)列時(shí),節(jié)點(diǎn)代表的線程睡眠前會(huì)將前一個(gè)節(jié)點(diǎn)的waitStatus置為SIGNAL
* 當(dāng)前一個(gè)節(jié)點(diǎn)釋放鎖時(shí)蔑穴,如果其waitStatus置為SIGNAL忠寻,則會(huì)喚醒其后下一個(gè)節(jié)點(diǎn)線程
*/
static final int SIGNAL = -1;
/** 表示節(jié)點(diǎn)代表的線程正處于條件等待隊(duì)列中等待signal信號(hào) */
static final int CONDITION = -2;
/** 在共享模式下使用,表示同步狀態(tài)能夠無(wú)條件向后傳播 */
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
/**
* 在條件等待隊(duì)列中,用于指向下一個(gè)節(jié)點(diǎn)
* 在同步等待隊(duì)列中存和,用于標(biāo)記該節(jié)點(diǎn)所代表的線程在獨(dú)占模式下還是共享模式下獲取鎖
*/
Node nextWaiter;
public Node() {
}
/**
*
* @param thread 一般為當(dāng)前線程
* @param mode 排他EXCLUSIVE 共享SHARED
*/
public Node(Thread thread, Node mode) {
this.thread = thread;
this.nextWaiter = mode;
}
public Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(MyAbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(MyAbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(MyAbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
public class ConditionObject implements Condition, Serializable{
private transient Node firstWaiter;
private transient Node lastWaiter;
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
public ConditionObject() {}
/**
* 1. 新建Condition Node包裝線程,加入Condition隊(duì)列
* 2. 釋放線程占有的鎖
* 3. 在同步等待隊(duì)列中則睡眠等待 直到被喚醒并加入到同步隊(duì)列中
* 4. 嘗試獲取鎖 此時(shí)節(jié)點(diǎn)已經(jīng)加入到同步隊(duì)列中
* 5. 處理cancled節(jié)點(diǎn)
* 6. 處理異常
* @throws InterruptedException
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){
throw new InterruptedException();
}
// 新建Condition Node包裝線程,加入Condition隊(duì)列
Node node = addConditionWaiter();
// 釋放線程占有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步等待隊(duì)列中則睡眠等待 直到被喚醒并加入到同步隊(duì)列中
while (!isOnSyncQueue(node)){
LockSupport.park(this);
// 檢查是否被打斷
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
break;
}
}
// 嘗試獲取鎖 異常不拋出
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){
unlinkCancelledWaiters();
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}
@Override
public void awaitUninterruptibly() {
}
@Override
public long awaitNanos(long nanosTimeout) throws InterruptedException {
return 0;
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
return false;
}
@Override
public void signal() {
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first);
}
}
@Override
public void signalAll() {
}
/**
* 1. 條件隊(duì)列中移除first節(jié)點(diǎn)
* 2. 跳過(guò)cancled節(jié)點(diǎn),找到一個(gè)沒(méi)有取消的first放入release隊(duì)列
* @param first
*/
private void doSignal(Node first) {
do {
if((firstWaiter = first.nextWaiter) == null){
lastWaiter = null;
}
first.nextWaiter = null;
}while (!transferForSignal(first) && (first = firstWaiter) != null);
}
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if(interruptMode == THROW_IE){
throw new InterruptedException();
}else if (interruptMode == REINTERRUPT) {
selfInterrupt();
}
}
/**
* 沒(méi)有打斷 ∞忍辍0
* interrupted before signalled -1
* interrupted after signalled 1
* @param node
* @return
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
/**
* 單向鏈表add 返回lastWaiter
* @return
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// lastWaiter CANCELLED 移除
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t == null){
firstWaiter = node;
}else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
/**
* 單向鏈表去掉某個(gè)節(jié)點(diǎn)的過(guò)程
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null){
Node next = t.nextWaiter;
if(t.waitStatus != Node.CONDITION){
t.nextWaiter = null;
if(trail == null){
firstWaiter = next;
}else {
trail.nextWaiter = next;
}
}else {
trail = t;
}
t = next;
}
}
}
}