前言
大多情況下坛悉,程序中需要不同的線程做不同的事荷憋,比如一個(gè)線程對共享變量做tickets++操作台颠,另一個(gè)線程對共享變量做tickets–操作,這就是生產(chǎn)者和消費(fèi)者模式勒庄。
正文
一串前,生產(chǎn)者-消費(fèi)者模式也是多線程
生產(chǎn)者和消費(fèi)者模式也是多線程的范例瘫里。所以其編程需要遵循多線程的規(guī)矩。
首先荡碾,既然是多線程谨读,就必然要使用同步。上回說到坛吁,synchronized關(guān)鍵字在修飾函數(shù)的時(shí)候漆腌,使用的是“this”鎖,所以在同一個(gè)類中的函數(shù)被synchronized修飾后阶冈,使用的是同一把鎖。線程調(diào)用這些函數(shù)時(shí)塑径,不管調(diào)用的是tickets++操作函數(shù)女坑,還是tickets–函數(shù),都會先去判斷是否加鎖了统舀,得到鎖之后再去進(jìn)行具體的操作匆骗。
我們先用代碼把程序中的資源,生產(chǎn)者誉简,消費(fèi)者表示出來碉就。
package com.jimmy.ThreadCommunication;
class Resource{ // 資源類
private String productName; // 資源名稱
private int count = 1; // 資源編號
public void produce(String name){ // 生產(chǎn)資源函數(shù)
this.productName = name + count;
count ++; // 資源編號遞增,用來模擬資源遞增
System.out.println(Thread.currentThread().getName()+"...生產(chǎn)者.."+this.productName);
}
public void consume() { // 消費(fèi)資源函數(shù)
System.out.println(Thread.currentThread().getName()+"...消費(fèi)者.."+this.productName);
}
}
class Producer implements Runnable{ // 生產(chǎn)者類闷串,用于開啟生產(chǎn)者線程
private Resource res;
//生產(chǎn)者初始化就要分配資源
public Producer(Resource res) {
this.res = res;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
res.produce("bread"); // 循環(huán)生產(chǎn)10次
}
}
}
class Comsumer implements Runnable{ // 消費(fèi)者類瓮钥,用于開啟消費(fèi)者線程
private Resource res;
//同理,消費(fèi)者一初始化也要分配資源
public Comsumer(Resource res) {
this.res = res;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
res.consume(); // 循環(huán)消費(fèi)10次
}
}
}
public class ProducerAndConsumer1 {
public static void main(String[] args) {
Resource resource = new Resource(); // 實(shí)例化資源
Producer producer = new Producer(resource); // 實(shí)例化生產(chǎn)者和消費(fèi)者類烹吵,它們?nèi)〉猛粋€(gè)資源
Comsumer comsumer = new Comsumer(resource);
Thread threadProducer = new Thread(producer); // 創(chuàng)建1個(gè)生產(chǎn)者線程
Thread threadComsumer = new Thread(comsumer); // 創(chuàng)建1個(gè)消費(fèi)者線程
threadProducer.start(); // 分別開啟線程
threadComsumer.start();
}
}
架子搭好了碉熄,就來運(yùn)行一下,當(dāng)然會出現(xiàn)錯(cuò)誤的結(jié)果肋拔,如下所示:
Thread-0...生產(chǎn)者..bread1
Thread-0...生產(chǎn)者..bread2
Thread-0...生產(chǎn)者..bread3
Thread-0...生產(chǎn)者..bread4
Thread-0...生產(chǎn)者..bread5
Thread-1...消費(fèi)者..bread1
Thread-1...消費(fèi)者..bread6
Thread-1...消費(fèi)者..bread6
Thread-1...消費(fèi)者..bread6
Thread-1...消費(fèi)者..bread6
Thread-1...消費(fèi)者..bread6
Thread-0...生產(chǎn)者..bread6
Thread-0...生產(chǎn)者..bread7
Thread-1...消費(fèi)者..bread6
Thread-1...消費(fèi)者..bread8
Thread-1...消費(fèi)者..bread8
Thread-1...消費(fèi)者..bread8
Thread-0...生產(chǎn)者..bread8
Thread-0...生產(chǎn)者..bread9
Thread-0...生產(chǎn)者..bread10
很明顯锈津,出現(xiàn)了線程安全錯(cuò)誤。這時(shí)凉蜂,就需要“同步”來保證對共享變量的互斥訪問琼梆。上面代碼中需要同步的就是Resource資源類中的produce和consume方法,分別使用synchronized來修飾窿吩,由于synchronized修飾方法時(shí)使用的是“this”鎖茎杂,所以同一個(gè)類中的所有被修飾的方法用的都是同一個(gè)鎖,那么線程一次只能訪問其中一個(gè)方法纫雁。加鎖后的Resource類方法如下:
class Resource{ // 資源類
private String productName; // 資源名稱
private int count = 1; // 資源編號
public synchronized void produce(String name){ // 生產(chǎn)資源函數(shù)
this.productName = name + count;
count ++; // 資源編號遞增蛉顽,用來模擬資源遞增
System.out.println(Thread.currentThread().getName()+"...生產(chǎn)者.."+this.productName);
}
public synchronized void consume() { // 消費(fèi)資源函數(shù)
System.out.println(Thread.currentThread().getName()+"...消費(fèi)者.."+this.productName);
}
}
再來跑一次代碼,又出現(xiàn)問題了:
Thread-0...生產(chǎn)者..bread1
Thread-0...生產(chǎn)者..bread2
Thread-0...生產(chǎn)者..bread3
Thread-0...生產(chǎn)者..bread4
Thread-0...生產(chǎn)者..bread5
Thread-0...生產(chǎn)者..bread6
Thread-0...生產(chǎn)者..bread7
Thread-0...生產(chǎn)者..bread8
Thread-0...生產(chǎn)者..bread9
Thread-0...生產(chǎn)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
Thread-1...消費(fèi)者..bread10
雖然沒有了線程安全錯(cuò)誤先较,但是問題來了携冤,生產(chǎn)者不停的生產(chǎn)悼粮,還沒等消費(fèi)者消費(fèi)呢,就將后面的資源覆蓋了前面的資源曾棕,導(dǎo)致消費(fèi)者消費(fèi)不到前面的資源扣猫,這樣很容易造成系統(tǒng)資源浪費(fèi)。理想中的結(jié)果應(yīng)該是翘地,生產(chǎn)者生產(chǎn)一個(gè)申尤,消費(fèi)者消費(fèi)一個(gè),和諧運(yùn)行衙耕。對此昧穿,java為多線程引入了”等待-喚醒”機(jī)制。
二橙喘,等待喚醒機(jī)制
與線程做同樣的操作不同时鸵,不同線程之間的操作需要等待喚醒機(jī)制來保證線程間的執(zhí)行順序。生產(chǎn)者和消費(fèi)者模式中厅瞎,生產(chǎn)者和消費(fèi)者是兩類不同的線程饰潜, 這兩類中又可以有很多線程來協(xié)同工作。通俗來說就是和簸,系統(tǒng)為資源設(shè)置一個(gè)標(biāo)志flag彭雾,該標(biāo)志用來標(biāo)明資源是否存在,所有的線程執(zhí)行操作前都要判斷資源是否存在锁保。舉例來說薯酝,系統(tǒng)初始化后,資源是空的爽柒。接下來要執(zhí)行的可能是生產(chǎn)者線程蜜托,也可能是消費(fèi)者線程。如果是消費(fèi)者線程獲得執(zhí)行權(quán)霉赡,先判斷資源橄务,此時(shí)為空,就會進(jìn)入阻塞狀態(tài)穴亏,交出執(zhí)行權(quán)蜂挪,并喚醒其他線程。如果是生產(chǎn)者線程獲得執(zhí)行權(quán)嗓化,先判斷資源棠涮,此時(shí)為空,立馬進(jìn)行生產(chǎn)刺覆,完了交出執(zhí)行權(quán)并喚醒其他線程严肪。
注意,上面提到了兩點(diǎn),第一點(diǎn)是標(biāo)志位flag驳糯,也就是等待機(jī)制篇梭,生產(chǎn)者要判斷系統(tǒng)沒有資源才進(jìn)行生產(chǎn),不然要等待酝枢,消費(fèi)者要判斷系統(tǒng)有資源才進(jìn)行消費(fèi)恬偷,不然也要等待。第二點(diǎn)是喚醒機(jī)制帘睦,不管是生產(chǎn)者還是消費(fèi)者袍患,它們在生產(chǎn)完或者消費(fèi)完后,都要執(zhí)行一個(gè)喚醒操作竣付。java提供的等待喚醒機(jī)制是由java.lang.Object類中的wait()和notify()函數(shù)組來實(shí)現(xiàn)的诡延。其中notify()函數(shù)隨機(jī)喚醒一個(gè)被wait()的線程,而notifyAll()喚醒所有被wait()的線程古胆。很遺憾肆良,并沒有直接喚醒對方線程的函數(shù)。
notify()適用于單生產(chǎn)者和單消費(fèi)者模式赤兴,而notifyAll()適用于多生產(chǎn)者或多消費(fèi)者模式。
下面來看2個(gè)生產(chǎn)者和2個(gè)消費(fèi)者線程處理一個(gè)共享變量的代碼示例:
package com.jimmy.ThreadCommunication;
class Resource2{
private String productName;
private int count = 1;
private boolean flag = false; // 資源類增加一個(gè)標(biāo)志位隧哮,默認(rèn)false桶良,也就是沒有資源
public synchronized void produce(String name){
while (flag == true) { // 如果flag為true,也就是有資源了沮翔,生產(chǎn)者線程就去等待陨帆。
try {
wait(); // wait函數(shù)拋出的異常只能被截獲
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.productName = name + count;
count ++;
System.out.println(Thread.currentThread().getName()+"....生產(chǎn)者.."+this.productName);
flag = true; // 生產(chǎn)完了就將flag修改為true
notifyAll(); // 然后喚醒其他線程
}
public synchronized void consume() {
while (flag == false) { // 如果flag為false,也就是沒有資源采蚀,消費(fèi)者線程就去等待
try { // 判斷flag要用while疲牵,因?yàn)榫€程被喚醒后會再次判斷flag
wait(); // 而如果是if來判斷,被喚醒后不會再判斷flag榆鼠,那么多個(gè)生產(chǎn)者線程就可能死鎖
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"...消費(fèi)者.."+this.productName);
flag = false; // 消費(fèi)完了就把標(biāo)志改為false
notifyAll(); // 然后喚醒其他線程纲爸,因?yàn)橛卸鄠€(gè)生產(chǎn)者和消費(fèi)者線程,所以要用notifyAll妆够,
// 因?yàn)閚otify只喚醒一個(gè)识啦,喚醒到同類型的線程就不好了。
}
}
class Producer2 implements Runnable{
private Resource2 res;
//生產(chǎn)者初始化就要分配資源
public Producer2(Resource2 res) {
this.res = res;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
res.produce("bread");
}
}
}
class Comsumer2 implements Runnable{
private Resource2 res;
//同理神妹,消費(fèi)者一初始化也要分配資源
public Comsumer2(Resource2 res) {
this.res = res;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
res.consume();
}
}
}
public class ProducerAndConsumer2 {
public static void main(String[] args) {
Resource2 resource = new Resource2(); // 實(shí)例化資源
Producer2 producer = new Producer2(resource); // 實(shí)例化生產(chǎn)者颓哮,并傳入資源對象
Comsumer2 comsumer = new Comsumer2(resource); // 實(shí)例化消費(fèi)者,并傳入相同的資源對象
Thread threadProducer1 = new Thread(producer); // 創(chuàng)建2個(gè)生產(chǎn)者線程
Thread threadProducer2 = new Thread(producer);
Thread threadComsumer1 = new Thread(comsumer); // 創(chuàng)建2個(gè)消費(fèi)者線程
Thread threadComsumer2 = new Thread(comsumer);
threadProducer1.start();
threadProducer2.start();
threadComsumer1.start();
threadComsumer2.start();
}
}
上述代碼的輸出結(jié)果如下鸵荠,是理想中的生產(chǎn)一個(gè)冕茅,消費(fèi)一個(gè)依次進(jìn)行。
Thread-0....生產(chǎn)者..bread1
Thread-3...消費(fèi)者..bread1
Thread-1....生產(chǎn)者..bread2
Thread-2...消費(fèi)者..bread2
Thread-1....生產(chǎn)者..bread3
Thread-3...消費(fèi)者..bread3
Thread-0....生產(chǎn)者..bread4
Thread-3...消費(fèi)者..bread4
Thread-1....生產(chǎn)者..bread5
Thread-2...消費(fèi)者..bread5
Thread-1....生產(chǎn)者..bread6
Thread-3...消費(fèi)者..bread6
Thread-0....生產(chǎn)者..bread7
Thread-3...消費(fèi)者..bread7
Thread-1....生產(chǎn)者..bread8
Thread-2...消費(fèi)者..bread8
Thread-0....生產(chǎn)者..bread9
Thread-3...消費(fèi)者..bread9
Thread-0....生產(chǎn)者..bread10
Thread-2...消費(fèi)者..bread10
可以看出,線程0和1是生產(chǎn)者線程姨伤,他們每次只有一個(gè)進(jìn)行生產(chǎn)哨坪。線程2和3是消費(fèi)者線程,同樣的姜挺,每次只有一個(gè)進(jìn)行消費(fèi)齿税。
注意,上述代碼中的問題有2點(diǎn)需要注意炊豪,第一點(diǎn)是用if還是while來判斷flag凌箕,第二點(diǎn)是用notify還是notifyAll函數(shù)。統(tǒng)一來說词渤,while判斷在線程喚醒后還會再次判斷牵舱,如果只有一個(gè)生產(chǎn)者和消費(fèi)者線程的話可以用if,如果有多個(gè)生產(chǎn)者或者消費(fèi)者缺虐,就必須用while判斷芜壁,不然會出現(xiàn)死鎖。所以高氮,最終要用while和notifyAll()的組合慧妄。
總結(jié)
多線程編程往往是多個(gè)線程執(zhí)行不同的任務(wù),不同的任務(wù)不僅需要“同步”剪芍,還需要“等待喚醒機(jī)制”塞淹。兩者結(jié)合就可以實(shí)現(xiàn)多線程編程,其中的生產(chǎn)者消費(fèi)者模式就是經(jīng)典范例罪裹。
然而饱普,使用synchronized修飾同步函數(shù)和使用Object類中的wait,notify方法實(shí)現(xiàn)等待喚醒是有弊端的状共。就是效率問題套耕,notifyAll方法喚醒所有被wait的線程,包括本類型的線程峡继,如果本類型的線程被喚醒冯袍,還要再次判斷并進(jìn)入wait,這就產(chǎn)生了很大的效率問題碾牌。理想狀態(tài)下颠猴,生產(chǎn)者線程要喚醒消費(fèi)者線程,而消費(fèi)者線程要喚醒生產(chǎn)者線程小染。為此翘瓮,jdk1.5引入了java.util.concurrent.locks包,并提供了Lock和Condition接口及實(shí)現(xiàn)類裤翩。
對標(biāo)阿里P6級架構(gòu)師
掃描下方二維碼资盅,可以獲取更多學(xué)習(xí)資料调榄。