工作三年,小胖問我:什么是生產(chǎn)者消費(fèi)者模式?菜到摳腳肃叶!

生產(chǎn)者消費(fèi)者模式在我們?nèi)粘9ぷ髦杏玫梅浅6圊逅妫热纾涸?strong>模塊解耦、消息隊(duì)列因惭、分布式場景中都很常見岳锁。這個(gè)模式里有三個(gè)角色,他們之間的關(guān)系是如下圖這樣的:

圖源:Java 并發(fā)編程 - 徐隆曦
  • 生產(chǎn)者線程:生產(chǎn)消息蹦魔、數(shù)據(jù)
  • 消費(fèi)者線程:消費(fèi)消息激率、數(shù)據(jù)
  • 阻塞隊(duì)列:作數(shù)據(jù)緩沖、平衡二者能力勿决,避免出現(xiàn)"產(chǎn)能過剩"的情況(生產(chǎn)者生產(chǎn)速度遠(yuǎn)高于消費(fèi)者消費(fèi)速度 or 多個(gè)生產(chǎn)者對一個(gè)消費(fèi)者)以及"供不應(yīng)求"的情況(生產(chǎn)者生產(chǎn)速度遠(yuǎn)低于消費(fèi)者消費(fèi)速度 or 多個(gè)消費(fèi)者對一個(gè)生產(chǎn)者)

從圖中 3 和 4 可以知道:無論阻塞隊(duì)列是滿還是空都可能會(huì)產(chǎn)生阻塞乒躺,阻塞之后就要在合適的時(shí)候去喚醒被阻塞的線程。

Q1:那什么時(shí)候會(huì)喚醒阻塞線程低缩?

  • 1嘉冒、當(dāng)消費(fèi)者判斷隊(duì)列為空時(shí),消費(fèi)者線程進(jìn)入等待表制。這期間生產(chǎn)者一旦往隊(duì)列中放入數(shù)據(jù)健爬,就會(huì)通知所有的消費(fèi)者,喚醒阻塞的消費(fèi)者線程么介。

  • 2、反之蜕衡,當(dāng)生產(chǎn)者判斷隊(duì)列已滿壤短,生產(chǎn)者線程進(jìn)入等待。這期間消費(fèi)者一旦消費(fèi)了數(shù)據(jù)慨仿、隊(duì)列有空位久脯,就會(huì)通知所有的生產(chǎn)者,喚醒阻塞的生產(chǎn)者線程镰吆。

Q2:為什么要用這種模式帘撰?

看了上面的 Q1,大家發(fā)現(xiàn)沒有万皿?生產(chǎn)者不用管消費(fèi)者的動(dòng)作摧找,消費(fèi)者也不用管生產(chǎn)者的動(dòng)作;它兩之間就是通過阻塞隊(duì)列通信牢硅,實(shí)現(xiàn)了解耦蹬耘;阻塞隊(duì)列的加入,平衡二者能力减余;生產(chǎn)者只有在隊(duì)列滿或消費(fèi)者只有在隊(duì)列空時(shí)才會(huì)等待综苔,其他時(shí)間誰搶到鎖誰工作,提高效率。以上就是原因~

使用 wait如筛、notify/notifyAll 實(shí)現(xiàn)

上篇文章《正確使用 wait堡牡、notify/notifyAll》說過,wait 讓當(dāng)前線程等待并釋放鎖杨刨,notify 喚醒任意一個(gè)等待同一個(gè)鎖的線程悴侵,notifyAll 則是喚醒所有等待該鎖的線程,然后誰搶到鎖拭嫁,誰執(zhí)行可免。這就是所謂的等待喚醒機(jī)制

先來看看用等待喚醒機(jī)制如何實(shí)現(xiàn)生產(chǎn)者、消費(fèi)者模式的做粤,首先是阻塞隊(duì)列:

public class MyBlockingQueue {

    private int maxSize;
    private LinkedList<Integer> queue;

    public MyBlockingQueue(int size) {
        this.maxSize = size;
        queue = new LinkedList<>();
    }

    public synchronized void put() throws InterruptedException {
        while (queue.size() == maxSize) {
            System.out.println("隊(duì)列已滿浇借,生產(chǎn)者: " + Thread.currentThread().getName() +"進(jìn)入等待");
            wait();
        }
        Random random = new Random();
        int i = random.nextInt();
        System.out.println("隊(duì)列未滿,生產(chǎn)者: " +
                Thread.currentThread().getName() +"放入數(shù)據(jù)" + i);

        // 隊(duì)列空才去喚醒消費(fèi)者怕品,其他時(shí)間自由競爭鎖
        if (queue.size() == 0) {
            notifyAll();
        }

        queue.add(i);
    }

