rabbitMQ

AMQP

高級(jí)消息隊(duì)列協(xié)議:是進(jìn)程之間傳遞異步消息的網(wǎng)絡(luò)協(xié)議

AMQP的工作過程

發(fā)布者(Publisher)發(fā)布消息(Message),經(jīng)過交換機(jī)(Exchange)阐枣,交換機(jī)根據(jù)路由規(guī)則將收到消息分發(fā)給交換機(jī)綁定的隊(duì)列(Queue)量蕊,最后AMQP代理會(huì)將消息投遞給訂閱了此隊(duì)列的消費(fèi)者雁刷,或者消費(fèi)者按照需求自行獲取。

RabbitMQ-01.jpg

消息隊(duì)列

什么是消息隊(duì)列(mq)

消息隊(duì)列就是一個(gè)隊(duì)列結(jié)構(gòu)的中間件割捅。主要是分布式之間實(shí)現(xiàn)異步通信的方式。

為什么需要消息隊(duì)列&消息隊(duì)列有哪些作用

  • 解耦:當(dāng)a系統(tǒng)通過接口調(diào)用方式,發(fā)送數(shù)據(jù)給多個(gè)系統(tǒng)咖楣,那么當(dāng)增加一個(gè)需要數(shù)據(jù)的z系統(tǒng)時(shí),a就會(huì)增加一條給z發(fā)送數(shù)據(jù)代碼缕题。這樣截歉,耦合度非常高。所以使用消息隊(duì)列烟零,瘪松,我們只需要讓z系統(tǒng)監(jiān)聽消息即可

  • 異步:當(dāng)a系統(tǒng)發(fā)送一條命令給b系統(tǒng)執(zhí)行咸作,而a不需要接收返回值時(shí)。我們需等待a執(zhí)行完宵睦,程序才能執(zhí)行下去记罚,這樣很浪費(fèi)性能。所以壳嚎,我們使用mq

  • 削峰:當(dāng)并發(fā)數(shù)過高時(shí)桐智,系統(tǒng)無法承載。這樣就先將請求數(shù)據(jù)緩存在mq隊(duì)列烟馅,然后根據(jù)消費(fèi)端能力進(jìn)行消費(fèi)

消息隊(duì)列缺點(diǎn)

系統(tǒng)穩(wěn)定性说庭,可用性降低降低。系統(tǒng)復(fù)雜性提高

RabbitMQ

RabbitMQ適用場景

  • 排隊(duì)算法

  • 秒殺活動(dòng)

  • 消息分發(fā)

  • 異步處理

  • 數(shù)據(jù)同步

  • 處理耗時(shí)任務(wù)

  • 流量銷峰

RabbitMQ幾大組件

  • Message(消息)郑趁。消息是不具名的刊驴,它由消息頭消息體組成。消息體是不透明的寡润,而消息頭則由一系列可選屬性組成捆憎,這些屬性包括:routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))梭纹、delivery-mode(指出消息可能持久性存儲(chǔ))等躲惰。

  • exchange(交換機(jī)):交換器。用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列变抽。三種常用的交換器類型1. direct(發(fā)布與訂閱 完全匹配)2. fanout(廣播)3. topic(主題础拨,規(guī)則匹配)

  • Queue(隊(duì)列):消息隊(duì)列。用來保存消息直到發(fā)送給消費(fèi)者瞬沦。它是消息的容器太伊,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列逛钻。消息一直在隊(duì)列里面僚焦,等待消費(fèi)者鏈接到這個(gè)隊(duì)列將其取走

  • Binding(綁定):綁定。用于消息隊(duì)列和交換器之間的關(guān)聯(lián)曙痘。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則芳悲,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。

  • Channel(信道):信道边坤。1名扛,Channel中文叫做信道,是TCP里面的虛擬鏈接茧痒。例如:電纜相當(dāng)于TCP肮韧,信道是一個(gè)獨(dú)立光纖束,一條TCP連接上創(chuàng)建多條信道是沒有問題的。2弄企,TCP一旦打開超燃,就會(huì)創(chuàng)建AMQP信道。3拘领,無論是發(fā)布消息意乓、接收消息、訂閱隊(duì)列约素,這些動(dòng)作都是通過信道完成的届良。

