解決MQ消費(fèi)者消息堆積問題(MQ的消費(fèi)者消息丟棄功能)

問題介紹

在開發(fā)中黄伊,遇到了這樣一個(gè)問題,我們使用ActivateMQ來接收處理消息墓阀,然后調(diào)用人工智能的算法去處理數(shù)據(jù)拓轻,但是算法處理的速度太慢,跟不上消息的接收速度勿锅,限制于硬件的問題,算法也沒辦法增加更多的服務(wù)器來進(jìn)行并發(fā)處理粱甫。所以導(dǎo)致消息堆積作瞄,處理的延遲越來越大,新的數(shù)據(jù)得不到處理乌庶。而我們待處理的消息契耿,可以容忍丟棄一部分,所以想到了透敌,如果處理不完的數(shù)據(jù)踢械,可以丟棄,保證最新的數(shù)據(jù)能夠得到處理撵术。

生產(chǎn)者消費(fèi)者模式介紹

生產(chǎn)者消費(fèi)者模式话瞧,是一種常見的設(shè)計(jì)模式。該模式將消息的生成者和消費(fèi)者分開划滋,還有一個(gè)緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間埃篓,作為一個(gè)中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)稻薇,而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù)胶征。
使用mq隊(duì)列,也是一種服務(wù)之間的生產(chǎn)者消費(fèi)者模式案狠,其緩沖區(qū)即mq隊(duì)列。而在java單應(yīng)用中骂铁,生產(chǎn)者消費(fèi)者模式,一般使用阻塞隊(duì)列來實(shí)現(xiàn)灿椅,即BlockingQueue接口钞支,來充當(dāng)緩沖區(qū)。

BlockingQueue簡介

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列婴洼。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí)撼嗓,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡.?dāng)隊(duì)列滿時(shí)粉捻,存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用振湾。生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程树酪。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器大州,而消費(fèi)者也只從容器里拿元素。

BlockingQueue的接口方法:

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時(shí)退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

上述這些方法不再詳細(xì)介紹了疮茄,大家可以查閱其他資料根暑。

容忍丟棄的消費(fèi)者實(shí)現(xiàn)(使用BlockingQueue實(shí)現(xiàn))

消費(fèi)者,使用生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)畸裳。消費(fèi)時(shí)淳地,不是直接進(jìn)行處理帅容,而是將消息添加到阻塞隊(duì)列中并徘,如果指定的時(shí)間內(nèi)扰魂,沒有添加進(jìn)去,就跳過該條消息路幸,這條消息可丟棄付翁,或者保存數(shù)據(jù)庫等晃听,以便后續(xù)處理。
將消息添加到阻塞隊(duì)列中時(shí)佣渴,通過指定的時(shí)間初斑,可以計(jì)算出該消費(fèi)者的最低并發(fā),如指定1s砂竖,則最低并發(fā)為1鹃答,即如果處理消息時(shí),處理速度大于1個(gè)/s置济,則該消費(fèi)者并發(fā)降到最低锋八,變?yōu)?。所以為了保證mq中消息不堆積羞酗,可以設(shè)置添加阻塞隊(duì)列的超時(shí)時(shí)間樊销,和mq中接收消息的間隔一致脏款。
如果有多個(gè)mq的監(jiān)聽消費(fèi)者裤园,則根據(jù)多消費(fèi)者來計(jì)算,確定阻塞隊(duì)列的超時(shí)時(shí)間:
阻塞隊(duì)列的超時(shí)時(shí)間 = mq中接收消息的間隔 * mq監(jiān)聽者數(shù)量
(當(dāng)然系統(tǒng)運(yùn)行時(shí)剃盾,消息推送處理等會(huì)有耗時(shí)淤袜,阻塞隊(duì)列的超時(shí)時(shí)間應(yīng)當(dāng)適當(dāng)小一些,保證mq消息不堆積)

