前言
上一篇消息中間件——RabbitMQ(七)高級(jí)特性全在這里!(上)中我們介紹了消息如何保障100%的投遞成功?
,冪等性概念詳解
,在海量訂單產(chǎn)生的業(yè)務(wù)高峰期懂更,如何避免消息的重復(fù)消費(fèi)的問題荤堪?
,Confirm確認(rèn)消息、Return返回消息
本冲。這篇我們來介紹下下面內(nèi)容撤缴。
- 自定義消費(fèi)者
- 消息的限流(防止占用內(nèi)存過多,節(jié)點(diǎn)宕機(jī))
- 消息的ACK與重回隊(duì)列
- TTL消息
- 死信隊(duì)列
1. 自定義消費(fèi)者
1.1 消費(fèi)端自定義監(jiān)聽
我們一般就在代碼中編寫while循環(huán)令蛉,進(jìn)行consumer.nextDelivery方法進(jìn)行獲取下一條消息聚霜,然后進(jìn)行消費(fèi)處理!
但是這種輪訓(xùn)的方式肯定是不好的,代碼也比較low蝎宇。
- 我們使用自定義的Consumer更加的方便弟劲,解耦性更加的強(qiáng),也是在實(shí)際工作中最常見的使用方式姥芥!
1.2 代碼演示
1.2.1 生產(chǎn)者
/**
*
* @ClassName: Producer
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51
*
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
1.2.2 消費(fèi)者
/**
*
* @ClassName: Consumer
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51
*
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//實(shí)現(xiàn)自己的MyConsumer()
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
1.2.3 自定義類:MyConsumer
/**
*
* @ClassName: MyConsumer
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55
*
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
//根據(jù)需求函卒,重寫自己需要的方法。
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
//消費(fèi)標(biāo)簽
System.err.println("consumerTag: " + consumerTag);
//這個(gè)對(duì)象包含許多關(guān)鍵信息
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
1.3 打印結(jié)果
2. 消費(fèi)端限流
2.1 什么是消費(fèi)端的限流撇眯?
- 假設(shè)一個(gè)場(chǎng)景报嵌,首先,我們Rabbitmq服務(wù)器有上萬(wàn)條未處理的消息熊榛,我們隨便打開一個(gè)消費(fèi)者客戶端锚国,會(huì)出現(xiàn)下面情況:
- 巨量的消息瞬間全部推送過來,但是我們單個(gè)客戶端無法同時(shí)處理這么多數(shù)據(jù)玄坦!這個(gè)時(shí)候很容易導(dǎo)致服務(wù)器崩潰血筑,出現(xiàn)故障。
為什么不在生產(chǎn)端進(jìn)行限流呢煎楣?
因?yàn)樵诟卟l(fā)的情況下豺总,客戶量就是非常大,所以很難在生產(chǎn)端做限制择懂。因此我們可以用MQ在消費(fèi)端做限流喻喳。
- RabbitMQ提供了一種qos(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下困曙,如果一定數(shù)目的消息(通過基于consume或者channel設(shè)置Qos的值)未被確認(rèn)前表伦,不進(jìn)行消費(fèi)新的消息。
在限流的情況下慷丽,千萬(wàn)不要設(shè)置自動(dòng)簽收蹦哼,要設(shè)置為手動(dòng)簽收。 - void BasicQos(uint prfetchSize,ushort prefetchCount,bool global);
參數(shù)解釋:
prefetchSize:0
prefetchCount:會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息要糊,即一旦有N個(gè)消息還沒有ack纲熏,則該consumer將block掉,直到有消息ack锄俄。
global: true\false 是否將上面設(shè)置應(yīng)用于channel局劲,簡(jiǎn)單點(diǎn)說,就是上面限制是channel級(jí)別還是consumer級(jí)別珊膜。
prefetchSize和global這兩項(xiàng)容握,rabbitmq沒有實(shí)現(xiàn)宣脉,暫且不研究prefetch_count在no_ask = false的情況下生效车柠,即在自動(dòng)應(yīng)答的情況下這兩個(gè)值是不生效的。
第一個(gè)參數(shù):消息的限制大小,消息多少兆竹祷。一般不做限制谈跛,設(shè)置為0
第二個(gè)參數(shù):一次最多處理多少條,實(shí)際工作中設(shè)置為1就好
第三個(gè)參數(shù):限流策略在什么上應(yīng)用塑陵。在RabbitMQ一般有兩個(gè)應(yīng)用級(jí)別:1.通道 2.Consumer級(jí)別感憾。一般設(shè)置為false,true 表示channel級(jí)別令花,false表示在consumer級(jí)別
2.2 代碼演示
2.2.1 生產(chǎn)者
/**
*
* @ClassName: Producer
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51
*
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
2.2.2 消費(fèi)者
/**
*
* @ClassName: Consumer
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51
*
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck設(shè)置為 false
//設(shè)置為1阻桅,表示一條一條數(shù)據(jù)處理
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
2.2.3 自定義類:MyConsumer
/**
*
* @ClassName: MyConsumer
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55
*
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//需要做簽收,false表示不支持批量簽收
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
2.2.4 測(cè)試結(jié)果
我們先注釋掉:channel.basicAck(envelope.getDeliveryTag(), false);然后啟動(dòng)Consumer。
查看Exchange
查看Queues
然后再啟動(dòng)Producer兼都。查看打印結(jié)果:
我們會(huì)發(fā)現(xiàn)消費(fèi)端嫂沉,只收到了一條消息。這是為什么呢扮碧?
第一點(diǎn)因?yàn)槲覀冊(cè)赾onsumer中
channel.basicConsume(queueName, false, new MyConsumer(channel));
第二個(gè)參數(shù)設(shè)置為false為手動(dòng)簽收趟章。
第二點(diǎn)在qos中設(shè)置只接受一條消息。如果這一條消息不給Broker Ack應(yīng)答的話慎王,那么Broker會(huì)認(rèn)為你并沒有消費(fèi)完這一條消息蚓土,那么就不會(huì)繼續(xù)發(fā)送消息。
channel.basicQos(0, 1, false);
可以看下管控臺(tái)赖淤,unack=1蜀漆,Ready=4,total=5.
接下來我們放開注釋channel.basicAck(envelope.getDeliveryTag(), false); 進(jìn)行消息簽收咱旱。重啟服務(wù)嗜愈。
3.1 打印結(jié)果
可以看到正常打印五條結(jié)果
4. 消費(fèi)端ACK與重回隊(duì)列
4.1 消費(fèi)端的手工ACK和NACK
消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄莽龟,然后進(jìn)行補(bǔ)償蠕嫁!
如果由于服務(wù)器宕機(jī)等嚴(yán)重問題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功毯盈!
4.2 消費(fèi)端的重回隊(duì)列
消費(fèi)端重回隊(duì)列是為了對(duì)沒有處理成功的消息剃毒,把消息重新傳遞給Broker!
一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列搂赋,也就是設(shè)置為False.
4.3 代碼演示
4.3.1 生產(chǎn)者
/**
*
* @ClassName: Producer
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51
*
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
//添加屬性赘阀,后續(xù)會(huì)使用到
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //投遞模式,持久化
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
4.3.2 消費(fèi)者
/**
*
* @ClassName: Consumer
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51
*
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工簽收 必須要關(guān)閉 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
4.3.3 自定義類:MyConsumer
/**
*
* @ClassName: MyConsumer
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55
*
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//Nack三個(gè)參數(shù) 第二個(gè)參數(shù):是否是批量脑奠,第三個(gè)參數(shù):是否重回隊(duì)列(需要注意可能會(huì)發(fā)生重復(fù)消費(fèi)基公,造成死循環(huán))
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
5.1 打印結(jié)果:
注意:
可以看到重回隊(duì)列會(huì)出現(xiàn)重復(fù)消費(fèi)導(dǎo)致死循環(huán)的問題,這時(shí)候最好設(shè)置重試次數(shù)宋欺,比如超過三次后轰豆,消息還是消費(fèi)失敗胰伍,就將消息丟棄。
6. TTL隊(duì)列/消息
6.1 TTL
- TTL是Time To Live的縮寫酸休,也就是生存時(shí)間
- RabbitMQ支持消息的過期時(shí)間骂租,在消息發(fā)送時(shí)可以進(jìn)行指定
- RabbitMQ支持隊(duì)列的過期時(shí)間,從消息入隊(duì)列開始計(jì)算斑司,只要超過了隊(duì)列的超時(shí)時(shí)間配置渗饮,那么消息會(huì)自動(dòng)的清除
6.2 代碼演示
6.2.1 直接通過管控臺(tái)進(jìn)行演示
通過管控臺(tái)創(chuàng)建一個(gè)隊(duì)列
x-max-length 隊(duì)列的最大大小
x-message-ttl 設(shè)置10秒鐘,如果消息還沒有被消費(fèi)的話宿刮,就會(huì)被清除互站。
添加exchange
Queue與Exchange進(jìn)行綁定
點(diǎn)擊 test_ttl_exchange 進(jìn)行綁定
通過管控臺(tái)發(fā)送消息
生產(chǎn)端設(shè)置過期時(shí)間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(headers)
.build();
這兩個(gè)屬性并不相同,一個(gè)對(duì)應(yīng)的是消息體僵缺,一個(gè)對(duì)應(yīng)的是隊(duì)列的過期云茸。
7. 死信隊(duì)列
7.1 概念理解
死信隊(duì)列:DLX,Dead-Letter-Exchange
RabbitMQ的死信隊(duì)里與Exchange息息相關(guān)
- 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個(gè)Exchange谤饭,這個(gè)Exchange就是DLX
消息變成死信有以下幾種情況
- 消息被拒絕(basic.reject/basic.nack)并且requeue=false
- 消息TTL過期
- 隊(duì)列達(dá)到最大長(zhǎng)度
DLX也是一個(gè)正常的Exchange标捺,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定揉抵,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性
當(dāng)這個(gè)隊(duì)列中有死信時(shí)亡容,RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列冤今。
可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理闺兢,這個(gè)特征可以彌補(bǔ)RabbitMQ3.0以前支持的immediate參數(shù)的功能。
7.2 代碼演示
- 死信隊(duì)列設(shè)置:
- 首先需要設(shè)置死信隊(duì)列的exchange和queue,然后進(jìn)行綁定:
Exchange:dlx.exchange
Queue:dlx.queue
RoutingKey:# - 然后我們進(jìn)行正常聲明交換機(jī)戏罢、隊(duì)列屋谭、綁定,只不過我們需要在隊(duì)列加上一個(gè)參數(shù)即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
- 這樣消息在過期龟糕、requeue桐磁、隊(duì)列在達(dá)到最大長(zhǎng)度時(shí),消息就可以直接路由到死信隊(duì)列讲岁!
7.2.1 生產(chǎn)者
/**
*
* @ClassName: Producer
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51
*
*/
public class Producer {
public static void main(String[] args) throws Exception {
//創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for(int i =0; i<1; i ++){
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
7.2.2 消費(fèi)者
/**
*
* @ClassName: Consumer
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51
*
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//創(chuàng)建ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 這就是一個(gè)普通的交換機(jī) 和 隊(duì)列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//這個(gè)agruments屬性我擂,要設(shè)置到聲明隊(duì)列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要進(jìn)行死信隊(duì)列的聲明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
7.2.3 自定義類:MyConsumer
/**
*
* @ClassName: MyConsumer
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55
*
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
7.2.4 測(cè)試結(jié)果
運(yùn)行Consumer,查看管控臺(tái)
查看Exchanges
查看queue
可以看到test_dlx_queue多了DLX的標(biāo)識(shí)缓艳,表示當(dāng)隊(duì)列中出現(xiàn)死信的時(shí)候校摩,會(huì)將消息發(fā)送到死信隊(duì)列dlx_queue中
關(guān)閉Consumer,只運(yùn)行Producer
過10秒鐘后阶淘,消息過期
在我們工作中衙吩,死信隊(duì)列非常重要,用于消息沒有消費(fèi)者溪窒,處于死信狀態(tài)坤塞。我們可以才用補(bǔ)償機(jī)制冯勉。
小結(jié)
本次主要介紹了RabbitMQ的高級(jí)特性,首先介紹了互聯(lián)網(wǎng)大廠在實(shí)際使用中如何保障100%的消息投遞成功和冪等性的尺锚,以及對(duì)RabbitMQ的確認(rèn)消息珠闰、返回消息惜浅、ACK與重回隊(duì)列瘫辩、消息的限流,以及對(duì)超時(shí)時(shí)間坛悉、死信隊(duì)列的使用
文末
歡迎關(guān)注個(gè)人微信公眾號(hào):Coder編程
獲取最新原創(chuàng)技術(shù)文章和免費(fèi)學(xué)習(xí)資料伐厌,更有大量精品思維導(dǎo)圖、面試資料裸影、PMP備考資料等你來領(lǐng)挣轨,方便你隨時(shí)隨地學(xué)習(xí)技術(shù)知識(shí)!
新建了一個(gè)qq群:315211365轩猩,歡迎大家進(jìn)群交流一起學(xué)習(xí)卷扮。謝謝了!也可以介紹給身邊有需要的朋友均践。
文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關(guān)注并star~
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(五)快速入門生產(chǎn)者與消費(fèi)者晤锹,SpringBoot整合RabbitMQ!