RocketMQ 與 Spring Cloud Stream整合(八平窘、事務消息)

在分布式消息隊列中,目前唯一提供完整的事務消息的瞬浓,只有 RocketMQ 。關于這一點蓬坡,還是可以鼓吹下的猿棉。

可能會有胖友怒噴艿艿磅叛,RabbitMQ 和 Kafka 也有事務消息啊,也支持發(fā)送事務消息的發(fā)送萨赁,以及后續(xù)的事務消息的 commit提交或 rollbackc 回滾弊琴。但是要考慮一個極端的情況,在本地數(shù)據(jù)庫事務已經(jīng)提交的時時候杖爽,如果因為網(wǎng)絡原因敲董,又或者崩潰等等意外,導致事務消息沒有被 commit 慰安,最終導致這條事務消息丟失腋寨,分布式事務出現(xiàn)問題。

相比來說化焕,RocketMQ 提供事務回查機制萄窜,如果應用超過一定時長未 commit 或 rollback 這條事務消息,RocketMQ 會主動回查應用撒桨,詢問這條事務消息是 commit 還是 rollback 查刻,從而實現(xiàn)事務消息的狀態(tài)最終能夠被 commit 或是 rollback ,達到最終事務的一致性凤类。

這也是為什么艿艿在上面專門加粗“完整的”三個字的原因穗泵。可能上述的描述谜疤,對于絕大多數(shù)沒有了解過分布式事務的胖友佃延,會比較陌生,所以推薦閱讀如下文章:

雖然說 RabbitMQ茎截、Kafka 并未提供完整的事務消息苇侵,但是社區(qū)里,已經(jīng)基于它們之上拓展企锌,提供了事務回查的功能榆浓。例如說:Myth ,采用消息隊列解決分布式事務的開源框架, 基于 Java 語言來開發(fā)(JDK1.8)撕攒,支持 Dubbo肚吏,Spring Cloud,Motan 等 RPC 框架進行分布式事務生宛。

下面味咳,我們來搭建一個 RocketMQ 定時消息的使用示例〔晾考慮方便脊阴,我們直接復用[快速入門]小節(jié)的項目,復制原producer項目成 [sca-stream-rocketmq-producer-transaction]發(fā)送事務消息,繼續(xù)使用 [sca-stream-rocketmq-consumer] 消費消息嘿期。

8.1 復制項目

從 [sca-stream-rocketmq-producer] 復制出 [sca-stream-rocketmq-producer-transaction]來發(fā)送事務消息品擎。

8.2 配置文件

修改 application.yml 配置文件,添加 transactional 配置項為 true备徐,設置 Producer 發(fā)送事務消息萄传。完整配置如下:

spring:
  application:
    name: stream-rocketmq-producer-application
  cloud:
    # Spring Cloud Stream 配置項,對應 BindingServiceProperties 類
    stream:
      # Binding 配置項蜜猾,對應 BindingProperties Map
      bindings:
        erbadagang-output:
          destination: ERBADAGANG-TOPIC-01 # 目的地秀菱。這里使用 RocketMQ Topic
          content-type: application/json # 內(nèi)容格式。這里使用 JSON

        trek-output:
          destination: TREK-TOPIC-01 # 目的地蹭睡。這里使用 RocketMQ Topic
          content-type: application/json # 內(nèi)容格式衍菱。這里使用 JSON
      # Spring Cloud Stream RocketMQ 配置項
      rocketmq:
        # RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定義 Binding 配置項棠笑,對應 RocketMQBindingProperties Map
        bindings:
          erbadagang-output:
            # RocketMQ Producer 配置項梦碗,對應 RocketMQProducerProperties 類
            producer:
              group: test # 生產(chǎn)者分組
              sync: true # 是否同步發(fā)送消息,默認為 false 異步蓖救。
              transactional: true # 是否發(fā)送事務消息洪规,默認為 false。

server:
  port: 18080

8.3 Demo01Controller

修改 [Demo01Controller]類循捺,增加發(fā)送事務消息的 HTTP 接口斩例。代碼如下:


    /**
     * 事務消息
     *
     * @return
     */
    @GetMapping("/send_transaction")
    public boolean sendTransaction() {
        // 創(chuàng)建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // 創(chuàng)建 Spring Message 對象
        Args args = new Args().setArgs1(1).setArgs2("2");
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .setHeader("args", JSON.toJSONString(args)) // <X>
                .build();
        // 發(fā)送消息
        return mySource.erbadagangOutput().send(springMessage);
    }

    public static class Args {

        private Integer args1;

        private String args2;

        public Integer getArgs1() {
            return args1;
        }

        public Args setArgs1(Integer args1) {
            this.args1 = args1;
            return this;
        }

        public String getArgs2() {
            return args2;
        }

        public Args setArgs2(String args2) {
            this.args2 = args2;
            return this;
        }

        @Override
        public String toString() {
            return "Args{" +
                    "args1=" + args1 +
                    ", args2='" + args2 + '\'' +
                    '}';
        }

    }

