1. 本篇概要
其實(shí)乍炉,還有1種場(chǎng)景需要考慮:當(dāng)消費(fèi)者接收到消息后绢片,還沒(méi)處理完業(yè)務(wù)邏輯,消費(fèi)者掛掉了岛琼,那消息也算丟失了底循?,比如用戶下單槐瑞,訂單中心發(fā)送了1個(gè)消息到RabbitMQ里的隊(duì)列熙涤,積分中心收到這個(gè)消息,準(zhǔn)備給這個(gè)下單的用戶增加20積分困檩,但積分還沒(méi)增加成功呢祠挫,積分中心自己掛掉了,導(dǎo)致數(shù)據(jù)出現(xiàn)問(wèn)題窗看。
那么如何解決這種問(wèn)題呢茸歧?
為了保證消息被消費(fèi)者成功的消費(fèi),RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement)显沈,本文主要講解RabbitMQ中软瞎,如何使用消息確認(rèn)機(jī)制來(lái)保證消息被消費(fèi)者成功的消費(fèi),避免因?yàn)橄M(fèi)者突然宕機(jī)而引起的消息丟失拉讯。
2. 開啟顯式Ack模式
我們開啟一個(gè)消費(fèi)者的代碼是這樣的:
// 創(chuàng)建隊(duì)列消費(fèi)者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received Message '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
這里的重點(diǎn)是channel.basicConsume(QUEUE_NAME, true, consumer);
方法的第2個(gè)參數(shù)涤浇,讓我們先看下basicConsume()的源碼:
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
return this.basicConsume(queue, autoAck, "", callback);
}
這里的autoAck參數(shù)指的是是否自動(dòng)確認(rèn),如果設(shè)置為ture魔慷,RabbitMQ會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn)只锭,然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者接收到消息是否處理成功院尔;如果設(shè)置為false蜻展,RabbitMQ會(huì)等待消費(fèi)者顯式的回復(fù)確認(rèn)信號(hào)后才會(huì)從內(nèi)存(或者磁盤)中刪除。
建議將autoAck設(shè)置為false邀摆,這樣消費(fèi)者就有足夠的時(shí)間處理消息纵顾,不用擔(dān)心處理消息過(guò)程中消費(fèi)者宕機(jī)造成消息丟失。
此時(shí)栋盹,隊(duì)列里的消息就分成了2個(gè)部分:
- 等待投遞給消費(fèi)者的消息(下圖中的Ready部分)
- 已經(jīng)投遞給消費(fèi)者施逾,但是還沒(méi)有收到消費(fèi)者確認(rèn)信號(hào)的消息(下圖中的Unacked部分)
如果RabbitMQ一直沒(méi)有收到消費(fèi)者的確認(rèn)信號(hào)例获,并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開連接汉额,則RabbitMQ會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者榨汤,當(dāng)然也有可能還是原來(lái)的那個(gè)消費(fèi)者蠕搜。
RabbitMQ不會(huì)為未確認(rèn)的消息設(shè)置過(guò)期時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否已經(jīng)斷開件余,這么設(shè)計(jì)的原因是RabbitMQ允許消費(fèi)者消費(fèi)一條消息的時(shí)間可以很久很久讥脐。
為了便于理解遭居,我們舉個(gè)具體的例子,生產(chǎn)者的話的我們延用上文中的DurableProducer:
package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置 RabbitMQ 的主機(jī)名
factory.setHost("localhost");
// 創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個(gè)通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個(gè)Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 發(fā)送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
然后新建一個(gè)消費(fèi)者AckConsumer類:
package com.zwwhnly.springbootaction.rabbitmq.ack;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class AckConsumer {
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置 RabbitMQ 的主機(jī)名
factory.setHost("localhost");
// 創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個(gè)通道
Channel channel = connection.createChannel();
// 創(chuàng)建隊(duì)列消費(fèi)者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
int result = 1 / 0;
System.out.println("Received Message '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們先將autoAck參數(shù)設(shè)置為ture旬渠,即自動(dòng)確認(rèn)俱萍,并在消費(fèi)消息時(shí)故意寫個(gè)異常,然后先運(yùn)行生產(chǎn)者客戶端將消息寫入隊(duì)列中告丢,然后運(yùn)行消費(fèi)者客戶端枪蘑,發(fā)現(xiàn)消息未消費(fèi)成功但是卻消失了:
然后我們將autoAck設(shè)置為false:
channel.basicConsume(QUEUE_NAME, false, consumer);
再次運(yùn)行生產(chǎn)者客戶端將消息寫入隊(duì)列中岳颇,然后運(yùn)行消費(fèi)者客戶端,此時(shí)雖然消費(fèi)者客戶端仍然代碼異常颅湘,但是消息仍然在隊(duì)列中:
然后我們刪除掉消費(fèi)者客戶端中的異常代碼,重新啟動(dòng)消費(fèi)者客戶端闯参,發(fā)現(xiàn)消息消費(fèi)成功了瞻鹏,但是消息一直未Ack:
手動(dòng)停掉消費(fèi)者客戶端新博,發(fā)現(xiàn)消息又到了Ready狀態(tài),準(zhǔn)備重新投遞:
之所以消費(fèi)掉消息赫悄,卻一直還是Unacked狀態(tài),是因?yàn)槲覀儧](méi)在代碼中添加顯式的Ack代碼:
String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
deliveryTag可以看做消息的編號(hào)馏慨,它是一個(gè)64位的長(zhǎng)×××值埂淮。
此時(shí)運(yùn)行消費(fèi)者客戶端,發(fā)現(xiàn)消息消費(fèi)成功写隶,并且在隊(duì)列中被移除:
文末彩蛋
[Java學(xué)習(xí)樟澜、面試;文檔叮盘、視頻資源免費(fèi)獲取]