生產(chǎn)者/消費(fèi)者模式實(shí)現(xiàn)
等待/通知模式最經(jīng)典的案例就是“生產(chǎn)者/消費(fèi)者模式”模式第晰。但此模式在使用上有幾種“變形”,還有一些小的注意事項(xiàng)彬祖,但原理都是基于wait/notify的茁瘦。
一生產(chǎn)者與一消費(fèi)者:操作值
模擬項(xiàng)目 創(chuàng)建一個(gè)類p_c_test
/**
* 輸出結(jié)果:
get的值是:1494750000110_26368251399553
set的值是:1494750000110_26368251410228
get的值是:1494750000110_26368251410228
set的值是:1494750000110_26368251420081
get的值是:1494750000110_26368251420081
set的值是:1494750000110_26368251431987
get的值是:1494750000110_26368251431987
set的值是:1494750000110_26368251442250
get的值是:1494750000110_26368251442250
set的值是:1494750000110_26368251452103
get的值是:1494750000110_26368251452103
set的值是:1494750000110_26368251462367
get的值是:1494750000110_26368251462367
set的值是:1494750000110_26368251474273
*結(jié)果分析:
例子展示一個(gè)消費(fèi)者一個(gè)生產(chǎn)者,進(jìn)行數(shù)據(jù)的交互储笑,在控制臺(tái)中打印的get和set是交互運(yùn)行的甜熔。
* @author jiaxing
*
*/
package entity;
/**
* 類說(shuō)明: 創(chuàng)建生產(chǎn)者類
*
* @author: Casin
* @Create: 2017-05-15 16:49
* @HOME: https://qincasin.github.io/
*/
public class P{
private String lock;
public P(String lock) {
super();
this.lock=lock;
}
public void setValue(){
try {
synchronized(lock){
if(!ValueObject.value.equals("")){
lock.wait(); //等待
}
//nanoTime():返回最準(zhǔn)確的可用系統(tǒng)計(jì)時(shí)器的當(dāng)前值,以毫微秒(納秒)為單位突倍。()不可用于計(jì)算兩時(shí)間差腔稀,具體看源碼注釋
String value=System.currentTimeMillis()+"_"+System.nanoTime();
System.out.println("set的值是:"+value);
ValueObject.value=value;
lock.notify(); //喚醒
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package entity;
/**
* 類說(shuō)明: 消費(fèi)者類
*
* @author: Casin
* @Create: 2017-05-15 18:28
* @HOME: https://qincasin.github.io/
*/
public class C{
private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue(){
try {
synchronized(lock){
if(ValueObject.value.equals("")){
lock.wait();//等待
}
System.out.println("get的值是:"+ValueObject.value);
ValueObject.value="";
lock.notify();//喚醒
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package extthread;
import entity.P;
/**
* 類說(shuō)明: 創(chuàng)建線程P
*
* @author: Casin
* @Create: 2017-05-15 18:32
* @HOME: https://qincasin.github.io/
*/
public class ThreadP extends Thread {
private P p;
public ThreadP(P p) {
super();
this.p=p;
}
@Override
public void run() {
while (true){
p.setValue();
}
}
}
package extthread;
import entity.C;
/**
* 類說(shuō)明: 線程C
*
* @author: Casin
* @Create: 2017-05-15 18:37
* @HOME: https://qincasin.github.io/
*/
public class ThreadC extends Thread{
private C r;
public ThreadC(C r) {
this.r = r;
}
@Override
public void run() {
super.run();
while (true){
r.getValue();
}
}
}
package test;
import entity.C;
import entity.P;
import extthread.ThreadC;
import extthread.ThreadP;
/**
* 類說(shuō)明: 測(cè)試類
*
* @author: Casin
* @Create: 2017-05-15 18:40
* @HOME: https://qincasin.github.io/
*/
public class Run {
public static void main(String[] args) {
String lock = new String("");
P p = new P(lock);
C r = new C(lock);
ThreadP threadP = new ThreadP(p);
ThreadC threadC = new ThreadC(r);
threadP.start();
threadC.start();
}
}
但如果在此例子的基礎(chǔ)上,設(shè)計(jì)出多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者羽历,那么在運(yùn)行的過(guò)程中極有可能出現(xiàn)“假死”的情況焊虏,也就是所有的線程都呈WAITING等待狀態(tài)。
多生產(chǎn)者多消費(fèi)者:操作值-假死
“假死”的現(xiàn)象其實(shí)就是線程進(jìn)入WAITING等待狀態(tài)窄陡,如果全部線程都進(jìn)入WAITING狀態(tài)炕淮,則程序就不在執(zhí)行任何業(yè)務(wù)功能了拆火,整個(gè)項(xiàng)目呈停止?fàn)顟B(tài)跳夭。這在使用生產(chǎn)者與消費(fèi)者模式時(shí)經(jīng)常遇到。
模擬項(xiàng)目们镜,創(chuàng)建類p_c_allWait
代碼如下:
package entity;
/**
* 類說(shuō)明: 生產(chǎn)者
*
* @author: Casin
* @Create: 2017-05-15 18:48
* @HOME: https://qincasin.github.io/
*/
public class P {
private String lock;
public P(String lock) {
this.lock = lock;
}
public void setValue(){
try {
synchronized (lock){
if(!ValueObject.value.equals("")){
System.out.println("生產(chǎn)者"+Thread.currentThread().getName()+" WAITINFG了※ ");
lock.wait();
}
System.out.println("生產(chǎn)者"+Thread.currentThread().getName()+" RUNNABLE了 ");
String value=System.currentTimeMillis()+"_"+System.nanoTime();
ValueObject.value=value;
lock.notify();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
package entity;
/**
* 類說(shuō)明: 消費(fèi)者
*
* @author: Casin
* @Create: 2017-05-15 18:52
* @HOME: https://qincasin.github.io/
*/
public class C {
private String lock;
public C(String lock) {
this.lock = lock;
}
public void getValue(){
try {
synchronized (lock){
while(ValueObject.value.equals("")){
System.out.println("消費(fèi)者"+Thread.currentThread().getName()+" WAITING了☆");
lock.wait();
}
System.out.println("消費(fèi)者"+Thread.currentThread().getName()+" RUNNABLE了");
ValueObject.value="";
lock.notify();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
package entity;
/**
* 類說(shuō)明: 存儲(chǔ)值的對(duì)象
*
* @author: Casin
* @Create: 2017-05-15 18:48
* @HOME: https://qincasin.github.io/
*/
public class ValueObject {
public static String value="";
}
package extthread;
import entity.P;
/**
* 類說(shuō)明: 線程p
*
* @author: Casin
* @Create: 2017-05-15 18:54
* @HOME: https://qincasin.github.io/
*/
public class ThreadP extends Thread{
private P p;
public ThreadP(P p) {
super();
this.p = p;
}
@Override
public void run() {
super.run();
while (true){
p.setValue();
}
}
}
package extthread;
import entity.C;
/**
* 類說(shuō)明: ThreadC線程
*
* @author: Casin
* @Create: 2017-05-15 18:55
* @HOME: https://qincasin.github.io/
*/
public class ThreadC extends Thread{
private C r;
public ThreadC(C r) {
super();
this.r = r;
}
@Override
public void run() {
super.run();
while (true){
r.getValue();
}
}
}
package test;
import entity.C;
import entity.P;
import extthread.ThreadC;
import extthread.ThreadP;
/**
*
消費(fèi)者消費(fèi)者1 WAITING了☆
消費(fèi)者消費(fèi)者2 WAITING了☆
生產(chǎn)者生產(chǎn)者1 RUNNABLE了
生產(chǎn)者生產(chǎn)者1 WAITINFG了※
生產(chǎn)者生產(chǎn)者2 RUNNABLE了
生產(chǎn)者生產(chǎn)者2 WAITINFG了※
消費(fèi)者消費(fèi)者1 RUNNABLE了
消費(fèi)者消費(fèi)者1 WAITING了☆
消費(fèi)者消費(fèi)者2 WAITING了☆
main - RUNNABLE
Monitor Ctrl-Break - RUNNABLE
生產(chǎn)者1 - WAITING
消費(fèi)者1 - WAITING
生產(chǎn)者2 - WAITING
消費(fèi)者2 - WAITING
*解釋:輸出結(jié)果中※符號(hào)代表本線程進(jìn)入等待狀態(tài)币叹,需要額外注意這樣的執(zhí)行結(jié)果
* 在代碼中確實(shí)已經(jīng)通過(guò)wait/notify進(jìn)行通信了,但不保證notify喚醒的是異類模狭,也許是同類颈抚,
* 比如“生產(chǎn)者”喚醒“生產(chǎn)者”,或“消費(fèi)者”喚醒“消費(fèi)者”這樣的情況嚼鹉。
* 如果按這樣的情況運(yùn)行的比例積少成多贩汉,就會(huì)導(dǎo)致所有的線程都不能繼續(xù)運(yùn)行下去,大家都在等待锚赤,都呈WAITING狀態(tài)匹舞,程序最后也就成“假死”狀態(tài),不能繼續(xù)運(yùn)行下去了线脚。
*/
/**
* 類說(shuō)明: 測(cè)試類
*
* @author: Casin
* @Create: 2017-05-15 18:56
* @HOME: https://qincasin.github.io/
*/
public class Run {
public static void main(String[] args) throws InterruptedException {
String lock = new String("");
P p = new P(lock);
C r = new C(lock);
ThreadP[] threadP = new ThreadP[2];
ThreadC[] threadC =new ThreadC[2];
for (int i=0;i<2;i++){
threadP[i] = new ThreadP(p);
threadP[i].setName("生產(chǎn)者"+(i+1));
threadC[i] = new ThreadC(r);
threadC[i].setName("消費(fèi)者"+(i+1));
threadP[i].start();
System.out.println("-------------11111--");
threadC[i].start();
}
Thread.sleep(5000);
//getThreadGroup():返回此線程所屬的線程組赐稽,如果線程已經(jīng)停止叫榕,此方法返回null
//activeCount返回當(dāng)前線程組以及子組中活動(dòng)線程數(shù)的估計(jì)
//enumerate():將當(dāng)前線程的線程組及其子組中的每個(gè)活動(dòng)線程復(fù)制到指定的數(shù)組中
//getState():返回線程的當(dāng)前狀態(tài)--->總共六種狀態(tài)
Thread[] threadArray = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
Thread.currentThread().getThreadGroup().enumerate(threadArray);
for(int i=0;i<threadArray.length;i++){
System.out.println(threadArray[i].getName()+" - "+threadArray[i].getState());
}
}
}
多生產(chǎn)者多消費(fèi)者:操作值
解決“假死”的方法,將上面的項(xiàng)目p_c_allWait中P.java和C.java文件中的notify()改成notifyAll()就可以解決了姊舵。原理是:不光同志同類線程晰绎,也包括異類。這樣就至于出現(xiàn)假死的狀態(tài)了括丁,程序會(huì)一直運(yùn)行下去荞下。
一生產(chǎn)者一消費(fèi)者:操作棧
本例子是使生產(chǎn)者想堆棧List對(duì)象中放入對(duì)象,使消費(fèi)者從List堆棧中取出數(shù)據(jù)躏将,List最大容量是1锄弱,實(shí)驗(yàn)環(huán)境只有一個(gè)生產(chǎn)者與一個(gè)消費(fèi)者。
- 創(chuàng)建項(xiàng)目stack_1
package entity;
import java.util.ArrayList;
import java.util.List;
/**
* 類說(shuō)明:
*
* @author: Casin
* @Create: 2017-05-15 20:26
* @HOME: https://qincasin.github.io/
*/
public class MyStack {
private List list = new ArrayList();
//入棧
synchronized public void push(){
try {
if(list.size()==1){
this.wait();//當(dāng)棧中元素為1時(shí)進(jìn)行等待操作
}
list.add("anyString="+Math.random());
this.notify();//喚醒
System.out.println("push="+list.size());
}catch (InterruptedException e){
e.printStackTrace();
}
}
//出棧
synchronized public String pop(){
String returnValue="";
try {
if (list.size()==0){
System.out.println("pop操作中的"+Thread.currentThread().getName()+" 線程呈wait狀態(tài)");
this.wait();
}
returnValue=""+list.get(0);
list.remove(0);
this.notify();
System.out.println("pop="+list.size());
}catch (InterruptedException e){
e.printStackTrace();
}
return returnValue;
}
}
package extthread;
import service.C;
/**
* 類說(shuō)明: 消費(fèi)者線程用于pop操作
*
* @author: Casin
* @Create: 2017-05-15 20:39
* @HOME: https://qincasin.github.io/
*/
public class C_Thread extends Thread {
private C c;
public C_Thread(C c) {
this.c = c;
}
@Override
public void run() {
super.run();
while (true){
c.popService();
}
}
}
package extthread;
import service.P;
/**
* 類說(shuō)明: 生產(chǎn)者線程push操作
*
* @author: Casin
* @Create: 2017-05-15 20:34
* @HOME: https://qincasin.github.io/
*/
public class P_Thread extends Thread {
private P p;
public P_Thread(P p) {
this.p = p;
}
@Override
public void run() {
super.run();
while (true){
p.pushService();
}
}
}
package service;
import entity.MyStack;
/**
* 類說(shuō)明: 消費(fèi)者用于pop操作
*
* @author: Casin
* @Create: 2017-05-15 20:37
* @HOME: https://qincasin.github.io/
*/
public class C {
private MyStack myStack;
public C(MyStack myStack) {
this.myStack = myStack;
}
public void popService(){
System.out.println("pop="+myStack.pop());
}
}
package service;
import entity.MyStack;
/**
* 類說(shuō)明: 消費(fèi)者用于pop操作
*
* @author: Casin
* @Create: 2017-05-15 20:37
* @HOME: https://qincasin.github.io/
*/
public class C {
private MyStack myStack;
public C(MyStack myStack) {
this.myStack = myStack;
}
public void popService(){
System.out.println("pop="+myStack.pop());
}
}
package test;
import entity.MyStack;
import extthread.C_Thread;
import extthread.P_Thread;
import service.C;
import service.P;
/**
* 輸出結(jié)果:
push=1
pop=0
pop=anyString=0.6610059962956492
pop操作中的Thread-1 線程呈wait狀態(tài)
push=1
pop=0
pop=anyString=0.17516237577017224
pop操作中的Thread-1 線程呈wait狀態(tài)
push=1
pop=0
pop=anyString=0.004120762121210819
pop操作中的Thread-1 線程呈wait狀態(tài)
push=1
pop=0
pop=anyString=0.3785755506701072
pop操作中的Thread-1 線程呈wait狀態(tài)
*解釋:
* 結(jié)果中顯示消費(fèi)者放push一個(gè)祸憋,則消費(fèi)者pop一個(gè)会宪,當(dāng)沒(méi)有push時(shí),則消費(fèi)者呈現(xiàn)waiting狀態(tài)
* 容器size()的值不會(huì)大于1蚯窥,這也是例子想要實(shí)現(xiàn)的效果掸鹅,值在0和1之間進(jìn)行交替,也就是生產(chǎn)者和消費(fèi)者這兩個(gè)過(guò)程在交替執(zhí)行拦赠。
*/
/**
* 類說(shuō)明: 測(cè)試一生產(chǎn)者一消費(fèi)者:操作棧類
*
* @author: Casin
* @Create: 2017-05-15 20:41
* @HOME: https://qincasin.github.io/
*/
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
P p = new P(myStack);
C c = new C(myStack);
P_Thread p_thread = new P_Thread(p);
C_Thread c_thread = new C_Thread(c);
p_thread.start();
c_thread.start();
}
}
一生產(chǎn)者多消費(fèi)者--操作棧:解決wait條件改變與假死
本例是使用一個(gè)生產(chǎn)者向堆棧List對(duì)象中放入數(shù)據(jù)巍沙,而多個(gè)消費(fèi)者從List堆棧中取出數(shù)據(jù)妖碉。List最大容量仍然還是1
創(chuàng)建新的項(xiàng)目另锋,將上個(gè)項(xiàng)目中的代碼全部復(fù)制到新項(xiàng)目中妇多;
代碼如下:
解釋:
- 當(dāng)條件發(fā)生改變時(shí)并沒(méi)有得到及時(shí)的相應(yīng)途蒋,會(huì)導(dǎo)致多個(gè)呈wait狀態(tài)的線程都被喚醒豪诲,繼而執(zhí)行l(wèi)ist.remove(0)代碼而出現(xiàn)異常畴椰。
- 解決辦法:將Mystack中的if修改為while滑臊。
- 但是這時(shí)會(huì)出現(xiàn)程序假死狀態(tài)
- 因此使用notifyAll()解決假死狀態(tài)
package entity;
import java.util.ArrayList;
import java.util.List;
/**
* 類說(shuō)明:
*
* @author: Casin
* @Create: 2017-05-15 20:26
* @HOME: https://qincasin.github.io/
*/
public class MyStack {
private List list = new ArrayList();
//入棧
synchronized public void push(){
try {
while(list.size()==1){
this.wait();//當(dāng)棧中元素為1時(shí)進(jìn)行等待操作
}
list.add("anyString="+Math.random());
this.notifyAll();//喚醒
System.out.println("push="+list.size());
}catch (InterruptedException e){
e.printStackTrace();
}
}
//出棧
synchronized public String pop(){
String returnValue="";
try {
while (list.size()==0){
System.out.println("pop操作中的"+Thread.currentThread().getName()+" 線程呈wait狀態(tài)");
this.wait();
}
returnValue=""+list.get(0);
list.remove(0);
this.notifyAll();
System.out.println("pop="+list.size());
}catch (InterruptedException e){
e.printStackTrace();
}
return returnValue;
}
}
package test;
import entity.MyStack;
import extthread.C_Thread;
import extthread.P_Thread;
import service.C;
import service.P;
/**
* 輸出結(jié)果:
push=1
pop=0
pop=anyString=0.39424855100604783
pop操作中的Thread-3 線程呈wait狀態(tài)
pop操作中的Thread-5 線程呈wait狀態(tài)
pop操作中的Thread-4 線程呈wait狀態(tài)
pop操作中的Thread-2 線程呈wait狀態(tài)
push=1
pop=0
*解釋:
* 程序會(huì)一直執(zhí)行下去蚪拦,當(dāng)生產(chǎn)者push一個(gè)后牍疏,那么多個(gè)消費(fèi)者只有一個(gè)會(huì)pot蠢笋,其余的全部等待。
* 解決了wait條件改變與假死的bug
*/
/**
* 類說(shuō)明: 測(cè)試一生產(chǎn)者多消費(fèi)者:操作棧類
*
* @author: Casin
* @Create: 2017-05-15 20:41
* @HOME: https://qincasin.github.io/
*/
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
P p = new P(myStack);
C c1 = new C(myStack);
C c2 = new C(myStack);
C c3 = new C(myStack);
C c4 = new C(myStack);
C c5 = new C(myStack);
//啟動(dòng)一個(gè)生產(chǎn)者
P_Thread p_thread = new P_Thread(p);
p_thread.start();
//啟動(dòng)多個(gè)消費(fèi)者
C_Thread c_thread1 = new C_Thread(c1);
C_Thread c_thread2 = new C_Thread(c2);
C_Thread c_thread3 = new C_Thread(c3);
C_Thread c_thread4 = new C_Thread(c4);
C_Thread c_thread5 = new C_Thread(c5);
c_thread1.start();
c_thread2.start();
c_thread3.start();
c_thread4.start();
c_thread5.start();
}
}
多生產(chǎn)者與一消費(fèi)者:操作棧
本例子展示使用消費(fèi)者向堆棧List對(duì)象中放入數(shù)據(jù)鳞陨,使用消費(fèi)者從List堆棧中取出數(shù)據(jù)昨寞。
List最大容量還是1
創(chuàng)建新項(xiàng)目,將上一個(gè)項(xiàng)目全部復(fù)制到新項(xiàng)目中厦滤,修改Run方法中代碼
package test;
import entity.MyStack;
import extthread.C_Thread;
import extthread.P_Thread;
import service.C;
import service.P;
/**
* 類說(shuō)明: 多生產(chǎn)者與一消費(fèi)者:操作棧
*
* @author: Casin
* @Create: 2017-05-16 19:37
* @HOME: https://qincasin.github.io/
*/
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
//創(chuàng)建多個(gè)生產(chǎn)者
P p1 = new P(myStack);
P p2 = new P(myStack);
P p3 = new P(myStack);
P p4 = new P(myStack);
P p5 = new P(myStack);
P p6 = new P(myStack);
P_Thread p_thread1 = new P_Thread(p1);
P_Thread p_thread2 = new P_Thread(p2);
P_Thread p_thread3 = new P_Thread(p3);
P_Thread p_thread4 = new P_Thread(p4);
P_Thread p_thread5 = new P_Thread(p5);
P_Thread p_thread6 = new P_Thread(p6);
p_thread1.start();
p_thread2.start();
p_thread3.start();
p_thread4.start();
p_thread5.start();
p_thread6.start();
//創(chuàng)建一個(gè)消費(fèi)者
C c = new C(myStack);
C_Thread c_thread = new C_Thread(c);
c_thread.start();
}
}
最后結(jié)果顯示正常
多生產(chǎn)者與多消費(fèi)者:操作棧
本例子是使用生產(chǎn)者向棧List對(duì)象中放入數(shù)據(jù)援岩,使用消費(fèi)者從List棧中取出數(shù)據(jù)。List最大容量是1掏导,實(shí)驗(yàn)環(huán)境是多個(gè)生產(chǎn)者與多個(gè)生產(chǎn)者享怀。
代碼比較簡(jiǎn)單,只需要在Run類中多添加幾個(gè)消費(fèi)者就可以了碘菜。輸出結(jié)果也是可想而知凹蜈,當(dāng)一個(gè)生產(chǎn)者push一個(gè)后限寞,那么消費(fèi)者只有一個(gè)可以pop,其余呈現(xiàn)waiting狀態(tài)仰坦。