1、什么是RabbitMQ许师?
消息隊(duì)列房蝉,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖微渠,消息分發(fā)的作用搭幻。實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)。當(dāng)生產(chǎn)者大量產(chǎn)生數(shù)據(jù)時(shí)逞盆,消費(fèi)者無法快速消費(fèi)檀蹋,那么需要一個(gè)中間層。保存這個(gè)數(shù)據(jù)云芦。服務(wù)器端用Erlang語言編寫俯逾,支持多種客戶端。
2舅逸、什么是AMQP桌肴?
即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議堡赔,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)识脆,為面向消息的中間件設(shè)計(jì)。消息中間件主要用于組件之間的解耦善已,消息的發(fā)送者無需知道消息使用者的存在灼捂,反之亦然。AMQP的主要特征是面向消息换团、隊(duì)列悉稠、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性艘包、安全的猛。
3、相關(guān)概念
發(fā)消息者(生產(chǎn)者)想虎、隊(duì)列卦尊、收消息者(消費(fèi)者),RabbitMQ 在這個(gè)基本概念之上, 多做了一層抽象, 在發(fā)消息者和 隊(duì)列之間, 加入了交換器 (Exchange). 這樣發(fā)消息者和隊(duì)列就沒有直接聯(lián)系, 轉(zhuǎn)而變成發(fā)消息者把消息給交換器, 交換器根據(jù)調(diào)度策略再把消息再給隊(duì)列舌厨。
- 左側(cè) P 代表 生產(chǎn)者岂却,也就是往 RabbitMQ 發(fā)消息的程序。
- 中間即是 RabbitMQ,其中包括了 交換機(jī) 和 隊(duì)列躏哩。
- 右側(cè) C 代表 消費(fèi)者署浩,也就是往 RabbitMQ 拿消息的程序。
4扫尺、交換機(jī)(Exchange)四種類型:Direct筋栋、topic、Headers正驻、Fanout
- Direct:direct 類型的行為是"先匹配, 再投送". 即在綁定時(shí)設(shè)定一個(gè) routing_key, 消息的routing_key 匹配時(shí), 才會(huì)被交換器投送到綁定的隊(duì)列中去.
- Topic:按規(guī)則轉(zhuǎn)發(fā)消息(最靈活)
- Headers:設(shè)置header attribute參數(shù)類型的交換機(jī)
- Fanout:轉(zhuǎn)發(fā)消息到所有綁定隊(duì)列
5弊攘、代碼示例
pom文件添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息生產(chǎn)者(以下各個(gè)類型共用此消息發(fā)送者)
package com.gizhi.beam.modular.amqp.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class ProducerService {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("task_queue", context);
}
public void topicSend1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void topicSend2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
public void fanoutsend() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
1、Direct類型
RabbitConfig
package com.gizhi.beam.modular.amqp.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("task_queue");
}
}
消費(fèi)者
package com.gizhi.beam.modular.amqp.service;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "task_queue")
public class CustomerService {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
2拨拓、Topic類型
TopicRabbitConfig
package com.gizhi.beam.modular.amqp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//topic 是RabbitMQ中最靈活的一種方式肴颊,可以根據(jù)routing_key自由的綁定不同的隊(duì)列
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
// #:相當(dāng)于一個(gè)或者多個(gè)單詞,例如一個(gè)匹配模式是topic.#渣磷,那么,以topic開頭的路由鍵都是可以的
// *:相當(dāng)于一個(gè)單詞授瘦,例如一個(gè)匹配模式是topic.*醋界,那么,以topic開頭的路由鍵,后面接一個(gè)單詞的都可以
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
消費(fèi)者
@Component
public class TopicCustomerService {
@RabbitListener(queues = "topic.message")
public void message(String msg) {
System.out.println("message : " + msg);
}
@RabbitListener(queues = "topic.messages")
public void messages(String msg) {
System.out.println("messages : " + msg);
}
}
3提完、Fanout類型
FanoutRabbitConfig
package com.gizhi.beam.modular.amqp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//Fanout 就是我們熟悉的廣播模式或者訂閱模式形纺,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息徒欣。
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
消費(fèi)者
package com.gizhi.beam.modular.amqp.service;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutCustomerAService {
@RabbitListener(queues = "fanout.A")
public void fanoutA(String msg) {
System.out.println("A : " + msg);
}
@RabbitListener(queues = "fanout.B")
public void fanoutB(String msg) {
System.out.println("B : " + msg);
}
@RabbitListener(queues = "fanout.C")
public void fanoutC(String msg) {
System.out.println("C : " + msg);
}
}
4逐样、延遲隊(duì)列
AMQP協(xié)議和RabbitMQ隊(duì)列本身沒有直接支持延遲隊(duì)列功能,但是我們可以通過RabbitMQ的兩個(gè)特性來曲線實(shí)現(xiàn)延遲隊(duì)列:
- Time To Live(TTL)
- Dead Letter Exchanges(DLX)
原理:
RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl打肝,來控制消息的生存時(shí)間脂新,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)粗梭。RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù)争便,如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列断医。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送滞乙。
DelayRabbitConfig
package com.gizhi.beam.modular.amqp.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class DelayRabbitConfig {
/**
* 延遲隊(duì)列 TTL 名稱
*/
private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
/**
* DLX,dead letter發(fā)送到的 exchange
* 延時(shí)消息就是發(fā)送到該交換機(jī)的
*/
public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
/**
* routing key 名稱
* 具體消息發(fā)送在該 routingKey 的
*/
public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
public static final String ORDER_QUEUE_NAME = "user.order.queue";
public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
public static final String ORDER_ROUTING_KEY = "order";
/**
* 延遲隊(duì)列配置
* <p>
* 1鉴嗤、params.put("x-message-ttl", 5 * 1000);
* 第一種方式是直接設(shè)置 Queue 延遲時(shí)間 但如果直接給隊(duì)列設(shè)置過期時(shí)間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認(rèn)是時(shí)間小的優(yōu)先)
* 2斩启、rabbitTemplate.convertAndSend(book, message -> {
* message.getMessageProperties().setExpiration(2 * 1000 + "");
* return message;
* });
* 第二種就是每次發(fā)送消息動(dòng)態(tài)設(shè)置延遲時(shí)間,這樣我們可以靈活控制
**/
@Bean
public Queue delayOrderQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱,
params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
// x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的 routing-key 名稱醉锅。
params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
}
/**
* 需要將一個(gè)隊(duì)列綁定到交換機(jī)上兔簇,要求該消息與一個(gè)特定的路由鍵完全匹配。
* 這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”男韧,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā)朴摊,
* 不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard此虑,只會(huì)轉(zhuǎn)發(fā)dog甚纲。
* @return DirectExchange
*/
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange(ORDER_DELAY_EXCHANGE);
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
}
@Bean
public Queue orderQueue() {
return new Queue(ORDER_QUEUE_NAME, true);
}
/**
* 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上朦前。
* 符號(hào)“#”匹配一個(gè)或多個(gè)詞介杆,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”韭寸,但是“audit.*” 只會(huì)匹配到“audit.irs”春哨。
**/
@Bean
public TopicExchange orderTopicExchange() {
return new TopicExchange(ORDER_EXCHANGE_NAME);
}
@Bean
public Binding orderBinding() {
// TODO 如果要讓延遲隊(duì)列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機(jī)很關(guān)鍵
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
}
}
生產(chǎn)者DelaySender
package com.gizhi.beam.modular.amqp.service;
import com.gizhi.beam.modular.amqp.config.DelayRabbitConfig;
import com.gizhi.beam.modular.amqp.dto.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class DelaySender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDelay(Order order) {
log.info("【訂單生成時(shí)間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString() );
this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
// 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時(shí)候就指定好延遲時(shí)間還是在發(fā)送自己控制時(shí)間
message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
return message;
});
}
}
消費(fèi)者DelayReceiver
package com.gizhi.beam.modular.amqp.service;
import com.gizhi.beam.modular.amqp.config.DelayRabbitConfig;
import com.gizhi.beam.modular.amqp.dto.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class DelayReceiver {
@RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
public void orderDelayQueue(Order order, Message message, Channel channel) {
log.info("###########################################");
log.info("【orderDelayQueue 監(jiān)聽的消息】 - 【消費(fèi)時(shí)間】 - [{}]- 【訂單內(nèi)容】 - [{}]", new Date(), order.toString());
if(order.getOrderStatus() == 0) {
order.setOrderStatus(2);
log.info("【該訂單未支付,取消訂單】" + order.toString());
} else if(order.getOrderStatus() == 1) {
log.info("【該訂單已完成支付】");
} else if(order.getOrderStatus() == 2) {
log.info("【該訂單已取消】");
}
log.info("###########################################");
}
}
5恩伺、源碼地址:https://gitee.com/hsshy/beam-example
6赴背、關(guān)注我的公眾號(hào),互相學(xué)習(xí)晶渠。
參考鏈接
https://blog.csdn.net/ztx114/article/details/78410727
https://blog.csdn.net/woaitingting1985/article/details/79087357
https://blog.csdn.net/lizc_lizc/article/details/80722763
http://www.reibang.com/p/911d987b5f11
https://www.cnblogs.com/cag2050/p/7724055.html