上次簡(jiǎn)單的了解了一下在Spring Boot
下通過使用rocketmq-spring-boot-starter
進(jìn)行普通消息的發(fā)送成黄、接收以及使用集群模式來模擬實(shí)現(xiàn)廣播模式钢颂,文章鏈接奕枢。今天來學(xué)習(xí)一下RocketMQ
事務(wù)消息的發(fā)送削祈。
RocketMQ
的事務(wù)消息分為3種狀態(tài)拾碌,分別是提交狀態(tài)卦绣、回滾狀態(tài)姑荷、中間狀態(tài):
TransactionStatus.CommitTransaction: 提交事務(wù)侮攀,它允許消費(fèi)者消費(fèi)此消息。
TransactionStatus.RollbackTransaction: 回滾事務(wù)厢拭,它代表該消息將被刪除兰英,不允許被消費(fèi)。
TransactionStatus.Unknown: 中間狀態(tài)供鸠,它代表需要檢查消息隊(duì)列來確定狀態(tài)畦贸。
當(dāng)然因?yàn)樵陧?xiàng)目中我使用的是rocketmq-spring-boot-starter
,所以表述上略有不同楞捂,但是本質(zhì)是一樣的薄坏。
事務(wù)消息在解決分布式事務(wù)的場(chǎng)景中感覺還是很有用的,雖然我們現(xiàn)在項(xiàng)目的分布式事務(wù)是通過Seata
來實(shí)現(xiàn)的寨闹,但是通過事務(wù)消息或者消息的最終一次性也是可以的胶坠。
事務(wù)消息總共分為3個(gè)階段:發(fā)送Prepared消息、執(zhí)行本地事務(wù)繁堡、發(fā)送確認(rèn)消息沈善。這三個(gè)階段是前后關(guān)聯(lián)的乡数,只有發(fā)送Prepared消息成功,才會(huì)執(zhí)行本地事務(wù)闻牡,本地事務(wù)返回的狀態(tài)是提交净赴,那么就會(huì)發(fā)送最終的確認(rèn)消息。如果在結(jié)束消息事務(wù)時(shí)罩润,本地事務(wù)狀態(tài)失敗玖翅,那么Broker
回查線程定時(shí)(默認(rèn)1分鐘)掃描每個(gè)存儲(chǔ)事務(wù)狀態(tài)的表格文件,如果是已經(jīng)提交或者回滾的消息直接跳過割以,如果是Prepared狀態(tài)則會(huì)向生產(chǎn)者發(fā)起一個(gè)檢查本地事務(wù)的請(qǐng)求金度。
一、代碼修改
首先我創(chuàng)建有一個(gè)Service
來發(fā)送事務(wù)消息严沥,代碼沒有什么特殊的含義猜极,只是拿來當(dāng)一個(gè)demo,代碼如下:
public Boolean save(OrderEntity orderEntity) {
Message<OrderEntity> message = MessageBuilder.withPayload(orderEntity).build();
log.info(">>>> send tx message start,tx_group={},destination={},payload={} <<<<",TX_GROUP,ORDER_TOPIC + ORDER_TAG,orderEntity);
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("tx_order","order_topic:" + "tx_tag",message,orderEntity.getUserName());
String sendStatus = sendResult.getSendStatus().name();
String localTXState = sendResult.getLocalTransactionState().name();
log.info(">>>> send status={},localTransactionState={} <<<<",sendStatus,localTXState);
return Boolean.TRUE;
}
使用RocketMQTemplate
發(fā)送事務(wù)消息和普通消息略有不同的是祝峻,需要指一個(gè)事務(wù)生產(chǎn)者組魔吐,當(dāng)然如果傳入null
,則會(huì)使用默認(rèn)值rocketmq_transaction_default_global_name
莱找,發(fā)生消息的地址和普通消息一樣都Topic:Tag
酬姆,另外一點(diǎn)不同的是除了發(fā)生的Message
之外,還可以發(fā)送其他的額外參數(shù)奥溺,不過這些參數(shù)只會(huì)在執(zhí)行本地事務(wù)的時(shí)候會(huì)用到辞色。
接下來我們創(chuàng)建一個(gè)消息的監(jiān)聽器(消費(fèi)者),這個(gè)和普通消息的監(jiān)聽器一樣浮定,代碼如下:
@Component
@RocketMQMessageListener(consumerGroup = "tx_consumer",topic = "order_topic")
public class OrderListener implements RocketMQListener<String>{
@Override
public void onMessage(String message) {
log.info(">>>> message={} <<<<",message);
}
}
除了消費(fèi)者之外相满,我們還需要?jiǎng)?chuàng)建事務(wù)消息生產(chǎn)者端的消息監(jiān)聽器,注意是生產(chǎn)者桦卒,不是消費(fèi)者立美,我們需要實(shí)現(xiàn)的是RocketMQLocalTransactionListener
接口,代碼如下:
@RocketMQTransactionListener(txProducerGroup = "tx_order")
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {
@Autowired
private UserRepository userRepository;
private static final Gson GSON = new Gson();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
// 執(zhí)行本地事務(wù)
RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
String userName = (String) arg;
} catch (Exception e) {
log.error(">>>> exception message={} <<<<",e.getMessage());
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
// 檢查本地事務(wù)
RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
} catch (Exception e) {
// 異常就回滾
log.error(">>>> exception message={} <<<<",e.getMessage());
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
@RocketMQTransactionListener
表明這個(gè)一個(gè)生產(chǎn)端的消息監(jiān)聽器方灾,需要配置監(jiān)聽的事務(wù)消息生產(chǎn)者組建蹄。而實(shí)現(xiàn)RocketMQLocalTransactionListener
接口,重寫執(zhí)行本地事務(wù)的方法和檢查本地事務(wù)方法裕偿。下面洞慎,我們通過修改生產(chǎn)者端事務(wù)監(jiān)聽器的代碼來觀察代碼的執(zhí)行情況。
二嘿棘、消息事務(wù)測(cè)試
首先還是正常的啟動(dòng)項(xiàng)目劲腿,在執(zhí)行本地事務(wù)方法中正常情況下返回的值是COMMIT
,即提交事務(wù)鸟妙,這種情況下消費(fèi)者會(huì)直接消費(fèi)消息焦人,而略過檢查本地事務(wù)的方法挥吵。調(diào)用該接口,項(xiàng)目日志輸出如下:
>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=lisi, price=8848.00, address=CN-SC-CD-05, createTime=null, updateTime=null, status=20) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[119], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690C3418B4AAC2842438960000, rocketmq_TAGS=tx_tag, id=f32f4848-9acf-20bb-2501-0e6088765897, contentType=application/json, timestamp=1595749766307}],args=lisi <<<<
>>>> send status=SEND_OK,localTransactionState=COMMIT_MESSAGE <<<<
>>>> message={"id":null,"userName":"lisi","price":8848.00,"address":"CN-SC-CD-05","createTime":null,"updateTime":null,"status":"20"} <<<<
通過日志分析可以看出垃瞧,在執(zhí)行完本地事務(wù)方法之后蔫劣,返回的本地事務(wù)狀態(tài)是COMMIT_MESSAGE
坪郭,接著消費(fèi)者消費(fèi)消息个从,和我們的預(yù)期是一樣的。
接下來我們修改下執(zhí)行本地事務(wù)的方法歪沃,讓該方法返回狀態(tài)為RocketMQLocalTransactionState.UNKNOWN
嗦锐,修改之后如下:
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
// 執(zhí)行本地事務(wù)
RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
String userName = (String) arg;
int r = 11 / 0;
} catch (Exception e) {
log.error(">>>> exception message={} <<<<",e.getMessage());
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
這樣因?yàn)榘l(fā)生異常,該方法返回的結(jié)果是UNKNOWN
沪曙,根據(jù)上文的分析奕污,執(zhí)行本地事務(wù)方法之后應(yīng)該會(huì)執(zhí)行檢查本地事務(wù)方法,重啟項(xiàng)目之后液走,再次調(diào)用一下接口碳默,查看日志輸出如下:
>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhangsan, price=90001.00, address=CN-SC-CD-02, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_TAGS=tx_tag, id=dfd215f4-2aa6-f377-d1a7-ebbe3875769a, contentType=application/json, timestamp=1595750272928}],args=zhangsan <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430μs170ns).
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F00000000000156AB, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=ea3c3a7a-23c6-5acf-4c0f-0fa42f795b41, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750310890}] <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=2, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000015892, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=cddfa35c-c8b2-cb1b-dce7-a26c6888b99a, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750374536}] <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<
根據(jù)日志輸出,在Service
中返回的事務(wù)消息發(fā)送狀態(tài)是SEND_OK
缘眶,但是返回的本地事務(wù)狀態(tài)是UNKNOW
嘱根,所以需要執(zhí)行檢查本地事務(wù)方法,但是這里出現(xiàn)了一個(gè)問題就是檢查本地事務(wù)方法執(zhí)行了兩次巷懈,而且事務(wù)消息也被消費(fèi)了兩次该抒,感覺有點(diǎn)不正常了,但是檢查發(fā)現(xiàn)兩條信息日志中rocketmq_TRANSACTION_ID
是一樣的顶燕,這是什么情況凑保??會(huì)不會(huì)和HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430μs170ns).
有關(guān)呢涌攻,因?yàn)楫?dāng)時(shí)自己使用的DEBUG
模式欧引,看代碼停留了一段時(shí)間,這樣導(dǎo)致Broker
發(fā)起的第一個(gè)回查線程掛起恳谎,而這時(shí)Broker
又啟動(dòng)了一個(gè)線程芝此,從而執(zhí)行了兩次檢查事務(wù)的代碼,而該方法返回的是COMMIT
惠爽,所以癌蓖。
不使用DEBUG
模式重新測(cè)試一下,日志如下:
>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=wangwu, price=9876.00, address=CN-SC-CD-00, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_TAGS=tx_tag, id=464edcfe-09c1-cc4a-5ac3-f3df888b0102, contentType=application/json, timestamp=1595750727701}],args=wangwu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750727699, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000016109, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_SYS_FLAG=0, id=77765356-fc4d-6d05-3531-6a67fbbed7f7, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750790917}] <<<<
>>>> message={"id":null,"userName":"wangwu","price":9876.00,"address":"CN-SC-CD-00","createTime":null,"updateTime":null,"status":"10"} <<<<
這里輸出的日志信息又沒有問題了婚肆,我個(gè)人認(rèn)為上面應(yīng)該就是DEBUG
導(dǎo)致的租副,這里就不再探討了。
接下來測(cè)試一下较性,在執(zhí)行本地事務(wù)方法中返回ROLLBACK
的情況用僧,這里代碼就省略了结胀,直接返回ROLLBACK
。日志輸出如下:
>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhaoliu, price=10000.00, address=CN-SC-CD-03, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[123], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800691A3A18B4AAC284F72B910000, rocketmq_TAGS=tx_tag, id=d5b24a82-8d8b-90ad-7322-adfe2c4f3026, contentType=application/json, timestamp=1595763591062}],args=zhaoliu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=ROLLBACK_MESSAGE <<<<
沒有執(zhí)行檢驗(yàn)本地事務(wù)的方法责循,和之前說的一樣糟港。到這里我覺得應(yīng)該基本上可以明白生產(chǎn)者端消息監(jiān)聽器中兩個(gè)方法的具體作用了,主要還是理解RocketMQ
事務(wù)消息的基本原理院仿。
校驗(yàn)本地事務(wù)方法的返回值和執(zhí)行本地事務(wù)方法的返回值的作用是一樣的秸抚,這里就不再測(cè)試了。
網(wǎng)上找了一個(gè)圖歹垫,感覺非常的直觀: