RabbitMQ支持的消息模型
https://www.rabbitmq.com/getstarted.html
環(huán)境搭建
創(chuàng)建一個Maven項目導入RabbitMQ依賴
<!--引入rabbitmq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
創(chuàng)建ems
用戶,密碼為123
,創(chuàng)建VirtualHost /ems
Hello World 模型
在上圖的模型中晶通,有以下概念:
- P:生產(chǎn)者璃氢,也就是要發(fā)送消息的程序
- C:消費者:消息的接受者,會一直等待消息到來狮辽。
- queue:消息隊列一也,圖中紅色部分巢寡。類似一個郵箱,可以緩存消息椰苟;生產(chǎn)者向其中投遞消息抑月,消費者從其中取出消息。
生產(chǎn)者
創(chuàng)建生產(chǎn)者Provider類舆蝴,通過連接工廠對象獲取連接爪幻,創(chuàng)建通道channel
//生產(chǎn)者
public class Provider {
@Test
public void testSendMessage() throws IOException, TimeoutException {
//創(chuàng)建連接mq的連接工廠對象
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置連接rabbitmq的主機
connectionFactory.setHost("localhost");
//設置端口號
connectionFactory.setPort(5672);
//設置連接哪個虛擬主機
connectionFactory.setVirtualHost("/ems");
//設置訪問虛擬主機的用戶名和密碼
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
//獲取連接對象
Connection connection = connectionFactory.newConnection();
//獲取連接中的通道
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列
//參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
//參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
//參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
//參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
channel.queueDeclare("hello",false,false,false,null);
//發(fā)布消息
//參數(shù)1:交換機名稱 參數(shù)2:隊列名稱 參數(shù)3:傳遞消息額外設置 參數(shù)4;消息的具體內容
channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
}
消費者
channel.queueDeclare();
與生產(chǎn)者保持一致须误,使用channel.basicConsume()
接收消息,回調接口new DefaultConsumer(channel)
重寫handleDelivery()
方法
//消費者
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接mq的連接工廠對象
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置連接rabbitmq的主機
connectionFactory.setHost("localhost");
//設置端口號
connectionFactory.setPort(5672);
//設置連接哪個虛擬主機
connectionFactory.setVirtualHost("/ems");
//設置訪問虛擬主機的用戶名和密碼
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
//獲取連接對象
Connection connection = connectionFactory.newConnection();
//獲取連接中的通道
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列--參數(shù)要和生產(chǎn)者保持對應仇轻,否則可能出現(xiàn)錯誤
//參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
//參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
//參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
//參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
channel.queueDeclare("hello", false, false, false, null);
//消費信息
//參數(shù)1:消費哪個隊列的信息 隊列名稱
//參數(shù)2:開啟消息的自動確認機制
//參數(shù)3:消費時的回調接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
//最后一個參數(shù):消息隊列中取出的信息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body)=" + new String(body));
}
});
}
}
測速
首先啟動消費者監(jiān)聽京痢,然后在啟動生產(chǎn)者發(fā)送消息。發(fā)送消息后消費者進行消費
參數(shù)說明
channel.queueDeclare("hello",false,false,false,null);
- 參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
- 參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
- 參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
- 參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
channel.basicPublish("","aa", null,"hello rabbitmq".getBytes());
- 參數(shù)1:交換機名稱
- 參數(shù)2:隊列名稱
- 參數(shù)3:傳遞消息額外設置
- 參數(shù)4篷店;消息的具體內容
channel.basicConsume()
- 參數(shù)1:消費哪個隊列的信息 隊列名稱
- 參數(shù)2:開啟消息的自動確認機制
- 參數(shù)3:消費時的回調接口
durable為ture只能實現(xiàn)隊列持久化祭椰。若想要其中未消費的消息持久化channel.basicPublish()
中參數(shù)3要設置為MessageProperties.PERSISTENT_TEXT_PLAIN
封裝工具類
通過實現(xiàn)Hello World模型,我們可以發(fā)現(xiàn)存在了許多重復的代碼疲陕。例如獲取連接對象方淤,關閉連接操作。我們將其封裝成一個工具類 RabbitMQUtils
,獲取連接對象getConnection()
,關閉資源closeConnectionAndChanel()
//工具類
public class RabbitMQUtils {
//獲取連接
public static Connection getConnection() {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置連接rabbitmq的主機
connectionFactory.setHost("localhost");
//設置端口號
connectionFactory.setPort(5672);
//設置連接哪個虛擬主機
connectionFactory.setVirtualHost("/ems");
//設置訪問虛擬主機的用戶名和密碼
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
//關閉連接和通道的方法
public static void closeConnectionAndChanel(Channel channel, Connection connection) {
try {
//關閉通道
if (channel != null) {
channel.close();
}
//關閉連接
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
不過上述代碼還是可以優(yōu)化的蹄殃,每次獲取連接都需要創(chuàng)建一次工廠對象是沒有必要的携茂。相關認證資源也只需要加載一次即可。于是我們將代碼優(yōu)化
//工具類
public class RabbitMQUtils {
//創(chuàng)建連接mq的連接工廠對象 屬于重量級資源
private static ConnectionFactory connectionFactory;
//靜態(tài)代碼塊 類加載時執(zhí)行 只執(zhí)行一次
static {
connectionFactory = new ConnectionFactory();
//設置連接rabbitmq的主機
connectionFactory.setHost("localhost");
//設置端口號
connectionFactory.setPort(5672);
//設置連接哪個虛擬主機
connectionFactory.setVirtualHost("/ems");
//設置訪問虛擬主機的用戶名和密碼
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
}
//獲取連接
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
//關閉連接和通道的方法
public static void closeConnectionAndChanel(Channel channel, Connection connection) {
try {
//關閉通道
if (channel != null) {
channel.close();
}
//關閉連接
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
//生產(chǎn)者
public class Provider {
@Test
public void testSendMessage() throws IOException, TimeoutException {
//通過工具類獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
//獲取連接中的通道
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列
//參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
//參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
//參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
//參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
channel.queueDeclare("hello",true,false,false,null);
//發(fā)布消息
//參數(shù)1:交換機名稱 參數(shù)2:隊列名稱 參數(shù)3:傳遞消息額外設置 參數(shù)4诅岩;消息的具體內容
//channel.basicPublish("","aa", null,"hello rabbitmq".getBytes());
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
//通過工具類關閉
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者同樣使用工具類
//消費者
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//通過工具類獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
//獲取連接中的通道
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列--參數(shù)要和生產(chǎn)者保持對應讳苦,否則可能出現(xiàn)錯誤
//參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
//參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
//參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
//參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
channel.queueDeclare("hello", true, false, false, null);
//消費信息
//參數(shù)1:消費哪個隊列的信息 隊列名稱
//參數(shù)2:開啟消息的自動確認機制
//參數(shù)3:消費時的回調接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
//最后一個參數(shù):消息隊列中取出的信息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body)=" + new String(body));
}
});
}
}
封裝成功!
Work Quene 模型
Work queues
吩谦,也被稱為(Task queues
)鸳谜,任務模型。當消息處理比較耗時的時候式廷,可能生產(chǎn)消息的速度會遠遠大于消息的消費速度咐扭。長此以往,消息就會堆積越來越多滑废,無法及時處理蝗肪。此時就可以使用work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息策严。隊列中的消息一旦消費穗慕,就會消失,因此任務是不會被重復執(zhí)行的妻导。
角色:
- P:生產(chǎn)者:任務的發(fā)布者
- C1:消費者-1逛绵,領取任務并且完成任務怀各,假設完成速度較慢
- C2:消費者-2:領取任務并完成任務,假設完成速度快
平均消費信息
生產(chǎn)者
public class Provider {
public static void main(String[] args) throws IOException {
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
//獲取通道對象
Channel channel = connection.createChannel();
//通過通道聲明隊列
channel.queueDeclare("work",true,false,false,null );
//生產(chǎn)消息
for (int i = 1; i <= 20; i++) {
channel.basicPublish("","work",null,(i+"hello work queue").getBytes());
}
//關閉資源
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者
消費者1
public class Customer1 {
public static void main(String[] args) throws IOException {
//獲取連接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
//參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者-1:" + new String(body));
}
});
}
}
消費者2
public class Customer2 {
public static void main(String[] args) throws IOException {
//獲取連接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
//參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者-2:" + new String(body));
}
});
}
}
測速
先分別啟動消費者1和消費者2進行監(jiān)聽术浪,再啟動生產(chǎn)者生產(chǎn)消息
可以看到20條消息被依次平均消費了瓢对,輪詢。
<mark>不過這種消費效果會導致一種問題胰苏。當某一個消費者出現(xiàn)了故障或者消費消息的時間過長硕蛹,則會導致整個隊列的消息都會變慢。類似木桶效應硕并,因此我們需要一種能者多勞多得的消費消息策略法焰。</mark>
消息自動確認機制
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
我們在channel.basicConsume()
中配置了消息自動確認。
<mark>不過開啟自動確認機制還是存在隱患倔毙,例如上圖消息消費者1接收到了5個消息后埃仪,自動確認了。則隊列中的消息則會收到確認后刪除陕赃。而消費者1消費3個消息后出現(xiàn)故障宕機卵蛉,則為被消費的剩余兩個消息則丟失了。</mark>
能者多勞多得
- 關閉自動確認 channel.basicConsume(隊列名, false,回調函數(shù))
- 道道每一次只消費一條信息 channel.basicQos(1);
- 消費消息后手動確認 channel.basicAck(envelope.getDeliveryTag(),false);
消費者
public class Customer1 {
public static void main(String[] args) throws IOException {
//獲取連接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//每一次只消費一條消息
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
//參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者-1:" + new String(body));
//手動確認信息
//參數(shù)1:確認隊列中哪個具體消息 參數(shù)2:是否開啟多個消息同時確認
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消費者1每次消費消息前先睡眠2秒么库。
public class Customer2 {
public static void main(String[] args) throws IOException {
//獲取連接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//每一次只消費一條消息
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
//參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者-2:" + new String(body));
//手動確認信息
//參數(shù)1:確認隊列中哪個具體消息 參數(shù)2:是否開啟多個消息同時確認
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
測試
兩個消費者進行修改后傻丝,消費者1的消費速度遠遠小于消費者2 。此時看消費100條信息的結果诉儒。
消費者1只消費了1條消息葡缰,而消費者2消費了99條消息。實現(xiàn)了能者多勞多得多效果忱反!
Fanout 模型
廣播模型
角色:
- P 生產(chǎn)者
- C 消費者
- X 交換機
在廣播模式下运准,消息發(fā)送流程是這樣的:
- 可以有多個消費者
- 每個消費者有自己的queue(隊列)
- 每個隊列都要綁定到Exchange(交換機)
- 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機缭受,交換機來決定要發(fā)給哪個隊列胁澳,生產(chǎn)者無法決定。
- 交換機把消息發(fā)送給綁定過的所有隊列
- 隊列的消費者都能拿到消息米者。實現(xiàn)一條消息被多個消費者消費
生產(chǎn)者
聲明交換機channel.exchangeDeclare("logs", "fanout");
參數(shù)1:交換機名稱 參數(shù)2:交換機類型 fanout 廣播類型
public class Provider {
public static void main(String[] args) throws IOException {
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明指定的的交換機
//參數(shù)1:交換機名稱 參數(shù)2:交換機類型 fanout 廣播類型
channel.exchangeDeclare("logs", "fanout");
//發(fā)送消息
channel.basicPublish("logs", "", null, "fanout type message".getBytes());
//釋放資源
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}
消費者
消費者1
-
channel.exchangeDeclare("logs","fanout");
通道綁定交換機 -
String queue = channel.queueDeclare().getQueue();
獲取臨時隊列 -
channel.queueBind(queue,"logs","");
綁定交換機和隊列
public class Customer1 {
public static void main(String[] args) throws IOException {
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道綁定交換機
channel.exchangeDeclare("logs","fanout");
//
String queue = channel.queueDeclare().getQueue();
//綁定交換機和隊列
channel.queueBind(queue,"logs","");
//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
消費者2和消費者3復制修改輸入語句即可韭畸。
測試
首先啟動消費者們,在啟動生產(chǎn)者生產(chǎn)消息蔓搞。
測試成功胰丁!
Direct 模型
在Fanout模式中,一條消息喂分,會被所有訂閱的隊列都消費锦庸。但是,在某些場景下蒲祈,我們希望不同的消息被不同的隊列消費甘萧。這時就要用到Direct類型的Exchange萝嘁。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了扬卷,而是要指定一個
RoutingKey
(路由key) - 消息的發(fā)送方在 向 Exchange發(fā)送消息時牙言,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列怪得,而是根據(jù)消息的
Routing Key
進行判斷咱枉,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
圖解:
- P:生產(chǎn)者徒恋,向Exchange發(fā)送消息蚕断,發(fā)送消息時,會指定一個routing key入挣。
- X:Exchange(交換機)基括,接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊列
- C1:消費者财岔,其所在隊列指定了需要routing key 為 error 的消息
- C2:消費者,其所在隊列指定了需要routing key 為 info河爹、error匠璧、warning 的消息
生產(chǎn)者
public class Provider {
public static void main(String[] args) throws IOException {
String exchangeName = "logs_direct";
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
//獲取連接通道對象
Channel channel = connection.createChannel();
//通過通道聲明交換機
//參數(shù)1:交換機名稱 參數(shù)2:direct 路由模式
channel.exchangeDeclare(exchangeName,"direct");
//發(fā)送消息
String routingKey = "error";
channel.basicPublish(exchangeName,routingKey,null,("這是direct模型發(fā)布的基于routingKey:【"+routingKey+"】").getBytes());
//關閉資源
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者
消費者1
public class Customer1 {
public static void main(String[] args) throws IOException {
String exchangeName = "logs_direct";
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道綁定交換機
channel.exchangeDeclare(exchangeName,"direct");
//獲取臨時隊列
String queue = channel.queueDeclare().getQueue();
//基于routingKey綁定交換機和隊列
channel.queueBind(queue,exchangeName,"error");
//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
消費者2
public class Customer2 {
public static void main(String[] args) throws IOException {
String exchangeName = "logs_direct";
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道綁定交換機
channel.exchangeDeclare(exchangeName,"direct");
//獲取臨時隊列
String queue = channel.queueDeclare().getQueue();
//基于routingKey綁定交換機和隊列
channel.queueBind(queue,exchangeName,"info");
channel.queueBind(queue,exchangeName,"error");
channel.queueBind(queue,exchangeName,"warning");
//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
測試
生產(chǎn)者發(fā)送消息是的routingKey為error
所以消費者1,和消費者2都能消費
當我們被生產(chǎn)者綁定當routingKey改為info
時咸这,此時只有消費者2才能消費
測試成功夷恍!
Topic 模型
Topic
類型的Exchange
與Direct
相比,都是可以根據(jù)RoutingKey
把消息路由到不同的隊列媳维。只不過Topic
類型Exchange
可以讓隊列在綁定Routing key
的時候使用通配符酿雪!這種模型Routingkey
一般都是由一個或多個單詞組成,多個單詞之間以”.”分割侄刽,例如: item.insert
生產(chǎn)者
public class Provider {
public static void main(String[] args) throws IOException {
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明交換機以及交換機類型 topic
channel.exchangeDeclare("topics","topic");
String routingKey = "user.save.find";
//發(fā)布消息
channel.basicPublish("topics",routingKey,null,("這里是topic動態(tài)路由模型,routingKey:【"+routingKey+"】").getBytes());
//釋放資源
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者
消費者1
public class Customer1 {
public static void main(String[] args) throws IOException {
String exchangeName = "topics";
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道綁定交換機
channel.exchangeDeclare(exchangeName,"topic");
//獲取臨時隊列
String queue = channel.queueDeclare().getQueue();
//基于routingKey綁定交換機和隊列
channel.queueBind(queue,exchangeName,"user.*");
//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
消費者2
public class Customer2 {
public static void main(String[] args) throws IOException {
String exchangeName = "topics";
//獲取連接對象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道綁定交換機
channel.exchangeDeclare(exchangeName,"topic");
//獲取臨時隊列
String queue = channel.queueDeclare().getQueue();
//基于routingKey綁定交換機和隊列
channel.queueBind(queue,exchangeName,"user.#");
//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
測試
生產(chǎn)者綁定的routingKey為user.save.find
指黎,消費者1的routingKey為user.*
,消費者2的routingKey為user.#
。所以只符合消費者2州丹。
測試成功醋安!