rabbitMQ的原理

  1. 交換機(jī)與隊(duì)列進(jìn)行綁定,一個(gè)交換機(jī)可以綁定多個(gè)隊(duì)列圣猎。

  2. 消費(fèi)者將消息發(fā)送給對應(yīng)名稱和類型的交換機(jī)(direct:要完全匹配的名稱士葫。fanout:廣播都可以消費(fèi))

  3. 交換機(jī)將消息發(fā)送給隊(duì)列

注意:生產(chǎn)者和交換機(jī)。消費(fèi)者和隊(duì)列送悔。为障。。這些連接是通過tcp連接放祟,創(chuàng)建一多個(gè)虛擬信道來進(jìn)行信息傳輸。

rabbitMQ安裝

// 創(chuàng)建并啟動(dòng)mq
docker run -d --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
// 進(jìn)入容器
docker exec -it 容器id bash
// 開啟可視化插件
rabbitmq-plugins enable rabbitmq_management
// rabbitmq配置文件
cd /etc/rabbitmq/conf.d/ 
// 配置可以查看交換機(jī)
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
exit
// 重啟容器
docker restart [containerId]

rabbitMQ的五種常用工作模式

簡單模式:使用默認(rèn)交換機(jī)(direct)

p(生產(chǎn)者)--q(隊(duì)列)--c(1個(gè)消費(fèi)者)

代碼與direct交換機(jī)一樣

work模式:使用默認(rèn)交換機(jī)

p(生產(chǎn)者)--q(隊(duì)列)--c(多個(gè)消費(fèi)者)呻右,當(dāng)有多個(gè)消費(fèi)者是跪妥,會(huì)只能有一個(gè)消費(fèi)者拿到消息

發(fā)布訂閱模式:fanout交換機(jī)

代碼與direct交換機(jī)一樣

路由模式:direct(直連)交換機(jī)

p(生產(chǎn)者)--x(direct交換機(jī))--q(多個(gè)隊(duì)列)--c(多個(gè)消費(fèi)者),當(dāng)消費(fèi)者發(fā)送消息時(shí)声滥,會(huì)通過交換機(jī)發(fā)送給指定的隊(duì)列眉撵,,然后消費(fèi)者按照對應(yīng)的隊(duì)列去進(jìn)行消費(fèi)落塑。

 1.創(chuàng)建隊(duì)列
@Configuration
public class RabbitmqConfig {

 @Bean
 protected Queue quene()
 {
 Queue queue = new Queue("myQueue");
 return queue;
 }
}

 2.新建類發(fā)送消息
@Autowired
private AmqpTemplate amqpTemplate;
// 生產(chǎn)者發(fā)送:
amqpTemplate.convertAndSend("myQueue" , "這是發(fā)送的內(nèi)容");

3.消費(fèi)者接收:
@RabbitListener(queues = "myQueue")
public void demo(String msg)
{
System.out.println("獲取到的消息1111: " + msg);
}</pre>
發(fā)布訂閱模式:fanout(扇形)交換機(jī)

p(生產(chǎn)者)--x(fanout交換機(jī))--q(多個(gè)隊(duì)列)--c(多個(gè)消費(fèi)者)纽疟,當(dāng)消費(fèi)者發(fā)送消息時(shí),會(huì)通過交換機(jī)發(fā)送給全部的隊(duì)列憾赁,污朽,然后消費(fèi)者按照對應(yīng)的隊(duì)列去進(jìn)行消費(fèi)。

