13-線程間的通信與生產(chǎn)者消費(fèi)者模型

線程間的通信與生產(chǎn)者消費(fèi)者模型

在前面我們講了很多關(guān)于同步的問題础嫡,然而在現(xiàn)實(shí)中涝登,需要線程之間的協(xié)作。比如說最經(jīng)典的生產(chǎn)者-消費(fèi)者模型:當(dāng)隊(duì)列滿時(shí)般又,生產(chǎn)者需要等待隊(duì)列有空間才能繼續(xù)往里面放入商品慕蔚,而在等待的期間內(nèi)丐黄,生產(chǎn)者必須釋放對臨界資源(即隊(duì)列)的占用權(quán)。因?yàn)樯a(chǎn)者如果不釋放對臨界資源的占用權(quán)坊萝,那么消費(fèi)者就無法消費(fèi)隊(duì)列中的商品孵稽,就不會讓隊(duì)列有空間,那么生產(chǎn)者就會一直無限等待下去十偶。因此菩鲜,一般情況下,當(dāng)隊(duì)列滿時(shí)惦积,會讓生產(chǎn)者交出對臨界資源的占用權(quán)接校,并進(jìn)入掛起狀態(tài)。然后等待消費(fèi)者消費(fèi)了商品,然后消費(fèi)者通知生產(chǎn)者隊(duì)列有空間了蛛勉。同樣地鹿寻,當(dāng)隊(duì)列空時(shí),消費(fèi)者也必須等待诽凌,等待生產(chǎn)者通知它隊(duì)列中有商品了毡熏。這種互相通信的過程就是線程間的協(xié)作。

在Java中線程協(xié)作有最常見的兩種方式:1)利用Object.wait()侣诵、Object.notify()痢法;2)使用Condition

wait()、notify()和notifyAll()

wait()杜顺、notify()和notifyAll()是Object類中的方法:

/**
 * Wakes up a single thread that is waiting on this object's
 * monitor. If any threads are waiting on this object, one of them
 * is chosen to be awakened. The choice is arbitrary and occurs at
 * the discretion of the implementation. A thread waits on an object's
 * monitor by calling one of the wait methods
 */
public final native void notify();
 
/**
 * Wakes up all threads that are waiting on this object's monitor. A
 * thread waits on an object's monitor by calling one of the
 * wait methods.
 */
public final native void notifyAll();
 
/**
 * Causes the current thread to wait until either another thread invokes the
 * {@link java.lang.Object#notify()} method or the
 * {@link java.lang.Object#notifyAll()} method for this object, or a
 * specified amount of time has elapsed.
 * <p>
 * The current thread must own this object's monitor.
 */
public final native void wait(long timeout) throws InterruptedException;

從這三個(gè)方法的文字描述可以知道以下幾點(diǎn)信息:

1)wait()财搁、notify()和notifyAll()方法是本地方法,并且為final方法躬络,無法被重寫尖奔。

2)調(diào)用某個(gè)對象的wait()方法能讓當(dāng)前線程阻塞,并且當(dāng)前線程必須擁有此對象的monitor(即鎖)穷当。如果在調(diào)用wait()時(shí)提茁,沒有持有適當(dāng)?shù)逆i,則拋出IllegalMonitorStateException膘滨,它是RuntimeException的一個(gè)子類甘凭。

3)調(diào)用某個(gè)對象的notify()方法能夠喚醒一個(gè)正在等待這個(gè)對象的monitor的線程稀拐,如果有多個(gè)線程都在等待這個(gè)對象的monitor火邓,則只能喚醒其中一個(gè)線程;如果在調(diào)用notify()時(shí)沒有持有適當(dāng)?shù)逆i德撬,也會拋出IllegalMonitorStateException铲咨。

4)調(diào)用notifyAll()方法能夠喚醒所有正在等待這個(gè)對象的monitor的線程;

