RabbitMQ 實(shí)戰(zhàn)教程
1.MQ引言
修改ip地址
1.1 什么是MQ
MQ
(Message Quene) : 翻譯為 消息隊(duì)列
,通過(guò)典型的 生產(chǎn)者
和消費(fèi)者
模型,生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息窜管,消費(fèi)者不斷的從隊(duì)列中獲取消息喧枷。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的咕宿,而且只關(guān)心消息的發(fā)送和接收烦粒,沒(méi)有業(yè)務(wù)邏輯的侵入,輕松的實(shí)現(xiàn)系統(tǒng)間解耦。別名為 消息中間件
通過(guò)利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流蚤氏,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成甘耿。
1.2 MQ有哪些
當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ
竿滨、RabbitMQ
佳恬,炙手可熱的Kafka
,阿里巴巴自主開發(fā)RocketMQ
等姐呐。
1.3 不同MQ特點(diǎn)
# 1.ActiveMQ
ActiveMQ 是Apache出品殿怜,最流行的,能力強(qiáng)勁的開源消息總線曙砂。它是一個(gè)完全支持JMS規(guī)范的的消息中間件。豐富的API,多種集群架構(gòu)模式讓ActiveMQ在業(yè)界成為老牌的消息中間件,在中小型企業(yè)頗受歡迎!
# 2.Kafka
Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng)骏掀,目前歸屬于Apache頂級(jí)項(xiàng)目鸠澈。Kafka主要特點(diǎn)是基于Pull的模式來(lái)處理消息消費(fèi),
追求高吞吐量截驮,一開始的目的就是用于日志收集和傳輸笑陈。0.8版本開始支持復(fù)制,不支持事務(wù)葵袭,對(duì)消息的重復(fù)涵妥、丟失、錯(cuò)誤沒(méi)有嚴(yán)格要求坡锡,
適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)蓬网。
# 3.RocketMQ
RocketMQ是阿里開源的消息中間件,它是純Java開發(fā)鹉勒,具有高吞吐量帆锋、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)禽额。RocketMQ思路起
源于Kafka锯厢,但并不是Kafka的一個(gè)Copy,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化脯倒,目前在阿里集團(tuán)被廣泛應(yīng)用于交易实辑、充值、流計(jì)算藻丢、消
息推送剪撬、日志流式處理、binglog分發(fā)等場(chǎng)景郁岩。
# 4.RabbitMQ
RabbitMQ是使用Erlang語(yǔ)言開發(fā)的開源消息隊(duì)列系統(tǒng)婿奔,基于AMQP協(xié)議來(lái)實(shí)現(xiàn)缺狠。AMQP的主要特征是面向消息、隊(duì)列萍摊、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)挤茄、可靠性、安全冰木。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)對(duì)數(shù)據(jù)一致性穷劈、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次踊沸。
RabbitMQ比Kafka可靠歇终,Kafka更適合IO高吞吐的處理,一般應(yīng)用在大數(shù)據(jù)日志處理或?qū)?shí)時(shí)性(少量延遲)逼龟,可靠性(少量丟數(shù)據(jù))要求稍低的場(chǎng)景使用评凝,比如ELK日志收集。
2.RabbitMQ 的引言
2.1 RabbitMQ
基于
AMQP
協(xié)議腺律,erlang語(yǔ)言開發(fā)奕短,是部署最廣泛的開源消息中間件,是最受歡迎的開源消息中間件之一。
官網(wǎng)
: https://www.rabbitmq.com/
官方教程
: https://www.rabbitmq.com/#getstarted
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n21" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # AMQP 協(xié)議
AMQP(advanced message queuing protocol)`在2003年時(shí)被提出匀钧,最早用于解決金融領(lǐng)不同平臺(tái)之間的消息傳遞交互問(wèn)題翎碑。顧名思義,AMQP是一種協(xié)議之斯,更準(zhǔn)確的說(shuō)是一種binary wire-level protocol(鏈接協(xié)議)日杈。這是其和JMS的本質(zhì)差別,AMQP不從API層進(jìn)行限定佑刷,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式莉擒。這使得實(shí)現(xiàn)了AMQP的provider天然性就是跨平臺(tái)的。以下是AMQP協(xié)議模型:</pre>
2.2 RabbitMQ 的安裝
2.2.1 下載
官網(wǎng)下載地址
: https://www.rabbitmq.com/download.html
最新版本
: 3.7.18
2.2.2 下載的安裝包
注意
:這里的安裝包是centos7安裝的包
2.2.3 安裝步驟
1.將rabbitmq安裝包上傳到linux系統(tǒng)中
erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el7.noarch.rpm
# 2.安裝Erlang依賴包
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
# 3.安裝RabbitMQ安裝包(需要聯(lián)網(wǎng))
yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
注意:默認(rèn)安裝完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目錄中,需要
將配置文件復(fù)制到/etc/rabbitmq/目錄中,并修改名稱為rabbitmq.config
# 4.復(fù)制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 5.查看配置文件位置
ls /etc/rabbitmq/rabbitmq.config
# 6.修改配置文件(參見下圖:)
vim /etc/rabbitmq/rabbitmq.config
將上圖中配置文件中紅色部分去掉%%
,以及最后的,
逗號(hào) 修改為下圖:允許來(lái)賓用戶在任意地方訪問(wèn)
7.執(zhí)行如下命令,啟動(dòng)rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management
出現(xiàn)如下說(shuō)明:
Enabling plugins on node rabbit@localhost:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
# 8.啟動(dòng)RabbitMQ的服務(wù)
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
# 9.查看服務(wù)狀態(tài)(見下圖:)
systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
Main PID: 2904 (beam.smp)
Status: "Initialized"
CGroup: /system.slice/rabbitmq-server.service
├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
MBlmbcs...
├─3220 erl_child_setup 32768
├─3243 inet_gethost 4
└─3244 inet_gethost 4
.........
10.關(guān)閉防火墻服務(wù)
systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
systemctl stop firewalld
# 11.訪問(wèn)web管理界面
http://10.15.0.8:15672/
12.登錄管理界面
username: guest
password: guest
3. RabiitMQ 配置
3.1RabbitMQ 管理命令行
# 1.服務(wù)啟動(dòng)相關(guān)
systemctl start|restart|stop|status rabbitmq-server
# 2.管理命令行 用來(lái)在不使用web管理界面情況下命令操作RabbitMQ
rabbitmqctl help 可以查看更多命令
# 3.插件管理命令行
rabbitmq-plugins enable|list|disable </pre>
3.2 web管理界面介紹
3.2.1 overview概覽
connections:無(wú)論生產(chǎn)者還是消費(fèi)者项乒,都需要與RabbitMQ建立連接后才可以完成消息的生產(chǎn)和消費(fèi)啰劲,在這里可以查看連接情況
channels:通道,建立連接后檀何,會(huì)形成通道蝇裤,消息的投遞獲取依賴通道。
Exchanges:交換機(jī)频鉴,用來(lái)實(shí)現(xiàn)消息的路由
Queues:隊(duì)列栓辜,即消息隊(duì)列,消息存放在隊(duì)列中垛孔,等待消費(fèi)藕甩,消費(fèi)后被移除隊(duì)列。
3.2.2 Admin用戶和虛擬主機(jī)管理
1. 添加用戶
上面的Tags選項(xiàng)周荐,其實(shí)是指定用戶的角色狭莱,可選的有以下幾個(gè):
-
超級(jí)管理員(administrator)
可登陸管理控制臺(tái)僵娃,可查看所有的信息,并且可以對(duì)用戶腋妙,策略(policy)進(jìn)行操作默怨。
-
監(jiān)控者(monitoring)
可登陸管理控制臺(tái),同時(shí)可以查看rabbitmq節(jié)點(diǎn)的相關(guān)信息(進(jìn)程數(shù)骤素,內(nèi)存使用情況匙睹,磁盤使用情況等)
-
策略制定者(policymaker)
可登陸管理控制臺(tái), 同時(shí)可以對(duì)policy進(jìn)行管理。但無(wú)法查看節(jié)點(diǎn)的相關(guān)信息(上圖紅框標(biāo)識(shí)的部分)济竹。
-
普通管理者(management)
僅可登陸管理控制臺(tái)痕檬,無(wú)法看到節(jié)點(diǎn)信息,也無(wú)法對(duì)策略進(jìn)行管理送浊。
-
其他
無(wú)法登陸管理控制臺(tái)梦谜,通常就是普通的生產(chǎn)者和消費(fèi)者。
2. 創(chuàng)建虛擬主機(jī)
虛擬主機(jī)
為了讓各個(gè)用戶可以互不干擾的工作罕袋,RabbitMQ添加了虛擬主機(jī)(Virtual Hosts)的概念改淑。其實(shí)就是一個(gè)獨(dú)立的訪問(wèn)路徑,不同用戶使用不同路徑浴讯,各自有自己的隊(duì)列、交換機(jī)蔼啦,互相不會(huì)影響榆纽。</pre>
3. 綁定虛擬主機(jī)和用戶
創(chuàng)建好虛擬主機(jī)白修,我們還要給用戶添加訪問(wèn)權(quán)限:
點(diǎn)擊添加好的虛擬主機(jī):
進(jìn)入虛擬機(jī)設(shè)置界面:
4.RabbitMQ 的第一個(gè)程序
4.0 AMQP協(xié)議的回顧
4.1 RabbitMQ支持的消息模型
4.2 引入依賴
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
4.3 第一種模型(直連)
在上圖的模型中缭召,有以下概念:
P:生產(chǎn)者,也就是要發(fā)送消息的程序
C:消費(fèi)者:消息的接受者偷仿,會(huì)一直等待消息到來(lái)鸵赫。
queue:消息隊(duì)列衣屏,圖中紅色部分。類似一個(gè)郵箱辩棒,可以緩存消息狼忱;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息一睁。
1. 開發(fā)生產(chǎn)者
//創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.15.0.9");
connectionFactory.setPort(5672);
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
connectionFactory.setVirtualHost("/ems");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//參數(shù)1: 是否持久化 參數(shù)2:是否獨(dú)占隊(duì)列 參數(shù)3:是否自動(dòng)刪除 參數(shù)4:其他屬性
channel.queueDeclare("hello",true,false,false,null);
channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
channel.close();
connection.close();
##### 2\. 開發(fā)消費(fèi)者
//創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.15.0.9");
connectionFactory.setPort(5672);
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
connectionFactory.setVirtualHost("/ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", true, false, false, null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
##### 3\. 參數(shù)的說(shuō)明
channel.queueDeclare("hello",true,false,false,null);
'參數(shù)1':用來(lái)聲明通道對(duì)應(yīng)的隊(duì)列
'參數(shù)2':用來(lái)指定是否持久化隊(duì)列
'參數(shù)3':用來(lái)指定是否獨(dú)占隊(duì)列
'參數(shù)4':用來(lái)指定是否自動(dòng)刪除隊(duì)列
'參數(shù)5':對(duì)隊(duì)列的額外配置</pre>
4.4 第二種模型(work quene)
Work queues
钻弄,也被稱為(Task queues
),任務(wù)模型者吁。當(dāng)消息處理比較耗時(shí)的時(shí)候窘俺,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往复凳,消息就會(huì)堆積越來(lái)越多瘤泪,無(wú)法及時(shí)處理灶泵。此時(shí)就可以使用work 模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息对途。隊(duì)列中的消息一旦消費(fèi)赦邻,就會(huì)消失,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的掀宋。
角色:
P:生產(chǎn)者:任務(wù)的發(fā)布者
C1:消費(fèi)者-1深纲,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢
C2:消費(fèi)者-2:領(lǐng)取任務(wù)并完成任務(wù)劲妙,假設(shè)完成速度快
1. 開發(fā)生產(chǎn)者
channel.queueDeclare("hello", true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "hello", null, (i+"====>:我是消息").getBytes());
}
2.開發(fā)消費(fèi)者-1
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1: "+new String(body));
}
});
3.開發(fā)消費(fèi)者-2
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000); //處理消息比較慢 一秒處理一個(gè)消息
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費(fèi)者2: "+new String(body));
}
});
4.測(cè)試結(jié)果
總結(jié):默認(rèn)情況下湃鹊,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者。平均而言镣奋,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息币呵。這種分發(fā)消息的方式稱為循環(huán)。
5.消息自動(dòng)確認(rèn)機(jī)制
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
channel.basicQos(1);//一次只接受一條未確認(rèn)的消息
//參數(shù)2:關(guān)閉自動(dòng)確認(rèn)消息
channel.basicConsume("hello",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1: "+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);//手動(dòng)確認(rèn)消息
}
});
設(shè)置通道一次只能消費(fèi)一個(gè)消息
關(guān)閉消息的自動(dòng)確認(rèn),開啟手動(dòng)確認(rèn)消息
4.5 第三種模型(fanout)
fanout 扇出 也稱為廣播
在廣播模式下侨颈,消息發(fā)送流程是這樣的:
可以有多個(gè)消費(fèi)者
每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
生產(chǎn)者發(fā)送的消息余赢,只能發(fā)送到交換機(jī),交換機(jī)來(lái)決定要發(fā)給哪個(gè)隊(duì)列哈垢,生產(chǎn)者無(wú)法決定妻柒。
交換機(jī)把消息發(fā)送給綁定過(guò)的所有隊(duì)列
隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
1. 開發(fā)生產(chǎn)者
//聲明交換機(jī)
channel.exchangeDeclare("logs","fanout");//廣播 一條消息多個(gè)消費(fèi)者同時(shí)消費(fèi)
//發(fā)布消息
channel.basicPublish("logs","",null,"hello".getBytes());
2. 開發(fā)消費(fèi)者-1
//綁定交換機(jī)
channel.exchangeDeclare("logs","fanout");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//將臨時(shí)隊(duì)列綁定exchange
channel.queueBind(queue,"logs","");
//處理消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1: "+new String(body));
}
});
3. 開發(fā)消費(fèi)者-2
//綁定交換機(jī)
channel.exchangeDeclare("logs","fanout");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//將臨時(shí)隊(duì)列綁定exchange
channel.queueBind(queue,"logs","");
//處理消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2: "+new String(body));
}
});
4.開發(fā)消費(fèi)者-3
//綁定交換機(jī)
channel.exchangeDeclare("logs","fanout");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//將臨時(shí)隊(duì)列綁定exchange
channel.queueBind(queue,"logs","");
//處理消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者3: "+new String(body));
}
});
5. 測(cè)試結(jié)果
4.6 第四種模型(Routing)
4.6.1 Routing 之訂閱模型-Direct(直連)
在Fanout模式中耘分,一條消息举塔,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是求泰,在某些場(chǎng)景下央渣,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange渴频。
在Direct模型下:
隊(duì)列與交換機(jī)的綁定芽丹,不能是任意綁定了,而是要指定一個(gè)
RoutingKey
(路由key)消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí)卜朗,也必須指定消息的
RoutingKey
拔第。Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
Routing Key
進(jìn)行判斷聊替,只有隊(duì)列的Routingkey
與消息的Routing key
完全一致楼肪,才會(huì)接收到消息
流程:
圖解:
P:生產(chǎn)者,向Exchange發(fā)送消息惹悄,發(fā)送消息時(shí)春叫,會(huì)指定一個(gè)routing key。
X:Exchange(交換機(jī)),接收生產(chǎn)者的消息暂殖,然后把消息遞交給 與routing key完全匹配的隊(duì)列
C1:消費(fèi)者价匠,其所在隊(duì)列指定了需要routing key 為 error 的消息
C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info呛每、error踩窖、warning 的消息
1. 開發(fā)生產(chǎn)者
//聲明交換機(jī) 參數(shù)1:交換機(jī)名稱 參數(shù)2:交換機(jī)類型 基于指令的Routing key轉(zhuǎn)發(fā)
channel.exchangeDeclare("logs_direct","direct");
String key = "";
//發(fā)布消息
channel.basicPublish("logs_direct",key,null,("指定的route key"+key+"的消息").getBytes());
2.開發(fā)消費(fèi)者-1
//聲明交換機(jī)
channel.exchangeDeclare("logs_direct","direct");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//綁定隊(duì)列和交換機(jī)
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","warn");
//消費(fèi)消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1: "+new String(body));
}
});
3.開發(fā)消費(fèi)者-2
//聲明交換機(jī)
channel.exchangeDeclare("logs_direct","direct");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//綁定隊(duì)列和交換機(jī)
channel.queueBind(queue,"logs_direct","error");
//消費(fèi)消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2: "+new String(body));
}
});
4.測(cè)試生產(chǎn)者發(fā)送Route key為error的消息時(shí)
[圖片上傳失敗...(image-e0aa80-1612771575087)]
[圖片上傳失敗...(image-d6b9d7-1612771575087)]
5.測(cè)試生產(chǎn)者發(fā)送Route key為info的消息時(shí)
[圖片上傳失敗...(image-416aff-1612771575086)]
[圖片上傳失敗...(image-7c7340-1612771575086)]
4.6.2 Routing 之訂閱模型-Topic
Topic
類型的Exchange
與Direct
相比,都是可以根據(jù)RoutingKey
把消息路由到不同的隊(duì)列晨横。只不過(guò)Topic
類型Exchange
可以讓隊(duì)列在綁定Routing key
的時(shí)候使用通配符洋腮!這種模型Routingkey
一般都是由一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割手形,例如: item.insert
[圖片上傳失敗...(image-f499b2-1612771575086)]
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n219" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 統(tǒng)配符
- (star) can substitute for exactly one word. 匹配不多不少恰好1個(gè)詞
(hash) can substitute for zero or more words. 匹配一個(gè)或多個(gè)詞
如:
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs</pre>
1.開發(fā)生產(chǎn)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n221" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> //生命交換機(jī)和交換機(jī)類型 topic 使用動(dòng)態(tài)路由(通配符方式)
channel.exchangeDeclare("topics","topic");
String routekey = "user.save";//動(dòng)態(tài)路由key
//發(fā)布消息
channel.basicPublish("topics",routekey,null,("這是路由中的動(dòng)態(tài)訂閱模型,route key: ["+routekey+"]").getBytes());</pre>
2.開發(fā)消費(fèi)者-1
Routing Key中使用*通配符方式
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n224" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> //聲明交換機(jī)
channel.exchangeDeclare("topics","topic");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//綁定隊(duì)列與交換機(jī)并設(shè)置獲取交換機(jī)中動(dòng)態(tài)路由
channel.queueBind(queue,"topics","user.*");
//消費(fèi)消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1: "+new String(body));
}
});</pre>
3.開發(fā)消費(fèi)者-2
Routing Key中使用#通配符方式
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n227" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> //聲明交換機(jī)
channel.exchangeDeclare("topics","topic");
//創(chuàng)建臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//綁定隊(duì)列與交換機(jī)并設(shè)置獲取交換機(jī)中動(dòng)態(tài)路由
channel.queueBind(queue,"topics","user.#");
//消費(fèi)消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2: "+new String(body));
}
});</pre>
4.測(cè)試結(jié)果
[圖片上傳失敗...(image-b45d2f-1612771575086)]
[圖片上傳失敗...(image-1dd8bf-1612771575086)]
5. SpringBoot中使用RabbitMQ
5.0 搭建初始環(huán)境
1. 引入依賴
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="xml" cid="n234" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency></pre>
2. 配置配置文件
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="yml" cid="n236" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 10.15.0.9
port: 5672
username: ems
password: 123
virtual-host: /ems</pre>
RabbitTemplate
用來(lái)簡(jiǎn)化操作 使用時(shí)候直接在項(xiàng)目中注入即可使用
5.1 第一種hello world模型使用
-
開發(fā)生產(chǎn)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n242" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testHello(){
rabbitTemplate.convertAndSend("hello","hello world");
}</pre> -
開發(fā)消費(fèi)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n245" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {@RabbitHandler
public void receive1(String message){
System.out.println("message = " + message);
}
}</pre>
5.2 第二種work模型使用
-
開發(fā)生產(chǎn)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n250" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testWork(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","hello work!");
}
}</pre> -
開發(fā)消費(fèi)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n253" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("work message1 = " + message);
}@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("work message2 = " + message);
}
}</pre>說(shuō)明:默認(rèn)在Spring AMQP實(shí)現(xiàn)中Work這種方式就是公平調(diào)度,如果需要實(shí)現(xiàn)能者多勞需要額外配置
5.3 Fanout 廣播模型
-
開發(fā)生產(chǎn)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n260" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testFanout() throws InterruptedException {
rabbitTemplate.convertAndSend("logs","","這是日志廣播");
}</pre> -
開發(fā)消費(fèi)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n264" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
public class FanoutCustomer {@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name="logs",type = "fanout")
))
public void receive1(String message){
System.out.println("message1 = " + message);
}@RabbitListener(bindings = @QueueBinding(
value = @Queue, //創(chuàng)建臨時(shí)隊(duì)列
exchange = @Exchange(name="logs",type = "fanout") //綁定交換機(jī)類型
))
public void receive2(String message){
System.out.println("message2 = " + message);
}
}</pre>
5.4 Route 路由模型
-
開發(fā)生產(chǎn)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n270" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testDirect(){
rabbitTemplate.convertAndSend("directs","error","error 的日志信息");
}</pre> -
開發(fā)消費(fèi)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n273" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
public class DirectCustomer {@RabbitListener(bindings ={
@QueueBinding(
value = @Queue(),
key={"info","error"},
exchange = @Exchange(type = "direct",name="directs")
)})
public void receive1(String message){
System.out.println("message1 = " + message);
}@RabbitListener(bindings ={
@QueueBinding(
value = @Queue(),
key={"error"},
exchange = @Exchange(type = "direct",name="directs")
)})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
</pre>
5.5 Topic 訂閱模型(動(dòng)態(tài)路由模型)
-
開發(fā)生產(chǎn)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n278" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
private RabbitTemplate rabbitTemplate;//topic
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息");
}</pre> -
開發(fā)消費(fèi)者
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n282" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
public class TopCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"user.*"},
exchange = @Exchange(type = "topic",name = "topics")
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"user.#"},
exchange = @Exchange(type = "topic",name = "topics")
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}</pre>
6. MQ的應(yīng)用場(chǎng)景
6.1 異步處理
場(chǎng)景說(shuō)明:用戶注冊(cè)后啥供,需要發(fā)注冊(cè)郵件和注冊(cè)短信,傳統(tǒng)的做法有兩種 1.串行的方式 2.并行的方式
-
串行方式:
將注冊(cè)信息寫入數(shù)據(jù)庫(kù)后,發(fā)送注冊(cè)郵件,再發(fā)送注冊(cè)短信,以上三個(gè)任務(wù)全部完成后才返回給客戶端。 這有一個(gè)問(wèn)題是,郵件,短信并不是必須的,它只是一個(gè)通知,而這種做法讓客戶端等待沒(méi)有必要等待的東西.
-
并行方式:
將注冊(cè)信息寫入數(shù)據(jù)庫(kù)后,發(fā)送郵件的同時(shí),發(fā)送短信,以上三個(gè)任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時(shí)間库糠。
-
消息隊(duì)列:
假設(shè)三個(gè)業(yè)務(wù)節(jié)點(diǎn)分別使用50ms,串行方式使用時(shí)間150ms,并行使用時(shí)間100ms伙狐。雖然并行已經(jīng)提高的處理時(shí)間,但是,前面說(shuō)過(guò),郵件和短信對(duì)我正常的使用網(wǎng)站沒(méi)有任何影響,客戶端沒(méi)有必要等著其發(fā)送完成才顯示注冊(cè)成功,應(yīng)該是寫入數(shù)據(jù)庫(kù)后就返回.消息隊(duì)列
: 引入消息隊(duì)列后瞬欧,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理
由此可以看出,引入消息隊(duì)列后贷屎,用戶的響應(yīng)時(shí)間就等于寫入數(shù)據(jù)庫(kù)的時(shí)間+寫入消息隊(duì)列的時(shí)間(可以忽略不計(jì)),引入消息隊(duì)列后處理后,響應(yīng)時(shí)間是串行的3倍,是并行的2倍。
6.2 應(yīng)用解耦
場(chǎng)景:雙11是購(gòu)物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口.
這種做法有一個(gè)缺點(diǎn):
當(dāng)庫(kù)存系統(tǒng)出現(xiàn)故障時(shí),訂單就會(huì)失敗艘虎。 訂單系統(tǒng)和庫(kù)存系統(tǒng)高耦合. 引入消息隊(duì)列
訂單系統(tǒng):
用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊(duì)列,返回用戶訂單下單成功唉侄。庫(kù)存系統(tǒng):
訂閱下單的消息,獲取下單消息,進(jìn)行庫(kù)操作。 就算庫(kù)存系統(tǒng)出現(xiàn)故障,消息隊(duì)列也能保證消息的可靠投遞,不會(huì)導(dǎo)致消息丟失.
6.3 流量削峰
場(chǎng)景:
秒殺活動(dòng)野建,一般會(huì)因?yàn)榱髁窟^(guò)大美旧,導(dǎo)致應(yīng)用掛掉,為了解決這個(gè)問(wèn)題,一般在應(yīng)用前端加入消息隊(duì)列贬墩。
作用:
1.可以控制活動(dòng)人數(shù),超過(guò)此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒(méi)有成功過(guò)呢^^)
2.可以緩解短時(shí)間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)
1.用戶的請(qǐng)求,服務(wù)器收到之后,首先寫入消息隊(duì)列,加入消息隊(duì)列長(zhǎng)度超過(guò)最大值,則直接拋棄用戶請(qǐng)求或跳轉(zhuǎn)到錯(cuò)誤頁(yè)面.
2.秒殺業(yè)務(wù)根據(jù)消息隊(duì)列中的請(qǐng)求信息妄呕,再做后續(xù)處理.
7. RabbitMQ的集群
7.1 集群架構(gòu)
7.1.1 普通集群(副本集群)
All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster --摘自官網(wǎng)
默認(rèn)情況下:RabbitMQ代理操作所需的所有數(shù)據(jù)/狀態(tài)都將跨所有節(jié)點(diǎn)復(fù)制陶舞。這方面的一個(gè)例外是消息隊(duì)列,默認(rèn)情況下绪励,消息隊(duì)列位于一個(gè)節(jié)點(diǎn)上肿孵,盡管它們可以從所有節(jié)點(diǎn)看到和訪問(wèn)
-
架構(gòu)圖
核心解決問(wèn)題: 當(dāng)集群中某一時(shí)刻master節(jié)點(diǎn)宕機(jī),可以對(duì)Quene中信息,進(jìn)行備份
-
集群搭建
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n336" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 0.集群規(guī)劃
node1: 10.15.0.3 mq1 master 主節(jié)點(diǎn)
node2: 10.15.0.4 mq2 repl1 副本節(jié)點(diǎn)
node3: 10.15.0.5 mq3 repl2 副本節(jié)點(diǎn)1.克隆三臺(tái)機(jī)器主機(jī)名和ip映射
vim /etc/hosts加入:
10.15.0.3 mq1
10.15.0.4 mq2
10.15.0.5 mq3
node1: vim /etc/hostname 加入: mq1
node2: vim /etc/hostname 加入: mq2
node3: vim /etc/hostname 加入: mq32.三個(gè)機(jī)器安裝rabbitmq,并同步cookie文件,在node1上執(zhí)行:
scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/3.查看cookie是否一致:
node1: cat /var/lib/rabbitmq/.erlang.cookie
node2: cat /var/lib/rabbitmq/.erlang.cookie
node3: cat /var/lib/rabbitmq/.erlang.cookie4.后臺(tái)啟動(dòng)rabbitmq所有節(jié)點(diǎn)執(zhí)行如下命令,啟動(dòng)成功訪問(wèn)管理界面:
rabbitmq-server -detached
5.在node2和node3執(zhí)行加入集群命令:
1.關(guān)閉 rabbitmqctl stop_app
2.加入集群 rabbitmqctl join_cluster rabbit@mq1
3.啟動(dòng)服務(wù) rabbitmqctl start_app6.查看集群狀態(tài),任意節(jié)點(diǎn)執(zhí)行:
rabbitmqctl cluster_status
7.如果出現(xiàn)如下顯示,集群搭建成功:
Cluster status of node rabbit@mq3 ...
[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
{cluster_name,<<"rabbit@mq1">>},
{partitions,[]},
{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]8.登錄管理界面,展示如下狀態(tài):</pre>
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n338" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 9.測(cè)試集群在node1上,創(chuàng)建隊(duì)列</pre>
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n340" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 10.查看node2和node3節(jié)點(diǎn):</pre>
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n343" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 11.關(guān)閉node1節(jié)點(diǎn),執(zhí)行如下命令,查看node2和node3:
rabbitmqctl stop_app</pre>
7.1.2 鏡像集群
This guide covers mirroring (queue contents replication) of classic queues --摘自官網(wǎng)
By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. --摘自官網(wǎng)
鏡像隊(duì)列機(jī)制就是將隊(duì)列在三個(gè)節(jié)點(diǎn)之間設(shè)置主從關(guān)系,消息會(huì)在三個(gè)節(jié)點(diǎn)之間進(jìn)行自動(dòng)同步疏魏,且如果其中一個(gè)節(jié)點(diǎn)不可用停做,并不會(huì)導(dǎo)致消息丟失或服務(wù)不可用的情況,提升MQ集群的整體高可用性大莫。
-
集群架構(gòu)圖
-
配置集群架構(gòu)
<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n360" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"> # 0.策略說(shuō)明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
-p Vhost: 可選參數(shù)蛉腌,針對(duì)指定vhost下的queue進(jìn)行設(shè)置
Name: policy的名稱
Pattern: queue的匹配模式(正則表達(dá)式)
Definition:鏡像定義,包括三個(gè)部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明鏡像隊(duì)列的模式,有效值為 all/exactly/nodes
all:表示在集群中所有的節(jié)點(diǎn)上進(jìn)行鏡像
exactly:表示在指定個(gè)數(shù)的節(jié)點(diǎn)上進(jìn)行鏡像烙丛,節(jié)點(diǎn)的個(gè)數(shù)由ha-params指定
nodes:表示在指定的節(jié)點(diǎn)上進(jìn)行鏡像舅巷,節(jié)點(diǎn)名稱通過(guò)ha-params指定
ha-params:ha-mode模式需要用到的參數(shù)
ha-sync-mode:進(jìn)行隊(duì)列中消息的同步方式,有效值為automatic和manual
priority:可選參數(shù)河咽,policy的優(yōu)先級(jí)1.查看當(dāng)前策略
rabbitmqctl list_policies
2.添加策略
rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
說(shuō)明:策略正則表達(dá)式為 “^” 表示所有匹配所有隊(duì)列名稱 ^hello:匹配hello開頭隊(duì)列3.刪除策略
rabbitmqctl clear_policy ha-all
4.測(cè)試集群</pre>