RabbitMQ的幾種常見模式的詳細(xì)介紹和使用---實踐
官網(wǎng):RabbitMQ
官方文檔:各個模式簡介
RabbitMQ就不詳細(xì)介紹了,以下就是各個模式的原理和實踐操作:
1.安裝配置
查看mq鏡像: docker search rabbitmq:management
下載mq鏡像: docker pull rabbitmq:management
安裝鏡像:docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
說明
5672:默認(rèn)的客戶端連接的端口
15672:默認(rèn)的web管理界面的端口
命令中的【RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin】是web管理平臺的用戶名和密碼
【 -p 15672:15672】 是控制平臺docker映射到系統(tǒng)的對應(yīng)端口
【 -p 5672:5672】 是應(yīng)用程序的訪問端口
訪問地址
http://ip:15672
如果是linux服務(wù)器液肌,首先開放服務(wù)器端口挟炬,例如阿里云鸥滨,先配置安全組:
添加:
開始安裝:
查詢 docker search rabbitmq:management
下載 docker search rabbitmq:management
安裝 docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
安裝成功
訪問地址:
http://你的ip:15672
2.測試
先創(chuàng)建一個連接類:
package boot.spring.controller;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class ConnectionUtil {
/**
* 獲取連接
* @return Connection
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的ip");
factory.setPort(5672);
//設(shè)置vhost
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
//通過工廠獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
2.1簡單模式
一個生產(chǎn)者蒋纬,一個消費者昔驱。
原理圖:發(fā)送:
package boot.spring.controller.easy;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 1.簡單模式:一個生產(chǎn)者一個消費者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TestSend {
public final static String QUEUE_NAME = "test-queue";
//創(chuàng)建隊列,發(fā)送消息
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明創(chuàng)建隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息內(nèi)容
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發(fā)送消息:"+message);
//關(guān)閉連接和通道
channel.close();
connection.close();
}
}
生產(chǎn)的一條消息未被消費:
接收:
package boot.spring.controller.easy;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 簡單模式一個生產(chǎn)者一個消費者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TestResive {
//消費者消費消息
public static void main(String[] args) throws Exception {
//獲取連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明通道
channel.queueDeclare(TestSend.QUEUE_NAME,false,false,false,null);
//定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監(jiān)聽隊列
channel.basicConsume(TestSend.QUEUE_NAME,true,consumer);
while(true){
//這個方法會阻塞住,直到獲取到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到消息:"+message);
}
}
}
已被消費:
2.2 work模式
競爭消費者模式
一個生產(chǎn)者适室,多個消費者,每個消費者獲取到的消息唯一鸥拧,生產(chǎn)的消息會被消費者瓜分茉稠。
原理圖:生產(chǎn)100條消息:
package boot.spring.controller.work;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 2.work模式:一個生產(chǎn)者多個消費者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class WorkSend2 {
public final static String QUEUE_NAME = "test2";
//消息生產(chǎn)者
public static void main(String[] args) throws Exception {
//獲取連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "";
for(int i = 0; i<100; i++){
message = "" + i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發(fā)送消息:"+message);
Thread.sleep(i);
}
channel.close();
connection.close();
}
}
消費者1:
package boot.spring.controller.work;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 2.work模式:一個生產(chǎn)者多個消費者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class WorkResive1 {
//消費者1 自動模式
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(WorkSend2.QUEUE_NAME,false,false,false,null);
//同一時刻服務(wù)器只發(fā)送一條消息給消費端
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(WorkSend2.QUEUE_NAME,false,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(100);
//消息消費完給服務(wù)器返回確認(rèn)狀態(tài),表示該消息已被消費
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
消費者1卿吐,消費了100條消息中的一半:
消費者2:
package boot.spring.controller.work;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 2.work模式:一個生產(chǎn)者多個消費者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class WorkResive2 {
//消費者2 手動模式
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test2",false,false,false,null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("test2",true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(10);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
消費者2消費了100條消息的另一半:
2.3 訂閱模式
生產(chǎn)者將消息發(fā)送到交換機旁舰,消費者從交換機獲取消息。
原理圖:生產(chǎn)者發(fā)送消息到交換機:
package boot.spring.controller.exchange;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 3.訂閱者模式:一個生產(chǎn)者發(fā)送的消息會被多個消費者獲取
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
//生產(chǎn)者嗡官,發(fā)送消息到交換機
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機 fanout:交換機類型 主要有fanout,direct,topics三種
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String message = "訂閱模式:消息007!";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
生產(chǎn)者產(chǎn)生的消息:
消費者1:
package boot.spring.controller.exchange;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 3.訂閱者模式:一個生產(chǎn)者發(fā)送的消息會被多個消費者獲取
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class Resive1 {
//消費者1
public final static String QUEUE_NAME = "test_queue_exchange_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上
channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消費者1:"+message);
}
}
}
消費者2:
package boot.spring.controller.exchange;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 3.訂閱者模式:一個生產(chǎn)者發(fā)送的消息會被多個消費者獲取
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class Resive2 {
//消費者2
public final static String QUEUE_NAME = "test_queue_exchange_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上
channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消費者2:"+message);
}
}
}
由此可見箭窜,訂閱者模式中,所有的消費者都通過交換機收到了消息衍腥。
2.4 路由模式
生產(chǎn)者發(fā)送消息到隊列中時可自定義一個key磺樱,消費者可根據(jù)key去選擇對應(yīng)的消息,各取所需婆咸。
注意:路由key竹捉,是一種完全匹配,只有匹配到的消費者才能消費消息尚骄。
生產(chǎn)者生產(chǎn)帶key的消息:(key=“dog”)
package boot.spring.controller.rout;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 4.路由模式:發(fā)送消息到交換機并且要指定路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class RoutSend {
public static final String EXCHANGE_NAME = "test_exchange_direct";
//生產(chǎn)者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機 fanout:交換機類型 主要有fanout,direct,topics三種
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String message = "路由模式產(chǎn)生的消息!";
channel.basicPublish(EXCHANGE_NAME,"dog",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
消費者1:(key=“dog”)
package boot.spring.controller.rout;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 4.路由模式:消費者將隊列綁定到交換機時需要指定路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class RoutResive1 {
//消費者1
public final static String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上,并制定路由鍵為"dog"
channel.queueBind(QUEUE_NAME, RoutSend.EXCHANGE_NAME,"dog");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("RoutResive1:"+message);
}
}
}
消費者2:(key=“cat”)
package boot.spring.controller.rout;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 4.路由模式:消費者將隊列綁定到交換機時需要指定路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class RoutResive2 {
//消費者2
public final static String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上,并制定路由鍵為"cat"
channel.queueBind(QUEUE_NAME, RoutSend.EXCHANGE_NAME,"cat");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("RoutResive2:"+message);
}
}
}
很顯然块差,消費者1獲取到了消息,消費者2并沒有獲取到消息,因為消費者2的key與生產(chǎn)者的key不一致憨闰。
2.5 通配符模式
原理和路由模式類似询兴,只是key值作了模糊匹配而已。
*(星號)可以正好代替一個詞起趾。
# (hash) 可以代替零個或多個單詞
-
topic交換器通過模式匹配分配消息的路由鍵屬性诗舰,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上训裆。它將路由鍵和綁定鍵的字符串切分成單詞眶根,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”边琉。#匹配0個或多個單詞属百,匹配一個單詞。如下圖所示:
生產(chǎn)者產(chǎn)生消息:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 5.路由模式:發(fā)送消息到交換機并且要指定通配符路由
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicSend {
//生產(chǎn)者
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機 topic:交換機類型
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String message = "通配符模式產(chǎn)生的消息";
channel.basicPublish(EXCHANGE_NAME,"dog.1",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
消費者1:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 5.路由模式:消費者將隊列綁定到交換機時需要指定通配符路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicResive1 {
//消費者1
public final static String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上,并制定路由鍵匹配規(guī)則為"dog.*"
channel.queueBind(QUEUE_NAME, TopicSend.EXCHANGE_NAME,"dog.*");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("TopicResive1:"+message);
}
}
}
消費者2:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 5.路由模式:消費者將隊列綁定到交換機時需要指定通配符路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicResive2 {
//消費者2
public final static String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上,并制定路由鍵匹配規(guī)則為"#.1"
channel.queueBind(QUEUE_NAME, TopicSend.EXCHANGE_NAME,"#.1");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("TopicResive2:"+message);
}
}
}
消費者3:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 5.路由模式:消費者將隊列綁定到交換機時需要指定通配符路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicResive3 {
//消費者3
public final static String QUEUE_NAME = "test_queue_topic_3";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//綁定隊列到交換機上,并制定路由鍵匹配規(guī)則為"cat.#"
channel.queueBind(QUEUE_NAME, TopicSend.EXCHANGE_NAME,"cat.#");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("TopicResive3:"+message);
}
}
}
結(jié)果:消費者1和消費者2可以收到消息变姨,消費者3不能收到消息族扰。
完整代碼地址:
https://github.com/DongFangXiaoYu/springBoot-RabbitMq/tree/master