上面已經(jīng)提到蜓洪,如果調(diào)用某個(gè)對象的wait()方法纤勒,當(dāng)前線程必須擁有這個(gè)對象的monitor(即鎖),因此調(diào)用wait()方法必須在同步塊或者同步方法中進(jìn)行(synchronized塊或者synchronized方法)隆檀。

調(diào)用某個(gè)對象的wait()方法摇天,相當(dāng)于讓當(dāng)前線程交出此對象的monitor,然后進(jìn)入等待狀態(tài)恐仑,等待后續(xù)再次獲得此對象的鎖(Thread類中的sleep方法使當(dāng)前線程暫停執(zhí)行一段時(shí)間泉坐,從而讓其他線程有機(jī)會繼續(xù)執(zhí)行,但它并不釋放對象鎖)裳仆;

notify()方法能夠喚醒一個(gè)正在等待該對象的monitor的線程腕让,當(dāng)有多個(gè)線程都在等待該對象的monitor的話,則只能喚醒其中一個(gè)線程歧斟,具體喚醒哪個(gè)線程則不得而知纯丸。

同樣地偏形,調(diào)用某個(gè)對象的notify()方法,當(dāng)前線程也必須擁有這個(gè)對象的monitor觉鼻,因此調(diào)用notify()方法必須在同步塊或者同步方法中進(jìn)行(synchronized塊或者synchronized方法)俊扭。

nofityAll()方法能夠喚醒所有正在等待該對象的monitor的線程,這一點(diǎn)與notify()方法是不同的坠陈。

這里要注意一點(diǎn):notify()和notifyAll()方法只是喚醒等待該對象的monitor的線程统扳,并不決定哪個(gè)線程能夠獲取到monitor。

舉個(gè)簡單的例子:假如有三個(gè)線程Thread1畅姊、Thread2和Thread3都在等待對象objectA的monitor咒钟,此時(shí)Thread4擁有對象objectA的monitor,當(dāng)在Thread4中調(diào)用objectA.notify()方法之后若未,Thread1朱嘴、Thread2和Thread3只有一個(gè)能被喚醒。注意粗合,被喚醒不等于立刻就獲取了objectA的monitor萍嬉。假若在Thread4中調(diào)用objectA.notifyAll()方法,則Thread1隙疚、Thread2和Thread3三個(gè)線程都會被喚醒壤追,至于哪個(gè)線程接下來能夠獲取到objectA的monitor就具體依賴于操作系統(tǒng)的調(diào)度了。

上面尤其要注意一點(diǎn):一個(gè)線程被喚醒不代表立即獲取了對象的monitor供屉,只有等調(diào)用完notify()或者notifyAll()并退出synchronized塊行冰,釋放對象鎖后,其余線程才可獲得鎖執(zhí)行伶丐。

舉個(gè)例子:

public class WaitNotifyTest {
    
    private static Object obj = new Object();
    
    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread() {
            public void run() {
                synchronized(obj) {
                    try {
                        obj.wait();
                    } catch (InterruptedException ex) {}
                    System.out.println("t1 成功獲得了鎖");
                }
            }
        };
        
        Thread t2 = new Thread() {
            public void run() {
                synchronized(obj) {
                    System.out.println("t2 調(diào)用notify()");
                    obj.notify();
                }
                System.out.println("t2 釋放了鎖");
            }
        };
        
        t1.start();
        Thread.sleep(2000);
        t2.start();
    }
}

無論如何運(yùn)行悼做,執(zhí)行結(jié)果如下:

t2 調(diào)用notify()

t2 釋放了鎖

t1 成功獲得了鎖

顯式的Condition對象

Condition是在java 1.5中才出現(xiàn)的,它用來替代傳統(tǒng)的Object的wait()哗魂、notify()實(shí)現(xiàn)線程間的協(xié)作肛走,相比使用Object的wait()、notify()录别,使用Condition的await()朽色、signal()這種方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效。因此通常來說比較推薦使用Condition组题。