    public synchronized void take() throws InterruptedException {
        while (queue.size() == 0) {
            System.out.println("隊(duì)列為空妇垢,消費(fèi)者: " + Thread.currentThread().getName() +"進(jìn)入等待");
            wait();
        }

        // 隊(duì)列滿了才去喚醒生產(chǎn)者,其他時(shí)間自由競爭鎖
        if (queue.size() == maxSize) {
            notifyAll();
        }

        System.out.println("隊(duì)列有數(shù)據(jù)肉康,消費(fèi)者: " +
                Thread.currentThread().getName() +"取出數(shù)據(jù): " + queue.remove());
    }

}

主要邏輯在阻塞隊(duì)列這邊:先看 put 方法闯估,while 檢查隊(duì)列是否滿?滿則進(jìn)入等待并主動(dòng)釋放鎖吼和,不滿則生產(chǎn)數(shù)據(jù)涨薪,同時(shí)判斷放入數(shù)據(jù)之前隊(duì)列是否空?空則喚醒消費(fèi)者(因?yàn)殛?duì)列已有數(shù)據(jù)炫乓,可消費(fèi))刚夺。

再看 take 方法,while 檢查隊(duì)列是否空末捣?空則進(jìn)入等待并主動(dòng)釋放鎖侠姑,不空則生產(chǎn)數(shù)據(jù),同時(shí)判斷取出數(shù)據(jù)之前隊(duì)列是否已滿箩做?滿則喚醒生產(chǎn)者(因?yàn)殛?duì)列已有空位莽红,可生產(chǎn))。

為什么是 while 不是 if 邦邦?

大家可能有個(gè)疑問安吁。為什么判斷隊(duì)列 size 進(jìn)入等待狀態(tài)這里是用 while,不能用 if 嗎圃酵?就這個(gè) demo 而言柳畔,是可以的。因?yàn)槲覀兊纳a(chǎn)者和消費(fèi)者線程都只有一個(gè)郭赐,但是多線程情況下用 if 就大錯(cuò)特錯(cuò)了薪韩。想象以下情況:

  • 假設(shè)有兩個(gè)消費(fèi)者一個(gè)生產(chǎn)者确沸。隊(duì)列為空,消費(fèi)者一進(jìn)入等待狀態(tài)俘陷,釋放鎖罗捎。消費(fèi)者二搶到鎖,進(jìn)入 if(queue.size == 0) 的判斷拉盾,也進(jìn)入等待桨菜,釋放鎖。這時(shí)生產(chǎn)者搶到鎖生產(chǎn)數(shù)據(jù)捉偏,隊(duì)列有數(shù)據(jù)了倒得。反過來喚醒兩個(gè)消費(fèi)者。

  • 消費(fèi)者一搶到鎖執(zhí)行 wait() 后的邏輯夭禽,取完數(shù)據(jù)釋放鎖霞掺。這時(shí)消費(fèi)者二拿到鎖,執(zhí)行 wait() 后的邏輯取數(shù)據(jù)讹躯,但是此時(shí)隊(duì)列的數(shù)據(jù)已被消費(fèi)者一取出菩彬,沒有數(shù)據(jù)了,這時(shí)就會(huì)報(bào)異常了潮梯。

而用 while 為什么可以骗灶?因?yàn)椴还苁窍M(fèi)者一還是二搶到鎖,循環(huán)體的邏輯之前秉馏。根據(jù) while 的語法耙旦,它會(huì)再一次判斷條件是否成立,而 if 不會(huì)沃饶。這就是用 while 不用 if 的原因母廷。

生產(chǎn)者:

public class Producer implements Runnable {

    private MyBlockingQueue myBlockingQueue;

    public Producer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.put();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消費(fèi)者:

public class Consumer implements Runnable{

    private MyBlockingQueue myBlockingQueue;

    public Consumer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

測試類:

public class MyBlockingQueueTest {

    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
        Producer producer = new Producer(myBlockingQueue);
        Consumer consumer = new Consumer(myBlockingQueue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }

}

使用 Condition 實(shí)現(xiàn)

Condition 是一個(gè)多線程間協(xié)調(diào)通信的工具類,它的 await糊肤、sign/signAll 方法正好對應(yīng)Object 的 wait、notify/notifyAll 方法氓鄙。相比于 Object 的 wait馆揉、notify 方法,Condition 的 await抖拦、signal 結(jié)合的方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效升酣,所以更推薦這種方式實(shí)現(xiàn)線程間協(xié)作。關(guān)于 Condition 后面章節(jié)會(huì)繼續(xù)研究态罪,敬請關(guān)注

Object 的 wait噩茄、notify 方式需要結(jié)合 synchronized 關(guān)鍵字實(shí)現(xiàn)等待喚醒機(jī)制,同樣 Condition 也需要結(jié)合 Lock 類-复颈。那么這種方式如何實(shí)現(xiàn)生產(chǎn)者绩聘、消費(fèi)者模式?看代碼:

public class MyBlockingQueueForCondition {

