簡介
條件鎖塔沃,指在獲得鎖之后阵翎,還需要達成某些條件后院领,才能繼續(xù)執(zhí)行的鎖双絮。且必須配合Lock一起使用浴麻,也就是說必須獲得鎖之后才可以調(diào)用condition.await()方法
源碼分析
ReentrantLock 的條件鎖使用的 AbstractQueuedSynchronizer
中的ConditionObject
來實現(xiàn)的,所以其實標題說的ReentrantLock 源碼分析囤攀,其實應該是AQS源碼分析之條件鎖Condition
软免,但是這里為什么還是要說成ReentrantLock 源碼分析呢?主要是AQS是一個抽象類焚挠,用戶并不能直接使用膏萧,而ReentrantLock 提供了使用條件鎖的入口,源碼如下::
final ConditionObject newCondition() {
return new ConditionObject();
}
Condition 接口
Condition 是一個接口蝌衔,定義了7個方法榛泛,分別是:
- void await() throws InterruptedException;
使當前線程等待,直到發(fā)出信號或中斷 - boolean await(long time, TimeUnit unit) throws InterruptedException;
使當前線程等待噩斟,直到它被喚醒或中斷曹锨,或指定的等待時間被終止。等價于:awaitNanos(unit.toNanos(time)) > 0 - long awaitNanos(long nanosTimeout) throws InterruptedException;
使當前線程等待剃允,直到發(fā)出信號或中斷拂檩,或過去指定的等待時間 - void awaitUninterruptibly();
使當前線程等待,直到發(fā)出信號為止 - boolean awaitUntil(Date deadline) throws InterruptedException;
使當前線程等待幻赚,直到發(fā)出信號或中斷靶橱,或過去指定的截止時間 - void signal();
喚醒一個等待的線程 - void signalAll();
喚醒所有等待的線程
總結下來,就是await营袜、signal撒顿、signalAll,所以下面我們也主要分析這三個方法荚板。
AQS.ConditionObject類
ConditionObject 是AQS是的一個內(nèi)部類凤壁,實現(xiàn)了Condition 接口,并且實現(xiàn)它的全部方法跪另,ConditionObject 也維護了一個隊列拧抖,為了和AbstractQueuedSynchronizer內(nèi)部類Node組成的隊列區(qū)分開,這里的隊列我們下面稱為等待隊列免绿,Node組成的隊列稱為同步隊列唧席,等待隊列源碼如下:
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
Condition.await()方法
使當前線程等待,直到發(fā)出信號或中斷嘲驾,如果當前線程被中斷淌哟,拋出InterruptedException
源碼解析:
public final void await() throws InterruptedException {
if (Thread.interrupted())
// 如果當前線程被中斷,拋出InterruptedException
throw new InterruptedException();
// 以當前線程為節(jié)點添加到等待隊列辽故,并返回當前節(jié)點
Node node = addConditionWaiter();
// 完全釋放當前線程獲得鎖徒仓,并返回釋放前state的值
int savedState = fullyRelease(node);
// 中斷標識
int interruptMode = 0;
// 檢查當前節(jié)點的是否在同步隊列,注意前面的感嘆號誊垢,是節(jié)點不在同步隊列中掉弛,才將當前線程park
while (!isOnSyncQueue(node)) {
// 調(diào)用Unsafa類底層阻塞線程,等待喚醒自己的條件信號
LockSupport.park(this);
// 當被喚醒以后喂走,接著從下面開始執(zhí)行
// checkInterruptWhileWaiting 檢查線程是否被中斷
// 發(fā)出信號之前被中斷殃饿,返回-1,發(fā)出信號之后被中斷缴啡,返回1壁晒,沒有被中斷,返回0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 再次從同步隊列獲得鎖业栅,獲取不到鎖會再次阻塞線程
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 判斷條件等待隊列中有沒有線程被取消秒咐,如果有,則將它們清除
unlinkCancelledWaiters();
if (interruptMode != 0)
// 發(fā)生了中斷碘裕,拋出異承。或者重新中斷當前線程
reportInterruptAfterWait(interruptMode);
}
await()方法過程總結:
- 檢查線程中斷情況,如果當前線程被中斷帮孔,拋出InterruptedException
- 以當前線程為節(jié)點添加到等待隊列雷滋,并返回當前節(jié)點
- 完全釋放當前線程獲得鎖不撑,并返回釋放前
state
的值 - 檢查當前節(jié)點的是否在同步隊列
- 不在同步隊列,調(diào)用Unsafa類底層 park 阻塞線程晤斩,等待喚醒信號
- 當被喚醒以后焕檬,再次從同步隊列獲得鎖,獲取不到鎖會再次阻塞線程
- 判斷條件等待隊列中有沒有線程被取消澳泵,如果有实愚,則將它們清除
- 如果發(fā)生了中斷,拋出異惩酶ǎ或者重新中斷當前線程
Condition.signal()方法
“喚醒”一個等待時間最長的線程腊敲,也就是等待隊列的第一個線程——firstWaiter;
源碼解析:
public final void signal() {
// 判斷是否是當前線程持有鎖维苔,不是則拋出異常
// 說明了調(diào)用這個方法之前也必須要持有鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 等待隊列隊頭碰辅,理論上就是第一次調(diào)用await()時加入的節(jié)點線程
Node first = firstWaiter;
if (first != null)
// 發(fā)信號
doSignal(first);
}
private void doSignal(Node first) {
do {
// firstWaiter = first.nextWaiter 重新賦值等待隊列頭結點
if ( (firstWaiter = first.nextWaiter) == null)
// 等待隊列 為空
lastWaiter = null;
// 斷掉節(jié)點關系
first.nextWaiter = null;
// transferForSignal 將節(jié)點從等待隊列轉(zhuǎn)移到同步隊列
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// node節(jié)點是等待隊列上的節(jié)點
final boolean transferForSignal(Node node) {
// 改變節(jié)點的等待狀態(tài)為0
// 0表示:當前節(jié)點在sync隊列中,等待著獲取鎖介时。-2表示當前節(jié)點在等待condition没宾,也就是在condition隊列中
// 返回false,外層的循環(huán)繼續(xù)執(zhí)行
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將節(jié)點加入到同步隊列中,返回node節(jié)點的前驅(qū)結點潮尝,也就是老的尾節(jié)點
Node p = enq(node);
int ws = p.waitStatus;
// 大于0的狀態(tài)只有1榕吼,也就是取消
// 如果老的尾節(jié)點被取消 或者 更新老的尾節(jié)點為SIGNAL失敗,可以直接輪到當前節(jié)點勉失,直接喚醒當前節(jié)點的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
// 如果老的尾節(jié)點沒有被取消 或者 更新老的尾節(jié)點為SIGNAL成功羹蚣,則返回true
// 返回true的話,外層的do循環(huán)會直接退出
// 所以這個方法最核心的邏輯知識把等待隊列的節(jié)點轉(zhuǎn)移到同步隊列中去
// 轉(zhuǎn)移到同步隊列后乱凿,signal()方法調(diào)用完成后緊接著應該是unlock()方法顽素,移動同步隊列的新節(jié)點等待被喚醒
return true;
}
signal()方法過程總結:
- 判斷是否是當前線程持有鎖在調(diào)用signal方法,不是則拋出異常(調(diào)用signal()方法之前必須調(diào)用lock()方法)
- 拿到等待隊列隊頭節(jié)點 firstWaiter徒蟆,理論上就是第一次調(diào)用await()時加入的節(jié)點胁出,節(jié)點存在則繼續(xù)調(diào)用dosignal()。
- 先使用 CAS 方式修改節(jié)點的waitStatus的值為0全蝶,表示此節(jié)點在同步隊列中
- 將節(jié)點加入到同步隊列中(
enq(node)
),返回node節(jié)點的前驅(qū)結點寺枉,也就是老的尾節(jié)點 - 同步隊列中抑淫,如果老的尾節(jié)點被取消 或者 更新老的尾節(jié)點為SIGNAL失敗
說明可以直接輪到當前節(jié)點,直接喚醒等待隊列第一個節(jié)點的線程 - 如果老的尾節(jié)點沒有被取消 或者 更新老的尾節(jié)點為SIGNAL成功姥闪,則返回true始苇,返回true的話,外層的do循環(huán)會直接退出筐喳,結束signal()方法催式。
最后如果直接返回true函喉,第5步?jīng)]有執(zhí)行,那signal()方法就沒有地方調(diào)用了unpark方法了荣月,那線程是在什么時候被喚醒的呢管呵?
signal()方法核心任務只是把等待隊列中的節(jié)點轉(zhuǎn)移到同步隊列中,signal()方法執(zhí)行完成后喉童,理論上會執(zhí)行后面的unlock()方法撇寞,tryRelease()解鎖成功會調(diào)用unparkSuccessor(node)方法,執(zhí)行LockSupport.unpark(thread)堂氯,同步隊列中的(等待)節(jié)點線程被喚醒,繼續(xù)執(zhí)行await()方法之后的代碼牌废。
Condition.signalAll()方法
signalAll 和 signal 方法很相似咽白,signal方法在doSignal的時候只是把等到隊列的第一個節(jié)點轉(zhuǎn)移到同步隊列,而signalAll是遍歷等待隊列鸟缕,把隊列中所有節(jié)點都轉(zhuǎn)移到同步隊列中去
源碼展示:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 遍歷所有的等待隊列
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
// 等待隊列轉(zhuǎn)移到同步隊列晶框,signal方法也是同樣轉(zhuǎn)移的
transferForSignal(first);
first = next;
} while (first != null);
}
經(jīng)典用例ArrayBlockingQueue:
比如最典型的阻塞隊列 ArrayBlockingQueue,當隊列中沒有元素的時候懂从,他無法take出一個元素授段,需要等待其他線程入隊一個元素后喚醒它,才能繼續(xù)彈出元素番甩。
它有三個重要的屬性侵贵,一個鎖和兩個條件,源碼如下:
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
在構造方法中初始化:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
take() 方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
**notEmpty.await();**
return dequeue();
} finally {
lock.unlock();
}
}
enqueue(E)方法:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
**notEmpty.signal();**
}
從上面take方法可以看出缘薛,當隊列為空時窍育,線程要等待入隊發(fā)生,而不是直接出隊返回宴胧;
當入隊方法enqueue調(diào)用時漱抓,隊列不為空,notEmpty.signal() 喚醒等待的線程恕齐。
put(E)方法:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
**notFull.await();**
enqueue(e);
} finally {
lock.unlock();
}
}
插入元素的時候乞娄,如果隊列已經(jīng)滿了,線程要等待显歧,等待隊列不是滿的狀態(tài)時才可以執(zhí)行后面的入隊操作仪或;
出隊或remove等操作之后,會觸發(fā)喚醒等待的線程:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
**notFull.signal();**
return x;
}
注意追迟,signal 和 await 要成對調(diào)用溶其,不然只調(diào)用 await 動作,線程則會一直等待敦间,除非線程被中斷瓶逃。
Condition 總結
- Reentranlock 的條件鎖是基于AQS框架中的ConditionObject來實現(xiàn)的束铭,自己一句代碼都沒有寫,都是用的它爸爸的代碼厢绝。
- 從源碼也可以看出契沫,使用條件鎖的當前線程必須持有鎖,代碼上表示也就是使用Condition.await()時必須要lock.lock()
- await() 方法首先會完全釋放當前線程獲得的鎖昔汉,然后再把當前線程的節(jié)點加入到等待隊列中懈万,然后調(diào)用Unsafa類底層 park 阻塞線程,等待被喚醒
- signal() 方法核心是就是把等待隊列中的一個節(jié)點轉(zhuǎn)移到同步隊列中靶病,不一定會馬上喚醒線程
- signalAll() 方法核心是就是把等待隊列中的所有節(jié)點轉(zhuǎn)移到同步隊列中会通,不一定會馬上喚醒線程
條件鎖使用的簡單流程總結
- A線程 獲得鎖 lock
- A線程 await
- A線程釋放鎖
- A線程加入到等待隊列
- A線程阻塞 park
- B線程 獲得鎖 lock
- B線程 signal
- B線程 把等待隊列中的A線程轉(zhuǎn)移到同步隊列
- B線程 釋放鎖 unlock
- A線程被喚醒 unpark
- A線程 繼續(xù)執(zhí)行await方法后面的代碼
- A線程釋放鎖 unlock