線程間的通信與生產(chǎn)者消費(fèi)者模型
在前面我們講了很多關(guān)于同步的問題础嫡,然而在現(xiàn)實(shí)中涝登,需要線程之間的協(xié)作。比如說最經(jīng)典的生產(chǎn)者-消費(fèi)者模型:當(dāng)隊(duì)列滿時(shí)般又,生產(chǎn)者需要等待隊(duì)列有空間才能繼續(xù)往里面放入商品慕蔚,而在等待的期間內(nèi)丐黄,生產(chǎn)者必須釋放對臨界資源(即隊(duì)列)的占用權(quán)。因?yàn)樯a(chǎn)者如果不釋放對臨界資源的占用權(quán)坊萝,那么消費(fèi)者就無法消費(fèi)隊(duì)列中的商品孵稽,就不會讓隊(duì)列有空間,那么生產(chǎn)者就會一直無限等待下去十偶。因此菩鲜,一般情況下,當(dāng)隊(duì)列滿時(shí)惦积,會讓生產(chǎn)者交出對臨界資源的占用權(quán)接校,并進(jìn)入掛起狀態(tài)。然后等待消費(fèi)者消費(fèi)了商品,然后消費(fèi)者通知生產(chǎn)者隊(duì)列有空間了蛛勉。同樣地鹿寻,當(dāng)隊(duì)列空時(shí),消費(fèi)者也必須等待诽凌,等待生產(chǎn)者通知它隊(duì)列中有商品了毡熏。這種互相通信的過程就是線程間的協(xié)作。
在Java中線程協(xié)作有最常見的兩種方式:1)利用Object.wait()侣诵、Object.notify()痢法;2)使用Condition
wait()、notify()和notifyAll()
wait()杜顺、notify()和notifyAll()是Object類中的方法:
/**
* Wakes up a single thread that is waiting on this object's
* monitor. If any threads are waiting on this object, one of them
* is chosen to be awakened. The choice is arbitrary and occurs at
* the discretion of the implementation. A thread waits on an object's
* monitor by calling one of the wait methods
*/
public final native void notify();
/**
* Wakes up all threads that are waiting on this object's monitor. A
* thread waits on an object's monitor by calling one of the
* wait methods.
*/
public final native void notifyAll();
/**
* Causes the current thread to wait until either another thread invokes the
* {@link java.lang.Object#notify()} method or the
* {@link java.lang.Object#notifyAll()} method for this object, or a
* specified amount of time has elapsed.
* <p>
* The current thread must own this object's monitor.
*/
public final native void wait(long timeout) throws InterruptedException;
從這三個(gè)方法的文字描述可以知道以下幾點(diǎn)信息:
1)wait()财搁、notify()和notifyAll()方法是本地方法,并且為final方法躬络,無法被重寫尖奔。
2)調(diào)用某個(gè)對象的wait()方法能讓當(dāng)前線程阻塞,并且當(dāng)前線程必須擁有此對象的monitor(即鎖)穷当。如果在調(diào)用wait()時(shí)提茁,沒有持有適當(dāng)?shù)逆i,則拋出IllegalMonitorStateException膘滨,它是RuntimeException的一個(gè)子類甘凭。
3)調(diào)用某個(gè)對象的notify()方法能夠喚醒一個(gè)正在等待這個(gè)對象的monitor的線程稀拐,如果有多個(gè)線程都在等待這個(gè)對象的monitor火邓,則只能喚醒其中一個(gè)線程;如果在調(diào)用notify()時(shí)沒有持有適當(dāng)?shù)逆i德撬,也會拋出IllegalMonitorStateException铲咨。
4)調(diào)用notifyAll()方法能夠喚醒所有正在等待這個(gè)對象的monitor的線程;
上面已經(jīng)提到蜓洪,如果調(diào)用某個(gè)對象的wait()方法纤勒,當(dāng)前線程必須擁有這個(gè)對象的monitor(即鎖),因此調(diào)用wait()方法必須在同步塊或者同步方法中進(jìn)行(synchronized塊或者synchronized方法)隆檀。
調(diào)用某個(gè)對象的wait()方法摇天,相當(dāng)于讓當(dāng)前線程交出此對象的monitor,然后進(jìn)入等待狀態(tài)恐仑,等待后續(xù)再次獲得此對象的鎖(Thread類中的sleep方法使當(dāng)前線程暫停執(zhí)行一段時(shí)間泉坐,從而讓其他線程有機(jī)會繼續(xù)執(zhí)行,但它并不釋放對象鎖)裳仆;
notify()方法能夠喚醒一個(gè)正在等待該對象的monitor的線程腕让,當(dāng)有多個(gè)線程都在等待該對象的monitor的話,則只能喚醒其中一個(gè)線程歧斟,具體喚醒哪個(gè)線程則不得而知纯丸。
同樣地偏形,調(diào)用某個(gè)對象的notify()方法,當(dāng)前線程也必須擁有這個(gè)對象的monitor觉鼻,因此調(diào)用notify()方法必須在同步塊或者同步方法中進(jìn)行(synchronized塊或者synchronized方法)俊扭。
nofityAll()方法能夠喚醒所有正在等待該對象的monitor的線程,這一點(diǎn)與notify()方法是不同的坠陈。
這里要注意一點(diǎn):notify()和notifyAll()方法只是喚醒等待該對象的monitor的線程统扳,并不決定哪個(gè)線程能夠獲取到monitor。
舉個(gè)簡單的例子:假如有三個(gè)線程Thread1畅姊、Thread2和Thread3都在等待對象objectA的monitor咒钟,此時(shí)Thread4擁有對象objectA的monitor,當(dāng)在Thread4中調(diào)用objectA.notify()方法之后若未,Thread1朱嘴、Thread2和Thread3只有一個(gè)能被喚醒。注意粗合,被喚醒不等于立刻就獲取了objectA的monitor萍嬉。假若在Thread4中調(diào)用objectA.notifyAll()方法,則Thread1隙疚、Thread2和Thread3三個(gè)線程都會被喚醒壤追,至于哪個(gè)線程接下來能夠獲取到objectA的monitor就具體依賴于操作系統(tǒng)的調(diào)度了。
上面尤其要注意一點(diǎn):一個(gè)線程被喚醒不代表立即獲取了對象的monitor供屉,只有等調(diào)用完notify()或者notifyAll()并退出synchronized塊行冰,釋放對象鎖后,其余線程才可獲得鎖執(zhí)行伶丐。
舉個(gè)例子:
public class WaitNotifyTest {
private static Object obj = new Object();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread() {
public void run() {
synchronized(obj) {
try {
obj.wait();
} catch (InterruptedException ex) {}
System.out.println("t1 成功獲得了鎖");
}
}
};
Thread t2 = new Thread() {
public void run() {
synchronized(obj) {
System.out.println("t2 調(diào)用notify()");
obj.notify();
}
System.out.println("t2 釋放了鎖");
}
};
t1.start();
Thread.sleep(2000);
t2.start();
}
}
無論如何運(yùn)行悼做,執(zhí)行結(jié)果如下:
t2 調(diào)用notify()
t2 釋放了鎖
t1 成功獲得了鎖
顯式的Condition對象
Condition是在java 1.5中才出現(xiàn)的,它用來替代傳統(tǒng)的Object的wait()哗魂、notify()實(shí)現(xiàn)線程間的協(xié)作肛走,相比使用Object的wait()、notify()录别,使用Condition的await()朽色、signal()這種方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效。因此通常來說比較推薦使用Condition组题。
Condition是個(gè)接口葫男,基本的方法如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
Condition依賴于Lock接口,生成一個(gè)Condition的基本代碼是:lock.newCondition()往踢;
調(diào)用Condition的await()和signal()方法腾誉,都必須在lock保護(hù)之內(nèi),就是說必須在lock.lock()和lock.unlock()之間才可以使用。
Conditon中的await()對應(yīng)Object的wait()利职;Condition中的signal()對應(yīng)Object的notify()趣效;Condition中的signalAll()對應(yīng)Object的notifyAll()。
生產(chǎn)者與消費(fèi)者模型
1)使用內(nèi)置鎖猪贪、Object的wait()和notify()實(shí)現(xiàn):
public class ProducerAndConsumer {
public static void main(String[] args) {
Producer producer = new Producer();
Consumer consumer = new Consumer();
for (int i=0; i<5; i++) {
new Thread(producer).start();
new Thread(consumer).start();
}
}
private static final int CAPACITY = 10;
private static final LinkedList<Integer> list = new LinkedList<Integer>();
private static final Object obj = new Object();
static class Producer implements Runnable {
private static final AtomicInteger count = new AtomicInteger(0);
public void run() {
for (int i=0; i<10; i++)
produce();
}
private void produce() {
synchronized (obj) {
while (list.size() == CAPACITY) {
try {
obj.wait();
} catch (InterruptedException ex) {
}
}
Integer num = count.incrementAndGet();
list.add(num);
System.out.println(Thread.currentThread().getName() + "生產(chǎn)了 " + num);
obj.notifyAll();
}
}
}
static class Consumer implements Runnable {
public void run() {
for (int i=0; i<10; i++)
consume();
}
private void consume() {
synchronized (obj) {
while (list.size() == 0) {
try {
obj.wait();
} catch (InterruptedException ex) {
}
}
System.out.println(Thread.currentThread().getName() + "消費(fèi)了 " + list.remove());
obj.notifyAll();
}
}
}
}
注意:在多生產(chǎn)者和多消費(fèi)者的模式下跷敬,必須要使用Object.notifyAll()方法,并且當(dāng)線程喚醒之后要循環(huán)判斷條件是否滿足热押,否則很容易陷入死鎖的狀態(tài)西傀。
2)使用顯式鎖和Condition實(shí)現(xiàn)
public class ConditionTest {
public static void main(String[] args) {
Producer producer = new Producer();
Consumer consumer = new Consumer();
for (int i=0; i<5; i++) {
new Thread(producer).start();
new Thread(consumer).start();
}
}
private static final int CAPACITY = 1;
private static final LinkedList<Integer> list = new LinkedList<Integer>();
private static final Lock lock = new ReentrantLock();
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
static class Producer implements Runnable {
private static final AtomicInteger count = new AtomicInteger(0);
public void run() {
for (int i=0; i<10; i++)
produce();
}
private void produce() {
lock.lock();
try {
while (list.size() == CAPACITY) {
try {
notFull.await();
} catch (InterruptedException ex) {
}
}
Integer num = count.incrementAndGet();
list.add(num);
System.out.println(Thread.currentThread().getName() + "生產(chǎn)了 " + num);
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
static class Consumer implements Runnable {
public void run() {
for (int i=0; i<10; i++)
consume();
}
private void consume() {
lock.lock();
try {
while (list.size() == 0) {
try {
notEmpty.await();
} catch (InterruptedException ex) {
}
}
System.out.println(Thread.currentThread().getName() + "消費(fèi)了 " + list.remove());
notFull.signal();
} finally {
lock.unlock();
}
}
}
}
兩種實(shí)現(xiàn)方式的區(qū)別
1)使用內(nèi)置鎖時(shí),每個(gè)鎖都只能有一個(gè)相關(guān)聯(lián)的條件隊(duì)列桶癣,因而在像上面的例子中拥褂,多個(gè)線程可能在同一個(gè)條件隊(duì)列上等待不同的條件謂詞,如:生產(chǎn)者線程在等待隊(duì)列notFull牙寞,消費(fèi)者線程在等待隊(duì)列notEmpty饺鹃。當(dāng)條件滿足時(shí),必須使用notifyAll()方法喚醒所有等待的線程间雀,否則程序很容易產(chǎn)生活躍性故障:死鎖悔详。但是,當(dāng)使用notifyAll()方法喚醒所有線程后惹挟,會使得所有線程在鎖上發(fā)生競爭茄螃,然后,大多數(shù)線程又都回到等待狀態(tài)连锯,因此將出現(xiàn)大量的上下文切換操作以及發(fā)生競爭的鎖獲取操作归苍。
2)使用內(nèi)置鎖時(shí),最大的問題就是:無法根據(jù)滿足的條件去喚醒對應(yīng)的線程萎庭,如:notEmpty時(shí)喚醒消費(fèi)者線程霜医,notFull時(shí)喚醒生產(chǎn)者線程。而使用Lock時(shí)驳规,可以產(chǎn)生多個(gè)與之關(guān)聯(lián)的條件隊(duì)列,不同類型的線程將在不同的條件隊(duì)列上等待署海,當(dāng)條件滿足時(shí)吗购,可以使用signal()方法喚醒相應(yīng)的線程,從而極大地減少上下文切換與鎖請求的次數(shù)砸狞。
另外捻勉,Condition比Object提供了更豐富的功能:在每個(gè)鎖上可存在多個(gè)等待、條件等待可以是可中斷的或不可中斷的刀森、基于時(shí)限的等待踱启,以及公平的或非公平的隊(duì)列操作。Condition對象繼承了相關(guān)的Lock對象的公平性,對于公平的鎖埠偿,線程會依照FIFO順序從Condition.await中釋放透罢。