生產(chǎn)者消費(fèi)者模型示例

生產(chǎn)者消費(fèi)者模型
Main

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) throws Exception {
        //內(nèi)存緩沖區(qū)
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
        //生產(chǎn)者
        Provider p1 = new Provider(queue);
        
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);
        //消費(fèi)者
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        //創(chuàng)建線程池運(yùn)行,這是一個緩存的線程池筷弦,可以創(chuàng)建無窮大的線程,沒有任務(wù)的時候不創(chuàng)建線程脯颜”韵桑空閑線程存活時間為60s(默認(rèn)值)

        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(p1);
        cachePool.execute(p2);
        cachePool.execute(p3);
        cachePool.execute(c1);
        cachePool.execute(c2);
        cachePool.execute(c3);

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        p1.stop();
        p2.stop();
        p3.stop();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }       
//      cachePool.shutdown();
//      cachePool.shutdownNow();
        

    }
    
}

provider(生產(chǎn)者)

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Provider implements Runnable{
    
    //共享緩存區(qū)
    private BlockingQueue<Data> queue;
    //多線程間是否啟動變量,有強(qiáng)制從主內(nèi)存中刷新的功能承桥。即時返回線程的狀態(tài)
    private volatile boolean isRunning = true;
    //id生成器
    private static AtomicInteger count = new AtomicInteger();
    //隨機(jī)對象
    private static Random r = new Random(); 
    
    public Provider(BlockingQueue queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while(isRunning){
            try {
                //隨機(jī)休眠0 - 1000 毫秒 表示獲取數(shù)據(jù)(產(chǎn)生數(shù)據(jù)的耗時) 
                Thread.sleep(r.nextInt(1000));
                //獲取的數(shù)據(jù)進(jìn)行累計...
                int id = count.incrementAndGet();
                //比如通過一個getData方法獲取了
                Data data = new Data(Integer.toString(id), "數(shù)據(jù)" + id);
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + ", 獲取了數(shù)據(jù)费薄,id為:" + id + ", 進(jìn)行裝載到公共緩沖區(qū)中...");
                if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
                    System.out.println("提交緩沖區(qū)數(shù)據(jù)失敗....");
                    //do something... 比如重新提交
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public void stop(){
        this.isRunning = false;
    }
    
}

Consumer(消費(fèi)者)

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable{

    private BlockingQueue<Data> queue;
    
    public Consumer(BlockingQueue queue){
        this.queue = queue;
    }
    
    //隨機(jī)對象
    private static Random r = new Random(); 

    @Override
    public void run() {
        while(true){
            try {
                //獲取數(shù)據(jù)
                Data data = this.queue.take();
                //進(jìn)行數(shù)據(jù)處理坷檩。休眠0 - 1000毫秒模擬耗時
                Thread.sleep(r.nextInt(1000));
                System.out.println("當(dāng)前消費(fèi)線程:" + Thread.currentThread().getName() + ", 消費(fèi)成功赡艰,消費(fèi)數(shù)據(jù)為id: " + data.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Data數(shù)據(jù)

public final class Data {

    private String id;
    private String name;
    
    public Data(String id, String name){
        this.id = id;
        this.name = name;
    }
    
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString(){
        return "{id: " + id + ", name: " + name + "}";
    }
    
}

log信息

當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)售淡,id為:1, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-4, 消費(fèi)成功慷垮,消費(fèi)數(shù)據(jù)為id: 1
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù)揖闸,id為:2, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù),id為:3, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6料身, 消費(fèi)成功汤纸,消費(fèi)數(shù)據(jù)為id: 3
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù),id為:4, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)芹血,id為:5, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)蹲嚣,id為:6, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù),id為:7, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)祟牲,id為:8, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5隙畜, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 2
當(dāng)前消費(fèi)線程:pool-1-thread-4说贝, 消費(fèi)成功议惰,消費(fèi)數(shù)據(jù)為id: 4
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù),id為:9, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)乡恕,id為:10, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)秒裕,id為:11, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù),id為:12, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-4闷板, 消費(fèi)成功侦铜,消費(fèi)數(shù)據(jù)為id: 7
當(dāng)前消費(fèi)線程:pool-1-thread-6, 消費(fèi)成功函卒,消費(fèi)數(shù)據(jù)為id: 5
當(dāng)前消費(fèi)線程:pool-1-thread-4辆憔, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 8
當(dāng)前消費(fèi)線程:pool-1-thread-5报嵌, 消費(fèi)成功虱咧,消費(fèi)數(shù)據(jù)為id: 6
當(dāng)前消費(fèi)線程:pool-1-thread-6, 消費(fèi)成功锚国,消費(fèi)數(shù)據(jù)為id: 9
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù)腕巡,id為:13, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5, 消費(fèi)成功血筑,消費(fèi)數(shù)據(jù)為id: 11
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)绘沉,id為:14, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5煎楣, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 13
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)车伞,id為:15, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-5择懂, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 14
當(dāng)前消費(fèi)線程:pool-1-thread-6帖世, 消費(fèi)成功休蟹,消費(fèi)數(shù)據(jù)為id: 12
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù),id為:16, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)日矫,id為:17, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6赂弓, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 16
當(dāng)前消費(fèi)線程:pool-1-thread-4哪轿, 消費(fèi)成功盈魁,消費(fèi)數(shù)據(jù)為id: 10
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù),id為:18, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6窃诉, 消費(fèi)成功杨耙,消費(fèi)數(shù)據(jù)為id: 17
當(dāng)前消費(fèi)線程:pool-1-thread-5, 消費(fèi)成功飘痛,消費(fèi)數(shù)據(jù)為id: 15
當(dāng)前消費(fèi)線程:pool-1-thread-4珊膜, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 18
當(dāng)前線程:pool-1-thread-3, 獲取了數(shù)據(jù)宣脉,id為:19, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前線程:pool-1-thread-2, 獲取了數(shù)據(jù)车柠,id為:20, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-6, 消費(fèi)成功塑猖,消費(fèi)數(shù)據(jù)為id: 19
當(dāng)前線程:pool-1-thread-1, 獲取了數(shù)據(jù)竹祷,id為:21, 進(jìn)行裝載到公共緩沖區(qū)中...
當(dāng)前消費(fèi)線程:pool-1-thread-4, 消費(fèi)成功羊苟,消費(fèi)數(shù)據(jù)為id: 21
當(dāng)前消費(fèi)線程:pool-1-thread-5塑陵, 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: 20

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蜡励,一起剝皮案震驚了整個濱河市令花,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌巍虫,老刑警劉巖彭则,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異占遥,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)输瓜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進(jìn)店門瓦胎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芬萍,“玉大人,你說我怎么就攤上這事搔啊〖盱簦” “怎么了?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵负芋,是天一觀的道長漫蛔。 經(jīng)常有香客問我,道長旧蛾,這世上最難降的妖魔是什么莽龟? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮锨天,結(jié)果婚禮上毯盈,老公的妹妹穿的比我還像新娘。我一直安慰自己病袄,他們只是感情好搂赋,可當(dāng)我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著益缠,像睡著了一般脑奠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上幅慌,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天宋欺,我揣著相機(jī)與錄音,去河邊找鬼欠痴。 笑死迄靠,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的喇辽。 我是一名探鬼主播掌挚,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼菩咨!你這毒婦竟也來了吠式?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤抽米,失蹤者是張志新(化名)和其女友劉穎特占,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體云茸,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡是目,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了标捺。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懊纳。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡揉抵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出嗤疯,到底是詐尸還是另有隱情冤今,我是刑警寧澤,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布茂缚,位于F島的核電站戏罢,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏脚囊。R本人自食惡果不足惜龟糕,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望凑术。 院中可真熱鬧翩蘸,春花似錦、人聲如沸淮逊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽泄鹏。三九已至郎任,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間备籽,已是汗流浹背舶治。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留车猬,地道東北人霉猛。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像珠闰,于是被迫代替她去往敵國和親惜浅。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,728評論 2 351

推薦閱讀更多精彩內(nèi)容