前言
導(dǎo)文:
1.什么是RabbitMQ
2.Java開發(fā)技術(shù)大雜燴(三)之電商項(xiàng)目?jī)?yōu)化、rabbitmq腾仅、Git乒裆、OSI套利、VIM推励、Intellj IDEA、HTTP肉迫、JS验辞、Java
之前在上面2篇文章中,講到過RabbitMQ的安裝喊衫,基本概念和用法跌造。我們來回顧一下RabbitMQ核心基礎(chǔ)概念。
Server:又稱之為Broker族购,接受客戶端的連接壳贪,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。
Connection:連接寝杖,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接违施。
Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行瑟幕,Channel是進(jìn)行消息讀寫的通道磕蒲×袅剩客戶端可以建立多個(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)辣往。
如果每一次訪問RabbitMQ都建立一個(gè)Connection兔院,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低站削。Channel是在connection內(nèi)部建立的邏輯連接坊萝,如果應(yīng)用程序支持多線程,通常每個(gè)thread創(chuàng)建單獨(dú)的channel進(jìn)行通訊许起,AMQP method包含了channel id幫助客戶端和message broker識(shí)別channel屹堰,所以channel之間是完全隔離的。Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷街氢。Message:消息扯键,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Message Properties和Body組成珊肃。Properties可以對(duì)消息進(jìn)行修飾荣刑,比如消息的優(yōu)先級(jí),延遲等高級(jí)特性伦乔,Body就是消息體內(nèi)容厉亏。
Virtual Host:虛擬地址,用于進(jìn)行邏輯隔離烈和,最上層的消息路由爱只。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或者Queue招刹。
Exchange:交換機(jī)恬试,接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列疯暑。
Binding:Exchange和Queue之間的虛擬連接训柴,binding中可以包含routing key。
Routing key:一個(gè)路由規(guī)則妇拯,虛擬機(jī)可以用它來確定如何路由一個(gè)特定消息幻馁。
Queue:也可以稱之為Message Queue(消息隊(duì)列),保存消息并將它們轉(zhuǎn)發(fā)到消費(fèi)者越锈。
通過下面2張圖仗嗦,我們能大概能明白AMQP協(xié)議模型和消息流轉(zhuǎn)過程。在Exchange和Message Queue上面還有Virtual host甘凭。記住同一個(gè)Virtual Host里面不能有相同名稱的ExChange和Message Queue稀拐。
接著我們看下面的圖,這是RabbitMQ消息可靠性投遞的解決方案之一对蒲。
1.將消息落地到業(yè)務(wù)db和Message db钩蚊。
2.采用Confirm方式發(fā)送消息至MQ Broker贡翘,返回結(jié)果的過程是異步的。Confirm消息砰逻,是指生產(chǎn)者投遞消息后鸣驱,如果Broker收到消息后,會(huì)給生產(chǎn)者一個(gè)ACK蝠咆。生產(chǎn)者通過ACK踊东,可以確認(rèn)這條消息是否正常發(fā)送到Broker,這種方式是消息可靠性投遞的核心刚操。
3闸翅、4:在這里將消息分成3種狀態(tài)。status=0表示消息正在投遞中菊霜,status=1表示消息投遞成功坚冀,status=2表示消息投遞了3次還是失敗。生產(chǎn)者接收Broker返回的Confirm確認(rèn)消息結(jié)果鉴逞,然后根據(jù)結(jié)果更新消息的狀態(tài)记某。將status的狀態(tài)從投遞中改成投遞成功即可。
5.在消息Confirm過程中构捡,可能由于網(wǎng)絡(luò)閃斷問題或者是Broker端出現(xiàn)異常液南,導(dǎo)致回送消息失敗或者出現(xiàn)異常。這時(shí)候勾徽,就需要生產(chǎn)者對(duì)消息進(jìn)行可靠性投遞滑凉,保證投遞到Broker的消息可靠不丟失。還有一種極端情況值得我們考慮喘帚,那就是網(wǎng)絡(luò)閃斷畅姊。我們的消息成功投遞到Broker,但是在回送ACK確認(rèn)消息時(shí)啥辨,由于網(wǎng)絡(luò)閃斷涡匀,生產(chǎn)者沒有收到。此時(shí)我們?cè)僦匦峦哆f此消息可能會(huì)造成消費(fèi)端重復(fù)消費(fèi)消息了溉知。這時(shí)候需要消費(fèi)端去做冪等處理(生成全局消息ID,判斷此消息是否消費(fèi)過)腕够。對(duì)于沒有投遞成功的消息级乍,我們可以設(shè)置一個(gè)重新投遞時(shí)間。比如一個(gè)消息在5分鐘內(nèi)帚湘,status狀態(tài)還是0玫荣,也就是這個(gè)消息還沒有成功投遞到Broker端。這時(shí)候我們需要一個(gè)定時(shí)任務(wù)大诸,每隔幾分鐘從Message db中拉取status為0的消息捅厂。
6.將拉取的消息執(zhí)行重新投遞操作贯卦。
7.設(shè)置最大消息投遞次數(shù)。當(dāng)一個(gè)消息被投遞了3次焙贷,還是不成功撵割,那么將status置為2。最后交給人工解決處理此類問題或者將消息轉(zhuǎn)存到失敗表辙芍。
下面講解一下涉及到消息可靠性的知識(shí)點(diǎn)和一些配置了啡彬。
application-dev.properties
#rabbtisMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
#消費(fèi)者數(shù)量
spring.rabbitmq.listener.simple.concurrency=10
#最大消費(fèi)者數(shù)量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量
spring.rabbitmq.listener.simple.prefetch=1
#消費(fèi)者自動(dòng)啟動(dòng)
spring.rabbitmq.listener.simple.auto-startup=true
#消費(fèi)失敗,自動(dòng)重新入隊(duì)
#重試次數(shù)超過最大限制之后是否丟棄(true不丟棄時(shí)需要寫相應(yīng)代碼將該消息加入死信隊(duì)列)
#true故硅,自動(dòng)重新入隊(duì)庶灿,要寫相應(yīng)代碼將該消息加入死信隊(duì)列
#false,丟棄
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#是否開啟消費(fèi)者重試(為false時(shí)關(guān)閉消費(fèi)者重試,這時(shí)消費(fèi)端代碼異常會(huì)一直重復(fù)收到消息)
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.multiplier=1.0
spring.rabbitmq.listener.simple.retry.max-interval=10000
#啟動(dòng)發(fā)送重試策略
spring.rabbitmq.template.retry.enabled=true
#初始重試間隔為1s
spring.rabbitmq.template.retry.initial-interval=1000
#重試的最大次數(shù)
spring.rabbitmq.template.retry.max-attempts=3
#重試間隔最多10s
spring.rabbitmq.template.retry.max-interval=10000
#每次重試的因子是1.0 等差
spring.rabbitmq.template.retry.multiplier=1.0
#
#RabbitMQ的消息確認(rèn)有兩種吃衅。
#一種是消息發(fā)送確認(rèn)往踢。這種是用來確認(rèn)生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊(duì)列的過程中徘层,
# 消息是否成功投遞菲语。
#發(fā)送確認(rèn)分為兩步,一是確認(rèn)是否到達(dá)交換器惑灵,二是確認(rèn)是否到達(dá)隊(duì)列山上。
#第二種是消費(fèi)接收確認(rèn)。這種是確認(rèn)消費(fèi)者是否成功消費(fèi)了隊(duì)列中的消息英支。
# 確認(rèn)消息發(fā)送成功佩憾,通過實(shí)現(xiàn)ConfirmCallBack接口,消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
spring.rabbitmq.publisher-confirms=true
# 實(shí)現(xiàn)ReturnCallback接口干花,如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
# (比如根據(jù)發(fā)送消息時(shí)指定的routingKey找不到隊(duì)列時(shí)會(huì)觸發(fā))
spring.rabbitmq.publisher-returns=true
# 消息消費(fèi)確認(rèn)妄帘,可以手動(dòng)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#在消息沒有被路由到合適隊(duì)列情況下會(huì)將消息返還給消息發(fā)布者
#當(dāng)mandatory標(biāo)志位設(shè)置為true時(shí),如果exchange根據(jù)自身類型和消息routingKey無法找到一個(gè)合適的queue存儲(chǔ)消息池凄,
# 那么broker會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者;當(dāng)mandatory設(shè)置為false時(shí)抡驼,
# 出現(xiàn)上述情況broker會(huì)直接將消息丟棄;通俗的講,mandatory標(biāo)志告訴broker代理服務(wù)器至少將消息route到一個(gè)隊(duì)列中肿仑,
# 否則就將消息return給發(fā)送者;
spring.rabbitmq.template.mandatory=true
要確保RabbitMQ消息的可靠要保證以下3點(diǎn):
1.publisher Confirms:要確保生產(chǎn)者的消息到broker的可靠性致盟。可能會(huì)發(fā)生消息投遞到broker過程中,broker掛了的情況。
2.Exchange冀值,Queue,Message持久化:RabbitMQ是典型的內(nèi)存式消息堆積杯道。我們需要把message存儲(chǔ)到磁盤中。如果是未持久化的消息存儲(chǔ)在內(nèi)存中责蝠,broker掛了那么消息會(huì)丟失党巾。
3.consumer acknowledgement:消費(fèi)者確認(rèn)模式有3種:none(沒有消息會(huì)發(fā)送應(yīng)答),auto(自動(dòng)應(yīng)答),manual(手動(dòng)應(yīng)答)萎庭。為了保證消息可靠性,我們?cè)O(shè)置手動(dòng)應(yīng)答齿拂,這是為什么呢驳规?采用自動(dòng)應(yīng)答的方式,每次消費(fèi)端收到消息后创肥,不管是否處理完成达舒,Broker都會(huì)把這條消息置為完成,然后從Queue中刪除叹侄。如果消費(fèi)端消費(fèi)時(shí)巩搏,拋出異常。也就是說消費(fèi)端沒有成功消費(fèi)該消息趾代,從而造成消息丟失贯底。為了確保消息被消費(fèi)者正確處理,我們采用手動(dòng)應(yīng)答(調(diào)用basicAck撒强、basicNack禽捆、basicReject方法),只有在消息得到正確處理下飘哨,再發(fā)送ACK胚想。
RabbitMQ消息確認(rèn)有2種:消息發(fā)送確認(rèn),消費(fèi)接收確認(rèn)芽隆。消息發(fā)送確認(rèn)是確認(rèn)生產(chǎn)者將消息發(fā)送到Exchange浊服,Exchange分發(fā)消息至Queue的過程中,消息是否可靠投遞胚吁。第一步是否到達(dá)Exchange牙躺,第二步確認(rèn)是否到達(dá)Queue。
實(shí)現(xiàn)ConfirmCallBack接口,消息發(fā)送到Exchange后觸發(fā)回調(diào)腕扶。
// 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
private final RabbitTemplate.ConfirmCallback confirmCallback =
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("生產(chǎn)端confirm...");
log.info("correlationData=" + correlationData);
String messageId = correlationData.getId();
if (ack) {
//confirm返回成功,更新消息投遞狀態(tài)
brokerMessageLogMapper.updateMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
// 失敗則進(jìn)行具體的后續(xù)操作孽拷,重試或者補(bǔ)償?shù)仁侄巍? log.info("異常處理...");
}
}
};
實(shí)現(xiàn)ReturnCallBack接口,消息從Exchange發(fā)送到指定的Queue失敗觸發(fā)回調(diào)
// 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
private final RabbitTemplate.ReturnCallback returnCallback =
new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("message=" + message.toString());
log.info("replyCode=" + replyCode);
log.info("replyText=" + replyText);
log.info("exchange=" + exchange);
log.info("routingKey=" + routingKey);
}
};
消息確認(rèn)機(jī)制開啟半抱,需要配置以下信息
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
之前說過手動(dòng)應(yīng)答可以調(diào)用basicAck,basicNack,basicReject方法脓恕,下面來講講。
手動(dòng)確認(rèn)消息代虾,當(dāng)multiple為false进肯,只確認(rèn)當(dāng)前的消息。當(dāng)multiple為true棉磨,批量確認(rèn)所有比當(dāng)前deliveryTag小的消息。deliveryTag是用來標(biāo)識(shí)Channel中投遞的消息学辱。RabbitMQ保證在每個(gè)Channel中乘瓤,消息的deliveryTag是從1遞增环形。
當(dāng)消費(fèi)端處理消息異常時(shí),我們可以選擇處理失敗消息的方式衙傀。如果requeue為true抬吟,失敗消息會(huì)重新進(jìn)入Queue,試想一下统抬,如果消費(fèi)者在消費(fèi)時(shí)發(fā)生異常火本,那么就不會(huì)對(duì)這一次消息進(jìn)行ACK,進(jìn)而發(fā)生回滾消息的操作聪建,使消息始終放在Queue的頭部钙畔,然后不斷的被處理和回滾,導(dǎo)致隊(duì)列陷入死循環(huán)金麸,為了解決這種問題擎析,我們可以引入重試機(jī)制(當(dāng)重試次數(shù)超過最大值,丟棄該消息)或者是死信隊(duì)列+重試隊(duì)列挥下。
requeue為false揍魂,丟棄該消息。
和basicNack用法一樣棚瘟。
為了配合Return機(jī)制现斋,我們要配置spring.rabbitmq.template.mandatory=true
。它的作用是在消息沒有被路由到合適的隊(duì)列情況下偎蘸,Broker會(huì)將消息返回給生產(chǎn)者庄蹋。當(dāng)mandatory為true時(shí),如果Exchange根據(jù)類型和消息Routing Key無法路由到一個(gè)合適的Queue存儲(chǔ)消息禀苦,那么Broker會(huì)調(diào)用Basic.Return回調(diào)給handleReturn()蔓肯,再回調(diào)給ReturnCallback,將消息返回給生產(chǎn)者振乏。當(dāng)mandatory為false時(shí)蔗包,丟棄該消息。
@Override
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
BasicProperties properties,
byte[] body)
throws IOException {
ReturnCallback returnCallback = this.returnCallback;
if (returnCallback == null) {
Object messageTagHeader = properties.getHeaders().remove(RETURN_CORRELATION_KEY);
if (messageTagHeader != null) {
String messageTag = messageTagHeader.toString();
final PendingReply pendingReply = this.replyHolder.get(messageTag);
if (pendingReply != null) {
returnCallback = new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
String routingKey) {
pendingReply.returned(new AmqpMessageReturnedException("Message returned",
message, replyCode, replyText, exchange, routingKey));
}
};
}
else if (logger.isWarnEnabled()) {
logger.warn("Returned request message but caller has timed out");
}
}
else if (logger.isWarnEnabled()) {
logger.warn("Returned message but no callback available");
}
}
if (returnCallback != null) {
properties.getHeaders().remove(PublisherCallbackChannel.RETURN_CORRELATION_KEY);
MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(
properties, null, this.encoding);
Message returnedMessage = new Message(body, messageProperties);
returnCallback.returnedMessage(returnedMessage,
replyCode, replyText, exchange, routingKey);
}
}
當(dāng)消息路由不到合適的Queue慧邮,會(huì)在回調(diào)給ReturnCallck這些信息调限。
如果消費(fèi)端忘記了ACK,這些消息會(huì)一直處于Unacked 狀態(tài)误澳。由于RabbitMQ消息消費(fèi)沒有超時(shí)機(jī)制耻矮,也就是程序不重啟,消息會(huì)一直處于Unacked狀態(tài)忆谓。當(dāng)消費(fèi)端程序關(guān)閉時(shí)裆装,這些處于Unack狀態(tài)的消息會(huì)重新恢復(fù)成Ready狀態(tài)。這時(shí)候會(huì)出現(xiàn)一種情況:當(dāng)消費(fèi)端程序開啟時(shí),由于Broker端積壓了大量的消息哨免,又可能會(huì)讓消費(fèi)端崩潰茎活。所以我們要對(duì)消費(fèi)端進(jìn)行限流處理。RabbitMQ提供了一種qos(Quality of Service,服務(wù)質(zhì)量保證)功能琢唾,即在非自動(dòng)ACK前提下载荔,如果一定數(shù)量的消息未被ACK前,不進(jìn)行新消息的消息采桃。
spring.rabbitmq.listener.simple.prefetch=1
下面貼消息可靠性解決方案代碼了懒熙。
配置任務(wù)調(diào)度中心
@Configuration
@EnableScheduling
public class TaskSchedulerConfig implements SchedulingConfigurer {
protected ThreadPoolExecutor threadPoolExecutor;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskExecutor());
}
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor taskExecutor() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("task-executor-pool-%d").build();
this.threadPoolExecutor = new ScheduledThreadPoolExecutor(10, namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
return threadPoolExecutor;
}
}
執(zhí)行重新投遞status為0的消息。這里也可以使用corn表達(dá)式設(shè)置觸發(fā)任務(wù)調(diào)度的時(shí)間普办。關(guān)于fixedRate和fixedDelay概念總有人搞混工扎。fixedRate任務(wù)兩次執(zhí)行時(shí)間間隔是任務(wù)的開始點(diǎn),而fixedDelay的間隔是前次任務(wù)的結(jié)束和下一次任務(wù)開始的間隔泌豆。
@Component
@Slf4j
public class RetryMessageTask {
@Autowired
private RabbitmqOrderSender rabbitmqOrderSender;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;
@Scheduled(initialDelay = 5000, fixedDelay = 30000)
public void trySendMessage() {
log.info("定時(shí)投遞status為0的消息...");
List<BrokerMessageLog> brokerMessageLogList = brokerMessageLogMapper.listStatusAndTimeoutMessage();
brokerMessageLogList.forEach(brokerMessageLog -> {
if (brokerMessageLog.getTryCount() >= 3) {
log.info("投遞3次還是失敗...");
brokerMessageLogMapper.updateMessageLogStatus(brokerMessageLog.getMessageId(),
Constants.ORDER_SEND_FAIL,
new Date());
} else {
log.info("投遞失敗...");
brokerMessageLogMapper.updateReSendMessage(brokerMessageLog.getMessageId(),
new Date());
Order order = JSON.parseObject(brokerMessageLog.getMessage(), Order.class);
try {
rabbitmqOrderSender.sendOrder(order);
} catch (Exception e) {
log.error("重新投遞消息發(fā)送異常...:" + e.getMessage());
}
}
});
}
}
消息生產(chǎn)端
@Component
@Slf4j
public class RabbitmqOrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;
// 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
private final RabbitTemplate.ConfirmCallback confirmCallback =
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("生產(chǎn)端confirm...");
log.info("correlationData=" + correlationData);
String messageId = correlationData.getId();
if (ack) {
//confirm返回成功,更新消息投遞狀態(tài)
brokerMessageLogMapper.updateMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
// 失敗則進(jìn)行具體的后續(xù)操作定庵,重試或者補(bǔ)償?shù)仁侄巍? log.info("異常處理...");
}
}
};
// 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
private final RabbitTemplate.ReturnCallback returnCallback =
new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("message=" + message.toString());
log.info("replyCode=" + replyCode);
log.info("replyText=" + replyText);
log.info("exchange=" + exchange);
log.info("routingKey=" + routingKey);
}
};
public void sendOrder(Order order) {
log.info("生產(chǎn)端發(fā)送消息...");
rabbitTemplate.setConfirmCallback(this.confirmCallback);
rabbitTemplate.setReturnCallback(this.returnCallback);
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend(MQConfig.ORDER_DIRECT_EXCAHNGE,
MQConfig.ORDER_QUEUE, order, correlationData);
}
}
消息消費(fèi)端
@Component
@Slf4j
public class RabbitmqOrderReceiver {
@RabbitListener(queues = MQConfig.ORDER_QUEUE)
public void receive(@Payload Order order, Channel channel,
@Headers Map<String, Object> headers,
Message message) throws IOException, InterruptedException {
log.info("消費(fèi)端接收消息...");
log.info("message=" + message.toString());
log.info("order=" + order);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
log.info("deliveryTag=" + deliveryTag);
// 手工ack
channel.basicAck(deliveryTag, false);
}
}
當(dāng)我們發(fā)送消息時(shí),故意將Exchange設(shè)置成一個(gè)不存在的值踪危。消息路由不到合適的Exchange蔬浙,Confirm機(jī)制回送的ACK會(huì)返回false,走異常處理贞远。這個(gè)消息的狀態(tài)不會(huì)更新成1畴博。然后定時(shí)任務(wù)會(huì)拉取status為0的消息,進(jìn)行重新投遞蓝仲,投遞了3次消息還未成功俱病,將status置為2。
接下來袱结,我們測(cè)試一波亮隙。
@Test
public void test() {
Order order = new Order();
order.setId("36");
order.setName("cmazxiaoma測(cè)試訂單-36");
order.setMessageId(UUIDUtil.uuid());
rabbitmqOrderService.createOrder(order);
}
消息投遞失敗。
定時(shí)任務(wù)重新投遞消息失敗垢夹。
將失敗的消息重新投遞3次還是失敗溢吻。
更新Message db信息,將重新投遞3次還是失敗的消息狀態(tài)置為2果元。
接著我們把消費(fèi)端手動(dòng)ACK的代碼注釋掉促王,再讓生產(chǎn)端發(fā)送消息《梗看看會(huì)出現(xiàn)什么情況蝇狼。
我們會(huì)發(fā)現(xiàn)Queue堆積了該消息。
我們關(guān)掉RabbitMQ Server倡怎,看看此消息是否會(huì)持久化迅耘。
[root@VM_0_11_centos log]# ps -ef|grep rabbitmq
root 13283 10291 0 13:42 pts/1 00:00:00 grep --color=auto rabbitmq
root 23051 1 1 Nov06 ? 00:09:29 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /root -- -pa /usr/local/rabbitmq/ebin -noshell -noinput -s rabbit boot -sname rabbit@VM_0_11_centos -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos.log"} -rabbit sasl_error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos-sasl.log"} -rabbit enabled_plugins_file "/usr/local/rabbitmq/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/rabbitmq/plugins" -rabbit plugins_expand_dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput
[root@VM_0_11_centos log]# kill -9 23051
[root@VM_0_11_centos sbin]# rabbitmq-server -detached
Warning: PID file not written; -detached was passed.
[root@VM_0_11_centos sbin]# ps -ef|grep rabbitmq
root 13500 1 31 13:44 ? 00:00:02 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /root -- -pa /usr/local/rabbitmq/ebin -noshell -noinput -s rabbit boot -sname rabbit@VM_0_11_centos -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos.log"} -rabbit sasl_error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos-sasl.log"} -rabbit enabled_plugins_file "/usr/local/rabbitmq/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/rabbitmq/plugins" -rabbit plugins_expand_dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput
root 13597 10291 0 13:44 pts/1 00:00:00 grep --color=auto rabbitmq
執(zhí)行rabbitmqctl list_queues name messages_ready messages_unacknowledged
命令贱枣,查詢Queue情況,發(fā)現(xiàn)Message持久化了豹障。
斷開消費(fèi)者程序冯事,我們可以看到消息從Unacked狀態(tài)轉(zhuǎn)換成Ready了焦匈。
尾言
不管是神還是惡魔都不會(huì)對(duì)不抗?fàn)幍娜松斐鲈?/p>