因為 Spring Cloud Stream 在設計時,并沒有考慮事務消息从橘,所以我們只好在 <X> 處念赶,通過 Header 傳遞參數(shù)。

又因為 Header 后續(xù)會被轉換成 String 類型恰力,導致我們無法獲得正確的真實的原始參數(shù)叉谜,所以這里我們先使用 JSON 將 args 參數(shù)序列化成字符串,這樣后續(xù)我們可以使用 JSON 反序列化回來踩萎。

8.4 TransactionListenerImpl

創(chuàng)建 [TransactionListenerImpl]類停局,實現(xiàn) MQ 事務的監(jiān)聽。代碼如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message;

import com.alibaba.fastjson.JSON;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller.Demo01Controller;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * @description 實現(xiàn) MQ 事務的監(jiān)聽香府。
 * @ClassName: TransactionListenerImpl
 * @author: 郭秀志 jbcode@126.com
 * @date: 2020/8/7 9:21
 * @Copyright:
 */
@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 從消息 Header 中解析到 args 參數(shù)董栽,并使用 JSON 反序列化
        Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),
                Demo01Controller.Args.class);
        // ... local transaction process, return rollback, commit or unknown
        logger.info("[executeLocalTransaction][執(zhí)行本地事務,消息:{} args:{}]", msg, args);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // ... check local transaction status and return rollback, commit or unknown
        logger.info("[checkLocalTransaction][回查消息:{}]", msg);
        return RocketMQLocalTransactionState.COMMIT;
    }

}

① 在類上企孩,添加 @RocketMQTransactionListener 注解锭碳,聲明監(jiān)聽器的是生產(chǎn)者分組是 "test" 的 Producer 發(fā)送的事務消息。因為 RocketMQ 是回查(請求)指定指定生產(chǎn)分組下的 Producer勿璃,從而獲得事務消息的狀態(tài)擒抛,所以一定要正確設置推汽。

② 實現(xiàn) RocketMQLocalTransactionListener 接口,實現(xiàn)執(zhí)行本地事務和檢查本地事務的方法歧沪。

③ 實現(xiàn) #executeLocalTransaction(...) 方法民泵,實現(xiàn)執(zhí)行本地事務。

  • 注意槽畔,這是一個模板方法。在調(diào)用這個方法之前胁编,Spring Cloud Alibaba Stream RocketMQ 已經(jīng)使用 Producer 發(fā)送了一條事務消息厢钧,本方法里面我們使用消息內(nèi)容進行本地數(shù)據(jù)庫事務操作。然后根據(jù)該方法執(zhí)行的返回的 RocketMQLocalTransactionState 結果嬉橙,提交還是回滾該事務消息早直。

  • 這里,我們?yōu)榱四M需要 RocketMQ 回查 Producer 來獲得事務消息的狀態(tài)市框,所以返回了 RocketMQLocalTransactionState.UNKNOWN——未知狀態(tài)霞扬。

④ 實現(xiàn) #checkLocalTransaction(...) 方法,檢查本地事務枫振。

  • 在事務消息長事件未被提交或回滾時喻圃,RocketMQ 會回查事務消息對應的生產(chǎn)者分組下的 Producer ,獲得事務消息的狀態(tài)粪滤。此時斧拍,該方法就會被調(diào)用。
  • 這里杖小,我們直接返回 RocketMQLocalTransactionState.COMMIT 提交狀態(tài)肆汹。

一般來說,有兩種方式實現(xiàn)本地事務回查時予权,返回事務消息的狀態(tài)昂勉。

第一種,通過 msg 消息扫腺,獲得某個業(yè)務上的標識或者編號岗照,然后去數(shù)據(jù)庫中查詢業(yè)務記錄,從而判斷該事務消息的狀態(tài)是提交還是回滾斧账。

第二種谴返,記錄 msg 的事務編號,與事務狀態(tài)到數(shù)據(jù)庫中咧织。

  • 第一步嗓袱,在 #executeLocalTransaction(...) 方法中,先存儲一條 idmsg 的事務編號习绢,狀態(tài)為 RocketMQLocalTransactionState.UNKNOWN 的記錄渠抹。
  • 第二步蝙昙,調(diào)用帶有事務的業(yè)務 Service 的方法。在該 Service 方法中梧却,在邏輯都執(zhí)行成功的情況下奇颠,更新 idmsg 的事務編號,狀態(tài)變更為 RocketMQLocalTransactionState.COMMIT 放航。這樣烈拒,我們就可以伴隨這個事務的提交,更新 idmsg 的事務編號的記錄的狀為 RocketMQLocalTransactionState.COMMIT 广鳍,美滋滋荆几。。
  • 第三步赊时,要以 try-catch 的方式吨铸,調(diào)用業(yè)務 Service 的方法。如此祖秒,如果發(fā)生異常诞吱,回滾事務的時候,可以在 catch 中竭缝,更新 idmsg 的事務編號的記錄的狀態(tài)為 RocketMQLocalTransactionState.ROLLBACK 房维。?? 極端情況下,可能更新失敗抬纸,則打印 error 日志握巢,告警知道,人工介入松却。
  • 如此三步之后暴浦,我們在 #executeLocalTransaction(...) 方法中,就可以通過查找數(shù)據(jù)庫晓锻,idmsg 的事務編號的記錄的狀態(tài)歌焦,然后返回。

