Java 生產(chǎn)者消費者實現(xiàn) —— BlockingQueue

前言

對著《Java 編程思想》,通過wait - notifyAll實現(xiàn)了生產(chǎn)者消費者模式容为。今天用BlockingQueue實現(xiàn)一下。

BlockingQueue

簡單實現(xiàn)

生產(chǎn)者和消費者仙畦,共用一個BlockingQueue撼唾。為什么BlockingQueue能夠?qū)崿F(xiàn)生產(chǎn)者-消費者模型呢廉邑?對于puttake兩個操作,注釋如下:

/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
void put(E e) throws InterruptedException;
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element becomes available.
 *
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;

Apple.java倒谷,生產(chǎn)和消費的對象蛛蒙。

public class Apple {
    
    private int id;
    
    public Apple(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Apple [id=" + id + "]";
    }
}

生產(chǎn)者:

public class Producer {
    BlockingQueue<Apple> queue;
    
    public Producer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public boolean put(Apple apple) {
        return queue.offer(apple);
    }
}

消費者:

public class Consumer {
    BlockingQueue<Apple> queue;
    
    public Consumer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public Apple take() throws InterruptedException {
        return queue.take();
    }
}

測試:

public class TestConsumer {
    
    public static void main(String[] args) {

        final BlockingQueue<Apple> queue = new LinkedBlockingDeque<Apple>(100);
        
        // 生產(chǎn)者
        new Thread(new Runnable() {
            
            int appleId = 0;
            Producer producer = new Producer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        producer.put(new Apple(appleId++)); 
                        producer.put(new Apple(appleId++)); 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消費者
        new Thread(new Runnable() {
            Consumer consumer = new Consumer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println(consumer.take().getId());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

輸出:

生產(chǎn)者生產(chǎn)2個Apple,消費者立即消費掉恨锚。

改進(jìn)

上述代碼存在一些問題:

  • 生產(chǎn)者和消費者宇驾,都僅用于特定的類型Apple
  • 在使用過程中倍靡,需要自己定義BlockingQueue猴伶,自行實現(xiàn)生產(chǎn)者和消費者的線程,使用復(fù)雜
  • 如果要定義多個消費者線程塌西,需要多次手動編寫代碼
  • 生產(chǎn)者并沒有專注自身的功能:存儲要消費的對象
  • 消費者并沒有專注自身的功能:取出對象他挎、如何消費對象

改進(jìn)后的代碼如下:

Apple類未更改。

Producer變?yōu)槌橄箢惣裥瑁⑹褂梅盒桶旖啊@锩嫘略?code>線程池,用于運(yùn)行消費者線程站辉。

public abstract class Producer<E> {
    protected BlockingQueue<E> queue;
    protected ExecutorService threadPool = Executors.newCachedThreadPool();
    public static final int DEFAULT_QUEUE_LENGTH = 10000;
    
    public Producer(int capacity) {
        initQueue(capacity);
    }
    
    public BlockingQueue<E> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    public boolean put(E apple) {
        return queue.offer(apple);
    }
    
    private void initQueue(int capacity) {
        if (queue == null) {
            synchronized (this) {
                if (queue == null) {
                    queue = new LinkedBlockingDeque<E>(capacity < 0 ? DEFAULT_QUEUE_LENGTH : capacity);
                }
            }
        }
    }
    
    protected void consumerThread(int consumerCount, Consumer<E> consumer) {
        for (int i = 0; i < consumerCount; i++) {
            threadPool.execute(consumer);
        }
    }
}

Consumer也變成抽象類呢撞,使用泛型损姜,并實現(xiàn)了Runnable接口。其中run方法的實現(xiàn)邏輯是:從阻塞隊列中取出一個對象殊霞,并調(diào)用抽象方法consume摧阅。該方法是具體的消費者實現(xiàn)的消費邏輯。

public abstract class Consumer<E> implements Runnable{
    BlockingQueue<E> queue;
    
    /**
     * 數(shù)據(jù)逐個處理
     * @param data
     */
    protected abstract void consume(E data);
    
    @Override
    public void run() {
        while (true) {
            try {
                E data = take();
                try {
                    consume(data);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public Consumer(BlockingQueue<E> queue) {
        this.queue = queue;
    }
    
    public E take() throws InterruptedException {
        return queue.take();
    }
}

AppleProducer:Apple的生產(chǎn)者绷蹲,使用非延遲加載的單例模式棒卷,指定阻塞隊列的長度、消費者線程數(shù)量祝钢。

public class AppleProducer extends Producer<Apple>{
    
    // 并沒有延遲加載
    public static AppleProducer INSTANCE = new AppleProducer(DEFAULT_QUEUE_LENGTH, 1); 

    private AppleProducer(int capacity, int consumerCount) {
        super(capacity);
        AppleConsumer consumer = new AppleConsumer(queue);
        consumerThread(consumerCount, consumer);
    }
}

AppleConsumer:Apple的消費者比规,要實現(xiàn)具體的消費方法consume。這里只是在控制臺輸出對象信息拦英。

public class AppleConsumer extends Consumer<Apple>{

    public AppleConsumer(BlockingQueue<Apple> queue) {
        super(queue);
    }

    @Override
    protected void consume(Apple data) {
        System.out.println(data);
    }
}

測試:這里只需要獲取AppleProducer蜒什,調(diào)用put方法添加對象即可!在隊列中有對象Apple時疤估,會有線程取出Apple吃谣,自動調(diào)用AppleConsumer的consume方法。

public class TestConsumer {
    
    public static void main(String[] args) throws InterruptedException {

        AppleProducer producer = AppleProducer.INSTANCE;
        for (int i = 0; i < 60; i++) {
            producer.put(new Apple(i));
        }
    }
}

有待改進(jìn)的地方

  • 并沒有面向接口編程做裙,仍然是通過繼承來實現(xiàn)的岗憋,代碼有耦合(但是也不能算是缺點吧)
  • 阻塞隊列直接使用LinkedBlockingDeque,并不夠靈活(PriorityBlockingQueue等)
  • 對于線程锚贱,并沒有好的名字仔戈,調(diào)試等并不直觀
  • 如果有多個生產(chǎn)者-消費者,例如增加了Banana拧廊,管理仍然不夠直觀监徘。可以增加一個方法吧碾,能夠打印出所有的生產(chǎn)者-消費者
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凰盔,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子倦春,更是在濱河造成了極大的恐慌户敬,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件睁本,死亡現(xiàn)場離奇詭異尿庐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)呢堰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門抄瑟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人枉疼,你說我怎么就攤上這事皮假⌒猓” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵惹资,是天一觀的道長严卖。 經(jīng)常有香客問我,道長布轿,這世上最難降的妖魔是什么哮笆? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮汰扭,結(jié)果婚禮上稠肘,老公的妹妹穿的比我還像新娘。我一直安慰自己萝毛,他們只是感情好项阴,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著笆包,像睡著了一般环揽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上庵佣,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天歉胶,我揣著相機(jī)與錄音,去河邊找鬼巴粪。 笑死通今,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的肛根。 我是一名探鬼主播辫塌,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼派哲!你這毒婦竟也來了臼氨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤芭届,失蹤者是張志新(化名)和其女友劉穎储矩,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體喉脖,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡椰苟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年抑月,在試婚紗的時候發(fā)現(xiàn)自己被綠了树叽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡谦絮,死狀恐怖题诵,靈堂內(nèi)的尸體忽然破棺而出洁仗,到底是詐尸還是另有隱情,我是刑警寧澤性锭,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布赠潦,位于F島的核電站,受9級特大地震影響草冈,放射性物質(zhì)發(fā)生泄漏她奥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一怎棱、第九天 我趴在偏房一處隱蔽的房頂上張望哩俭。 院中可真熱鬧,春花似錦拳恋、人聲如沸凡资。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽隙赁。三九已至,卻和暖如春梆暖,著一層夾襖步出監(jiān)牢的瞬間伞访,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工轰驳, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留咐扭,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓滑废,卻偏偏與公主長得像蝗肪,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蠕趁,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 考查Java的并發(fā)編程時薛闪,手寫“生產(chǎn)者-消費者模型”是一個經(jīng)典問題。有如下幾個考點: 對Java并發(fā)模型的理解 對...
    猴子007閱讀 4,404評論 12 33
  • 相關(guān)概念 面向?qū)ο蟮娜齻€特征 封裝,繼承,多態(tài).這個應(yīng)該是人人皆知.有時候也會加上抽象. 多態(tài)的好處 允許不同類對...
    東經(jīng)315度閱讀 1,942評論 0 8
  • 一俺陋、多線程 說明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)豁延。 NEW:這種情況指的是,通過 New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,680評論 0 44
  • 東野圭吾的作品作為懸疑推理小說的先鋒,讀的多了缴挖,就很容易被貼標(biāo)簽袋狞,滿滿都是充滿理性燒腦的情節(jié)。天天腦袋處于放空的自...
    我冰叔閱讀 674評論 0 0
  • 《朗讀者》里對汪明荃和羅家英訪談的那一期給人留下很深的印象。 汪姐說:他們到六十歲這個年紀(jì)才選擇結(jié)婚苟鸯,是很重視這一...
    大頭諾阿諾閱讀 845評論 0 0