Java實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型

考查Java的并發(fā)編程時(shí)掂墓,手寫(xiě)“生產(chǎn)者-消費(fèi)者模型”是一個(gè)經(jīng)典問(wèn)題。有如下幾個(gè)考點(diǎn):

  • 對(duì)Java并發(fā)模型的理解
  • 對(duì)Java并發(fā)編程接口的熟練程度
  • bug free
  • coding style

JDK版本:oracle java 1.8.0_102

本文主要?dú)w納了4種寫(xiě)法看成,閱讀后君编,最好在白板上練習(xí)幾遍,檢查自己是否掌握川慌。這4種寫(xiě)法或者編程接口不同啦粹,或者并發(fā)粒度不同,但本質(zhì)是相同的——都是在使用或?qū)崿F(xiàn)BlockingQueue窘游。

生產(chǎn)者-消費(fèi)者模型

網(wǎng)上有很多生產(chǎn)者-消費(fèi)者模型的定義和實(shí)現(xiàn)唠椭。本文研究最常用的有界生產(chǎn)者-消費(fèi)者模型,簡(jiǎn)單概括如下:

  • 生產(chǎn)者持續(xù)生產(chǎn)忍饰,直到緩沖區(qū)滿(mǎn)贪嫂,阻塞;緩沖區(qū)不滿(mǎn)后艾蓝,繼續(xù)生產(chǎn)
  • 消費(fèi)者持續(xù)消費(fèi)力崇,直到緩沖區(qū)空,阻塞赢织;緩沖區(qū)不空后亮靴,繼續(xù)消費(fèi)
  • 生產(chǎn)者可以有多個(gè),消費(fèi)者也可以有多個(gè)

可通過(guò)如下條件驗(yàn)證模型實(shí)現(xiàn)的正確性:

  • 同一產(chǎn)品的消費(fèi)行為一定發(fā)生在生產(chǎn)行為之后
  • 任意時(shí)刻于置,緩沖區(qū)大小不小于0茧吊,不大于限制容量

該模型的應(yīng)用和變種非常多,不贅述八毯。

幾種寫(xiě)法

準(zhǔn)備

面試時(shí)可語(yǔ)言說(shuō)明以下準(zhǔn)備代碼搓侄。關(guān)鍵部分需要實(shí)現(xiàn),如AbstractConsumer话速。

下面會(huì)涉及多種生產(chǎn)者-消費(fèi)者模型的實(shí)現(xiàn)讶踪,可以先抽象出關(guān)鍵的接口,并實(shí)現(xiàn)一些抽象類(lèi):