    private Queue<Integer> queue;
    private int max = 10;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public MyBlockingQueueForCondition(int size) {
        this.max = size;
        queue = new LinkedList();
    }

    public void put(Integer i) throws InterruptedException {
        // 加鎖
        lock.lock();
        try {
            // 隊(duì)列滿了,進(jìn)入等待
            while (queue.size() == max) {
                System.out.println("隊(duì)列已滿凿菩,生產(chǎn)者: " + Thread.currentThread().getName() + "進(jìn)入等待");
                notFull.await();
            }

            // 加入數(shù)據(jù)之前机杜,隊(duì)列為空?通知消費(fèi)者衅谷,可以消費(fèi)
            if (queue.size() == 0) {
                notEmpty.signalAll();
            }

            // 否則椒拗,繼續(xù)生產(chǎn)
            queue.add(i);
        } finally {
            // 最后別忘記釋放鎖
            lock.unlock();
        }
    }

    public Integer take() throws InterruptedException {
        // 加鎖
        lock.lock();
        try {
            // 隊(duì)列無數(shù)據(jù),進(jìn)入等待
            while (queue.size() == 0) {
                System.out.println("隊(duì)列為空获黔,消費(fèi)者: " + Thread.currentThread().getName() + "進(jìn)入等待");
                notEmpty.await();
            }

            // 取出數(shù)據(jù)之前蚀苛,隊(duì)列已滿?通知生產(chǎn)者玷氏,可以生產(chǎn)
            if (queue.size() == max) {
                notFull.signalAll();
            }

            // 否則堵未,取出
            return queue.remove();
        } finally {
            // 最后別忘記釋放鎖
            lock.unlock();
        }
    }
}

首先,定義了一個(gè)隊(duì)列以及 ReentrantLock 類型的鎖预茄,在這基礎(chǔ)上還創(chuàng)建 notFull兴溜、notEmpty 兩個(gè)條件,分別代表未滿耻陕、不為空的條件拙徽。最后定義了 take、put 方法诗宣。

take 和 put 邏輯差不多膘怕,這里只說 put 。因?yàn)橄M(fèi)生產(chǎn)模式肯定用于多線程環(huán)境召庞,需要保證同步岛心。這里還是先獲取鎖,確保同步篮灼。之后依然是判斷隊(duì)列是否已滿忘古?滿了進(jìn)入等待并釋放鎖,不滿則繼續(xù)生產(chǎn)诅诱,同時(shí)判斷隊(duì)列在生產(chǎn)前是否為空髓堪,為空才去喚醒消費(fèi)者。否則不喚醒娘荡,因?yàn)楫?dāng)隊(duì)列為空消費(fèi)者才進(jìn)入阻塞干旁。

PS:最后是一個(gè)非常重要的細(xì)節(jié),在 finally 里面釋放鎖炮沐,否則有可能出現(xiàn)異常無法釋放鎖的情況争群。

生產(chǎn)者:

public class ProducerForCondition implements Runnable {

    private MyBlockingQueueForCondition myBlockingQueueForCondition;

