Spring Boot整合RocketMQ之事務(wù)消息

上次簡(jiǎn)單的了解了一下在Spring Boot下通過使用rocketmq-spring-boot-starter進(jìn)行普通消息的發(fā)送成黄、接收以及使用集群模式來模擬實(shí)現(xiàn)廣播模式钢颂,文章鏈接奕枢。今天來學(xué)習(xí)一下RocketMQ事務(wù)消息的發(fā)送削祈。
RocketMQ的事務(wù)消息分為3種狀態(tài)拾碌,分別是提交狀態(tài)卦绣、回滾狀態(tài)姑荷、中間狀態(tài):

TransactionStatus.CommitTransaction: 提交事務(wù)侮攀,它允許消費(fèi)者消費(fèi)此消息。
TransactionStatus.RollbackTransaction: 回滾事務(wù)厢拭,它代表該消息將被刪除兰英,不允許被消費(fèi)。
TransactionStatus.Unknown: 中間狀態(tài)供鸠,它代表需要檢查消息隊(duì)列來確定狀態(tài)畦贸。

當(dāng)然因?yàn)樵陧?xiàng)目中我使用的是rocketmq-spring-boot-starter,所以表述上略有不同楞捂,但是本質(zhì)是一樣的薄坏。
事務(wù)消息在解決分布式事務(wù)的場(chǎng)景中感覺還是很有用的,雖然我們現(xiàn)在項(xiàng)目的分布式事務(wù)是通過Seata來實(shí)現(xiàn)的寨闹,但是通過事務(wù)消息或者消息的最終一次性也是可以的胶坠。
事務(wù)消息總共分為3個(gè)階段:發(fā)送Prepared消息、執(zhí)行本地事務(wù)繁堡、發(fā)送確認(rèn)消息沈善。這三個(gè)階段是前后關(guān)聯(lián)的乡数,只有發(fā)送Prepared消息成功,才會(huì)執(zhí)行本地事務(wù)闻牡,本地事務(wù)返回的狀態(tài)是提交净赴,那么就會(huì)發(fā)送最終的確認(rèn)消息。如果在結(jié)束消息事務(wù)時(shí)罩润,本地事務(wù)狀態(tài)失敗玖翅,那么Broker回查線程定時(shí)(默認(rèn)1分鐘)掃描每個(gè)存儲(chǔ)事務(wù)狀態(tài)的表格文件,如果是已經(jīng)提交或者回滾的消息直接跳過割以,如果是Prepared狀態(tài)則會(huì)向生產(chǎn)者發(fā)起一個(gè)檢查本地事務(wù)的請(qǐng)求金度。

一、代碼修改

首先我創(chuàng)建有一個(gè)Service來發(fā)送事務(wù)消息严沥,代碼沒有什么特殊的含義猜极,只是拿來當(dāng)一個(gè)demo,代碼如下:

    public Boolean save(OrderEntity orderEntity) {
        Message<OrderEntity> message = MessageBuilder.withPayload(orderEntity).build();

        log.info(">>>> send tx message start,tx_group={},destination={},payload={} <<<<",TX_GROUP,ORDER_TOPIC + ORDER_TAG,orderEntity);
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("tx_order","order_topic:" + "tx_tag",message,orderEntity.getUserName());
        String sendStatus = sendResult.getSendStatus().name();
        String localTXState = sendResult.getLocalTransactionState().name();
        log.info(">>>> send status={},localTransactionState={} <<<<",sendStatus,localTXState);
        return Boolean.TRUE;
    }

使用RocketMQTemplate發(fā)送事務(wù)消息和普通消息略有不同的是祝峻,需要指一個(gè)事務(wù)生產(chǎn)者組魔吐,當(dāng)然如果傳入null,則會(huì)使用默認(rèn)值rocketmq_transaction_default_global_name莱找,發(fā)生消息的地址和普通消息一樣都Topic:Tag酬姆,另外一點(diǎn)不同的是除了發(fā)生的Message之外,還可以發(fā)送其他的額外參數(shù)奥溺,不過這些參數(shù)只會(huì)在執(zhí)行本地事務(wù)的時(shí)候會(huì)用到辞色。
接下來我們創(chuàng)建一個(gè)消息的監(jiān)聽器(消費(fèi)者),這個(gè)和普通消息的監(jiān)聽器一樣浮定,代碼如下:

