在并發(fā)編程中砸脊,比較經(jīng)典的編程例子就是生產(chǎn)者和消費(fèi)者模型。下面就是一個(gè)例子來詮釋一下什么是生產(chǎn)者和消費(fèi)者以及他們的特點(diǎn)和注意點(diǎn)纬霞。
1凌埂、先定義一個(gè)數(shù)據(jù)對象,
public class Data {
private String id;
private String name;
public Data(String id,String name){
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Data [id=" + id + ", name=" + name + "]";
}
}
2.定義一個(gè)生產(chǎn)者诗芜,實(shí)現(xiàn)Runnable接口
public class Provider implements Runnable{
//共享緩沖區(qū)
private BlockingQueue<Data> queue;
//多線程間是否啟動(dòng)變量瞳抓,有強(qiáng)制從主內(nèi)存中刷新的功能,及時(shí)返回線程狀態(tài)
private volatile boolean isRunning = true;
//id生成器
private static AtomicInteger count = new AtomicInteger();
//隨機(jī)對象
private static Random r = new Random();
public Provider(BlockingQueue queue){
this.queue = queue;
}
@Override
public void run() {
while(isRunning){
//隨機(jī)休眠0-1000毫秒 表示獲取數(shù)據(jù)
try {
Thread.sleep(r.nextInt(1000));
//獲取的數(shù)據(jù)進(jìn)行累計(jì)
int id = count.incrementAndGet();
//比如通過一個(gè)getData()方法獲取了
Data data = new Data(Integer.toString(id),"數(shù)據(jù)"+id);
System.out.println("當(dāng)前線程:"+ Thread.currentThread().getName() + ",獲取了數(shù)據(jù)伏恐,id為:"+ id+ ",進(jìn)行裝載到公共緩沖區(qū)中孩哑。。翠桦。");
if(!this.queue.offer(data,2,TimeUnit.SECONDS)){
System.out.print("提交緩沖區(qū)數(shù)據(jù)失敗");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("aaa");
}
}
public void stop(){
this.isRunning = false;
}
}
這里有幾個(gè)注意點(diǎn)横蜒,一個(gè)就是對共享緩沖區(qū)的選擇,作為生產(chǎn)者–消費(fèi)者模型而言,共享緩沖區(qū)一定要具備阻塞的能力丛晌。所以這邊選擇的是阻塞隊(duì)列鹰霍。還有一個(gè)就是在并發(fā)編程的時(shí)候,如果需要使用類似i++這種id自增長的功能茵乱,需要使用Atomic包下的并發(fā)類茂洒。因?yàn)檫@些類是采用CAS設(shè)計(jì)的,不會(huì)產(chǎn)生并發(fā)問題瓶竭。
3.消費(fèi)者
public class Consumer implements Runnable {
private BlockingQueue<Data> queue;
public Consumer(BlockingQueue queu){
this.queue = queu;
}
//隨機(jī)對象
private static Random r = new Random();
@Override
public void run() {
while(true){
try{
//獲取數(shù)據(jù)
Data data = this.queue.take();
//進(jìn)行數(shù)據(jù)處理督勺,休眠 0-1000毫秒模擬耗時(shí)
Thread.sleep(r.nextInt(1000));
System.out.print("當(dāng)前消費(fèi)線程"+Thread.currentThread().getName() +",消費(fèi)成功,消費(fèi)id為"+data.getId());
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
消費(fèi)者主要就是從阻塞隊(duì)列中獲取數(shù)據(jù)智哀,如果隊(duì)列中沒有元素,則會(huì)釋放CPU荧恍,然后等待。(注意這里使用的是take而不是poll送巡,不同點(diǎn)在于take在沒有元素的時(shí)候會(huì)釋放CPU,而poll則是直接返回null)骗爆。
main函數(shù):
public class Main {
public static void main(String[] args){
//內(nèi)存緩沖區(qū)
BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
//生產(chǎn)者
Provider p1 = new Provider(queue);
Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
//創(chuàng)建線程池次氨,這是一個(gè)緩存的線程池摘投,可以創(chuàng)建無窮大的線程,沒有任務(wù)的時(shí)候不創(chuàng)建線程犀呼,空閑線程存活的時(shí)間為60s。
ExecutorService cachepool = Executors.newCachedThreadPool();
cachepool.execute(p1);
cachepool.execute(p2);
cachepool.execute(p3);
cachepool.execute(c1);
cachepool.execute(c2);
cachepool.execute(c3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}