1.用Object#wait/notifyAll 實(shí)現(xiàn)
package com.multithread.condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* created by Ethan-Walker on 2019/2/14
*/
public class ConsumerProducer {
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition(); // 生產(chǎn)者線程等待隊(duì)列
private Condition notEmpty = lock.newCondition(); // 消費(fèi)者線程等待隊(duì)列
private String[] items;
private int count;
private int takePos, putPos;
public ConsumerProducer(int maxSize) {
this.items = new String[maxSize];
}
public void produce(String a) {
lock.lock();
try {
while (items.length == count) { // 緩存已滿
notFull.await(); // 當(dāng)前線程加到 生產(chǎn)者線程阻塞隊(duì)列中
}
// 生產(chǎn)一個值
items[putPos] = a;
putPos = (putPos + 1) % items.length;
count++;
notEmpty.signal(); // 喚醒消費(fèi)者線程中的一個
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public String consume() {
lock.lock();
String res = null;
try {
while (count == 0) {// 緩存為空
notEmpty.await(); //
}
res = items[takePos];
takePos = (takePos + 1) % items.length;
count--;
notFull.signal(); // 喚醒生產(chǎn)者線程的一個
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
return res;
}
}
public static void main(String[] args) throws InterruptedException {
ConsumerProducer cp = new ConsumerProducer(10);
// 10 個生產(chǎn)者乔遮,每個生產(chǎn)者生產(chǎn)3個產(chǎn)品
Thread[] producers = new Thread[10];
for (int i = 0; i < 10; i++) {
producers[i] = new Thread(new Producer(cp), "producer-" + i);
}
//5個消費(fèi)者枯怖,消費(fèi)所有的產(chǎn)品
Thread[] consumers = new Thread[5];
for (int i = 0; i < 5; i++) {
consumers[i] = new Thread(new Consumer(cp), "consumer-" + i);
consumers[i].start();
}
for (int i = 0; i < 10; i++) {
producers[i].start();
}
}
}
// 每個生產(chǎn)者可以生產(chǎn) 3 個產(chǎn)品
class Producer implements Runnable {
private ConsumerProducer cp;
public Producer(ConsumerProducer cp) {
this.cp = cp;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(3000); // 生產(chǎn)一個產(chǎn)品花費(fèi)的時(shí)間
cp.produce(Thread.currentThread().getName() + ":" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//每個消費(fèi)者可以重復(fù)消費(fèi)
class Consumer implements Runnable {
private ConsumerProducer cp;
public Consumer(ConsumerProducer cp) {
this.cp = cp;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(500);// 消費(fèi)者消費(fèi)需要占用一定的時(shí)間
} catch (InterruptedException e) {
e.printStackTrace();
}
String pro = cp.consume();
System.out.println(Thread.currentThread().getName() + " 消耗了: " + pro);
}
}
}
3. 用 BlockingQueue 實(shí)現(xiàn)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ProducerConsumerPattern {
public static void main(String args[]) throws InterruptedException {
//Creating shared object
BlockingQueue sharedQueue = new ArrayBlockingQueue(8);
// 5個生產(chǎn)者,每個生產(chǎn)3個產(chǎn)品
Thread[] producers = new Thread[5];
for (int i = 0; i < 5; i++) {
producers[i] = new Thread(new Producer(sharedQueue), "producer-" + i);
producers[i].start();
}
Thread.sleep(5000);
// 2個消費(fèi)者厉斟,每個可以重復(fù)消費(fèi)
Thread[] consumers = new Thread[2];
for (int i = 0; i < 2; i++) {
consumers[i] = new Thread(new Consumer(sharedQueue), "consumer-" + i);
consumers[i].start();
}
}
}
//Producer Class in java
class Producer implements Runnable {
private final BlockingQueue sharedQueue;
public Producer(BlockingQueue sharedQueue) {
this.sharedQueue = sharedQueue;
}
static AtomicInteger count = new AtomicInteger(0);
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(2000);
sharedQueue.put("product-" + count.incrementAndGet());
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
//Consumer Class in Java
class Consumer implements Runnable {
private final BlockingQueue sharedQueue;
public Consumer(BlockingQueue sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + " Consumed: " + sharedQueue.take());
Thread.sleep(1500);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}