Java生產(chǎn)者消費(fèi)者的三種實(shí)現(xiàn)

一、使用synchronize以及wait()汞幢、notify() /notifyAll()

  • 1.wait()的作用是讓當(dāng)前線程進(jìn)入等待狀態(tài)曹抬,同時(shí),wait()也會(huì)讓當(dāng)前線程釋放它所持有的鎖急鳄“瘢“直到其他線程調(diào)用此對(duì)象的 notify() 方法或 notifyAll() 方法”,當(dāng)前線程被喚醒(進(jìn)入“就緒狀態(tài)”)
  • 2.notify()和notifyAll()的作用疾宏,則是喚醒當(dāng)前對(duì)象上的等待線程张足;notify()是喚醒單個(gè)線程,而notifyAll()是喚醒所有的線程坎藐。
  • 3.wait(long timeout)讓當(dāng)前線程處于“等待(阻塞)狀態(tài)”为牍,“直到其他線程調(diào)用此對(duì)象的notify()方法或 notifyAll() 方法,或者超過指定的時(shí)間量”岩馍,當(dāng)前線程被喚醒(進(jìn)入“就緒狀態(tài)”)碉咆。
/**
 * 使用synchronize以及wait()、notify() /notifyAll()
 */
public class ShareDataV1 {

    /**
     * 原子計(jì)數(shù)
     */
    public static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 標(biāo)示
     */
    public volatile boolean flag = true;

    /**
     * 生產(chǎn)隊(duì)列最大容量
     */
    public static final int MAX_COUNT = 10;

    /**
     * 生產(chǎn)隊(duì)列容器
     */
    public static final List<Integer> pool = new ArrayList<>();