1.創(chuàng)建:
// 創(chuàng)建隊(duì)列myfanout
@Bean
protected Queue fanoutQuque(){
 return new Queue("fanout");
}
// 創(chuàng)建交換機(jī)amq.fanout
@Bean
protected FanoutExchange fanoutExchange(){
 return new FanoutExchange("amq.fanout");
}
// 綁定隊(duì)列和交換機(jī)
@Bean
protected Binding fanoutBinding(Queue fanoutQuque,FanoutExchange fanoutExchange){
 return BindingBuilder.bind(fanoutQuque).to(fanoutExchange);
}


2.發(fā)送:參數(shù)1:交換機(jī) 參數(shù)二:routingKey 參數(shù)3:消息
amqpTemplate.convertAndSend("amq.fanout" , "abcd" , "fanout msg");

3.接收
@RabbitListener(queues = "myfanout")
public void demo(String msg)
{
System.out.println("fanout1: " + msg);
}

注意:參數(shù)二的routingKey對fanout類型交換機(jī)沒有意義

主題模式:topic交換機(jī)
1.創(chuàng)建:
@Bean
protected Queue topicQueue() {
 return new Queue("topic");
}
@Bean
protected TopicExchange topicExchange() {
 return new TopicExchange("amq.topic");
}
@Bean
protected Binding topicBinding(Queue topicQueue, TopicExchange topicExchange) {
 return BindingBuilder.bind(topicQueue).to(topicExchange).with("abcd");
}

2.發(fā)送:參數(shù)1:交換機(jī) 參數(shù)二:routingKey 參數(shù)3:消息
amqpTemplate.convertAndSend("amq.fanout" , "abcd" , "fanout msg");

3.接收
@RabbitListener(queues = "myfanout")
public void demo(String msg)
{
System.out.println("fanout1: " + msg);
}

符號(hào)“#”表示匹配一個(gè)或多個(gè)詞龙考,符號(hào)“*”表示匹配一個(gè)詞蟆肆。</pre>

注意:topic交換機(jī)根據(jù)routingKey不同,選擇不同的隊(duì)列

以上是幾種交換機(jī)模式的代碼,值得注意的是:當(dāng)傳遞消息為對象時(shí)晦款,必須給對象實(shí)體進(jìn)行序列化炎功,且需要給定序列化值。

如何保證rabbitMQ的消息不被重復(fù)消費(fèi)(冪等)

重復(fù)消費(fèi)產(chǎn)生:因?yàn)榫W(wǎng)絡(luò)波動(dòng)原因缓溅,可能會(huì)產(chǎn)生生產(chǎn)者和消費(fèi)者都出現(xiàn)消息的重復(fù)發(fā)送和接收

1.插入數(shù)據(jù)時(shí)判斷數(shù)據(jù)庫是否存在該數(shù)據(jù)蛇损,或者基于唯一主鍵(適用于并發(fā)不是很高,且存在數(shù)據(jù)不可重復(fù)。)

2.記錄關(guān)鍵的key(生產(chǎn)者生成)淤齐,存于緩存股囊。從而判斷是否被重復(fù)消費(fèi)(并發(fā)較高)

代碼:

 // 綁定隊(duì)列topic3
 @Bean
 public Queue topicQueue3()
 {
 return new Queue("topic3");

 }

 @Bean
 public Binding topicBinding3(Queue topicQueue3 , TopicExchange topicExchange)
 {
 return BindingBuilder.bind(topicQueue3).to(topicExchange).with("abc");
 }


 // 發(fā)送
 public void publishWithProps(){
 amqpTemplate.convertAndSend("amq.topic","abc", "messageWithProps", new MessagePostProcessor() {
 @Override
 public Message postProcessMessage(Message message) throws AmqpException {
 message.getMessageProperties().setCorrelationId("123");
 return message;
 }
 });
 System.out.println("消息發(fā)送成功");
 }

 // 接收
 @RabbitListener(queues = "topic3")
 public void demo7(String msg, Channel channel, Message message) throws IOException {
 System.out.println("隊(duì)列的消息為:" + msg);
 String correlationId = message.getMessageProperties().getCorrelationId();
 System.out.println("唯一標(biāo)識(shí)為:" + correlationId);
 // 使用redis的setnx存入緩存
 // 如果結(jié)果為0則之前沒有被消費(fèi),否則被消費(fèi)了
 // 業(yè)務(wù)

 }

