RabbitMQ是一個(gè)支持AMQP(高級(jí)消息協(xié)議)協(xié)議的消息中間件饺谬。
RabbitMQ的消息通信模型如下所示:
RabbitMQ邏輯圖
消息中間件主要就是用來(lái)發(fā)送和消費(fèi)消息。在RabbitMQ中物赶,消息通過(guò)Publisher發(fā)送者將消息出去培遵,由Consumer接收者接收到消息,并執(zhí)行對(duì)應(yīng)的業(yè)務(wù)邏輯蹂风。但Publisher并不是直接將消息發(fā)送給Consumer,而是先將發(fā)送到一個(gè)Exchange交換機(jī)乾蓬,每一個(gè)交換機(jī)都會(huì)Binding綁定一個(gè)Queue隊(duì)列惠啄,所以Publisher將消息發(fā)送到Exchange交換機(jī)后路由到對(duì)應(yīng)的Queue,將消息存放在Queue中任内,等待Consumer來(lái)拉取數(shù)據(jù)撵渡。
消息的發(fā)布與消費(fèi)簡(jiǎn)單流程如下:
- Publisher通過(guò)Channel信道將消息發(fā)送到RabbitMQ服務(wù)中的Exchange;
- Exchange通過(guò)Binding綁定規(guī)則找到對(duì)應(yīng)的Queue并將消息發(fā)送到對(duì)應(yīng)的Queue隊(duì)列中;
- Consumer消費(fèi)者通過(guò)Channel信道從Queue中拉取數(shù)據(jù)死嗦;
Exchange消息分發(fā)策略
Publisher發(fā)送消息的時(shí)候會(huì)指定Exchange交換機(jī)的名稱趋距,并指定一個(gè)路由key,該路由key會(huì)將消息發(fā)送到具體綁定了這個(gè)路由key對(duì)應(yīng)的Queue中越除。Exchange交換機(jī)將消息發(fā)送到Queue有幾種分發(fā)策略:
- direct:當(dāng)指定direct分發(fā)策略時(shí)霞捡,如果消息的路由key與隊(duì)列綁定的路由key相同時(shí),交換器就會(huì)將消息發(fā)送到該隊(duì)列中俄占。例如發(fā)送消息是指定路由key為 rk1 监署,那么如果隊(duì)列綁定的路由key也是 rk1,那么交換機(jī)會(huì)將消息發(fā)送到該隊(duì)列;
- fanout:當(dāng)指定fanout分發(fā)策略時(shí),交換機(jī)不會(huì)處理路由key,交換機(jī)會(huì)將消息發(fā)送到所有綁定了在該交換機(jī)的隊(duì)列上箱熬。
- topic:當(dāng)指定topic分發(fā)策略時(shí),交換器會(huì)通過(guò)模式匹配分發(fā)消息狈邑,如果路由key與某個(gè)模式匹配時(shí)城须,交換機(jī)就會(huì)將消息發(fā)送到與該模式匹配的隊(duì)列中。例如某個(gè)隊(duì)列 queue 綁定的路由key的模式為 a.# 官地,當(dāng) publisher 發(fā)送消息時(shí)酿傍,如果指定發(fā)送的路由key為 a.b 或者是 a.c 時(shí),該隊(duì)列將會(huì)收到路由器發(fā)送的消息驱入。
- headers: 這種分發(fā)策略幾乎不用了赤炒。
引入pox.xml文件
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
消費(fèi)生產(chǎn)者Producer
/**
* 消息的生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) throws Exception {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置用戶名與密碼
factory.setUsername("guest");
factory.setPassword("guest");
//設(shè)置 RabbitMQ 地址
factory.setHost("localhost");
//獲取連接,連接到代理服務(wù)器
Connection connection = factory.newConnection();
//獲取通信信道
Channel channel = connection.createChannel();
//設(shè)置exchange交換器的名稱
String exchangeName = "greeting-exchange";
//聲明exchange交換器
//第二個(gè)參數(shù)聲明綁定的方式為direct
//第三個(gè)參數(shù)為是否持久化,設(shè)置為true表示持久化,反之是非持久化,
// 持久化的可以將交換器存盤,在服務(wù)器重啟的時(shí)候不會(huì)丟失信息
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
//指定路由的key
String routingKey = "xsg";
//發(fā)送的消息體
byte[] messageBodyBytes = "hello, how are you!".getBytes();
//發(fā)布信息亏较,也就是將消息發(fā)布到greeting-exchange的交換器
// 并指定一個(gè)路由key莺褒,表示要路由到哪一個(gè)隊(duì)列
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
//關(guān)閉信道
channel.close();
//關(guān)閉連接
connection.close();
}
}
具體步驟:
- 通過(guò)ConnectFactory獲取連接工廠,設(shè)置RabbitMQ服務(wù)器的用戶名雪情、密碼還有服務(wù)器地址遵岩;
- 調(diào)用ConnectFactory的newConnection方法獲取一個(gè)連接Connection,連接到代理服務(wù)器巡通;
- 通過(guò)Connection的createChannel獲取到通信通道尘执,消息將通過(guò)該通信通道發(fā)布出去;
- 定義Exchange交換機(jī)的名稱宴凉,也就是要將消息發(fā)送到哪個(gè)交換機(jī)誊锭;
- 通過(guò)exchangeDeclare聲明一個(gè)交換機(jī),需要指定這個(gè)交換機(jī)分發(fā)消息的策略弥锄,這里指定為direct丧靡,表示將消息路由到路由key一致的隊(duì)列中;
- 通過(guò)管道channel的basicPublish方法發(fā)布消息籽暇,需要指定交換機(jī)的名稱温治,表示要將消息發(fā)送到哪個(gè)交換機(jī);指定路由key戒悠,表現(xiàn)交換機(jī)要將消息發(fā)送到哪個(gè)隊(duì)列中熬荆;
- 發(fā)送成功后關(guān)閉管道與連接;
消息的消費(fèi)者Consumer
/**
* 消息的消費(fèi)者
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置用戶名與密碼
factory.setUsername("guest");
factory.setPassword("guest");
//設(shè)置 RabbitMQ 地址
factory.setHost("localhost");
//獲取連接绸狐,連接到代理服務(wù)器
Connection connection = factory.newConnection();
//獲取通信信道
final Channel channel = connection.createChannel();
//設(shè)置exchange交換器的名稱
String exchangeName = "greeting-exchange";
//聲明exchange交換器
//第二個(gè)參數(shù)聲明綁定的方式為direct
//第三個(gè)參數(shù)為是否持久化,設(shè)置為true表示持久化,反之是非持久化,
// 持久化的可以將交換器存盤,在服務(wù)器重啟的時(shí)候不會(huì)丟失信息
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
//獲取隊(duì)列的名稱
String queueName = channel.queueDeclare().getQueue();
//指定路由的key
String routingKey = "xsg";
//交換器與隊(duì)列通過(guò)路由鍵判定起來(lái)
channel.queueBind(queueName, exchangeName, routingKey);
while (true) {
//是否自動(dòng)應(yīng)答
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
System.out.println("消息的路由鍵:" + routingKey);
//deliveryTag為條消息的一個(gè)唯一id惶看,用來(lái)告訴RabbitMQ哪條消息被消費(fèi)了
long deliveryTag = envelope.getDeliveryTag();
//第二個(gè)參數(shù)說(shuō)明如何處理這個(gè)失敗消息捏顺,為 true 表示該消息重新放回隊(duì)列頭六孵,值為 false 表示放棄這條消息
channel.basicAck(deliveryTag, false);
System.out.println("消費(fèi)的消息體內(nèi)容:" + new String(body, "utf-8"));
}
});
}
}
}
具體步驟:
- 通過(guò)ConnectFactory獲取連接工廠纬黎,設(shè)置RabbitMQ服務(wù)器的用戶名、密碼還有服務(wù)器地址劫窒;
- 調(diào)用ConnectFactory的newConnection方法獲取一個(gè)連接Connection本今,連接到代理服務(wù)器;
- 通過(guò)Connection的createChannel獲取到通信通道主巍,消費(fèi)者將通過(guò)該通信通道獲取消息冠息;
- 定義Exchange交換機(jī)的名稱,也就是要將消息發(fā)送到哪個(gè)交換機(jī)孕索;
- 通過(guò)exchangeDeclare聲明一個(gè)交換機(jī)逛艰,需要指定這個(gè)交換機(jī)分發(fā)消息的策略,這里指定為direct搞旭,表示將消息路由到路由key一致的隊(duì)列中散怖;
- 通過(guò)channel.queueDeclare().getQueue() 獲取隊(duì)列名稱,并定義一個(gè)路由key肄渗;
- 通過(guò)channel.queueBind 將交換機(jī)與隊(duì)列綁定起來(lái)镇眷,需要傳入隊(duì)列Queue的名稱、交換機(jī)的名稱翎嫡、路由Key欠动,表示要將某個(gè)交換機(jī)通過(guò)路由key綁定到某個(gè)隊(duì)列;
- 通過(guò) channel.basicConsume 獲取消息惑申,需要傳入隊(duì)列名稱具伍,表示獲取哪個(gè)隊(duì)列的消息;
- 處理消息后關(guān)閉管道與連接圈驼;
運(yùn)行結(jié)果:
首先運(yùn)行Consumer人芽,然后運(yùn)行Product,可以看到Consumer控制臺(tái)輸出以下信息:
消息的路由鍵:xsg
消費(fèi)的消息體內(nèi)容:hello, how are you!