本文主要介紹java中生產(chǎn)者/消費(fèi)者模式的實(shí)現(xiàn),對(duì)java線程鎖機(jī)制的一次深入理解。
生產(chǎn)者/消費(fèi)者模型
生產(chǎn)者/消費(fèi)者模型要保證,同一個(gè)資源在同一時(shí)間節(jié)點(diǎn)下只能被最多一個(gè)線程訪問(wèn),這個(gè)在java中用鎖就很容易實(shí)現(xiàn)汤功。
下面的例子就是模擬多個(gè)生產(chǎn)者生產(chǎn),多個(gè)消費(fèi)者消費(fèi)的demo
//抽象生產(chǎn)者
public abstract class AbstractProducer implements Runnable {
abstract void produce() throws InterruptedException;
@Override
public void run() {
try {
while (true) {
produce();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//抽象消費(fèi)者
public abstract class AbstractConsumer implements Runnable {
abstract void consume() throws InterruptedException;
@Override
public void run() {
try {
while (true) {
consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ConsumerAndProducerDemo {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Condition CONDITION = LOCK.newCondition();
private static final Queue<Product> PRODUCTS = new LinkedList<>();
private static final int SIZE = 4;
public static class Product {
int id;
Product(int id) {
this.id = id;
}
}
//實(shí)現(xiàn)消費(fèi)者
private static class Consumer extends AbstractConsumer {
@Override
void consume() throws InterruptedException {
try {
LOCK.lock();
while (PRODUCTS.isEmpty()) {
CONDITION.await();
}
Product product = PRODUCTS.poll();
Thread.sleep((long) (500 + Math.random() * 1000));
System.out.println(" consume product " + product.id);
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
//實(shí)現(xiàn)生產(chǎn)者
private static class Producer extends AbstractProducer {
@Override
void produce() throws InterruptedException {
try {
LOCK.lock();
while (PRODUCTS.size() >= SIZE) {
CONDITION.await();
}
Thread.sleep(1000);
Product product = new Product(ATOMIC_INTEGER.incrementAndGet());
PRODUCTS.add(product);
System.out.println("produce product " + product.id);
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
public static void main(String[] args) {
for (int index = 0; index < 2; index++) {
new Thread(new Producer()).start();
}
for (int index = 0; index < 3; index++) {
new Thread(new Consumer()).start();
}
}
}
上面的demo這么實(shí)現(xiàn)
- 啟動(dòng)多個(gè)線程模擬多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者
- 同時(shí)使用了queue用來(lái)緩存產(chǎn)品
- 當(dāng)緩存區(qū)沒(méi)滿(mǎn)時(shí)生產(chǎn)者生產(chǎn)
- 當(dāng)緩沖區(qū)滿(mǎn)時(shí)消費(fèi)者開(kāi)始消費(fèi)
線程之間的同步控轿,這里使用了ReentrantLock冤竹,ReentrantLock在之前的博客中有介紹過(guò),當(dāng)然也可以使用Object自帶的wait()等方法茬射,實(shí)現(xiàn)同步這里就不在修改demo另行實(shí)現(xiàn)了鹦蠕。