RocketMQ事務消息與分布式事務

《RocketMQ實戰(zhàn)入門》里我們入門了基本的RocketMQ消息發(fā)布和消費,并封裝了一個簡單的util包迎膜,現在我們來看一下如何使用RocketMQ的事務消息來解決分布式事務問題捏浊。

事務消息基本流程

說明福扬,RocketMQ來實現分布式事務主要基于的是BASE理論毅贮,即基本可用昆咽、軟狀態(tài)驾凶、最終一致性。屬于剛性事務與柔性事務中的后者掷酗,性能較好调违,但取的是最終一致性。流程類似于2PC泻轰,但是個異步過程技肩。

流程如下:

  1. Producer發(fā)送半消息給RocketMQ,收到回復后開始執(zhí)行自己本地的事務浮声。半消息不會被消費虚婿。

  2. 執(zhí)行本地事務并將執(zhí)行的結果發(fā)送通知RocketMQ旋奢,MQ根據結果來確定之前的半消息是提交還是丟棄。

  3. 如果半消息提交然痊,RocketMQ將負責確保該消息被Consumer消費至朗,Consumer消費到消息即執(zhí)行自己的事務。

異常情況:

  1. 如果Producer沒有將本地事務結果發(fā)送給MQ玷过,此時事務處于UNKNOW狀態(tài)爽丹,則MQ每60s會通過Producer提供的回調接口來反查其事務執(zhí)行結果來決定半消息是提交還是丟棄。默認會執(zhí)行此回調15次辛蚊,如果還是UNKNOW則丟棄半消息粤蝎,并打印錯誤日志。

  2. 如果Producer發(fā)送完半消息之后掛了袋马,MQ會去同一個group里的其他Producer調用回調接口反查事務執(zhí)行結果初澎。

代碼編寫

RocketMQ發(fā)送事務消息主要是使用TransactionMQProducer這個Producer。然后實現TransactionListener接口虑凛,并將該Listener綁定到Producer上碑宴。

TransactionMQProducer與普通消息的DefaultMQProducer比較類似,而TransactionListener接口需要實現executeLocalTransaction()和checkLocalTransaction()兩個方法桑谍,分別寫半消息發(fā)送成功后的本地事務邏輯以及供MQ回調的反查事務結果邏輯延柠。

參考官方例子https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md,實際使用中一般應該是在executeLocalTransaction()根據業(yè)務邏輯執(zhí)行情況直接返回明確的事務結果是Commit還是Rollback的锣披。這里為了測試故意在executeLocalTransaction()返回事務結果UNKNOW贞间,讓MQ執(zhí)行反查邏輯回調checkLocalTransaction()方法來獲得事務結果。代碼如下:(部分代碼在前文《RocketMQ實戰(zhàn)入門》中有)

TransactionProducer:

/**
 * 事務消息Producer
 * 用于發(fā)送事務消息雹仿,實現分布式事務
 * 
 * 跟普通消息用的Producer不同增热,事務Producer沒法全局用個單例,
 * 因為需要綁定TransactionListener的業(yè)務處理邏輯胧辽。
 * */
@Slf4j
@Component
public class TransactionMsgProducer {
    
    @Value("${rocketmq.url}")
    private String mqurl;
    
    @Value("${rocketmq.accessKey}")
    private String accessKey;
    
    @Value("${rocketmq.secretKey}")
    private String secretKey;
    
    @Value("${rocketmq.producergroup.name}")
    private String producerGroupName;
    
    private TransactionMQProducer producer;
    
    public TransactionSendResult publish(EventMessage eventMsg) {
        try {
            Message msg = new Message(eventMsg.getTopic(), eventMsg.getTag(), eventMsg.getMsgId(), JSON.toJSONString(eventMsg).getBytes("utf-8"));
            TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
            return result;
        } catch (Exception e) {
            log.error("事務消息發(fā)送失敗" + e.getMessage(), e);
            e.printStackTrace();
        }
        return null;
    }
    
