RabbitMQ學(xué)習(xí)(五)消費端削峰限流

1.MQ的作用

1)解耦:在項目啟動之初是很難預(yù)測未來會遇到什么困難的睦霎,消息中間件在處理過程中插入了一個隱含的梢卸,基于數(shù)據(jù)的接口層,兩邊都實現(xiàn)這個接口副女,這樣就允許獨立的修改或者擴展兩邊的處理過程蛤高,只要兩邊遵守相同的接口約束即可。
2)冗余(存儲):在某些情況下處理數(shù)據(jù)的過程中會失敗碑幅,消息中間件允許把數(shù)據(jù)持久化知道他們完全被處理
擴展性:消息中間件解耦了應(yīng)用的過程戴陡,所以提供消息入隊和處理的效率是很容易的,只需要增加處理流程就可以了沟涨。
3)削峰:在訪問量劇增的情況下恤批,但是應(yīng)用仍然需要發(fā)揮作用,但是這樣的突發(fā)流量并不常見裹赴。而使用消息中間件采用隊列的形式可以減少突發(fā)訪問壓力喜庞,不會因為突發(fā)的超時負荷要求而崩潰
4)可恢復(fù)性:當(dāng)系統(tǒng)一部分組件失效時,不會影響到整個系統(tǒng)棋返。消息中間件降低了進程間的耦合性延都,當(dāng)一個處理消息的進程掛掉后,加入消息中間件的消息仍然可以在系統(tǒng)恢復(fù)后重新處理
5)順序保證:在大多數(shù)場景下睛竣,處理數(shù)據(jù)的順序也很重要晰房,大部分消息中間件支持一定的順序性
6)緩沖:消息中間件通過一個緩沖層來幫助任務(wù)最高效率的執(zhí)行
7)異步通信:通過把把消息發(fā)送給消息中間件,消息中間件并不立即處。

本文只討論削峰填谷的應(yīng)用場景:

舉個業(yè)務(wù)場景的栗子殊者,秒殺業(yè)務(wù):
上游發(fā)起下單操作
下游完成秒殺業(yè)務(wù)邏輯(庫存檢查与境,庫存凍結(jié),余額檢查猖吴,余額凍結(jié)摔刁,訂單生成,余額扣減海蔽,庫存扣減簸搞,生成流水,余額解凍准潭,庫存解凍)
上游下單業(yè)務(wù)簡單,每秒發(fā)起了10000個請求域仇,下游秒殺業(yè)務(wù)復(fù)雜刑然,每秒只能處理2000個請求,很有可能上游不限速的下單暇务,導(dǎo)致下游系統(tǒng)被壓垮泼掠,引發(fā)雪崩。
為了避免雪崩垦细,常見的優(yōu)化方案有兩種:
1)業(yè)務(wù)上游隊列緩沖择镇,限速發(fā)送
2)業(yè)務(wù)下游隊列緩沖,限速執(zhí)行

本文只討論下游隊列括改,就是消費端的限速執(zhí)行

rabbitmq提供了一種服務(wù)質(zhì)量保障功能腻豌,即在非自動確認消息的前提下,如果一定數(shù)目的消息未被確認嘱能,不進行消費新的消息吝梅。
使用 basicqos方法:
在消費端進行使用。 0 1 false
prefetSize:0
prefetCount:這個值一般在設(shè)置為非自動ack的情況下生效惹骂,一般大小為1
global: true是channel級別苏携, false是消費者級別
注意:我們要使用非自動ack
消費者代碼:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //1 限流方式  第一件事就是 autoAck設(shè)置為 false
        
        channel.basicQos(0,3,false);
        channel.basicConsume(queueName,false,new MyConsumer(channel));
    }
}

自定義消費者代碼:

package com.bfxy.rabbitmq.api.limit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        
        channel.basicAck(envelope.getDeliveryTag(), false);
        
    }


}

生產(chǎn)者代碼:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");


        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

調(diào)試步驟:
1)啟動消費者類,效果如圖:

消費者啟動mq交換機信息.JPG

