生產(chǎn)者消費(fèi)者模式在我們?nèi)粘9ぷ髦杏玫梅浅6圊逅妫热纾涸?strong>模塊解耦、消息隊(duì)列因惭、分布式場景中都很常見岳锁。這個(gè)模式里有三個(gè)角色,他們之間的關(guān)系是如下圖這樣的:
- 生產(chǎn)者線程:生產(chǎn)消息蹦魔、數(shù)據(jù)
- 消費(fèi)者線程:消費(fèi)消息激率、數(shù)據(jù)
- 阻塞隊(duì)列:作數(shù)據(jù)緩沖、平衡二者能力勿决,避免出現(xiàn)"產(chǎn)能過剩"的情況(生產(chǎn)者生產(chǎn)速度遠(yuǎn)高于消費(fèi)者消費(fèi)速度 or 多個(gè)生產(chǎn)者對一個(gè)消費(fèi)者)以及"供不應(yīng)求"的情況(生產(chǎn)者生產(chǎn)速度遠(yuǎn)低于消費(fèi)者消費(fèi)速度 or 多個(gè)消費(fèi)者對一個(gè)生產(chǎn)者)
從圖中 3 和 4 可以知道:無論阻塞隊(duì)列是滿還是空都可能會(huì)產(chǎn)生阻塞乒躺,阻塞之后就要在合適的時(shí)候去喚醒被阻塞的線程。
Q1:那什么時(shí)候會(huì)喚醒阻塞線程低缩?
1嘉冒、當(dāng)消費(fèi)者判斷隊(duì)列為空時(shí),消費(fèi)者線程進(jìn)入等待表制。這期間生產(chǎn)者一旦往隊(duì)列中放入數(shù)據(jù)健爬,就會(huì)通知所有的消費(fèi)者,喚醒阻塞的消費(fèi)者線程么介。
2、反之蜕衡,當(dāng)生產(chǎn)者判斷隊(duì)列已滿壤短,生產(chǎn)者線程進(jìn)入等待。這期間消費(fèi)者一旦消費(fèi)了數(shù)據(jù)慨仿、隊(duì)列有空位久脯,就會(huì)通知所有的生產(chǎn)者,喚醒阻塞的生產(chǎn)者線程镰吆。
Q2:為什么要用這種模式帘撰?
看了上面的 Q1,大家發(fā)現(xiàn)沒有万皿?生產(chǎn)者不用管消費(fèi)者的動(dòng)作摧找,消費(fèi)者也不用管生產(chǎn)者的動(dòng)作;它兩之間就是通過阻塞隊(duì)列通信牢硅,實(shí)現(xiàn)了解耦蹬耘;阻塞隊(duì)列的加入,平衡二者能力减余;生產(chǎn)者只有在隊(duì)列滿或消費(fèi)者只有在隊(duì)列空時(shí)才會(huì)等待综苔,其他時(shí)間誰搶到鎖誰工作,提高效率。以上就是原因~
使用 wait如筛、notify/notifyAll 實(shí)現(xiàn)
上篇文章《正確使用 wait堡牡、notify/notifyAll》說過,wait 讓當(dāng)前線程等待并釋放鎖杨刨,notify 喚醒任意一個(gè)等待同一個(gè)鎖的線程悴侵,notifyAll 則是喚醒所有等待該鎖的線程,然后誰搶到鎖拭嫁,誰執(zhí)行可免。這就是所謂的等待喚醒機(jī)制
先來看看用等待喚醒機(jī)制如何實(shí)現(xiàn)生產(chǎn)者、消費(fèi)者模式的做粤,首先是阻塞隊(duì)列:
public class MyBlockingQueue {
private int maxSize;
private LinkedList<Integer> queue;
public MyBlockingQueue(int size) {
this.maxSize = size;
queue = new LinkedList<>();
}
public synchronized void put() throws InterruptedException {
while (queue.size() == maxSize) {
System.out.println("隊(duì)列已滿浇借,生產(chǎn)者: " + Thread.currentThread().getName() +"進(jìn)入等待");
wait();
}
Random random = new Random();
int i = random.nextInt();
System.out.println("隊(duì)列未滿,生產(chǎn)者: " +
Thread.currentThread().getName() +"放入數(shù)據(jù)" + i);
// 隊(duì)列空才去喚醒消費(fèi)者怕品,其他時(shí)間自由競爭鎖
if (queue.size() == 0) {
notifyAll();
}
queue.add(i);
}
public synchronized void take() throws InterruptedException {
while (queue.size() == 0) {
System.out.println("隊(duì)列為空妇垢,消費(fèi)者: " + Thread.currentThread().getName() +"進(jìn)入等待");
wait();
}
// 隊(duì)列滿了才去喚醒生產(chǎn)者,其他時(shí)間自由競爭鎖
if (queue.size() == maxSize) {
notifyAll();
}
System.out.println("隊(duì)列有數(shù)據(jù)肉康,消費(fèi)者: " +
Thread.currentThread().getName() +"取出數(shù)據(jù): " + queue.remove());
}
}
主要邏輯在阻塞隊(duì)列這邊:先看 put 方法闯估,while 檢查隊(duì)列是否滿?滿則進(jìn)入等待并主動(dòng)釋放鎖吼和,不滿則生產(chǎn)數(shù)據(jù)涨薪,同時(shí)判斷放入數(shù)據(jù)之前隊(duì)列是否空?空則喚醒消費(fèi)者(因?yàn)殛?duì)列已有數(shù)據(jù)炫乓,可消費(fèi))刚夺。
再看 take 方法,while 檢查隊(duì)列是否空末捣?空則進(jìn)入等待并主動(dòng)釋放鎖侠姑,不空則生產(chǎn)數(shù)據(jù),同時(shí)判斷取出數(shù)據(jù)之前隊(duì)列是否已滿箩做?滿則喚醒生產(chǎn)者(因?yàn)殛?duì)列已有空位莽红,可生產(chǎn))。
為什么是 while 不是 if 邦邦?
大家可能有個(gè)疑問安吁。為什么判斷隊(duì)列 size 進(jìn)入等待狀態(tài)這里是用 while,不能用 if 嗎圃酵?就這個(gè) demo 而言柳畔,是可以的。因?yàn)槲覀兊纳a(chǎn)者和消費(fèi)者線程都只有一個(gè)郭赐,但是多線程情況下用 if 就大錯(cuò)特錯(cuò)了薪韩。想象以下情況:
假設(shè)有兩個(gè)消費(fèi)者一個(gè)生產(chǎn)者确沸。隊(duì)列為空,消費(fèi)者一進(jìn)入等待狀態(tài)俘陷,釋放鎖罗捎。消費(fèi)者二搶到鎖,進(jìn)入 if(queue.size == 0) 的判斷拉盾,也進(jìn)入等待桨菜,釋放鎖。這時(shí)生產(chǎn)者搶到鎖生產(chǎn)數(shù)據(jù)捉偏,隊(duì)列有數(shù)據(jù)了倒得。反過來喚醒兩個(gè)消費(fèi)者。
消費(fèi)者一搶到鎖執(zhí)行 wait() 后的邏輯夭禽,取完數(shù)據(jù)釋放鎖霞掺。這時(shí)消費(fèi)者二拿到鎖,執(zhí)行 wait() 后的邏輯取數(shù)據(jù)讹躯,但是此時(shí)隊(duì)列的數(shù)據(jù)已被消費(fèi)者一取出菩彬,沒有數(shù)據(jù)了,這時(shí)就會(huì)報(bào)異常了潮梯。
而用 while 為什么可以骗灶?因?yàn)椴还苁窍M(fèi)者一還是二搶到鎖,循環(huán)體的邏輯之前秉馏。根據(jù) while 的語法耙旦,它會(huì)再一次判斷條件是否成立,而 if 不會(huì)沃饶。這就是用 while 不用 if 的原因母廷。
生產(chǎn)者:
public class Producer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public Producer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
myBlockingQueue.put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費(fèi)者:
public class Consumer implements Runnable{
private MyBlockingQueue myBlockingQueue;
public Consumer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
myBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
測試類:
public class MyBlockingQueueTest {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
Producer producer = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
使用 Condition 實(shí)現(xiàn)
Condition 是一個(gè)多線程間協(xié)調(diào)通信的工具類,它的 await糊肤、sign/signAll 方法正好對應(yīng)Object 的 wait、notify/notifyAll 方法氓鄙。相比于 Object 的 wait馆揉、notify 方法,Condition 的 await抖拦、signal 結(jié)合的方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效升酣,所以更推薦這種方式實(shí)現(xiàn)線程間協(xié)作。關(guān)于 Condition 后面章節(jié)會(huì)繼續(xù)研究态罪,敬請關(guān)注
Object 的 wait噩茄、notify 方式需要結(jié)合 synchronized 關(guān)鍵字實(shí)現(xiàn)等待喚醒機(jī)制,同樣 Condition 也需要結(jié)合 Lock 類-复颈。那么這種方式如何實(shí)現(xiàn)生產(chǎn)者绩聘、消費(fèi)者模式?看代碼:
public class MyBlockingQueueForCondition {
private Queue<Integer> queue;
private int max = 10;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size) {
this.max = size;
queue = new LinkedList();
}
public void put(Integer i) throws InterruptedException {
// 加鎖
lock.lock();
try {
// 隊(duì)列滿了,進(jìn)入等待
while (queue.size() == max) {
System.out.println("隊(duì)列已滿凿菩,生產(chǎn)者: " + Thread.currentThread().getName() + "進(jìn)入等待");
notFull.await();
}
// 加入數(shù)據(jù)之前机杜,隊(duì)列為空?通知消費(fèi)者衅谷,可以消費(fèi)
if (queue.size() == 0) {
notEmpty.signalAll();
}
// 否則椒拗,繼續(xù)生產(chǎn)
queue.add(i);
} finally {
// 最后別忘記釋放鎖
lock.unlock();
}
}
public Integer take() throws InterruptedException {
// 加鎖
lock.lock();
try {
// 隊(duì)列無數(shù)據(jù),進(jìn)入等待
while (queue.size() == 0) {
System.out.println("隊(duì)列為空获黔,消費(fèi)者: " + Thread.currentThread().getName() + "進(jìn)入等待");
notEmpty.await();
}
// 取出數(shù)據(jù)之前蚀苛,隊(duì)列已滿?通知生產(chǎn)者玷氏,可以生產(chǎn)
if (queue.size() == max) {
notFull.signalAll();
}
// 否則堵未,取出
return queue.remove();
} finally {
// 最后別忘記釋放鎖
lock.unlock();
}
}
}
首先,定義了一個(gè)隊(duì)列以及 ReentrantLock 類型的鎖预茄,在這基礎(chǔ)上還創(chuàng)建 notFull兴溜、notEmpty 兩個(gè)條件,分別代表未滿耻陕、不為空的條件拙徽。最后定義了 take、put 方法诗宣。
take 和 put 邏輯差不多膘怕,這里只說 put 。因?yàn)橄M(fèi)生產(chǎn)模式肯定用于多線程環(huán)境召庞,需要保證同步岛心。這里還是先獲取鎖,確保同步篮灼。之后依然是判斷隊(duì)列是否已滿忘古?滿了進(jìn)入等待并釋放鎖,不滿則繼續(xù)生產(chǎn)诅诱,同時(shí)判斷隊(duì)列在生產(chǎn)前是否為空髓堪,為空才去喚醒消費(fèi)者。否則不喚醒娘荡,因?yàn)楫?dāng)隊(duì)列為空消費(fèi)者才進(jìn)入阻塞干旁。
PS:最后是一個(gè)非常重要的細(xì)節(jié),在 finally 里面釋放鎖炮沐,否則有可能出現(xiàn)異常無法釋放鎖的情況争群。
生產(chǎn)者:
public class ProducerForCondition implements Runnable {
private MyBlockingQueueForCondition myBlockingQueueForCondition;
public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
this.myBlockingQueueForCondition = myBlockingQueueForCondition;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
myBlockingQueueForCondition.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費(fèi)者:
public class ConsumerForCondition implements Runnable{
private MyBlockingQueueForCondition myBlockingQueueForCondition;
public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
this.myBlockingQueueForCondition = myBlockingQueueForCondition;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
System.out.println("消費(fèi)者取出數(shù)據(jù): " + myBlockingQueueForCondition.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
測試類:
public class MyBlockingQueueForConditionTest {
public static void main(String[] args) {
MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10);
ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition);
ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition);
new Thread(producerForCondition).start();
new Thread(consumerForCondition).start();
}
}
使用 BlockingQueue 實(shí)現(xiàn)
看完前兩種方式之后,有些小伙伴可能會(huì)說大年,實(shí)現(xiàn)個(gè)生產(chǎn)者消費(fèi)者這么煩么换薄?其實(shí)主要代碼還是在阻塞隊(duì)列玉雾,這點(diǎn) Java 早就為我們考慮好了,它提供了 BlockingQueue 接口专控,并有實(shí)現(xiàn)類: ArrayBlockingQueue抹凳、DelayQueue、 LinkedBlockingDeque伦腐、LinkedBlockingQueue赢底、等。(關(guān)于阻塞隊(duì)列柏蘑,狗哥的多線程系列后面也會(huì)講到)
我們選用最簡單的 ArrayBlockingQueue 實(shí)現(xiàn)幸冻。它的內(nèi)部也是采取 ReentrantLock 和 Condition 結(jié)合的等待喚醒機(jī)制。所以咳焚,上面的兩種方式其實(shí)是為這種方式鋪墊洽损。不多比比,上代碼:
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
// 初始化長度為 10 的 ArrayBlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 生產(chǎn)者
Runnable producer = () -> {
try {
// 放入數(shù)據(jù)
Random random = new Random();
while (true) {
queue.put(random.nextInt());
}
} catch (Exception e) {
System.out.println("生產(chǎn)數(shù)據(jù)出錯(cuò): " + e.getMessage());
}
};
// 開啟線程生產(chǎn)數(shù)據(jù)
new Thread(producer).start();
// 消費(fèi)者
Runnable consumer = () -> {
try {
// 取出數(shù)據(jù)
while (true) {
System.out.println(queue.take());
}
} catch (Exception e) {
System.out.println("消費(fèi)數(shù)據(jù)出錯(cuò): " + e.getMessage());
}
};
// 開啟線程消費(fèi)數(shù)據(jù)
new Thread(consumer).start();
}
}
創(chuàng)建一個(gè) ArrayBlockingQueue 并給定最大長度為 10革半,創(chuàng)建生產(chǎn)者和消費(fèi)者碑定。生產(chǎn)者在 while(true) 里面一直生產(chǎn),與此同時(shí)消費(fèi)者也是不斷取數(shù)據(jù)又官,有數(shù)據(jù)就取出來延刘。
看著是不是很簡單?但其實(shí)背后 ArrayBlockingQueue 已經(jīng)為我們做好了線程間通信的工作了六敬,比如隊(duì)列滿了就去阻塞生產(chǎn)者線程碘赖,隊(duì)列有空就去喚醒生產(chǎn)者線程等。
巨人的肩膀
總結(jié)
看了這幾個(gè)例子之后外构,相信你對生產(chǎn)者消費(fèi)者模式也有所了解普泡。以后面試官讓你手寫一個(gè)阻塞隊(duì)列,肯定也難不倒你审编。
小福利
如果看到這里撼班,喜歡這篇文章的話,請幫點(diǎn)個(gè)好看垒酬。微信搜索一個(gè)優(yōu)秀的廢人权烧,關(guān)注后回復(fù)電子書送你 100+ 本編程電子書 ,不只 Java 哦伤溉,詳情看下圖∑蘼剩回復(fù) 1024送你一套完整的 java 視頻教程乱顾。