一兼搏、簡(jiǎn)述
1??生產(chǎn)者消費(fèi)者模式并不是 GOF 提出的 23 種設(shè)計(jì)模式之一,23 種設(shè)計(jì)模式都是建立在面向?qū)ο蟮幕A(chǔ)之上的缩歪,但其實(shí)面向過程的編程中也有很多高效的編程模式,生產(chǎn)者消費(fèi)者模式便是其中之一谍憔,它是編程過程中最常用的一種設(shè)計(jì)模式匪蝙。
一個(gè)常見的場(chǎng)景:某個(gè)模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個(gè)模塊來(lái)負(fù)責(zé)處理(此處的模塊是廣義的习贫,可以是類逛球、函數(shù)、線程苫昌、進(jìn)程等)颤绕。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊奥务,就稱為消費(fèi)者涕烧。單單抽象出生產(chǎn)者和消費(fèi)者,還夠不上是生產(chǎn)者/消費(fèi)者模式汗洒。該模式還需要有一個(gè)緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間,作為一個(gè)中介父款。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)溢谤,而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù)。
2??舉個(gè)寄信的例子憨攒,大致過程如下:
- 把信寫好——相當(dāng)于生產(chǎn)者制造數(shù)據(jù)世杀。
- 把信放入郵筒——相當(dāng)于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)。
- 郵遞員把信從郵筒取出——相當(dāng)于消費(fèi)者把數(shù)據(jù)取出緩沖區(qū)肝集。
- 郵遞員把信拿去郵局做相應(yīng)的處理——相當(dāng)于消費(fèi)者處理數(shù)據(jù)瞻坝。
3??說(shuō)明
- 生產(chǎn)消費(fèi)者模式可以有效的對(duì)數(shù)據(jù)解耦,優(yōu)化系統(tǒng)結(jié)構(gòu)杏瞻。
- 降低生產(chǎn)者和消費(fèi)者線程相互之間的依賴與性能要求所刀。
- 一般使用 BlockingQueue 作為數(shù)據(jù)緩沖隊(duì)列,是通過鎖和阻塞來(lái)實(shí)現(xiàn)數(shù)據(jù)之間的同步捞挥。如果對(duì)緩沖隊(duì)列有性能要求浮创,則可以使用基于 CAS 無(wú)鎖設(shè)計(jì)的 ConcurrentLinkedQueue。
二砌函、生產(chǎn)者-消費(fèi)者模式是一個(gè)經(jīng)典的多線程設(shè)計(jì)模式
【生產(chǎn)者-消費(fèi)者模式】為多線程間的協(xié)作提供了良好的解決方案斩披。在該模式中,通常有兩類線程讹俊,即若干個(gè)生產(chǎn)者線程和若干個(gè)消費(fèi)者線程垦沉。生產(chǎn)者線程負(fù)責(zé)提交用戶請(qǐng)求,消費(fèi)者線程則負(fù)責(zé)具體處理生產(chǎn)者提交的任務(wù)仍劈。生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一存儲(chǔ)空間厕倍,生產(chǎn)者向空間里生產(chǎn)數(shù)據(jù),而消費(fèi)者取走數(shù)據(jù)耳奕。阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū)绑青,平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的屋群。
Java 中一共有四種方法支持同步闸婴,其中前三個(gè)是同步方法,一個(gè)是管道方法芍躏。
- Object 的 wait()/notify()邪乍。
- Lock 和 Condition 的 await()/signal()。
- BlockingQueue 阻塞隊(duì)列方法。
- PipedInputStream/PipedOutputStream
三庇楞、wait()/notify()
wait()/nofity() 是基類 Object 的兩個(gè)方法榜配,也就意味著所有 Java 類都有這兩個(gè)方法,這樣就可以為任何對(duì)象實(shí)現(xiàn)同步機(jī)制吕晌。
- wait():當(dāng)緩沖區(qū)已滿/空時(shí)蛋褥,生產(chǎn)者/消費(fèi)者線程停止自己的執(zhí)行,放棄鎖睛驳,使自己處于等待狀態(tài)烙心,讓其他線程執(zhí)行。
- notify():當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí)乏沸,向其他等待的線程發(fā)出可執(zhí)行的通知淫茵,同時(shí)放棄鎖,使自己處于等待狀態(tài)蹬跃。
public class ProducerConsumer {
private static final int CAPACITY = 5;
public static void main(String args[]) {
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
生產(chǎn)者
public static class Producer extends Thread {
private Queue<Integer> queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize) {
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
消費(fèi)者
public static class Consumer extends Thread {
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize) {
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] Consuming value : " + x);
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
判斷 Queue 大小為 0 或者大于等于 queueSize 時(shí)須使用while(condition){}
匙瘪,不能使用if(condition){}
。其中while(condition)
蝶缀,它又被叫做“自旋鎖”丹喻。為防止該線程沒有收到notify()
調(diào)用也從wait()
中返回(也稱作虛假喚醒),這個(gè)線程會(huì)重新去檢查 condition 條件以決定當(dāng)前是否可以安全地繼續(xù)執(zhí)行還是需要重新保持等待翁都,而不是認(rèn)為線程被喚醒了就可以安全地繼續(xù)執(zhí)行了驻啤。
四、使用 Lock 和 Condition 的 await()/signal()
在 JDK5.0 之后荐吵,Java 提供了更加健壯的線程處理機(jī)制骑冗,包括同步、鎖定先煎、線程池等贼涩,它們可以實(shí)現(xiàn)更細(xì)粒度的線程控制。Condition 接口的 await()/signal() 就是其中用來(lái)做同步的兩種方法薯蝎,它們的功能基本上和 Object 的 wait()/nofity() 相同遥倦,完全可以取代它們,但是它們和新引入的鎖定機(jī)制 Lock 直接掛鉤占锯,具有更大的靈活性袒哥。通過在 Lock 對(duì)象上調(diào)用newCondition(),將條件變量和一個(gè)鎖對(duì)象進(jìn)行綁定消略,進(jìn)而控制并發(fā)程序訪問競(jìng)爭(zhēng)資源的安全堡称。下面來(lái)看代碼:
/**
* 生產(chǎn)者消費(fèi)者模式:使用Lock和Condition實(shí)現(xiàn)
* {@link java.util.concurrent.locks.Lock}
* {@link java.util.concurrent.locks.Condition}
*/
public class ProducerConsumerByLock {
private static final int CAPACITY = 5;
private static final Lock lock = new ReentrantLock();
private static final Condition fullCondition = lock.newCondition(); //隊(duì)列滿的條件
private static final Condition emptyCondition = lock.newCondition(); //隊(duì)列空的條件
public static void main(String args[]) {
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
生產(chǎn)者
public static class Producer extends Thread {
private Queue<Integer> queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize) {
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
//獲得鎖
lock.lock();
while (queue.size() == maxSize) {
try {
System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
//條件不滿足,生產(chǎn)阻塞
fullCondition.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
//喚醒其他所有生產(chǎn)者艺演、消費(fèi)者
fullCondition.signalAll();
emptyCondition.signalAll();
//釋放鎖
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費(fèi)者
public static class Consumer extends Thread {
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize) {
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
//獲得鎖
lock.lock();
while (queue.isEmpty()) {
try {
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
//條件不滿足却紧,消費(fèi)阻塞
emptyCondition.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] Consuming value : " + x);
//喚醒其他所有生產(chǎn)者桐臊、消費(fèi)者
fullCondition.signalAll();
emptyCondition.signalAll();
//釋放鎖
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
五、使用 BlockingQueue 阻塞隊(duì)列方法
JDK1.5 以后新增的java.util.concurrent包新增了 BlockingQueue 接口晓殊。并提供了如下幾種阻塞隊(duì)列實(shí)現(xiàn):
- java.util.concurrent.ArrayBlockingQueue
- java.util.concurrent.LinkedBlockingQueue
- java.util.concurrent.SynchronousQueue
- java.util.concurrent.PriorityBlockingQueue
實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型使用 ArrayBlockingQueue 或者 LinkedBlockingQueue 即可断凶。
這里使用 LinkedBlockingQueue,它是一個(gè)已經(jīng)在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列巫俺,實(shí)現(xiàn)方式采用的是 await()/signal()认烁。它可以在生成對(duì)象時(shí)指定容量大小。它用于阻塞操作的是 put()/take()介汹。
- put():類似于上面的生產(chǎn)者線程砚著,容量達(dá)到最大時(shí),自動(dòng)阻塞痴昧。
- take():類似于上面的消費(fèi)者線程,容量為 0 時(shí)冠王,自動(dòng)阻塞赶撰。
LinkedBlockingQueue 類的 put() 源碼:
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
public void put(E e) throws InterruptedException {
putLast(e);
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
整體邏輯如下:
/**
* 生產(chǎn)者消費(fèi)者模式:使用{@link java.util.concurrent.BlockingQueue}實(shí)現(xiàn)
*/
public class ProducerConsumerByBQ{
private static final int CAPACITY = 5;
public static void main(String args[]){
LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);
Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生產(chǎn)者
*/
public static class Producer extends Thread{
private LinkedBlockingDeque<Integer> blockingQueue;
String name;
int maxSize;
int i = 0;
public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
super(name);
this.name = name;
this.blockingQueue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
try {
blockingQueue.put(i);
System.out.println("[" + name + "] Producing value : +" + i);
i++;
//暫停最多1秒
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費(fèi)者
*/
public static class Consumer extends Thread{
private LinkedBlockingDeque<Integer> blockingQueue;
String name;
int maxSize;
public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
super(name);
this.name = name;
this.blockingQueue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
try {
int x = blockingQueue.take();
System.out.println("[" + name + "] Consuming : " + x);
//暫停最多1秒
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}