@Component
@RocketMQMessageListener(consumerGroup = "tx_consumer",topic = "order_topic")
public class OrderListener implements RocketMQListener<String>{

    @Override
    public void onMessage(String message) {
        log.info(">>>> message={} <<<<",message);
    }
}

除了消費(fèi)者之外相满,我們還需要?jiǎng)?chuàng)建事務(wù)消息生產(chǎn)者端的消息監(jiān)聽器,注意是生產(chǎn)者桦卒,不是消費(fèi)者立美,我們需要實(shí)現(xiàn)的是RocketMQLocalTransactionListener接口,代碼如下:

@RocketMQTransactionListener(txProducerGroup = "tx_order")
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    private UserRepository userRepository;

    private static final Gson GSON = new Gson();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 執(zhí)行本地事務(wù)
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
            String userName = (String) arg;
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
        // 檢查本地事務(wù)
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
        } catch (Exception e) {
            // 異常就回滾
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }
}

@RocketMQTransactionListener表明這個(gè)一個(gè)生產(chǎn)端的消息監(jiān)聽器方灾,需要配置監(jiān)聽的事務(wù)消息生產(chǎn)者組建蹄。而實(shí)現(xiàn)RocketMQLocalTransactionListener接口,重寫執(zhí)行本地事務(wù)的方法和檢查本地事務(wù)方法裕偿。下面洞慎,我們通過修改生產(chǎn)者端事務(wù)監(jiān)聽器的代碼來觀察代碼的執(zhí)行情況。

二嘿棘、消息事務(wù)測(cè)試

首先還是正常的啟動(dòng)項(xiàng)目劲腿,在執(zhí)行本地事務(wù)方法中正常情況下返回的值是COMMIT,即提交事務(wù)鸟妙,這種情況下消費(fèi)者會(huì)直接消費(fèi)消息焦人,而略過檢查本地事務(wù)的方法挥吵。調(diào)用該接口,項(xiàng)目日志輸出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=lisi, price=8848.00, address=CN-SC-CD-05, createTime=null, updateTime=null, status=20) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[119], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690C3418B4AAC2842438960000, rocketmq_TAGS=tx_tag, id=f32f4848-9acf-20bb-2501-0e6088765897, contentType=application/json, timestamp=1595749766307}],args=lisi <<<<
>>>> send status=SEND_OK,localTransactionState=COMMIT_MESSAGE <<<<
>>>> message={"id":null,"userName":"lisi","price":8848.00,"address":"CN-SC-CD-05","createTime":null,"updateTime":null,"status":"20"} <<<<

通過日志分析可以看出垃瞧,在執(zhí)行完本地事務(wù)方法之后蔫劣,返回的本地事務(wù)狀態(tài)是COMMIT_MESSAGE坪郭,接著消費(fèi)者消費(fèi)消息个从,和我們的預(yù)期是一樣的。
接下來我們修改下執(zhí)行本地事務(wù)的方法歪沃,讓該方法返回狀態(tài)為RocketMQLocalTransactionState.UNKNOWN嗦锐,修改之后如下:

    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 執(zhí)行本地事務(wù)
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
            String userName = (String) arg;
            int r = 11 / 0;
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

這樣因?yàn)榘l(fā)生異常,該方法返回的結(jié)果是UNKNOWN沪曙,根據(jù)上文的分析奕污,執(zhí)行本地事務(wù)方法之后應(yīng)該會(huì)執(zhí)行檢查本地事務(wù)方法,重啟項(xiàng)目之后液走,再次調(diào)用一下接口碳默,查看日志輸出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhangsan, price=90001.00, address=CN-SC-CD-02, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_TAGS=tx_tag, id=dfd215f4-2aa6-f377-d1a7-ebbe3875769a, contentType=application/json, timestamp=1595750272928}],args=zhangsan <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430μs170ns).
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F00000000000156AB, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=ea3c3a7a-23c6-5acf-4c0f-0fa42f795b41, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750310890}] <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=2, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000015892, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=cddfa35c-c8b2-cb1b-dce7-a26c6888b99a, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750374536}] <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<

