在分布式消息隊列中,目前唯一提供完整的事務消息的瞬浓,只有 RocketMQ 。關于這一點蓬坡,還是可以鼓吹下的猿棉。
可能會有胖友怒噴艿艿磅叛,RabbitMQ 和 Kafka 也有事務消息啊,也支持發(fā)送事務消息的發(fā)送萨赁,以及后續(xù)的事務消息的 commit提交或 rollbackc 回滾弊琴。但是要考慮一個極端的情況,在本地數(shù)據(jù)庫事務已經(jīng)提交的時時候杖爽,如果因為網(wǎng)絡原因敲董,又或者崩潰等等意外,導致事務消息沒有被 commit 慰安,最終導致這條事務消息丟失腋寨,分布式事務出現(xiàn)問題。
相比來說化焕,RocketMQ 提供事務回查機制萄窜,如果應用超過一定時長未 commit 或 rollback 這條事務消息,RocketMQ 會主動回查應用撒桨,詢問這條事務消息是 commit 還是 rollback 查刻,從而實現(xiàn)事務消息的狀態(tài)最終能夠被 commit 或是 rollback ,達到最終事務的一致性凤类。
這也是為什么艿艿在上面專門加粗“完整的”三個字的原因穗泵。可能上述的描述谜疤,對于絕大多數(shù)沒有了解過分布式事務的胖友佃延,會比較陌生,所以推薦閱讀如下文章:
雖然說 RabbitMQ茎截、Kafka 并未提供完整的事務消息苇侵,但是社區(qū)里,已經(jīng)基于它們之上拓展企锌,提供了事務回查的功能榆浓。例如說:Myth ,采用消息隊列解決分布式事務的開源框架, 基于 Java 語言來開發(fā)(JDK1.8)撕攒,支持 Dubbo肚吏,Spring Cloud,Motan 等 RPC 框架進行分布式事務生宛。
下面味咳,我們來搭建一個 RocketMQ 定時消息的使用示例〔晾考慮方便脊阴,我們直接復用[快速入門]小節(jié)的項目,復制原producer項目成 [sca-stream-rocketmq-producer-transaction
]發(fā)送事務消息,繼續(xù)使用 [sca-stream-rocketmq-consumer
] 消費消息嘿期。
8.1 復制項目
從 [sca-stream-rocketmq-producer
] 復制出 [sca-stream-rocketmq-producer-transaction
]來發(fā)送事務消息品擎。
8.2 配置文件
修改 application.yml
配置文件,添加 transactional
配置項為 true
备徐,設置 Producer 發(fā)送事務消息萄传。完整配置如下:
spring:
application:
name: stream-rocketmq-producer-application
cloud:
# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類
stream:
# Binding 配置項蜜猾,對應 BindingProperties Map
bindings:
erbadagang-output:
destination: ERBADAGANG-TOPIC-01 # 目的地秀菱。這里使用 RocketMQ Topic
content-type: application/json # 內(nèi)容格式。這里使用 JSON
trek-output:
destination: TREK-TOPIC-01 # 目的地蹭睡。這里使用 RocketMQ Topic
content-type: application/json # 內(nèi)容格式衍菱。這里使用 JSON
# Spring Cloud Stream RocketMQ 配置項
rocketmq:
# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定義 Binding 配置項棠笑,對應 RocketMQBindingProperties Map
bindings:
erbadagang-output:
# RocketMQ Producer 配置項梦碗,對應 RocketMQProducerProperties 類
producer:
group: test # 生產(chǎn)者分組
sync: true # 是否同步發(fā)送消息,默認為 false 異步蓖救。
transactional: true # 是否發(fā)送事務消息洪规,默認為 false。
server:
port: 18080
8.3 Demo01Controller
修改 [Demo01Controller]類循捺,增加發(fā)送事務消息的 HTTP 接口斩例。代碼如下:
/**
* 事務消息
*
* @return
*/
@GetMapping("/send_transaction")
public boolean sendTransaction() {
// 創(chuàng)建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 創(chuàng)建 Spring Message 對象
Args args = new Args().setArgs1(1).setArgs2("2");
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("args", JSON.toJSONString(args)) // <X>
.build();
// 發(fā)送消息
return mySource.erbadagangOutput().send(springMessage);
}
public static class Args {
private Integer args1;
private String args2;
public Integer getArgs1() {
return args1;
}
public Args setArgs1(Integer args1) {
this.args1 = args1;
return this;
}
public String getArgs2() {
return args2;
}
public Args setArgs2(String args2) {
this.args2 = args2;
return this;
}
@Override
public String toString() {
return "Args{" +
"args1=" + args1 +
", args2='" + args2 + '\'' +
'}';
}
}
因為 Spring Cloud Stream 在設計時,并沒有考慮事務消息从橘,所以我們只好在 <X> 處念赶,通過 Header 傳遞參數(shù)。
又因為 Header 后續(xù)會被轉換成 String 類型恰力,導致我們無法獲得正確的真實的原始參數(shù)叉谜,所以這里我們先使用 JSON 將 args 參數(shù)序列化成字符串,這樣后續(xù)我們可以使用 JSON 反序列化回來踩萎。
8.4 TransactionListenerImpl
創(chuàng)建 [TransactionListenerImpl]類停局,實現(xiàn) MQ 事務的監(jiān)聽。代碼如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message;
import com.alibaba.fastjson.JSON;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller.Demo01Controller;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
/**
* @description 實現(xiàn) MQ 事務的監(jiān)聽香府。
* @ClassName: TransactionListenerImpl
* @author: 郭秀志 jbcode@126.com
* @date: 2020/8/7 9:21
* @Copyright:
*/
@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 從消息 Header 中解析到 args 參數(shù)董栽,并使用 JSON 反序列化
Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),
Demo01Controller.Args.class);
// ... local transaction process, return rollback, commit or unknown
logger.info("[executeLocalTransaction][執(zhí)行本地事務,消息:{} args:{}]", msg, args);
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check local transaction status and return rollback, commit or unknown
logger.info("[checkLocalTransaction][回查消息:{}]", msg);
return RocketMQLocalTransactionState.COMMIT;
}
}
① 在類上企孩,添加 @RocketMQTransactionListener
注解锭碳,聲明監(jiān)聽器的是生產(chǎn)者分組是 "test"
的 Producer 發(fā)送的事務消息。因為 RocketMQ 是回查(請求)指定指定生產(chǎn)分組下的 Producer勿璃,從而獲得事務消息的狀態(tài)擒抛,所以一定要正確設置推汽。
② 實現(xiàn) RocketMQLocalTransactionListener 接口,實現(xiàn)執(zhí)行本地事務和檢查本地事務的方法歧沪。
③ 實現(xiàn) #executeLocalTransaction(...)
方法民泵,實現(xiàn)執(zhí)行本地事務。
注意槽畔,這是一個模板方法。在調(diào)用這個方法之前胁编,Spring Cloud Alibaba Stream RocketMQ 已經(jīng)使用 Producer 發(fā)送了一條事務消息厢钧,本方法里面我們使用消息內(nèi)容進行本地數(shù)據(jù)庫事務操作。然后根據(jù)該方法執(zhí)行的返回的 RocketMQLocalTransactionState 結果嬉橙,提交還是回滾該事務消息早直。
這里,我們?yōu)榱四M需要 RocketMQ
回查
Producer 來獲得事務消息的狀態(tài)市框,所以返回了RocketMQLocalTransactionState.UNKNOWN
——未知狀態(tài)霞扬。
④ 實現(xiàn) #checkLocalTransaction(...)
方法,檢查本地事務枫振。
- 在事務消息長事件未被提交或回滾時喻圃,RocketMQ 會回查事務消息對應的生產(chǎn)者分組下的 Producer ,獲得事務消息的狀態(tài)粪滤。此時斧拍,該方法就會被調(diào)用。
- 這里杖小,我們直接返回
RocketMQLocalTransactionState.COMMIT
提交狀態(tài)肆汹。
一般來說,有兩種方式實現(xiàn)本地事務回查時予权,返回事務消息的狀態(tài)昂勉。
第一種,通過 msg
消息扫腺,獲得某個業(yè)務上的標識或者編號岗照,然后去數(shù)據(jù)庫中查詢業(yè)務記錄,從而判斷該事務消息的狀態(tài)是提交還是回滾斧账。
第二種谴返,記錄 msg
的事務編號,與事務狀態(tài)到數(shù)據(jù)庫中咧织。
- 第一步嗓袱,在
#executeLocalTransaction(...)
方法中,先存儲一條id
為msg
的事務編號习绢,狀態(tài)為RocketMQLocalTransactionState.UNKNOWN
的記錄渠抹。 - 第二步蝙昙,調(diào)用帶有事務的業(yè)務 Service 的方法。在該 Service 方法中梧却,在邏輯都執(zhí)行成功的情況下奇颠,更新
id
為msg
的事務編號,狀態(tài)變更為RocketMQLocalTransactionState.COMMIT
放航。這樣烈拒,我們就可以伴隨這個事務的提交,更新id
為msg
的事務編號的記錄的狀為RocketMQLocalTransactionState.COMMIT
广鳍,美滋滋荆几。。 - 第三步赊时,要以
try-catch
的方式吨铸,調(diào)用業(yè)務 Service 的方法。如此祖秒,如果發(fā)生異常诞吱,回滾事務的時候,可以在catch
中竭缝,更新id
為msg
的事務編號的記錄的狀態(tài)為RocketMQLocalTransactionState.ROLLBACK
房维。?? 極端情況下,可能更新失敗抬纸,則打印 error 日志握巢,告警知道,人工介入松却。 - 如此三步之后暴浦,我們在
#executeLocalTransaction(...)
方法中,就可以通過查找數(shù)據(jù)庫晓锻,id
為msg
的事務編號的記錄的狀態(tài)歌焦,然后返回。
相比來說砚哆,傾向第一種独撇,實現(xiàn)更加簡單通用,對于業(yè)務開發(fā)者躁锁,更加友好纷铣。和有幾個朋友溝通了下,但他們是采用第二種战转。
8.5 簡單測試
① 執(zhí)行 ConsumerApplication搜立,啟動消費者的實例。
② 執(zhí)行 ProducerApplication槐秧,啟動生產(chǎn)者的實例啄踊。
之后忧设,請求 http://127.0.0.1:18080/demo01/send_transaction 接口,發(fā)送事務消息颠通。IDEA 控制臺輸出日志如下:
// ProduerApplication 控制臺
// ### TransactionListenerImpl 執(zhí)行 executeLocalTransaction 方法址晕,先執(zhí)行本地事務的邏輯
2020-08-07 09:53:13.715 INFO 5656 --- [io-18080-exec-5] c.e.s.s.r.p.m.TransactionListenerImpl : [executeLocalTransaction][執(zhí)行本地事務,消息:GenericMessage [payload=byte[16], headers={args={"args1":1,"args2":"erbadagang神車"}, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A82B7C161818B4AAC2210545790002, id=06cfec82-f118-78df-5f8c-c6d273b69581, contentType=application/json, timestamp=1596765193715}] args:Args{args1=1, args2='erbadagang神車'}]
// ### Producer 發(fā)送事務消息成功顿锰,但是因為 executeLocalTransaction 方法返回的是 UNKOWN 狀態(tài)谨垃,所以事務消息并未提交或者回滾
// ### RocketMQ Broker 在發(fā)送事務消息 38 秒后,發(fā)現(xiàn)事務消息還未提交或是回滾硼控,所以回查 Producer 乘客。此時,checkLocalTransaction 方法返回 COMMIT 淀歇,所以該事務消息被提交。
2020-08-07 09:53:51.368 INFO 5656 --- [pool-1-thread-1] c.e.s.s.r.p.m.TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[16], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1596765193593, args={"args1":1,"args2":"erbadagang神車"}, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=6585E30D00002A9F000000000005005F, rocketmq_TRANSACTION_ID=C0A82B7C161818B4AAC2210545790002, rocketmq_SYS_FLAG=0, id=e3304269-c7da-b3d5-864a-2c0fb8a713e5, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596765231368}]]
// ConsumerApplication 控制臺
// ### 事務消息被提交匈织,所以該消息被 Consumer 消費
2020-08-07 09:53:51.602 INFO 8832 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內(nèi)容:Demo01Message{id=997254686}]
底線
本文源代碼使用 Apache License 2.0開源許可協(xié)議浪默,這里是本文源碼Gitee地址,可通過命令git clone+地址
下載代碼到本地缀匕,也可直接點擊鏈接通過瀏覽器方式查看源代碼纳决。