    /**
     * 生產(chǎn)
     */
    public void produce() {
        // 判斷蛀恩,干活疫铜,通知
        while (flag) {
            try {
                // 每隔 1000 毫秒生產(chǎn)一個(gè)商品
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 這里有一個(gè)鎖
            synchronized (pool) {
                //池子滿了,生產(chǎn)者停止生產(chǎn)
                //埋個(gè)坑双谆,這里用的if
                //TODO 判斷
                while (pool.size() == MAX_COUNT) {
                    try {
                        System.out.println("pool is full, wating...");
                        // TODO 線程阻塞
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
                //通知
                // notify() 方法隨機(jī)喚醒對(duì)象的等待池中的一個(gè)線程壳咕,進(jìn)入鎖池席揽;
                // notifyAll() 喚醒對(duì)象的等待池中的所有線程,進(jìn)入鎖池谓厘。
                pool.notifyAll();
            }
        }
    }


    /**
     * 消費(fèi)
     */
    public void consumue() {
        // 判斷幌羞,干活,通知
        while (flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 這里有一個(gè)鎖
            synchronized (pool) {
                //池子滿了竟稳,生產(chǎn)者停止生產(chǎn)
                //埋個(gè)坑属桦,這里用的if
                //TODO 判斷
                while (pool.size() == 0) {
                    try {
                        System.out.println("pool is empty, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    /**
     * 停止
     */
    public void stop() {
        flag = false;
    }


    public static void main(String[] args) {
        ShareDataV1 shareDataV1 = new ShareDataV1();

        // 開啟生產(chǎn)線程
        new Thread(() -> {
            shareDataV1.produce();
        }, "AAA").start();

        // 開啟消費(fèi)線程
        new Thread(() -> {
            shareDataV1.consumue();
        }, "BBB").start();

        // 開啟生產(chǎn)線程
        new Thread(() -> {
            shareDataV1.produce();
        }, "CCC").start();

        // 開啟消費(fèi)線程
        new Thread(() -> {
            shareDataV1.consumue();
        }, "DDD").start();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        shareDataV1.stop();
    }

}

二、使用Lock他爸,Condition的await和signal方法

  • JUC包下的鎖Lock替代synchronize關(guān)鍵字地啰。await方法代替wait,signal代替notifyall讲逛。
    下面這個(gè)demo實(shí)現(xiàn)了pool的大小為1的生產(chǎn)者消費(fèi)者模型亏吝。
class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try { 
            while (number != 0) {
                //等待,不能生產(chǎn)
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }

    public void decrement() throws Exception {
        lock.lock();
        try {
            while (number == 0) {
                //等待盏混,不能消費(fèi)
                condition.await();
            }

            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }
}

public class ProducerConsumer_V2{
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "CC").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "DD").start();
    }
}

三蔚鸥、下面是使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式:

  • 當(dāng)阻塞隊(duì)列為空時(shí),從阻塞隊(duì)列中取數(shù)據(jù)的操作會(huì)被阻塞许赃。
  • 當(dāng)阻塞隊(duì)列為滿時(shí)止喷,往阻塞隊(duì)列中添加數(shù)據(jù)的操作會(huì)被阻塞。

LinkedBlockingQueue

  • 基于單向鏈表的阻塞隊(duì)列實(shí)現(xiàn)混聊,在初始化LinkedBlockingQueue的時(shí)候可以指定對(duì)立的大小弹谁,也可以不指定,默認(rèn)類似一個(gè)無限大小的容量(Integer.MAX_VALUE)句喜,不指隊(duì)列容量大小也是會(huì)有風(fēng)險(xiǎn)的预愤,一旦數(shù)據(jù)生產(chǎn)速度大于消費(fèi)速度,系統(tǒng)內(nèi)存將有可能被消耗殆盡咳胃,因此要謹(jǐn)慎操作植康。另外LinkedBlockingQueue中用于阻塞生產(chǎn)者、消費(fèi)者的鎖是兩個(gè)(鎖分離)展懈,因此生產(chǎn)與消費(fèi)是可以同時(shí)進(jìn)行的销睁。
/**
 * 下面是使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式:
 */
public class ShareDataV3 {

    /**
     * 阻塞隊(duì)列容量
     */
    private static final int MAX_CAPACITY = 10;

    /**
     * 阻塞隊(duì)列
     */
    private static BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(MAX_CAPACITY);

    /**
     * 標(biāo)示
     */
    private volatile boolean FLAG = true;

    /**
     * 原子計(jì)數(shù)
     */
    private AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 生產(chǎn)
     *
     * @throws InterruptedException
     */
    public void produce() throws InterruptedException {

        // 開啟可控的死循環(huán) 不停的生產(chǎn)
        while (FLAG) {
            // 向隊(duì)列中添加元素
            boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
            // 插入元素是否成功
            if (retvalue == true) {
                System.out.println(Thread.currentThread().getName() + "\t 插入隊(duì)列" + atomicInteger.get() + "成功" + "資源隊(duì)列大小= " + blockingQueue.size());
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入隊(duì)列" + atomicInteger.get() + "失敗" + "資源隊(duì)列大小= " + blockingQueue.size());
            }
            // 線程睡眠
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "FLAG變?yōu)閒lase,生產(chǎn)停止");
    }

    /**
     * 消費(fèi)
     *
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        Integer result = null;
        while (true) {
            // 從隊(duì)列中獲取元素
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            // 獲取結(jié)果判斷
            if (null == result) {
                System.out.println("超過兩秒沒有取道數(shù)據(jù)存崖,消費(fèi)者即將退出");
                return;
            }
            // 打印獲取結(jié)果
            System.out.println(Thread.currentThread().getName() + "\t 消費(fèi)" + result + "成功" + "\t\t" + "資源隊(duì)列大小= " + blockingQueue.size());
            Thread.sleep(1500);
        }

    }

    /**
     * 停止
     */
    public void stop() {
        this.FLAG = false;
    }


    public static void main(String[] args) {

        ShareDataV3 v3 = new ShareDataV3();

        // 開啟一個(gè)線程 執(zhí)行生產(chǎn)
        new Thread(() -> {
            try {
                v3.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AAA").start();

        // 開啟一個(gè)線程 執(zhí)行消費(fèi)
        new Thread(() -> {
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();

        // 開啟一個(gè)線程 執(zhí)行消費(fèi)
        new Thread(() -> {
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "CCC").start();


        try {
            // 測試 主線程睡眠5秒
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        v3.stop();
    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末冻记,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子来惧,更是在濱河造成了極大的恐慌冗栗,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異贞瞒,居然都是意外死亡偶房,警方通過查閱死者的電腦和手機(jī)趁曼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門军浆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人挡闰,你說我怎么就攤上這事乒融。” “怎么了摄悯?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵赞季,是天一觀的道長。 經(jīng)常有香客問我奢驯,道長申钩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任瘪阁,我火速辦了婚禮撒遣,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘管跺。我一直安慰自己义黎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布豁跑。 她就那樣靜靜地躺著廉涕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪艇拍。 梳的紋絲不亂的頭發(fā)上狐蜕,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音卸夕,去河邊找鬼馏鹤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛娇哆,可吹牛的內(nèi)容都是我干的湃累。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼碍讨,長吁一口氣:“原來是場噩夢啊……” “哼治力!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起勃黍,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤宵统,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體马澈,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡瓢省,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了痊班。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勤婚。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖涤伐,靈堂內(nèi)的尸體忽然破棺而出馒胆,到底是詐尸還是另有隱情,我是刑警寧澤凝果,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布祝迂,位于F島的核電站,受9級(jí)特大地震影響器净,放射性物質(zhì)發(fā)生泄漏型雳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一山害、第九天 我趴在偏房一處隱蔽的房頂上張望纠俭。 院中可真熱鬧,春花似錦粗恢、人聲如沸柑晒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽匙赞。三九已至,卻和暖如春妖碉,著一層夾襖步出監(jiān)牢的瞬間涌庭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工欧宜, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留坐榆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓冗茸,卻偏偏與公主長得像席镀,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子夏漱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353