根據(jù)日志輸出,在Service中返回的事務(wù)消息發(fā)送狀態(tài)是SEND_OK缘眶,但是返回的本地事務(wù)狀態(tài)是UNKNOW嘱根,所以需要執(zhí)行檢查本地事務(wù)方法,但是這里出現(xiàn)了一個(gè)問題就是檢查本地事務(wù)方法執(zhí)行了兩次巷懈,而且事務(wù)消息也被消費(fèi)了兩次该抒,感覺有點(diǎn)不正常了,但是檢查發(fā)現(xiàn)兩條信息日志中rocketmq_TRANSACTION_ID是一樣的顶燕,這是什么情況凑保??會(huì)不會(huì)和HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430μs170ns).有關(guān)呢涌攻,因?yàn)楫?dāng)時(shí)自己使用的DEBUG模式欧引,看代碼停留了一段時(shí)間,這樣導(dǎo)致Broker發(fā)起的第一個(gè)回查線程掛起恳谎,而這時(shí)Broker又啟動(dòng)了一個(gè)線程芝此,從而執(zhí)行了兩次檢查事務(wù)的代碼,而該方法返回的是COMMIT惠爽,所以癌蓖。
不使用DEBUG模式重新測(cè)試一下,日志如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=wangwu, price=9876.00, address=CN-SC-CD-00, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_TAGS=tx_tag, id=464edcfe-09c1-cc4a-5ac3-f3df888b0102, contentType=application/json, timestamp=1595750727701}],args=wangwu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750727699, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000016109, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_SYS_FLAG=0, id=77765356-fc4d-6d05-3531-6a67fbbed7f7, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750790917}] <<<<
>>>> message={"id":null,"userName":"wangwu","price":9876.00,"address":"CN-SC-CD-00","createTime":null,"updateTime":null,"status":"10"} <<<<

這里輸出的日志信息又沒有問題了婚肆,我個(gè)人認(rèn)為上面應(yīng)該就是DEBUG導(dǎo)致的租副,這里就不再探討了。
接下來測(cè)試一下较性,在執(zhí)行本地事務(wù)方法中返回ROLLBACK的情況用僧,這里代碼就省略了结胀,直接返回ROLLBACK。日志輸出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhaoliu, price=10000.00, address=CN-SC-CD-03, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[123], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800691A3A18B4AAC284F72B910000, rocketmq_TAGS=tx_tag, id=d5b24a82-8d8b-90ad-7322-adfe2c4f3026, contentType=application/json, timestamp=1595763591062}],args=zhaoliu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=ROLLBACK_MESSAGE <<<<

沒有執(zhí)行檢驗(yàn)本地事務(wù)的方法责循,和之前說的一樣糟港。到這里我覺得應(yīng)該基本上可以明白生產(chǎn)者端消息監(jiān)聽器中兩個(gè)方法的具體作用了,主要還是理解RocketMQ事務(wù)消息的基本原理院仿。
校驗(yàn)本地事務(wù)方法的返回值和執(zhí)行本地事務(wù)方法的返回值的作用是一樣的秸抚,這里就不再測(cè)試了。
網(wǎng)上找了一個(gè)圖歹垫,感覺非常的直觀:

圖-1.png

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末剥汤,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子排惨,更是在濱河造成了極大的恐慌吭敢,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件暮芭,死亡現(xiàn)場(chǎng)離奇詭異鹿驼,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)辕宏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門畜晰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人匾效,你說我怎么就攤上這事舷蟀。” “怎么了面哼?”我有些...
    開封第一講書人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵野宜,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我魔策,道長(zhǎng)匈子,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任闯袒,我火速辦了婚禮虎敦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘政敢。我一直安慰自己其徙,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開白布喷户。 她就那樣靜靜地躺著唾那,像睡著了一般。 火紅的嫁衣襯著肌膚如雪褪尝。 梳的紋絲不亂的頭發(fā)上闹获,一...
    開封第一講書人閱讀 52,441評(píng)論 1 310
  • 那天期犬,我揣著相機(jī)與錄音,去河邊找鬼避诽。 笑死龟虎,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的沙庐。 我是一名探鬼主播鲤妥,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼轨功!你這毒婦竟也來了旭斥?” 一聲冷哼從身側(cè)響起容达,我...
    開封第一講書人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤古涧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后花盐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體羡滑,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年算芯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了柒昏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡熙揍,死狀恐怖职祷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情届囚,我是刑警寧澤有梆,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站意系,受9級(jí)特大地震影響泥耀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蛔添,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一痰催、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧迎瞧,春花似錦夸溶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至咏尝,卻和暖如春压语,著一層夾襖步出監(jiān)牢的瞬間啸罢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工胎食, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留扰才,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓厕怜,卻偏偏與公主長(zhǎng)得像衩匣,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子粥航,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359