前言
生產(chǎn)者和消費者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費者在同一時間段內(nèi)共用同一個存儲空間,生產(chǎn)者往存儲空間中添加產(chǎn)品邢锯,消費者從存儲空間中取走產(chǎn)品,當(dāng)存儲空間為空時斤讥,消費者阻塞候生,當(dāng)存儲空間滿時同眯,生產(chǎn)者阻塞。
java中最基本的生產(chǎn)者消費者模型是用notify跟wait實現(xiàn)的唯鸭。
加鎖機制的缺陷
在Java程序中嗽测,synchronized解決了多線程競爭的問題。當(dāng)一個加鎖函數(shù)執(zhí)行完成后會自動釋放鎖肿孵,例如,對于一個任務(wù)管理器疏魏,多個線程同時往隊列中添加任務(wù)停做,可以用synchronized加鎖,但是synchronized并沒有解決多線程協(xié)調(diào)的問題大莫。
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
上面這段代碼對兩個方法進行了加鎖蛉腌,但是由于getTask()方法加鎖后,如果queue對象中一直沒有對象就會形成死循環(huán)只厘,鎖得不到釋放烙丛!其他線程也不行拿到鎖進行下一步操作,這樣的問題在多線程中經(jīng)常發(fā)生羔味,java為了解決這樣的問題所以進行了線程的喚醒沉睡操作河咽!
可以將方法進行稍作修改變?yōu)椋?/p>
public synchronized String getTask() {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
其中 this.wait() 方法的意思是:
wait()讓當(dāng)前線程進入等待狀態(tài),同時赋元,wait()也會讓當(dāng)前線程釋放它所持有的鎖忘蟹。“直到其他線程調(diào)用此對象的 notify() 方法或 notifyAll() 方法”搁凸,當(dāng)前線程被喚醒(進入“就緒狀態(tài)”)
所以這里調(diào)用方法后媚值,如果queue中是空的會將此線程進行沉睡,釋放他們的鎖护糖!但是釋放后誰怎么去通知這個線程又可以繼續(xù)進行了呢褥芒?這個時候就需要notify或者notifyAll方法了!
所以我們再將添加方法進行修改得到:
public synchronized void addTask(String s) {
this.queue.add(s);
this.notify(); // 喚醒在this鎖等待的線程
}
其中 this.notify() 方法的意思是:
notify()和notifyAll()的作用嫡良,則是喚醒當(dāng)前對象上的等待線程锰扶;notify()是喚醒單個線程,而notifyAll()是喚醒所有的線程皆刺。
這段代碼的意思就是我將一個值放入到queue后少辣,我馬上調(diào)用notify方法去通知那些進入wait方法的線程可以繼續(xù)執(zhí)行了!
最簡單的生產(chǎn)消費
根據(jù)上面的解釋羡蛾,我們完整的代碼可以像下面這樣:
public class Java_21 {
public static void main(String[] args) throws InterruptedException {
TaskQueue q = new TaskQueue();
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(() -> {
// 執(zhí)行task:
while (true) {
try {
String s = q.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
return;
}
}
});
t.start();
ts.add(t);
}
Thread add = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 放入task:
String s = "t-" + Math.random();
System.out.println("add task: " + s);
q.addTask(s);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
add.start();
add.join();
Thread.sleep(100);
for (Thread thread : ts) {
thread.interrupt();
}
}
}
class TaskQueue {
private Queue<String> stringQueue = new LinkedList<>();
public synchronized void addTask(String s) {
stringQueue.add(s);
// notify()和notifyAll()的作用漓帅,則是喚醒當(dāng)前對象上的等待線程;notify()是喚醒單個線程,而notifyAll()是喚醒所有的線程忙干。
this.notifyAll(); // 喚醒在this鎖等待的所有線程
}
public synchronized String getTask() throws InterruptedException {
while (stringQueue.isEmpty()) {
//wait()的作用是讓當(dāng)前線程進入等待狀態(tài)器予,同時,wait()也會讓當(dāng)前線程釋放它所持有的鎖捐迫∏瑁“直到其他線程調(diào)用此對象的 notify() 方法或 notifyAll() 方法”,當(dāng)前線程被喚醒(進入“就緒狀態(tài)”)
this.wait();
}
return stringQueue.remove();
}
}
運行的結(jié)果就是:
add task: t-0.8499205383861345
execute task: t-0.8499205383861345
add task: t-0.7942800777561965
execute task: t-0.7942800777561965
add task: t-0.6906469783761674
execute task: t-0.6906469783761674
add task: t-0.8532616076721191
execute task: t-0.8532616076721191
add task: t-0.7939063475258298
execute task: t-0.7939063475258298
add task: t-0.658439645359411
execute task: t-0.658439645359411
add task: t-0.1836403135968735
execute task: t-0.1836403135968735
add task: t-0.868308480446476
execute task: t-0.868308480446476
add task: t-0.5658234674527566
execute task: t-0.5658234674527566
add task: t-0.8699112304857072
execute task: t-0.8699112304857072
一眼就能看出是添加一個元素后馬上取出一個元素施戴。
對上面的代碼進行解釋:
我們有一個TaskQueue類里面有個添加與取出的方法反浓,兩個方法都進行了加鎖的機制。
首先我們看取出的getTask()機制:
我們判斷stringQueue是不是空如果是空就調(diào)用wait() 方法釋放當(dāng)前線程的鎖赞哗,然后進入到睡眠狀態(tài)
再來看addTask()方法:
我們向stringQueue中添加一個值雷则,添加好了后我們馬上調(diào)用notifyAll()方法喚醒簽名所有調(diào)用了wait()方法的線程進行消費!
最后再來看main函數(shù)的執(zhí)行:
我們開啟了五個線程進行g(shù)etTask()的執(zhí)行肪笋,表示取出操作月劈,我們再開啟一個線程進行了addTask()操作,我們調(diào)用join方法讓add線程進行完成后藤乙,我們再遍歷集合調(diào)用interrupt()方法進行所有線程的中斷操作
執(zhí)行過程:
add線程中我調(diào)用addTask()后添加一個值馬上就去調(diào)用notifyAll()通知其他五個等待線程去取猜揪,至于是哪個線程取到這就是cpu自己的調(diào)度了,等取空了后五個線程中又進入了休眠狀態(tài)坛梁,我這邊又去添加值而姐,這樣重復(fù)進行就可以達(dá)到一放一取的操作
生產(chǎn)者消費者擴充
一般的生產(chǎn)者消費者模型都是給了一個緩沖區(qū)的概念,都是添加到緩沖區(qū)罚勾,等緩沖區(qū)到達(dá)一個值后生產(chǎn)者就不生產(chǎn)了毅人,然后讓消費者去消費,等消費者消費完后又暫停等生產(chǎn)者去生產(chǎn)尖殃!跟前言里面的圖是一個道理丈莺,將我們的代碼簡單的變化就能達(dá)到:
修改生產(chǎn)者:
public synchronized void addTask(String s) {
stringQueue.add(s);
if (stringQueue.size()==100){
this.notifyAll();
}
}
中間的緩沖區(qū)大小就是100.
最后
生產(chǎn)者消費者模式我們不是經(jīng)常用到,但是我們開發(fā)中用到的框架很多都是基于這樣的線程模型送丰,所以掌握原理后有助于我們開發(fā)中解決實際的線程安全交流問題
補充
加鎖的寫法有很多種缔俄,例如我將上面的方法不用this加鎖,用一個中間常量效果一樣器躏,完整代碼如下:
public class Java_21 {
static final String LOCK = "lock";
public static void main(String[] args) throws InterruptedException {
TaskQueue q = new TaskQueue();
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(() -> {
// 執(zhí)行task:
while (true) {
try {
String s = q.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
return;
}
}
});
t.start();
ts.add(t);
}
Thread add = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 放入task:
String s = "t-" + Math.random();
System.out.println("add task: " + s);
q.addTask(s);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
add.start();
add.join();
Thread.sleep(100);
for (Thread thread : ts) {
thread.interrupt();
}
}
}
class TaskQueue {
private Queue<String> stringQueue = new LinkedList<>();
public void addTask(String s) {
synchronized(LOCK){
stringQueue.add(s);
// if (stringQueue.size()==100){
// LOCK.notifyAll(); // 喚醒在this鎖等待的所有線程
//
// }
LOCK.notifyAll();
}
}
public String getTask() throws InterruptedException {
synchronized (LOCK){
while (stringQueue.isEmpty()) {
LOCK.wait();
}
return stringQueue.remove();
}
}
}