如何保證rabbitMQ的消息可靠性傳輸(消息不丟失)

1.生產(chǎn)者到交換機(jī)消息丟失

  • 生產(chǎn)者到交換機(jī) 丟失消息 :使用setConfirmCallback回調(diào)方法來確定是否發(fā)送成功
// 生產(chǎn)者到交換機(jī)消息丟失
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 if(ack){
 System.out.println("消息已經(jīng)送達(dá)到交換機(jī)4膊!毁涉!");
 }else{
 System.out.println("消息沒有送達(dá)到Exchange,需要做一些補(bǔ)償操作P馑馈贫堰!如重試7次,如果還是不行待牵,則記錄下來");
 }
 }
});
  • mq 丟失消息 :使用setReturnsCallback確定是否進(jìn)入隊(duì)列其屏,并且將交換機(jī)和隊(duì)列都持久化
// 交換機(jī)到隊(duì)列消息消息丟失
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
 @Override
 public void returnedMessage(ReturnedMessage returned) {
 // 消息發(fā)送失敗 進(jìn)入此方法,否則不僅進(jìn)入
 String msg = new String(returned.getMessage().getBody());
 System.out.println("消息:" + msg + "路由隊(duì)列失斢Ц谩Y诵小!做補(bǔ)救操作7∧谩蛤袒!");
 }
});

// 另外需開啟持久化機(jī)制來存儲(chǔ)消息,防止消息在mq丟失
 rabbitTemplate.convertAndSend("amq.topic","abc", "checkMessageSendExchangeaaaaaa", new MessagePostProcessor() {
 @Override
 public Message postProcessMessage(Message message) throws AmqpException {
 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
 return message;
 }
 });
  • 消費(fèi)者丟失消息:1.使用basicNack使消息重新發(fā)送(如下代碼)2.直接記錄膨更,手動(dòng)消費(fèi)
// 當(dāng)使用basicNack時(shí)妙真,消息會(huì)重復(fù)發(fā)送給消費(fèi)者(慎用,容易死循環(huán))荚守。
volatile int count = 0;
@RabbitListener(queues = "topic3")
public void demo7(String msg, Channel channel, Message message) throws IOException {
 System.out.println("隊(duì)列的消息為:" + msg);
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
 // 業(yè)務(wù)代碼
 int aa = 1/0;
 // 手動(dòng)ack
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }catch (Exception e){
 // count的值使用redis將唯一值作為key存儲(chǔ)珍德,使用
 if (count < 3){
 // 消費(fèi)失敗 第一個(gè)false為
 channel.basicNack(deliveryTag,false,true);
 count++;
 }else {
 System.out.println("重試已達(dá)最大次數(shù),存入數(shù)據(jù)庫矗漾,手動(dòng)消費(fèi)");
 }
 }
}

如何保證rabbitMQ的消息順序性

  • 可以一個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)者锈候。然后創(chuàng)建多個(gè)隊(duì)列及相應(yīng)消費(fèi)者

  • 可以將信息保留在redis的隊(duì)列中,敞贡,然后允許一個(gè)服務(wù)進(jìn)行排序消費(fèi)泵琳。

rabbitMQ消息積壓

  • 可以將消費(fèi)者服務(wù)進(jìn)行集群化(但是存在消息順序性問題)

  • 修改消費(fèi)者代碼,進(jìn)行批量獲取消息(比如直接拿到消息進(jìn)一個(gè)隊(duì)列后嫡锌,就發(fā)送消息ack信息虑稼。批量拉取消息)然后進(jìn)行多線程批量消費(fèi)。

  • 用docker或者k8s進(jìn)行動(dòng)態(tài)擴(kuò)容势木。用更多的消費(fèi)者進(jìn)行消費(fèi)

  • 死信隊(duì)列

