==1. 簡(jiǎn)介==
1.1 RabbitMQ定義
RabbitMQ是一個(gè)消息代理和隊(duì)列服務(wù)器账胧,用來(lái)在不同應(yīng)用之間共享數(shù)據(jù),是Erlang語(yǔ)言開發(fā)的,基于AMQP協(xié)議骂际。
1.2 AMQP定義
是一個(gè)二進(jìn)制協(xié)議圃伶。
1.3 AMQP協(xié)議模型
1.4 核心概念
1. Server:Broker省咨,接受客戶端連接
2. Connection:連接征懈,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
3. Channel:網(wǎng)絡(luò)信道鸣剪,Channel是消息讀寫的通道
4. Message:消息沟突,傳遞的數(shù)據(jù)花颗,有properties何body組成,properties是消息的屬性(可以設(shè)置順序ID)惠拭,body是消息內(nèi)容
5. Virtual-Host:虛擬地址扩劝,用于"邏輯隔離"庸论,最上層的"消息路由",一個(gè)Virtual-Host中有多個(gè)Exchange和Queue棒呛,但是不能有同名的
6. Exchange:交換機(jī)聂示,接受消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列
7. Binding:Exchange和Queue之間的虛擬連接簇秒,binding中可以包含routing-key
8. Routing-key:一個(gè)路由規(guī)則鱼喉,虛擬機(jī)可用他來(lái)確定如何路由一個(gè)特定消息
9. Queue:消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者
1.5 整體架構(gòu)圖
1.6 消息流轉(zhuǎn)圖
1.7 交換機(jī)圖
==2. 安裝與配置==
2.1 準(zhǔn)備
1. rabbitMQ版本要與erlang版本對(duì)應(yīng)起來(lái)
2. rabbitMQ-rpm和erlang-rpm可以去官網(wǎng)下載趋观,tcp_wrappers扛禽、socat可以去https://pkgs.org下載
2.2 安裝+啟動(dòng)
1. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
3. rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm 如果上一步提示缺少socat
4. rpm -ivh tcp_wrappers-7.6-77.el7.x86_64.rpm 如果上一步提示缺少tcp_wrappers
5. rabbitmq-server start &
6. rabbitmqctl stop_app
7. rabbitmq-plugins enable rabbitmq_managment
2.3 常用命令
1. rabbitmqctl stop_app
2. rabbitmqctl start_app
3. rabbitmqctl status 節(jié)點(diǎn)狀態(tài)
4. rabbitmqctl list_users 列出所有用戶
5. rabbitmqctl list_user_permissions username 列出用戶權(quán)限
6. rabbitmqctl change_password username newpwd 修改用戶密碼
7. rabbitmqctl list_vhosts 列出所有虛擬主機(jī)
8. rabbitmqctl list_permissions -p vhostpath 列出該虛擬主機(jī)的所有權(quán)限
9. rabbitmqctl list_queues 列出所有隊(duì)列
10. rabbitmqctl reset 移除所有數(shù)據(jù)
11. rabbitmqctl join_cluster <cluster-node> [--ram] 組成集群命令
12. rabbitmqctl cluster_status 查看集群狀態(tài)
==3. Exchange==
3.1 交換機(jī)屬性
1. name:交換機(jī)名稱
2. type:交換機(jī)類型(direct、topic皱坛、fanout编曼、headers)
3. durability:是否需要持久化,true為持久化
4. auto-delete:Exchange上的最后一個(gè)Queue被刪除后麸恍,自動(dòng)刪除該Exchange
5. arguments:自定義參數(shù)
3.2 DirectExchange-直連
發(fā)送到DirectExchange的消息灵巧,會(huì)被轉(zhuǎn)發(fā)到RouteKey中指定的Queue。
Direct可以使用Default-Exchange抹沪,不需要進(jìn)行任何的binding操作刻肄,消息傳遞時(shí),RouteKey必須完全匹配融欧。
3.3 TopicExchange-匹配
發(fā)送到TopicExchange的消息敏弃,會(huì)被轉(zhuǎn)發(fā)到,匹配RouteKey中指定的Queue噪馏。
#:匹配多個(gè)詞
*:匹配一個(gè)詞
3.4 FanoutExchange
不處理路由鍵麦到,只要將隊(duì)列綁定到交換機(jī)上;
發(fā)送到交換機(jī)上的消息欠肾,都會(huì)被轉(zhuǎn)發(fā)到瓶颠,與該交換機(jī)綁定的所有隊(duì)列上;
FanoutExchange轉(zhuǎn)發(fā)消息是最快的刺桃;
==4. Binding+Queue+Message+Virtual==
4.1 Binding-綁定
Exchange<-->Exchange粹淋,Exchange<-->Queue,他們之間的綁定關(guān)系
Binding中可以包含RouteKey或者參數(shù)
4.2 Queue-消息隊(duì)列
實(shí)際存儲(chǔ)消息數(shù)據(jù)
Durability:是否持久化瑟慈,Durable:是桃移,Transient:否
Auto-Delete:如果yes,則最后一個(gè)監(jiān)聽被移除后葛碧,該Queue也會(huì)自動(dòng)被刪除
4.3 Message-消息
應(yīng)該程序和服務(wù)器之間傳遞的數(shù)據(jù)借杰,由Properties(可以設(shè)置順序ID)和Body組成
常用屬性:delivery_mode、headers(自定義屬性)进泼、correlation_id:唯一id蔗衡、expiration:過(guò)期時(shí)間
4.4 Virtual-Host-虛擬主機(jī)
虛擬地址纤虽,用于邏輯隔離,最上層的消息路由
一個(gè)Virtual-Host可以有若干個(gè)Exchange和Queue粘都,但是同一個(gè)Virtual-Host中不能有同名的Exchange和Queue
==5. 高級(jí)特性==
5.1 消息如何保證100%的投遞成功
1. 消息落庫(kù)廓推,對(duì)消息狀態(tài)進(jìn)行打標(biāo)
2. 消息延遲投遞,做二次確認(rèn)翩隧,回調(diào)檢查
5.2 冪等性
1. 定義
冪等性 就是防止高并發(fā)的情況下樊展,執(zhí)行結(jié)果都是唯一的。
消費(fèi)端實(shí)現(xiàn)冪等性堆生,就是消息永遠(yuǎn)被消費(fèi)一次专缠。
2. 解決方案
1. 唯一ID+指紋碼,利用數(shù)據(jù)庫(kù)主鍵去重
SELECT COUNT(1) FROM T_ORDER WHERE 唯一ID + 指紋碼
COUNT(1) == 0淑仆,則INSERT涝婉;
好處:簡(jiǎn)單
壞處:高并發(fā)下有數(shù)據(jù)庫(kù)寫入的性能瓶頸
解決:根據(jù)ID進(jìn)行分庫(kù)分表,進(jìn)行算法路由
2. 利用redis的原子性實(shí)現(xiàn)
setnx key value蔗怠、exists key墩弯、redis的自增
問(wèn)題:
數(shù)據(jù)是否需要落庫(kù),落庫(kù)的話寞射,緩存和數(shù)據(jù)庫(kù)如何保證原子性渔工?
數(shù)據(jù)不落庫(kù),如何設(shè)置定時(shí)同步策略桥温?
5.3 confirm確認(rèn)消息
1. 在channel中開啟確認(rèn)模式:channel.confirmSelect();
2. 在channel中添加監(jiān)聽:addConfirmListener();
3. 發(fā)生Nack的情況:磁盤寫滿引矩、Queue達(dá)到上線、MQ其他異常
4. ack和Nack都收不到的情況:就要定時(shí)任務(wù)去處理
5.4 return消息機(jī)制
如果發(fā)送的消息侵浸,Exchange不存在或者RouteKey路由不到旺韭,這時(shí)就需要returnListener。
Mandatory:true-監(jiān)聽器接受到這些不可達(dá)的消息掏觉,false-broker會(huì)自動(dòng)刪除這些消息区端。
消費(fèi)端自定義監(jiān)聽:繼承DefaultConsumer
5.5 消費(fèi)端限流
生產(chǎn)端不會(huì)限流,只有消費(fèi)端限流澳腹;當(dāng)機(jī)器突然有上萬(wàn)條消息织盼,不做限流,可能會(huì)導(dǎo)致消費(fèi)端服務(wù)器崩潰遵湖。
RabbitMQ提供了qos功能:非自動(dòng)簽收消息的情況下,一定數(shù)量消息未被確認(rèn)前(通過(guò)consumer或channel設(shè)置qos值)晚吞,不進(jìn)行消費(fèi)新的消息
void BasicQos(uint prefetchSize = 0 不限制消息大小,
ushort prefetchCount = 1 一次處理1條延旧,手動(dòng)ack后,在處理另一條,
bool global = false 這個(gè)限制是channel級(jí)別還是consumer級(jí)別);
consumer-->handleDelivery-->channel.basicAck(envelope.getDeliveryTag(), false);
consumer-->handleDelivery-->channel.basicNack(envelope.getDeliveryTag(), false, true-->重發(fā));
5.6 TTL隊(duì)列/消息
Time To Live 生存時(shí)間
支持消息的過(guò)期時(shí)間和隊(duì)列的過(guò)期時(shí)間
5.7 DLX-死信隊(duì)列
當(dāng)消息變成死信(沒有被消費(fèi)者消費(fèi)掉)的時(shí)候槽地,他將被重新發(fā)送到另一個(gè)Exchange迁沫,這個(gè)Exchange就是死信隊(duì)列
消息變成死信的情況:
1. 消息被拒絕(basic.reject/basic.nack)并且requeue=false
2. TTL過(guò)期
3. 隊(duì)列打到最大長(zhǎng)度
在隊(duì)列上添加:arguments.put("s-dead-letter-exchange", "dlx.exchange");
==6. Spring-Boot-Demo==
6.1 pom依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2 application.yml
1. 公共配置
spring:
rabbitmq:
addresses: 192.168.11.76:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
2. 生產(chǎn)端配置
publisher-confirms: true
publisher-returns: true
template:
mandatory: true # 保證監(jiān)聽有效
3. 消費(fèi)端配置
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
order:
key: springboot.*
queue:
name: queue-1
durable: true
exchange:
name: exchange-1
durable: true
type: topic
ignoreDeclarationExceptions: true
6.3 生產(chǎn)端
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
// 回調(diào)函數(shù): confirm確認(rèn)
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("異常處理...");
}
}
};
// 回調(diào)函數(shù): return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
// 發(fā)送消息方法調(diào)用: 構(gòu)建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData("1234567890"); // id + 時(shí)間戳 全局唯一
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
// 發(fā)送消息方法調(diào)用: 構(gòu)建自定義對(duì)象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData("0987654321"); //id + 時(shí)間戳 全局唯一
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}
}
6.4 消費(fèi)端
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"),
exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*"))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("消費(fèi)端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// 手動(dòng)ACK
channel.basicAck(deliveryTag, false);
}
@Value("spring.rabbitmq.listener.order.key")
private String orderKey;
@Value("spring.rabbitmq.listener.order.queue.name")
private String orderQueueName;
@Value("spring.rabbitmq.listener.order.queue.durable")
private String orderQueueDurable;
@Value("spring.rabbitmq.listener.order.exchange.name")
private String orderExchangeName;
@Value("spring.rabbitmq.listener.order.exchange.durable")
private String orderExchangeDurable;
@Value("spring.rabbitmq.listener.order.exchange.type")
private String orderExchangeType;
@Value("spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions")
private String orderExchangeIgnoreDeclarationExceptions;
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = orderQueueName, durable = orderQueueDurable),
exchange = @Exchange(value = orderExchangeName, durable = orderExchangeDurable, type = orderExchangeType, ignoreDeclarationExceptions = orderExchangeIgnoreDeclarationExceptions),
key = orderKey))
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("消費(fèi)端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動(dòng)ACK
channel.basicAck(deliveryTag, false);
}
}
==7. Spring-Cloud-Stream==
7.1 架構(gòu)圖
7.2 概念
Barista接口:用來(lái)定義通道的類型和名稱芦瘾,通道名稱作為配置用,通道類型作為該通道是發(fā)送消息還是接受消息
@output:輸出注解
@input:輸入注解
@StreamListener:監(jiān)聽消息注解
7.3 Demo
7.3.1 pom依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>1.3.4.RELEASE</version>
</dependency>
7.3.2 producer-application.yml
spring:
cloud:
stream:
bindings:
output_channel:
destination: exchange-3
group: queue-3
binder: rabbit-cluster
binders:
rabbit-cluster:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 192.168.11.76:5672
username: guest
password: guest
virtual-host: /
7.3.3 定義通道
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
// @Output聲明了它是一個(gè)輸出類型的通道集畅,名字是output_channel近弟。
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel logoutput();
}
7.3.4 發(fā)送消息
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Autowired
private Barista barista;
// 發(fā)送消息
public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
try{
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = barista.logoutput().send(msg);
System.out.println("發(fā)送數(shù)據(jù):" + message + ",sendStatus: " + sendStatus);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
7.3.5 consumer-application.yml
spring:
cloud:
stream:
bindings:
input_channel:
destination: exchange-3
group: queue-3
binder: rabbit-cluster
consumer:
concurrency: 1
rabbit:
bindings:
input_channel:
consumer:
requeue-rejected: false # 是否支持重發(fā)
acknowledge-mode: MANUAL # 手動(dòng)簽收
recovery-interval: 3000 # 3s重連
durable-subscription: true # 是否啟用持久化訂閱
max-concurrency: 5 # 最大監(jiān)聽數(shù)
binders:
rabbit-cluster:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 192.168.11.76:5672
username: guest
password: guest
virtual-host: /
7.3.6 定義通道
public interface Barista {
String INPUT_CHANNEL = "input_channel";
// @Input聲明了它是一個(gè)輸入類型的通道,名字是Barista.INPUT_CHANNEL挺智。
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel loginput();
}
7.3.7 消費(fèi)消息
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受數(shù)據(jù):" + message);
channel.basicAck(deliveryTag, false);
}
}
==8. RabbitMQ集群架構(gòu)模式==
8.1 簡(jiǎn)介
1. 主備模式:實(shí)現(xiàn)高可用集群祷愉,一般在并發(fā)和數(shù)據(jù)量不高的情況下使用,也稱Warren模式赦颇。
2. 遠(yuǎn)程模式:實(shí)現(xiàn)雙活的模式二鳄,也稱Shovel模式,消息進(jìn)行不同數(shù)據(jù)中心的復(fù)制工作媒怯,可以跨地域的兩個(gè)MQ集群互聯(lián)订讼。
3. 鏡像模式:也稱Mirror模式,保證100%數(shù)據(jù)不丟失扇苞,簡(jiǎn)單欺殿、用的多。
鏡像隊(duì)列:保證數(shù)據(jù)高可靠性方案鳖敷,主要是實(shí)現(xiàn)數(shù)據(jù)同步
8.2 架構(gòu)模式圖
8.2.1 鏡像模式
8.2.2 多活模式
==9. 架構(gòu)設(shè)計(jì)==
9.1 SET化架構(gòu)
業(yè)務(wù):解決業(yè)務(wù)遇到的擴(kuò)展性和容災(zāi)等需求脖苏,支撐業(yè)務(wù)的高速發(fā)展
通用性:架構(gòu)形成統(tǒng)一解決方案,岸邊各業(yè)務(wù)線接入使用
9.2 集群架構(gòu)圖
9.3 RabbitMQ-架構(gòu)設(shè)計(jì)方案
RabbitMQ-架構(gòu)設(shè)計(jì)方案
9.4 批量消息發(fā)送
9.5 順序消息
9.6 事務(wù)消息發(fā)送
9.7 消息冪等性設(shè)計(jì)