消費者啟動mq隊列信息.JPG

2)在自定義消費者類中注釋掉channel.basicAck(envelope.getDeliveryTag(), false);
啟動生產(chǎn)者類对粪,mq管控臺信息
管控臺信息.JPG

可以看到1個待確認的右冻,4個準備好的消息,
3)放開代碼channel.basicAck(envelope.getDeliveryTag(), false);
啟動生產(chǎn)者類著拭,mq管控臺信息
管控臺信息.JPG

總結(jié):消費者消費成功一個消息后纱扭,需要設(shè)置成手動確認,當(dāng)返回確認成功后茫死,再去消費下一個消息跪但,這樣可以實現(xiàn)消費端的削峰限流,不至于讓消費端服務(wù)崩潰。
到這里是不是以為結(jié)束了呢屡久,其實還有一個知識點忆首,就是消費端對沒有消費成功的消息,可以不進行確認被环,讓其重回隊列糙及,再次消費,與上面的代碼相比筛欢,只需修改自定義的消費者浸锨,設(shè)置如果滿足我們自己設(shè)置的條件就認為是沒有消費成功,讓其重回隊列版姑,這個時候broker端會再此發(fā)出這條消息柱搜。
修改如下:
重回隊列.JPG

啟動生產(chǎn)者和消費者,消費者控制臺信息如下:
重復(fù)消費未確認的消息.JPG

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末剥险,一起剝皮案震驚了整個濱河市聪蘸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌表制,老刑警劉巖健爬,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異么介,居然都是意外死亡娜遵,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門壤短,熙熙樓的掌柜王于貴愁眉苦臉地迎上來设拟,“玉大人,你說我怎么就攤上這事鸽扁∷庹溃” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我诵闭,道長孵坚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘钮科。我一直安慰自己,他們只是感情好婆赠,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布绵脯。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蛆挫。 梳的紋絲不亂的頭發(fā)上赃承,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天,我揣著相機與錄音悴侵,去河邊找鬼瞧剖。 笑死,一個胖子當(dāng)著我的面吹牛可免,可吹牛的內(nèi)容都是我干的抓于。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼浇借,長吁一口氣:“原來是場噩夢啊……” “哼捉撮!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起妇垢,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤呕缭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后修己,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡迎罗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年睬愤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纹安。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡尤辱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出厢岂,到底是詐尸還是另有隱情光督,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布塔粒,位于F島的核電站结借,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏卒茬。R本人自食惡果不足惜船老,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望圃酵。 院中可真熱鬧柳畔,春花似錦、人聲如沸郭赐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至俘陷,卻和暖如春罗捎,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背岭洲。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工宛逗, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盾剩。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓雷激,卻偏偏與公主長得像,于是被迫代替她去往敵國和親告私。 傳聞我的和親對象是個殘疾皇子屎暇,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 概念:微服務(wù)就是一些可獨立運行、可協(xié)同工作的小的服務(wù)驻粟。微服務(wù)是現(xiàn)在特別流行的服務(wù)根悼,微服務(wù)的字面意思是大家都很好理解...
    程序員技術(shù)圈閱讀 3,347評論 2 47
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)蜀撑,斷路器挤巡,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • IM系統(tǒng)的MQ消息中間件選型:Kafka還是RabbitMQ矿卑? 1、前言 在IM這種講究高并發(fā)沃饶、高消息吞吐的互聯(lián)網(wǎng)...
    匆匆歲月閱讀 3,171評論 1 111
  • 秋水 紅葉與天邊 ☆田秀 一聲聲雁叫轉(zhuǎn)上紅葉放飛的長空 (我正枕靠水邊的夢鄉(xiāng)) 拉開黃昏...
    興安居士閱讀 260評論 0 3
  • 轉(zhuǎn)折點 當(dāng)每個人跟隨神的心意時母廷,往往會面臨一個信仰的轉(zhuǎn)折點,有的人糊肤,雖有感動琴昆,卻沒有跟隨下去。中途放棄了馆揉,這樣就錯...
    玉初辰閱讀 831評論 0 51