1. 配置RabbitMQ
# 發(fā)送確認(rèn)
spring.rabbitmq.publisher-confirms=true
# 發(fā)送回調(diào)
spring.rabbitmq.publisher-returns=true
# 消費(fèi)手動(dòng)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2. 生產(chǎn)者發(fā)送消息確認(rèn)機(jī)制
- 其實(shí)這個(gè)也不能叫確認(rèn)機(jī)制,只是起到一個(gè)監(jiān)聽的作用唐含,監(jiān)聽生產(chǎn)者是否發(fā)送消息到exchange和queue。
- 生產(chǎn)者和消費(fèi)者代碼不改變彻亲。
- 新建配置類 MQProducerAckConfig.java 實(shí)現(xiàn)ConfirmCallback和ReturnCallback接口堕担,@Component注冊(cè)成組件。
- ConfirmCallback只確認(rèn)消息是否到達(dá)exchange座掘,已實(shí)現(xiàn)方法confirm中ack屬性為標(biāo)準(zhǔn)递惋,true到達(dá),反之進(jìn)入黑洞溢陪。
- ReturnCallback消息沒有正確到達(dá)隊(duì)列時(shí)觸發(fā)回調(diào)萍虽,如果正確到達(dá)隊(duì)列不執(zhí)行。
package com.fzb.rabbitmq.config;
import org.apache.commons.lang3.SerializationUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @Description 消息發(fā)送確認(rèn)
* <p>
* ConfirmCallback 只確認(rèn)消息是否正確到達(dá) Exchange 中
* ReturnCallback 消息沒有正確到達(dá)隊(duì)列時(shí)觸發(fā)回調(diào)形真,如果正確到達(dá)隊(duì)列不執(zhí)行
* <p>
* 1. 如果消息沒有到exchange,則confirm回調(diào),ack=false
* 2. 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
* 3. exchange到queue成功,則不回調(diào)return
* 4. exchange到queue失敗,則回調(diào)return
* @Author jxb
* @Date 2019-04-04 16:57:04
*/
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息發(fā)送成功" + correlationData);
} else {
System.out.println("消息發(fā)送失敗:" + cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 反序列化對(duì)象輸出
System.out.println("消息主體: " + SerializationUtils.deserialize(message.getBody()));
System.out.println("應(yīng)答碼: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("消息使用的交換器 exchange : " + exchange);
System.out.println("消息使用的路由鍵 routing : " + routingKey);
}
}
3. 消費(fèi)者消息手動(dòng)確認(rèn)
- SpringBoot集成RabbitMQ確認(rèn)機(jī)制分為三種:none杉编、auto(默認(rèn))、manual
Auto:
1. 如果消息成功被消費(fèi)(成功的意思是在消費(fèi)的過程中沒有拋出異常)咆霜,則自動(dòng)確認(rèn)
2. 當(dāng)拋出 AmqpRejectAndDontRequeueException 異常的時(shí)候邓馒,則消息會(huì)被拒絕,且 requeue = false(不重新入隊(duì)列)
3. 當(dāng)拋出 ImmediateAcknowledgeAmqpException 異常蛾坯,則消費(fèi)者會(huì)被確認(rèn)
4. 其他的異常光酣,則消息會(huì)被拒絕,且 requeue = true脉课,此時(shí)會(huì)發(fā)生死循環(huán)挂疆,可以通過 setDefaultRequeueRejected(默認(rèn)是true)去設(shè)置拋棄消息
如設(shè)置成manual手動(dòng)確認(rèn),一定要對(duì)消息做出應(yīng)答下翎,否則rabbit認(rèn)為當(dāng)前隊(duì)列沒有消費(fèi)完成缤言,將不再繼續(xù)向該隊(duì)列發(fā)送消息。
channel.basicAck(long,boolean); 確認(rèn)收到消息视事,消息將被隊(duì)列移除胆萧,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息。
channel.basicNack(long,boolean,boolean); 確認(rèn)否定消息跌穗,第一個(gè)boolean表示一個(gè)consumer還是所有订晌,第二個(gè)boolean表示requeue是否重新回到隊(duì)列,true重新入隊(duì)蚌吸。
channel.basicReject(long,boolean); 拒絕消息锈拨,requeue=false 表示不再重新入隊(duì),如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列羹唠。
當(dāng)消息回滾到消息隊(duì)列時(shí)奕枢,這條消息不會(huì)回到隊(duì)列尾部,而是仍是在隊(duì)列頭部佩微,這時(shí)消費(fèi)者會(huì)又接收到這條消息缝彬,如果想消息進(jìn)入隊(duì)尾,須確認(rèn)消息后再次發(fā)送消息哺眯。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBody());
- 延續(xù)上一章direct類型隊(duì)列為例谷浅,當(dāng)消息出現(xiàn)異常,判斷是否回滾過消息奶卓,如否則消息從新入隊(duì)一疯,反之拋棄消息。其中一個(gè)消費(fèi)者模擬一個(gè)異常夺姑。
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.queue"), exchange = @Exchange(value = "direct.exchange"), key = "HelloWorld")})
public void getDirectMessage(User user, Channel channel, Message message) throws IOException {
try {
// 模擬執(zhí)行任務(wù)
Thread.sleep(1000);
// 模擬異常
String is = null;
is.toString();
// 確認(rèn)收到消息墩邀,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重復(fù)處理失敗,拒絕再次接收" + user.getName());
// 拒絕消息瑟幕,requeue=false 表示不再重新入隊(duì)磕蒲,如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即將再次返回隊(duì)列處理" + user.getName());
// requeue為是否重新回到隊(duì)列,true重新入隊(duì)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
//e.printStackTrace();
}
}
@RabbitListener(queues = "direct.queue")
public void getDirectMessageCopy(User user, Channel channel, Message message) throws IOException {
try {
// 模擬執(zhí)行任務(wù)
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
// 確認(rèn)收到消息只盹,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到辣往,true確認(rèn)所有consumer獲得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重復(fù)處理失敗,拒絕再次接收!");
// 拒絕消息殖卑,requeue=false 表示不再重新入隊(duì)站削,如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即將再次返回隊(duì)列處理!");
// requeue為是否重新回到隊(duì)列孵稽,true重新入隊(duì)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
從執(zhí)行結(jié)果來看许起,三條消息都調(diào)用了confirm方法,說明消息發(fā)送到了exchange菩鲜,且沒有調(diào)用return方法园细,說明消息成功到達(dá)相應(yīng)隊(duì)列。
getDirectMessageCopy方法成功消費(fèi)掉“張三”這條消息接校,由于getDirectMessage方法模擬異常猛频,所以第一次把“李四”從新入隊(duì),此時(shí)getDirectMessageCopy繼續(xù)消費(fèi)“王五”成功,getDirectMessage方法因李四已經(jīng)從新入隊(duì)過鹿寻,再次發(fā)生異常則拋棄消息睦柴。
進(jìn)一步挖掘你會(huì)發(fā)現(xiàn),開始一共3條消息毡熏,有一條回滾消息總數(shù)變成了4條坦敌,每個(gè)消費(fèi)者消費(fèi)2條,所以兩個(gè)消費(fèi)者是輪詢分配的痢法。
- 工作隊(duì)列有兩種工作方式:輪詢分發(fā)(默認(rèn))狱窘、公平分發(fā)即當(dāng)某個(gè)消費(fèi)者沒有消費(fèi)完成之前不用再分發(fā)消息。
- 修改配置文件
# 消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量疯暑。此屬性當(dāng)不設(shè)置時(shí)為:輪詢分發(fā)训柴,設(shè)置為1為:公平分發(fā)
spring.rabbitmq.listener.simple.prefetch=1
將第一個(gè)消費(fèi)者模擬執(zhí)行5秒哑舒,然后向數(shù)據(jù)庫(kù)增加一條數(shù)據(jù)妇拯,執(zhí)行結(jié)果為:
可以看到,getDirectMessageCopy執(zhí)行了4次洗鸵,getDirectMessage執(zhí)行了1次越锈,根據(jù)他們的消費(fèi)能力來公平分發(fā)消息。
如果可以膘滨,我想回到10年前...