生產者消費者的五種實現(xiàn)

以下內容整理自互聯(lián)網社搅,僅用于個人學習


轉載自http://huachao1001.github.io/article.html?QhSkxKKX

生產者消費者問題:
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。

生產者消費者模式的優(yōu)點:

  • 解耦
  • 支持并發(fā)
  • 支持忙閑不均

解決方法可分為兩類:

  1. 用信號量和鎖機制實現(xiàn)生產者和消費者之間的同步
  • wait() / notify()方法
  • await() / signal()方法
  • BlockingQueue阻塞隊列方法
  • Semaphore方法
  1. 在生產者和消費者之間建立一個管道订歪。(一般不使用撤奸,緩沖區(qū)不易控制署咽、數(shù)據(jù)不易封裝和傳輸)
  • PipedInputStream / PipedOutputStream

wait() / notify()方法

publicclassTest{
privatestaticIntegercount=0;
privatefinalIntegerFULL=5;
privatestaticStringlock="lock";

publicstaticvoidmain(String[]args){
Testt=newTest();
newThread(t.newProducer()).start();
newThread(t.newConsumer()).start();
newThread(t.newProducer()).start();
newThread(t.newConsumer()).start();
}

classProducerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
synchronized(lock){
while(count==FULL){
try{
lock.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
count++;
System.out.println("生產者"+Thread.currentThread().getName()
+"已生產完成漱牵,商品數(shù)量:"+count);
lock.notifyAll();
}
}
}
}

classConsumerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
synchronized(lock){
while(count==0){
try{
lock.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
count--;
System.out.println("消費者"+Thread.currentThread().getName()
+"已消費戳杀,剩余商品數(shù)量:"+count);
lock.notifyAll();
}
}
}
}
}

運行結果:

生產者Thread-0已生產完成艾少,商品數(shù)量:1
生產者Thread-2已生產完成卡乾,商品數(shù)量:2
消費者Thread-1已消費,剩余商品數(shù)量:1
消費者Thread-3已消費缚够,剩余商品數(shù)量:0
生產者Thread-0已生產完成幔妨,商品數(shù)量:1
消費者Thread-3已消費,剩余商品數(shù)量:0
生產者Thread-2已生產完成谍椅,商品數(shù)量:1
消費者Thread-1已消費误堡,剩余商品數(shù)量:0
生產者Thread-0已生產完成,商品數(shù)量:1
生產者Thread-2已生產完成雏吭,商品數(shù)量:2
消費者Thread-1已消費锁施,剩余商品數(shù)量:1
消費者Thread-3已消費,剩余商品數(shù)量:0
生產者Thread-0已生產完成杖们,商品數(shù)量:1
消費者Thread-1已消費悉抵,剩余商品數(shù)量:0
生產者Thread-2已生產完成,商品數(shù)量:1
消費者Thread-3已消費摘完,剩余商品數(shù)量:0
生產者Thread-0已生產完成姥饰,商品數(shù)量:1
消費者Thread-1已消費,剩余商品數(shù)量:0
生產者Thread-2已生產完成孝治,商品數(shù)量:1
消費者Thread-3已消費媳否,剩余商品數(shù)量:0

await() / signal()方法

await()/signal()是對wait()/notify()的改進,功能更加強大荆秦,更適用于高級用戶篱竭,synchronized是托管給JVM執(zhí)行的,而lock是Java寫的控制鎖的代碼步绸。

下面是使用ReentrantLock來實現(xiàn)生產者消費者問題:

publicclassTest{
privatestaticIntegercount=0;//緩沖區(qū)
privatefinalIntegerFULL=5;
finalLocklock=newReentrantLock();//獲得可重入鎖
finalConditionput=lock.newCondition();
finalConditionget=lock.newCondition();

publicstaticvoidmain(String[]args){

Testt=newTest();
newThread(t.newProducer()).start();
newThread(t.newConsumer()).start();
newThread(t.newConsumer()).start();
newThread(t.newProducer()).start();
}

//生產者
classProducerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
//加鎖
lock.lock();
try{
while(count==FULL){
try{
put.await();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
count++;
System.out.println("生產者"+Thread.currentThread().getName()
+"已生產完成掺逼,商品數(shù)量:"+count);
//通知消費者,現(xiàn)在可以消費
get.signal();
}finally{
lock.unlock();
}
}
}
}