Condition是個(gè)接口葫男,基本的方法如下:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

Condition依賴于Lock接口,生成一個(gè)Condition的基本代碼是:lock.newCondition()往踢;
調(diào)用Condition的await()和signal()方法腾誉,都必須在lock保護(hù)之內(nèi),就是說必須在lock.lock()和lock.unlock()之間才可以使用。

Conditon中的await()對應(yīng)Object的wait()利职;Condition中的signal()對應(yīng)Object的notify()趣效;Condition中的signalAll()對應(yīng)Object的notifyAll()。

生產(chǎn)者與消費(fèi)者模型

1)使用內(nèi)置鎖猪贪、Object的wait()和notify()實(shí)現(xiàn):

public class ProducerAndConsumer {
    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        
        for (int i=0; i<5; i++) {
            new Thread(producer).start();
            new Thread(consumer).start();
        }
    }
    
    private static final int CAPACITY = 10;
    private static final LinkedList<Integer> list = new LinkedList<Integer>();
    private static final Object obj = new Object();
    
    static class Producer implements Runnable {
        private static final AtomicInteger count = new AtomicInteger(0);
        public void run() {
            for (int i=0; i<10; i++)
                produce();
        }
        private void produce() {
            synchronized (obj) {
                while (list.size() == CAPACITY) {
                    try {
                        obj.wait();
                    } catch (InterruptedException ex) {
                    }
                }
                Integer num = count.incrementAndGet();
                list.add(num);
                System.out.println(Thread.currentThread().getName() + "生產(chǎn)了 " + num);
                obj.notifyAll();
            }
        }
    }
    
    static class Consumer implements Runnable {
        public void run() {
            for (int i=0; i<10; i++)
                consume();
        }
        private void consume() {
            synchronized (obj) {
                while (list.size() == 0) {
                    try {
                        obj.wait();
                    } catch (InterruptedException ex) {
                    }
                }
                System.out.println(Thread.currentThread().getName() + "消費(fèi)了 " + list.remove());
                obj.notifyAll();
            }
        }
    }
}

注意:在多生產(chǎn)者和多消費(fèi)者的模式下跷敬,必須要使用Object.notifyAll()方法,并且當(dāng)線程喚醒之后要循環(huán)判斷條件是否滿足热押,否則很容易陷入死鎖的狀態(tài)西傀。

2)使用顯式鎖和Condition實(shí)現(xiàn)

