前言
最近有反饋一些業(yè)務(wù)出現(xiàn)數(shù)據(jù)不一致的問(wèn)題,在檢測(cè)代碼后桨武,主要是一些業(yè)務(wù)下RabbitMq使用不當(dāng)肋拔,在DB事務(wù)沒(méi)有提交呀酸,數(shù)據(jù)沒(méi)落庫(kù)凉蜂,但是mq消費(fèi)者卻已經(jīng)執(zhí)行情況,最終導(dǎo)致數(shù)據(jù)不一致跃惫。
偽代碼
不推薦在service層推送消息叮叹,盡管sendMessage放在最后。都有可能消息執(zhí)行比事務(wù)提交快爆存。
@Transactional
public void save(){
//業(yè)務(wù)保存
save(Object obj);
// 發(fā)送mq消息
sendMessage(Object obj);
// 或者處理其它蛉顽。先较。。
}
整合@TransactionEventListener
Spring提供了一個(gè)注解@TransactionEventListener闲勺,將這個(gè)注解標(biāo)注在某個(gè)方法上,那么就將這個(gè)方法聲明為了一個(gè)事務(wù)事件處理器菜循,而具體的事件類型則是由TransactionalEventListener.phase屬性進(jìn)行定義的。以下部分聲明:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
/**
* 默認(rèn)AFTER_COMMIT:當(dāng)前事務(wù)commit之后
*/
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
/**
* 默認(rèn)false:當(dāng)前方法如果沒(méi)有事務(wù)衙耕,相應(yīng)的事務(wù)事件監(jiān)聽器將不會(huì)執(zhí)行。
*/
boolean fallbackExecution() default false;
}
/**
* TransactionPhase 介紹
*/
public enum TransactionPhase {
// 指定目標(biāo)方法在事務(wù)commit之前執(zhí)行
BEFORE_COMMIT,
// 指定目標(biāo)方法在事務(wù)commit之后執(zhí)行
AFTER_COMMIT,
// 指定目標(biāo)方法在事務(wù)rollback之后執(zhí)行
AFTER_ROLLBACK,
// 指定目標(biāo)方法在事務(wù)完成時(shí)執(zhí)行橙喘,這里的完成是指無(wú)論事務(wù)是成功提交還是回滾
AFTER_COMPLETION
}
方案一改進(jìn)版
借助TransactionalEventListener,目的讓消息在當(dāng)前事務(wù)提交之后推送厅瞎。
- 定義消息事件
/**
* 消息推送事件
*/
public class MessageEvent<T> extends ApplicationEvent {
/**
* 交換機(jī)
*/
private String exchange;
/**
* 路由key
*/
private String routingKey;
public MessageEvent(T source, String routingKey) {
super(source);
this.exchange = "";
this.routingKey = routingKey;
}
public MessageEvent(T source, String exchange, String routingKey) {
super(source);
this.routingKey = routingKey;
this.exchange = exchange;
}
public T getSource() {
return (T) source;
}
public String getExchange() {
return exchange;
}
public String getRoutingKey() {
return routingKey;
}
}
- 消息發(fā)布
/**
* 消息發(fā)布
*/
@Component
public class MessagePublisher implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void sendMessage(String routingKey, Object object) {
applicationContext.publishEvent(new MessageEvent(object, routingKey));
}
public void sendMessage(String routingKey, String message) {
applicationContext.publishEvent(new MessageEvent(message, routingKey));
}
public void sendMessage(String exchange, String routingKey, Object message) {
applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
}
}
- 消息監(jiān)聽
@Slf4j
@Component
public class MessageEventListener {
@Autowired
private MqService mqService;
/**
* 推送Mq消息
* 設(shè)置fallbackExecution = true和簸,不管前面是否開啟事務(wù),事務(wù)事件監(jiān)聽器都執(zhí)行
*/
@TransactionalEventListener(fallbackExecution = true)
public void consumer(MessageEvent notifyEvent) {
mqService.sendMessage(notifyEvent.getExchange(), notifyEvent.getRoutingKey(), notifyEvent.getSource());
}
}
- 模擬TransactionalEventListener打印結(jié)果
@Autowired
private MessagePublisher messagePublisher;
@Transactional
public void save(Record record) throws InterruptedException {
this.insertSelective(record);
messagePublisher.publishMessage(new MessageEvent(record, MessageContant.EXCHANGE_NAME, MessageContant.ROUTING_NAME));
Thread.sleep(2000L);
log.info("--------------》 執(zhí)行其它業(yè)務(wù)結(jié)束");
}
方案二-改造版
@Component
public class MqServiceImpl implements ApplicationContextAware比搭,MqService {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void sendMessage(String routingKey, Object object) {
applicationContext.publishEvent(new MessageEvent(object, routingKey));
}
@Override
public void sendMessage(String routingKey, String message) {
applicationContext.publishEvent(new MessageEvent(message, routingKey));
}
@Override
public void sendMessage(String exchange, String routingKey, Object message) {
applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
}
}
總結(jié)
一般推薦寫法身诺,消息跟事務(wù)獨(dú)立開來(lái),先提交事務(wù)霉赡,然后發(fā)送mq,或者放在controller層穴亏。引入@TransactionEventListener對(duì)目前已有代碼改造方便、修改代碼最少嗓化。