考查Java的并發(fā)編程時(shí)掂墓,手寫(xiě)“生產(chǎn)者-消費(fèi)者模型”是一個(gè)經(jīng)典問(wèn)題。有如下幾個(gè)考點(diǎn):
- 對(duì)Java并發(fā)模型的理解
- 對(duì)Java并發(fā)編程接口的熟練程度
- bug free
- coding style
JDK版本:oracle java 1.8.0_102
本文主要?dú)w納了4種寫(xiě)法看成,閱讀后君编,最好在白板上練習(xí)幾遍,檢查自己是否掌握川慌。這4種寫(xiě)法或者編程接口不同啦粹,或者并發(fā)粒度不同,但本質(zhì)是相同的——都是在使用或?qū)崿F(xiàn)BlockingQueue窘游。
生產(chǎn)者-消費(fèi)者模型
網(wǎng)上有很多生產(chǎn)者-消費(fèi)者模型的定義和實(shí)現(xiàn)唠椭。本文研究最常用的有界生產(chǎn)者-消費(fèi)者模型,簡(jiǎn)單概括如下:
- 生產(chǎn)者持續(xù)生產(chǎn)忍饰,直到緩沖區(qū)滿(mǎn)贪嫂,阻塞;緩沖區(qū)不滿(mǎn)后艾蓝,繼續(xù)生產(chǎn)
- 消費(fèi)者持續(xù)消費(fèi)力崇,直到緩沖區(qū)空,阻塞赢织;緩沖區(qū)不空后亮靴,繼續(xù)消費(fèi)
- 生產(chǎn)者可以有多個(gè),消費(fèi)者也可以有多個(gè)
可通過(guò)如下條件驗(yàn)證模型實(shí)現(xiàn)的正確性:
- 同一產(chǎn)品的消費(fèi)行為一定發(fā)生在生產(chǎn)行為之后
- 任意時(shí)刻于置,緩沖區(qū)大小不小于0茧吊,不大于限制容量
該模型的應(yīng)用和變種非常多,不贅述八毯。
幾種寫(xiě)法
準(zhǔn)備
面試時(shí)可語(yǔ)言說(shuō)明以下準(zhǔn)備代碼搓侄。關(guān)鍵部分需要實(shí)現(xiàn),如AbstractConsumer话速。
下面會(huì)涉及多種生產(chǎn)者-消費(fèi)者模型的實(shí)現(xiàn)讶踪,可以先抽象出關(guān)鍵的接口,并實(shí)現(xiàn)一些抽象類(lèi):
public interface Consumer {
void consume() throws InterruptedException;
}
public interface Producer {
void produce() throws InterruptedException;
}
abstract class AbstractConsumer implements Consumer, Runnable {
@Override
public void run() {
while (true) {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
abstract class AbstractProducer implements Producer, Runnable {
@Override
public void run() {
while (true) {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
不同的模型實(shí)現(xiàn)中泊交,生產(chǎn)者乳讥、消費(fèi)者的具體實(shí)現(xiàn)也不同,所以需要為模型定義抽象工廠(chǎng)方法:
public interface Model {
Runnable newRunnableConsumer();
Runnable newRunnableProducer();
}
我們將Task作為生產(chǎn)和消費(fèi)的單位:
public class Task {
public int no;
public Task(int no) {
this.no = no;
}
}
如果需求還不明確(這符合大部分工程工作的實(shí)際情況)廓俭,建議邊實(shí)現(xiàn)邊抽象云石,不要“面向未來(lái)編程”。
實(shí)現(xiàn)一:BlockingQueue
BlockingQueue的寫(xiě)法最簡(jiǎn)單白指。核心思想是留晚,把并發(fā)和容量控制封裝在緩沖區(qū)中。而B(niǎo)lockingQueue的性質(zhì)天生滿(mǎn)足這個(gè)要求告嘲。
public class BlockingQueueModel implements Model {
private final BlockingQueue<Task> queue;
private final AtomicInteger increTaskNo = new AtomicInteger(0);
public BlockingQueueModel(int cap) {
// LinkedBlockingQueue 的隊(duì)列是 lazy-init 的错维,但 ArrayBlockingQueue 在創(chuàng)建時(shí)就已經(jīng) init
this.queue = new LinkedBlockingQueue<>(cap);
}
@Override
public Runnable newRunnableConsumer() {
return new ConsumerImpl();
}
@Override
public Runnable newRunnableProducer() {
return new ProducerImpl();
}
private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
@Override
public void consume() throws InterruptedException {
Task task = queue.take();
// 固定時(shí)間范圍的消費(fèi),模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.println("consume: " + task.no);
}
}
private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
@Override
public void produce() throws InterruptedException {
// 不定期生產(chǎn)橄唬,模擬隨機(jī)的用戶(hù)請(qǐng)求
Thread.sleep((long) (Math.random() * 1000));
Task task = new Task(increTaskNo.getAndIncrement());
System.out.println("produce: " + task.no);
queue.put(task);
}
}
public static void main(String[] args) {
Model model = new BlockingQueueModel(3);
for (int i = 0; i < 2; i++) {
new Thread(model.newRunnableConsumer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(model.newRunnableProducer()).start();
}
}
}
截取前面的一部分輸出:
produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7
由于操作“出隊(duì)/入隊(duì)+日志輸出”不是原子的赋焕,所以上述日志的絕對(duì)順序與實(shí)際的出隊(duì)/入隊(duì)順序有出入,但對(duì)于同一個(gè)任務(wù)號(hào)task.no
仰楚,其consume日志一定出現(xiàn)在其produce日志之后隆判,即:同一任務(wù)的消費(fèi)行為一定發(fā)生在生產(chǎn)行為之后。緩沖區(qū)的容量留給讀者驗(yàn)證僧界。符合兩個(gè)驗(yàn)證條件侨嘀。
BlockingQueue寫(xiě)法的核心只有兩行代碼,并發(fā)和容量控制都封裝在了BlockingQueue中捂襟,正確性由BlockingQueue保證咬腕。面試中首選該寫(xiě)法,自然美觀簡(jiǎn)單葬荷。
勘誤:
在簡(jiǎn)書(shū)回復(fù)一個(gè)讀者的時(shí)候涨共,順道發(fā)現(xiàn)了這個(gè)問(wèn)題:生產(chǎn)日志應(yīng)放在入隊(duì)操作之前,否則同一個(gè)task的生產(chǎn)日志可能出現(xiàn)在消費(fèi)日志之后宠漩。
// 舊的錯(cuò)誤代碼 queue.put(task); System.out.println("produce: " + task.no);
// 正確代碼 System.out.println("produce: " + task.no); queue.put(task);
具體來(lái)說(shuō)举反,生產(chǎn)日志應(yīng)放在入隊(duì)操作之前,消費(fèi)日志應(yīng)放在出隊(duì)操作之后扒吁,以保障:
- 消費(fèi)線(xiàn)程中queue.take()返回之后火鼻,對(duì)應(yīng)生產(chǎn)線(xiàn)程(生產(chǎn)該task的線(xiàn)程)中queue.put()及之前的行為,對(duì)于消費(fèi)線(xiàn)程來(lái)說(shuō)都是可見(jiàn)的雕崩。
想想為什么呢怀薛?因?yàn)槲覀冃枰柚皅ueue.put()與queue.take()的偏序關(guān)系”。其他實(shí)現(xiàn)方案分別借助了條件隊(duì)列褒傅、鎖的偏序關(guān)系捷绒,不存在該問(wèn)題。要解釋這個(gè)問(wèn)題捉貌,需要讀者明白可見(jiàn)性和Happens-Before的概念支鸡,篇幅所限,暫時(shí)不多解釋趁窃。
PS:舊代碼沒(méi)出現(xiàn)這個(gè)問(wèn)題牧挣,是因?yàn)橄M(fèi)者打印消費(fèi)日志之前,sleep了500+ms醒陆,而恰巧競(jìng)爭(zhēng)不激烈瀑构,這個(gè)時(shí)間一般足以讓“滯后”生產(chǎn)日志打印完成(但不保證)。
順道說(shuō)明一下刨摩,猴子現(xiàn)在主要在個(gè)人博客寺晌、簡(jiǎn)書(shū)世吨、掘金和CSDN上發(fā)文章,搜索“猴子007”或“程序猿說(shuō)你好”都能找到呻征。但個(gè)人精力有限耘婚,部分勘誤難免忘記同步到某些地方(甚至連新文章都不同步了T_T),只能保證個(gè)人博客是最新的陆赋,還望理解沐祷。
寫(xiě)文章不是為了出名,一方面希望整理自己的學(xué)習(xí)成果攒岛,一方面希望有更多人能幫助猴子糾正學(xué)習(xí)過(guò)程中的錯(cuò)誤赖临。如果能認(rèn)識(shí)一些志同道合的朋友,一起提高就更好了灾锯。所以希望各位轉(zhuǎn)載的時(shí)候兢榨,一定帶著猴子個(gè)人博客末尾的轉(zhuǎn)載聲明。需要聯(lián)系猴子的話(huà)挠进,簡(jiǎn)書(shū)或郵件都可以色乾。
文章水平不高,就不奢求有人能打賞鼓勵(lì)我這潑猴了T_T
實(shí)現(xiàn)二:wait && notify
如果不能將并發(fā)與容量控制都封裝在緩沖區(qū)中领突,就只能由消費(fèi)者與生產(chǎn)者完成暖璧。最簡(jiǎn)單的方案是使用樸素的wait && notify
機(jī)制。
public class WaitNotifyModel implements Model {
private final Object BUFFER_LOCK = new Object();
private final Queue<Task> buffer = new LinkedList<>();
private final int cap;
private final AtomicInteger increTaskNo = new AtomicInteger(0);
public WaitNotifyModel(int cap) {
this.cap = cap;
}
@Override
public Runnable newRunnableConsumer() {
return new ConsumerImpl();
}
@Override
public Runnable newRunnableProducer() {
return new ProducerImpl();
}
private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
@Override
public void consume() throws InterruptedException {
synchronized (BUFFER_LOCK) {
while (buffer.size() == 0) {
BUFFER_LOCK.wait();
}
Task task = buffer.poll();
assert task != null;
// 固定時(shí)間范圍的消費(fèi)君旦,模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.println("consume: " + task.no);
BUFFER_LOCK.notifyAll();
}
}
}
private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
@Override
public void produce() throws InterruptedException {
// 不定期生產(chǎn)澎办,模擬隨機(jī)的用戶(hù)請(qǐng)求
Thread.sleep((long) (Math.random() * 1000));
synchronized (BUFFER_LOCK) {
while (buffer.size() == cap) {
BUFFER_LOCK.wait();
}
Task task = new Task(increTaskNo.getAndIncrement());
buffer.offer(task);
System.out.println("produce: " + task.no);
BUFFER_LOCK.notifyAll();
}
}
}
public static void main(String[] args) {
Model model = new WaitNotifyModel(3);
for (int i = 0; i < 2; i++) {
new Thread(model.newRunnableConsumer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(model.newRunnableProducer()).start();
}
}
}
驗(yàn)證方法同上。
樸素的wait && notify
機(jī)制不那么靈活金砍,但足夠簡(jiǎn)單局蚀。synchronized、wait恕稠、notifyAll的用法可參考【Java并發(fā)編程】之十:使用wait/notify/notifyAll實(shí)現(xiàn)線(xiàn)程間通信的幾點(diǎn)重要說(shuō)明琅绅,著重理解喚醒與鎖競(jìng)爭(zhēng)的區(qū)別。
實(shí)現(xiàn)三:簡(jiǎn)單的Lock && Condition
我們要保證理解wait && notify
機(jī)制鹅巍。實(shí)現(xiàn)時(shí)可以使用Object類(lèi)提供的wait()方法與notifyAll()方法千扶,但更推薦的方式是使用java.util.concurrent包提供的Lock && Condition
。
public class LockConditionModel1 implements Model {
private final Lock BUFFER_LOCK = new ReentrantLock();
private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();
private final Queue<Task> buffer = new LinkedList<>();
private final int cap;
private final AtomicInteger increTaskNo = new AtomicInteger(0);
public LockConditionModel1(int cap) {
this.cap = cap;
}
@Override
public Runnable newRunnableConsumer() {
return new ConsumerImpl();
}
@Override
public Runnable newRunnableProducer() {
return new ProducerImpl();
}
private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
@Override
public void consume() throws InterruptedException {
BUFFER_LOCK.lockInterruptibly();
try {
while (buffer.size() == 0) {
BUFFER_COND.await();
}
Task task = buffer.poll();
assert task != null;
// 固定時(shí)間范圍的消費(fèi)骆捧,模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.println("consume: " + task.no);
BUFFER_COND.signalAll();
} finally {
BUFFER_LOCK.unlock();
}
}
}
private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
@Override
public void produce() throws InterruptedException {
// 不定期生產(chǎn)澎羞,模擬隨機(jī)的用戶(hù)請(qǐng)求
Thread.sleep((long) (Math.random() * 1000));
BUFFER_LOCK.lockInterruptibly();
try {
while (buffer.size() == cap) {
BUFFER_COND.await();
}
Task task = new Task(increTaskNo.getAndIncrement());
buffer.offer(task);
System.out.println("produce: " + task.no);
BUFFER_COND.signalAll();
} finally {
BUFFER_LOCK.unlock();
}
}
}
public static void main(String[] args) {
Model model = new LockConditionModel1(3);
for (int i = 0; i < 2; i++) {
new Thread(model.newRunnableConsumer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(model.newRunnableProducer()).start();
}
}
}
該寫(xiě)法的思路與實(shí)現(xiàn)二的思路完全相同,僅僅將鎖與條件變量換成了Lock和Condition敛苇。
實(shí)現(xiàn)四:更高并發(fā)性能的Lock && Condition
現(xiàn)在妆绞,如果做一些實(shí)驗(yàn),你會(huì)發(fā)現(xiàn),實(shí)現(xiàn)一的并發(fā)性能高于實(shí)現(xiàn)二括饶、三株茶。暫且不關(guān)心BlockingQueue的具體實(shí)現(xiàn),來(lái)分析看如何優(yōu)化實(shí)現(xiàn)三(與實(shí)現(xiàn)二的思路相同巷帝,性能相當(dāng))的性能忌卤。
分析實(shí)現(xiàn)三的瓶頸
最好的查證方法是記錄方法執(zhí)行時(shí)間扫夜,這樣可以直接定位到真正的瓶頸楞泼。但此問(wèn)題較簡(jiǎn)單,我們直接用“瞪眼法”分析笤闯。
實(shí)現(xiàn)三的并發(fā)瓶頸很明顯堕阔,因?yàn)樵阪i BUFFER_LOCK
看來(lái),任何消費(fèi)者線(xiàn)程與生產(chǎn)者線(xiàn)程都是一樣的颗味。換句話(huà)說(shuō)超陆,同一時(shí)刻,最多只允許有一個(gè)線(xiàn)程(生產(chǎn)者或消費(fèi)者浦马,二選一)操作緩沖區(qū) buffer时呀。
而實(shí)際上,如果緩沖區(qū)是一個(gè)隊(duì)列的話(huà)晶默,“生產(chǎn)者將產(chǎn)品入隊(duì)”與“消費(fèi)者將產(chǎn)品出隊(duì)”兩個(gè)操作之間沒(méi)有同步關(guān)系谨娜,可以在隊(duì)首出隊(duì)的同時(shí),在隊(duì)尾入隊(duì)磺陡。理想性能可提升至實(shí)現(xiàn)三的兩倍趴梢。
去掉這個(gè)瓶頸
那么思路就簡(jiǎn)單了:需要兩個(gè)鎖 CONSUME_LOCK
與PRODUCE_LOCK
,CONSUME_LOCK
控制消費(fèi)者線(xiàn)程并發(fā)出隊(duì)币他,PRODUCE_LOCK
控制生產(chǎn)者線(xiàn)程并發(fā)入隊(duì)坞靶;相應(yīng)需要兩個(gè)條件變量NOT_EMPTY
與NOT_FULL
,NOT_EMPTY
負(fù)責(zé)控制消費(fèi)者線(xiàn)程的狀態(tài)(阻塞蝴悉、運(yùn)行)彰阴,NOT_FULL
負(fù)責(zé)控制生產(chǎn)者線(xiàn)程的狀態(tài)(阻塞、運(yùn)行)拍冠。以此讓優(yōu)化消費(fèi)者與消費(fèi)者(或生產(chǎn)者與生產(chǎn)者)之間是串行的尿这;消費(fèi)者與生產(chǎn)者之間是并行的。
public class LockConditionModel2 implements Model {
private final Lock CONSUME_LOCK = new ReentrantLock();
private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();
private final Lock PRODUCE_LOCK = new ReentrantLock();
private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();
private final Buffer<Task> buffer = new Buffer<>();
private AtomicInteger bufLen = new AtomicInteger(0);
private final int cap;
private final AtomicInteger increTaskNo = new AtomicInteger(0);
public LockConditionModel2(int cap) {
this.cap = cap;
}
@Override
public Runnable newRunnableConsumer() {
return new ConsumerImpl();
}
@Override
public Runnable newRunnableProducer() {
return new ProducerImpl();
}
private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
@Override
public void consume() throws InterruptedException {
int newBufSize = -1;
CONSUME_LOCK.lockInterruptibly();
try {
while (bufLen.get() == 0) {
System.out.println("buffer is empty...");
NOT_EMPTY.await();
}
Task task = buffer.poll();
newBufSize = bufLen.decrementAndGet();
assert task != null;
// 固定時(shí)間范圍的消費(fèi)倦微,模擬相對(duì)穩(wěn)定的服務(wù)器處理過(guò)程
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.println("consume: " + task.no);
if (newBufSize > 0) {
NOT_EMPTY.signalAll();
}
} finally {
CONSUME_LOCK.unlock();
}
if (newBufSize < cap) {
PRODUCE_LOCK.lockInterruptibly();
try {
NOT_FULL.signalAll();
} finally {
PRODUCE_LOCK.unlock();
}
}
}
}
private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
@Override
public void produce() throws InterruptedException {
// 不定期生產(chǎn)妻味,模擬隨機(jī)的用戶(hù)請(qǐng)求
Thread.sleep((long) (Math.random() * 1000));
int newBufSize = -1;
PRODUCE_LOCK.lockInterruptibly();
try {
while (bufLen.get() == cap) {
System.out.println("buffer is full...");
NOT_FULL.await();
}
Task task = new Task(increTaskNo.getAndIncrement());
buffer.offer(task);
newBufSize = bufLen.incrementAndGet();
System.out.println("produce: " + task.no);
if (newBufSize < cap) {
NOT_FULL.signalAll();
}
} finally {
PRODUCE_LOCK.unlock();
}
if (newBufSize > 0) {
CONSUME_LOCK.lockInterruptibly();
try {
NOT_EMPTY.signalAll();
} finally {
CONSUME_LOCK.unlock();
}
}
}
}
private static class Buffer<E> {
private Node head;
private Node tail;
Buffer() {
// dummy node
head = tail = new Node(null);
}
public void offer(E e) {
tail.next = new Node(e);
tail = tail.next;
}
public E poll() {
head = head.next;
E e = head.item;
head.item = null;
return e;
}
private class Node {
E item;
Node next;
Node(E item) {
this.item = item;
}
}
}
public static void main(String[] args) {
Model model = new LockConditionModel2(3);
for (int i = 0; i < 2; i++) {
new Thread(model.newRunnableConsumer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(model.newRunnableProducer()).start();
}
}
需要注意的是,由于需要同時(shí)在UnThreadSafe的緩沖區(qū) buffer 上進(jìn)行消費(fèi)與生產(chǎn)欣福,我們不能使用實(shí)現(xiàn)二责球、三中使用的隊(duì)列了,需要自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的緩沖區(qū) Buffer。Buffer要滿(mǎn)足以下條件:
- 在頭部出隊(duì)雏逾,尾部入隊(duì)
- 在poll()方法中只操作head
- 在offer()方法中只操作tail
還能進(jìn)一步優(yōu)化嗎
我們已經(jīng)優(yōu)化掉了消費(fèi)者與生產(chǎn)者之間的瓶頸嘉裤,還能進(jìn)一步優(yōu)化嗎?
如果可以栖博,必然是繼續(xù)優(yōu)化消費(fèi)者與消費(fèi)者(或生產(chǎn)者與生產(chǎn)者)之間的并發(fā)性能屑宠。然而,消費(fèi)者與消費(fèi)者之間必須是串行的仇让,因此典奉,并發(fā)模型上已經(jīng)沒(méi)有地方可以繼續(xù)優(yōu)化了。
不過(guò)在具體的業(yè)務(wù)場(chǎng)景中丧叽,一般還能夠繼續(xù)優(yōu)化卫玖。如:
- 并發(fā)規(guī)模中等,可考慮使用CAS代替重入鎖
- 模型上不能優(yōu)化踊淳,但一個(gè)消費(fèi)行為或許可以進(jìn)一步拆解假瞬、優(yōu)化,從而降低消費(fèi)的延遲
- 一個(gè)隊(duì)列的并發(fā)性能達(dá)到了極限迂尝,可采用“多個(gè)隊(duì)列”(如分布式消息隊(duì)列等)
4種實(shí)現(xiàn)的本質(zhì)
文章開(kāi)頭說(shuō):這4種寫(xiě)法的本質(zhì)相同——都是在使用或?qū)崿F(xiàn)BlockingQueue脱茉。實(shí)現(xiàn)一直接使用BlockingQueue,實(shí)現(xiàn)四實(shí)現(xiàn)了簡(jiǎn)單的BlockingQueue垄开,而實(shí)現(xiàn)二琴许、三則實(shí)現(xiàn)了退化版的BlockingQueue(性能降低一半)。
實(shí)現(xiàn)一使用的BlockingQueue實(shí)現(xiàn)類(lèi)是LinkedBlockingQueue说榆,給出其源碼閱讀對(duì)照虚吟,寫(xiě)的不難:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
...
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
...
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
...
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
...
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
...
}
還存在非常多的寫(xiě)法,如信號(hào)量
Semaphore
签财,也很常見(jiàn)(本科操作系統(tǒng)教材中的生產(chǎn)者-消費(fèi)者模型就是用信號(hào)量實(shí)現(xiàn)的)串慰。不過(guò)追究過(guò)多了就好像在糾結(jié)茴香豆的寫(xiě)法一樣,本文不繼續(xù)探討唱蒸。
總結(jié)
實(shí)現(xiàn)一必須掌握邦鲫,實(shí)現(xiàn)四至少要能清楚表述;實(shí)現(xiàn)二神汹、三掌握一個(gè)即可庆捺。
本文鏈接:Java實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識(shí)共享署名-相同方式共享 4.0 國(guó)際許可協(xié)議發(fā)布,歡迎轉(zhuǎn)載屁魏,演繹或用于商業(yè)目的滔以,但是必須保留本文的署名及鏈接。