    public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
        this.myBlockingQueueForCondition = myBlockingQueueForCondition;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueueForCondition.put(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消費(fèi)者:

public class ConsumerForCondition implements Runnable{

    private MyBlockingQueueForCondition myBlockingQueueForCondition;

    public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
        this.myBlockingQueueForCondition = myBlockingQueueForCondition;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                System.out.println("消費(fèi)者取出數(shù)據(jù): " + myBlockingQueueForCondition.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

測試類:

public class MyBlockingQueueForConditionTest {

    public static void main(String[] args) {
        MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10);
        ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition);
        ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition);
        new Thread(producerForCondition).start();
        new Thread(consumerForCondition).start();
    }

}

使用 BlockingQueue 實(shí)現(xiàn)

看完前兩種方式之后,有些小伙伴可能會(huì)說大年,實(shí)現(xiàn)個(gè)生產(chǎn)者消費(fèi)者這么煩么换薄?其實(shí)主要代碼還是在阻塞隊(duì)列玉雾,這點(diǎn) Java 早就為我們考慮好了,它提供了 BlockingQueue 接口专控,并有實(shí)現(xiàn)類: ArrayBlockingQueue抹凳、DelayQueue、 LinkedBlockingDeque伦腐、LinkedBlockingQueue赢底、等。(關(guān)于阻塞隊(duì)列柏蘑,狗哥的多線程系列后面也會(huì)講到

我們選用最簡單的 ArrayBlockingQueue 實(shí)現(xiàn)幸冻。它的內(nèi)部也是采取 ReentrantLock 和 Condition 結(jié)合的等待喚醒機(jī)制。所以咳焚,上面的兩種方式其實(shí)是為這種方式鋪墊洽损。不多比比,上代碼:

public class ArrayBlockingQueueTest {

    public static void main(String[] args) {
        // 初始化長度為 10 的 ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        // 生產(chǎn)者
        Runnable producer = () -> {
            try {
                // 放入數(shù)據(jù)
                Random random = new Random();
                while (true) {
                    queue.put(random.nextInt());
                }
            } catch (Exception e) {
                System.out.println("生產(chǎn)數(shù)據(jù)出錯(cuò): " + e.getMessage());
            }
        };
        // 開啟線程生產(chǎn)數(shù)據(jù)
        new Thread(producer).start();

        // 消費(fèi)者
        Runnable consumer = () -> {
            try {
                // 取出數(shù)據(jù)
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (Exception e) {
                System.out.println("消費(fèi)數(shù)據(jù)出錯(cuò): " + e.getMessage());
            }
        };
        // 開啟線程消費(fèi)數(shù)據(jù)
        new Thread(consumer).start();
    }

}

創(chuàng)建一個(gè) ArrayBlockingQueue 并給定最大長度為 10革半,創(chuàng)建生產(chǎn)者和消費(fèi)者碑定。生產(chǎn)者在 while(true) 里面一直生產(chǎn),與此同時(shí)消費(fèi)者也是不斷取數(shù)據(jù)又官,有數(shù)據(jù)就取出來延刘。

看著是不是很簡單?但其實(shí)背后 ArrayBlockingQueue 已經(jīng)為我們做好了線程間通信的工作了六敬,比如隊(duì)列滿了就去阻塞生產(chǎn)者線程碘赖,隊(duì)列有空就去喚醒生產(chǎn)者線程等

巨人的肩膀

總結(jié)

看了這幾個(gè)例子之后外构,相信你對生產(chǎn)者消費(fèi)者模式也有所了解普泡。以后面試官讓你手寫一個(gè)阻塞隊(duì)列,肯定也難不倒你审编。

小福利

如果看到這里撼班,喜歡這篇文章的話,請幫點(diǎn)個(gè)好看垒酬。微信搜索一個(gè)優(yōu)秀的廢人权烧,關(guān)注后回復(fù)電子書送你 100+ 本編程電子書 ,不只 Java 哦伤溉,詳情看下圖∑蘼剩回復(fù) 1024送你一套完整的 java 視頻教程乱顾。

資源
C語言
C++
Java
Git
Python
GO
Linux
經(jīng)典必讀
面試相關(guān)
前端
人工智能
設(shè)計(jì)模式
數(shù)據(jù)庫
數(shù)據(jù)結(jié)構(gòu)與算法
計(jì)算機(jī)基礎(chǔ)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市宫静,隨后出現(xiàn)的幾起案子走净,更是在濱河造成了極大的恐慌券时,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件伏伯,死亡現(xiàn)場離奇詭異橘洞,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)说搅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門炸枣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人弄唧,你說我怎么就攤上這事适肠。” “怎么了候引?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵侯养,是天一觀的道長。 經(jīng)常有香客問我澄干,道長逛揩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任麸俘,我火速辦了婚禮辩稽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘疾掰。我一直安慰自己搂誉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布静檬。 她就那樣靜靜地躺著炭懊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拂檩。 梳的紋絲不亂的頭發(fā)上侮腹,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機(jī)與錄音稻励,去河邊找鬼父阻。 笑死,一個(gè)胖子當(dāng)著我的面吹牛望抽,可吹牛的內(nèi)容都是我干的加矛。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼煤篙,長吁一口氣:“原來是場噩夢啊……” “哼斟览!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起辑奈,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤苛茂,失蹤者是張志新(化名)和其女友劉穎已烤,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妓羊,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡胯究,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了躁绸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片裕循。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖涨颜,靈堂內(nèi)的尸體忽然破棺而出费韭,到底是詐尸還是另有隱情,我是刑警寧澤庭瑰,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布星持,位于F島的核電站,受9級特大地震影響弹灭,放射性物質(zhì)發(fā)生泄漏督暂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一穷吮、第九天 我趴在偏房一處隱蔽的房頂上張望逻翁。 院中可真熱鬧,春花似錦捡鱼、人聲如沸八回。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缠诅。三九已至,卻和暖如春乍迄,著一層夾襖步出監(jiān)牢的瞬間管引,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工闯两, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留褥伴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓漾狼,卻偏偏與公主長得像重慢,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子逊躁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容