    @PostConstruct
    public void init() {
        producer = new TransactionMQProducer(producerGroupName + "-trans", getAclRPCHook());
        producer.setNamesrvAddr(mqurl);
        TransactionListener transactionListener = new MyTransactionListener();
        producer.setTransactionListener(transactionListener);
        try {
            producer.start();
            log.info("RocketMQ客戶端事務producer初始化...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    
    @PreDestroy
    public void shutdown() {
        if(producer != null)
            producer.shutdown();
    }
    
    private RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }
}

TransactionListener:

/**
 * 半消息發(fā)送成功則執(zhí)行executeLocalTransaction
 * RocketMQ回調checkLocalTransaction來確認Producer本地的事務執(zhí)行狀態(tài)
 * */
@Slf4j
public class MyTransactionListener implements TransactionListener{
    
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTransactions = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("Producer開始執(zhí)行本地事務...");
        int value = transactionIndex.getAndIncrement();
        localTransactions.put(msg.getTransactionId(), value % 3);
        log.info("事務ID:{},執(zhí)行狀態(tài){}, 但統一返回RocketMQ事務狀態(tài)為UNKNOW", msg.getTransactionId(), value % 3);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer state = localTransactions.get(msg.getTransactionId());
        log.info("事務ID:{}執(zhí)行后返回MQ狀態(tài)為UNKNOW峻仇,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制,"
                + "MQ調用Producer提供的回調接口查詢事務狀態(tài)為{}", msg.getTransactionId(), state);
        if(null != state) {
            switch(state) {
            case 0:
                log.info("從肥兔子賬戶-300操作還沒執(zhí)行完");
                return LocalTransactionState.UNKNOW;
            case 1:
                log.info("已經成功從肥兔子賬戶-300");
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                log.info("從肥兔子賬戶-300失敗");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

消費者邑商,也即分布式事務的其他參與者:

@Slf4j
@Component
@MsgConsumer(consumerGroup = "testapp-consumer-group", tag = "douchuzi", topic = "test-trans-topic")
public class TestConsumer implements MQMsgHandler{

    @Override
    public void handleMsg(List<EventMessage> msgs) {
        for(EventMessage msg : msgs) {
            log.info(JSON.toJSONString(msg));
        }
    }
}

測試代碼:

EventMessage msg = new EventMessage();
msg.setTopic("test-trans-topic");
msg.setTag("douchuzi");
msg.setMsgId("123");
msg.setProducerGroup("testapp-producer-group");
msg.setMsgBody("給豆畜子賬戶+300塊");
msg.setPublishTime(LocalDateTime.now().toString());
TransactionSendResult result = transactionMsgProducer.publish(msg);
log.info("肥兔子給豆畜子轉賬的事務消息發(fā)送完畢摄咆,"
        + "消息ID:{}, 事務ID{},消息發(fā)送狀態(tài):{},分布式事務開始人断!", 
        result.getMsgId(), 
        result.getTransactionId(), 
        result.getSendStatus());

測試執(zhí)行結果分3種情況豆同,執(zhí)行3次一個循環(huán)。

反查本地事務UNKNOW:

2022-02-01 22:48:37.172  INFO 12388 --- [nio-8080-exec-2] c.w.controller.MyTransactionListener     : Producer開始執(zhí)行本地事務...
2022-02-01 22:48:37.173  INFO 12388 --- [nio-8080-exec-2] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04E502240000,執(zhí)行狀態(tài)0, 但統一返回RocketMQ事務狀態(tài)為UNKNOW
2022-02-01 22:48:37.174  INFO 12388 --- [nio-8080-exec-2] com.wangan.controller.WanganController   : 肥兔子給豆畜子轉賬的事務消息發(fā)送完畢含鳞,消息ID:7F00000130644E0E2F2A04E502240000, 事務IDnull,消息發(fā)送狀態(tài):SEND_OK,分布式事務開始芹务!
2022-02-01 22:49:05.702  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04E502240000執(zhí)行后返回MQ狀態(tài)為UNKNOW蝉绷,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制鸭廷,MQ調用Producer提供的回調接口查詢事務狀態(tài)為0
2022-02-01 22:49:05.702  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 從肥兔子賬戶-300操作還沒執(zhí)行完
2022-02-01 22:50:05.700  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04E502240000執(zhí)行后返回MQ狀態(tài)為UNKNOW,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制熔吗,MQ調用Producer提供的回調接口查詢事務狀態(tài)為0
2022-02-01 22:50:05.700  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 從肥兔子賬戶-300操作還沒執(zhí)行完
2022-02-01 22:51:05.697  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04E502240000執(zhí)行后返回MQ狀態(tài)為UNKNOW辆床,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制,MQ調用Producer提供的回調接口查詢事務狀態(tài)為0
2022-02-01 22:51:05.697  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 從肥兔子賬戶-300操作還沒執(zhí)行完

發(fā)送完半消息之后執(zhí)行本地事務并回復MQ事務執(zhí)行結果unknow桅狠,這樣MQ會反查回調接口來確認事務結果讼载,1分鐘查1次,默認15次仍無法確認事務結果則丟棄半消息中跌,這里為了測試方便改成了3次(修改broker.conf里的transactionCheckMax=3)咨堤。

反查本地事務COMMIT_MESSAGE:

2022-02-01 22:53:27.323  INFO 12388 --- [nio-8080-exec-5] c.w.controller.MyTransactionListener     : Producer開始執(zhí)行本地事務...
2022-02-01 22:53:27.323  INFO 12388 --- [nio-8080-exec-5] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04E96F880001,執(zhí)行狀態(tài)1, 但統一返回RocketMQ事務狀態(tài)為UNKNOW
2022-02-01 22:53:27.323  INFO 12388 --- [nio-8080-exec-5] com.wangan.controller.WanganController   : 肥兔子給豆畜子轉賬的事務消息發(fā)送完畢,消息ID:7F00000130644E0E2F2A04E96F880001, 事務IDnull,消息發(fā)送狀態(tài):SEND_OK漩符,分布式事務開始一喘!
2022-02-01 22:54:05.708  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04E96F880001執(zhí)行后返回MQ狀態(tài)為UNKNOW,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制嗜暴,MQ調用Producer提供的回調接口查詢事務狀態(tài)為1
2022-02-01 22:54:05.708  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 已經成功從肥兔子賬戶-300
2022-02-01 22:54:05.718  INFO 12388 --- [MessageThread_6] com.wangan.controller.TestConsumer       : {"msgBody":"給豆畜子賬戶+300塊","msgId":"123","producerGroup":"testapp-producer-group","publishTime":"2022-02-01T22:53:27.304","tag":"douchuzi","topic":"test-trans-topic"}

MQ反查事務結果凸克,獲知事務成功,MQ提交半消息闷沥,消息即可被消費者正常消費并執(zhí)行其事務躺率。分布式事務完成。

反查本地事務ROLLBACK_MESSAGE:

2022-02-01 22:54:59.306  INFO 12388 --- [nio-8080-exec-8] c.w.controller.MyTransactionListener     : Producer開始執(zhí)行本地事務...
2022-02-01 22:54:59.306  INFO 12388 --- [nio-8080-exec-8] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04EAD6DD0002,執(zhí)行狀態(tài)2, 但統一返回RocketMQ事務狀態(tài)為UNKNOW
2022-02-01 22:54:59.307  INFO 12388 --- [nio-8080-exec-8] com.wangan.controller.WanganController   : 肥兔子給豆畜子轉賬的事務消息發(fā)送完畢执俩,消息ID:7F00000130644E0E2F2A04EAD6DD0002, 事務IDnull,消息發(fā)送狀態(tài):SEND_OK背捌,分布式事務開始!
2022-02-01 22:56:05.718  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 事務ID:7F00000130644E0E2F2A04EAD6DD0002執(zhí)行后返回MQ狀態(tài)為UNKNOW颖侄,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制鸟雏,MQ調用Producer提供的回調接口查詢事務狀態(tài)為2
2022-02-01 22:56:05.718  INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener     : 從肥兔子賬戶-300失敗

反查事務結果為事務失敗,MQ刪除半消息览祖。

總結

RocketMQ事務消息可以實現柔性分布式事務孝鹊,確保最終一致性,不鎖定數據庫資源展蒂,適合高并發(fā)場景又活。

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市锰悼,隨后出現的幾起案子柳骄,更是在濱河造成了極大的恐慌,老刑警劉巖箕般,帶你破解...
    沈念sama閱讀 212,542評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耐薯,死亡現場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機曲初,發(fā)現死者居然都...
    沈念sama閱讀 90,596評論 3 385
  • 文/潘曉璐 我一進店門体谒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人臼婆,你說我怎么就攤上這事抒痒。” “怎么了颁褂?”我有些...
    開封第一講書人閱讀 158,021評論 0 348
  • 文/不壞的土叔 我叫張陵故响,是天一觀的道長。 經常有香客問我颁独,道長彩届,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,682評論 1 284
  • 正文 為了忘掉前任奖唯,我火速辦了婚禮惨缆,結果婚禮上,老公的妹妹穿的比我還像新娘丰捷。我一直安慰自己坯墨,他們只是感情好,可當我...
    茶點故事閱讀 65,792評論 6 386
  • 文/花漫 我一把揭開白布病往。 她就那樣靜靜地躺著捣染,像睡著了一般。 火紅的嫁衣襯著肌膚如雪停巷。 梳的紋絲不亂的頭發(fā)上耍攘,一...
    開封第一講書人閱讀 49,985評論 1 291
  • 那天,我揣著相機與錄音畔勤,去河邊找鬼蕾各。 笑死,一個胖子當著我的面吹牛庆揪,可吹牛的內容都是我干的式曲。 我是一名探鬼主播,決...
    沈念sama閱讀 39,107評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼缸榛,長吁一口氣:“原來是場噩夢啊……” “哼吝羞!你這毒婦竟也來了?” 一聲冷哼從身側響起内颗,我...
    開封第一講書人閱讀 37,845評論 0 268
  • 序言:老撾萬榮一對情侶失蹤钧排,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后均澳,有當地人在樹林里發(fā)現了一具尸體恨溜,經...
    沈念sama閱讀 44,299評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡符衔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,612評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了糟袁。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片柏腻。...
    茶點故事閱讀 38,747評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖系吭,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情颗品,我是刑警寧澤肯尺,帶...
    沈念sama閱讀 34,441評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站躯枢,受9級特大地震影響则吟,放射性物質發(fā)生泄漏。R本人自食惡果不足惜锄蹂,卻給世界環(huán)境...
    茶點故事閱讀 40,072評論 3 317
  • 文/蒙蒙 一氓仲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧得糜,春花似錦敬扛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至治宣,卻和暖如春急侥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背侮邀。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評論 1 267
  • 我被黑心中介騙來泰國打工坏怪, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人绊茧。 一個月前我還...
    沈念sama閱讀 46,545評論 2 362
  • 正文 我出身青樓铝宵,卻偏偏與公主長得像,于是被迫代替她去往敵國和親按傅。 傳聞我的和親對象是個殘疾皇子捉超,可洞房花燭夜當晚...
    茶點故事閱讀 43,658評論 2 350

推薦閱讀更多精彩內容