第五章----SpringBoot+RabbitMQ用死信隊列和插件形式實現(xiàn)延遲隊列

1. 死信隊列之延遲隊列

死信隊列:用來保存處理失敗或者過期的消息忿危,確保消息不被丟失以便排查問題姻乓!

延遲隊列:顧名思義就是消息在隊列中存在一定時間后再被消費。比如下單后半小時沒有支付的訂單自動取消跷叉,比如預(yù)約某項功能時提前15分鐘提醒嫌吠,比如希望某一個功能在多長時間后執(zhí)行等都可以使用延遲隊列。

  • RabbitMQ本身是沒有延遲隊列功能的搁拙,但是可以通過死信隊列的TTL和DLX模擬延遲隊列功能秒梳。
  • Time To Live:可以在發(fā)送消息時設(shè)置過期時間,也可以設(shè)置整個隊列的過期時間箕速,如果兩個同時設(shè)置已最早過期時間為準(zhǔn)酪碘。
  • Dead Letter Exchanges:可以通過綁定隊列的死信交換器來實現(xiàn)死信隊列。
x-dead-letter-exchange:綁定死信交換器(其實也是普通交換器盐茎,與類型無關(guān))
x-dead-letter-routing-key:綁定死信隊列的路由鍵(可選)
x-message-ttl:綁定隊列消息的過期時間(可選)
  • 死信隊列設(shè)計思路
生產(chǎn)者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者

進(jìn)入消息隊列:
1. 消息被拒絕兴垦,并且requeue= false
2. 消息ttl過期
3. 隊列達(dá)到最大的長度
死信隊列
  • 做延遲隊列需要創(chuàng)建一個沒有消費者的隊列,用了存儲消息字柠。然后創(chuàng)建一個真正的消費隊列探越,用來做具體的業(yè)務(wù)邏輯。當(dāng)帶有TTL的消息到達(dá)綁定死信交換器的隊列窑业,因為沒有消費者所以會一直等到消息過期钦幔,然后消息被投遞到死信隊列也就是真正的消費隊列。

  • 新建配置類MQDelayConfig.java常柄,創(chuàng)建支付交換器鲤氢、支付隊列綁定死信隊列搀擂、他們的綁定關(guān)系。無消費者卷玉,暫時不知道怎么用注解創(chuàng)建哨颂。

  • 設(shè)置x-dead-letter-exchange、x-dead-letter-routing-key揍庄、x-message-ttl咆蒿。

package com.fzb.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description 利用死信隊列和過期時間模擬延遲隊列,沒有消費者蚂子,所以不能用注解形式
 * Time To Live(TTL)
 * 1. 可以在發(fā)送消息時設(shè)置過期時間(message.getMessageProperties().setExpiration("5000");)
 * 2. 也可以設(shè)置整個隊列的過期時間(args.put("x-message-ttl",10000);)
 * 3. 如果兩個同時設(shè)置已最早過期時間為準(zhǔn)
 * Dead Letter Exchanges(DLX)
 * @Author jxb
 * @Date 2019-03-10 10:25:30
 */
@Component
public class MQDelayConfig {

    /**
     * @Description 定義支付交換器
     * @Author jxb
     * @Date 2019-04-02 14:39:31
     */
    @Bean
    private DirectExchange directPayExchange() {
        return new DirectExchange("direct.pay.exchange");
    }

    /**
     * @Description 定義支付隊列 綁定死信隊列(其實是綁定的交換器沃测,然后通過交換器路由鍵綁定隊列) 設(shè)置過期時間
     * @Author jxb
     * @Date 2019-04-02 14:40:24
     */
    @Bean
    private Queue directPayQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明死信交換器
        args.put("x-dead-letter-exchange", "direct.delay.exchange");
        //聲明死信路由鍵
        args.put("x-dead-letter-routing-key", "DelayKey");
        //聲明隊列消息過期時間
        args.put("x-message-ttl", 10000);
        return new Queue("direct.pay.queue", true, false, false, args);
    }

    /**
     * @Description 定義支付綁定
     * @Author jxb
     * @Date 2019-04-02 14:46:10
     */
    @Bean
    private Binding bindingOrderDirect() {
        return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay");
    }
}
  • 帶有過期時間且綁定死信交換器的隊列
