生產(chǎn)者消費(fèi)者模式介紹
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題旬痹。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊讨越,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理两残,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù)谎痢,而是直接從阻塞隊(duì)列里取磕昼,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力节猿。
這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的票从。阻塞隊(duì)列如何實(shí)現(xiàn)高并發(fā)多線程安全也是生產(chǎn)者消費(fèi)者模式中的核心關(guān)鍵。
在日常開發(fā)過程中滨嘱,我們常常會(huì)遇到一些高并發(fā)場景峰鄙,例如很多秒殺場景,其實(shí)真實(shí)的秒殺場景會(huì)很復(fù)雜太雨,這里只是簡單描述下秒殺場景下的生產(chǎn)者消費(fèi)者模式吟榴,在秒殺場景下生產(chǎn)者是普通參與秒殺的用戶,消費(fèi)者是秒殺系統(tǒng)囊扳,通常來說這樣的場景下秒殺用戶是非常多的吩翻,如果系統(tǒng)采用常規(guī)的實(shí)時(shí)同步交易,那么勢必造成系統(tǒng)處理線程池被瞬間占滿锥咸,后續(xù)請求全部被丟棄狭瞎,這樣造成的用戶體驗(yàn)是非常差的,而且系統(tǒng)可能會(huì)出現(xiàn)快速雪崩搏予。在多線程開發(fā)當(dāng)中熊锭,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完碗殷,才能繼續(xù)生產(chǎn)數(shù)據(jù)精绎。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者锌妻,那么消費(fèi)者就必須等待生產(chǎn)者代乃。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。秒殺場景是個(gè)非常適合生產(chǎn)者从祝、消費(fèi)者的模式襟己,通過中間的緩沖隊(duì)列解決秒殺請求接收和秒殺處理兩者之間的處理時(shí)間差,并且能夠在過程中通過反欺詐和公平算法來保證消費(fèi)者的公平利益牍陌。下面我們就來用java實(shí)現(xiàn)下生產(chǎn)者和消費(fèi)者模式擎浴,這里實(shí)現(xiàn)的是可以直接用于生產(chǎn)環(huán)境的架構(gòu),并不是簡單的使用Queue做個(gè)簡單的進(jìn)棧出棧毒涧,而是通過java.util.concurrent下的相關(guān)類實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式的多線程方案贮预。
Java實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
隊(duì)列的特性:先進(jìn)先出(FIFO)—先進(jìn)入隊(duì)列的元素先出隊(duì)列(可以理解為我們生活中的排隊(duì)情況,早辦完契讲,早滾蛋)仿吞。生產(chǎn)者(Producer)往隊(duì)列里發(fā)布(publish)事件捡偏,消費(fèi)者(Consumer)獲得通知唤冈,消費(fèi)事件;如果隊(duì)列中沒有事件時(shí)银伟,消費(fèi)者堵塞你虹,直到生產(chǎn)者發(fā)布了新事件。
說到隊(duì)列彤避,那就不得不提到Java中的concurrent包傅物,其主要實(shí)現(xiàn)包括ArrayBlockingQueue、LinkedBlockingQueue琉预、ConcurrentLinkedQueue董饰、LinkedTransferQueue。下面圆米,簡單介紹下:
ArrayBlockingQueue:基于數(shù)組形式的隊(duì)列卒暂,通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全娄帖;
LinkedBlockingQueue:基于鏈表形式的隊(duì)列也祠,也通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全块茁;
ConcurrentLinkedQueue:基于鏈表形式的隊(duì)列齿坷,通過compare and swap(簡稱CAS)協(xié)議的方式,來保證多線程情況下數(shù)據(jù)的安全数焊,不加鎖永淌,主要使用了Java中的sun.misc.Unsafe類來實(shí)現(xiàn);
LinkedTransferQueue:同上佩耳;
因?yàn)長inkedBlockingQueue采用了樂觀鎖方案遂蛀,所以性能是非常高的,下面我們就用LinkedBlockingQueue作為隊(duì)列緩沖區(qū)來實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式干厚。
待處理數(shù)據(jù)類
首先我們需要實(shí)現(xiàn)一個(gè)緩沖隊(duì)列中的待處理類李滴,這里例子中實(shí)現(xiàn)的比較簡單,只是設(shè)置了一個(gè)int類型的變量蛮瞄,重寫了構(gòu)造函數(shù)并定義了get方法所坯,大家可以根據(jù)自己的需要定義相關(guān)的內(nèi)容。
public class PCData {
private final int intData;
public PCData(int intData) {
this.intData = intData;
}
public int getIntData() {
return intData;
}
@Override
public String toString() {
return "PCData{" +
"intData=" + intData +
'}';
}
}
生產(chǎn)者類
下面我們定義生產(chǎn)者類挂捅,在生產(chǎn)者類中需要定義一個(gè)緩沖隊(duì)列芹助,這里使用了剛才提到的BlockingDeque。
private BlockingDeque<PCData> queue;
生產(chǎn)者中還需要再定義一個(gè)靜態(tài)的AtomicInteger類型的對象闲先,用于多線程中共享數(shù)據(jù)状土,用于生成PCData,為什么使用AtomicInteger類型伺糠,是因?yàn)锳tomicInteger類型已經(jīng)實(shí)現(xiàn)了線程安全的自增功能蒙谓,在實(shí)際項(xiàng)目使用過程中,這個(gè)值可能是UUID或者其他的全局唯一的數(shù)值训桶。
private static AtomicInteger count = new AtomicInteger();
還需要重寫構(gòu)造方法累驮,在生成生產(chǎn)者的時(shí)候使用同一個(gè)緩沖隊(duì)列,來保證生產(chǎn)者和開發(fā)者都使用一樣的隊(duì)列渊迁,在實(shí)際項(xiàng)目中也可以定一個(gè)全局的隊(duì)列慰照,來保證所有的生產(chǎn)者和消費(fèi)者都使用同一個(gè)對列。
//定義入?yún)锽lockingQueue的構(gòu)造函數(shù)
public Producer(BlockingDeque<PCData> queue){
this.queue = queue;
}
生產(chǎn)者的核心方法中主要實(shí)現(xiàn)了創(chuàng)建PCData類并將該待處理對象放入緩沖隊(duì)列中琉朽,這里為了模擬處理耗時(shí)毒租,sleep了1秒鐘,所有繼承子BlockingDeque的隊(duì)列類都實(shí)現(xiàn)了offer方法箱叁,該方法主要是將待處理對象放入緩沖隊(duì)列中墅垮,這樣生產(chǎn)者就完成了生產(chǎn)者的基本工作,創(chuàng)建待處理類對象耕漱,并將其放入隊(duì)列算色。
Thread.sleep(1000);
data = new PCData(count.incrementAndGet());
queue.offer(data);
下面是整個(gè)Producer的代碼:
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: feiweiwei
* @Description: 生產(chǎn)者
* @Created Date: 17:14 17/9/10.
* @Modify by:
*/
public class Producer implements Runnable {
private volatile boolean isrunning = true;
//內(nèi)存緩沖隊(duì)列
private BlockingDeque<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
//定義入?yún)锽lockingQueue的構(gòu)造函數(shù)
public Producer(BlockingDeque<PCData> queue){
this.queue = queue;
}
public void stop(){
this.isrunning = false;
}
@Override
public void run() {
PCData data = null;
System.out.println("producer id = " + Thread.currentThread().getId());
while (isrunning) {
try {
Thread.sleep(1000);
data = new PCData(count.incrementAndGet());
queue.offer(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費(fèi)者類
消費(fèi)者類的核心工作就是將待處理數(shù)據(jù)從緩沖隊(duì)列中取出,并處理螟够。
在消費(fèi)者類中同樣有個(gè)BlockingDeque<PCData>的對象灾梦,同樣也是在創(chuàng)建消費(fèi)者類的時(shí)候從外部傳入峡钓,這樣可以保證所有生產(chǎn)者和消費(fèi)者使用一樣的隊(duì)列。
在核心處理邏輯中通過BlockingDeque的take方法取出待處理對象若河,然后就可以對該對象進(jìn)行處理了能岩,調(diào)用take方法后,該待處理對象也自動(dòng)從queue中彈出萧福。
下面是消費(fèi)者實(shí)現(xiàn)代碼:
import java.util.concurrent.BlockingDeque;
/**
* @Author: feiweiwei
* @Description: 消費(fèi)者類
* @Created Date: 17:26 17/9/10.
* @Modify by:
*/
public class Customer implements Runnable {
private BlockingDeque<PCData> queue;
private volatile boolean isrunning = true;
//定義入?yún)锽lockingQueue的構(gòu)造函數(shù)
public Customer(BlockingDeque<PCData> queue){
this.queue = queue;
}
public void stop(){
this.isrunning = false;
}
@Override
public void run() {
System.out.println("customer id = " + Thread.currentThread().getId());
while (isrunning){
try {
PCData data = queue.take();
if ( null != data){
int re = data.getIntData() * data.getIntData();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId() + " data is " + re + "done!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
主調(diào)用Main
在主調(diào)用Main中拉鹃,我們先創(chuàng)建一個(gè)隊(duì)列長度為10的LinkedBlockingDeque對象作為緩沖隊(duì)列。
BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);
再分別創(chuàng)建10個(gè)生產(chǎn)者對象和2個(gè)消費(fèi)者鲫忍,并將剛才創(chuàng)建的queue對象作為構(gòu)造函數(shù)入?yún)ⅰ?/p>
Producer[] producers = new Producer[10];
Customer[] customers = new Customer[2];
for(int i=0; i<10; i++){
producers[i] = new Producer(queue);
}
for(int j=0; j<2; j++){
customers[j] = new Customer(queue);
}
創(chuàng)建一個(gè)線程池將生產(chǎn)者和消費(fèi)者調(diào)用起來膏燕,這里的線程池大家可以使用自定義的線程池。
ExecutorService es = Executors.newCachedThreadPool();
for(Producer producer : producers){
es.execute(producer);
}
for(Customer customer : customers){
es.execute(customer);
}
下面是Main代碼:
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @Author: feiweiwei
* @Description:
* @Created Date: 17:29 17/9/10.
* @Modify by:
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);
Producer[] producers = new Producer[10];
Customer[] customers = new Customer[2];
for(int i=0; i<10; i++){
producers[i] = new Producer(queue);
}
for(int j=0; j<2; j++){
customers[j] = new Customer(queue);
}
ExecutorService es = Executors.newCachedThreadPool();
for(Producer producer : producers){
es.execute(producer);
}
for(Customer customer : customers){
es.execute(customer);
}
Thread.sleep(10000);
for(Producer producer : producers){
producer.stop();
}
for(Customer customer : customers){
customer.stop();
}
es.shutdown();
}
}
Disruptor實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
剛才那個(gè)是我們自己使用java.util.concurrent下的類實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模式悟民,目前業(yè)界已經(jīng)有比較成熟的方案坝辫,這里向大家推薦LMAX公司開源的Disruptor框架,Disruptor是一個(gè)開源的框架射亏,可以在無鎖的情況下對隊(duì)列進(jìn)行操作阀溶,那么這個(gè)隊(duì)列的設(shè)計(jì)就是Disruptor的核心所在。
在Disruptor中鸦泳,采用了RingBuffer來作為隊(duì)列的數(shù)據(jù)結(jié)構(gòu)银锻,RingBuffer就是一個(gè)環(huán)形的數(shù)組,既然是數(shù)組做鹰,我們便可對其設(shè)置大小击纬。在這個(gè)ringBuffer中,除了數(shù)組之外钾麸,還有一個(gè)序列號更振,是用來指向數(shù)組中的下一個(gè)可用元素,供生產(chǎn)者使用或者消費(fèi)者使用饭尝,也就是生產(chǎn)者可以生產(chǎn)的地方肯腕,或者消費(fèi)者可以消費(fèi)的地方。在Disruptor中使用的是位運(yùn)算钥平,并且在Disruptor中數(shù)組內(nèi)的元素并不會(huì)被刪除实撒,而是新數(shù)據(jù)來覆蓋原有數(shù)據(jù),所以整個(gè)環(huán)鏈的處理效率非常高涉瘾。
下面我們使用Disruptor來實(shí)現(xiàn)剛才用jdk自帶庫實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者知态。
Disruptor主要類
Disruptor:Disruptor的入口,主要封裝了環(huán)形隊(duì)列RingBuffer立叛、消費(fèi)者集合ConsumerRepository的引用负敏;主要提供了獲取環(huán)形隊(duì)列、添加消費(fèi)者秘蛇、生產(chǎn)者向RingBuffer中添加事件(可以理解為生產(chǎn)者生產(chǎn)數(shù)據(jù))的操作其做;
RingBuffer:Disruptor中隊(duì)列具體的實(shí)現(xiàn)顶考,底層封裝了Object[]數(shù)組;在初始化時(shí)妖泄,會(huì)使用Event事件對數(shù)組進(jìn)行填充村怪,填充的大小就是bufferSize設(shè)置的值;此外浮庐,該對象內(nèi)部還維護(hù)了Sequencer(序列生產(chǎn)器)具體的實(shí)現(xiàn);
Sequencer:序列生產(chǎn)器柬焕,分別有MultiProducerSequencer(多生產(chǎn)者序列生產(chǎn)器) 和 SingleProducerSequencer(單生產(chǎn)者序列生產(chǎn)器)兩個(gè)實(shí)現(xiàn)類审残。上面的例子中,使用的是SingleProducerSequencer斑举;在Sequencer中搅轿,維護(hù)了消費(fèi)者的Sequence(序列對象)和生產(chǎn)者自己的Sequence(序列對象);以及維護(hù)了生產(chǎn)者與消費(fèi)者序列沖突時(shí)候的等待策略WaitStrategy富玷;
Sequence:序列對象璧坟,內(nèi)部維護(hù)了一個(gè)long型的value,這個(gè)序列指向了RingBuffer中Object[]數(shù)組具體的角標(biāo)赎懦。生產(chǎn)者和消費(fèi)者各自維護(hù)自己的Sequence雀鹃;但都是指向RingBuffer的Object[]數(shù)組;
Wait Strategy:等待策略励两。當(dāng)沒有可消費(fèi)的事件時(shí)黎茎,消費(fèi)者根據(jù)特定的策略進(jìn)行等待;當(dāng)沒有可生產(chǎn)的地方時(shí)当悔,生產(chǎn)者根據(jù)特定的策略進(jìn)行等待傅瞻;
Event:事件對象,就是我們Ringbuffer中存在的數(shù)據(jù)盲憎,在Disruptor中用Event來定義數(shù)據(jù)嗅骄,并不存在Event類,它只是一個(gè)定義饼疙;
EventProcessor:事件處理器溺森,單獨(dú)在一個(gè)線程內(nèi)執(zhí)行,判斷消費(fèi)者的序列和生產(chǎn)者序列關(guān)系窑眯,決定是否調(diào)用我們自定義的事件處理器儿惫,也就是是否可以進(jìn)行消費(fèi);
EventHandler:事件處理器伸但,由用戶自定義實(shí)現(xiàn)肾请,也就是最終的事件消費(fèi)者,需要實(shí)現(xiàn)EventHandler接口更胖;
Producer:事件生產(chǎn)者铛铁,也就是我們上面代碼中最后那部門的for循環(huán)隔显;
待處理類
Disruptor的待處理類和自己實(shí)現(xiàn)的待處理類沒有本質(zhì)的區(qū)別,可以按照自己要求進(jìn)行定義饵逐。
public class PCData {
private int data;
public int getData() {
return data;
}
public void setData(int data) {
this.data = data;
}
}
待處理類工廠
這里需要實(shí)現(xiàn)disruptor的EventFactory接口括眠,并且實(shí)現(xiàn)newInstance方法。這里我們實(shí)現(xiàn)的newInstance方法倍权,其實(shí)就是創(chuàng)建待處理類的對象掷豺,該工廠類在創(chuàng)建Disruptor對象的時(shí)候會(huì)使用到。
import com.lmax.disruptor.EventFactory;
/**
* @Author: feiweiwei
* @Description: 待處理類工廠
* @Created Date: 18:55 17/9/10.
* @Modify by:
*/
public class PCDataFactory implements EventFactory<PCData> {
@Override
public PCData newInstance() {
return new PCData();
}
}
disruptor生產(chǎn)者類
同樣需要在生產(chǎn)者中定義一個(gè)RingBuffer<PCData>的環(huán)形隊(duì)列薄声,還需要實(shí)現(xiàn)一個(gè)push的方法当船,通過ringBuffer.next()取到下一個(gè)待處理類序列號,使用ringBuffer.get(sequence)獲取到這個(gè)序列號對應(yīng)的待處理類默辨,并對待處理類進(jìn)行賦值為新的待處理類德频。
最后通過ringBuffer.publish(sequence)才會(huì)將待處理對象發(fā)布出來,消費(fèi)者才能看到缩幸。
import com.lmax.disruptor.RingBuffer;
/**
* @Author: feiweiwei
* @Description: disruptor生產(chǎn)者類
* @Created Date: 18:56 17/9/10.
* @Modify by:
*/
public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(int data){
long sequence = ringBuffer.next();
try{
PCData event = ringBuffer.get(sequence);
event.setData(data);
}finally {
ringBuffer.publish(sequence);
}
}
}
disruptor消費(fèi)者
disruptor的消費(fèi)者類需要實(shí)現(xiàn)WorkHandler接口壹置,并實(shí)現(xiàn)onEvent方法來處理待處理類,例子中只是對待處理類中的值做了平方表谊。
import com.lmax.disruptor.WorkHandler;
/**
* @Author: feiweiwei
* @Description: disruptor消費(fèi)者
* @Created Date: 18:52 17/9/10.
* @Modify by:
*/
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData pcData) throws Exception {
System.out.println(Thread.currentThread().getId() +
"Event = " + pcData.getData()*pcData.getData());
}
}
Main
待處理類钞护、待處理工廠、生產(chǎn)者爆办、消費(fèi)者都定義好之后就可以進(jìn)行使用了患亿,定義一個(gè)緩行隊(duì)列為1024的disruptor對象,這里構(gòu)造函數(shù)入?yún)⒖疵志椭懒搜罕疲芎唵巍?/p>
PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor,
ProducerType.MULTI,new BlockingWaitStrategy());
給disruptor對象定義消費(fèi)者步藕,這里就簡單定義兩個(gè)consumer作為生產(chǎn)者。
disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer());
初始化Producer并且將ringBuffer作為構(gòu)造函數(shù)入?yún)⑻舾瘢⑼ㄟ^生產(chǎn)者循環(huán)100次將數(shù)據(jù)push入隊(duì)列咙冗,消費(fèi)者會(huì)自動(dòng)從隊(duì)列取值進(jìn)行處理。
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
for(int i=0; i<100; i++){
producer.pushData(i);
Thread.sleep(100);
System.out.println("push data " + i);
}
以下為Main全部代碼:
package com.monkey01.producercustomer.disruptor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* @Author: feiweiwei
* @Description:
* @Created Date: 18:59 17/9/10.
* @Modify by:
*/
public class Main {
public static void main(String args[]) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor,
ProducerType.MULTI,new BlockingWaitStrategy());
disruptor.handleEventsWithWorkerPool(new Consumer(),
new Consumer());
disruptor.start();
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
for(int i=0; i<100; i++){
producer.pushData(i);
Thread.sleep(100);
System.out.println("push data " + i);
}
disruptor.shutdown();
}
}
總結(jié)
大家看到這里也基本對生產(chǎn)者漂彤、消費(fèi)者模式有個(gè)比較深入的了解了雾消,也可以按照文中的例子,在自己的項(xiàng)目中使用挫望,這個(gè)模式在日常項(xiàng)目中還是比較常見的立润,希望大家能夠熟練使用該模式。