【線程通信】生產(chǎn)者-消費(fèi)者模式

一兼搏、簡(jiǎn)述

1??生產(chǎn)者消費(fèi)者模式并不是 GOF 提出的 23 種設(shè)計(jì)模式之一,23 種設(shè)計(jì)模式都是建立在面向?qū)ο蟮幕A(chǔ)之上的缩歪,但其實(shí)面向過程的編程中也有很多高效的編程模式,生產(chǎn)者消費(fèi)者模式便是其中之一谍憔,它是編程過程中最常用的一種設(shè)計(jì)模式匪蝙。

一個(gè)常見的場(chǎng)景:某個(gè)模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個(gè)模塊來(lái)負(fù)責(zé)處理(此處的模塊是廣義的习贫,可以是類逛球、函數(shù)、線程苫昌、進(jìn)程等)颤绕。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊奥务,就稱為消費(fèi)者涕烧。單單抽象出生產(chǎn)者和消費(fèi)者,還夠不上是生產(chǎn)者/消費(fèi)者模式汗洒。該模式還需要有一個(gè)緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間,作為一個(gè)中介父款。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)溢谤,而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù)。

2??舉個(gè)寄信的例子憨攒,大致過程如下:

  1. 把信寫好——相當(dāng)于生產(chǎn)者制造數(shù)據(jù)世杀。
  2. 把信放入郵筒——相當(dāng)于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)。
  3. 郵遞員把信從郵筒取出——相當(dāng)于消費(fèi)者把數(shù)據(jù)取出緩沖區(qū)肝集。
  4. 郵遞員把信拿去郵局做相應(yīng)的處理——相當(dāng)于消費(fèi)者處理數(shù)據(jù)瞻坝。

3??說(shuō)明

  1. 生產(chǎn)消費(fèi)者模式可以有效的對(duì)數(shù)據(jù)解耦,優(yōu)化系統(tǒng)結(jié)構(gòu)杏瞻。
  2. 降低生產(chǎn)者和消費(fèi)者線程相互之間的依賴與性能要求所刀。
  3. 一般使用 BlockingQueue 作為數(shù)據(jù)緩沖隊(duì)列,是通過鎖和阻塞來(lái)實(shí)現(xiàn)數(shù)據(jù)之間的同步捞挥。如果對(duì)緩沖隊(duì)列有性能要求浮创,則可以使用基于 CAS 無(wú)鎖設(shè)計(jì)的 ConcurrentLinkedQueue

二砌函、生產(chǎn)者-消費(fèi)者模式是一個(gè)經(jīng)典的多線程設(shè)計(jì)模式

【生產(chǎn)者-消費(fèi)者模式】為多線程間的協(xié)作提供了良好的解決方案斩披。在該模式中,通常有兩類線程讹俊,即若干個(gè)生產(chǎn)者線程和若干個(gè)消費(fèi)者線程垦沉。生產(chǎn)者線程負(fù)責(zé)提交用戶請(qǐng)求,消費(fèi)者線程則負(fù)責(zé)具體處理生產(chǎn)者提交的任務(wù)仍劈。生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一存儲(chǔ)空間厕倍,生產(chǎn)者向空間里生產(chǎn)數(shù)據(jù),而消費(fèi)者取走數(shù)據(jù)耳奕。阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū)绑青,平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的屋群。

Java 中一共有四種方法支持同步闸婴,其中前三個(gè)是同步方法,一個(gè)是管道方法芍躏。

  1. Object 的 wait()/notify()邪乍。
  2. Lock 和 Condition 的 await()/signal()。
  3. BlockingQueue 阻塞隊(duì)列方法。
  4. PipedInputStream/PipedOutputStream

三庇楞、wait()/notify()

wait()/nofity() 是基類 Object 的兩個(gè)方法榜配,也就意味著所有 Java 類都有這兩個(gè)方法,這樣就可以為任何對(duì)象實(shí)現(xiàn)同步機(jī)制吕晌。

  1. wait():當(dāng)緩沖區(qū)已滿/空時(shí)蛋褥,生產(chǎn)者/消費(fèi)者線程停止自己的執(zhí)行,放棄鎖睛驳,使自己處于等待狀態(tài)烙心,讓其他線程執(zhí)行。
  2. notify():當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí)乏沸,向其他等待的線程發(fā)出可執(zhí)行的通知淫茵,同時(shí)放棄鎖,使自己處于等待狀態(tài)蹬跃。
