1.消息中間件概述
MQ全稱為Message Queue教届,消息隊(duì)列是應(yīng)用程序和應(yīng)用程序之間的通信方法橙困。
MQ能為我們帶來什么署驻?
在項(xiàng)目中奋献,可將一些無需即時(shí)返回且耗時(shí)的操作提取出來,進(jìn)行異步處理旺上,而這種異步處理的方式大大的節(jié)省了服務(wù)器的請(qǐng)求響應(yīng)時(shí)間瓶蚂,從而提高了系統(tǒng)的吞吐量。
MQ的優(yōu)勢(shì)
- 應(yīng)用解耦:提高系統(tǒng)容錯(cuò)性和可維護(hù)性
MQ相當(dāng)于一個(gè)中介宣吱,生產(chǎn)方通過MQ與消費(fèi)方交互窃这,它將應(yīng)用程序進(jìn)行解耦合。MQ作為中間件征候,MQ其中一端的應(yīng)用系統(tǒng)調(diào)整可以不影響到另外一端
- 異步提速:提升用戶體驗(yàn)和系統(tǒng)吞吐量
將不需要同步處理的并且耗時(shí)長(zhǎng)的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理杭攻。提高了應(yīng)用程序的響應(yīng)時(shí)間祟敛。
- 削峰填谷:提高系統(tǒng)穩(wěn)定性
例如減緩數(shù)據(jù)庫壓力,將MQ以消息隊(duì)列堆積起來兆解,系統(tǒng)就可以按照自己的消費(fèi)能力來消費(fèi)這樣一來馆铁,高峰期產(chǎn)生的數(shù)據(jù)勢(shì)必會(huì)被積壓在MQ中,高峰就被“削”掉了痪宰。但是因?yàn)橄⒎e壓叼架,在高峰期過后的一段時(shí)間內(nèi),消費(fèi)消息的速度還是會(huì)維持原先的值衣撬,直到消費(fèi)完積壓的消息,這就叫做“填谷”
MQ的劣勢(shì)
- 高可用問題
- 復(fù)雜性問題
- 一致性問題
MQ是消息通信的模型乖订;實(shí)現(xiàn)MQ的大致有兩種主流方式:AMQP、JMS具练。
- AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)
AMQP是一種協(xié)議乍构,更準(zhǔn)確的說是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別扛点,AMQP不從API層進(jìn)行限定哥遮,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。AMQP 是協(xié)議陵究,類比HTTP眠饮。
- JMS
JMS即Java消息服務(wù)(JavaMessage Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API铜邮,用于在兩個(gè)應(yīng)用程序之間仪召,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信松蒜。JMS 是 API 規(guī)范接口扔茅,類比 JDBC。
- AMQP 與 JMS 區(qū)別
* JMS是定義了統(tǒng)一的接口秸苗,來對(duì)消息操作進(jìn)行統(tǒng)一召娜;AMQP是通過規(guī)定協(xié)議來統(tǒng)一數(shù)據(jù)交互的格式 * JMS限定了必須使用Java語言;AMQP只是協(xié)議惊楼,不規(guī)定實(shí)現(xiàn)方式玖瘸,因此是跨語言的。 * JMS規(guī)定了兩種消息模式檀咙;而AMQP的消息模式更加豐富
- MQ常見產(chǎn)品
RabibitMQ:基于AMQP協(xié)議店读,erlang語言開發(fā),穩(wěn)定性好
ActiveMQ:基于JMS
RocketMQ:基于JMS攀芯,阿里巴巴產(chǎn)品
Kafka:類似MQ的產(chǎn)品;分布式消息系統(tǒng)文虏,高吞吐量
2.快速入門
- 創(chuàng)建工程侣诺,添加依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
- 生產(chǎn)者
public class Producer {
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//一殖演、創(chuàng)建工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
/**
* 二、設(shè)置參數(shù)
* 1.主機(jī)地址
* 2.連接端口
* 3.虛擬主機(jī)名稱
* 4.連接用戶名
* 5.連接密碼
*/
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建頻道
Channel channel = connection.createChannel();
/**
* 三年鸳、聲明(創(chuàng)建)隊(duì)列
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message="Hello! RabbitMQ";
?
/**
* 參數(shù)1:交換機(jī)名稱趴久,如果沒有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
// 關(guān)閉資源
channel.close();
connection.close();
}
}
- 消費(fèi)者
public class consumer {
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
/**
* 設(shè)置參數(shù)
* 1.主機(jī)地址
* 2.連接端口
* 3.虛擬主機(jī)名稱
* 4.連接用戶名
* 5.連接密碼
*/
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建頻道
Channel channel = connection.createChannel();
/**
* 聲明(創(chuàng)建)隊(duì)列
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容搔确,可從中獲取消息id彼棍,消息routingkey,交換機(jī)膳算,消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn)座硕,設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息涕蜂,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
//不關(guān)閉資源华匾,應(yīng)該一直監(jiān)聽消息
//channel.close();
//connection.close();
?
}
?
}
上述的快速入門,其實(shí)使用的就是工作模式中的簡(jiǎn)單模式机隙。
P:生產(chǎn)者蜘拉,也就是要發(fā)送消息的程序
C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來有鹿。
queue:消息隊(duì)列旭旭,圖中紅色部分。類似一個(gè)郵箱葱跋,可以緩存消息持寄;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息年局。
3.工作模式
3.1 Work queues工作隊(duì)列模式
應(yīng)用場(chǎng)景:對(duì)于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度际看。
- 生產(chǎn)者
public class Producer_workQueues {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 0; i <30; i++) {
String message = i+"Hello,RabbitMQ,workqueue!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("已發(fā)送消息:"+message);
}
//關(guān)閉資源
channel.close();
connection.close();
}
}
- 消費(fèi)者1
public class ConsumerWorkQueues01 {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicQos(1);
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
- 消費(fèi)者2
public class ConsumerWorkQueues01 {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicQos(1);
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
在一個(gè)隊(duì)列中如果有多個(gè)消費(fèi)者,那么消費(fèi)者之間對(duì)于同一個(gè)消息的關(guān)系是競(jìng)爭(zhēng)的關(guān)系矢否。有點(diǎn)類似
負(fù)載均衡
的意思
3.2發(fā)布與訂閱模式
- P:生產(chǎn)者仲闽,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中僵朗,而是發(fā)給X(交換機(jī))
- C:消費(fèi)者赖欣,消息的接受者,會(huì)一直等待消息到來验庙。
- Queue:消息隊(duì)列顶吮,接收消息、緩存消息粪薛。
- Exchange:交換機(jī)悴了,圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息湃交。另一方面熟空,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列搞莺、遞交給所有隊(duì)列息罗、或是將消息丟棄。到底如何操作才沧,取決于Exchange的類型迈喉。Exchange有常見以下3種類型:
* Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列 * Direct:定向温圆,把消息交給符合指定routing key 的隊(duì)列 * Topic:通配符挨摸,把消息交給符合routing pattern(路由模式) 的隊(duì)列
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力捌木,因此如果沒有任何隊(duì)列與Exchange綁定油坝,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失刨裆!
發(fā)布訂閱模式:
每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列澈圈。
生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列帆啃,每個(gè)綁定交換機(jī)的隊(duì)列都將接收到消息瞬女。
- 生產(chǎn)者
public class ProducerPublishAndSubscribe {
//交換機(jī)名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊(duì)列名稱
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊(duì)列名稱
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHAGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHAGE,"");
for (int i = 0; i <10; i++) {
String message = "Hello RabbitMQ PublishAndSubscribe-->" + i;
channel.basicPublish(FANOUT_EXCHAGE,"",null,message.getBytes());
System.out.println("已發(fā)送"+message);
}
//關(guān)閉資源
channel.close();
connection.close();
}
}
- 消費(fèi)者1
public class ConsumerPublishAndSubscribe01 {
//交換機(jī)名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊(duì)列名稱
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊(duì)列名稱
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHAGE,"");
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
channel.basicConsume(FANOUT_QUEUE_1,true,consumer);
}
}
- 消費(fèi)者2
public class ConsumerPublishAndSubscribe02 {
//交換機(jī)名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊(duì)列名稱
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊(duì)列名稱
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHAGE,"");
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
}
};
channel.basicConsume(FANOUT_QUEUE_2,true,consumer);
}
}
啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息努潘;在每個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送的所有消息诽偷;實(shí)現(xiàn)廣播的效果。
發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別
工作隊(duì)列模式不用定義交換機(jī)疯坤,而發(fā)布/訂閱模式需要定義交換機(jī)报慕。
發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))
發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定压怠,工作隊(duì)列模式不需要設(shè)置眠冈,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。
3.3 Routing路由模式
- P:生產(chǎn)者菌瘫,向Exchange發(fā)送消息蜗顽,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key雨让。
- X:Exchange(交換機(jī))雇盖,接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
- C1:消費(fèi)者栖忠,其所在隊(duì)列指定了需要routing key 為 error 的消息
- C2:消費(fèi)者崔挖,其所在隊(duì)列指定了需要routing key 為 info贸街、error、warning 的消息
- 路由模式交換機(jī)的類型為:Direct狸相,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key匾浪。
- 生產(chǎn)者
public class ProducerRouting {
//交換機(jī)名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊(duì)列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊(duì)列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHAGE,"update");
channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHAGE,"insert");
String message = "新增了商品捶朵。路由模式桌肴;routing key 為 insert!";
channel.basicPublish(DIRECT_EXCHAGE,"insert",null,message.getBytes());
System.out.println("已發(fā)送消息:"+message);
message="修改了商品匙头。路由模式;routing key 為 update";
channel.basicPublish(DIRECT_EXCHAGE,"update",null,message.getBytes());
System.out.println("已發(fā)送消息:"+message);
//關(guān)閉資源
channel.close();
connection.close();
}
}
- 消費(fèi)者1
public class ConsumerRouting01 {
//交換機(jī)名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊(duì)列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊(duì)列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null);
channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHAGE,"insert");
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);
}
}
- 消費(fèi)者2
public class ConsumerRouting02 {
//交換機(jī)名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊(duì)列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊(duì)列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null);
channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHAGE,"update");
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);
}
}
路由模式特點(diǎn):
- 隊(duì)列與交換機(jī)的綁定将谊,不能是任意綁定了,而是要指定一個(gè)
RoutingKey
(路由key)
- 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí)渐白,也必須指定消息的
RoutingKey
尊浓。
- Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
Routing Key
進(jìn)行判斷纯衍,只有隊(duì)列的Routingkey
與消息的Routing key
完全一致栋齿,才會(huì)接收到消息
3.4Topics通配符模式
Topic
類型與Direct
相比,都是可以根據(jù)Routing key把消息路由到不同的隊(duì)列襟诸。只不過Topic
類型Exchange可以讓隊(duì)列在綁定Routing key的時(shí)候使用通配符瓦堵!
Routingkey
一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割歌亲,例如:item.insert
通配符規(guī)則:
#
:匹配一個(gè)或多個(gè)詞
*
:匹配不多不少恰好1個(gè)詞舉例:
item.#
:能夠匹配item.insert.abc
或者item.insert
item.*
:只能匹配item.insert
- 消費(fèi)者
public class ProducerTopics {
//交換機(jī)名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊(duì)列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊(duì)列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"*.update");
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.insert");
channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"*.delete");
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.update");
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.delete");
//發(fā)送消息
String message="新增了商品菇用。Topic模式;routing key 為 item.insert ";
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
message = "修改了商品陷揪。Topic模式惋鸥;routing key 為 item.update" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
message = "刪除了商品。Topic模式悍缠;routing key 為 item.delete" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
//關(guān)閉資源
channel.close();
connection.close();
}
}<
- 消費(fèi)者1
public class ConsumerTopics01 {
//交換機(jī)名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊(duì)列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊(duì)列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.update");
channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.delete");
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
channel.basicConsume(TOPIC_QUEUE_1,true,consumer);
}
}
- 消費(fèi)者2
public class ConsumerTopics02 {
//交換機(jī)名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊(duì)列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊(duì)列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "item.*");
//創(chuàng)建消費(fèi)者
DefaultConsumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key為:" + envelope.getRoutingKey());
System.out.println("交換機(jī)為:" + envelope.getExchange());
System.out.println("消息id為:" + envelope.getDeliveryTag());
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
channel.basicConsume(TOPIC_QUEUE_2,true,consumer);
}
}
總結(jié)
RabbitMQ工作模式: 1. 簡(jiǎn)單模式 一個(gè)生產(chǎn)者卦绣、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))
2. 工作隊(duì)列模式 Work Queue 一個(gè)生產(chǎn)者飞蚓、多個(gè)消費(fèi)者(競(jìng)爭(zhēng)關(guān)系)滤港,不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))
3. 發(fā)布訂閱模式 Publish/subscribe 需要設(shè)置類型為fanout的交換機(jī),并且交換機(jī)和隊(duì)列進(jìn)行綁定玷坠,當(dāng)發(fā)送消息到交換機(jī)后蜗搔,交換機(jī)會(huì)將消息發(fā)送到綁定的隊(duì)列
4. 路由模式 Routing 需要設(shè)置類型為direct的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定八堡,并且指定routing key樟凄,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列
5. 通配符模式 Topic 需要設(shè)置類型為topic的交換機(jī)兄渺,交換機(jī)和隊(duì)列進(jìn)行綁定缝龄,并且指定通配符方式的routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列
4.框架整合RabbitMQ
4.1 Spring整合RabbitMQ
4.1.1 搭建生產(chǎn)者工程
- 創(chuàng)建工程叔壤,添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>spring-rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
</project>
- 編寫配置文件
- rabbitmq.properties連接參數(shù)配置文件
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual=/
- spring-rabbitmq.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--創(chuàng)建connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"/>
<!--定義管理交換機(jī)瞎饲、隊(duì)列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義持久化隊(duì)列,不存在則自動(dòng)創(chuàng)建炼绘;不綁定到交換機(jī)則綁定到默認(rèn)交換機(jī)默認(rèn)交換機(jī)類型為direct嗅战,名字為:"",路由鍵為隊(duì)列的名稱-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!--定義廣播交換機(jī)中的持久化隊(duì)列俺亮,不存在則自動(dòng)創(chuàng)建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定義廣播類型交換機(jī)驮捍;并綁定上述兩個(gè)隊(duì)列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--定義廣播交換機(jī)中的持久化隊(duì)列,不存在則自動(dòng)創(chuàng)建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbitTemplate對(duì)象操作可以在代碼中方便發(fā)送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
- 創(chuàng)建測(cè)試文件
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*交換機(jī)類型為 direct
*交換機(jī)的名稱為空脚曾,路由鍵為隊(duì)列的名稱
*/
@Test
public void test1() {
rabbitTemplate.convertAndSend("spring_queue","只發(fā)隊(duì)列spring_queue的消息东且。");
}
/**
*交換機(jī)類型為 fanout
*綁定到該交換機(jī)的所有隊(duì)列都能夠收到消息
*/
@Test
public void test2() {
rabbitTemplate.convertAndSend("spring_fanout_exchange","","發(fā)送到spring_fanout_exchange交換機(jī)的廣播消息");
}
/**
*交換機(jī)類型為 topic
*/
@Test
public void test3() {
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj", "發(fā)送到spring_topic_exchange交換機(jī)heima.bj的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.1", "發(fā)送到spring_topic_exchange交換機(jī)heima.bj.1的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.2", "發(fā)送到spring_topic_exchange交換機(jī)heima.bj.2的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "itcast.cn", "發(fā)送到spring_topic_exchange交換機(jī)itcast.cn的消息");
}
}
4.1.2 搭建消費(fèi)者工程
- 創(chuàng)建工程,添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>springrabbitmqcomsumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
</dependencies>
</project>
- 編寫配置文件
- rabbitmq.properties連接參數(shù)配置文件
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual=/
- spring-rabbitmq.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--創(chuàng)建connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual}"/>
<!--配置rabbitmq監(jiān)聽器-->
<bean id="springQueueListener" class="com.itheima.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.itheima.FanoutListener1"/>
<bean id="fanoutListener2" class="com.itheima.FanoutListener2"/>
<bean id="topicListenerStar" class="com.itheima.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.itheima.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.itheima.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>
</beans>
- 編寫監(jiān)聽器
- 隊(duì)列監(jiān)聽器
public class SpringQueueListener implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.println("接收路由名稱為:"+message.getMessageProperties().getReceivedExchange()+"本讥," +
"路由鍵為:"+ message.getMessageProperties().getReceivedRoutingKey()+"珊泳,隊(duì)列名為:" +
message.getMessageProperties().getConsumerQueue()+"的消息:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 廣播監(jiān)聽器1
public class FanoutListener1 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.println("廣播監(jiān)聽器1:接收路由名稱為:"+message.getMessageProperties().getReceivedExchange()+"," +
"路由鍵為:"+ message.getMessageProperties().getReceivedRoutingKey()+"拷沸,隊(duì)列名為:" +
message.getMessageProperties().getConsumerQueue()+"的消息:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 廣播監(jiān)聽器2
public class FanoutListener2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.println("廣播監(jiān)聽器2:接收路由名稱為:"+message.getMessageProperties().getReceivedExchange()+"色查," +
"路由鍵為:"+ message.getMessageProperties().getReceivedRoutingKey()+",隊(duì)列名為:" +
message.getMessageProperties().getConsumerQueue()+"的消息:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 星號(hào)通配符監(jiān)聽器
public class TopicListenerStar implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.println("通配符*監(jiān)聽器:接收路由名稱為:"+message.getMessageProperties().getReceivedExchange()+"堵漱," +
"路由鍵為:"+ message.getMessageProperties().getReceivedRoutingKey()+"综慎,隊(duì)列名為:" +
message.getMessageProperties().getConsumerQueue()+"的消息:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 井號(hào)號(hào)通配符監(jiān)聽器
public class TopicListenerWell implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.println("通配符#監(jiān)聽器:接收路由名稱為:"+message.getMessageProperties().getReceivedExchange()+"," +
"路由鍵為:"+ message.getMessageProperties().getReceivedRoutingKey()+"勤庐,隊(duì)列名為:" +
message.getMessageProperties().getConsumerQueue()+"的消息:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 井號(hào)號(hào)通配符監(jiān)聽器2
public class TopicListenerWell2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.println("通配符#監(jiān)聽器2:接收路由名稱為:"+message.getMessageProperties().getReceivedExchange()+"示惊," +
"路由鍵為:"+ message.getMessageProperties().getReceivedRoutingKey()+",隊(duì)列名為:" +
message.getMessageProperties().getConsumerQueue()+"的消息:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 編寫測(cè)試類
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ConsumerTest {
@Test
public void test1() {
while (true){
}
}
}
4.2 SpringBoot整合RabbitMQ
4.2.1 搭建生產(chǎn)者工程
- 創(chuàng)建工程添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itheima</groupId>
<artifactId>springboot-rabbitmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 編寫配置文件
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
- 編寫配置類
@Configuration
public class RabbitMQConfig {
//交換機(jī)名稱
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
//隊(duì)列名稱
public static final String ITEM_QUEUE = "item_queue";
@Bean("itemTopicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
@Bean
public Binding itemBind(@Qualifier("itemQueue")Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
- 編寫測(cè)試類
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringbootRabbitmqProducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1() {
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"boot.bi.z","boot mq hello..");
}
}
4.2.1 搭建消費(fèi)者工程
- 創(chuàng)建工程添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itheima</groupId>
<artifactId>springboot-rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 編寫配置文件
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
- 配置監(jiān)聽器
@Component
public class ConsumerListener {
@RabbitListener(queues = "item_queue")
public void listener1(String message){
System.out.println("消費(fèi)者接收到的消息為:" + message);
}
}