問題介紹
在開發(fā)中黄伊,遇到了這樣一個(gè)問題,我們使用ActivateMQ來接收處理消息墓阀,然后調(diào)用人工智能的算法去處理數(shù)據(jù)拓轻,但是算法處理的速度太慢,跟不上消息的接收速度勿锅,限制于硬件的問題,算法也沒辦法增加更多的服務(wù)器來進(jìn)行并發(fā)處理粱甫。所以導(dǎo)致消息堆積作瞄,處理的延遲越來越大,新的數(shù)據(jù)得不到處理乌庶。而我們待處理的消息契耿,可以容忍丟棄一部分,所以想到了透敌,如果處理不完的數(shù)據(jù)踢械,可以丟棄,保證最新的數(shù)據(jù)能夠得到處理撵术。
生產(chǎn)者消費(fèi)者模式介紹
生產(chǎn)者消費(fèi)者模式话瞧,是一種常見的設(shè)計(jì)模式。該模式將消息的生成者和消費(fèi)者分開划滋,還有一個(gè)緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間埃篓,作為一個(gè)中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)稻薇,而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù)胶征。
使用mq隊(duì)列,也是一種服務(wù)之間的生產(chǎn)者消費(fèi)者模式案狠,其緩沖區(qū)即mq隊(duì)列。而在java單應(yīng)用中骂铁,生產(chǎn)者消費(fèi)者模式,一般使用阻塞隊(duì)列來實(shí)現(xiàn)灿椅,即BlockingQueue接口钞支,來充當(dāng)緩沖區(qū)。
BlockingQueue簡介
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列婴洼。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí)撼嗓,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡.?dāng)隊(duì)列滿時(shí)粉捻,存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用振湾。生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程树酪。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器大州,而消費(fèi)者也只從容器里拿元素。
BlockingQueue的接口方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
上述這些方法不再詳細(xì)介紹了疮茄,大家可以查閱其他資料根暑。
容忍丟棄的消費(fèi)者實(shí)現(xiàn)(使用BlockingQueue實(shí)現(xiàn))
消費(fèi)者,使用生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)畸裳。消費(fèi)時(shí)淳地,不是直接進(jìn)行處理帅容,而是將消息添加到阻塞隊(duì)列中并徘,如果指定的時(shí)間內(nèi)扰魂,沒有添加進(jìn)去,就跳過該條消息路幸,這條消息可丟棄付翁,或者保存數(shù)據(jù)庫等晃听,以便后續(xù)處理。
將消息添加到阻塞隊(duì)列中時(shí)佣渴,通過指定的時(shí)間初斑,可以計(jì)算出該消費(fèi)者的最低并發(fā),如指定1s砂竖,則最低并發(fā)為1鹃答,即如果處理消息時(shí),處理速度大于1個(gè)/s置济,則該消費(fèi)者并發(fā)降到最低锋八,變?yōu)?。所以為了保證mq中消息不堆積羞酗,可以設(shè)置添加阻塞隊(duì)列的超時(shí)時(shí)間樊销,和mq中接收消息的間隔一致脏款。
如果有多個(gè)mq的監(jiān)聽消費(fèi)者裤园,則根據(jù)多消費(fèi)者來計(jì)算,確定阻塞隊(duì)列的超時(shí)時(shí)間:
阻塞隊(duì)列的超時(shí)時(shí)間 = mq中接收消息的間隔 * mq監(jiān)聽者數(shù)量
(當(dāng)然系統(tǒng)運(yùn)行時(shí)剃盾,消息推送處理等會(huì)有耗時(shí)淤袜,阻塞隊(duì)列的超時(shí)時(shí)間應(yīng)當(dāng)適當(dāng)小一些,保證mq消息不堆積)
阻塞隊(duì)列消費(fèi)時(shí)积蔚,使用線程池烦周,啟動(dòng)多個(gè)線程,讀取阻塞隊(duì)列漱贱,處理消息夭委。
代碼示例:
package com.pu.WebMagicPro;
import java.util.concurrent.*;
public class ConsumerTest {
/**
* 使用多線程處理消息
*/
private Executor executor = new ThreadPoolExecutor(3, 5,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512), // 使用有界隊(duì)列,避免OOM
new ThreadPoolExecutor.DiscardPolicy());
/**
* 隊(duì)列長度
*/
private static final int QUEUE_SIZE = 3;
/**
* 消費(fèi)者處理隊(duì)列
*/
private BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
/**
* 啟動(dòng)讀取隊(duì)列內(nèi)容的標(biāo)記
*/
private volatile boolean readQueue = false;
/**
* 消費(fèi)方法
* @param content 消息內(nèi)容
*/
public void consumer(String content) {
try {
// 消費(fèi)消息崇摄,1s未插入隊(duì)列配猫,則消費(fèi)不成功
boolean success = queue.offer(content, 1, TimeUnit.SECONDS);
if (!success) {
System.out.println("消息丟棄:" + content); // 消息消費(fèi)不成功杏死,可以直接丟棄,或者保存到數(shù)據(jù)庫中等腐巢,使用其他辦法處理
}
if (!readQueue) {
readQueue = true;
// 啟動(dòng)多個(gè)線程消費(fèi)隊(duì)列
for (int i = 0; i < QUEUE_SIZE; i++) {
executor.execute(this::startRead);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void startRead() {
while (readQueue) {
String content = null;
try {
// 讀取消息
content = queue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (content != null) {
// 處理消息
dealContent(content);
}
}
}
/**
* 模擬實(shí)際的處理方法
* @param content 消息
*/
private void dealContent(String content) {
// 模擬處理過程
System.out.println("處理: " + content);
try {
// 線程睡眠5秒冯丙,模擬處理用時(shí)
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("處理完成: " + content);
}
public static void main(String[] args) {
ConsumerTest consumer = new ConsumerTest();
int size = 5;
// 開啟5個(gè)線程進(jìn)行消息的推送遭京,每個(gè)線程推送10次消息
for (int i = 0; i < size; i++) {
final int t = i;
Thread thread = new Thread(() -> {
for (int j = 0; j < 10; j++) {
consumer.consumer("線程:" + t + ",消息" + j);
}
});
thread.start();
}
}
}