Java生產(chǎn)者消費者模型簡析

前言

生產(chǎn)者和消費者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費者在同一時間段內(nèi)共用同一個存儲空間,生產(chǎn)者往存儲空間中添加產(chǎn)品邢锯,消費者從存儲空間中取走產(chǎn)品,當(dāng)存儲空間為空時斤讥,消費者阻塞候生,當(dāng)存儲空間滿時同眯,生產(chǎn)者阻塞。

image

java中最基本的生產(chǎn)者消費者模型是用notify跟wait實現(xiàn)的唯鸭。

加鎖機制的缺陷

在Java程序中嗽测,synchronized解決了多線程競爭的問題。當(dāng)一個加鎖函數(shù)執(zhí)行完成后會自動釋放鎖肿孵,例如,對于一個任務(wù)管理器疏魏,多個線程同時往隊列中添加任務(wù)停做,可以用synchronized加鎖,但是synchronized并沒有解決多線程協(xié)調(diào)的問題大莫。

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }

    public synchronized String getTask() {
        while (queue.isEmpty()) {
        }
        return queue.remove();
    }
}

上面這段代碼對兩個方法進行了加鎖蛉腌,但是由于getTask()方法加鎖后,如果queue對象中一直沒有對象就會形成死循環(huán)只厘,鎖得不到釋放烙丛!其他線程也不行拿到鎖進行下一步操作,這樣的問題在多線程中經(jīng)常發(fā)生羔味,java為了解決這樣的問題所以進行了線程的喚醒沉睡操作河咽!

可以將方法進行稍作修改變?yōu)椋?/p>

public synchronized String getTask() {
    while (queue.isEmpty()) {
        this.wait();
    }
    return queue.remove();
}

其中 this.wait() 方法的意思是:

        wait()讓當(dāng)前線程進入等待狀態(tài),同時赋元,wait()也會讓當(dāng)前線程釋放它所持有的鎖忘蟹。“直到其他線程調(diào)用此對象的 notify() 方法或 notifyAll() 方法”搁凸,當(dāng)前線程被喚醒(進入“就緒狀態(tài)”)

所以這里調(diào)用方法后媚值,如果queue中是空的會將此線程進行沉睡,釋放他們的鎖护糖!但是釋放后誰怎么去通知這個線程又可以繼續(xù)進行了呢褥芒?這個時候就需要notify或者notifyAll方法了!

所以我們再將添加方法進行修改得到:

public synchronized void addTask(String s) {
    this.queue.add(s);
    this.notify(); // 喚醒在this鎖等待的線程
}

其中 this.notify() 方法的意思是:

notify()和notifyAll()的作用嫡良,則是喚醒當(dāng)前對象上的等待線程锰扶;notify()是喚醒單個線程,而notifyAll()是喚醒所有的線程皆刺。

這段代碼的意思就是我將一個值放入到queue后少辣,我馬上調(diào)用notify方法去通知那些進入wait方法的線程可以繼續(xù)執(zhí)行了!

最簡單的生產(chǎn)消費

根據(jù)上面的解釋羡蛾,我們完整的代碼可以像下面這樣:


public class Java_21 {