public interface Consumer {
  void consume() throws InterruptedException;
}
public interface Producer {
  void produce() throws InterruptedException;
}
abstract class AbstractConsumer implements Consumer, Runnable {
  @Override
  public void run() {
    while (true) {
      try {
        consume();
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
abstract class AbstractProducer implements Producer, Runnable {
  @Override
  public void run() {
    while (true) {
      try {
        produce();
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}

不同的模型實(shí)現(xiàn)中泊交,生產(chǎn)者乳讥、消費(fèi)者的具體實(shí)現(xiàn)也不同,所以需要為模型定義抽象工廠(chǎng)方法:

public interface Model {
  Runnable newRunnableConsumer();

  Runnable newRunnableProducer();
}

我們將Task作為生產(chǎn)和消費(fèi)的單位:

public class Task {
  public int no;

  public Task(int no) {
    this.no = no;
  }
}

如果需求還不明確(這符合大部分工程工作的實(shí)際情況)廓俭,建議邊實(shí)現(xiàn)邊抽象云石,不要“面向未來(lái)編程”

實(shí)現(xiàn)一:BlockingQueue

BlockingQueue的寫(xiě)法最簡(jiǎn)單白指。核心思想是留晚,把并發(fā)和容量控制封裝在緩沖區(qū)中。而B(niǎo)lockingQueue的性質(zhì)天生滿(mǎn)足這個(gè)要求告嘲。

public class BlockingQueueModel implements Model {
  private final BlockingQueue<Task> queue;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public BlockingQueueModel(int cap) {
    // LinkedBlockingQueue 的隊(duì)列是 lazy-init 的错维,但 ArrayBlockingQueue 在創(chuàng)建時(shí)就已經(jīng) init
    this.queue = new LinkedBlockingQueue<>(cap);
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      Task task = queue.take();
      // 固定時(shí)間范圍的消費(fèi),模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
      Thread.sleep(500 + (long) (Math.random() * 500));
      System.out.println("consume: " + task.no);
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生產(chǎn)橄唬,模擬隨機(jī)的用戶(hù)請(qǐng)求
      Thread.sleep((long) (Math.random() * 1000));
      Task task = new Task(increTaskNo.getAndIncrement());
      System.out.println("produce: " + task.no);
      queue.put(task);
    }
  }

  public static void main(String[] args) {
    Model model = new BlockingQueueModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

截取前面的一部分輸出:

produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7

由于操作“出隊(duì)/入隊(duì)+日志輸出”不是原子的赋焕,所以上述日志的絕對(duì)順序與實(shí)際的出隊(duì)/入隊(duì)順序有出入,但對(duì)于同一個(gè)任務(wù)號(hào)task.no仰楚,其consume日志一定出現(xiàn)在其produce日志之后隆判,即:同一任務(wù)的消費(fèi)行為一定發(fā)生在生產(chǎn)行為之后。緩沖區(qū)的容量留給讀者驗(yàn)證僧界。符合兩個(gè)驗(yàn)證條件侨嘀。

BlockingQueue寫(xiě)法的核心只有兩行代碼,并發(fā)和容量控制都封裝在了BlockingQueue中捂襟,正確性由BlockingQueue保證咬腕。面試中首選該寫(xiě)法,自然美觀簡(jiǎn)單葬荷。

勘誤:

在簡(jiǎn)書(shū)回復(fù)一個(gè)讀者的時(shí)候涨共,順道發(fā)現(xiàn)了這個(gè)問(wèn)題:生產(chǎn)日志應(yīng)放在入隊(duì)操作之前,否則同一個(gè)task的生產(chǎn)日志可能出現(xiàn)在消費(fèi)日志之后宠漩。

// 舊的錯(cuò)誤代碼
queue.put(task);
System.out.println("produce: " + task.no);
// 正確代碼
System.out.println("produce: " + task.no);
queue.put(task);

具體來(lái)說(shuō)举反,生產(chǎn)日志應(yīng)放在入隊(duì)操作之前,消費(fèi)日志應(yīng)放在出隊(duì)操作之后扒吁,以保障:

  • 消費(fèi)線(xiàn)程中queue.take()返回之后火鼻,對(duì)應(yīng)生產(chǎn)線(xiàn)程(生產(chǎn)該task的線(xiàn)程)中queue.put()及之前的行為,對(duì)于消費(fèi)線(xiàn)程來(lái)說(shuō)都是可見(jiàn)的雕崩。

想想為什么呢怀薛?因?yàn)槲覀冃枰柚皅ueue.put()與queue.take()的偏序關(guān)系”。其他實(shí)現(xiàn)方案分別借助了條件隊(duì)列褒傅、鎖的偏序關(guān)系捷绒,不存在該問(wèn)題。要解釋這個(gè)問(wèn)題捉貌,需要讀者明白可見(jiàn)性和Happens-Before的概念支鸡,篇幅所限,暫時(shí)不多解釋趁窃。

PS:舊代碼沒(méi)出現(xiàn)這個(gè)問(wèn)題牧挣,是因?yàn)橄M(fèi)者打印消費(fèi)日志之前,sleep了500+ms醒陆,而恰巧競(jìng)爭(zhēng)不激烈瀑构,這個(gè)時(shí)間一般足以讓“滯后”生產(chǎn)日志打印完成(但不保證)。


順道說(shuō)明一下刨摩,猴子現(xiàn)在主要在個(gè)人博客寺晌、簡(jiǎn)書(shū)世吨、掘金和CSDN上發(fā)文章,搜索“猴子007”或“程序猿說(shuō)你好”都能找到呻征。但個(gè)人精力有限耘婚,部分勘誤難免忘記同步到某些地方(甚至連新文章都不同步了T_T),只能保證個(gè)人博客是最新的陆赋,還望理解沐祷。

寫(xiě)文章不是為了出名,一方面希望整理自己的學(xué)習(xí)成果攒岛,一方面希望有更多人能幫助猴子糾正學(xué)習(xí)過(guò)程中的錯(cuò)誤赖临。如果能認(rèn)識(shí)一些志同道合的朋友,一起提高就更好了灾锯。所以希望各位轉(zhuǎn)載的時(shí)候兢榨,一定帶著猴子個(gè)人博客末尾的轉(zhuǎn)載聲明。需要聯(lián)系猴子的話(huà)挠进,簡(jiǎn)書(shū)或郵件都可以色乾。

文章水平不高,就不奢求有人能打賞鼓勵(lì)我這潑猴了T_T

實(shí)現(xiàn)二:wait && notify

如果不能將并發(fā)與容量控制都封裝在緩沖區(qū)中领突,就只能由消費(fèi)者與生產(chǎn)者完成暖璧。最簡(jiǎn)單的方案是使用樸素的wait && notify機(jī)制。

public class WaitNotifyModel implements Model {
  private final Object BUFFER_LOCK = new Object();
  private final Queue<Task> buffer = new LinkedList<>();
  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public WaitNotifyModel(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      synchronized (BUFFER_LOCK) {
        while (buffer.size() == 0) {
          BUFFER_LOCK.wait();
        }
        Task task = buffer.poll();
        assert task != null;
        // 固定時(shí)間范圍的消費(fèi)君旦,模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        BUFFER_LOCK.notifyAll();
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生產(chǎn)澎办,模擬隨機(jī)的用戶(hù)請(qǐng)求
      Thread.sleep((long) (Math.random() * 1000));
      synchronized (BUFFER_LOCK) {
        while (buffer.size() == cap) {
          BUFFER_LOCK.wait();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        System.out.println("produce: " + task.no);
        BUFFER_LOCK.notifyAll();
      }
    }
  }

  public static void main(String[] args) {
    Model model = new WaitNotifyModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

驗(yàn)證方法同上。

樸素的wait && notify機(jī)制不那么靈活金砍,但足夠簡(jiǎn)單局蚀。synchronized、wait恕稠、notifyAll的用法可參考【Java并發(fā)編程】之十:使用wait/notify/notifyAll實(shí)現(xiàn)線(xiàn)程間通信的幾點(diǎn)重要說(shuō)明琅绅,著重理解喚醒與鎖競(jìng)爭(zhēng)的區(qū)別

實(shí)現(xiàn)三:簡(jiǎn)單的Lock && Condition

我們要保證理解wait && notify機(jī)制鹅巍。實(shí)現(xiàn)時(shí)可以使用Object類(lèi)提供的wait()方法與notifyAll()方法千扶,但更推薦的方式是使用java.util.concurrent包提供的Lock && Condition

public class LockConditionModel1 implements Model {
  private final Lock BUFFER_LOCK = new ReentrantLock();
  private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();
  private final Queue<Task> buffer = new LinkedList<>();
  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel1(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      BUFFER_LOCK.lockInterruptibly();
      try {
        while (buffer.size() == 0) {
          BUFFER_COND.await();
        }
        Task task = buffer.poll();
        assert task != null;
        // 固定時(shí)間范圍的消費(fèi)骆捧,模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        BUFFER_COND.signalAll();
      } finally {
        BUFFER_LOCK.unlock();
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生產(chǎn)澎羞,模擬隨機(jī)的用戶(hù)請(qǐng)求
      Thread.sleep((long) (Math.random() * 1000));
      BUFFER_LOCK.lockInterruptibly();
      try {
        while (buffer.size() == cap) {
          BUFFER_COND.await();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        System.out.println("produce: " + task.no);
        BUFFER_COND.signalAll();
      } finally {
        BUFFER_LOCK.unlock();
      }
    }
  }

  public static void main(String[] args) {
    Model model = new LockConditionModel1(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

該寫(xiě)法的思路與實(shí)現(xiàn)二的思路完全相同,僅僅將鎖與條件變量換成了Lock和Condition敛苇。

實(shí)現(xiàn)四:更高并發(fā)性能的Lock && Condition

現(xiàn)在妆绞,如果做一些實(shí)驗(yàn),你會(huì)發(fā)現(xiàn),實(shí)現(xiàn)一的并發(fā)性能高于實(shí)現(xiàn)二括饶、三株茶。暫且不關(guān)心BlockingQueue的具體實(shí)現(xiàn),來(lái)分析看如何優(yōu)化實(shí)現(xiàn)三(與實(shí)現(xiàn)二的思路相同巷帝,性能相當(dāng))的性能忌卤。

分析實(shí)現(xiàn)三的瓶頸

最好的查證方法是記錄方法執(zhí)行時(shí)間扫夜,這樣可以直接定位到真正的瓶頸楞泼。但此問(wèn)題較簡(jiǎn)單,我們直接用“瞪眼法”分析笤闯。

實(shí)現(xiàn)三的并發(fā)瓶頸很明顯堕阔,因?yàn)樵阪i BUFFER_LOCK 看來(lái),任何消費(fèi)者線(xiàn)程與生產(chǎn)者線(xiàn)程都是一樣的颗味。換句話(huà)說(shuō)超陆,同一時(shí)刻,最多只允許有一個(gè)線(xiàn)程(生產(chǎn)者或消費(fèi)者浦马,二選一)操作緩沖區(qū) buffer时呀。

而實(shí)際上,如果緩沖區(qū)是一個(gè)隊(duì)列的話(huà)晶默,“生產(chǎn)者將產(chǎn)品入隊(duì)”與“消費(fèi)者將產(chǎn)品出隊(duì)”兩個(gè)操作之間沒(méi)有同步關(guān)系谨娜,可以在隊(duì)首出隊(duì)的同時(shí),在隊(duì)尾入隊(duì)磺陡。理想性能可提升至實(shí)現(xiàn)三的兩倍趴梢。

去掉這個(gè)瓶頸

那么思路就簡(jiǎn)單了:需要兩個(gè)鎖 CONSUME_LOCKPRODUCE_LOCKCONSUME_LOCK控制消費(fèi)者線(xiàn)程并發(fā)出隊(duì)币他,PRODUCE_LOCK控制生產(chǎn)者線(xiàn)程并發(fā)入隊(duì)坞靶;相應(yīng)需要兩個(gè)條件變量NOT_EMPTYNOT_FULLNOT_EMPTY負(fù)責(zé)控制消費(fèi)者線(xiàn)程的狀態(tài)(阻塞蝴悉、運(yùn)行)彰阴,NOT_FULL負(fù)責(zé)控制生產(chǎn)者線(xiàn)程的狀態(tài)(阻塞、運(yùn)行)拍冠。以此讓優(yōu)化消費(fèi)者與消費(fèi)者(或生產(chǎn)者與生產(chǎn)者)之間是串行的尿这;消費(fèi)者與生產(chǎn)者之間是并行的。

public class LockConditionModel2 implements Model {
  private final Lock CONSUME_LOCK = new ReentrantLock();
  private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();
  private final Lock PRODUCE_LOCK = new ReentrantLock();
  private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();

  private final Buffer<Task> buffer = new Buffer<>();
  private AtomicInteger bufLen = new AtomicInteger(0);

  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel2(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      int newBufSize = -1;

      CONSUME_LOCK.lockInterruptibly();
      try {
        while (bufLen.get() == 0) {
          System.out.println("buffer is empty...");
          NOT_EMPTY.await();
        }
        Task task = buffer.poll();
        newBufSize = bufLen.decrementAndGet();
        assert task != null;
        // 固定時(shí)間范圍的消費(fèi)倦微,模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        if (newBufSize > 0) {
          NOT_EMPTY.signalAll();
        }
      } finally {
        CONSUME_LOCK.unlock();
      }

      if (newBufSize < cap) {
        PRODUCE_LOCK.lockInterruptibly();
        try {
          NOT_FULL.signalAll();
        } finally {
          PRODUCE_LOCK.unlock();
        }
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生產(chǎn)妻味,模擬隨機(jī)的用戶(hù)請(qǐng)求
      Thread.sleep((long) (Math.random() * 1000));

      int newBufSize = -1;

      PRODUCE_LOCK.lockInterruptibly();
      try {
        while (bufLen.get() == cap) {
          System.out.println("buffer is full...");
          NOT_FULL.await();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        newBufSize = bufLen.incrementAndGet();
        System.out.println("produce: " + task.no);
        if (newBufSize < cap) {
          NOT_FULL.signalAll();
        }
      } finally {
        PRODUCE_LOCK.unlock();
      }

      if (newBufSize > 0) {
        CONSUME_LOCK.lockInterruptibly();
        try {
          NOT_EMPTY.signalAll();
        } finally {
          CONSUME_LOCK.unlock();
        }
      }
    }
  }

  private static class Buffer<E> {
    private Node head;
    private Node tail;

    Buffer() {
      // dummy node
      head = tail = new Node(null);
    }

    public void offer(E e) {
      tail.next = new Node(e);
      tail = tail.next;
    }

    public E poll() {
      head = head.next;
      E e = head.item;
      head.item = null;
      return e;
    }

    private class Node {
      E item;
      Node next;

      Node(E item) {
        this.item = item;
      }
    }
  }

  public static void main(String[] args) {
    Model model = new LockConditionModel2(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }

需要注意的是,由于需要同時(shí)在UnThreadSafe的緩沖區(qū) buffer 上進(jìn)行消費(fèi)與生產(chǎn)欣福,我們不能使用實(shí)現(xiàn)二责球、三中使用的隊(duì)列了,需要自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的緩沖區(qū) Buffer。Buffer要滿(mǎn)足以下條件:

  • 在頭部出隊(duì)雏逾,尾部入隊(duì)
  • 在poll()方法中只操作head
  • 在offer()方法中只操作tail

還能進(jìn)一步優(yōu)化嗎

我們已經(jīng)優(yōu)化掉了消費(fèi)者與生產(chǎn)者之間的瓶頸嘉裤,還能進(jìn)一步優(yōu)化嗎?

如果可以栖博,必然是繼續(xù)優(yōu)化消費(fèi)者與消費(fèi)者(或生產(chǎn)者與生產(chǎn)者)之間的并發(fā)性能屑宠。然而,消費(fèi)者與消費(fèi)者之間必須是串行的仇让,因此典奉,并發(fā)模型上已經(jīng)沒(méi)有地方可以繼續(xù)優(yōu)化了。

不過(guò)在具體的業(yè)務(wù)場(chǎng)景中丧叽,一般還能夠繼續(xù)優(yōu)化卫玖。如:

  • 并發(fā)規(guī)模中等,可考慮使用CAS代替重入鎖
  • 模型上不能優(yōu)化踊淳,但一個(gè)消費(fèi)行為或許可以進(jìn)一步拆解假瞬、優(yōu)化,從而降低消費(fèi)的延遲
  • 一個(gè)隊(duì)列的并發(fā)性能達(dá)到了極限迂尝,可采用“多個(gè)隊(duì)列”(如分布式消息隊(duì)列等)

4種實(shí)現(xiàn)的本質(zhì)

文章開(kāi)頭說(shuō):這4種寫(xiě)法的本質(zhì)相同——都是在使用或?qū)崿F(xiàn)BlockingQueue脱茉。實(shí)現(xiàn)一直接使用BlockingQueue,實(shí)現(xiàn)四實(shí)現(xiàn)了簡(jiǎn)單的BlockingQueue垄开,而實(shí)現(xiàn)二琴许、三則實(shí)現(xiàn)了退化版的BlockingQueue(性能降低一半)。

實(shí)現(xiàn)一使用的BlockingQueue實(shí)現(xiàn)類(lèi)是LinkedBlockingQueue说榆,給出其源碼閱讀對(duì)照虚吟,寫(xiě)的不難:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
...
/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
...
    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
...
    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
...
    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
...
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
...
}

還存在非常多的寫(xiě)法,如信號(hào)量Semaphore签财,也很常見(jiàn)(本科操作系統(tǒng)教材中的生產(chǎn)者-消費(fèi)者模型就是用信號(hào)量實(shí)現(xiàn)的)串慰。不過(guò)追究過(guò)多了就好像在糾結(jié)茴香豆的寫(xiě)法一樣,本文不繼續(xù)探討唱蒸。

總結(jié)

實(shí)現(xiàn)一必須掌握邦鲫,實(shí)現(xiàn)四至少要能清楚表述;實(shí)現(xiàn)二神汹、三掌握一個(gè)即可庆捺。


本文鏈接:Java實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識(shí)共享署名-相同方式共享 4.0 國(guó)際許可協(xié)議發(fā)布,歡迎轉(zhuǎn)載屁魏,演繹或用于商業(yè)目的滔以,但是必須保留本文的署名及鏈接。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末氓拼,一起剝皮案震驚了整個(gè)濱河市你画,隨后出現(xiàn)的幾起案子抵碟,更是在濱河造成了極大的恐慌,老刑警劉巖坏匪,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拟逮,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡适滓,警方通過(guò)查閱死者的電腦和手機(jī)敦迄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)凭迹,“玉大人罚屋,你說(shuō)我怎么就攤上這事∪锩纾” “怎么了沿后?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵沿彭,是天一觀的道長(zhǎng)朽砰。 經(jīng)常有香客問(wèn)我,道長(zhǎng)喉刘,這世上最難降的妖魔是什么瞧柔? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮睦裳,結(jié)果婚禮上造锅,老公的妹妹穿的比我還像新娘。我一直安慰自己廉邑,他們只是感情好哥蔚,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著蛛蒙,像睡著了一般糙箍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上牵祟,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天深夯,我揣著相機(jī)與錄音,去河邊找鬼诺苹。 笑死咕晋,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的收奔。 我是一名探鬼主播掌呜,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼坪哄!你這毒婦竟也來(lái)了质蕉?” 一聲冷哼從身側(cè)響起呢撞,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎饰剥,沒(méi)想到半個(gè)月后殊霞,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡汰蓉,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年绷蹲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片顾孽。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡祝钢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出若厚,到底是詐尸還是另有隱情拦英,我是刑警寧澤,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布测秸,位于F島的核電站疤估,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏霎冯。R本人自食惡果不足惜铃拇,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望沈撞。 院中可真熱鬧慷荔,春花似錦、人聲如沸缠俺。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)壹士。三九已至磷雇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間墓卦,已是汗流浹背倦春。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留落剪,地道東北人睁本。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像忠怖,于是被迫代替她去往敵國(guó)和親呢堰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348