java生產(chǎn)者消費(fèi)者實(shí)現(xiàn)

什么是生產(chǎn)者消費(fèi)者模式

生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊剑勾,而通過阻塞隊(duì)列來進(jìn)行通訊量窘,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理雇寇,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù)蚌铜,而是直接從阻塞隊(duì)列里取锨侯,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力冬殃。
這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的囚痴。

為什么要使用生產(chǎn)者和消費(fèi)者模式

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程审葬,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程深滚。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快涣觉,而消費(fèi)者處理速度很慢成箫,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)旨枯。同樣的道理蹬昌,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者攀隔。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式皂贩。

生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)(Java)

阻塞隊(duì)列是實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式的關(guān)鍵,本文介紹兩種自定義阻塞隊(duì)列的實(shí)現(xiàn)以及JDK 1.5 以后新增的java.util.concurrent
包中提供的阻塞隊(duì)列類昆汹。
首先明刷,阻塞隊(duì)列接口:

package com.bytebeats.concurrent.queue;

/**
 * 阻塞隊(duì)列接口
 *
 * @author Ricky Fung
 * @create 2017-03-26 17:28
 */
public interface IBlockingQueue<T> {

    void put(T data) throws InterruptedException;

    T take() throws InterruptedException;
}

方式1

使用 Object.wait()/notifyAll() 來實(shí)現(xiàn)阻塞隊(duì)列。
1满粗、阻塞隊(duì)列實(shí)現(xiàn)

package com.bytebeats.concurrent.queue;

import java.util.LinkedList;

/**
 * 使用Object.wait()/notifyAll()實(shí)現(xiàn)的阻塞隊(duì)列
 *
 * @author Zixi Wang
 * @create 2017-11-01 16:18
 */
public class TraditionalBlockingQueue<T> implements IBlockingQueue<T> {
    private int queueSize;
    private final LinkedList<T> list = new LinkedList<T>();
    private final Object lock = new Object();

    public TraditionalBlockingQueue(){
        this(10);
    }
    public TraditionalBlockingQueue(int queueSize) {
        if(queueSize<1){
            throw new IllegalArgumentException("queueSize must be positive number");
        }
        this.queueSize = queueSize;
    }

    @Override
    public void put(T data) throws InterruptedException {

        synchronized (lock){
            while(list.size()>=queueSize) {
                lock.wait();
            }
            list.addLast(data);
            lock.notifyAll();
        }
    }

    @Override
    public T take() throws InterruptedException {

        synchronized(lock){
            while(list.size()<=0) {
                lock.wait();
            }
            T data = list.removeFirst();
            lock.notifyAll();
            return data;
        }
    }
}

注意要點(diǎn)
判定 LinkedList大小為0或者大于等于queueSize時(shí)須使用while (condition) {}辈末,不能使用if(condition) {}。其中while(condition)循環(huán),它又被叫做“自旋鎖”挤聘。自旋鎖以及wait()和notify()方法在線程通信這篇文章中有更加詳細(xì)的介紹轰枝。為防止該線程沒有收到notify()調(diào)用也從wait()中返回(也稱作虛假喚醒),這個(gè)線程會(huì)重新去檢查condition條件以決定當(dāng)前是否可以安全地繼續(xù)執(zhí)行還是需要重新保持等待组去,而不是認(rèn)為線程被喚醒了就可以安全地繼續(xù)執(zhí)行了鞍陨。
在 take 方法取走一個(gè)元素后須調(diào)用lock.notifyAll();,如果使用lock.notify()方法在某些情況下會(huì)導(dǎo)致 生產(chǎn)者-消費(fèi)者 同時(shí)處于阻塞狀態(tài)从隆。

方式2

通過Lock和Condition實(shí)現(xiàn)阻塞隊(duì)列

package com.bytebeats.concurrent.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 通過Lock和Condition實(shí)現(xiàn)阻塞隊(duì)列
 *
 * @author Zixi Wang
 * @create 2017-11-01 17:08
 */
public class ConditionBlockingQueue<T> implements IBlockingQueue<T> {
    private final Object[] items;
    int putptr, takeptr, count;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public ConditionBlockingQueue(){
        this(10);
    }
    public ConditionBlockingQueue(int queueSize) {
        if(queueSize<1){
            throw new IllegalArgumentException("queueSize must be positive number");
        }
        items = new Object[queueSize];
    }

