生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型是一個典型的多線程問題悠咱,涉及生產(chǎn)者、消費(fèi)者征炼、產(chǎn)品倉庫析既。生產(chǎn)者生產(chǎn)的產(chǎn)品放入倉庫中、消費(fèi)者從倉庫中取走產(chǎn)品谆奥。倉庫可看成是阻塞隊列眼坏,有如下關(guān)系。
- 倉庫放滿產(chǎn)品后生產(chǎn)者停止生產(chǎn)
- 倉庫中沒有產(chǎn)品時消費(fèi)者停止消費(fèi)
- 倉庫不滿的時候所有生產(chǎn)者都進(jìn)行工作,倉庫有產(chǎn)品后通知消費(fèi)者消費(fèi)
- 倉庫有產(chǎn)品的時候所有消費(fèi)者都工作宰译,倉庫不滿時通知生產(chǎn)者生產(chǎn)
代碼實現(xiàn)
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author HXJ
* @date 2018/7/26
*/
public class ProduceConsume {
static class Produce {
public Produce(String name) {
this.name = name;
}
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Produce{" +
"name='" + name + '\'' +
'}';
}
}
//單向鏈表檐蚜,用于構(gòu)建隊列
static class Node {
Produce produce;
Node next;
Node(Produce produce) { this.produce = produce; }
}
static class ProduceDepot {
private int capacity;
private final AtomicInteger count = new AtomicInteger();
private Node head;
private Node tail;
private Object notFull = new Object();
private Object notEmpty = new Object();
public ProduceDepot(int capacity) {
this.capacity = capacity;
this.head = this.tail = new Node(null);
}
public void produce(Produce produce) {
Node node = new Node(produce);
int num;
synchronized (notFull) {
while (count.get() == this.capacity) {
try {
//如果倉庫滿了,停止生產(chǎn)
System.out.println(Thread.currentThread().getName() + " 倉庫滿了沿侈,停止生產(chǎn)");
notFull.wait();
} catch (InterruptedException e) {
}
}
//新生產(chǎn)的產(chǎn)品排隊放入倉庫闯第,放在隊列末尾
this.tail = this.tail.next = node;
num = count.getAndIncrement();
if (num + 1 < this.capacity) {
//如果隊列不滿了通知生產(chǎn)
notFull.notifyAll();
System.out.println(Thread.currentThread().getName() + " 倉庫沒滿,通知其他生產(chǎn)者");
}
}
if (num == 0) {
synchronized (notEmpty) {
System.out.println(Thread.currentThread().getName() + " 倉庫有產(chǎn)品缀拭,通知消費(fèi)者消費(fèi)");
notEmpty.notifyAll();
}
}
}
public Produce consume() {
int num;
Produce produce;
synchronized (notEmpty) {
while (count.get() == 0) {
try {
System.out.println(Thread.currentThread().getName() + " 倉庫空了咳短,停止消費(fèi)");
notEmpty.wait();
} catch (InterruptedException e) {
}
}
//把先進(jìn)倉庫的產(chǎn)品取出
Node h = this.head;
Node first = this.head.next;
this.head = first;
h.next = null; //gc
produce = first.produce;
first.produce = null;
num = count.getAndDecrement();
if (num > 1) {
System.out.println(Thread.currentThread().getName() + " 倉庫有產(chǎn)品,通知其他消費(fèi)者");
notEmpty.notifyAll();
}
}
if (num == this.capacity) {
//隊列不滿了蛛淋,通知生產(chǎn)者
synchronized (notFull) {
System.out.println(Thread.currentThread().getName() + " 倉庫不滿了咙好,通知繼續(xù)生產(chǎn)");
notFull.notifyAll();
}
}
return produce;
}
}
public static void main(String[] args) throws InterruptedException {
ProduceDepot depot = new ProduceDepot(10);
new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
depot.produce(new Produce("produce-" + i));
i ++;
}
}
}, "produce").start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("消費(fèi)產(chǎn)品:" + depot.consume());
}
}
}, "consume").start();
Thread.currentThread().join();
}
}
運(yùn)行結(jié)果:
...
消費(fèi)產(chǎn)品:Produce{name='produce-67303'}
consume 倉庫空了,停止消費(fèi)
produce 倉庫沒滿褐荷,通知其他生產(chǎn)者
produce 倉庫沒滿勾效,通知其他生產(chǎn)者
produce 倉庫有產(chǎn)品,通知消費(fèi)者消費(fèi)
produce 倉庫沒滿叛甫,通知其他生產(chǎn)者
consume 倉庫有產(chǎn)品层宫,通知其他消費(fèi)者
消費(fèi)產(chǎn)品:Produce{name='produce-67304'}
consume 倉庫有產(chǎn)品,通知其他消費(fèi)者
消費(fèi)產(chǎn)品:Produce{name='produce-67305'}
消費(fèi)產(chǎn)品:Produce{name='produce-67306'}
consume 倉庫空了合溺,停止消費(fèi)
produce 倉庫沒滿卒密,通知其他生產(chǎn)者
produce 倉庫沒滿缀台,通知其他生產(chǎn)者
produce 倉庫有產(chǎn)品棠赛,通知消費(fèi)者消費(fèi)
produce 倉庫沒滿,通知其他生產(chǎn)者
consume 倉庫有產(chǎn)品膛腐,通知其他消費(fèi)者
消費(fèi)產(chǎn)品:Produce{name='produce-67307'}
consume 倉庫有產(chǎn)品睛约,通知其他消費(fèi)者
消費(fèi)產(chǎn)品:Produce{name='produce-67308'}
消費(fèi)產(chǎn)品:Produce{name='produce-67309'}
consume 倉庫空了,停止消費(fèi)
produce 倉庫沒滿哲身,通知其他生產(chǎn)者
produce 倉庫沒滿辩涝,通知其他生產(chǎn)者
produce 倉庫有產(chǎn)品,通知消費(fèi)者消費(fèi)
produce 倉庫沒滿勘天,通知其他生產(chǎn)者
produce 倉庫沒滿怔揩,通知其他生產(chǎn)者
produce 倉庫沒滿,通知其他生產(chǎn)者
produce 倉庫沒滿脯丝,通知其他生產(chǎn)者
produce 倉庫沒滿商膊,通知其他生產(chǎn)者
produce 倉庫沒滿,通知其他生產(chǎn)者
produce 倉庫沒滿宠进,通知其他生產(chǎn)者
produce 倉庫沒滿晕拆,通知其他生產(chǎn)者
produce 倉庫沒滿,通知其他生產(chǎn)者
produce 倉庫滿了材蹬,停止生產(chǎn)
consume 倉庫有產(chǎn)品实幕,通知其他消費(fèi)者
消費(fèi)產(chǎn)品:Produce{name='produce-67310'}
consume 倉庫有產(chǎn)品吝镣,通知其他消費(fèi)者
consume 倉庫不滿了,通知繼續(xù)生產(chǎn)
...
結(jié)果說明:
倉庫(阻塞隊列)昆庇,實現(xiàn)的關(guān)鍵有兩個地方末贾,一是倉庫產(chǎn)品數(shù)量的變更是通過AtomicInteger進(jìn)行原子操作的,即庫存的增加或減少都是按順序執(zhí)行的整吆;二是對生產(chǎn)者和消費(fèi)者進(jìn)行了分組未舟,生產(chǎn)者阻塞/喚醒在notFull監(jiān)視器上,消費(fèi)者阻塞/喚醒在notEmpty監(jiān)視器上掂为。兩組線程相互獨(dú)立裕膀,通過監(jiān)視器進(jìn)行通信,生產(chǎn)者工作的條件是倉庫不滿勇哗,停止工作的條件是倉庫已滿昼扛,阻塞等待在notFull對象上,換句話說就是欲诺,倉庫裝滿了抄谐,到notFull這個地方歇著吧!當(dāng)生產(chǎn)者把生產(chǎn)的產(chǎn)品放入倉庫后扰法,通知消費(fèi)者消費(fèi)蛹含;倉庫空了,消費(fèi)者停止工作塞颁,倉庫不滿的時候通知生產(chǎn)者浦箱。