    public static void main(String[] args) throws InterruptedException {
        TaskQueue q = new TaskQueue();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(() -> {
                // 執(zhí)行task:
                while (true) {
                    try {
                        String s = q.getTask();
                        System.out.println("execute task: " + s);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
            t.start();
            ts.add(t);
        }
        Thread add = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                // 放入task:
                String s = "t-" + Math.random();
                System.out.println("add task: " + s);
                q.addTask(s);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        add.start();
        add.join();
        Thread.sleep(100);
        for (Thread thread : ts) {
            thread.interrupt();
        }
    }
}

class TaskQueue {
    private Queue<String> stringQueue = new LinkedList<>();

    public synchronized void addTask(String s) {
        stringQueue.add(s);
//        notify()和notifyAll()的作用漓帅,則是喚醒當(dāng)前對象上的等待線程;notify()是喚醒單個線程,而notifyAll()是喚醒所有的線程忙干。
        this.notifyAll();  // 喚醒在this鎖等待的所有線程
    }


    public synchronized String getTask() throws InterruptedException {
        while (stringQueue.isEmpty()) {
            //wait()的作用是讓當(dāng)前線程進入等待狀態(tài)器予,同時,wait()也會讓當(dāng)前線程釋放它所持有的鎖捐迫∏瑁“直到其他線程調(diào)用此對象的 notify() 方法或 notifyAll() 方法”,當(dāng)前線程被喚醒(進入“就緒狀態(tài)”)
            this.wait();
        }
        return stringQueue.remove();
    }
}

運行的結(jié)果就是:

add task: t-0.8499205383861345
execute task: t-0.8499205383861345
add task: t-0.7942800777561965
execute task: t-0.7942800777561965
add task: t-0.6906469783761674
execute task: t-0.6906469783761674
add task: t-0.8532616076721191
execute task: t-0.8532616076721191
add task: t-0.7939063475258298
execute task: t-0.7939063475258298
add task: t-0.658439645359411
execute task: t-0.658439645359411
add task: t-0.1836403135968735
execute task: t-0.1836403135968735
add task: t-0.868308480446476
execute task: t-0.868308480446476
add task: t-0.5658234674527566
execute task: t-0.5658234674527566
add task: t-0.8699112304857072
execute task: t-0.8699112304857072

一眼就能看出是添加一個元素后馬上取出一個元素施戴。

對上面的代碼進行解釋:

我們有一個TaskQueue類里面有個添加與取出的方法反浓,兩個方法都進行了加鎖的機制。

首先我們看取出的getTask()機制:

我們判斷stringQueue是不是空如果是空就調(diào)用wait() 方法釋放當(dāng)前線程的鎖赞哗,然后進入到睡眠狀態(tài)

再來看addTask()方法:

我們向stringQueue中添加一個值雷则,添加好了后我們馬上調(diào)用notifyAll()方法喚醒簽名所有調(diào)用了wait()方法的線程進行消費!

最后再來看main函數(shù)的執(zhí)行:

我們開啟了五個線程進行g(shù)etTask()的執(zhí)行肪笋,表示取出操作月劈,我們再開啟一個線程進行了addTask()操作,我們調(diào)用join方法讓add線程進行完成后藤乙,我們再遍歷集合調(diào)用interrupt()方法進行所有線程的中斷操作

執(zhí)行過程:

add線程中我調(diào)用addTask()后添加一個值馬上就去調(diào)用notifyAll()通知其他五個等待線程去取猜揪,至于是哪個線程取到這就是cpu自己的調(diào)度了,等取空了后五個線程中又進入了休眠狀態(tài)坛梁,我這邊又去添加值而姐,這樣重復(fù)進行就可以達(dá)到一放一取的操作

生產(chǎn)者消費者擴充

一般的生產(chǎn)者消費者模型都是給了一個緩沖區(qū)的概念,都是添加到緩沖區(qū)罚勾,等緩沖區(qū)到達(dá)一個值后生產(chǎn)者就不生產(chǎn)了毅人,然后讓消費者去消費,等消費者消費完后又暫停等生產(chǎn)者去生產(chǎn)尖殃!跟前言里面的圖是一個道理丈莺,將我們的代碼簡單的變化就能達(dá)到:

修改生產(chǎn)者:

  public synchronized void addTask(String s) {
        stringQueue.add(s);
        if (stringQueue.size()==100){
            this.notifyAll();
        }
    }

中間的緩沖區(qū)大小就是100.

最后

生產(chǎn)者消費者模式我們不是經(jīng)常用到,但是我們開發(fā)中用到的框架很多都是基于這樣的線程模型送丰,所以掌握原理后有助于我們開發(fā)中解決實際的線程安全交流問題

補充

加鎖的寫法有很多種缔俄,例如我將上面的方法不用this加鎖,用一個中間常量效果一樣器躏,完整代碼如下:

public class Java_21 {
     static final String LOCK = "lock";

    public static void main(String[] args) throws InterruptedException {


        TaskQueue q = new TaskQueue();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(() -> {
                // 執(zhí)行task:
                while (true) {
                    try {
                        String s = q.getTask();
                        System.out.println("execute task: " + s);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
            t.start();
            ts.add(t);
        }
        Thread add = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                // 放入task:
                String s = "t-" + Math.random();
                System.out.println("add task: " + s);
                q.addTask(s);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        add.start();
        add.join();
        Thread.sleep(100);
        for (Thread thread : ts) {
            thread.interrupt();
        }
    }
}

class TaskQueue {
    private Queue<String> stringQueue = new LinkedList<>();

    public  void addTask(String s) {
        synchronized(LOCK){
            stringQueue.add(s);
//        if (stringQueue.size()==100){
//            LOCK.notifyAll();  // 喚醒在this鎖等待的所有線程
//
//        }

            LOCK.notifyAll();
        }
    }


    public  String getTask() throws InterruptedException {
        synchronized (LOCK){
            while (stringQueue.isEmpty()) {
                LOCK.wait();
            }
            return stringQueue.remove();
        }
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末俐载,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子登失,更是在濱河造成了極大的恐慌遏佣,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件揽浙,死亡現(xiàn)場離奇詭異状婶,居然都是意外死亡意敛,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進店門膛虫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來草姻,“玉大人,你說我怎么就攤上這事稍刀×枚溃” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵账月,是天一觀的道長综膀。 經(jīng)常有香客問我,道長局齿,這世上最難降的妖魔是什么僧须? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮项炼,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘示绊。我一直安慰自己锭部,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布面褐。 她就那樣靜靜地躺著拌禾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪展哭。 梳的紋絲不亂的頭發(fā)上湃窍,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天,我揣著相機與錄音匪傍,去河邊找鬼您市。 笑死,一個胖子當(dāng)著我的面吹牛役衡,可吹牛的內(nèi)容都是我干的茵休。 我是一名探鬼主播,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼手蝎,長吁一口氣:“原來是場噩夢啊……” “哼榕莺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起棵介,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤钉鸯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后邮辽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體唠雕,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡贸营,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了及塘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片莽使。...
    茶點故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖笙僚,靈堂內(nèi)的尸體忽然破棺而出芳肌,到底是詐尸還是另有隱情,我是刑警寧澤肋层,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布亿笤,位于F島的核電站,受9級特大地震影響栋猖,放射性物質(zhì)發(fā)生泄漏净薛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一蒲拉、第九天 我趴在偏房一處隱蔽的房頂上張望肃拜。 院中可真熱鬧,春花似錦雌团、人聲如沸燃领。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽猛蔽。三九已至,卻和暖如春灵寺,著一層夾襖步出監(jiān)牢的瞬間曼库,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工略板, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留毁枯,地道東北人。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓叮称,卻偏偏與公主長得像后众,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子颅拦,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容