相比來說砚哆,傾向第一種独撇,實現(xiàn)更加簡單通用,對于業(yè)務開發(fā)者躁锁,更加友好纷铣。和有幾個朋友溝通了下,但他們是采用第二種战转。

8.5 簡單測試

① 執(zhí)行 ConsumerApplication搜立,啟動消費者的實例。
② 執(zhí)行 ProducerApplication槐秧,啟動生產(chǎn)者的實例啄踊。

之后忧设,請求 http://127.0.0.1:18080/demo01/send_transaction 接口,發(fā)送事務消息颠通。IDEA 控制臺輸出日志如下:

// ProduerApplication 控制臺
// ### TransactionListenerImpl 執(zhí)行 executeLocalTransaction 方法址晕,先執(zhí)行本地事務的邏輯
2020-08-07 09:53:13.715  INFO 5656 --- [io-18080-exec-5] c.e.s.s.r.p.m.TransactionListenerImpl    : [executeLocalTransaction][執(zhí)行本地事務,消息:GenericMessage [payload=byte[16], headers={args={"args1":1,"args2":"erbadagang神車"}, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A82B7C161818B4AAC2210545790002, id=06cfec82-f118-78df-5f8c-c6d273b69581, contentType=application/json, timestamp=1596765193715}] args:Args{args1=1, args2='erbadagang神車'}]
// ### Producer 發(fā)送事務消息成功顿锰,但是因為 executeLocalTransaction 方法返回的是 UNKOWN 狀態(tài)谨垃,所以事務消息并未提交或者回滾
// ### RocketMQ Broker 在發(fā)送事務消息 38 秒后,發(fā)現(xiàn)事務消息還未提交或是回滾硼控,所以回查 Producer 乘客。此時,checkLocalTransaction 方法返回 COMMIT 淀歇,所以該事務消息被提交。
2020-08-07 09:53:51.368  INFO 5656 --- [pool-1-thread-1] c.e.s.s.r.p.m.TransactionListenerImpl    : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[16], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1596765193593, args={"args1":1,"args2":"erbadagang神車"}, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=6585E30D00002A9F000000000005005F, rocketmq_TRANSACTION_ID=C0A82B7C161818B4AAC2210545790002, rocketmq_SYS_FLAG=0, id=e3304269-c7da-b3d5-864a-2c0fb8a713e5, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596765231368}]]

// ConsumerApplication 控制臺
// ### 事務消息被提交匈织,所以該消息被 Consumer 消費
2020-08-07 09:53:51.602  INFO 8832 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號:78 消息內(nèi)容:Demo01Message{id=997254686}]

底線


本文源代碼使用 Apache License 2.0開源許可協(xié)議浪默,這里是本文源碼Gitee地址,可通過命令git clone+地址下載代碼到本地缀匕,也可直接點擊鏈接通過瀏覽器方式查看源代碼纳决。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市乡小,隨后出現(xiàn)的幾起案子阔加,更是在濱河造成了極大的恐慌,老刑警劉巖满钟,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胜榔,死亡現(xiàn)場離奇詭異,居然都是意外死亡湃番,警方通過查閱死者的電腦和手機夭织,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吠撮,“玉大人尊惰,你說我怎么就攤上這事∧嗬迹” “怎么了弄屡?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鞋诗。 經(jīng)常有香客問我膀捷,道長,這世上最難降的妖魔是什么削彬? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任担孔,我火速辦了婚禮江锨,結果婚禮上,老公的妹妹穿的比我還像新娘糕篇。我一直安慰自己啄育,他們只是感情好,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布拌消。 她就那樣靜靜地躺著挑豌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪墩崩。 梳的紋絲不亂的頭發(fā)上氓英,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天,我揣著相機與錄音鹦筹,去河邊找鬼铝阐。 笑死,一個胖子當著我的面吹牛铐拐,可吹牛的內(nèi)容都是我干的徘键。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼遍蟋,長吁一口氣:“原來是場噩夢啊……” “哼吹害!你這毒婦竟也來了?” 一聲冷哼從身側響起虚青,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤它呀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后棒厘,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體纵穿,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年奢人,在試婚紗的時候發(fā)現(xiàn)自己被綠了政恍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡达传,死狀恐怖篙耗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宪赶,我是刑警寧澤宗弯,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站搂妻,受9級特大地震影響蒙保,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜欲主,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一邓厕、第九天 我趴在偏房一處隱蔽的房頂上張望逝嚎。 院中可真熱鬧,春花似錦详恼、人聲如沸补君。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挽铁。三九已至,卻和暖如春敞掘,著一層夾襖步出監(jiān)牢的瞬間叽掘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工玖雁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留更扁,地道東北人。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓赫冬,卻偏偏與公主長得像浓镜,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子面殖,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349