public class ConditionTest {
    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        
        for (int i=0; i<5; i++) {
            new Thread(producer).start();
            new Thread(consumer).start();
        }
    }
    
    private static final int CAPACITY = 1;
    private static final LinkedList<Integer> list = new LinkedList<Integer>();
    private static final Lock lock = new ReentrantLock();
    private static final Condition notFull = lock.newCondition();
    private static final Condition notEmpty = lock.newCondition();
    
    static class Producer implements Runnable {
        private static final AtomicInteger count = new AtomicInteger(0);
        public void run() {
            for (int i=0; i<10; i++)
                produce();
        }
        private void produce() {
            lock.lock();
            try {
                while (list.size() == CAPACITY) {
                    try {
                        notFull.await();
                    } catch (InterruptedException ex) {
                    }
                }
                Integer num = count.incrementAndGet();
                list.add(num);
                System.out.println(Thread.currentThread().getName() + "生產(chǎn)了 " + num);
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    }
    
    static class Consumer implements Runnable {
        public void run() {
            for (int i=0; i<10; i++)
                consume();
        }
        private void consume() {
            lock.lock();
            try {
                while (list.size() == 0) {
                    try {
                        notEmpty.await();
                    } catch (InterruptedException ex) {
                    }
                }
                System.out.println(Thread.currentThread().getName() + "消費(fèi)了 " + list.remove());
                notFull.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}

兩種實(shí)現(xiàn)方式的區(qū)別

1)使用內(nèi)置鎖時(shí),每個(gè)鎖都只能有一個(gè)相關(guān)聯(lián)的條件隊(duì)列桶癣,因而在像上面的例子中拥褂,多個(gè)線程可能在同一個(gè)條件隊(duì)列上等待不同的條件謂詞,如:生產(chǎn)者線程在等待隊(duì)列notFull牙寞,消費(fèi)者線程在等待隊(duì)列notEmpty饺鹃。當(dāng)條件滿足時(shí),必須使用notifyAll()方法喚醒所有等待的線程间雀,否則程序很容易產(chǎn)生活躍性故障:死鎖悔详。但是,當(dāng)使用notifyAll()方法喚醒所有線程后惹挟,會使得所有線程在鎖上發(fā)生競爭茄螃,然后,大多數(shù)線程又都回到等待狀態(tài)连锯,因此將出現(xiàn)大量的上下文切換操作以及發(fā)生競爭的鎖獲取操作归苍。

2)使用內(nèi)置鎖時(shí),最大的問題就是:無法根據(jù)滿足的條件去喚醒對應(yīng)的線程萎庭,如:notEmpty時(shí)喚醒消費(fèi)者線程霜医,notFull時(shí)喚醒生產(chǎn)者線程。而使用Lock時(shí)驳规,可以產(chǎn)生多個(gè)與之關(guān)聯(lián)的條件隊(duì)列,不同類型的線程將在不同的條件隊(duì)列上等待署海,當(dāng)條件滿足時(shí)吗购,可以使用signal()方法喚醒相應(yīng)的線程,從而極大地減少上下文切換與鎖請求的次數(shù)砸狞。

另外捻勉,Condition比Object提供了更豐富的功能:在每個(gè)鎖上可存在多個(gè)等待、條件等待可以是可中斷的或不可中斷的刀森、基于時(shí)限的等待踱启,以及公平的或非公平的隊(duì)列操作。Condition對象繼承了相關(guān)的Lock對象的公平性,對于公平的鎖埠偿,線程會依照FIFO順序從Condition.await中釋放透罢。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市冠蒋,隨后出現(xiàn)的幾起案子羽圃,更是在濱河造成了極大的恐慌,老刑警劉巖抖剿,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朽寞,死亡現(xiàn)場離奇詭異,居然都是意外死亡斩郎,警方通過查閱死者的電腦和手機(jī)脑融,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缩宜,“玉大人吨掌,你說我怎么就攤上這事∨。” “怎么了膜宋?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長炼幔。 經(jīng)常有香客問我秋茫,道長,這世上最難降的妖魔是什么乃秀? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任肛著,我火速辦了婚禮,結(jié)果婚禮上跺讯,老公的妹妹穿的比我還像新娘枢贿。我一直安慰自己,他們只是感情好刀脏,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布局荚。 她就那樣靜靜地躺著,像睡著了一般愈污。 火紅的嫁衣襯著肌膚如雪耀态。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天暂雹,我揣著相機(jī)與錄音首装,去河邊找鬼。 笑死杭跪,一個(gè)胖子當(dāng)著我的面吹牛仙逻,可吹牛的內(nèi)容都是我干的驰吓。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼系奉,長吁一口氣:“原來是場噩夢啊……” “哼檬贰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起喜最,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤偎蘸,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后瞬内,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體迷雪,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年虫蝶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了章咧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,163評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡能真,死狀恐怖赁严,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情粉铐,我是刑警寧澤疼约,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站蝙泼,受9級特大地震影響程剥,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜汤踏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一织鲸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧溪胶,春花似錦搂擦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至懒熙,卻和暖如春丘损,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背工扎。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留衔蹲,地道東北人肢娘。 一個(gè)月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓呈础,卻偏偏與公主長得像,于是被迫代替她去往敵國和親橱健。 傳聞我的和親對象是個(gè)殘疾皇子而钞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容