死信隊(duì)列

死信就是隊(duì)列附帶的一個(gè)交換機(jī)蛛倦。當(dāng)消息出現(xiàn)以下情況,消息就會(huì)被發(fā)路由到死信交換機(jī)啦桌,并由交換機(jī)發(fā)送到死信隊(duì)列溯壶,發(fā)送至客戶端及皂。

  • 消息被消費(fèi)者拒絕

  • 消息的生存時(shí)間到了(或者隊(duì)列所設(shè)置的生存時(shí)間到了)且未被消費(fèi)

  • 隊(duì)列的消息到最大長度(一般來說消息滿了就會(huì)擠掉前面的消息。配置了死信就會(huì)進(jìn)入死信隊(duì)列)

Rabbit集群

#在兩個(gè)服務(wù)器創(chuàng)建兩個(gè)rabbit
version: '3.1'
services:
 rabbitmq1:
 image: rabbitmq:3.8.5-management-alpine
 container_name: rabbitmq1
 hostname: rabbitmq1
 extra_hosts:
 - "rabbitmq1:192.168.11.32"
 - "rabbitmq2:192.168.11.33"
 environment: 
 - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
 ports:
 - 5672:5672
 - 15672:15672
 - 4369:4369
 - 25672:25672

version: '3.1'
services:
 rabbitmq2:
 image: rabbitmq:3.8.5-management-alpine
 container_name: rabbitmq2
 hostname: rabbitmq2
 extra_hosts:
 - "rabbitmq1:192.168.11.32"
 - "rabbitmq2:192.168.11.33"
 environment: 
 - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
 ports:
 - 5672:5672
 - 15672:15672
 - 4369:4369
 - 25672:25672
</pre>

啟動(dòng)之后需將其互相加入

首先rabbitmq1容器內(nèi)執(zhí)行:

rabbitmqctl start_app

然后rabbitmq2容器內(nèi)執(zhí)行

rabbitmqctl stop_app
rabbitmqctl reset 
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app</pre>

最后需要設(shè)置鏡像模式

image-20220728223801059.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末且改,一起剝皮案震驚了整個(gè)濱河市验烧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌又跛,老刑警劉巖碍拆,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異慨蓝,居然都是意外死亡感混,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門礼烈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來弧满,“玉大人,你說我怎么就攤上這事此熬⊥ノ兀” “怎么了?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵犀忱,是天一觀的道長募谎。 經(jīng)常有香客問我,道長阴汇,這世上最難降的妖魔是什么近哟? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮鲫寄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘疯淫。我一直安慰自己地来,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布熙掺。 她就那樣靜靜地躺著未斑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪币绩。 梳的紋絲不亂的頭發(fā)上蜡秽,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機(jī)與錄音缆镣,去河邊找鬼芽突。 笑死,一個(gè)胖子當(dāng)著我的面吹牛董瞻,可吹牛的內(nèi)容都是我干的寞蚌。 我是一名探鬼主播田巴,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼挟秤!你這毒婦竟也來了壹哺?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤艘刚,失蹤者是張志新(化名)和其女友劉穎管宵,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體攀甚,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡箩朴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了云稚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片隧饼。...
    茶點(diǎn)故事閱讀 38,617評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖静陈,靈堂內(nèi)的尸體忽然破棺而出燕雁,到底是詐尸還是另有隱情,我是刑警寧澤鲸拥,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布拐格,位于F島的核電站,受9級(jí)特大地震影響刑赶,放射性物質(zhì)發(fā)生泄漏捏浊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一撞叨、第九天 我趴在偏房一處隱蔽的房頂上張望金踪。 院中可真熱鬧,春花似錦牵敷、人聲如沸胡岔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽靶瘸。三九已至,卻和暖如春毛肋,著一層夾襖步出監(jiān)牢的瞬間怨咪,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工润匙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留诗眨,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓孕讳,卻偏偏與公主長得像辽话,于是被迫代替她去往敵國和親肄鸽。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容