@TransactionEventListener整合RabbitMq

前言

最近有反饋一些業(yè)務(wù)出現(xiàn)數(shù)據(jù)不一致的問(wèn)題,在檢測(cè)代碼后桨武,主要是一些業(yè)務(wù)下RabbitMq使用不當(dāng)肋拔,在DB事務(wù)沒(méi)有提交呀酸,數(shù)據(jù)沒(méi)落庫(kù)凉蜂,但是mq消費(fèi)者卻已經(jīng)執(zhí)行情況,最終導(dǎo)致數(shù)據(jù)不一致跃惫。

偽代碼

不推薦在service層推送消息叮叹,盡管sendMessage放在最后。都有可能消息執(zhí)行比事務(wù)提交快爆存。

@Transactional
public void save(){
      //業(yè)務(wù)保存
    save(Object obj);
    // 發(fā)送mq消息
     sendMessage(Object obj);
    // 或者處理其它蛉顽。先较。。
}
整合@TransactionEventListener

Spring提供了一個(gè)注解@TransactionEventListener闲勺,將這個(gè)注解標(biāo)注在某個(gè)方法上,那么就將這個(gè)方法聲明為了一個(gè)事務(wù)事件處理器菜循,而具體的事件類型則是由TransactionalEventListener.phase屬性進(jìn)行定義的。以下部分聲明:

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {

    /**
     * 默認(rèn)AFTER_COMMIT:當(dāng)前事務(wù)commit之后 
     */
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

    /**
     * 默認(rèn)false:當(dāng)前方法如果沒(méi)有事務(wù)衙耕,相應(yīng)的事務(wù)事件監(jiān)聽器將不會(huì)執(zhí)行。
     */
    boolean fallbackExecution() default false;
}

/**
 * TransactionPhase 介紹
*/
public enum TransactionPhase {
    // 指定目標(biāo)方法在事務(wù)commit之前執(zhí)行
    BEFORE_COMMIT,

    // 指定目標(biāo)方法在事務(wù)commit之后執(zhí)行
    AFTER_COMMIT,

    // 指定目標(biāo)方法在事務(wù)rollback之后執(zhí)行
    AFTER_ROLLBACK,
    
    // 指定目標(biāo)方法在事務(wù)完成時(shí)執(zhí)行橙喘,這里的完成是指無(wú)論事務(wù)是成功提交還是回滾
    AFTER_COMPLETION
}
方案一改進(jìn)版

借助TransactionalEventListener,目的讓消息在當(dāng)前事務(wù)提交之后推送厅瞎。

  • 定義消息事件
/**
 * 消息推送事件
 */
public class MessageEvent<T> extends ApplicationEvent {

    /**
     * 交換機(jī)
     */
    private String exchange;

    /**
     * 路由key
     */
    private String routingKey;

    public MessageEvent(T source, String routingKey) {
        super(source);
        this.exchange = "";
        this.routingKey = routingKey;
    }

    public MessageEvent(T source, String exchange, String routingKey) {
        super(source);
        this.routingKey = routingKey;
        this.exchange = exchange;
    }
  
    public T getSource() {
        return (T) source;
    }

    public String getExchange() {
        return exchange;
    }

    public String getRoutingKey() {
        return routingKey;
    }
}
  • 消息發(fā)布
/**
 * 消息發(fā)布
 */
@Component
public class MessagePublisher implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void sendMessage(String routingKey, Object object) {
        applicationContext.publishEvent(new MessageEvent(object, routingKey));
    }

    public void sendMessage(String routingKey, String message) {
        applicationContext.publishEvent(new MessageEvent(message, routingKey));
    }

    public void sendMessage(String exchange, String routingKey, Object message) {
        applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
    }
}

  • 消息監(jiān)聽
@Slf4j
@Component
public class MessageEventListener {

    @Autowired
    private MqService mqService;

    /**
     * 推送Mq消息
     * 設(shè)置fallbackExecution = true和簸,不管前面是否開啟事務(wù),事務(wù)事件監(jiān)聽器都執(zhí)行
     */
    @TransactionalEventListener(fallbackExecution = true)
    public void consumer(MessageEvent notifyEvent) {
        mqService.sendMessage(notifyEvent.getExchange(), notifyEvent.getRoutingKey(), notifyEvent.getSource());
    }
}
  • 模擬TransactionalEventListener打印結(jié)果
    @Autowired
    private MessagePublisher messagePublisher;

    @Transactional
    public void save(Record record) throws InterruptedException {
        this.insertSelective(record);
        messagePublisher.publishMessage(new MessageEvent(record, MessageContant.EXCHANGE_NAME, MessageContant.ROUTING_NAME));

        Thread.sleep(2000L);
        log.info("--------------》 執(zhí)行其它業(yè)務(wù)結(jié)束");
    }
5f1a78cb2a7ac.png
方案二-改造版
@Component
public class MqServiceImpl implements ApplicationContextAware比搭,MqService {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    
    @Override
    public void sendMessage(String routingKey, Object object) {
         applicationContext.publishEvent(new MessageEvent(object, routingKey));
    }

    @Override
    public void sendMessage(String routingKey, String message) {
         applicationContext.publishEvent(new MessageEvent(message, routingKey));
    }

    @Override
    public void sendMessage(String exchange, String routingKey, Object message) {
        applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
    }
}
總結(jié)

一般推薦寫法身诺,消息跟事務(wù)獨(dú)立開來(lái),先提交事務(wù)霉赡,然后發(fā)送mq,或者放在controller層穴亏。引入@TransactionEventListener對(duì)目前已有代碼改造方便、修改代碼最少嗓化。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市刺覆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌驳糯,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酝枢,死亡現(xiàn)場(chǎng)離奇詭異悍手,居然都是意外死亡帘睦,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門谓苟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)官脓,“玉大人涝焙,你說(shuō)我怎么就攤上這事÷刈玻” “怎么了妖滔?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)座舍。 經(jīng)常有香客問(wèn)我,道長(zhǎng)曲秉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任榆鼠,我火速辦了婚禮,結(jié)果婚禮上妆够,老公的妹妹穿的比我還像新娘识啦。我一直安慰自己神妹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布冕茅。 她就那樣靜靜地躺著腰鬼,像睡著了一般嵌赠。 火紅的嫁衣襯著肌膚如雪熄赡。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天炊豪,我揣著相機(jī)與錄音,去河邊找鬼词渤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛缺虐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播高氮,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼顷牌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了窟蓝?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤状共,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后口芍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鬓椭,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了翘瓮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡资盅,死狀恐怖踊赠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情筐带,我是刑警寧澤,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布伦籍,位于F島的核電站,受9級(jí)特大地震影響帖鸦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜作儿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望灭红。 院中可真熱鬧,春花似錦、人聲如沸君珠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至乐导,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間物臂,已是汗流浹背旺拉。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工棵磷, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人仪媒。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像留凭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蔼夜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354