前言
對著《Java 編程思想》,通過wait - notifyAll實現(xiàn)了生產(chǎn)者消費者模式容为。今天用BlockingQueue實現(xiàn)一下。
BlockingQueue
簡單實現(xiàn)
生產(chǎn)者和消費者仙畦,共用一個BlockingQueue撼唾。為什么BlockingQueue能夠?qū)崿F(xiàn)生產(chǎn)者-消費者模型呢廉邑?對于put
和take
兩個操作,注釋如下:
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;
Apple.java倒谷,生產(chǎn)和消費的對象蛛蒙。
public class Apple {
private int id;
public Apple(int id) {
this.id = id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "Apple [id=" + id + "]";
}
}
生產(chǎn)者:
public class Producer {
BlockingQueue<Apple> queue;
public Producer(BlockingQueue<Apple> queue) {
this.queue = queue;
}
public boolean put(Apple apple) {
return queue.offer(apple);
}
}
消費者:
public class Consumer {
BlockingQueue<Apple> queue;
public Consumer(BlockingQueue<Apple> queue) {
this.queue = queue;
}
public Apple take() throws InterruptedException {
return queue.take();
}
}
測試:
public class TestConsumer {
public static void main(String[] args) {
final BlockingQueue<Apple> queue = new LinkedBlockingDeque<Apple>(100);
// 生產(chǎn)者
new Thread(new Runnable() {
int appleId = 0;
Producer producer = new Producer(queue);
@Override
public void run() {
try {
while (true) {
TimeUnit.SECONDS.sleep(1);
producer.put(new Apple(appleId++));
producer.put(new Apple(appleId++));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 消費者
new Thread(new Runnable() {
Consumer consumer = new Consumer(queue);
@Override
public void run() {
try {
while (true) {
System.out.println(consumer.take().getId());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
輸出:
生產(chǎn)者生產(chǎn)2個Apple,消費者立即消費掉恨锚。
改進(jìn)
上述代碼存在一些問題:
- 生產(chǎn)者和消費者宇驾,都僅用于特定的類型
Apple
- 在使用過程中倍靡,需要自己定義BlockingQueue猴伶,自行實現(xiàn)生產(chǎn)者和消費者的線程,使用復(fù)雜
- 如果要定義多個消費者線程塌西,需要多次手動編寫代碼
- 生產(chǎn)者并沒有專注自身的功能:存儲要消費的對象
- 消費者并沒有專注自身的功能:取出對象他挎、如何消費對象
改進(jìn)后的代碼如下:
Apple類未更改。
Producer變?yōu)槌橄箢惣裥瑁⑹褂梅盒桶旖啊@锩嫘略?code>線程池,用于運(yùn)行消費者線程站辉。
public abstract class Producer<E> {
protected BlockingQueue<E> queue;
protected ExecutorService threadPool = Executors.newCachedThreadPool();
public static final int DEFAULT_QUEUE_LENGTH = 10000;
public Producer(int capacity) {
initQueue(capacity);
}
public BlockingQueue<E> getQueue() {
return queue;
}
public void setQueue(BlockingQueue<E> queue) {
this.queue = queue;
}
public boolean put(E apple) {
return queue.offer(apple);
}
private void initQueue(int capacity) {
if (queue == null) {
synchronized (this) {
if (queue == null) {
queue = new LinkedBlockingDeque<E>(capacity < 0 ? DEFAULT_QUEUE_LENGTH : capacity);
}
}
}
}
protected void consumerThread(int consumerCount, Consumer<E> consumer) {
for (int i = 0; i < consumerCount; i++) {
threadPool.execute(consumer);
}
}
}
Consumer也變成抽象類呢撞,使用泛型损姜,并實現(xiàn)了Runnable接口。其中run方法的實現(xiàn)邏輯是:從阻塞隊列中取出一個對象殊霞,并調(diào)用抽象方法consume
摧阅。該方法是具體的消費者實現(xiàn)的消費邏輯。
public abstract class Consumer<E> implements Runnable{
BlockingQueue<E> queue;
/**
* 數(shù)據(jù)逐個處理
* @param data
*/
protected abstract void consume(E data);
@Override
public void run() {
while (true) {
try {
E data = take();
try {
consume(data);
} catch (Exception e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Consumer(BlockingQueue<E> queue) {
this.queue = queue;
}
public E take() throws InterruptedException {
return queue.take();
}
}
AppleProducer:Apple的生產(chǎn)者绷蹲,使用非延遲加載的單例模式棒卷,指定阻塞隊列的長度、消費者線程數(shù)量祝钢。
public class AppleProducer extends Producer<Apple>{
// 并沒有延遲加載
public static AppleProducer INSTANCE = new AppleProducer(DEFAULT_QUEUE_LENGTH, 1);
private AppleProducer(int capacity, int consumerCount) {
super(capacity);
AppleConsumer consumer = new AppleConsumer(queue);
consumerThread(consumerCount, consumer);
}
}
AppleConsumer:Apple的消費者比规,要實現(xiàn)具體的消費方法consume
。這里只是在控制臺輸出對象信息拦英。
public class AppleConsumer extends Consumer<Apple>{
public AppleConsumer(BlockingQueue<Apple> queue) {
super(queue);
}
@Override
protected void consume(Apple data) {
System.out.println(data);
}
}
測試:這里只需要獲取AppleProducer蜒什,調(diào)用put方法添加對象即可!在隊列中有對象Apple時疤估,會有線程取出Apple吃谣,自動調(diào)用AppleConsumer的consume方法。
public class TestConsumer {
public static void main(String[] args) throws InterruptedException {
AppleProducer producer = AppleProducer.INSTANCE;
for (int i = 0; i < 60; i++) {
producer.put(new Apple(i));
}
}
}
有待改進(jìn)的地方
- 并沒有面向接口編程做裙,仍然是通過繼承來實現(xiàn)的岗憋,代碼有耦合(但是也不能算是缺點吧)
- 阻塞隊列直接使用LinkedBlockingDeque,并不夠靈活(PriorityBlockingQueue等)
- 對于線程锚贱,并沒有好的名字仔戈,調(diào)試等并不直觀
- 如果有多個生產(chǎn)者-消費者,例如增加了Banana拧廊,管理仍然不夠直觀监徘。可以增加一個方法吧碾,能夠打印出所有的生產(chǎn)者-消費者