1.與Lock的關系
Condition在同步鎖synchronized中用的比較多。
Condition本身也是一個接口,其功能和wait/notify類似逛裤。
public interface Condition {
void await() throws InterruptedException; //等待
void awaitUninterruptibly();
long awaitNanos(long var1) throws InterruptedException;
boolean await(long var1, TimeUnit var3) throws InterruptedException; //計時等待
boolean awaitUntil(Date var1) throws InterruptedException;
void signal(); //喚醒
void signalAll(); //喚醒全部
}
wait()/notify()必須和synchronized一起使用,Condition也必須和Lock一起使用。因此绝葡,在Lock的接口中,有一個與Condition相關的接口:(所有的Condition都是從Lock中構造出來的)
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
void unlock();
// 所有的Condition都是從Lock中構造出來的
Condition newCondition();
}
2.使用場景
以以ArrayBlockingQueue為例:為一個用數組實現的阻塞隊列腹鹉,執(zhí)行put(...)操作的時候藏畅,隊列滿了,生產者線程被阻塞功咒;執(zhí)行take()操作的時候愉阎,隊列為空,消費者線程被阻塞力奋。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
// 一把鎖+兩個條件
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient ArrayBlockingQueue<E>.Itrs itrs;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) {
throw new IllegalArgumentException();
} else {
this.items = new Object[capacity];
// 構造器中創(chuàng)建一把鎖加兩個條件
this.lock = new ReentrantLock(fair);
// 構造器中創(chuàng)建一把鎖加兩個條件
this.notEmpty = this.lock.newCondition();
// 構造器中創(chuàng)建一把鎖加兩個條件
this.notFull = this.lock.newCondition();
}
}
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while(this.count == this.items.length) {
// 非滿條件阻塞榜旦,隊列容量已滿
this.notFull.await();
}
this.enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
Object[] items = this.items;
items[this.putIndex] = e;
if (++this.putIndex == items.length) {
this.putIndex = 0;
}
++this.count;
// put數據結束,通知消費者非空條件
this.notEmpty.signal();
}
public E take() throws InterruptedException {
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
Object var2;
try {
while(this.count == 0) {
// 阻塞于非空條件景殷,隊列元素個數為0溅呢,無法消費
this.notEmpty.await();
}
var2 = this.dequeue();
} finally {
lock.unlock();
}
return var2;
}
private E dequeue() {
Object[] items = this.items;
E e = items[this.takeIndex];
items[this.takeIndex] = null;
if (++this.takeIndex == items.length) {
this.takeIndex = 0;
}
--this.count;
if (this.itrs != null) {
this.itrs.elementDequeued();
}
// 消費成功,通知非滿條件滨彻,隊列中有空間藕届,可以生產元素了。
this.notFull.signal();
return e;
}
3.實現原理
Condition的使用很方便亭饵,避免了wait/notify的喚醒操作不僅喚醒生產者也會喚醒消費者問題休偶。
由于Condition必須和Lock一起使用,所以Condition的實現也是Lock的一部分辜羊。首先查看互斥鎖和讀寫鎖中Condition的構造方法:
public class ReentrantLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return this.sync.newCondition();
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
public static class ReadLock implements Lock, Serializable {
// 讀鎖不支持Condition
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
public static class WriteLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return this.sync.newCondition();
}
}
}
首先踏兜,讀寫鎖中的 ReadLock 是不支持 Condition 的词顾,讀寫鎖的寫鎖和互斥鎖都支持Condition。雖然它們各自調用的是自己的內部類Sync碱妆,但內部類Sync都繼承自AQS肉盹。因此,上面的代碼sync.newCondition最終都調用了AQS中的newCondition疹尾。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
// Condition的所有實現上忍,都在ConditionObject類中
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
每一個Condition對象上面,都阻塞了多個線程纳本。因此窍蓝,在ConditionObject內部也有一個雙向鏈表組成的隊列
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
4.await()實現分析
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()實現分析
public final void await() throws InterruptedException {
// 剛要執(zhí)行await()操作,收到中斷信號繁成,拋異常
if (Thread.interrupted()) throw new InterruptedException();
// 加入Condition的等待隊列
Node node = addConditionWaiter();
// 阻塞在Condition之前必須先釋放鎖吓笙,否則會死鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞當前線程 LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
// 重新獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
.....
關于await,有幾個關鍵點要說明:
- 線程調用 await()的時候巾腕,肯定已經先拿到了鎖面睛。所以,在 addConditionWaiter()內部尊搬,對這個雙向鏈表的操作不需要執(zhí)行CAS操作叁鉴,線程天生是安全的,代碼如下:
private Node addConditionWaiter() {
// ...
Node t = lastWaiter;
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
- 在線程執(zhí)行wait操作之前毁嗦,必須先釋放鎖亲茅。也就是fullyRelease(node),否則會發(fā)生死鎖狗准。這個和wait/notify與synchronized的配合機制一樣克锣。
- 線程從await中被喚醒后,必須用acquireQueued(node, savedState)方法重新拿鎖腔长。
- checkInterruptWhileWaiting(node)代碼在park(this)代碼之后袭祟,是為了檢測在park期間是否收到過中斷信號。當線程從park中醒來時捞附,有兩種可能:一種是其他線程調用了unpark巾乳,另一種是收到中斷信號。這里的await()方法是可以響應中斷的鸟召,所以當發(fā)現自己是被中斷喚醒的胆绊,而不是被unpark喚醒的時,會直接退出while循環(huán)欧募,await()方法也會返回压状。
- isOnSyncQueue(node)用于判斷該Node是否在AQS的同步隊列里面。初始的時候,Node只 在Condition的隊列里种冬,而不在AQS的隊列里镣丑。但執(zhí)行notity操作的時候,會放進AQS的同步隊列娱两。
5.signal實現
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal()
public final void signal() {
//// 只有持有鎖的線程莺匠,才有資格調用signal()方法
if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
throw new IllegalMonitorStateException();
} else {
AbstractQueuedSynchronizer.Node first = this.firstWaiter;
if (first != null) {
// 發(fā)起通知
this.doSignal(first);
}
}
}
// 喚醒隊列中的第1個線程
private void doSignal(AbstractQueuedSynchronizer.Node first) {
do {
if ((this.firstWaiter = first.nextWaiter) == null) {
this.lastWaiter = null;
}
first.nextWaiter = null;
} while(!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 先把Node放入互斥鎖的同步隊列中,再調用unpark方法
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
同 await()一樣十兢,在調用 signal()的時候趣竣,必須先拿到鎖(否則就會拋出上面的異常),是因為前面執(zhí)行await()的時候纪挎,把鎖釋放了期贫。
然后,從隊列中取出firstWaiter异袄,喚醒它。在通過調用unpark喚醒它之前玛臂,先用enq(node)方法把這個Node從condition中移除并放入AQS的鎖對應的阻塞隊列中烤蜕。也正因為如此,才有了await()方法里面的判斷條件:
- while( ! isOnSyncQueue(node))
- 這個判斷條件滿足迹冤,說明await線程不是被中斷讽营,而是被unpark喚醒的。
- notifyAll()與此類似泡徙。
簡述:
阻塞隊列使用condition實現橱鹏,condition必須和Lock一起使用,一般使用ReentrantLock來創(chuàng)建condition阻塞條件堪藐,并用ReentrantLock的鎖來加鎖控制并發(fā)莉兰。解決了wait/notify同時喚醒生產者和消費者的問題。讀寫鎖中的 ReadLock 是不支持 Condition 的礁竞,讀寫鎖的寫鎖和互斥鎖都支持Condition糖荒。他們最終都調用了AQS中的newCondition,ConditionObject是condition的具體實現模捂,內部也有一個雙向鏈表組成的隊列來存儲阻塞的隊列捶朵。一開始調用await會放入ConditionObject的隊列,然后調用最終是調用park原語阻塞狂男,最后調用signal的時候把線程在ConditionObject中移除综看,放入阻塞隊列本身,并調用unpark岖食。