RocketMQ實現(xiàn)分布式事務

一. 概述

常見分布式事務的解決方案有:

二. 基礎概念

RocketMQ是一種最終一致性的分布式事務, 就是說它保證的是消息最終一致性

2.1 事務交互流程

說明:

  1. 發(fā)送方發(fā)送半消息給服務端, 消息中攜帶通知B服務執(zhí)行需要的信息
  2. 服務端接受半消息成功后給發(fā)送方返回成功的通知
  3. 發(fā)送方接收到成功通知后開始執(zhí)行本地事務
  4. 如果本地事務成功, 那么久通知服務端把半消息推送到訂閱方, 否則取消半消息的推送
  5. 如果因為網(wǎng)絡等原因遲遲沒有返回失敗還是成功椒舵,那么會執(zhí)行RocketMQ的回調(diào)接口, 來進行事務結(jié)果的回查秽褒。
  6. 檢查本地數(shù)據(jù)庫提交結(jié)果, 查看是否已提交
  7. 更具查看事務結(jié)果通知服務端是否推送半消息到訂閱方
  8. 消息推到訂閱方, 訂閱方接收到消息后執(zhí)行事務, 只要保證消費方失敗重試, 就能保證最終一致性

2.2 方法說明

名詞 說明
Half Message 事務消息 也稱半消息 標識該消息處于"暫時不能投遞"狀態(tài)灰嫉,不會被Comsumer所消費栋操,待服務端收到生成者對該消息的commit或者rollback響應后熔吗,消息會被正常投遞或者回滾(丟棄)消息
TransactionMQProducer 半消息的發(fā)送者
TransactionListener 處理本地事務, 根據(jù)本地事務處理結(jié)果決定半消息是否發(fā)送, 需重寫實現(xiàn)方法: 1.executeLocalTransaction:執(zhí)行本地事務操作; 2.checkLocalTransaction:回查本地事務操作
LocalTransactionState 事務消息的狀態(tài)腮出,有三種狀態(tài):CommitTransaction(提交) 怀偷、RollbackTransaction(回滾)宠叼、Unknown(未知)

三. 使用示例

現(xiàn)在有個轉(zhuǎn)賬業(yè)務, 用戶A轉(zhuǎn)賬給用戶B, 設計兩個操作: 用戶A扣錢和用戶B加錢, 這兩個操作是在兩個不同的服務器執(zhí)行的.

3.1 實現(xiàn)原理

說明:

  1. 用戶A在扣款之前鄙皇,先發(fā)送半消息到中間件
  2. 半消息發(fā)送成功后芜赌,執(zhí)行扣款本地事務
  3. 扣款事務執(zhí)行成功后,通過推送消息通知另外一臺服務器進行用戶B加錢事務

3.2 用戶A扣錢操作

3.2.1 Service層(實現(xiàn):TransactionListener類伴逸,進行扣錢事務操作)

@Service
public class TransactionListenerImpl implements TransactionListener {
    // 執(zhí)行本地事務操作
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 開始執(zhí)行用戶A扣錢操作
            // 事務提交成功后返回提交狀態(tài)通知
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 事務提交失敗后返回回滾狀態(tài)通知
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    // 回查本地事務操作
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        boolean flag = true; // 查看數(shù)據(jù)庫事務提交結(jié)果, 提交成功返回true, 否則返回false
        if(flag){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

3.2.2 Controller層(消息發(fā)送)

@RestController
public class Producer {
    @Autowired
    private ObjectMapper objectMapper;
    @Resource
    private TransactionListenerImpl transactionListener;
    @RequestMapping("/sendMessage")
    public Object sendMessage(Map<String,Object> param) {
        try {
            //消息對象
            Message message = new Message();
            //設置主題內(nèi)容
            message.setBody(objectMapper.writeValueAsString("通知內(nèi)容").getBytes());
            message.setTopic("topicName");//設置主題名
            message.setTags("topicTag");// 設置標簽
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
            // 設置處理對象
            transactionMQProducer.setTransactionListener(transactionListener);
            // 發(fā)送半消息
            TransactionSendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
            if(SendStatus.SEND_OK == sendResult.getSendStatus()){
                // 半消息發(fā)送成功
                return "success";
            }else {
                // 半消息發(fā)送失敗
                return "error";
            }
        } catch (Exception e) {
            return "error";
        }
    }
}

3.2 用戶B加錢操作

@Configuration
public class ConsumerService {
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        //消息接受者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        //設置ConsumerGroup
        defaultMQPushConsumer.setConsumerGroup("consumerGroupName");
        //設置Nameserve
        defaultMQPushConsumer.setNamesrvAddr("nameServe");
        //設置主題與主題下的標簽
        defaultMQPushConsumer.subscribe("topicName", "topicTag");
        //  開始接收消息缠沈,當訂閱主題內(nèi)容發(fā)生變化,本方法就會執(zhí)行
        defaultMQPushConsumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
            //遍歷消息隊列
            msgs.forEach(mt -> {
                // 找到對應的通知后, 判斷是否已經(jīng)處理過, 否則進行用戶B的加錢操作
                // ......
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        defaultMQPushConsumer.start();
        return defaultMQPushConsumer;
    }
}
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末错蝴,一起剝皮案震驚了整個濱河市洲愤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌顷锰,老刑警劉巖禽篱,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異馍惹,居然都是意外死亡躺率,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門万矾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來悼吱,“玉大人,你說我怎么就攤上這事良狈『筇恚” “怎么了?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵薪丁,是天一觀的道長遇西。 經(jīng)常有香客問我,道長严嗜,這世上最難降的妖魔是什么粱檀? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮漫玄,結(jié)果婚禮上茄蚯,老公的妹妹穿的比我還像新娘压彭。我一直安慰自己,他們只是感情好渗常,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布壮不。 她就那樣靜靜地躺著,像睡著了一般皱碘。 火紅的嫁衣襯著肌膚如雪询一。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天癌椿,我揣著相機與錄音健蕊,去河邊找鬼。 笑死如失,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的送粱。 我是一名探鬼主播褪贵,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼抗俄!你這毒婦竟也來了脆丁?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤动雹,失蹤者是張志新(化名)和其女友劉穎槽卫,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體胰蝠,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡歼培,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了茸塞。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片躲庄。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钾虐,靈堂內(nèi)的尸體忽然破棺而出噪窘,到底是詐尸還是另有隱情,我是刑警寧澤效扫,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布倔监,位于F島的核電站,受9級特大地震影響菌仁,放射性物質(zhì)發(fā)生泄漏浩习。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一济丘、第九天 我趴在偏房一處隱蔽的房頂上張望瘦锹。 院中可真熱鬧,春花似錦、人聲如沸弯院。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽听绳。三九已至颂碘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間椅挣,已是汗流浹背头岔。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留鼠证,地道東北人峡竣。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像量九,于是被迫代替她去往敵國和親适掰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容