什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊剑勾,而通過阻塞隊(duì)列來進(jìn)行通訊量窘,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理雇寇,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù)蚌铜,而是直接從阻塞隊(duì)列里取锨侯,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力冬殃。
這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的囚痴。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程审葬,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程深滚。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快涣觉,而消費(fèi)者處理速度很慢成箫,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)旨枯。同樣的道理蹬昌,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者攀隔。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式皂贩。
生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)(Java)
阻塞隊(duì)列是實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式的關(guān)鍵,本文介紹兩種自定義阻塞隊(duì)列的實(shí)現(xiàn)以及JDK 1.5 以后新增的java.util.concurrent
包中提供的阻塞隊(duì)列類昆汹。
首先明刷,阻塞隊(duì)列接口:
package com.bytebeats.concurrent.queue;
/**
* 阻塞隊(duì)列接口
*
* @author Ricky Fung
* @create 2017-03-26 17:28
*/
public interface IBlockingQueue<T> {
void put(T data) throws InterruptedException;
T take() throws InterruptedException;
}
方式1
使用 Object.wait()/notifyAll() 來實(shí)現(xiàn)阻塞隊(duì)列。
1满粗、阻塞隊(duì)列實(shí)現(xiàn)
package com.bytebeats.concurrent.queue;
import java.util.LinkedList;
/**
* 使用Object.wait()/notifyAll()實(shí)現(xiàn)的阻塞隊(duì)列
*
* @author Zixi Wang
* @create 2017-11-01 16:18
*/
public class TraditionalBlockingQueue<T> implements IBlockingQueue<T> {
private int queueSize;
private final LinkedList<T> list = new LinkedList<T>();
private final Object lock = new Object();
public TraditionalBlockingQueue(){
this(10);
}
public TraditionalBlockingQueue(int queueSize) {
if(queueSize<1){
throw new IllegalArgumentException("queueSize must be positive number");
}
this.queueSize = queueSize;
}
@Override
public void put(T data) throws InterruptedException {
synchronized (lock){
while(list.size()>=queueSize) {
lock.wait();
}
list.addLast(data);
lock.notifyAll();
}
}
@Override
public T take() throws InterruptedException {
synchronized(lock){
while(list.size()<=0) {
lock.wait();
}
T data = list.removeFirst();
lock.notifyAll();
return data;
}
}
}
注意要點(diǎn)
判定 LinkedList大小為0或者大于等于queueSize時(shí)須使用while (condition) {}辈末,不能使用if(condition) {}。其中while(condition)循環(huán),它又被叫做“自旋鎖”挤聘。自旋鎖以及wait()和notify()方法在線程通信這篇文章中有更加詳細(xì)的介紹轰枝。為防止該線程沒有收到notify()調(diào)用也從wait()中返回(也稱作虛假喚醒),這個(gè)線程會(huì)重新去檢查condition條件以決定當(dāng)前是否可以安全地繼續(xù)執(zhí)行還是需要重新保持等待组去,而不是認(rèn)為線程被喚醒了就可以安全地繼續(xù)執(zhí)行了鞍陨。
在 take 方法取走一個(gè)元素后須調(diào)用lock.notifyAll();,如果使用lock.notify()方法在某些情況下會(huì)導(dǎo)致 生產(chǎn)者-消費(fèi)者 同時(shí)處于阻塞狀態(tài)从隆。
方式2
通過Lock和Condition實(shí)現(xiàn)阻塞隊(duì)列
package com.bytebeats.concurrent.queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 通過Lock和Condition實(shí)現(xiàn)阻塞隊(duì)列
*
* @author Zixi Wang
* @create 2017-11-01 17:08
*/
public class ConditionBlockingQueue<T> implements IBlockingQueue<T> {
private final Object[] items;
int putptr, takeptr, count;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public ConditionBlockingQueue(){
this(10);
}
public ConditionBlockingQueue(int queueSize) {
if(queueSize<1){
throw new IllegalArgumentException("queueSize must be positive number");
}
items = new Object[queueSize];
}
@Override
public void put(T data) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[putptr] = data;
if (++putptr == items.length) {
putptr = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
@Override
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.wait();
}
T data = (T) items[takeptr];
if (++takeptr == items.length) {
takeptr = 0;
}
--count;
notFull.signal();
return data;
} finally {
lock.unlock();
}
}
}
方式3
JDK 1.5 以后新增的java.util.concurrent
包新增了java.util.concurrent. BlockingQueue
接口:
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
并提供了如下幾種阻塞隊(duì)列實(shí)現(xiàn):
java.util.concurrent.ArrayBlockingQueue
java.util.concurrent.LinkedBlockingQueue
java.util.concurrent.SynchronousQueue
java.util.concurrent.PriorityBlockingQueue
實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型使用java.util.concurrent.ArrayBlockingQueue
或者java.util.concurrent.LinkedBlockingQueue即可诚撵。
package com.bytebeats.concurrent;
import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;
/**
*
*
* @author Zixi Wang
* @create 2017-11-01 16:16
*/
public class Producer implements Runnable {
private IBlockingQueue<String> queue;
private int consumerNum;
public Producer(IBlockingQueue<String> queue, int consumerNum) {
this.queue = queue;
this.consumerNum = consumerNum;
}
@Override
public void run() {
for(int i=0; i< 100; i++){
try {
queue.put("data_"+i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
for(int i=0; i<consumerNum; i++){ //結(jié)束符
try {
queue.put(Constant.ENDING_SYMBOL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Producer over");
}
}
消費(fèi)者
package com.bytebeats.concurrent;
import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;
import java.util.concurrent.TimeUnit;
/**
* 消費(fèi)者
*
* @author Zixi Wang
* @create 2017-11-01 16:16
*/
public class Consumer implements Runnable {
private IBlockingQueue<String> queue;
public Consumer(IBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
String data = null;
try {
data = queue.take();
System.out.println("Consumer "+Thread.currentThread().getName()+" consume:"+data);
if (Constant.ENDING_SYMBOL.equals(data)) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Consumer over");
}
}
我們用 一個(gè)生產(chǎn)者 兩個(gè)消費(fèi)者來做測(cè)試,如下:
package com.bytebeats.concurrent;
import com.bytebeats.concurrent.queue.ConditionBlockingQueue;
import com.bytebeats.concurrent.queue.IBlockingQueue;
/**
* ${DESCRIPTION}
*
* @author Zixi Wang
* @create 2017-11-01 16:21
*/
public class ProducerConsumerDemo {
public static void main(String[] args) {
//new ProducerConsumerDemo().testRun(new TraditionalBlockingQueue<String>());
new ProducerConsumerDemo().testRun(new ConditionBlockingQueue<String>());
}
public void testRun(IBlockingQueue<String> queue){
Thread producer = new Thread(new Producer(queue, 2));
producer.start();
Thread consumer1 = new Thread(new Consumer(queue));
consumer1.start();
Thread consumer2 = new Thread(new Consumer(queue));
consumer2.start();
}
}