生產(chǎn)者消費(fèi)者模型
Main
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
public static void main(String[] args) throws Exception {
//內(nèi)存緩沖區(qū)
BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
//生產(chǎn)者
Provider p1 = new Provider(queue);
Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
//消費(fèi)者
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
//創(chuàng)建線程池運(yùn)行,這是一個緩存的線程池筷弦,可以創(chuàng)建無窮大的線程,沒有任務(wù)的時候不創(chuàng)建線程脯颜”韵桑空閑線程存活時間為60s(默認(rèn)值)
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(p1);
cachePool.execute(p2);
cachePool.execute(p3);
cachePool.execute(c1);
cachePool.execute(c2);
cachePool.execute(c3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// cachePool.shutdown();
// cachePool.shutdownNow();
}
}
provider(生產(chǎn)者)
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Provider implements Runnable{
//共享緩存區(qū)
private BlockingQueue<Data> queue;
//多線程間是否啟動變量,有強(qiáng)制從主內(nèi)存中刷新的功能承桥。即時返回線程的狀態(tài)
private volatile boolean isRunning = true;
//id生成器
private static AtomicInteger count = new AtomicInteger();
//隨機(jī)對象
private static Random r = new Random();
public Provider(BlockingQueue queue){
this.queue = queue;
}
@Override
public void run() {
while(isRunning){
try {
//隨機(jī)休眠0 - 1000 毫秒 表示獲取數(shù)據(jù)(產(chǎn)生數(shù)據(jù)的耗時)
Thread.sleep(r.nextInt(1000));
//獲取的數(shù)據(jù)進(jìn)行累計...
int id = count.incrementAndGet();
//比如通過一個getData方法獲取了
Data data = new Data(Integer.toString(id), "數(shù)據(jù)" + id);
System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + ", 獲取了數(shù)據(jù)费薄,id為:" + id + ", 進(jìn)行裝載到公共緩沖區(qū)中...");
if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
System.out.println("提交緩沖區(qū)數(shù)據(jù)失敗....");
//do something... 比如重新提交
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop(){
this.isRunning = false;
}
}
Consumer(消費(fèi)者)
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable{
private BlockingQueue<Data> queue;
public Consumer(BlockingQueue queue){
this.queue = queue;
}
//隨機(jī)對象
private static Random r = new Random();
@Override
public void run() {
while(true){
try {
//獲取數(shù)據(jù)
Data data = this.queue.take();
//進(jìn)行數(shù)據(jù)處理坷檩。休眠0 - 1000毫秒模擬耗時
Thread.sleep(r.nextInt(1000));
System.out.println("當(dāng)前消費(fèi)線程:" + Thread.currentThread().getName() + ", 消費(fèi)成功赡艰,消費(fèi)數(shù)據(jù)為id: " + data.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Data數(shù)據(jù)
public final class Data {
private String id;
private String name;
public Data(String id, String name){
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString(){
return "{id: " + id + ", name: " + name + "}";
}
}
log信息
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)售淡,id為:1, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-4, 消費(fèi)成功慷垮,消費(fèi)數(shù)據(jù)為id: 1
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù)揖闸,id為:2, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù),id為:3, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6料身, 消費(fèi)成功汤纸,消費(fèi)數(shù)據(jù)為id: 3
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù),id為:4, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)芹血,id為:5, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)蹲嚣,id為:6, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù),id為:7, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)祟牲,id為:8, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5隙畜, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 2
當(dāng)前消費(fèi)線程:pool-1-thread-4说贝, 消費(fèi)成功议惰,消費(fèi)數(shù)據(jù)為id: 4
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù),id為:9, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)乡恕,id為:10, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)秒裕,id為:11, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù),id為:12, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-4闷板, 消費(fèi)成功侦铜,消費(fèi)數(shù)據(jù)為id: 7
當(dāng)前消費(fèi)線程:pool-1-thread-6, 消費(fèi)成功函卒,消費(fèi)數(shù)據(jù)為id: 5
當(dāng)前消費(fèi)線程:pool-1-thread-4辆憔, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 8
當(dāng)前消費(fèi)線程:pool-1-thread-5报嵌, 消費(fèi)成功虱咧,消費(fèi)數(shù)據(jù)為id: 6
當(dāng)前消費(fèi)線程:pool-1-thread-6, 消費(fèi)成功锚国,消費(fèi)數(shù)據(jù)為id: 9
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù)腕巡,id為:13, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5, 消費(fèi)成功血筑,消費(fèi)數(shù)據(jù)為id: 11
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)绘沉,id為:14, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5煎楣, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 13
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)车伞,id為:15, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5择懂, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 14
當(dāng)前消費(fèi)線程:pool-1-thread-6帖世, 消費(fèi)成功休蟹,消費(fèi)數(shù)據(jù)為id: 12
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù),id為:16, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)日矫,id為:17, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6赂弓, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 16
當(dāng)前消費(fèi)線程:pool-1-thread-4哪轿, 消費(fèi)成功盈魁,消費(fèi)數(shù)據(jù)為id: 10
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù),id為:18, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6窃诉, 消費(fèi)成功杨耙,消費(fèi)數(shù)據(jù)為id: 17
當(dāng)前消費(fèi)線程:pool-1-thread-5, 消費(fèi)成功飘痛,消費(fèi)數(shù)據(jù)為id: 15
當(dāng)前消費(fèi)線程:pool-1-thread-4珊膜, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 18
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)宣脉,id為:19, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)车柠,id為:20, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6, 消費(fèi)成功塑猖,消費(fèi)數(shù)據(jù)為id: 19
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù)竹祷,id為:21, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-4, 消費(fèi)成功羊苟,消費(fèi)數(shù)據(jù)為id: 21
當(dāng)前消費(fèi)線程:pool-1-thread-5塑陵, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 20