AMQP
高級(jí)消息隊(duì)列協(xié)議:是進(jìn)程之間傳遞異步消息的網(wǎng)絡(luò)協(xié)議
AMQP的工作過程
發(fā)布者(Publisher)發(fā)布消息(Message),經(jīng)過交換機(jī)(Exchange)阐枣,交換機(jī)根據(jù)路由規(guī)則將收到消息分發(fā)給交換機(jī)綁定的隊(duì)列(Queue)量蕊,最后AMQP代理會(huì)將消息投遞給訂閱了此隊(duì)列的消費(fèi)者雁刷,或者消費(fèi)者按照需求自行獲取。
消息隊(duì)列
什么是消息隊(duì)列(mq)
消息隊(duì)列就是一個(gè)隊(duì)列結(jié)構(gòu)的中間件割捅。主要是分布式之間實(shí)現(xiàn)異步通信的方式。
為什么需要消息隊(duì)列&消息隊(duì)列有哪些作用
解耦:當(dāng)a系統(tǒng)通過接口調(diào)用方式,發(fā)送數(shù)據(jù)給多個(gè)系統(tǒng)咖楣,那么當(dāng)增加一個(gè)需要數(shù)據(jù)的z系統(tǒng)時(shí),a就會(huì)增加一條給z發(fā)送數(shù)據(jù)代碼缕题。這樣截歉,耦合度非常高。所以使用消息隊(duì)列烟零,瘪松,我們只需要讓z系統(tǒng)監(jiān)聽消息即可
異步:當(dāng)a系統(tǒng)發(fā)送一條命令給b系統(tǒng)執(zhí)行咸作,而a不需要接收返回值時(shí)。我們需等待a執(zhí)行完宵睦,程序才能執(zhí)行下去记罚,這樣很浪費(fèi)性能。所以壳嚎,我們使用mq
削峰:當(dāng)并發(fā)數(shù)過高時(shí)桐智,系統(tǒng)無法承載。這樣就先將請求數(shù)據(jù)緩存在mq隊(duì)列烟馅,然后根據(jù)消費(fèi)端能力進(jìn)行消費(fèi)
消息隊(duì)列缺點(diǎn)
系統(tǒng)穩(wěn)定性说庭,可用性降低降低。系統(tǒng)復(fù)雜性提高
RabbitMQ
RabbitMQ適用場景
排隊(duì)算法
秒殺活動(dòng)
消息分發(fā)
異步處理
數(shù)據(jù)同步
處理耗時(shí)任務(wù)
流量銷峰
RabbitMQ幾大組件
Message(消息)郑趁。消息是不具名的刊驴,它由消息頭消息體組成。消息體是不透明的寡润,而消息頭則由一系列可選屬性組成捆憎,這些屬性包括:routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))梭纹、delivery-mode(指出消息可能持久性存儲(chǔ))等躲惰。
exchange(交換機(jī)):交換器。用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列变抽。三種常用的交換器類型1. direct(發(fā)布與訂閱 完全匹配)2. fanout(廣播)3. topic(主題础拨,規(guī)則匹配)
Queue(隊(duì)列):消息隊(duì)列。用來保存消息直到發(fā)送給消費(fèi)者瞬沦。它是消息的容器太伊,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列逛钻。消息一直在隊(duì)列里面僚焦,等待消費(fèi)者鏈接到這個(gè)隊(duì)列將其取走
Binding(綁定):綁定。用于消息隊(duì)列和交換器之間的關(guān)聯(lián)曙痘。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則芳悲,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
Channel(信道):信道边坤。1名扛,Channel中文叫做信道,是TCP里面的虛擬鏈接茧痒。例如:電纜相當(dāng)于TCP肮韧,信道是一個(gè)獨(dú)立光纖束,一條TCP連接上創(chuàng)建多條信道是沒有問題的。2弄企,TCP一旦打開超燃,就會(huì)創(chuàng)建AMQP信道。3拘领,無論是發(fā)布消息意乓、接收消息、訂閱隊(duì)列约素,這些動(dòng)作都是通過信道完成的届良。
rabbitMQ的原理
交換機(jī)與隊(duì)列進(jìn)行綁定,一個(gè)交換機(jī)可以綁定多個(gè)隊(duì)列圣猎。
消費(fèi)者將消息發(fā)送給對應(yīng)名稱和類型的交換機(jī)(direct:要完全匹配的名稱士葫。fanout:廣播都可以消費(fèi))
交換機(jī)將消息發(fā)送給隊(duì)列
注意:生產(chǎn)者和交換機(jī)。消費(fèi)者和隊(duì)列送悔。为障。。這些連接是通過tcp連接放祟,創(chuàng)建一多個(gè)虛擬信道來進(jìn)行信息傳輸。
rabbitMQ安裝
// 創(chuàng)建并啟動(dòng)mq
docker run -d --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
// 進(jìn)入容器
docker exec -it 容器id bash
// 開啟可視化插件
rabbitmq-plugins enable rabbitmq_management
// rabbitmq配置文件
cd /etc/rabbitmq/conf.d/
// 配置可以查看交換機(jī)
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
exit
// 重啟容器
docker restart [containerId]
rabbitMQ的五種常用工作模式
簡單模式:使用默認(rèn)交換機(jī)(direct)
p(生產(chǎn)者)--q(隊(duì)列)--c(1個(gè)消費(fèi)者)
代碼與direct交換機(jī)一樣
work模式:使用默認(rèn)交換機(jī)
p(生產(chǎn)者)--q(隊(duì)列)--c(多個(gè)消費(fèi)者)呻右,當(dāng)有多個(gè)消費(fèi)者是跪妥,會(huì)只能有一個(gè)消費(fèi)者拿到消息
發(fā)布訂閱模式:fanout交換機(jī)
代碼與direct交換機(jī)一樣
路由模式:direct(直連)交換機(jī)
p(生產(chǎn)者)--x(direct交換機(jī))--q(多個(gè)隊(duì)列)--c(多個(gè)消費(fèi)者),當(dāng)消費(fèi)者發(fā)送消息時(shí)声滥,會(huì)通過交換機(jī)發(fā)送給指定的隊(duì)列眉撵,,然后消費(fèi)者按照對應(yīng)的隊(duì)列去進(jìn)行消費(fèi)落塑。
1.創(chuàng)建隊(duì)列
@Configuration
public class RabbitmqConfig {
@Bean
protected Queue quene()
{
Queue queue = new Queue("myQueue");
return queue;
}
}
2.新建類發(fā)送消息
@Autowired
private AmqpTemplate amqpTemplate;
// 生產(chǎn)者發(fā)送:
amqpTemplate.convertAndSend("myQueue" , "這是發(fā)送的內(nèi)容");
3.消費(fèi)者接收:
@RabbitListener(queues = "myQueue")
public void demo(String msg)
{
System.out.println("獲取到的消息1111: " + msg);
}</pre>
發(fā)布訂閱模式:fanout(扇形)交換機(jī)
p(生產(chǎn)者)--x(fanout交換機(jī))--q(多個(gè)隊(duì)列)--c(多個(gè)消費(fèi)者)纽疟,當(dāng)消費(fèi)者發(fā)送消息時(shí),會(huì)通過交換機(jī)發(fā)送給全部的隊(duì)列憾赁,污朽,然后消費(fèi)者按照對應(yīng)的隊(duì)列去進(jìn)行消費(fèi)。
1.創(chuàng)建:
// 創(chuàng)建隊(duì)列myfanout
@Bean
protected Queue fanoutQuque(){
return new Queue("fanout");
}
// 創(chuàng)建交換機(jī)amq.fanout
@Bean
protected FanoutExchange fanoutExchange(){
return new FanoutExchange("amq.fanout");
}
// 綁定隊(duì)列和交換機(jī)
@Bean
protected Binding fanoutBinding(Queue fanoutQuque,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQuque).to(fanoutExchange);
}
2.發(fā)送:參數(shù)1:交換機(jī) 參數(shù)二:routingKey 參數(shù)3:消息
amqpTemplate.convertAndSend("amq.fanout" , "abcd" , "fanout msg");
3.接收
@RabbitListener(queues = "myfanout")
public void demo(String msg)
{
System.out.println("fanout1: " + msg);
}
注意:參數(shù)二的routingKey對fanout類型交換機(jī)沒有意義
主題模式:topic交換機(jī)
1.創(chuàng)建:
@Bean
protected Queue topicQueue() {
return new Queue("topic");
}
@Bean
protected TopicExchange topicExchange() {
return new TopicExchange("amq.topic");
}
@Bean
protected Binding topicBinding(Queue topicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue).to(topicExchange).with("abcd");
}
2.發(fā)送:參數(shù)1:交換機(jī) 參數(shù)二:routingKey 參數(shù)3:消息
amqpTemplate.convertAndSend("amq.fanout" , "abcd" , "fanout msg");
3.接收
@RabbitListener(queues = "myfanout")
public void demo(String msg)
{
System.out.println("fanout1: " + msg);
}
符號(hào)“#”表示匹配一個(gè)或多個(gè)詞龙考,符號(hào)“*”表示匹配一個(gè)詞蟆肆。</pre>
注意:topic交換機(jī)根據(jù)routingKey不同,選擇不同的隊(duì)列
以上是幾種交換機(jī)模式的代碼,值得注意的是:當(dāng)傳遞消息為對象時(shí)晦款,必須給對象實(shí)體進(jìn)行序列化炎功,且需要給定序列化值。
如何保證rabbitMQ的消息不被重復(fù)消費(fèi)(冪等)
重復(fù)消費(fèi)產(chǎn)生:因?yàn)榫W(wǎng)絡(luò)波動(dòng)原因缓溅,可能會(huì)產(chǎn)生生產(chǎn)者和消費(fèi)者都出現(xiàn)消息的重復(fù)發(fā)送和接收
1.插入數(shù)據(jù)時(shí)判斷數(shù)據(jù)庫是否存在該數(shù)據(jù)蛇损,或者基于唯一主鍵(適用于并發(fā)不是很高,且存在數(shù)據(jù)不可重復(fù)。)
2.記錄關(guān)鍵的key(生產(chǎn)者生成)淤齐,存于緩存股囊。從而判斷是否被重復(fù)消費(fèi)(并發(fā)較高)
代碼:
// 綁定隊(duì)列topic3
@Bean
public Queue topicQueue3()
{
return new Queue("topic3");
}
@Bean
public Binding topicBinding3(Queue topicQueue3 , TopicExchange topicExchange)
{
return BindingBuilder.bind(topicQueue3).to(topicExchange).with("abc");
}
// 發(fā)送
public void publishWithProps(){
amqpTemplate.convertAndSend("amq.topic","abc", "messageWithProps", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setCorrelationId("123");
return message;
}
});
System.out.println("消息發(fā)送成功");
}
// 接收
@RabbitListener(queues = "topic3")
public void demo7(String msg, Channel channel, Message message) throws IOException {
System.out.println("隊(duì)列的消息為:" + msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一標(biāo)識(shí)為:" + correlationId);
// 使用redis的setnx存入緩存
// 如果結(jié)果為0則之前沒有被消費(fèi),否則被消費(fèi)了
// 業(yè)務(wù)
}
如何保證rabbitMQ的消息可靠性傳輸(消息不丟失)
1.生產(chǎn)者到交換機(jī)消息丟失
- 生產(chǎn)者到交換機(jī) 丟失消息 :使用setConfirmCallback回調(diào)方法來確定是否發(fā)送成功
// 生產(chǎn)者到交換機(jī)消息丟失
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已經(jīng)送達(dá)到交換機(jī)4膊!毁涉!");
}else{
System.out.println("消息沒有送達(dá)到Exchange,需要做一些補(bǔ)償操作P馑馈贫堰!如重試7次,如果還是不行待牵,則記錄下來");
}
}
});
- mq 丟失消息 :使用setReturnsCallback確定是否進(jìn)入隊(duì)列其屏,并且將交換機(jī)和隊(duì)列都持久化
// 交換機(jī)到隊(duì)列消息消息丟失
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
// 消息發(fā)送失敗 進(jìn)入此方法,否則不僅進(jìn)入
String msg = new String(returned.getMessage().getBody());
System.out.println("消息:" + msg + "路由隊(duì)列失斢Ц谩Y诵小!做補(bǔ)救操作7∧谩蛤袒!");
}
});
// 另外需開啟持久化機(jī)制來存儲(chǔ)消息,防止消息在mq丟失
rabbitTemplate.convertAndSend("amq.topic","abc", "checkMessageSendExchangeaaaaaa", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
- 消費(fèi)者丟失消息:1.使用basicNack使消息重新發(fā)送(如下代碼)2.直接記錄膨更,手動(dòng)消費(fèi)
// 當(dāng)使用basicNack時(shí)妙真,消息會(huì)重復(fù)發(fā)送給消費(fèi)者(慎用,容易死循環(huán))荚守。
volatile int count = 0;
@RabbitListener(queues = "topic3")
public void demo7(String msg, Channel channel, Message message) throws IOException {
System.out.println("隊(duì)列的消息為:" + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 業(yè)務(wù)代碼
int aa = 1/0;
// 手動(dòng)ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
// count的值使用redis將唯一值作為key存儲(chǔ)珍德,使用
if (count < 3){
// 消費(fèi)失敗 第一個(gè)false為
channel.basicNack(deliveryTag,false,true);
count++;
}else {
System.out.println("重試已達(dá)最大次數(shù),存入數(shù)據(jù)庫矗漾,手動(dòng)消費(fèi)");
}
}
}
如何保證rabbitMQ的消息順序性
可以一個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)者锈候。然后創(chuàng)建多個(gè)隊(duì)列及相應(yīng)消費(fèi)者
可以將信息保留在redis的隊(duì)列中,敞贡,然后允許一個(gè)服務(wù)進(jìn)行排序消費(fèi)泵琳。
rabbitMQ消息積壓
可以將消費(fèi)者服務(wù)進(jìn)行集群化(但是存在消息順序性問題)
修改消費(fèi)者代碼,進(jìn)行批量獲取消息(比如直接拿到消息進(jìn)一個(gè)隊(duì)列后嫡锌,就發(fā)送消息ack信息虑稼。批量拉取消息)然后進(jìn)行多線程批量消費(fèi)。
用docker或者k8s進(jìn)行動(dòng)態(tài)擴(kuò)容势木。用更多的消費(fèi)者進(jìn)行消費(fèi)
死信隊(duì)列
死信隊(duì)列
死信就是隊(duì)列附帶的一個(gè)交換機(jī)蛛倦。當(dāng)消息出現(xiàn)以下情況,消息就會(huì)被發(fā)路由到死信交換機(jī)啦桌,并由交換機(jī)發(fā)送到死信隊(duì)列溯壶,發(fā)送至客戶端及皂。
消息被消費(fèi)者拒絕
消息的生存時(shí)間到了(或者隊(duì)列所設(shè)置的生存時(shí)間到了)且未被消費(fèi)
隊(duì)列的消息到最大長度(一般來說消息滿了就會(huì)擠掉前面的消息。配置了死信就會(huì)進(jìn)入死信隊(duì)列)
Rabbit集群
#在兩個(gè)服務(wù)器創(chuàng)建兩個(gè)rabbit
version: '3.1'
services:
rabbitmq1:
image: rabbitmq:3.8.5-management-alpine
container_name: rabbitmq1
hostname: rabbitmq1
extra_hosts:
- "rabbitmq1:192.168.11.32"
- "rabbitmq2:192.168.11.33"
environment:
- RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
ports:
- 5672:5672
- 15672:15672
- 4369:4369
- 25672:25672
version: '3.1'
services:
rabbitmq2:
image: rabbitmq:3.8.5-management-alpine
container_name: rabbitmq2
hostname: rabbitmq2
extra_hosts:
- "rabbitmq1:192.168.11.32"
- "rabbitmq2:192.168.11.33"
environment:
- RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
ports:
- 5672:5672
- 15672:15672
- 4369:4369
- 25672:25672
</pre>
啟動(dòng)之后需將其互相加入
首先rabbitmq1容器內(nèi)執(zhí)行:
rabbitmqctl start_app
然后rabbitmq2容器內(nèi)執(zhí)行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app</pre>
最后需要設(shè)置鏡像模式