一條消費成功被消費經歷了生產者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。
一 消息生產者沒有把消息成功發(fā)送到MQ
1.1 事務機制
AMQP
協(xié)議提供了事務機制邑狸,在投遞消息時開啟事務支持,如果消息投遞失敗闸翅,則回滾事務舟铜。
自定義事務管理器
@Configuration
public class RabbitTranscation {
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}
修改yml
spring:
rabbitmq:
# 消息在未被隊列收到的情況下返回
publisher-returns: true
開啟事務支持
rabbitTemplate.setChannelTransacted(true);
消息未接收時調用ReturnCallback
rabbitTemplate.setMandatory(true);
生產者投遞消息
@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
// 設置channel開啟事務
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?);
}
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
但是,很少有人這么干仇穗,因為這是同步操作流部,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ-Server的回應纹坐,之后才能繼續(xù)發(fā)送下一條消息枝冀,生產者生產消息的吞吐量和性能都會大大降低。
1.2 發(fā)送方確認機制
發(fā)送消息時將信道設置為confirm
模式耘子,消息進入該信道后果漾,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列后谷誓,RabbitMQ
就會發(fā)送給生產者一個確認绒障。
開啟消息確認機制
spring:
rabbitmq:
# 消息在未被隊列收到的情況下返回
publisher-returns: true
# 開啟消息確認機制
publisher-confirm-type: correlated
消息未接收時調用ReturnCallback
rabbitTemplate.setMandatory(true);
生產者投遞消息
@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("確認了這條消息:"+correlationData);
}else{
System.out.println("確認失敗了:"+correlationData+";出現(xiàn)異常:"+cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?);
}
public void publisMessage(String message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
如果消息確認失敗后捍歪,我們可以進行消息補償户辱,也就是消息的重試機制鸵钝。當未收到確認信息時進行消息的重新投遞。設置如下配置即可完成焕妙。
spring:
rabbitmq:
# 支持消息發(fā)送失敗后重返隊列
publisher-returns: true
# 開啟消息確認機制
publisher-confirm-type: correlated
listener:
simple:
retry:
# 開啟重試
enabled: true
# 最大重試次數
max-attempts: 5
# 重試時間間隔
initial-interval: 3000
二 消息發(fā)送到MQ后蒋伦,MQ宕機導致內存中的消息丟失
消息在MQ中有可能發(fā)生丟失,這時候我們就需要將隊列和消息都進行持久化焚鹊。
@Queue注解為我們提供了隊列相關的一些屬性痕届,具體如下:
- name: 隊列的名稱;
- durable: 是否持久化末患;
- exclusive: 是否獨享研叫、排外的;
- autoDelete: 是否自動刪除璧针;
- arguments:隊列的其他屬性參數嚷炉,有如下可選項,可參看圖2的arguments:
- x-message-ttl:消息的過期時間探橱,單位:毫秒申屹;
- x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除隧膏,單位:毫秒哗讥;
- x-max-length:隊列最大長度,超過該最大值胞枕,則將從隊列頭部開始刪除消息杆煞;
- x-max-length-bytes:隊列消息內容占用最大空間,受限于內存大小腐泻,超過該閾值則從隊列頭部開始刪除消息决乎;
- x-overflow:設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發(fā)生什么派桩。有效值是drop-head构诚、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head铆惑;
- x-dead-letter-exchange:死信交換器名稱唤反,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
- x-dead-letter-routing-key:死信消息路由鍵鸭津,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設置肠缨,則使用消息的原來的路由鍵值
- x-single-active-consumer:表示隊列是否是單一活動消費者逆趋,true時,注冊的消費組內只有一個消費者消費消息晒奕,其他被忽略闻书,false時消息循環(huán)分發(fā)給所有消費者(默認false)
- x-max-priority:隊列要支持的最大優(yōu)先級數;如果未設置名斟,隊列將不支持消息優(yōu)先級;
- x-queue-mode(Lazy mode):將隊列設置為延遲模式魄眉,在磁盤上保留盡可能多的消息砰盐,以減少RAM的使用;如果未設置,隊列將保留內存緩存以盡可能快地傳遞消息坑律;
- x-queue-master-locator:在集群模式下設置鏡像隊列的主節(jié)點信息岩梳。
持久化隊列
創(chuàng)建隊列的時候將持久化屬性durable設置為true,同時要將autoDelete設置為false
@Queue(value = "javatrip",durable = "false",autoDelete = "false")
持久化消息
發(fā)送消息的時候將消息的deliveryMode設置為2晃择,在Spring Boot中消息默認就是持久化的冀值。
三 消費者消費消息的時候,未消費完畢就出現(xiàn)了異常
消費者剛消費了消息宫屠,還沒有處理業(yè)務列疗,結果發(fā)生異常。這時候就需要關閉自動確認浪蹂,改為手動確認消息抵栈。
修改yml為手動簽收模式
spring:
rabbitmq:
listener:
simple:
# 手動簽收模式
acknowledge-mode: manual
# 每次簽收一條消息
prefetch: 1
消費者手動簽收
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {
@RabbitHandler
public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println(message);
// 唯一的消息ID
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 確認該條消息
if(...){
channel.basicAck(deliverTag,false);
}else{
// 消費失敗,消息重返隊列
channel.basicNack(deliverTag,false,true);
}
}
}
四 總結
消息丟失的原因坤次?
生產者古劲、MQ、消費者都有可能造成消息丟失
如何保證消息的可靠性浙踢?
- 發(fā)送方采取發(fā)送者確認模式
- MQ進行隊列及消息的持久化
-
消費者消費成功后手動確認消息
image