public class ProducerConsumer {
    private static final int CAPACITY = 5;

    public static void main(String args[]) {
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }
}

生產(chǎn)者

public static class Producer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;
    int i = 0;

    public Producer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.size() == maxSize) {
                    try {
                        System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("[" + name + "] Producing value : +" + i);
                queue.offer(i++);
                queue.notifyAll();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消費(fèi)者

public static class Consumer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;

    public Consumer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    try {
                        System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                int x = queue.poll();
                System.out.println("[" + name + "] Consuming value : " + x);
                queue.notifyAll();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

判斷 Queue 大小為 0 或者大于等于 queueSize 時(shí)須使用while(condition){}匙瘪,不能使用if(condition){}。其中while(condition)蝶缀,它又被叫做“自旋鎖”丹喻。為防止該線程沒有收到notify()調(diào)用也從wait()中返回(也稱作虛假喚醒),這個(gè)線程會(huì)重新去檢查 condition 條件以決定當(dāng)前是否可以安全地繼續(xù)執(zhí)行還是需要重新保持等待翁都,而不是認(rèn)為線程被喚醒了就可以安全地繼續(xù)執(zhí)行了驻啤。

四、使用 Lock 和 Condition 的 await()/signal()

JDK5.0 之后荐吵,Java 提供了更加健壯的線程處理機(jī)制骑冗,包括同步、鎖定先煎、線程池等贼涩,它們可以實(shí)現(xiàn)更細(xì)粒度的線程控制。Condition 接口的 await()/signal() 就是其中用來(lái)做同步的兩種方法薯蝎,它們的功能基本上和 Object 的 wait()/nofity() 相同遥倦,完全可以取代它們,但是它們和新引入的鎖定機(jī)制 Lock 直接掛鉤占锯,具有更大的靈活性袒哥。通過在 Lock 對(duì)象上調(diào)用newCondition(),將條件變量和一個(gè)鎖對(duì)象進(jìn)行綁定消略,進(jìn)而控制并發(fā)程序訪問競(jìng)爭(zhēng)資源的安全堡称。下面來(lái)看代碼:

/**
 * 生產(chǎn)者消費(fèi)者模式:使用Lock和Condition實(shí)現(xiàn)
 * {@link java.util.concurrent.locks.Lock}
 * {@link java.util.concurrent.locks.Condition}
 */
public class ProducerConsumerByLock {
    private static final int CAPACITY = 5;
    private static final Lock lock = new ReentrantLock();
    private static final Condition fullCondition = lock.newCondition();     //隊(duì)列滿的條件
    private static final Condition emptyCondition = lock.newCondition();    //隊(duì)列空的條件

    public static void main(String args[]) {
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }
}

生產(chǎn)者

public static class Producer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;
    int i = 0;

    public Producer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {

            //獲得鎖
            lock.lock();
            while (queue.size() == maxSize) {
                try {
                    System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                    //條件不滿足,生產(chǎn)阻塞
                    fullCondition.await();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
            System.out.println("[" + name + "] Producing value : +" + i);
            queue.offer(i++);

            //喚醒其他所有生產(chǎn)者艺演、消費(fèi)者
            fullCondition.signalAll();
            emptyCondition.signalAll();

            //釋放鎖
            lock.unlock();

            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

消費(fèi)者

public static class Consumer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;

    public Consumer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            //獲得鎖
            lock.lock();

            while (queue.isEmpty()) {
                try {
                    System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                    //條件不滿足却紧,消費(fèi)阻塞
                    emptyCondition.await();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            int x = queue.poll();
            System.out.println("[" + name + "] Consuming value : " + x);

            //喚醒其他所有生產(chǎn)者桐臊、消費(fèi)者
            fullCondition.signalAll();
            emptyCondition.signalAll();

            //釋放鎖
            lock.unlock();

            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

五、使用 BlockingQueue 阻塞隊(duì)列方法

JDK1.5 以后新增的java.util.concurrent包新增了 BlockingQueue 接口晓殊。并提供了如下幾種阻塞隊(duì)列實(shí)現(xiàn):

  1. java.util.concurrent.ArrayBlockingQueue
  2. java.util.concurrent.LinkedBlockingQueue
  3. java.util.concurrent.SynchronousQueue
  4. java.util.concurrent.PriorityBlockingQueue

實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型使用 ArrayBlockingQueue 或者 LinkedBlockingQueue 即可断凶。

這里使用 LinkedBlockingQueue,它是一個(gè)已經(jīng)在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列巫俺,實(shí)現(xiàn)方式采用的是 await()/signal()认烁。它可以在生成對(duì)象時(shí)指定容量大小。它用于阻塞操作的是 put()/take()介汹。

  1. put():類似于上面的生產(chǎn)者線程砚著,容量達(dá)到最大時(shí),自動(dòng)阻塞痴昧。
  2. take():類似于上面的消費(fèi)者線程,容量為 0 時(shí)冠王,自動(dòng)阻塞赶撰。

LinkedBlockingQueue 類的 put() 源碼:

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();

public void put(E e) throws InterruptedException {
    putLast(e);
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

整體邏輯如下:

/**
 * 生產(chǎn)者消費(fèi)者模式:使用{@link java.util.concurrent.BlockingQueue}實(shí)現(xiàn)
 */
public class ProducerConsumerByBQ{
    private static final int CAPACITY = 5;

    public static void main(String args[]){
        LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);

        Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
        Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
        Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
        Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
        Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生產(chǎn)者
     */
    public static class Producer extends Thread{
        private LinkedBlockingDeque<Integer> blockingQueue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                try {
                    blockingQueue.put(i);
                    System.out.println("[" + name + "] Producing value : +" + i);
                    i++;

                    //暫停最多1秒
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    /**
     * 消費(fèi)者
     */
    public static class Consumer extends Thread{
        private LinkedBlockingDeque<Integer> blockingQueue;
        String name;
        int maxSize;

        public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                try {
                    int x = blockingQueue.take();
                    System.out.println("[" + name + "] Consuming : " + x);

                    //暫停最多1秒
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市柱彻,隨后出現(xiàn)的幾起案子豪娜,更是在濱河造成了極大的恐慌,老刑警劉巖哟楷,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瘤载,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡卖擅,警方通過查閱死者的電腦和手機(jī)鸣奔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)惩阶,“玉大人挎狸,你說(shuō)我怎么就攤上這事《峡” “怎么了锨匆?”我有些...
    開封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)冬筒。 經(jīng)常有香客問我,道長(zhǎng),這世上最難降的妖魔是什么穴翩? 我笑而不...
    開封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任氨菇,我火速辦了婚禮,結(jié)果婚禮上响牛,老公的妹妹穿的比我還像新娘鞭衩。我一直安慰自己学搜,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開白布论衍。 她就那樣靜靜地躺著瑞佩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪坯台。 梳的紋絲不亂的頭發(fā)上炬丸,一...
    開封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音蜒蕾,去河邊找鬼稠炬。 笑死,一個(gè)胖子當(dāng)著我的面吹牛咪啡,可吹牛的內(nèi)容都是我干的首启。 我是一名探鬼主播,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼撤摸,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼毅桃!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起准夷,我...
    開封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤钥飞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后衫嵌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體读宙,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年楔绞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了结闸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡酒朵,死狀恐怖膀估,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耻讽,我是刑警寧澤察纯,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站针肥,受9級(jí)特大地震影響饼记,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜慰枕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一具则、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧具帮,春花似錦博肋、人聲如沸低斋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)膊畴。三九已至,卻和暖如春病游,著一層夾襖步出監(jiān)牢的瞬間唇跨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工衬衬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留买猖,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓滋尉,卻偏偏與公主長(zhǎng)得像玉控,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子狮惜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容