隊列
  • 生產(chǎn)者,為消息設(shè)置過期時間setExpiration("15000");
    /**
     * @Description 支付隊列食茎、綁定死信隊列蒂破,測試消息延遲功能
     * @Author jxb
     * @Date 2019-04-02 14:07:25
     */
    @RequestMapping(value = "/directDelayMQ", method = {RequestMethod.GET})
    public List<User> directDelayMQ() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
            rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,
                    message -> {
                        // 設(shè)置5秒過期
                        message.getMessageProperties().setExpiration("15000");
                        return message;
                    },
                    correlationData);
            System.out.println(user.getName() + ":" + sdf.format(new Date()));
        }
        return users;
    }
  • 消費者,聲明真正消費的隊列别渔、交換器附迷、綁定
    /**
     * @Description 延遲隊列
     * @Author jxb
     * @Date 2019-04-04 16:34:28
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.delay.queue"), exchange = @Exchange(value = "direct.delay.exchange"), key = {"DelayKey"})})
    public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 模擬執(zhí)行任務(wù)
        System.out.println("這是延遲隊列消費:" + user.getName() + ":" + sdf.format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

測試結(jié)果,因為消息配置的是15秒后到期哎媚,而隊列配置了10秒到期喇伯,所以最終按照時間短的計算。

延遲隊列

思考: 如果先放入一條A消息過期時間是10秒拨与,再放入一個b消息過期時間是5秒稻据,那延遲隊列是否可以先消費b消息?

答案是否定的买喧,因為隊列就會遵循先進(jìn)先出的規(guī)則捻悯,b消息會等a消息過期后,一起消費淤毛,這就是所謂的隊列阻塞今缚。由這個問題我們引出插件形式來實現(xiàn)延遲隊列

2. 用rabbitmq-delayed-message-exchange插件實現(xiàn)延遲隊列

下載插件地址

  • 強烈建議安裝erlang20+版本和RabbitMQ3.7+版本,另插件版本要和RabbitMQ版本一致低淡。

  • 解壓成.ez的文件姓言,上傳到RabbitMQ安裝目錄的plugins文件夾下,停止服務(wù)器蔗蹋,開啟插件事期,啟動服務(wù)器。

1. 查看yum 安裝的軟件路徑
   查找安裝包:rpm -qa|grep rabbitmq
   查找位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
   卸載yum安裝:yum remove rabbitmq-server-3.6.15-1.el6.noarch
2. 上傳到plugins文件夾
3. 停止服務(wù)器
   service rabbitmq-server stop
4. 開啟插件
   rabbitmq-plugins enable rabbitmq_delayed_message_exchange
   (關(guān)閉插件)
   rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5. 啟動服務(wù)器
   service rabbitmq-server start
6. 查看插件
   rabbitmq-plugins list
  • 生產(chǎn)者纸颜,設(shè)置Header屬性x-delay過期時間
    /**
     * @Description 插件延遲隊列功能
     * @Author jxb
     * @Date 2019-04-02 14:07:25
     */
    @RequestMapping(value = "/directPluginDelayMQ", method = {RequestMethod.GET})
    public List<User> directPluginDelayMQ() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
            rabbitTemplate.convertAndSend("direct.plugin.delay.exchange", "PluginDelayKey", user,
                    message -> {
                        // 設(shè)置5秒過期
                        message.getMessageProperties().setHeader("x-delay",5000);
                        return message;
                    },
                    correlationData);
            System.out.println(user.getName() + ":" + sdf.format(new Date()));
        }
        return users;
    }
  • 消費者兽泣,設(shè)置x-delayed-message類型的交換器,增加參數(shù)x-delayed-type為direct
    /**
     * @Description 插件延遲隊列
     * @Author jxb
     * @Date 2019-04-04 16:34:28
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.plugin.delay.queue"), exchange = @Exchange(value = "direct.plugin.delay.exchange",type = "x-delayed-message",arguments = {@Argument(name="x-delayed-type",value = "direct")}), key = {"PluginDelayKey"})})
    public void getPDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 模擬執(zhí)行任務(wù)
        System.out.println("這是插件延遲隊列消費:" + user.getName() + ":" + sdf.format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  • 插件形式交換器
交換器

注:用代碼是創(chuàng)建一個:CustomExchange自定義交換器胁孙,類型一定要設(shè)置成:x-delayed-message

注:如果配置了發(fā)送回調(diào)ReturnCallback唠倦,插件延遲隊列則會回調(diào)該方法称鳞,因為發(fā)送方確實沒有投遞到隊列上,只是在交換器上暫存稠鼻,等過期時間到了 才會發(fā)往隊列冈止。

消息暫存在交換器
  • SpringBoot集成RabbitMQ常用配置(非本系列用)
#rabbitmq
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=fzb_host
#消費者數(shù)量
spring.rabbitmq.listener.simple.concurrency=10
#最大消費者數(shù)量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費者每次從隊列獲取的消息數(shù)量。寫多了候齿,如果長時間得不到消費熙暴,數(shù)據(jù)就一直得不到處理
spring.rabbitmq.listener.simple.prefetch=1
#消費者自動啟動
spring.rabbitmq.listener.simple.auto-startup=true
#消費者消費失敗,自動重新入隊
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#啟用發(fā)送重試 隊列滿了發(fā)不進(jìn)去時啟動重試
spring.rabbitmq.template.retry.enabled=true 
#1秒鐘后重試一次
spring.rabbitmq.template.retry.initial-interval=1000 
#最大重試次數(shù) 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大間隔 10秒鐘
spring.rabbitmq.template.retry.max-interval=10000
#等待間隔 的倍數(shù)慌盯。如果為2  第一次 乘以2 等1秒周霉, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0

做一個有趣的人亚皂,讓生活更好玩一些

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末俱箱,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子灭必,更是在濱河造成了極大的恐慌狞谱,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件禁漓,死亡現(xiàn)場離奇詭異跟衅,居然都是意外死亡,警方通過查閱死者的電腦和手機播歼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進(jìn)店門与斤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人荚恶,你說我怎么就攤上這事×字В” “怎么了谒撼?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長雾狈。 經(jīng)常有香客問我廓潜,道長,這世上最難降的妖魔是什么善榛? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任辩蛋,我火速辦了婚禮,結(jié)果婚禮上移盆,老公的妹妹穿的比我還像新娘悼院。我一直安慰自己,他們只是感情好咒循,可當(dāng)我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布据途。 她就那樣靜靜地躺著绞愚,像睡著了一般。 火紅的嫁衣襯著肌膚如雪颖医。 梳的紋絲不亂的頭發(fā)上位衩,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天,我揣著相機與錄音熔萧,去河邊找鬼糖驴。 笑死,一個胖子當(dāng)著我的面吹牛佛致,可吹牛的內(nèi)容都是我干的贮缕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼晌杰,長吁一口氣:“原來是場噩夢啊……” “哼跷睦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起肋演,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤抑诸,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后爹殊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蜕乡,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年梗夸,在試婚紗的時候發(fā)現(xiàn)自己被綠了层玲。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡反症,死狀恐怖辛块,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情铅碍,我是刑警寧澤润绵,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站胞谈,受9級特大地震影響尘盼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜烦绳,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一卿捎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧径密,春花似錦午阵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽括细。三九已至,卻和暖如春戚啥,著一層夾襖步出監(jiān)牢的瞬間奋单,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工猫十, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留览濒,地道東北人。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓拖云,卻偏偏與公主長得像贷笛,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子宙项,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,446評論 2 359