1 簡介
Condition中的await()方法相當(dāng)于Object的wait()方法揍很,Condition中的signal()方法相當(dāng)于Object的notify()方法契耿,Condition中的signalAll()相當(dāng)于Object的notifyAll()方法廊佩。
不同的是况褪,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關(guān)鍵字)捆綁使用的秉馏;而Condition是需要與"互斥鎖"/"共享鎖"捆綁使用的。
2 Condition的實現(xiàn)分析
Condition是同步器AbstractQueuedSynchronized的內(nèi)部類唆涝,因為Condition的操作需要獲取相關(guān)的鎖,所以作為同步器的內(nèi)部類比較合理唇辨。每個Condition對象都包含著一個隊列(等待隊列)廊酣,該隊列是Condition對象實現(xiàn)等待/通知功能的關(guān)鍵。
等待隊列:
等待隊列是一個FIFO的隊列赏枚,隊列的每一個節(jié)點(diǎn)都包含了一個線程引用亡驰,該線程就是在Condition對象上等待的線程,如果一個線程調(diào)用了await()方法饿幅,該線程就會釋放鎖、構(gòu)造成節(jié)點(diǎn)進(jìn)入等待隊列并進(jìn)入等待狀態(tài)透乾。
這里的節(jié)點(diǎn)定義也就是AbstractQueuedSynchronizer.Node的定義磕秤。
一個Condition包含一個等待隊列市咆,Condition擁有首節(jié)點(diǎn)(firstWaiter)和尾節(jié)點(diǎn)(lastWaiter)。當(dāng)前線程調(diào)用Condition.await()方法時磷瘤,將會以當(dāng)前線程構(gòu)造節(jié)點(diǎn)采缚,并將節(jié)點(diǎn)從尾部加入等待隊列挠他。
在Object的監(jiān)視器模型上,一個對象擁有一個同步隊列和等待隊列赂苗,而Lock(同步器)擁有一個同步隊列和多個等待隊列贮尉。
等待(await):AbstractQueuedLongSynchronizer中實現(xiàn)
調(diào)用Condition的await()方法猜谚,會使當(dāng)前線程進(jìn)入等待隊列并釋放鎖,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)魏铅。
從隊列的角度來看览芳,相當(dāng)于同步隊列的首節(jié)點(diǎn)(獲取了鎖的節(jié)點(diǎn))移動到Condition的等待隊列中。
當(dāng)?shù)却犃兄械墓?jié)點(diǎn)被喚醒铸敏,則喚醒節(jié)點(diǎn)的線程開始嘗試獲取同步狀態(tài)杈笔。如果不是通過Condition.signal()方法喚醒糕非,而是對等待線程進(jìn)行中斷朽肥,則拋出InterruptedException。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Condition等待通知的本質(zhì)
總的來說融师,Condition的本質(zhì)就是等待隊列和同步隊列的交互:
當(dāng)一個持有鎖的線程調(diào)用Condition.await()時旱爆,它會執(zhí)行以下步驟:
- 構(gòu)造一個新的等待隊列節(jié)點(diǎn)加入到等待隊列隊尾
- 釋放鎖怀伦,也就是將它的同步隊列節(jié)點(diǎn)從同步隊列隊首移除
- 自旋山林,直到它在等待隊列上的節(jié)點(diǎn)移動到了同步隊列(通過其他線程調(diào)用signal())或被中斷
- 阻塞當(dāng)前節(jié)點(diǎn),直到它獲取到了鎖拜鹤,也就是它在同步隊列上的節(jié)點(diǎn)排隊排到了隊首敏簿。
當(dāng)一個持有鎖的線程調(diào)用Condition.signal()時宣虾,它會執(zhí)行以下操作:
從等待隊列的隊首開始,嘗試對隊首節(jié)點(diǎn)執(zhí)行喚醒操作蜻势;如果節(jié)點(diǎn)CANCELLED握玛,就嘗試喚醒下一個節(jié)點(diǎn)次员;如果再CANCELLED則繼續(xù)迭代淑蔚。
對每個節(jié)點(diǎn)執(zhí)行喚醒操作時刹衫,首先將節(jié)點(diǎn)加入同步隊列搞挣,此時await()操作的步驟3的解鎖條件就已經(jīng)開啟了囱桨。然后分兩種情況討論:
- 如果先驅(qū)節(jié)點(diǎn)的狀態(tài)為CANCELLED(>0) 或設(shè)置先驅(qū)節(jié)點(diǎn)的狀態(tài)為SIGNAL失敗,那么就立即喚醒當(dāng)前節(jié)點(diǎn)對應(yīng)的線程搀继,此時await()方法就會完成步驟3叽躯,進(jìn)入步驟4.
- 如果成功把先驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)置為了SIGNAL点骑,那么就不立即喚醒了。等到先驅(qū)節(jié)點(diǎn)成為同步隊列首節(jié)點(diǎn)并釋放了同步狀態(tài)后憨募,會自動喚醒當(dāng)前節(jié)點(diǎn)對應(yīng)線程的馋嗜,這時候await()的步驟3才執(zhí)行完成吵瞻,而且有很大概率快速完成步驟4.
通知(signal):AbstractQueuedLongSynchronizer中實現(xiàn)
調(diào)用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節(jié)點(diǎn)(首節(jié)點(diǎn))眯停,在喚醒節(jié)點(diǎn)之前莺债,會將節(jié)點(diǎn)移到同步隊列中签夭。
Condition的signalAll()方法第租,相當(dāng)于對等待隊列中的每個節(jié)點(diǎn)均執(zhí)行一次signal()方法,將等待隊列中的節(jié)點(diǎn)全部移動到同步隊列中丐吓,并喚醒每個節(jié)點(diǎn)的線程券犁。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
最后還要注意,Java 中有 signal 和 signalAll 兩種方法,signal 是隨機(jī)解除一個等待集中的線程的阻塞狀態(tài),signalAll 是解除所有等待集中的線程的阻塞狀態(tài)粘衬。signal 方法的效率會比 signalAll 高,但是它存在危險,因為它一次只解除一個線程的阻塞狀態(tài),因此,如果等待集中有多個線程都滿足了條件,也只能喚醒一個,其他的線程可能會導(dǎo)致死鎖
3 Condition 實例
消費(fèi)生產(chǎn)者模式
public class ConditionTest {
public static void main(String[] args) {
// 倉庫
Depot depot = new Depot(100);
// 消費(fèi)者
Consumer consumer = new Consumer(depot);
// 生產(chǎn)者
Produce produce = new Produce(depot);
produce.produceThing(5);
consumer.consumerThing(5);
produce.produceThing(2);
consumer.consumerThing(5);
produce.produceThing(3);
}
}
class Depot {
private int capacity;
private int size;
private Lock lock;
private Condition consumerCond;
private Condition produceCond;
public Depot(int capacity) {
this.capacity = capacity;
this.size = 0;
this.lock = new ReentrantLock();
this.consumerCond = lock.newCondition();
this.produceCond = lock.newCondition();
}
public void produce(int val) {
lock.lock();
try {
int left = val;
while (left > 0) {
while (size >= capacity) {
produceCond.await();
}
int produce = (left+size) > capacity ? (capacity-size) : left;
size += produce;
left -= produce;
System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size);
consumerCond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consumer(int val) {
lock.lock();
try {
int left = val;
while (left > 0) {
while (size <= 0) {
consumerCond.await();
}
int consumer = (size <= left) ? size : left;
size -= consumer;
left -= consumer;
System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size);
produceCond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class Consumer {
private Depot depot;
public Consumer(Depot depot) {
this.depot = depot;
}
public void consumerThing(final int amount) {
new Thread(new Runnable() {
public void run() {
depot.consumer(amount);
}
}).start();
}
}
class Produce {
private Depot depot;
public Produce(Depot depot) {
this.depot = depot;
}
public void produceThing(final int amount) {
new Thread(new Runnable() {
public void run() {
depot.produce(amount);
}
}).start();
}
}
Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0
輸出結(jié)果中,Thread-3出現(xiàn)兩次枷莉,就是因為要消費(fèi)5個產(chǎn)品尺迂,但倉庫中只有2個產(chǎn)品冒掌,所以先將庫存的2個產(chǎn)品全部消費(fèi)股毫,然后這個線程進(jìn)入等待隊列铃诬,等待生產(chǎn)苍凛,隨后生產(chǎn)出了3個產(chǎn)品,生產(chǎn)者生產(chǎn)后又執(zhí)行signalAll方法將等待隊列中所有的線程都喚醒宣肚,Thread-3繼續(xù)消費(fèi)還需要的3個產(chǎn)品霉涨。
三個線程依次打印ABC
class Business {
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
private String type = "A"; //內(nèi)部狀態(tài)
/*
* 方法的基本要求為:
* 1惭适、該方法必須為原子的癞志。
* 2、當(dāng)前狀態(tài)必須滿足條件师溅。若不滿足盾舌,則等待妖谴;滿足酌摇,則執(zhí)行業(yè)務(wù)代碼。
* 3仍稀、業(yè)務(wù)執(zhí)行完畢后埂息,修改狀態(tài),并喚醒指定條件下的線程享幽。
*/
public void printA() {
lock.lock(); //鎖,保證了線程安全摆霉。
try {
while (type != "A") { //type不為A携栋,
try {
conditionA.await(); //將當(dāng)前線程阻塞于conditionA對象上蛀蜜,將被阻塞。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//type為A磅摹,則執(zhí)行户誓。
System.out.println(Thread.currentThread().getName() + " 正在打印A");
type = "B"; //將type設(shè)置為B幕侠。
conditionB.signal(); //喚醒在等待conditionB對象上的一個線程。將信號傳遞出去悼潭。
} finally {
lock.unlock(); //解鎖
}
}
public void printB() {
lock.lock(); //鎖
try {
while (type != "B") { //type不為B舰褪,
try {
conditionB.await(); //將當(dāng)前線程阻塞于conditionB對象上疏橄,將被阻塞。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//type為B晃酒,則執(zhí)行贝次。
System.out.println(Thread.currentThread().getName() + " 正在打印B");
type = "C"; //將type設(shè)置為C彰导。
conditionC.signal(); //喚醒在等待conditionC對象上的一個線程。將信號傳遞出去搁宾。
} finally {
lock.unlock(); //解鎖
}
}
public void printC() {
lock.lock(); //鎖
try {
while (type != "C") {
try {
conditionC.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " 正在打印C");
type = "A";
conditionA.signal();
} finally {
lock.unlock(); //解鎖
}
}
}
public class ConditionTest{
public static void main(String[] args) {
final Business business = new Business();//業(yè)務(wù)對象盖腿。
//線程1號,打印10次A鸟款。
Thread ta = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10;i++){
business.printA();
}
}
});
//線程2號何什,打印10次B等龙。
Thread tb = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10;i++){
business.printB();
}
}
});
//線程3號,打印10次C罐栈。
Thread tc = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10;i++){
business.printC();
}
}
});
//執(zhí)行3條線程荠诬。
ta.start();
tb.start();
tc.start();
}
}
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
虛假喚醒
所謂"虛假喚醒"柑贞,即其他地方的代碼觸發(fā)了condition.signal()聂抢,喚醒condition上等待的線程。但被喚醒的線程仍然不滿足執(zhí)行條件康辑。
condition通常與條件語句一起使用:
if(!條件){
condition.await(); //不滿足條件轿亮,當(dāng)前線程等待我注;
}
更好的方法是使用while:
while(!條件){
condition.await(); //不滿足條件迟隅,當(dāng)前線程等待励七;
}
在等待Condition時掠抬,允許發(fā)生"虛假喚醒"两波,這通常作為對基礎(chǔ)平臺語義的讓步闷哆。若使用"if(!條件)"則被"虛假喚醒"的線程可能繼續(xù)執(zhí)行。所以"while(!條件)"可以防止"虛假喚醒"劣坊。建議總是假定這些"虛假喚醒"可能發(fā)生屈留,因此總是在一個循環(huán)中等待。
總結(jié)
如果知道Object的等待通知機(jī)制锐想,Condition的使用是比較容易掌握的赠摇,因為和Object等待通知的使用基本一致浅蚪。
對Condition的源碼理解,主要就是理解等待隊列洽故,等待隊列可以類比同步隊列盗誊,而且等待隊列比同步隊列要簡單哈踱,因為等待隊列是單向隊列,同步隊列是雙向隊列刀诬。
以下是筆者對等待隊列是單向隊列邪财、同步隊列是雙向隊列的一些思考质欲,歡迎提出不同意見:
之所以同步隊列要設(shè)計成雙向的嘶伟,是因為在同步隊列中又碌,節(jié)點(diǎn)喚醒是接力式的,由每一個節(jié)點(diǎn)喚醒它的下一個節(jié)點(diǎn)赠橙,如果是由next指針獲取下一個節(jié)點(diǎn),是有可能獲取失敗的掉奄,因為虛擬隊列每添加一個節(jié)點(diǎn)凤薛,是先用CAS把tail設(shè)置為新節(jié)點(diǎn),然后才修改原tail的next指針到新節(jié)點(diǎn)的速兔。因此用next向后遍歷是不安全的涣狗,但是如果在設(shè)置新節(jié)點(diǎn)為tail前舒憾,為新節(jié)點(diǎn)設(shè)置prev,則可以保證從tail往前遍歷是安全的丁溅。因此要安全的獲取一個節(jié)點(diǎn)Node的下一個節(jié)點(diǎn)探遵,先要看next是不是null,如果是null箱季,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了求豫,等待的線程就是等待者诉稍,只負(fù)責(zé)等待,喚醒的線程就是喚醒者蚤告,只負(fù)責(zé)喚醒服爷,因此每次要執(zhí)行喚醒操作的時候,直接喚醒等待隊列的首節(jié)點(diǎn)就行了心褐。等待隊列的實現(xiàn)中不需要遍歷隊列笼踩,因此也不需要prev指針。
特別感謝
Java多線程——Condition條件
Java并發(fā)——使用Condition線程間通信
Java顯式鎖學(xué)習(xí)總結(jié)之六:Condition源碼分析