每個(gè)Consumer可能需要一段時(shí)間才能處理完收到的數(shù)據(jù)凡桥。如果在這個(gè)過程中,Consumer出錯(cuò)了喷好,異常退出了翔横,而數(shù)據(jù)還沒有處理完成,那么非常不幸绒窑,這段數(shù)據(jù)就丟失了棕孙。
因?yàn)槲覀儾捎胣o-ack的方式進(jìn)行確認(rèn),也就是說些膨,每次Consumer接到數(shù)據(jù)后蟀俊,而不管是否處理完 成,RabbitMQ Server會(huì)立即把這個(gè)Message標(biāo)記為完成订雾,然后從queue中刪除了肢预。
如果一個(gè)Consumer異常退出了,它處理的數(shù)據(jù)能夠被另外的Consumer處理洼哎,這樣數(shù)據(jù)在這種情況下就不會(huì)丟失了(注意是這種情況下)烫映。
為了保證數(shù)據(jù)不被丟失沼本,RabbitMQ支持消息確認(rèn)機(jī)制,即acknowledgments锭沟。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到抽兆,那么我們不能采用no-ack。而應(yīng)該是在處理完數(shù)據(jù)后發(fā)送ack族淮。即手動(dòng)ACK辫红。
在處理數(shù)據(jù)后發(fā)送的ack,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收祝辣,處理完成贴妻,RabbitMQ可以去安全的刪除它了。
如果Consumer退出了但是沒有發(fā)送ack蝙斜,那么RabbitMQ就會(huì)把這個(gè)Message發(fā)送到下一個(gè)Consumer名惩。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會(huì)丟失。
這里并沒有用到超時(shí)機(jī)制孕荠。RabbitMQ僅僅通過Consumer的連接中斷來確認(rèn)該Message并沒有被正確處理娩鹉。也就是說,RabbitMQ給了Consumer足夠長的時(shí)間來做數(shù)據(jù)處理稚伍。
這樣即使你通過Ctr-C中斷了Recieve.cs底循,那么Message也不會(huì)丟失了,它會(huì)被分發(fā)到下一個(gè)Consumer槐瑞。如果忘記了ack熙涤,那么后果很嚴(yán)重。當(dāng)Consumer退出時(shí)困檩,Message會(huì)重新分發(fā)祠挫。然后RabbitMQ會(huì)占用越來越多的內(nèi)存,由于 RabbitMQ會(huì)長時(shí)間運(yùn)行悼沿,因此這個(gè)“內(nèi)存泄漏”是致命的等舔。
你的 consumer 代碼必須能夠處理各種異常,確保只要收到一條消息糟趾,最終一定能夠執(zhí)行一條 ACK / NACK
去調(diào)試這種錯(cuò)誤慌植,可以通過以下命令打印un-acked Messages.
如果連接沒有斷開應(yīng)用要通知服務(wù)器讓消息重新發(fā)送:
可以通過channel.nack(message)來讓不通過的消息再次進(jìn)入消息隊(duì)列。
if(body==’Hello World3!’){chnl.nack(msg);
//這樣就可以讓這個(gè)消息再次進(jìn)入隊(duì)列而不用重啟服務(wù)义郑。
}else{
console.log(‘a(chǎn)ck’);
chnl.ack(msg);
}
消費(fèi)端的手工ACK與NACK
當(dāng)我們?cè)O(shè)置 autoACK=false 時(shí)蝶柿,就可以使用手工ACK方式了,那么其實(shí)手工方式包括了手工ACK與NACK非驮。
當(dāng)我們手工 ACK 時(shí)交汤,會(huì)發(fā)送給Broker一個(gè)應(yīng)答,代表消息成功處理了劫笙,Broker就可以回送響應(yīng)給生產(chǎn)端了芙扎。NACK 則表示消息處理失敗了星岗,如果設(shè)置重回隊(duì)列,Broker端就會(huì)將沒有成功處理的消息重新發(fā)送戒洼。
使用方式
消費(fèi)端進(jìn)行消費(fèi)的時(shí)候俏橘,如果由于業(yè)務(wù)異常我們可以手工 NACK 并進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償圈浇!
方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
requeue為true敷矫,表示deliveryTag=n之前未確認(rèn)的消息都處理失敗且將這些消息重新放回隊(duì)列中。
requeue為false,表示deliveryTag=n之前未確認(rèn)的消息都處理失敗且將這些消息直接丟棄汉额。
如果由于服務(wù)器宕機(jī)等嚴(yán)重問題,那我們就需要手工進(jìn)行 ACK 保障消費(fèi)端消費(fèi)成功榨汤!
方法:void basicAck(long deliveryTag, boolean multiple)
消費(fèi)端的重回隊(duì)列
- 消費(fèi)端重回隊(duì)列是為了對(duì)沒有處理成功的消息蠕搜,把消息重新會(huì)遞給Broker!
- 重回隊(duì)列收壕,會(huì)把消費(fèi)失敗的消息重新添加到隊(duì)列的尾端妓灌,供消費(fèi)者繼續(xù)消費(fèi)。
- 一般我們?cè)趯?shí)際應(yīng)用中蜜宪,都會(huì)關(guān)閉重回隊(duì)列虫埂,也就是設(shè)置為false
演示重回隊(duì)列
生產(chǎn)端
對(duì)消息設(shè)置自定義屬性以便進(jìn)行區(qū)分
public class Producer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactorys
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個(gè)新的Channel
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
//設(shè)置消息屬性
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
//發(fā)送消息
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
自定義消費(fèi)者
對(duì)第一條消息進(jìn)行NACK,并設(shè)置重回隊(duì)列
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//NACK圃验,參數(shù)三requeue:是否重回隊(duì)列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消費(fèi)端
關(guān)閉自動(dòng)簽收功能
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
//聲明交換機(jī)和隊(duì)列掉伏,然后進(jìn)行綁定設(shè)置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//手工簽收 必須要設(shè)置 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
運(yùn)行說明
先啟動(dòng)消費(fèi)端,然后啟動(dòng)生產(chǎn)端澳窑,消費(fèi)端打印如下斧散,顯然第一條消息由于我們調(diào)用了NACK,并且設(shè)置了重回隊(duì)列摊聋,所以會(huì)導(dǎo)致該條消息一直重復(fù)發(fā)送鸡捐,消費(fèi)端就會(huì)一直循環(huán)消費(fèi)
-----------consume message---------- body: Hello RabbitMQ ACK Message 0
-----------consume message---------- body: Hello RabbitMQ ACK Message 1
-----------consume message---------- body: Hello RabbitMQ ACK Message 2
-----------consume message---------- body: Hello RabbitMQ ACK Message 3
-----------consume message---------- body: Hello RabbitMQ ACK Message 4
-----------consume message---------- body: Hello RabbitMQ ACK Message 0
-----------consume message---------- body: Hello RabbitMQ ACK Message 0
-----------consume message----------
一般工作中不會(huì)設(shè)置重回隊(duì)列這個(gè)屬性,我們都是自己去做補(bǔ)償或者投遞到延遲隊(duì)列里的麻裁,然后指定時(shí)間去處理即可箍镜。