classConsumerimplementsRunnable{

@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
lock.lock();
try{
while(count==0){
try{
get.await();
}catch(Exceptione){
e.printStackTrace();
}
}
count--;
System.out.println("消費者"+Thread.currentThread().getName()
+"已消費瓤介,剩余商品數(shù)量:"+count);
put.signal();
}finally{
lock.unlock();
}
}
}
}
}

運行結果:

生產者Thread-3已生產完成吕喘,商品數(shù)量:1
生產者Thread-0已生產完成赘那,商品數(shù)量:2
消費者Thread-1已消費,剩余商品數(shù)量:1
消費者Thread-2已消費氯质,剩余商品數(shù)量:0
生產者Thread-3已生產完成募舟,商品數(shù)量:1
生產者Thread-0已生產完成,商品數(shù)量:2
消費者Thread-1已消費闻察,剩余商品數(shù)量:1
消費者Thread-2已消費拱礁,剩余商品數(shù)量:0
生產者Thread-0已生產完成,商品數(shù)量:1
生產者Thread-3已生產完成辕漂,商品數(shù)量:2
消費者Thread-1已消費呢灶,剩余商品數(shù)量:1
消費者Thread-2已消費,剩余商品數(shù)量:0
生產者Thread-0已生產完成钉嘹,商品數(shù)量:1
生產者Thread-3已生產完成鸯乃,商品數(shù)量:2
消費者Thread-2已消費,剩余商品數(shù)量:1
消費者Thread-1已消費跋涣,剩余商品數(shù)量:0
生產者Thread-3已生產完成缨睡,商品數(shù)量:1
生產者Thread-0已生產完成,商品數(shù)量:2
消費者Thread-2已消費陈辱,剩余商品數(shù)量:1
消費者Thread-1已消費宏蛉,剩余商品數(shù)量:0

BlockingQueue阻塞隊列方法

BlockingQueue實現(xiàn)主要用于生產者-使用者隊列,但它另外還支持Collection接口性置。是線程安全的,所有排隊方法都可以使用內部鎖或其他形式的并發(fā)控制來自動達到它們的目的揍堰。

用于阻塞的兩個方法:

  • put()方法:將指定元素插入此隊列中鹏浅,將等待可用的空間(如果有必要)。
  • take()方法:獲取并移除此隊列的頭部屏歹,在指定的等待時間前等待可用的元素(如果有必要)隐砸。
public class Test { 
    private static Integer count = 0; 
    final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(5);// 容量為5的阻塞隊列 
 
    public static void main(String[] args) { 
        Test t = new Test(); 
        new Thread(t.new Producer()).start(); 
        new Thread(t.new Consumer()).start(); 
        new Thread(t.new Consumer()).start(); 
        new Thread(t.new Producer()).start(); 
    } 
 
