1、RabbitMQ作用
1.1、流量消峰(解決高并發(fā))
1.2、模塊之間的異步通信
1.3击碗、消息隊(duì)列的中間件有哪些
ActiveMQ---------JMS(SUN公司提供的規(guī)范) Java message Server
RabbitMQ-------在當(dāng)下很多公司都用這一個(gè)
RocketMQ------阿里的
kafka------------用的比較多---最初的設(shè)計(jì) 是用來 完成分布式下日志的收集框架
1.4、RabbitMQ的基本安裝
#安裝之前需要的環(huán)境
yum install epel-release
yum install erlang
#安裝rabbitMQ了
下載rpm文件
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
#下載完成需要安裝
yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm
#設(shè)置開機(jī)啟動(dòng)
systemctl enable rabbitmq-server.service
#查看服務(wù)的狀態(tài)
systemctl status rabbitmq-server.service
#啟動(dòng)這個(gè)服務(wù)
systemctl start rabbitmq-server.service
#停止這個(gè)服務(wù)
systemctl stop rabbitmq-server.service
#查看當(dāng)前所有的用戶
rabbitmqctl list_users
#查看guest用戶所有擁有的權(quán)限
rabbitmqctl list_user_permissions guest
#刪除原來的guest用戶
rabbitmqctl delete_user guest
#添加一個(gè)新的用戶
rabbitmqctl add_user xiaobobo 12345678
#給小波波設(shè)置個(gè)角色(tag)
rabbitmqctl set_user_tags xiaobobo administrator
#給xiaobobo賦予權(quán)限
rabbitmqctl set_permissions -p / xiaobobo ".*" ".*" ".*"
#查看用戶所擁有的權(quán)限
rabbitmqctl list_user_permissions xiaobobo
#開啟web的管理端
rabbitmq-plugins enable rabbitmq_management
1.5们拙、RabbitMQ中的五種通信模型
1.5.1、helloworld模型
意思是:生產(chǎn)者將消息發(fā)送到隊(duì)列 然后隊(duì)列將這個(gè)消息發(fā)送給消費(fèi)者
1.5.2阁吝、測(cè)試用例
(1)導(dǎo)包
<!--導(dǎo)入RabbitMQ的相關(guān)的包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.5.0</version>
</dependency>
(2)生產(chǎn)者
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-helloword";
public static void main(String[] args) throws IOException, TimeoutException {
//第一步:獲取連接
Connection connection = ConnectionUtils.getConnection();
//第二步:創(chuàng)建數(shù)據(jù)傳輸?shù)耐ǖ? Channel channel = connection.createChannel();
//第三步:申明隊(duì)列
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):是否持久化 比如現(xiàn)在發(fā)送到隊(duì)列里面的消息 如果沒有持久化 重啟這個(gè)隊(duì)列后數(shù)據(jù)會(huì)丟失(false) true:重啟之后數(shù)據(jù)依然在
* 第三個(gè)參數(shù):是否排外
* 1:連接關(guān)閉之后 這個(gè)隊(duì)列是否自動(dòng)刪除
* 2:是否允許其他通道來進(jìn)行訪問這個(gè)數(shù)據(jù)
* 第四個(gè)參數(shù):是否允許自動(dòng)刪除
* 就是當(dāng)最后一個(gè)連接斷開的時(shí)候 這個(gè)時(shí)候是否允許自動(dòng)刪除這個(gè)隊(duì)列
* 第五個(gè)參數(shù):申明隊(duì)列的時(shí)候 要附帶的一些參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//發(fā)送數(shù)據(jù)到隊(duì)列
/**
* 第一個(gè)參數(shù):exchange 交換機(jī) 沒有就設(shè)置為"值就可以了"
* 第二個(gè)參數(shù):原本的意思是路由的key 現(xiàn)在沒有key直接使用隊(duì)列的名字
* 第三個(gè)參數(shù):發(fā)送數(shù)據(jù)到隊(duì)列的時(shí)候 是否要帶一些參數(shù) 沒有帶任何參數(shù)
* 第四個(gè)參數(shù):向隊(duì)列中發(fā)送的數(shù)據(jù)
*/
channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes("utf-8"));
channel.close();
connection.close();
}
(3)消費(fèi)者的寫法
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-helloword";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection = ConnectionUtils.getConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消費(fèi)者的申明
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/**
*
* @param consumerTag:這個(gè)消息的唯一標(biāo)記
* @param envelope:信封(請(qǐng)求的消息屬性的一個(gè)封裝)
* @param properties:前面隊(duì)列帶過來的值
* @param body :接受到的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息是:"+new String(body));
//進(jìn)行手動(dòng)應(yīng)答
/**
* 第一個(gè)參數(shù):自動(dòng)應(yīng)答的這個(gè)消息標(biāo)記
* 第二個(gè)參數(shù):false 就相當(dāng)于告訴隊(duì)列受到消息了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//綁定這個(gè)消費(fèi)者
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
* 第三個(gè)參數(shù):消費(fèi)者的申明
*/
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
1.5.3砚婆、work模型的玩法
多個(gè)消費(fèi)者消費(fèi)的數(shù)據(jù)之和才是原來隊(duì)列中的所有數(shù)據(jù) 適用于流量的消峰
(1)生產(chǎn)者
public class Producer {
private static final String QUEUE_NAME="nz1904-work";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//下面向隊(duì)列中發(fā)送100條消息
for (int i = 0; i <100 ; i++) {
channel.basicPublish("",QUEUE_NAME,null,("我是工作模型:"+i).getBytes());
}
channel.close();
connection.close();
}
}
(2)消費(fèi)者1寫法
public class Consumer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-work";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection = ConnectionUtils.getConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消費(fèi)者的申明
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/**
*
* @param consumerTag:這個(gè)消息的唯一標(biāo)記
* @param envelope:信封(請(qǐng)求的消息屬性的一個(gè)封裝)
* @param properties:前面隊(duì)列帶過來的值
* @param body :接受到的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1接受到的消息是:"+new String(body));
//進(jìn)行手動(dòng)應(yīng)答
/**
* 第一個(gè)參數(shù):自動(dòng)應(yīng)答的這個(gè)消息標(biāo)記
* 第二個(gè)參數(shù):false 就相當(dāng)于告訴隊(duì)列受到消息了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//綁定這個(gè)消費(fèi)者
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
* 第三個(gè)參數(shù):消費(fèi)者的申明
*/
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
(3)消費(fèi)者2寫法
public class Consumer1 {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-work";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection = ConnectionUtils.getConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消費(fèi)者的申明
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/**
*
* @param consumerTag:這個(gè)消息的唯一標(biāo)記
* @param envelope:信封(請(qǐng)求的消息屬性的一個(gè)封裝)
* @param properties:前面隊(duì)列帶過來的值
* @param body :接受到的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2接受到的消息是:"+new String(body));
//進(jìn)行手動(dòng)應(yīng)答
/**
* 第一個(gè)參數(shù):自動(dòng)應(yīng)答的這個(gè)消息標(biāo)記
* 第二個(gè)參數(shù):false 就相當(dāng)于告訴隊(duì)列受到消息了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//綁定這個(gè)消費(fèi)者
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
* 第三個(gè)參數(shù):消費(fèi)者的申明
*/
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
1.5.4、發(fā)布訂閱模型
簡(jiǎn)單的說就是隊(duì)列里面的消息會(huì)被幾個(gè)消費(fèi)者 同時(shí)接受到
模型 適合于做模塊之間的異步通信
例子: 就可以使用這種模型 來發(fā)送日志信息 ------ 立馬就會(huì)被log收集程序
收集到 直接寫到咋們的文件里面
例子:springcloud的config組件里面通知配置自動(dòng)更新
例子:緩存同步也可以使用這一個(gè)
例子:高并發(fā)下實(shí)現(xiàn)下單邏輯
(1)生產(chǎn)者寫法
public class Producer {
//申明交換機(jī)的名字
private static final String EXCHANGE_NAME="nz1904-fanout-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明交換機(jī)
/**
* 第一個(gè)參數(shù):交換機(jī)的名字
* 第二個(gè)參數(shù):交換機(jī)的類型
* 交換機(jī)的類型是不能亂寫的 如果使用的是發(fā)布訂閱模型 只能寫 fanout
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//發(fā)送消息到交換機(jī)了
for (int i = 0; i <100 ; i++) {
channel.basicPublish(EXCHANGE_NAME,"",null,("發(fā)布訂閱模型的值:"+i).getBytes());
}
//關(guān)閉資源
channel.close();
connection.close();
}
}
(2)消費(fèi)者1寫法
public class Consumer {
//申明交換機(jī)的名字
private static final String EXCHANGE_NAME="nz1904-fanout-01";
//隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-fanout-queue1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//將隊(duì)列綁定到交換機(jī)
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):交換機(jī)的名字
* 第三個(gè)參數(shù):路由的key(現(xiàn)在沒有用到這個(gè)路由的key)
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("隊(duì)列1接受到的數(shù)據(jù)是:"+new String(body));
}
};
//就進(jìn)行消費(fèi)者的綁定
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
(3)消費(fèi)者2寫法
public class Consumer1 {
//申明交換機(jī)的名字
private static final String EXCHANGE_NAME="nz1904-fanout-01";
//隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-fanout-queue2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//將隊(duì)列綁定到交換機(jī)
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):交換機(jī)的名字
* 第三個(gè)參數(shù):路由的key(現(xiàn)在沒有用到這個(gè)路由的key)
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("隊(duì)列2接受到的數(shù)據(jù)是:"+new String(body));
}
};
//就進(jìn)行消費(fèi)者的綁定
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
1.5.5突勇、路由模型
路由模式相當(dāng)于是分布訂閱模式的升級(jí)版
(1)生產(chǎn)者寫法
public class Producer {
private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 如果玩的是路由模型 交換機(jī)的類型只能是 direct
*/
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//發(fā)送信息到交換機(jī)
for (int i = 0; i <100 ; i++) {
if(i%2==0){
//這個(gè)路由的key是可以隨便設(shè)置的
channel.basicPublish(EXCHANGE_NAME,"xiaowangzi",null,("路由模型的值:"+i).getBytes());
}else{
//這個(gè)路由的key是可以隨便設(shè)置的
channel.basicPublish(EXCHANGE_NAME,"xiaobobo",null,("路由模型的值:"+i).getBytes());
}
}
channel.close();
connection.close();
}
}
(2)消費(fèi)者1寫法
public class Cosnumer1 {
private static final String QUEUE_NAME="nz1904-direct-queue-01";
private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//綁定隊(duì)列到交換機(jī)
//第三個(gè)參數(shù):表示的是路由key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaobobo");
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//這里就是接受消息的地方
System.out.println("路由key是xiaobobo的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
}
};
//綁定消費(fèi)者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
(3)消費(fèi)者2寫法
public class Consumer2 {
private static final String QUEUE_NAME="nz1904-direct-queue-02";
private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//綁定隊(duì)列到交換機(jī)
//第三個(gè)參數(shù):表示的是路由key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaowangzi");
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//這里就是接受消息的地方
System.out.println("路由key是xiaowangzi的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
}
};
//綁定消費(fèi)者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
1.5.6装盯、topic模式
說明:topic模式相當(dāng)于是對(duì) 路由模式的一個(gè)升級(jí) topic模式主要就是在匹配的規(guī)則上可以實(shí)現(xiàn)模糊匹配
(1)生產(chǎn)者的寫法
public class Producer {
private static final String EXCHANGE_NAME = "nz1904-exchange-topic-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 如果玩的是路由模型 交換機(jī)的類型只能是 direct
*/
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//發(fā)送信息到交換機(jī)
for (int i = 0; i < 100; i++) {
//這個(gè)路由的key是可以隨便設(shè)置的
//topic在路由基礎(chǔ)上只有 路由的key發(fā)生改變 其余的都不變
channel.basicPublish(EXCHANGE_NAME, "xiaowangzi.xiaowangzi.xiaowangzi", null, ("路由模型的值:" + i).getBytes());
}
channel.close();
connection.close();
}
}
(2)消費(fèi)者1寫法
public class Cosnumer1 {
private static final String QUEUE_NAME="nz1904-topic-queue-01";
private static final String EXCHANGE_NAME="nz1904-exchange-topic-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//綁定隊(duì)列到交換機(jī)
//第三個(gè)參數(shù):表示的是路由key
/**
* 注意 * :只是代表一個(gè)單詞
* # :這個(gè)才代表 一個(gè)或者多個(gè)單詞
* 記住如果有多個(gè)單詞組成的路由key 那么多個(gè)單詞之間使用 . 好連接
*
*
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaobobo.*");
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//這里就是接受消息的地方
System.out.println("路由key是xiaobobo的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
}
};
//綁定消費(fèi)者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
(3)消費(fèi)者2寫法
public class Consumer2 {
private static final String QUEUE_NAME="nz1904-topic-queue-02";
private static final String EXCHANGE_NAME="nz1904-exchange-topic-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//綁定隊(duì)列到交換機(jī)
//第三個(gè)參數(shù):表示的是路由key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaowangzi.#");
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//這里就是接受消息的地方
System.out.println("路由key是xiaowangzi的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
}
};
//綁定消費(fèi)者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
備注:使用了交換機(jī)發(fā)送了數(shù)據(jù) 如果沒有消費(fèi)者的話那么這個(gè)數(shù)據(jù)會(huì)發(fā)生丟失 通過設(shè)置這樣的屬性來解決這個(gè)問題
channel.basicPublish(EXCHANGE_NAME, "xiaowangzi.xiaowangzi.xiaowangzi", MessageProperties.PERSISTENT_TEXT_PLAIN, ("路由模型的值:" + i).getBytes());
備注2:通道的問題
原本沒有通道我們也可以完成這個(gè)請(qǐng)求 RabbitMQ官方考慮到一個(gè)問題生產(chǎn)者 和 消費(fèi)者 實(shí)際上 Connection 引入這個(gè)通道這個(gè)概念 是為了降低TCP連接的這樣一個(gè)消耗 相當(dāng)于是為了 TCP的復(fù)用 還有一個(gè)目的 就是為了線程隱私 相當(dāng)于每一個(gè)線程都給你創(chuàng)建了一個(gè)通道
2、RabbitMQ中的一些高級(jí)屬性
2.1甲馋、參數(shù)的含義
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
第二個(gè)參數(shù):如果是false 重啟之后 隊(duì)列都沒有了 數(shù)據(jù)也會(huì)丟失
第三個(gè)參數(shù):true:連接一旦關(guān)閉 那么就會(huì)刪除這個(gè)隊(duì)列
第四個(gè)參數(shù):就是最后一個(gè)消費(fèi)者 退出去之后 那么這個(gè)隊(duì)列是否自動(dòng)刪除
第五個(gè)參數(shù):講ttl隊(duì)列的時(shí)候要專門講
channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes());
第一個(gè)參數(shù) :交換機(jī)
第二個(gè)參數(shù):路由key
第三個(gè)參數(shù):設(shè)置的隊(duì)列的屬性
第四個(gè)參數(shù):值
2.2埂奈、confirm機(jī)制
問題:就是放到隊(duì)列中的消息 怎么保證一定就是成功的放入了隊(duì)列
引入了 confirm機(jī)制:這個(gè)機(jī)制的意思是 只要放消息到 queue是成功的那么隊(duì)列就一定會(huì)給咋們進(jìn)行反饋
public class Producer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-confirm-01";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//第一步:開啟confirm消息確認(rèn)機(jī)制
channel.confirmSelect();
//我們就要對(duì)消息的可達(dá)性實(shí)施監(jiān)聽
//下面就是對(duì)消息的簽收情況進(jìn)行確認(rèn)
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("發(fā)送成功的監(jiān)聽.....");
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("發(fā)送失敗的監(jiān)聽.....");
}
});
channel.queueDeclare(QUEUE_NAME,false,false,true,null);
channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes());
}
}
2.3、return機(jī)制
場(chǎng)景:我們?cè)诎l(fā)送消息的是時(shí)候定躏、我們指定的交換機(jī)不存在 或者 指定的路由key不存在 這種時(shí)候我們就需要監(jiān)聽這種不可達(dá)的消息 我們的return機(jī)制就產(chǎn)生了
前提:當(dāng)前的隊(duì)列必須要有消費(fèi)者存在
//有一個(gè)參數(shù)需要設(shè)置
mandatory 如果設(shè)置為ture:就表示的是要監(jiān)聽不可達(dá)的消息 進(jìn)行處理
如果設(shè)置為false 那么隊(duì)列端會(huì)直接刪除這個(gè)消息
(1)生產(chǎn)者的編寫
public class Producer {
private static final String EXCHANGE_NAME="test_return_exchange1";
//是能路由的key
private static final String ROUTING_KEY="return.save";
//是不能路由的key
private static final String ROUTING_ERROR_KEY="abc.save";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//添加監(jiān)聽
channel.addReturnListener(new ReturnListener() {
/**
*
* @param i:隊(duì)列響應(yīng)給瀏覽器的狀態(tài)碼
* @param s:表示的是狀態(tài)碼對(duì)應(yīng)的文本信息
* @param s1:交換機(jī)的名字
* @param s2:表示的是路由的key
* @param basicProperties:表示的是消息的屬性
* @param bytes:消息體的內(nèi)容
* @throws IOException
*/
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("監(jiān)聽到不可達(dá)的消息");
System.out.println("狀態(tài)碼:"+i+"---文本信息:"+s+"---交換機(jī)名字:"+s1+"----路由的key:s2");
System.out.println("監(jiān)聽到不可達(dá)的消息");
System.out.println("監(jiān)聽到不可達(dá)的消息");
System.out.println("監(jiān)聽到不可達(dá)的消息");
}
});
channel.basicPublish(EXCHANGE_NAME,ROUTING_ERROR_KEY,true,null,"這里是測(cè)試Return機(jī)制".getBytes());
}
(2)消費(fèi)者寫法
public class Consumer {
private static final String EXCHANGE_NAME="test_return_exchange1";
//是能路由的key
private static final String ROUTING_KEY="return.#";
//制定綁定的隊(duì)列
private static final String QUEUE_NAME="test_return_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//申明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
//申明消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到這個(gè)消息了....");
}
};
//進(jìn)行消費(fèi)的綁定
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
2.3账磺、消費(fèi)端的限流問題
場(chǎng)景:消費(fèi)者死了 隊(duì)列里面一瞬間就就積累了上萬條數(shù)據(jù)、這個(gè)時(shí)候當(dāng)我們打開客戶端的時(shí)候痊远、瞬間就有巨量的信息給推送過來垮抗、但是我們的客戶端是沒有辦法同時(shí)處理這么多數(shù)據(jù)的 結(jié)果就是消費(fèi)者死了....
這種場(chǎng)景下我們就需要對(duì)消費(fèi)端進(jìn)行限流
(1)生產(chǎn)者的編寫
public class Producer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-limit-01";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 0; i <100 ; i++) {
channel.basicPublish("",QUEUE_NAME, null,("我是小波波"+i).getBytes());
}
channel.close();
connection.close();
}
}
(2)消費(fèi)者1的編寫
public class Consumer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-limit-01";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection = ConnectionUtils.getConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消費(fèi)者的申明
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1接受到的消息是:"+new String(body));
//進(jìn)行手動(dòng)應(yīng)答
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//綁定這個(gè)消費(fèi)者
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
* 第三個(gè)參數(shù):消費(fèi)者的申明
*/ channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
(3)消費(fèi)者2寫法
public class Consumer1 {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-limit-01";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection = ConnectionUtils.getConnection();
//創(chuàng)建通道
final Channel channel = connection.createChannel();
//申明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//設(shè)置限流機(jī)制
/**
* 第一個(gè)參數(shù):消息本身的大小 如果設(shè)置為0 那么表示對(duì)消息本身的大小不限制
* 第二個(gè)參數(shù):告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息 你要推送的前提是
* 現(xiàn)在這N個(gè)消息 已經(jīng)手動(dòng)被確認(rèn) 已經(jīng)完成
* 第三個(gè)參數(shù):true/false :是否將上面的設(shè)置應(yīng)用于整個(gè)通道 true :表示整個(gè) 通道的消費(fèi)者都是這個(gè)策略 如果是false表示的是 只有當(dāng)前的consumer 是這個(gè)策略
*/
channel.basicQos(0,5,false);
//結(jié)論:實(shí)際上如果不設(shè)置的話 分配任務(wù)的事 一開始就分配好了
//必須手動(dòng)簽收
//消費(fèi)者的申明
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2接受到的消息是:"+new String(body));
try {
Thread.sleep(200);
}catch (Exception err){
}
//進(jìn)行手動(dòng)應(yīng)答
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//綁定這個(gè)消費(fèi)者
/**
* 第一個(gè)參數(shù):隊(duì)列的名字
* 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
* 第三個(gè)參數(shù):消費(fèi)者的申明
*/
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
2.4、TTL隊(duì)列(Time To Live)
場(chǎng)景:我要下單 下單之后 在一定的時(shí)間內(nèi)碧聪、我的訂單如果沒有被處理 那么自動(dòng)失效
備注:簡(jiǎn)單的說就是咋們的隊(duì)列中的消息是有時(shí)間限制的冒版、如果超時(shí)那么這個(gè)消息將會(huì)被隊(duì)列給刪除
public class Producer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-ttl-01";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//我們只需要給下面的隊(duì)列設(shè)置好屬性 那么這個(gè)隊(duì)列 就自動(dòng)變成 ttl隊(duì)列了
Map<String,Object> map=new HashMap<>();
map.put("x-message-ttl",5000);
channel.queueDeclare(QUEUE_NAME,false,false,false,map);
channel.basicPublish("",QUEUE_NAME, null,("我是小波波").getBytes());
channel.close();
connection.close();
}
}
2.5、死信隊(duì)列
什么是死信隊(duì)列
當(dāng)消息在隊(duì)列中編程死信之后逞姿、可以定義它重新push 到另外一個(gè)交換機(jī)上辞嗡、這個(gè)交換機(jī) 也有自己對(duì)應(yīng)的隊(duì)列 這個(gè)隊(duì)列就稱為死信隊(duì)列
死信:
發(fā)送到隊(duì)列中的消息被拒絕了
消息的ttl時(shí)間過期
隊(duì)列達(dá)到了最大長(zhǎng)度 再往里面放信息
在滿足上面死信的前提下 捆等、現(xiàn)在我們可以定義一個(gè)隊(duì)列 這個(gè)隊(duì)列專門用來
死信隊(duì)列也是一個(gè)正常的交換機(jī)、和一般的交換機(jī)沒有什么區(qū)別
當(dāng)這個(gè)隊(duì)列中如果有這個(gè)死信的時(shí)候续室、rabbitmq就會(huì)將這個(gè)消息自動(dòng)發(fā)送到我們提前定義好的死信隊(duì)列中去(簡(jiǎn)單的說就是路由到另外一個(gè)隊(duì)列)
(1)生產(chǎn)者的編寫
public class Producer {
//定義的是隊(duì)列(正常的交換機(jī)) 這里發(fā)送消息是在交換機(jī)上面
private static final String EXCHANGE_NAME="ttl-dlx-bobo-exchange";
//定義一個(gè)路由的key
private static final String ROUTING_KEY="dlx.#";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
for (int i = 0; i <5 ; i++) {
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,false,null,("我是小波波"+i).getBytes());
}
}
}
(2)消費(fèi)者的編寫
public class Consumer {
//定義的是交換機(jī)
private static final String EXCHANGE_NAME="ttl-dlx-bobo-exchange";
//正常情況下的隊(duì)列
private static final String QUEUE_NAME="ttl-dlx-bobo-queue";
//定義死信隊(duì)列的交換機(jī)的名字
private static final String DLX_EXCHANGE_NAME="dlx-bobo-exchange";
//死信隊(duì)列的定義
private static final String DLX_QUEUE_NAME="dlx-bobo-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//創(chuàng)建交換機(jī)和隊(duì)列進(jìn)行綁定
channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);
//隊(duì)列的申明
//我們要申明成死信隊(duì)列
Map<String,Object> map=new HashMap<>();
map.put("x-message-ttl",5000);
//添加一個(gè)死信的屬性 //后面這個(gè)名字就是死信隊(duì)列交換機(jī)的名字
map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
channel.queueDeclare(QUEUE_NAME,true,false,false,map);
//進(jìn)行隊(duì)列和交換機(jī)進(jìn)行綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"dlx.#");
//上面是正常的隊(duì)列的申明
//下面就是死信隊(duì)列的申明
channel.exchangeDeclare(DLX_EXCHANGE_NAME,"topic");
//申明隊(duì)列
channel.queueDeclare(DLX_QUEUE_NAME,true,false,false,null);
//綁定這個(gè)死信隊(duì)列
channel.queueBind(DLX_QUEUE_NAME,DLX_EXCHANGE_NAME,"#");
//直接性的來調(diào)用這個(gè)
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("獲取到數(shù)據(jù)了:"+new String(body));
}
};
//綁定消費(fèi)者
channel.basicConsume(DLX_QUEUE_NAME,true,defaultConsumer);
//現(xiàn)在有這個(gè)問題呀?
}
}
2.6栋烤、消費(fèi)者端手動(dòng)簽收和消息的重回隊(duì)列
場(chǎng)景:消費(fèi)者端接受到了咋們的隊(duì)列中的數(shù)據(jù),但是在進(jìn)行業(yè)務(wù)邏輯處理的時(shí)候猎贴、發(fā)現(xiàn)一個(gè)問題班缎、業(yè)務(wù)邏輯處理失敗了? 怎么辦?
手動(dòng)簽收應(yīng)答、我們也可以手動(dòng)拒絕她渴、然后讓消息重回隊(duì)列
(1)生產(chǎn)者的編寫
public class Producer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-ack-02";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicPublish("",QUEUE_NAME, null,"我是小波波1111".getBytes());
channel.close();
connection.close();
}
}
(2)消費(fèi)者的編寫
public class Consumer {
//申明隊(duì)列的名字
private static final String QUEUE_NAME="nz1904-ack-02";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息是:"+new String(body));
/**第一個(gè)參數(shù):當(dāng)前消息的標(biāo)記
* 第二個(gè)參數(shù):是否批量進(jìn)行應(yīng)答
* 下面是簽收
*/
//channel.basicAck(envelope.getDeliveryTag(),false);
//下面也可以拒絕簽收
/**
* 第三個(gè)參數(shù):表示決絕簽收之后這個(gè)消息是否要重回隊(duì)列?
*/
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}; channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
2.7达址、怎么保證消息的投遞一定是成功的(難)
(1)消息的延遲投遞來解決傳遞的可靠性
(2)日志消息表實(shí)現(xiàn)可靠消息的傳輸