在《RocketMQ實戰(zhàn)入門》里我們入門了基本的RocketMQ消息發(fā)布和消費,并封裝了一個簡單的util包迎膜,現在我們來看一下如何使用RocketMQ的事務消息來解決分布式事務問題捏浊。
事務消息基本流程
說明福扬,RocketMQ來實現分布式事務主要基于的是BASE理論毅贮,即基本可用昆咽、軟狀態(tài)驾凶、最終一致性。屬于剛性事務與柔性事務中的后者掷酗,性能較好调违,但取的是最終一致性。流程類似于2PC泻轰,但是個異步過程技肩。
流程如下:
Producer發(fā)送半消息給RocketMQ,收到回復后開始執(zhí)行自己本地的事務浮声。半消息不會被消費虚婿。
執(zhí)行本地事務并將執(zhí)行的結果發(fā)送通知RocketMQ旋奢,MQ根據結果來確定之前的半消息是提交還是丟棄。
如果半消息提交然痊,RocketMQ將負責確保該消息被Consumer消費至朗,Consumer消費到消息即執(zhí)行自己的事務。
異常情況:
如果Producer沒有將本地事務結果發(fā)送給MQ玷过,此時事務處于UNKNOW狀態(tài)爽丹,則MQ每60s會通過Producer提供的回調接口來反查其事務執(zhí)行結果來決定半消息是提交還是丟棄。默認會執(zhí)行此回調15次辛蚊,如果還是UNKNOW則丟棄半消息粤蝎,并打印錯誤日志。
如果Producer發(fā)送完半消息之后掛了袋马,MQ會去同一個group里的其他Producer調用回調接口反查事務執(zhí)行結果初澎。
代碼編寫
RocketMQ發(fā)送事務消息主要是使用TransactionMQProducer
這個Producer。然后實現TransactionListener
接口虑凛,并將該Listener綁定到Producer上碑宴。
TransactionMQProducer
與普通消息的DefaultMQProducer
比較類似,而TransactionListener
接口需要實現executeLocalTransaction()和checkLocalTransaction()兩個方法桑谍,分別寫半消息發(fā)送成功后的本地事務邏輯以及供MQ回調的反查事務結果邏輯延柠。
參考官方例子https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md,實際使用中一般應該是在executeLocalTransaction()
根據業(yè)務邏輯執(zhí)行情況直接返回明確的事務結果是Commit還是Rollback的锣披。這里為了測試故意在executeLocalTransaction()返回事務結果UNKNOW贞间,讓MQ執(zhí)行反查邏輯回調checkLocalTransaction()
方法來獲得事務結果。代碼如下:(部分代碼在前文《RocketMQ實戰(zhàn)入門》中有)
TransactionProducer:
/**
* 事務消息Producer
* 用于發(fā)送事務消息雹仿,實現分布式事務
*
* 跟普通消息用的Producer不同增热,事務Producer沒法全局用個單例,
* 因為需要綁定TransactionListener的業(yè)務處理邏輯胧辽。
* */
@Slf4j
@Component
public class TransactionMsgProducer {
@Value("${rocketmq.url}")
private String mqurl;
@Value("${rocketmq.accessKey}")
private String accessKey;
@Value("${rocketmq.secretKey}")
private String secretKey;
@Value("${rocketmq.producergroup.name}")
private String producerGroupName;
private TransactionMQProducer producer;
public TransactionSendResult publish(EventMessage eventMsg) {
try {
Message msg = new Message(eventMsg.getTopic(), eventMsg.getTag(), eventMsg.getMsgId(), JSON.toJSONString(eventMsg).getBytes("utf-8"));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
return result;
} catch (Exception e) {
log.error("事務消息發(fā)送失敗" + e.getMessage(), e);
e.printStackTrace();
}
return null;
}
@PostConstruct
public void init() {
producer = new TransactionMQProducer(producerGroupName + "-trans", getAclRPCHook());
producer.setNamesrvAddr(mqurl);
TransactionListener transactionListener = new MyTransactionListener();
producer.setTransactionListener(transactionListener);
try {
producer.start();
log.info("RocketMQ客戶端事務producer初始化...");
} catch (MQClientException e) {
e.printStackTrace();
}
}
@PreDestroy
public void shutdown() {
if(producer != null)
producer.shutdown();
}
private RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
TransactionListener:
/**
* 半消息發(fā)送成功則執(zhí)行executeLocalTransaction
* RocketMQ回調checkLocalTransaction來確認Producer本地的事務執(zhí)行狀態(tài)
* */
@Slf4j
public class MyTransactionListener implements TransactionListener{
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTransactions = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("Producer開始執(zhí)行本地事務...");
int value = transactionIndex.getAndIncrement();
localTransactions.put(msg.getTransactionId(), value % 3);
log.info("事務ID:{},執(zhí)行狀態(tài){}, 但統一返回RocketMQ事務狀態(tài)為UNKNOW", msg.getTransactionId(), value % 3);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer state = localTransactions.get(msg.getTransactionId());
log.info("事務ID:{}執(zhí)行后返回MQ狀態(tài)為UNKNOW峻仇,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制,"
+ "MQ調用Producer提供的回調接口查詢事務狀態(tài)為{}", msg.getTransactionId(), state);
if(null != state) {
switch(state) {
case 0:
log.info("從肥兔子賬戶-300操作還沒執(zhí)行完");
return LocalTransactionState.UNKNOW;
case 1:
log.info("已經成功從肥兔子賬戶-300");
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
log.info("從肥兔子賬戶-300失敗");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
消費者邑商,也即分布式事務的其他參與者:
@Slf4j
@Component
@MsgConsumer(consumerGroup = "testapp-consumer-group", tag = "douchuzi", topic = "test-trans-topic")
public class TestConsumer implements MQMsgHandler{
@Override
public void handleMsg(List<EventMessage> msgs) {
for(EventMessage msg : msgs) {
log.info(JSON.toJSONString(msg));
}
}
}
測試代碼:
EventMessage msg = new EventMessage();
msg.setTopic("test-trans-topic");
msg.setTag("douchuzi");
msg.setMsgId("123");
msg.setProducerGroup("testapp-producer-group");
msg.setMsgBody("給豆畜子賬戶+300塊");
msg.setPublishTime(LocalDateTime.now().toString());
TransactionSendResult result = transactionMsgProducer.publish(msg);
log.info("肥兔子給豆畜子轉賬的事務消息發(fā)送完畢摄咆,"
+ "消息ID:{}, 事務ID{},消息發(fā)送狀態(tài):{},分布式事務開始人断!",
result.getMsgId(),
result.getTransactionId(),
result.getSendStatus());
測試執(zhí)行結果分3種情況豆同,執(zhí)行3次一個循環(huán)。
反查本地事務UNKNOW:
2022-02-01 22:48:37.172 INFO 12388 --- [nio-8080-exec-2] c.w.controller.MyTransactionListener : Producer開始執(zhí)行本地事務...
2022-02-01 22:48:37.173 INFO 12388 --- [nio-8080-exec-2] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04E502240000,執(zhí)行狀態(tài)0, 但統一返回RocketMQ事務狀態(tài)為UNKNOW
2022-02-01 22:48:37.174 INFO 12388 --- [nio-8080-exec-2] com.wangan.controller.WanganController : 肥兔子給豆畜子轉賬的事務消息發(fā)送完畢含鳞,消息ID:7F00000130644E0E2F2A04E502240000, 事務IDnull,消息發(fā)送狀態(tài):SEND_OK,分布式事務開始芹务!
2022-02-01 22:49:05.702 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04E502240000執(zhí)行后返回MQ狀態(tài)為UNKNOW蝉绷,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制鸭廷,MQ調用Producer提供的回調接口查詢事務狀態(tài)為0
2022-02-01 22:49:05.702 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 從肥兔子賬戶-300操作還沒執(zhí)行完
2022-02-01 22:50:05.700 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04E502240000執(zhí)行后返回MQ狀態(tài)為UNKNOW,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制熔吗,MQ調用Producer提供的回調接口查詢事務狀態(tài)為0
2022-02-01 22:50:05.700 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 從肥兔子賬戶-300操作還沒執(zhí)行完
2022-02-01 22:51:05.697 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04E502240000執(zhí)行后返回MQ狀態(tài)為UNKNOW辆床,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制,MQ調用Producer提供的回調接口查詢事務狀態(tài)為0
2022-02-01 22:51:05.697 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 從肥兔子賬戶-300操作還沒執(zhí)行完
發(fā)送完半消息之后執(zhí)行本地事務并回復MQ事務執(zhí)行結果unknow桅狠,這樣MQ會反查回調接口來確認事務結果讼载,1分鐘查1次,默認15次仍無法確認事務結果則丟棄半消息中跌,這里為了測試方便改成了3次(修改broker.conf
里的transactionCheckMax=3
)咨堤。
反查本地事務COMMIT_MESSAGE:
2022-02-01 22:53:27.323 INFO 12388 --- [nio-8080-exec-5] c.w.controller.MyTransactionListener : Producer開始執(zhí)行本地事務...
2022-02-01 22:53:27.323 INFO 12388 --- [nio-8080-exec-5] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04E96F880001,執(zhí)行狀態(tài)1, 但統一返回RocketMQ事務狀態(tài)為UNKNOW
2022-02-01 22:53:27.323 INFO 12388 --- [nio-8080-exec-5] com.wangan.controller.WanganController : 肥兔子給豆畜子轉賬的事務消息發(fā)送完畢,消息ID:7F00000130644E0E2F2A04E96F880001, 事務IDnull,消息發(fā)送狀態(tài):SEND_OK漩符,分布式事務開始一喘!
2022-02-01 22:54:05.708 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04E96F880001執(zhí)行后返回MQ狀態(tài)為UNKNOW,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制嗜暴,MQ調用Producer提供的回調接口查詢事務狀態(tài)為1
2022-02-01 22:54:05.708 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 已經成功從肥兔子賬戶-300
2022-02-01 22:54:05.718 INFO 12388 --- [MessageThread_6] com.wangan.controller.TestConsumer : {"msgBody":"給豆畜子賬戶+300塊","msgId":"123","producerGroup":"testapp-producer-group","publishTime":"2022-02-01T22:53:27.304","tag":"douchuzi","topic":"test-trans-topic"}
MQ反查事務結果凸克,獲知事務成功,MQ提交半消息闷沥,消息即可被消費者正常消費并執(zhí)行其事務躺率。分布式事務完成。
反查本地事務ROLLBACK_MESSAGE:
2022-02-01 22:54:59.306 INFO 12388 --- [nio-8080-exec-8] c.w.controller.MyTransactionListener : Producer開始執(zhí)行本地事務...
2022-02-01 22:54:59.306 INFO 12388 --- [nio-8080-exec-8] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04EAD6DD0002,執(zhí)行狀態(tài)2, 但統一返回RocketMQ事務狀態(tài)為UNKNOW
2022-02-01 22:54:59.307 INFO 12388 --- [nio-8080-exec-8] com.wangan.controller.WanganController : 肥兔子給豆畜子轉賬的事務消息發(fā)送完畢执俩,消息ID:7F00000130644E0E2F2A04EAD6DD0002, 事務IDnull,消息發(fā)送狀態(tài):SEND_OK背捌,分布式事務開始!
2022-02-01 22:56:05.718 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 事務ID:7F00000130644E0E2F2A04EAD6DD0002執(zhí)行后返回MQ狀態(tài)為UNKNOW颖侄,觸發(fā)RocketMQ回調查詢事務狀態(tài)機制鸟雏,MQ調用Producer提供的回調接口查詢事務狀態(tài)為2
2022-02-01 22:56:05.718 INFO 12388 --- [pool-1-thread-1] c.w.controller.MyTransactionListener : 從肥兔子賬戶-300失敗
反查事務結果為事務失敗,MQ刪除半消息览祖。
總結
RocketMQ事務消息可以實現柔性分布式事務孝鹊,確保最終一致性,不鎖定數據庫資源展蒂,適合高并發(fā)場景又活。