    class Producer implements Runnable { 
        @Override 
        public void run() { 
            for (int i = 0; i < 5; i++) { 
                try { 
                    Thread.sleep(1000); 
                } catch (Exception e) { 
                    e.printStackTrace(); 
                } 
                try { 
                    bq.put(1); 
                    count++; 
                    System.out.println("生產者" + Thread.currentThread().getName() 
                            + "已生產完成,商品數(shù)量:" + count); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
    } 
 
    class Consumer implements Runnable { 
 
        @Override 
        public void run() { 
            for (int i = 0; i < 5; i++) { 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e1) { 
                    e1.printStackTrace(); 
                } 
                try { 
                    bq.take(); 
                    count--; 
                    System.out.println("消費者" + Thread.currentThread().getName() 
                            + "已消費蝙眶,剩余商品數(shù)量:" + count); 
                } catch (Exception e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
    } 
}

運行結果:

生產者Thread-0已生產完成季希,商品數(shù)量:0 
消費者Thread-1已消費,剩余商品數(shù)量:0 
生產者Thread-3已生產完成幽纷,商品數(shù)量:0 
消費者Thread-2已消費式塌,剩余商品數(shù)量:0 
生產者Thread-3已生產完成,商品數(shù)量:1 
消費者Thread-2已消費友浸,剩余商品數(shù)量:1 
生產者Thread-0已生產完成峰尝,商品數(shù)量:2 
消費者Thread-1已消費,剩余商品數(shù)量:0 
生產者Thread-0已生產完成收恢,商品數(shù)量:2 
消費者Thread-1已消費武学,剩余商品數(shù)量:0 
生產者Thread-3已生產完成祭往,商品數(shù)量:2 
消費者Thread-2已消費,剩余商品數(shù)量:1 
生產者Thread-0已生產完成火窒,商品數(shù)量:1 
消費者Thread-1已消費硼补,剩余商品數(shù)量:1 
生產者Thread-3已生產完成,商品數(shù)量:2 
消費者Thread-2已消費熏矿,剩余商品數(shù)量:0 
生產者Thread-0已生產完成已骇,商品數(shù)量:0 
消費者Thread-2已消費,剩余商品數(shù)量:0 
生產者Thread-3已生產完成曲掰,商品數(shù)量:1 
消費者Thread-1已消費疾捍,剩余商品數(shù)量:0

Semaphore方法實現(xiàn)同步

信號量(Semaphore)維護了一個許可集。在許可可用前會阻塞每一個 acquire()栏妖,然后再獲取該許可乱豆。每個release()添加一個許可,從而可能釋放一個正在阻塞的獲取者吊趾。但是宛裕,不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數(shù)论泛,并采取相應的行動揩尸。Semaphore通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目。

注意屁奏,調用acquire()時無法保持同步鎖岩榆,因為這會阻止將項返回到池中。信號量封裝所需的同步坟瓢,以限制對池的訪問勇边,這同維持該池本身一致性所需的同步是分開的。

public class Test { 
    int count = 0; 
    final Semaphore put = new Semaphore(5);// 初始令牌個數(shù) 
    final Semaphore get = new Semaphore(0); 
    final Semaphore mutex = new Semaphore(1); 
 
    public static void main(String[] args) { 
        Test t = new Test(); 
        new Thread(t.new Producer()).start(); 
        new Thread(t.new Consumer()).start(); 
        new Thread(t.new Consumer()).start(); 
        new Thread(t.new Producer()).start(); 
    } 
 
    class Producer implements Runnable { 
        @Override 
        public void run() { 
            for (int i = 0; i < 5; i++) { 
                try { 
                    Thread.sleep(1000); 
                } catch (Exception e) { 
                    e.printStackTrace(); 
                } 
                try { 
                    put.acquire();// 注意順序 
                    mutex.acquire(); 
                    count++; 
                    System.out.println("生產者" + Thread.currentThread().getName() 
                            + "已生產完成折联,商品數(shù)量:" + count); 
                } catch (Exception e) { 
                    e.printStackTrace(); 
                } finally { 
                    mutex.release(); 
                    get.release(); 
                } 
 
            } 
        } 
    } 
 
    class Consumer implements Runnable { 
 
        @Override 
        public void run() { 
            for (int i = 0; i < 5; i++) { 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e1) { 
                    e1.printStackTrace(); 
                } 
                try { 
                    get.acquire();// 注意順序 
                    mutex.acquire(); 
                    count--; 
                    System.out.println("消費者" + Thread.currentThread().getName() 
                            + "已消費粒褒,剩余商品數(shù)量:" + count); 
                } catch (Exception e) { 
                    e.printStackTrace(); 
                } finally { 
                    mutex.release(); 
                    put.release(); 
                } 
            } 
        } 
    } 
}

運行結果:

生產者Thread-0已生產完成,商品數(shù)量:1 
消費者Thread-2已消費诚镰,剩余商品數(shù)量:0 
生產者Thread-3已生產完成奕坟,商品數(shù)量:1 
消費者Thread-1已消費,剩余商品數(shù)量:0 
生產者Thread-0已生產完成清笨,商品數(shù)量:1 
生產者Thread-3已生產完成月杉,商品數(shù)量:2 
消費者Thread-2已消費,剩余商品數(shù)量:1 
消費者Thread-1已消費抠艾,剩余商品數(shù)量:0 
生產者Thread-0已生產完成沙合,商品數(shù)量:1 
生產者Thread-3已生產完成,商品數(shù)量:2 
消費者Thread-2已消費,剩余商品數(shù)量:1 
消費者Thread-1已消費首懈,剩余商品數(shù)量:0 
生產者Thread-0已生產完成绊率,商品數(shù)量:1 
生產者Thread-3已生產完成,商品數(shù)量:2 
消費者Thread-2已消費究履,剩余商品數(shù)量:1 
消費者Thread-1已消費滤否,剩余商品數(shù)量:0 
生產者Thread-0已生產完成,商品數(shù)量:1 
生產者Thread-3已生產完成最仑,商品數(shù)量:2 
消費者Thread-2已消費藐俺,剩余商品數(shù)量:1 
消費者Thread-1已消費,剩余商品數(shù)量:0

PipedInputStream / PipedOutputStream

這個類位于java.io包中泥彤,是解決同步問題的最簡單的辦法欲芹,一個線程將數(shù)據(jù)寫入管道,另一個線程從管道讀取數(shù)據(jù)吟吝,這樣便構成了一種生產者/消費者的緩沖區(qū)編程模式菱父。PipedInputStream/PipedOutputStream只能用于多線程模式,用于單線程下可能會引發(fā)死鎖剑逃。

public class Test { 
    final PipedInputStream pis = new PipedInputStream(); 
    final PipedOutputStream pos = new PipedOutputStream(); 
 
    public static void main(String[] args) { 
        Test t = new Test(); 
        new Thread(t.new Producer()).start(); 
        new Thread(t.new Consumer()).start(); 
    } 
 
    class Producer implements Runnable { 
 
        @Override 
        public void run() { 
            try { 
                pis.connect(pos); 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
            try { 
                while (true) { // 不斷的產生數(shù)據(jù) 
                    int n = (int) (Math.random() * 255); 
                    System.out.println("生產者" + Thread.currentThread().getName() 
                            + "已生產完成浙宜,商品數(shù)量:" + n); 
                    pos.write(n); 
                    pos.flush(); 
                } 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } finally { 
                try { 
                    pis.close(); 
                    pos.close(); 
                } catch (IOException e) { 
                    e.printStackTrace(); 
                } 
            } 
 
        } 
    } 
 
    class Consumer implements Runnable { 
 
        @Override 
        public void run() { 
            int n; 
            try { 
                while (true) { 
                    n = pis.read(); 
                    System.out.println("消費者" + Thread.currentThread().getName() 
                            + "已消費,剩余商品數(shù)量:" + n); 
                } 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } finally { 
                try { 
                    pis.close(); 
                    pos.close(); 
                } catch (IOException e) { 
                    e.printStackTrace(); 
                } 
 
            } 
        } 
    } 
}

運行結果:

生產者Thread-0已生產完成蛹磺,商品數(shù)量:6 
生產者Thread-0已生產完成粟瞬,商品數(shù)量:158 
生產者Thread-0已生產完成,商品數(shù)量:79 
生產者Thread-0已生產完成萤捆,商品數(shù)量:119 
生產者Thread-0已生產完成裙品,商品數(shù)量:93 
生產者Thread-0已生產完成,商品數(shù)量:213 
生產者Thread-0已生產完成俗或,商品數(shù)量:151 
生產者Thread-0已生產完成市怎,商品數(shù)量:101 
生產者Thread-0已生產完成,商品數(shù)量:125 
生產者Thread-0已生產完成蕴侣,商品數(shù)量:109 
生產者Thread-0已生產完成,商品數(shù)量:67 
生產者Thread-0已生產完成臭觉,商品數(shù)量:109 
生產者Thread-0已生產完成昆雀,商品數(shù)量:132 
生產者Thread-0已生產完成,商品數(shù)量:139 
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末蝠筑,一起剝皮案震驚了整個濱河市狞膘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌什乙,老刑警劉巖挽封,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異臣镣,居然都是意外死亡辅愿,警方通過查閱死者的電腦和手機智亮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來点待,“玉大人阔蛉,你說我怎么就攤上這事●海” “怎么了状原?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長苗踪。 經常有香客問我颠区,道長,這世上最難降的妖魔是什么通铲? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任毕莱,我火速辦了婚禮,結果婚禮上测暗,老公的妹妹穿的比我還像新娘央串。我一直安慰自己,他們只是感情好碗啄,可當我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布质和。 她就那樣靜靜地躺著,像睡著了一般稚字。 火紅的嫁衣襯著肌膚如雪饲宿。 梳的紋絲不亂的頭發(fā)上腋妙,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天良瞧,我揣著相機與錄音,去河邊找鬼靡菇。 笑死昌讲,一個胖子當著我的面吹牛国夜,可吹牛的內容都是我干的。 我是一名探鬼主播短绸,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼车吹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了醋闭?” 一聲冷哼從身側響起窄驹,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎证逻,沒想到半個月后乐埠,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年丈咐,在試婚紗的時候發(fā)現(xiàn)自己被綠了瑞眼。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡扯罐,死狀恐怖负拟,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情歹河,我是刑警寧澤掩浙,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站秸歧,受9級特大地震影響厨姚,放射性物質發(fā)生泄漏。R本人自食惡果不足惜键菱,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一谬墙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧经备,春花似錦拭抬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至纷闺,卻和暖如春算凿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背犁功。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工氓轰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人浸卦。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓署鸡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親限嫌。 傳聞我的和親對象是個殘疾皇子靴庆,可洞房花燭夜當晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內容