Spring Boot整合RabbitMQ(延遲隊(duì)列)

1、什么是RabbitMQ许师?

消息隊(duì)列房蝉,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖微渠,消息分發(fā)的作用搭幻。實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)。當(dāng)生產(chǎn)者大量產(chǎn)生數(shù)據(jù)時(shí)逞盆,消費(fèi)者無法快速消費(fèi)檀蹋,那么需要一個(gè)中間層。保存這個(gè)數(shù)據(jù)云芦。服務(wù)器端用Erlang語言編寫俯逾,支持多種客戶端。

2舅逸、什么是AMQP桌肴?

即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議堡赔,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)识脆,為面向消息的中間件設(shè)計(jì)。消息中間件主要用于組件之間的解耦善已,消息的發(fā)送者無需知道消息使用者的存在灼捂,反之亦然。AMQP的主要特征是面向消息换团、隊(duì)列悉稠、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性艘包、安全的猛。

3、相關(guān)概念

發(fā)消息者(生產(chǎn)者)想虎、隊(duì)列卦尊、收消息者(消費(fèi)者),RabbitMQ 在這個(gè)基本概念之上, 多做了一層抽象, 在發(fā)消息者和 隊(duì)列之間, 加入了交換器 (Exchange). 這樣發(fā)消息者和隊(duì)列就沒有直接聯(lián)系, 轉(zhuǎn)而變成發(fā)消息者把消息給交換器, 交換器根據(jù)調(diào)度策略再把消息再給隊(duì)列舌厨。


image.png
  • 左側(cè) P 代表 生產(chǎn)者岂却,也就是往 RabbitMQ 發(fā)消息的程序。
  • 中間即是 RabbitMQ,其中包括了 交換機(jī) 和 隊(duì)列躏哩。
  • 右側(cè) C 代表 消費(fèi)者署浩,也就是往 RabbitMQ 拿消息的程序。

4扫尺、交換機(jī)(Exchange)四種類型:Direct筋栋、topic、Headers正驻、Fanout

  • Direct:direct 類型的行為是"先匹配, 再投送". 即在綁定時(shí)設(shè)定一個(gè) routing_key, 消息的routing_key 匹配時(shí), 才會(huì)被交換器投送到綁定的隊(duì)列中去.
  • Topic:按規(guī)則轉(zhuǎn)發(fā)消息(最靈活)
  • Headers:設(shè)置header attribute參數(shù)類型的交換機(jī)
  • Fanout:轉(zhuǎn)發(fā)消息到所有綁定隊(duì)列

5弊攘、代碼示例

pom文件添加依賴
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息生產(chǎn)者(以下各個(gè)類型共用此消息發(fā)送者)
package com.gizhi.beam.modular.amqp.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ProducerService {
 
    @Autowired
    private AmqpTemplate rabbitTemplate;
 
    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("task_queue", context);
    }

    public void topicSend1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
    }

    public void topicSend2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
    }

    public void fanoutsend() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }
 
}

1、Direct類型

RabbitConfig
package com.gizhi.beam.modular.amqp.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
 
    @Bean
    public Queue Queue() {
        return new Queue("task_queue");
    }
 
}

消費(fèi)者
package com.gizhi.beam.modular.amqp.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "task_queue")
public class CustomerService {
 
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
 
}

2拨拓、Topic類型

TopicRabbitConfig
package com.gizhi.beam.modular.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//topic 是RabbitMQ中最靈活的一種方式肴颊,可以根據(jù)routing_key自由的綁定不同的隊(duì)列
@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        // #:相當(dāng)于一個(gè)或者多個(gè)單詞,例如一個(gè)匹配模式是topic.#渣磷,那么,以topic開頭的路由鍵都是可以的
        // *:相當(dāng)于一個(gè)單詞授瘦,例如一個(gè)匹配模式是topic.*醋界,那么,以topic開頭的路由鍵,后面接一個(gè)單詞的都可以
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}
消費(fèi)者
@Component
public class TopicCustomerService {

    @RabbitListener(queues = "topic.message")
    public void message(String msg) {
        System.out.println("message  : " + msg);
    }
    @RabbitListener(queues = "topic.messages")
    public void messages(String msg) {
        System.out.println("messages  : " + msg);
    }

 
}

3提完、Fanout類型

FanoutRabbitConfig
package com.gizhi.beam.modular.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//Fanout 就是我們熟悉的廣播模式或者訂閱模式形纺,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息徒欣。

@Configuration
public class FanoutRabbitConfig {
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }
 
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }
 
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
 
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
 
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
 
}
消費(fèi)者
package com.gizhi.beam.modular.amqp.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutCustomerAService {

    @RabbitListener(queues = "fanout.A")
    public void fanoutA(String msg) {
        System.out.println("A  : " + msg);
    }

    @RabbitListener(queues = "fanout.B")
    public void fanoutB(String msg) {
        System.out.println("B  : " + msg);
    }

    @RabbitListener(queues = "fanout.C")
    public void fanoutC(String msg) {
        System.out.println("C  : " + msg);
    }
 
}

4逐样、延遲隊(duì)列

AMQP協(xié)議和RabbitMQ隊(duì)列本身沒有直接支持延遲隊(duì)列功能,但是我們可以通過RabbitMQ的兩個(gè)特性來曲線實(shí)現(xiàn)延遲隊(duì)列:
  • Time To Live(TTL)
  • Dead Letter Exchanges(DLX)
原理:

RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl打肝,來控制消息的生存時(shí)間脂新,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)粗梭。RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù)争便,如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列断医。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送滞乙。


