一婆殿、事務(wù)
RabbitMQ中與事務(wù)機(jī)制有關(guān)的方法有三個(gè):txSelect(), txCommit()以及txRollback()诈乒。txSelect()開(kāi)啟事務(wù),生產(chǎn)者發(fā)送消息內(nèi)容給mq婆芦,這是一階段提交怕磨。然后本地可以繼續(xù)處理自己的業(yè)務(wù)邏輯,處理完提交事務(wù)寞缝,就發(fā)送提交事務(wù)的消息給mq癌压,mq就可以直接后續(xù)處理了。如果本地處理有問(wèn)題荆陆,回滾本地業(yè)務(wù)滩届,發(fā)送一個(gè)回滾事務(wù)的消息給mq,mq就知道這條消息作廢了被啼,進(jìn)行回滾帜消,不進(jìn)行后續(xù)的操作了。
但事務(wù)機(jī)制是一個(gè)同步的過(guò)程浓体,效率相對(duì)較低泡挺,如果對(duì)數(shù)據(jù)一致性要求很高的話(huà)可以使用事務(wù)機(jī)制。
二命浴、ack消息確認(rèn)
rabbitmq的confirm模式是異步的娄猫,所以相對(duì)效率會(huì)高很多。
1.rabiitmq消息確認(rèn)分為兩種:
1.發(fā)送消息的確認(rèn)生闲。分為消息發(fā)送到交換機(jī)的確認(rèn)媳溺、消息發(fā)送到隊(duì)列的確認(rèn)
2.接收消息的確認(rèn)。
2.springboot集成rabiitmq的確認(rèn)模式:
acknowledgeMode有三值:
A碍讯、NONE = no acks will be sent (incompatible with channelTransacted=true).
RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.
B悬蔽、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().
C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.
簡(jiǎn)單來(lái)說(shuō)也就是:
none:不確認(rèn)捉兴,不會(huì)發(fā)送任何ack
manual:手動(dòng)確認(rèn)蝎困,發(fā)送端和客戶(hù)端都需要手動(dòng)確認(rèn)
auto:自動(dòng)確認(rèn)录语,就是自動(dòng)發(fā)ack,除非拋異常禾乘。
3.代碼
配置:
@Configuration
public class MqConsumerConfig {
public final static String QUEUE_ACK_NAME = "orderme-queue.yannic.ack";
public static final String ORDER_WEBSOCKET_EXCHANGE = "orderme.yannic.websocket";
@Bean(name="orderTopicAckQueue")
public Queue orderTopicAckQueue() {
return new Queue(QUEUE_ACK_NAME);
}
@Bean(name = "orderWebSocketExchange")
public TopicExchange orderWebSocketExchange() {
return new TopicExchange(ORDER_WEBSOCKET_EXCHANGE);
}
@Bean
Binding bindingExchangeAckMessage(@Qualifier("orderTopicAckQueue") Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("yannic.*");
}
/**
* 定制化amqp模版
* connectionFactory:包含了yml文件配置參數(shù)
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 必須設(shè)置為 true澎埠,不然當(dāng) 發(fā)送到交換器成功,但是沒(méi)有匹配的隊(duì)列盖袭,不會(huì)觸發(fā) ReturnCallback 回調(diào)
// 而且 ReturnCallback 比 ConfirmCallback 先回調(diào)失暂,意思就是 ReturnCallback 執(zhí)行完了才會(huì)執(zhí)行 ConfirmCallback
rabbitTemplate.setMandatory(true);
// 設(shè)置 ConfirmCallback 回調(diào) yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// 如果發(fā)送到交換器都沒(méi)有成功(比如說(shuō)刪除了交換器),ack 返回值為 false
// 如果發(fā)送到交換器成功鳄虱,但是沒(méi)有匹配的隊(duì)列(比如說(shuō)取消了綁定)弟塞,ack 返回值為還是 true (這是一個(gè)坑,需要注意)
if (ack) {
String messageId = correlationData.getId();
System.out.println("confirm:"+messageId);
}
});
// 設(shè)置 ReturnCallback 回調(diào) yml需要配置 publisher-returns: true
// 如果發(fā)送到交換器成功拙已,但是沒(méi)有匹配的隊(duì)列决记,就會(huì)觸發(fā)這個(gè)回調(diào)
rabbitTemplate.setReturnCallback((message, replyCode, replyText,
exchange, routingKey) -> {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("return:"+messageId);
});
return rabbitTemplate;
}
}
發(fā)送端:
/**
* 發(fā)送信息確認(rèn)ack
* @param exchange
* @param routingKey
* @param object
*/
public void sendMessageAck(String exchange, String routingKey, Object object) {
logger.info("mq消息發(fā)送開(kāi)始===》");
try {
//CorrelationData用于confirm機(jī)制里的回調(diào)確認(rèn)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange,routingKey,JSON.toJSONString(object),correlationData);
logger.info("mq消息發(fā)送結(jié)束==》{}", object);
} catch (Exception e) {
logger.error(String.format("mq 發(fā)送 %s 的數(shù)據(jù) %s 異常", exchange, object), e);
} finally {
}
}
消費(fèi)端:
/**
* 手動(dòng)確認(rèn)ack
* @param msg
*/
@RabbitListener(queues = MqConsumerConfig.QUEUE_ACK_NAME)
public void consumeTopicAckMessage(Message msg, Channel channel) {
logger.info("接收的消息為:{}",msg.getBody());
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
logger.error("接收mq消息失敗:{}",msg);
}
}
4.深入思考
生產(chǎn)者發(fā)送消息給mq,遇到網(wǎng)絡(luò)抖動(dòng)或者mq這時(shí)候宕機(jī)了倍踪,沒(méi)有收到mq的ack怎么辦系宫?
方案一:就是事務(wù)控制咯。這個(gè)就是效率慢建车,rabiitmq的事務(wù)與confirm不能同時(shí)使用.
方案二:生產(chǎn)者這邊業(yè)務(wù)控制扩借。比如生產(chǎn)者每次發(fā)消息之前先把消息保存到本地,如果收到ack就把這個(gè)消息給刪除缤至,沒(méi)有收到就隔一段時(shí)間重試潮罪,最多重試個(gè)3次,還是沒(méi)收到就把這個(gè)消息登記起來(lái)后續(xù)處理领斥,不再發(fā)送了嫉到。