場景
生產(chǎn)者生產(chǎn)數(shù)據(jù)奄妨,消費者消費數(shù)據(jù);
但是性能處理速度均有差異炊琉,因而需要一個中間隊列協(xié)調展蒂;
舉例
3個廚師做甜點,有3個吃貨來吃苔咪,如果廚師和吃貨一對一锰悼,廚師的生產(chǎn)速度和吃貨的吃飯速度均成成了短板,可能存在這樣一種不協(xié)調的場景团赏;
- 慢性子廚師款待一個吃飯快的胖吃貨箕般,吃貨都要餓死了,只能死等舔清;
- 急性子廚師款待一個吃飯慢的瘦子吃貨(比如我女朋友)丝里,吃貨吃不完,堆了一堆甜點
這個時候怎么辦比較好呢?
加一個櫥柜体谒,糕點生產(chǎn)好一個一個放在櫥柜里杯聚,吃貨排隊一個一個拿,這樣是不是好多了呢抒痒,櫥柜協(xié)調了廚師和吃貨的關系幌绍,減少了浪費;
角色
生產(chǎn)者 廚師
消費者 吃貨
數(shù)據(jù) 糕點
中間隊列 桌子
提升與思考
代碼放在最后故响,先講幾個問題深入大家理解
- 隊列可以有多種實現(xiàn)(Strategy Pattern)
- 先放先出 FIFO
- 后放先出 LIFO
- 優(yōu)先級隊列 Priority Quene
-
為什么生產(chǎn)和消費要用多線程?
這也是我在實際應用中感受最深的傀广,答案只有一個,單線程太耗時
只有當多線程的效率提升可以抵消開發(fā)難度和性能消耗時才有必要用多線程 -
別忘記sychronized 和 notifyAll()
否則其他線程一直等待彩届,不會繼續(xù) -
生產(chǎn)線程和消費線程的數(shù)量匹配
上述問題可以只有一個生產(chǎn)線程和一個消費線程(PIPE模式)
當生產(chǎn)比較費時的時候生產(chǎn)線程多一些伪冰,消費耗時時(如網(wǎng)絡IO)多一些消費線程(一對多對應線程池的WorkThread設計模式)
-
當廚師效率較低時(Thread.sleep(1000))
面包總是被吃光,可以增加廚師或者減少吃貨
面包被吃貨吃光樟蠕,等待廚師生產(chǎn)
面包被吃貨吃光贮聂,等待廚師生產(chǎn)
面包被吃貨吃光靠柑,等待廚師生產(chǎn)
廚師乙 在桌子上放入了 廚師乙產(chǎn)生的第1個面包
吃貨甲 從桌子上取走了 廚師乙產(chǎn)生的第1個面包
面包被吃貨吃光,等待廚師生產(chǎn)
面包被吃貨吃光寂汇,等待廚師生產(chǎn)
廚師甲 在桌子上放入了 廚師甲產(chǎn)生的第2個面包
吃貨丙 從桌子上取走了 廚師甲產(chǎn)生的第2個面包
面包被吃貨吃光病往,等待廚師生產(chǎn)
廚師丙 在桌子上放入了 廚師丙產(chǎn)生的第3個面包
吃貨乙 從桌子上取走了 廚師丙產(chǎn)生的第3個面包
面包被吃貨吃光,等待廚師生產(chǎn)
廚師乙 在桌子上放入了 廚師乙產(chǎn)生的第4個面包
吃貨甲 從桌子上取走了 廚師乙產(chǎn)生的第4個面包
面包被吃貨吃光骄瓣,等待廚師生產(chǎn)
廚師丙 在桌子上放入了 廚師丙產(chǎn)生的第5個面包
吃貨丙 從桌子上取走了 廚師丙產(chǎn)生的第5個面包
面包被吃貨吃光,等待廚師生產(chǎn)
面包被吃貨吃光耍攘,等待廚師生產(chǎn)
-
當廚師效率較高時(Thread.sleep(500)時)
總是放不下榕栏,這時可以減少廚師或者增加吃貨
廚師丙 在桌子上放入了 廚師丙產(chǎn)生的第18個面包
桌子放滿了面包,不能再放了
桌子放滿了面包蕾各,不能再放了
吃貨丙 從桌子上取走了 廚師甲產(chǎn)生的第11個面包
廚師乙 在桌子上放入了 廚師乙產(chǎn)生的第7個面包
桌子放滿了面包扒磁,不能再放了
桌子放滿了面包,不能再放了
吃貨乙 從桌子上取走了 廚師丙產(chǎn)生的第16個面包
廚師丙 在桌子上放入了 廚師丙產(chǎn)生的第19個面包
桌子放滿了面包式曲,不能再放了
吃貨丙 從桌子上取走了 廚師丙產(chǎn)生的第18個面包
廚師甲 在桌子上放入了 廚師甲產(chǎn)生的第17個面包
桌子放滿了面包妨托,不能再放了
桌子放滿了面包,不能再放了
桌子放滿了面包吝羞,不能再放了
吃貨乙 從桌子上取走了 廚師乙產(chǎn)生的第7個面包
廚師甲 在桌子上放入了 廚師甲產(chǎn)生的第22個面包
桌子放滿了面包兰伤,不能再放了
桌子放滿了面包,不能再放了
所以中間隊列給我們提供了一個緩沖區(qū)钧排,用于協(xié)調生產(chǎn)者和消費者間的性能差異敦腔,控制互斥
總結:
- 線程的協(xié)調運行需要"放在中間的東西"
- 線程的互斥處理需要"保護中間的東西"
- 中間隊列使得兩端協(xié)調成為肯能
代碼
- 生產(chǎn)者
/**
* @author xuhe
* @description 生產(chǎn)者線程
* @date 2018/5/17
*/
public class ProducerThread extends Thread {
public static int index;
private String threadName;
private DataChannel dataChannel;
private Random random;
public ProducerThread(String threadName, DataChannel dataChannel, long randomSeed) {
this.threadName = threadName;
this.dataChannel = dataChannel;
this.random = new Random(randomSeed);
setName(threadName);
}
@Override
public void run() {
try {
while (true){
Thread.sleep(random.nextInt(500));
Data data= new Data();
data.setName(String.format("%s產(chǎn)生的第%s個面包",threadName,getIndex()));
dataChannel.put(data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private synchronized int getIndex(){
return ++index;
}
}
- 消費者
package com.microparticletech.producer_consumer;
import java.util.Random;
/**
* @author xuhe
* @description 消費者線程
* @date 2018/5/17
*/
public class ConsumerThread extends Thread {
private DataChannel dataChannel;
private String threadName;
private Random random;
public ConsumerThread(String threadName,DataChannel dataChannel, long randomSeed) {
this.threadName = threadName;
this.dataChannel=dataChannel;
this.random = new Random(randomSeed);
setName(threadName);
}
@Override
public void run() {
System.out.println(threadName+"啟動");
try {
while (true){
Data data=new Data();
data=dataChannel.get();
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 數(shù)據(jù)
@lombok.Data
public class Data {
private String name;
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return
"name='" + name + '\'';
}
}
- 中間隊列
public interface DataChannel {
void put(Data data) throws InterruptedException;
Data get() throws InterruptedException;
}
public class DataChannelFIFOImpl implements DataChannel {
//先進先出的隊列實現(xiàn)
private static final int CAPACITY=3;
private Data[] dataArray = new Data[CAPACITY];
private int head;
private int tail;
private int count;
@Override
public synchronized void put(Data data) throws InterruptedException {
while (count>=CAPACITY){
System.out.println("桌子放滿了面包,不能再放了");
wait();
}
System.out.println(String.format("%s 在桌子上放入了 %s",Thread.currentThread().getName(),data.getName()));
dataArray[tail]=data;
tail=(tail+1)%CAPACITY;
count++;
notifyAll();
}
@Override
public synchronized Data get() throws InterruptedException {
while (count<=0){
System.out.println("面包被吃貨吃光恨溜,等待廚師生產(chǎn)");
wait();
}
Data data=dataArray[head];
System.out.println(String.format("%s 從桌子上取走了 %s",Thread.currentThread().getName(),data.getName()));
count--;
head=(head+1)%CAPACITY;
notifyAll();
return data;
}
}
- 啟動類
public class MainTest {
public static void main(String[] args) {
int size=3;
DataChannel channel = new DataChannelFIFOImpl();
Thread producer1 = new ProducerThread("廚師甲" , channel, 31415);
Thread producer2 = new ProducerThread("廚師乙" , channel, 92653);
Thread producer3 = new ProducerThread("廚師丙" , channel, 58979);
producer1.start();
producer2.start();
producer3.start();
Thread consumer1 = new ConsumerThread("吃貨甲" , channel, 32384);
Thread consumer2 = new ConsumerThread("吃貨乙" , channel, 62643);
Thread consumer3 = new ConsumerThread("吃貨丙" , channel, 38327);
consumer1.start();
consumer2.start();
consumer3.start();
}
}