    @Override
    public void put(T data) throws InterruptedException {

        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = data;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public T take() throws InterruptedException {

        lock.lock();
        try {
            while (count == 0) {
                notEmpty.wait();
            }
            T data = (T) items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }
}

方式3

JDK 1.5 以后新增的java.util.concurrent
包新增了java.util.concurrent. BlockingQueue
接口:
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

并提供了如下幾種阻塞隊(duì)列實(shí)現(xiàn):
java.util.concurrent.ArrayBlockingQueue
java.util.concurrent.LinkedBlockingQueue
java.util.concurrent.SynchronousQueue
java.util.concurrent.PriorityBlockingQueue

實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型使用java.util.concurrent.ArrayBlockingQueue
或者java.util.concurrent.LinkedBlockingQueue即可诚撵。

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;

/**
 * 
 *
 * @author Zixi Wang
 * @create 2017-11-01 16:16
 */
public class Producer implements Runnable {
    private IBlockingQueue<String> queue;
    private int consumerNum;

    public Producer(IBlockingQueue<String> queue, int consumerNum) {
        this.queue = queue;
        this.consumerNum = consumerNum;
    }

    @Override
    public void run() {

        for(int i=0; i< 100; i++){
            try {
                queue.put("data_"+i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        for(int i=0; i<consumerNum; i++){   //結(jié)束符
            try {
                queue.put(Constant.ENDING_SYMBOL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("Producer over");
    }
}

消費(fèi)者

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;

import java.util.concurrent.TimeUnit;

/**
 * 消費(fèi)者
 *
 * @author Zixi Wang
 * @create 2017-11-01 16:16
 */
public class Consumer implements Runnable {
    private IBlockingQueue<String> queue;

    public Consumer(IBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            String data = null;
            try {
                data = queue.take();
                System.out.println("Consumer "+Thread.currentThread().getName()+" consume:"+data);
                if (Constant.ENDING_SYMBOL.equals(data)) {
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("Consumer over");
    }
}

我們用 一個(gè)生產(chǎn)者 兩個(gè)消費(fèi)者來做測(cè)試,如下:

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.ConditionBlockingQueue;
import com.bytebeats.concurrent.queue.IBlockingQueue;

/**
 * ${DESCRIPTION}
 *
 * @author Zixi Wang
 * @create 2017-11-01 16:21
 */
public class ProducerConsumerDemo {

    public static void main(String[] args) {

        //new ProducerConsumerDemo().testRun(new TraditionalBlockingQueue<String>());
        new ProducerConsumerDemo().testRun(new ConditionBlockingQueue<String>());
    }

    public void testRun(IBlockingQueue<String> queue){

        Thread producer = new Thread(new Producer(queue, 2));
        producer.start();

        Thread consumer1 = new Thread(new Consumer(queue));
        consumer1.start();
        Thread consumer2 = new Thread(new Consumer(queue));
        consumer2.start();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末键闺,一起剝皮案震驚了整個(gè)濱河市寿烟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌辛燥,老刑警劉巖筛武,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異购桑,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)氏淑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門勃蜘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人假残,你說我怎么就攤上這事缭贡。” “怎么了辉懒?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵阳惹,是天一觀的道長。 經(jīng)常有香客問我眶俩,道長莹汤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任颠印,我火速辦了婚禮纲岭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘线罕。我一直安慰自己止潮,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布钞楼。 她就那樣靜靜地躺著喇闸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上燃乍,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天唆樊,我揣著相機(jī)與錄音,去河邊找鬼橘沥。 笑死窗轩,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的座咆。 我是一名探鬼主播痢艺,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼介陶!你這毒婦竟也來了堤舒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤哺呜,失蹤者是張志新(化名)和其女友劉穎舌缤,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體某残,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡国撵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了玻墅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片介牙。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖澳厢,靈堂內(nèi)的尸體忽然破棺而出环础,到底是詐尸還是另有隱情,我是刑警寧澤剩拢,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布线得,位于F島的核電站,受9級(jí)特大地震影響徐伐,放射性物質(zhì)發(fā)生泄漏贯钩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一办素、第九天 我趴在偏房一處隱蔽的房頂上張望魏保。 院中可真熱鬧,春花似錦摸屠、人聲如沸谓罗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽檩咱。三九已至揭措,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間刻蚯,已是汗流浹背绊含。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留炊汹,地道東北人躬充。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像讨便,于是被迫代替她去往敵國和親充甚。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容