阻塞隊(duì)列消費(fèi)時(shí)积蔚,使用線程池烦周,啟動(dòng)多個(gè)線程,讀取阻塞隊(duì)列漱贱,處理消息夭委。

代碼示例:

package com.pu.WebMagicPro;

import java.util.concurrent.*;

public class ConsumerTest {

    /**
     * 使用多線程處理消息
     */
    private Executor executor = new ThreadPoolExecutor(3, 5,
            10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512), // 使用有界隊(duì)列,避免OOM
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * 隊(duì)列長度
     */
    private static final int QUEUE_SIZE = 3;

    /**
     * 消費(fèi)者處理隊(duì)列
     */
    private BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

    /**
     * 啟動(dòng)讀取隊(duì)列內(nèi)容的標(biāo)記
     */
    private volatile boolean readQueue = false;


    /**
     * 消費(fèi)方法
     * @param content 消息內(nèi)容
     */
    public void consumer(String content) {
        try {
            // 消費(fèi)消息崇摄,1s未插入隊(duì)列配猫,則消費(fèi)不成功
            boolean success = queue.offer(content, 1, TimeUnit.SECONDS);
            if (!success) {
                System.out.println("消息丟棄:" + content);     // 消息消費(fèi)不成功杏死,可以直接丟棄,或者保存到數(shù)據(jù)庫中等腐巢,使用其他辦法處理
            }

            if (!readQueue) {
                readQueue = true;
                // 啟動(dòng)多個(gè)線程消費(fèi)隊(duì)列
                for (int i = 0; i < QUEUE_SIZE; i++) {
                    executor.execute(this::startRead);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void startRead() {
        while (readQueue) {
            String content = null;
            try {
                // 讀取消息
                content = queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (content != null) {
                // 處理消息
                dealContent(content);
            }
        }
    }

    /**
     * 模擬實(shí)際的處理方法
     * @param content 消息
     */
    private void dealContent(String content) {
        // 模擬處理過程
        System.out.println("處理: " + content);
        try {
            // 線程睡眠5秒冯丙,模擬處理用時(shí)
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("處理完成: " + content);
    }

    public static void main(String[] args) {
        ConsumerTest consumer = new ConsumerTest();
        int size = 5;
        // 開啟5個(gè)線程進(jìn)行消息的推送遭京,每個(gè)線程推送10次消息
        for (int i = 0; i < size; i++) {
            final int t = i;
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    consumer.consumer("線程:" + t + ",消息" + j);
                }
            });
            thread.start();
        }
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市船殉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌挨厚,老刑警劉巖糠惫,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異巢价,居然都是意外死亡理郑,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來役电,“玉大人,你說我怎么就攤上這事冀膝■” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵赐纱,是天一觀的道長熬北。 經(jīng)常有香客問我,道長起胰,這世上最難降的妖魔是什么巫延? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任地消,我火速辦了婚禮脉执,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘适瓦。我一直安慰自己谱仪,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布嗦随。 她就那樣靜靜地躺著敬尺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪署恍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天盯质,我揣著相機(jī)與錄音呼巷,去河邊找鬼。 笑死赎瑰,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的餐曼。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼渠脉,長吁一口氣:“原來是場噩夢啊……” “哼瓶佳!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起为朋,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎习寸,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體孵滞,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鸯匹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了匿级。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片染厅。...
    茶點(diǎn)故事閱讀 38,664評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖孤页,靈堂內(nèi)的尸體忽然破棺而出涩馆,到底是詐尸還是另有隱情散庶,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布屋讶,位于F島的核電站,受9級特大地震影響斩芭,放射性物質(zhì)發(fā)生泄漏乐疆。R本人自食惡果不足惜划乖,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一琴庵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧儿礼,春花似錦庆寺、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽陵霉。三九已至,卻和暖如春鹰晨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背模蜡。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工扁凛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人谨朝。 一個(gè)月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓字币,卻偏偏與公主長得像,于是被迫代替她去往敵國和親洗出。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評論 2 349