前言簡介
生產(chǎn)者和消費(fèi)者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)存儲(chǔ)空間早歇,生產(chǎn)者往存儲(chǔ)空間中添加產(chǎn)品衬潦,消費(fèi)者從存儲(chǔ)空間中取走產(chǎn)品师幕,當(dāng)存儲(chǔ)空間為空時(shí)孵滞,消費(fèi)者阻塞沾歪,當(dāng)存儲(chǔ)空間滿時(shí)漂彤,生產(chǎn)者阻塞。
舉例說明:
- 你把信寫好——相當(dāng)于生產(chǎn)者制造數(shù)據(jù)
- 你把信放入郵筒——相當(dāng)于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)
- 郵遞員把信從郵筒取出——相當(dāng)于消費(fèi)者把數(shù)據(jù)取出緩沖區(qū)
- 郵遞員把信拿去郵局做相應(yīng)的處理——相當(dāng)于消費(fèi)者處理數(shù)據(jù)
具體實(shí)現(xiàn)方式
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程挫望,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程立润。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快媳板,而消費(fèi)者處理速度很慢桑腮,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)蛉幸。同樣的道理破讨,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者巨缘。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式添忘。
生產(chǎn)者和消費(fèi)者問題的不同實(shí)現(xiàn)方式
1. 不完善的實(shí)現(xiàn)(會(huì)導(dǎo)致死鎖)
int itemCount = 0;//總數(shù)量
procedure producer() {//生產(chǎn)者
while (true) {
item = produceItem();//生產(chǎn)一個(gè)
if (itemCount == BUFFER_SIZE) {//生產(chǎn)滿則睡眠
sleep();
}
putItemIntoBuffer(item);//緩沖區(qū)放入一個(gè)
itemCount = itemCount + 1;
if (itemCount == 1) {
wakeup(consumer);//喚醒消費(fèi)者
}
}
}
procedure consumer() {//消費(fèi)者
while (true) {
if (itemCount == 0) {//消費(fèi)完則睡眠
sleep();
}
item = removeItemFromBuffer();//緩沖區(qū)減少一個(gè)
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1) {
wakeup(producer);//喚醒生產(chǎn)者
}
consumeItem(item);//消費(fèi)一個(gè)
}
}
上面代碼中的問題在于它可能導(dǎo)致競爭條件,進(jìn)而引發(fā)死鎖若锁「槠铮考慮下面的情形:
- 消費(fèi)者把最后一個(gè) itemCount 的內(nèi)容讀出來,注意它現(xiàn)在是零又固。消費(fèi)者返回到while的起始處仲器,現(xiàn)在進(jìn)入 if 塊;
- 就在調(diào)用sleep之前仰冠,CPU決定將時(shí)間讓給生產(chǎn)者乏冀,于是消費(fèi)者在執(zhí)行 sleep 之前就被中斷了,生產(chǎn)者開始執(zhí)行洋只;
- 生產(chǎn)者生產(chǎn)出一項(xiàng)數(shù)據(jù)后將其放入緩沖區(qū)辆沦,然后在 itemCount 上加 1;
- 由于緩沖區(qū)在上一步加 1 之前為空识虚,生產(chǎn)者嘗試喚醒消費(fèi)者肢扯;
- 遺憾的是,消費(fèi)者并沒有在休眠担锤,喚醒指令不起作用蔚晨。當(dāng)消費(fèi)者恢復(fù)執(zhí)行的時(shí)候,執(zhí)行 sleep肛循,一覺不醒铭腕。出現(xiàn)這種情況的原因在于,消費(fèi)者只能被生產(chǎn)者在 itemCount 為 1 的情況下喚醒多糠;
- 生產(chǎn)者不停地循環(huán)執(zhí)行累舷,直到緩沖區(qū)滿,隨后進(jìn)入休眠夹孔。
由于兩個(gè)線程都進(jìn)入了永遠(yuǎn)的休眠笋粟,死鎖情況出現(xiàn)了怀挠。因此,該算法是不完善的害捕。
2. 使用信號(hào)燈的算法
semaphore fillCount = 0; // 生產(chǎn)的項(xiàng)目 總存量
semaphore emptyCount = BUFFER_SIZE; // 剩余空間
procedure producer() {
while (true) {
item = produceItem();//生產(chǎn)
down(emptyCount);//減少剩余空間
putItemIntoBuffer(item);//緩沖區(qū)增加
up(fillCount);//增加存量
}
}
procedure consumer() {
while (true) {
down(fillCount);//減少存量
item = removeItemFromBuffer();//緩沖區(qū)減少
up(emptyCount);//增加剩余空間
consumeItem(item);//消費(fèi)
}
}
上述方法在只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者時(shí)能解決問題绿淋。對(duì)于多個(gè)生產(chǎn)者或者多個(gè)消費(fèi)者共享緩沖區(qū)的情況,該算法也會(huì)導(dǎo)致競爭條件尝盼,出現(xiàn)兩個(gè)或以上的進(jìn)程同時(shí)讀或?qū)懲粋€(gè)緩沖區(qū)槽的情況吞滞。
為了解決這個(gè)問題,需要在保證同一時(shí)刻只有一個(gè)生產(chǎn)者能夠執(zhí)行 putItemIntoBuffer()盾沫。也就是說裁赠,需要尋找一種方法來互斥地執(zhí)行臨界區(qū)的代碼。為了達(dá)到這個(gè)目的赴精,可引入一個(gè)二值信號(hào)燈 mutex佩捞,其值只能為 1 或者 0。如果把線程放入 down(mutex) 和 up(mutex) 之間蕾哟,就可以限制只有一個(gè)線程能被執(zhí)行一忱。多生產(chǎn)者、消費(fèi)者的解決算法如下
semaphore mutex = 1;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;
procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
down(mutex);//獲取鎖
putItemIntoBuffer(item);
up(mutex);//釋放鎖
up(fillCount);
}
}
procedure consumer() {
while (true) {
down(fillCount);
down(mutex);
item = removeItemFromBuffer();
up(mutex);
up(emptyCount);
consumeItem(item);
}
}
3. 使用管程的算法
monitor ProducerConsumer {
int itemCount
condition full;
condition empty;
procedure add(item) {
while (itemCount == BUFFER_SIZE)
wait(full);
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1)
notify(empty);
}
procedure remove() {
while (itemCount == 0)
wait(empty);
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1)
notify(full);
return item;
}
}
procedure producer() {
while (true) {
item = produceItem()
ProducerConsumer.add(item)
}
}
procedure consumer() {
while (true) {
item = ProducerConsumer.remove()
consumeItem(item)
}
}
注意代碼中 while 語句的用法谭确,都是用在測(cè)試緩沖區(qū)是否已滿或空的時(shí)候帘营。當(dāng)存在多個(gè)消費(fèi)者時(shí),有可能造成競爭條件的情況是:某一消費(fèi)者在一項(xiàng)數(shù)據(jù)被放入緩沖區(qū)中時(shí)被喚醒逐哈,但是另一消費(fèi)者已經(jīng)在管程上等待了一段時(shí)間并移除了這項(xiàng)數(shù)據(jù)芬迄。如果 while 語句被改成 if,則會(huì)出現(xiàn)放入緩沖區(qū)的數(shù)據(jù)項(xiàng)過多昂秃,或移除空緩沖區(qū)中的元素的情況禀梳。
java的5種實(shí)現(xiàn)方式
1. wait()和notify()方法的實(shí)現(xiàn)
這也是最簡單最基礎(chǔ)的實(shí)現(xiàn),緩沖區(qū)滿和為空時(shí)都調(diào)用wait()方法等待肠骆,當(dāng)生產(chǎn)者生產(chǎn)了一個(gè)產(chǎn)品或者消費(fèi)者消費(fèi)了一個(gè)產(chǎn)品之后會(huì)喚醒所有線程出皇。
/**
* @author shangjing
* @date 2018/11/22 3:26 PM
* @describe wait,notify實(shí)現(xiàn)
*/
public class WaitTest {
private static int count = 0;
private static final int buffCount = 10;
private static String lock = "lock";
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (count == buffCount) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName() + "-生產(chǎn)者生產(chǎn),數(shù)量為:" + count);
lock.notifyAll();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (count == 0) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count--;
System.out.println(Thread.currentThread().getName() + "-消費(fèi)者消費(fèi)哗戈,數(shù)量為:"+ count);
lock.notifyAll();
}
}
}
}
public static void main(String[] args) {
WaitTest waitTest = new WaitTest();
new Thread(waitTest.new Producer()).start();
new Thread(waitTest.new Consumer()).start();
new Thread(waitTest.new Producer()).start();
new Thread(waitTest.new Consumer()).start();
new Thread(waitTest.new Producer()).start();
new Thread(waitTest.new Consumer()).start();
}
}
2. 可重入鎖ReentrantLock的實(shí)現(xiàn)
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個(gè)抽象,通過對(duì)lock的lock()方法和unlock()方法實(shí)現(xiàn)了對(duì)鎖的顯示控制荷科,而synchronize()則是對(duì)鎖的隱性控制唯咬。
可重入鎖,也叫做遞歸鎖畏浆,指的是同一線程 外層函數(shù)獲得鎖之后 胆胰,內(nèi)層遞歸函數(shù)仍然有獲取該鎖的代碼,但不受影響刻获,簡單來說蜀涨,該鎖維護(hù)這一個(gè)與獲取鎖相關(guān)的計(jì)數(shù)器,如果擁有鎖的某個(gè)線程再次得到鎖,那么獲取計(jì)數(shù)器就加1厚柳,函數(shù)調(diào)用結(jié)束計(jì)數(shù)器就減1氧枣,然后鎖需要被釋放兩次才能獲得真正釋放。已經(jīng)獲取鎖的線程進(jìn)入其他需要相同鎖的同步代碼塊不會(huì)被阻塞别垮。
/**
* @author shangjing
* @date 2018/11/22 3:53 PM
* @describe
*/
public class LockTest {
private static int count = 0;
private static final int buffCount = 10;
private static Lock lock = new ReentrantLock();
//創(chuàng)建兩個(gè)條件變量便监,一個(gè)為緩沖區(qū)非滿,一個(gè)為緩沖區(qū)非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
while (count == buffCount) {
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName() + "-生產(chǎn)者生產(chǎn)碳想,數(shù)量為:" + count);
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
while (count == 0) {
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count--;
System.out.println(Thread.currentThread().getName() + "-消費(fèi)者消費(fèi)烧董,數(shù)量為:"+ count);
notFull.signal();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
LockTest lockTest = new LockTest();
new Thread(lockTest.new Producer()).start();
new Thread(lockTest.new Consumer()).start();
new Thread(lockTest.new Producer()).start();
new Thread(lockTest.new Consumer()).start();
new Thread(lockTest.new Producer()).start();
new Thread(lockTest.new Consumer()).start();
}
}
3. 阻塞隊(duì)列BlockingQueue的實(shí)現(xiàn)(最簡單)
BlockingQueue即阻塞隊(duì)列,從阻塞這個(gè)詞可以看出胧奔,在某些情況下對(duì)阻塞隊(duì)列的訪問可能會(huì)造成阻塞逊移。被阻塞的情況主要有如下兩種:
當(dāng)隊(duì)列滿了的時(shí)候進(jìn)行入隊(duì)列操作
當(dāng)隊(duì)列空了的時(shí)候進(jìn)行出隊(duì)列操作
因此,當(dāng)一個(gè)線程對(duì)已經(jīng)滿了的阻塞隊(duì)列進(jìn)行入隊(duì)操作時(shí)會(huì)阻塞龙填,除非有另外一個(gè)線程進(jìn)行了出隊(duì)操作胳泉,當(dāng)一個(gè)線程對(duì)一個(gè)空的阻塞隊(duì)列進(jìn)行出隊(duì)操作時(shí)也會(huì)阻塞,除非有另外一個(gè)線程進(jìn)行了入隊(duì)操作觅够。
從上可知胶背,阻塞隊(duì)列是線程安全的。
/**
* @author shangjing
* @date 2018/11/22 4:05 PM
* @describe
*/
public class BlockingQueueTest {
private static int count = 0;
private final BlockingQueue blockingQueue = new LinkedBlockingQueue(10);
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
blockingQueue.put(1);
count++;
System.out.println(Thread.currentThread().getName() + "-生產(chǎn)者生產(chǎn)喘先,數(shù)量為:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
blockingQueue.take();
count--;
System.out.println(Thread.currentThread().getName() + "-消費(fèi)者消費(fèi)钳吟,數(shù)量為:"+ count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BlockingQueueTest blockingQueueTest = new BlockingQueueTest();
new Thread(blockingQueueTest.new Producer()).start();
new Thread(blockingQueueTest.new Consumer()).start();
new Thread(blockingQueueTest.new Producer()).start();
new Thread(blockingQueueTest.new Consumer()).start();
new Thread(blockingQueueTest.new Producer()).start();
new Thread(blockingQueueTest.new Consumer()).start();
}
}
4. 信號(hào)量Semaphore的實(shí)現(xiàn)
Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程窘拯,以保證合理的使用公共資源红且,在操作系統(tǒng)中是一個(gè)非常重要的問題,可以用來解決哲學(xué)家就餐問題涤姊。Java中的Semaphore維護(hù)了一個(gè)許可集暇番,一開始先設(shè)定這個(gè)許可集的數(shù)量,可以使用acquire()方法獲得一個(gè)許可思喊,當(dāng)許可不足時(shí)會(huì)被阻塞壁酬,release()添加一個(gè)許可。在下列代碼中恨课,還加入了另外一個(gè)mutex信號(hào)量舆乔,維護(hù)生產(chǎn)者消費(fèi)者之間的同步關(guān)系,保證生產(chǎn)者和消費(fèi)者之間的交替進(jìn)行
/**
* @author shangjing
* @date 2018/11/22 4:20 PM
* @describe
*/
public class SemaphoreTest {
private static int count = 0;
//創(chuàng)建三個(gè)信號(hào)量
private final Semaphore notFull = new Semaphore(10);
private final Semaphore notEmpty = new Semaphore(0);
private final Semaphore mutex = new Semaphore(1);
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
notFull.acquire();//獲取許可
mutex.acquire();
count++;
System.out.println(Thread.currentThread().getName() + "-生產(chǎn)者生產(chǎn)剂公,數(shù)量為:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
mutex.release();//釋放
notEmpty.release();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
notEmpty.acquire();
mutex.acquire();
count--;
System.out.println(Thread.currentThread().getName() + "-消費(fèi)者消費(fèi)希俩,數(shù)量為:"+ count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
}
}
}
public static void main(String[] args) {
SemaphoreTest semaphoreTest = new SemaphoreTest();
new Thread(semaphoreTest.new Producer()).start();
new Thread(semaphoreTest.new Consumer()).start();
new Thread(semaphoreTest.new Producer()).start();
new Thread(semaphoreTest.new Consumer()).start();
new Thread(semaphoreTest.new Producer()).start();
new Thread(semaphoreTest.new Consumer()).start();
}
}
5. 管道輸入輸出流PipedInputStream和PipedOutputStream實(shí)現(xiàn)
在java的io包下,PipedOutputStream和PipedInputStream分別是管道輸出流和管道輸入流纲辽。
它們的作用是讓多線程可以通過管道進(jìn)行線程間的通訊颜武。在使用管道通信時(shí)璃搜,必須將PipedOutputStream和PipedInputStream配套使用。
使用方法:先創(chuàng)建一個(gè)管道輸入流和管道輸出流鳞上,然后將輸入流和輸出流進(jìn)行連接这吻,用生產(chǎn)者線程往管道輸出流中寫入數(shù)據(jù),消費(fèi)者在管道輸入流中讀取數(shù)據(jù)因块,這樣就可以實(shí)現(xiàn)了不同線程間的相互通訊橘原,但是這種方式在生產(chǎn)者和生產(chǎn)者蛔垢、消費(fèi)者和消費(fèi)者之間不能保證同步申尤,也就是說在一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者的情況下是可以生產(chǎn)者和消費(fèi)者之間交替運(yùn)行的,多個(gè)生成者和多個(gè)消費(fèi)者者之間則不行
/**
* @author shangjing
* @date 2018/11/22 4:29 PM
* @describe
*/
public class PipedTest {
private final PipedInputStream pis = new PipedInputStream();
private final PipedOutputStream pos = new PipedOutputStream();
{
try {
pis.connect(pos);
} catch (IOException e) {
e.printStackTrace();
}
}
class Producer implements Runnable {
@Override
public void run() {
try {
while(true) {
Thread.sleep(1000);
int num = (int) (Math.random() * 255);
System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn)了一個(gè)數(shù)字噩咪,該數(shù)字為: " + num);
pos.write(num);
pos.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
pos.close();
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
try {
while(true) {
Thread.sleep(1000);
int num = pis.read();
System.out.println("消費(fèi)者消費(fèi)了一個(gè)數(shù)字吩愧,該數(shù)字為:" + num);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
pos.close();
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
PipedTest pipedTest = new PipedTest();
new Thread(pipedTest.new Producer()).start();
new Thread(pipedTest.new Consumer()).start();
}
}
消費(fèi)者生產(chǎn)者并行的優(yōu)化實(shí)現(xiàn)
上面的實(shí)現(xiàn)方式生產(chǎn)者和消費(fèi)者是互斥的芋酌,效率并不是最好⊙慵眩可以采用多個(gè)生產(chǎn)者(多個(gè)消費(fèi)者)串行執(zhí)行脐帝,生產(chǎn)者與消費(fèi)者之間并行執(zhí)行,提升效率糖权。
更高并發(fā)性能的Lock實(shí)現(xiàn):
需要兩個(gè)鎖 CONSUME_LOCK與PRODUCE_LOCK堵腹,CONSUME_LOCK控制消費(fèi)者線程并發(fā)出隊(duì),PRODUCE_LOCK控制生產(chǎn)者線程并發(fā)入隊(duì)星澳;相應(yīng)需要兩個(gè)條件變量NOT_EMPTY與NOT_FULL疚顷,NOT_EMPTY負(fù)責(zé)控制消費(fèi)者線程的狀態(tài)(阻塞、運(yùn)行)禁偎,NOT_FULL負(fù)責(zé)控制生產(chǎn)者線程的狀態(tài)(阻塞腿堤、運(yùn)行)。以此讓優(yōu)化消費(fèi)者與消費(fèi)者(或生產(chǎn)者與生產(chǎn)者)之間是串行的如暖;消費(fèi)者與生產(chǎn)者之間是并行的笆檀。