一. 概述
常見分布式事務的解決方案有:
- 2PC/3PC, 參考文章: 分布式事務(1)---2PC和3PC原理
- TCC或者GTS(阿里), 參考文章: 分布式事務(2)---TCC原理
- 消息中間件最終一致性, 例: RocketMQ
二. 基礎概念
RocketMQ是一種最終一致性的分布式事務, 就是說它保證的是消息最終一致性
2.1 事務交互流程
說明:
- 發(fā)送方發(fā)送半消息給服務端, 消息中攜帶通知B服務執(zhí)行需要的信息
- 服務端接受半消息成功后給發(fā)送方返回成功的通知
- 發(fā)送方接收到成功通知后開始執(zhí)行本地事務
- 如果本地事務成功, 那么久通知服務端把半消息推送到訂閱方, 否則取消半消息的推送
- 如果因為網(wǎng)絡等原因遲遲沒有返回失敗還是成功椒舵,那么會執(zhí)行RocketMQ的回調(diào)接口, 來進行事務結(jié)果的回查秽褒。
- 檢查本地數(shù)據(jù)庫提交結(jié)果, 查看是否已提交
- 更具查看事務結(jié)果通知服務端是否推送半消息到訂閱方
- 消息推到訂閱方, 訂閱方接收到消息后執(zhí)行事務, 只要保證消費方失敗重試, 就能保證最終一致性
2.2 方法說明
名詞 | 說明 |
---|---|
Half Message | 事務消息 也稱半消息 標識該消息處于"暫時不能投遞"狀態(tài)灰嫉,不會被Comsumer所消費栋操,待服務端收到生成者對該消息的commit或者rollback響應后熔吗,消息會被正常投遞或者回滾(丟棄)消息 |
TransactionMQProducer | 半消息的發(fā)送者 |
TransactionListener | 處理本地事務, 根據(jù)本地事務處理結(jié)果決定半消息是否發(fā)送, 需重寫實現(xiàn)方法: 1.executeLocalTransaction:執(zhí)行本地事務操作; 2.checkLocalTransaction:回查本地事務操作 |
LocalTransactionState | 事務消息的狀態(tài)腮出,有三種狀態(tài):CommitTransaction(提交) 怀偷、RollbackTransaction(回滾)宠叼、Unknown(未知) |
三. 使用示例
現(xiàn)在有個轉(zhuǎn)賬業(yè)務, 用戶A轉(zhuǎn)賬給用戶B, 設計兩個操作: 用戶A扣錢和用戶B加錢, 這兩個操作是在兩個不同的服務器執(zhí)行的.
3.1 實現(xiàn)原理
說明:
- 用戶A在扣款之前鄙皇,先發(fā)送半消息到中間件
- 半消息發(fā)送成功后芜赌,執(zhí)行扣款本地事務
- 扣款事務執(zhí)行成功后,通過推送消息通知另外一臺服務器進行用戶B加錢事務
3.2 用戶A扣錢操作
3.2.1 Service層(實現(xiàn):TransactionListener類伴逸,進行扣錢事務操作)
@Service
public class TransactionListenerImpl implements TransactionListener {
// 執(zhí)行本地事務操作
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 開始執(zhí)行用戶A扣錢操作
// 事務提交成功后返回提交狀態(tài)通知
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 事務提交失敗后返回回滾狀態(tài)通知
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查本地事務操作
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
boolean flag = true; // 查看數(shù)據(jù)庫事務提交結(jié)果, 提交成功返回true, 否則返回false
if(flag){
return LocalTransactionState.COMMIT_MESSAGE;
}else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
3.2.2 Controller層(消息發(fā)送)
@RestController
public class Producer {
@Autowired
private ObjectMapper objectMapper;
@Resource
private TransactionListenerImpl transactionListener;
@RequestMapping("/sendMessage")
public Object sendMessage(Map<String,Object> param) {
try {
//消息對象
Message message = new Message();
//設置主題內(nèi)容
message.setBody(objectMapper.writeValueAsString("通知內(nèi)容").getBytes());
message.setTopic("topicName");//設置主題名
message.setTags("topicTag");// 設置標簽
TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
// 設置處理對象
transactionMQProducer.setTransactionListener(transactionListener);
// 發(fā)送半消息
TransactionSendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
if(SendStatus.SEND_OK == sendResult.getSendStatus()){
// 半消息發(fā)送成功
return "success";
}else {
// 半消息發(fā)送失敗
return "error";
}
} catch (Exception e) {
return "error";
}
}
}
3.2 用戶B加錢操作
@Configuration
public class ConsumerService {
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
//消息接受者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
//設置ConsumerGroup
defaultMQPushConsumer.setConsumerGroup("consumerGroupName");
//設置Nameserve
defaultMQPushConsumer.setNamesrvAddr("nameServe");
//設置主題與主題下的標簽
defaultMQPushConsumer.subscribe("topicName", "topicTag");
// 開始接收消息缠沈,當訂閱主題內(nèi)容發(fā)生變化,本方法就會執(zhí)行
defaultMQPushConsumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
//遍歷消息隊列
msgs.forEach(mt -> {
// 找到對應的通知后, 判斷是否已經(jīng)處理過, 否則進行用戶B的加錢操作
// ......
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
return defaultMQPushConsumer;
}
}