image.png
DelayRabbitConfig
package com.gizhi.beam.modular.amqp.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@Slf4j
public class DelayRabbitConfig {
 
 
    /**
     * 延遲隊(duì)列 TTL 名稱
     */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /**
     * DLX,dead letter發(fā)送到的 exchange
     * 延時(shí)消息就是發(fā)送到該交換機(jī)的
     */
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    /**
     * routing key 名稱
     * 具體消息發(fā)送在該 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
 
    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";
 
    /**
     * 延遲隊(duì)列配置
     * <p>
     * 1鉴嗤、params.put("x-message-ttl", 5 * 1000);
     * 第一種方式是直接設(shè)置 Queue 延遲時(shí)間 但如果直接給隊(duì)列設(shè)置過期時(shí)間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認(rèn)是時(shí)間小的優(yōu)先)
     * 2斩启、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * 第二種就是每次發(fā)送消息動(dòng)態(tài)設(shè)置延遲時(shí)間,這樣我們可以靈活控制
     **/
    @Bean
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱,
        params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
        // x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的 routing-key 名稱醉锅。
        params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
        return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }
    /**
     * 需要將一個(gè)隊(duì)列綁定到交換機(jī)上兔簇,要求該消息與一個(gè)特定的路由鍵完全匹配。
     * 這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”男韧,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā)朴摊,
     * 不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard此虑,只會(huì)轉(zhuǎn)發(fā)dog甚纲。
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }
 
    @Bean
    public Queue orderQueue() {
        return new Queue(ORDER_QUEUE_NAME, true);
    }
    /**
     * 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上朦前。
     * 符號(hào)“#”匹配一個(gè)或多個(gè)詞介杆,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”韭寸,但是“audit.*” 只會(huì)匹配到“audit.irs”春哨。
     **/
    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(ORDER_EXCHANGE_NAME);
    }
 
    @Bean
    public Binding orderBinding() {
        // TODO 如果要讓延遲隊(duì)列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機(jī)很關(guān)鍵
        return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
 
}

生產(chǎn)者DelaySender
package com.gizhi.beam.modular.amqp.service;

import com.gizhi.beam.modular.amqp.config.DelayRabbitConfig;
import com.gizhi.beam.modular.amqp.dto.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
@Slf4j
public class DelaySender {
 
    @Autowired
    private AmqpTemplate amqpTemplate;
 
    public void sendDelay(Order order) {
        log.info("【訂單生成時(shí)間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString() );
        this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
            // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時(shí)候就指定好延遲時(shí)間還是在發(fā)送自己控制時(shí)間
            message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
            return message;
        });
    }
}

消費(fèi)者DelayReceiver
package com.gizhi.beam.modular.amqp.service;

import com.gizhi.beam.modular.amqp.config.DelayRabbitConfig;
import com.gizhi.beam.modular.amqp.dto.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
@Slf4j
public class DelayReceiver {
 
    @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
    public void orderDelayQueue(Order order, Message message, Channel channel) {
        log.info("###########################################");
        log.info("【orderDelayQueue 監(jiān)聽的消息】 - 【消費(fèi)時(shí)間】 - [{}]- 【訂單內(nèi)容】 - [{}]",  new Date(), order.toString());
        if(order.getOrderStatus() == 0) {
            order.setOrderStatus(2);
            log.info("【該訂單未支付,取消訂單】" + order.toString());
        } else if(order.getOrderStatus() == 1) {
            log.info("【該訂單已完成支付】");
        } else if(order.getOrderStatus() == 2) {
            log.info("【該訂單已取消】");
        }
        log.info("###########################################");
    }
}

5恩伺、源碼地址:https://gitee.com/hsshy/beam-example

6赴背、關(guān)注我的公眾號(hào),互相學(xué)習(xí)晶渠。

image.png

參考鏈接

https://blog.csdn.net/ztx114/article/details/78410727
https://blog.csdn.net/woaitingting1985/article/details/79087357
https://blog.csdn.net/lizc_lizc/article/details/80722763
http://www.reibang.com/p/911d987b5f11
https://www.cnblogs.com/cag2050/p/7724055.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末凰荚,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子褒脯,更是在濱河造成了極大的恐慌便瑟,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件番川,死亡現(xiàn)場(chǎng)離奇詭異到涂,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)颁督,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門践啄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人适篙,你說我怎么就攤上這事往核。” “怎么了嚷节?”我有些...
    開封第一講書人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵聂儒,是天一觀的道長。 經(jīng)常有香客問我硫痰,道長衩婚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任效斑,我火速辦了婚禮非春,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己奇昙,他們只是感情好护侮,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著储耐,像睡著了一般羊初。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上什湘,一...
    開封第一講書人閱讀 49,829評(píng)論 1 290
  • 那天长赞,我揣著相機(jī)與錄音,去河邊找鬼闽撤。 笑死得哆,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的哟旗。 我是一名探鬼主播贩据,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼闸餐!你這毒婦竟也來了乐设?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤绎巨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后蠕啄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體场勤,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年歼跟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了和媳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡哈街,死狀恐怖留瞳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情骚秦,我是刑警寧澤她倘,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站作箍,受9級(jí)特大地震影響硬梁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜胞得,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一荧止、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦跃巡、人聲如沸危号。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽外莲。三九已至,卻和暖如春娘香,著一層夾襖步出監(jiān)牢的瞬間苍狰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來泰國打工烘绽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淋昭,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓安接,卻偏偏與公主長得像翔忽,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子盏檐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容