隊(duì)列持久化
// 隊(duì)列消息持久化
boolean durable = true;
channel.queuDeclare = (ACK_QUEUE_NAME变隔,durable喘落,flase,false,null);
上面的代碼就是進(jìn)行消息持久話茵宪,當(dāng)然還有其他寫法,例如:
@Bean
public Queue directProductQueue(){
return QueueBuilder.durable(隊(duì)列名);
其他寫法不一一贅述瘦棋。
如果隊(duì)列A之前沒(méi)有持久化稀火,重啟RabbitMQ后,隊(duì)列會(huì)消息赌朋,并且凰狞,在代碼里將隊(duì)列A改為了持久化,需要先將原來(lái)的隊(duì)列刪除掉沛慢,否則會(huì)報(bào)錯(cuò)赡若。
持久化后,在控制臺(tái)中會(huì)顯示"D"团甲,這樣的話逾冬,即使重啟RabbitMQ,隊(duì)列A也會(huì)照樣存在。
消息持久化
隊(duì)列持久化并不能讓消息持久化,如果RabbitMQ宕機(jī)身腻,重啟后产还,持久化后的隊(duì)列還會(huì)存在,因?yàn)橄⒛J(rèn)保存在內(nèi)存中嘀趟,所以消息會(huì)丟失脐区,如果想讓消息不丟失,或者丟失的少她按,最好將消息進(jìn)行持久化牛隅,需要在生產(chǎn)段進(jìn)行配置
Message message1 = MessageBuilder.withBody(msgBody.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setContentEncoding("UTF-8")
.setCorrelationId(msgId).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(exchange, routingKey, message1,correlationData);
以上代碼中,setDeliveryMode(MessageDeliveryMode.PERSISTENT) 就是將消息進(jìn)行了持久化酌泰。即使RabbitMQ宕機(jī)倔叼,消息也不會(huì)全部丟失,為什么不能保證全部不丟失呢宫莱?因?yàn)樵谝环N極端情況下丈攒,例如RabbitMQ在將消息寫入磁盤的過(guò)程中,RabbitMQ宕機(jī)授霸,此時(shí)巡验,還未寫入磁盤的部分消息就會(huì)丟失。
當(dāng)然有很多方法可以保證消息盡可能不丟失碘耳,例如生產(chǎn)者發(fā)送消息后立馬將消息寫入數(shù)據(jù)庫(kù)显设,即使RabbitMQ讓部分消息丟失,我們也可以通過(guò)數(shù)據(jù)庫(kù)里的消息進(jìn)行補(bǔ)償辛辨,例如重發(fā)消息捕捂,但是發(fā)消息時(shí)同時(shí)寫庫(kù),對(duì)性能會(huì)有一些影響斗搞。
發(fā)布確認(rèn)
什么是發(fā)布確認(rèn)指攒,發(fā)布確認(rèn)就是生產(chǎn)者發(fā)布的消息被投遞到指定隊(duì)列后,broker會(huì)通過(guò)回調(diào)函數(shù)告訴生產(chǎn)者消息投遞成功了僻焚,要注意允悦,這只是消息投遞成功了,而不是消費(fèi)成功虑啤。
發(fā)布確認(rèn)是否開(kāi)啟需要自己手動(dòng)設(shè)置隙弛,比如可以在application.yml中設(shè)置如下:
rabbitmq:
addresses: xx.xx.xx.x
port: 5672
username: xxx
password: xxxxxxxx
publisher-confirms: true #是否開(kāi)啟回調(diào)
單個(gè)確認(rèn)
單個(gè)確認(rèn)發(fā)布屬于同步確認(rèn),發(fā)一條消息確認(rèn)一次狞山,缺點(diǎn)是發(fā)布消息比較慢全闷,這種方式最多提供每秒不超過(guò)數(shù)百條的發(fā)布消息吞吐量。批量確認(rèn)
相比于單個(gè)確認(rèn)萍启,批量確認(rèn)極大的提高了吞吐量总珠,但是當(dāng)發(fā)生故障時(shí),不能確定哪條消息出了問(wèn)題,同時(shí)姚淆,批量確認(rèn)也是同步的孕蝉。異步確認(rèn)
異步確認(rèn)不會(huì)同步等待broker的確認(rèn)信息,異步響應(yīng)broker的確認(rèn)信息腌逢。
先貼一下代碼:
@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
// 設(shè)置生產(chǎn)者消息確認(rèn)
rabbitTemplate.setConfirmCallback(this);
}
/**
* 消息發(fā)送到 Broker 后觸發(fā)回調(diào)
*
* @param correlationData bean
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 如果消息被確認(rèn)了降淮,走一套邏輯
} else {
//如果消息沒(méi)有被確認(rèn),是否補(bǔ)償?持久化到數(shù)據(jù)庫(kù)還是定期處理搏讶? correlationData.getId()
}
}
對(duì)于消息被確認(rèn)還是沒(méi)有被確認(rèn)的具體處理邏輯需要自己去寫佳鳖,你可以在發(fā)消息前將消息先存入redis或者M(jìn)ySQL,為每一條消息設(shè)置一個(gè)唯一的id(可以用UUID媒惕、雪花算法等等)系吩,就是correlationData.getId(),當(dāng)消息沒(méi)有被確認(rèn)妒蔚,可以拿著這個(gè)唯一的id將完整的消息取出來(lái)穿挨,做消息補(bǔ)償還是只是記錄錯(cuò)誤日志自己定奪。
手動(dòng)ack
消息到達(dá)隊(duì)列后肴盏,準(zhǔn)備被消費(fèi)者消費(fèi)科盛,消息被成功消費(fèi)后,即業(yè)務(wù)處理完成后菜皂,進(jìn)行手動(dòng)ack贞绵,RabbitMQ默認(rèn)是自動(dòng)ack的,就是只要開(kāi)始消費(fèi)恍飘,就會(huì)被自動(dòng)ack榨崩,自動(dòng)ack后,隊(duì)列中對(duì)應(yīng)的這條消息就沒(méi)了章母,生產(chǎn)中最好用手動(dòng)ack母蛛。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
如果在消費(fèi)過(guò)程中出現(xiàn)了問(wèn)題,可以將消息reject胳施,reject后可以選擇消息重新入隊(duì)或者消息直接被丟棄,下面代碼中的 false 表示不重新入隊(duì)溯祸,如果重新入隊(duì)肢专,可能會(huì)帶來(lái)一個(gè)問(wèn)題舞肆,就是如果這條消息永遠(yuǎn)會(huì)在被消費(fèi)的過(guò)程中產(chǎn)生錯(cuò)誤,那么這條消息就會(huì)不斷地被重新入隊(duì)博杖,會(huì)造成死循環(huán)椿胯。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
死信隊(duì)列
消息在被消費(fèi)的過(guò)程中發(fā)生錯(cuò)誤怎么辦呢,重新入隊(duì)有風(fēng)險(xiǎn)剃根,可以將消息發(fā)到死信隊(duì)列進(jìn)行處理哩盲,不影響原隊(duì)列。
先說(shuō)一下什么是死信,就是由于某些原因?qū)е玛?duì)列中的某些消息無(wú)法被消費(fèi)廉油,這些消息如果沒(méi)有后期的處理惠险,就會(huì)變成死信,用來(lái)處理死信的隊(duì)列就是死信隊(duì)列抒线,當(dāng)然死信隊(duì)列還可以當(dāng)作延遲隊(duì)列用班巩。
設(shè)置死信隊(duì)列的方法可以參考下方代碼:
@Configuration
public class RabbitConfig {
// 交換機(jī)
public static final String EXCHANGE_TEST= "exchangeTest";
// 路由鍵
public static final String ROUTING_KEY_TEST = "routingKeyTest";
// 隊(duì)列
public static final String DIRECT_QUEUE_TEST = "direct.queuetest";
/**
* 交換機(jī)
**/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_TEST);
}
/**
* 隊(duì)列
**/
@Bean
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE_TEST)
//死信交換機(jī)聲明
.withArgument("x-dead-letter-exchange", DeadMQConfig.DIRECT_DEAD_EXCHANGE_NAME)
//死信消息的路由key
.withArgument("x-dead-letter-routing-key", DeadMQConfig.DIRECT_DEAD_ROUTING_KEY_NAME)
.build();
}
/**
* Binding,將該routing key的消息通過(guò)交換機(jī)轉(zhuǎn)發(fā)到該隊(duì)列
*/
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTING_KEY_TEST );
}
}
參考:
[1] 尚硅谷-《消息中間件RabbitMQ》
[2] https://blog.csdn.net/qq_32662795/article/details/88742397
[3] https://www.cnblogs.com/he-erduo/p/13558308.html