問(wèn)題引入
這里的一個(gè)需求場(chǎng)景式:工行用戶A想建行用戶B轉(zhuǎn)賬1萬(wàn)元
我們可以使用同步消息來(lái)處理該需求場(chǎng)景
1掉奄、 工行系統(tǒng)發(fā)送一個(gè)給B贈(zèng)款1萬(wàn)元的同步消息M給Broker
2嗓袱、 消息被Broker成功接收后淮椰,向工行系統(tǒng)發(fā)送成功ACK
3捂贿、 工行系統(tǒng)收到成功ACK后向用戶A中扣款1萬(wàn)元
4泛释、 建行系統(tǒng)從Broker中獲取消息M
5、 建行系統(tǒng)消費(fèi)消息M甚带,即向用戶B中贈(zèng)款1萬(wàn)元
這其中是有問(wèn)題的:若第3步中的扣款操作失敗她肯,但消息已經(jīng)發(fā)送到了Broker佳头。對(duì)于MQ來(lái)說(shuō),只要消息寫(xiě)入成功晴氨,那么這個(gè)消息就可以被消費(fèi)康嘉。此時(shí)建行系統(tǒng)中用戶B增加了1萬(wàn)元。出現(xiàn)了數(shù)據(jù)不一致問(wèn)題
解決思路
解決思路是籽前,讓第1/2/3步具有原子性亭珍,要么全部成功,要么全部失敗枝哄。即消息發(fā)送成功后肄梨,必須要保證扣款成功。如果扣款失敗挠锥,則回滾發(fā)送成功的消息众羡。而該思路即使用事務(wù)消息。這里要使用分布式事務(wù)解決方案蓖租。
使用事務(wù)消息來(lái)處理該需求場(chǎng)景:
1粱侣、 事務(wù)管理器TM向事務(wù)協(xié)調(diào)器TC發(fā)起指令,開(kāi)啟全局事務(wù)
2蓖宦、 工行系統(tǒng)發(fā)一個(gè)給B贈(zèng)款1萬(wàn)元的事務(wù)消息M給TC
3甜害、 TC會(huì)向Broker發(fā)送半事務(wù)消息prepareHalf,將消息M預(yù)提交到Broker球昨。此時(shí)的建行系統(tǒng)是看不到Broker中的消息M的
4沫浆、 Broker會(huì)將預(yù)提交執(zhí)行結(jié)果Report給TC
5、 如果預(yù)提交失敗匿垄,則TC會(huì)向TM上報(bào)預(yù)提交失敗的相應(yīng)亿乳,全局事務(wù)結(jié)束;如果與提交成功共螺,TC會(huì)調(diào)用工行系統(tǒng)的回調(diào)操作该肴,去完成工行用戶A的預(yù)扣款1萬(wàn)元操作
6、 工行系統(tǒng)會(huì)向TC發(fā)送預(yù)扣款執(zhí)行結(jié)果藐不,即本地事務(wù)的執(zhí)行狀態(tài)
7匀哄、 TC收到預(yù)扣款執(zhí)行結(jié)果后,會(huì)將結(jié)果上報(bào)給TM
8雏蛮、 TM會(huì)根據(jù)上報(bào)結(jié)果向TC發(fā)出不同的確認(rèn)指令
- 若預(yù)扣款成功(本地事務(wù)狀態(tài)為COMMIT_MESSAGE)涎嚼,則TM向TC發(fā)送Global Commit指令
- 若預(yù)扣款失敗(本地事務(wù)狀態(tài)為ROLLBACK_MESSAGE)挑秉,則TM向TC發(fā)送Global Rollback指令
- 若未知狀態(tài)(本地事務(wù)狀態(tài)為UNKNOW)法梯,則會(huì)觸發(fā)工行系統(tǒng)的本地事務(wù)狀態(tài)回查操作。回查操作會(huì)將回查結(jié)果立哑,Report給TC夜惭。TC將結(jié)果上報(bào)給TM,TM會(huì)向TC發(fā)送最終確認(rèn)指令
9铛绰、 TC在接收到指令后會(huì)向Broker與工行系統(tǒng)發(fā)出確認(rèn)指令 - TC接收的若是Global Commit指令诈茧,則向Broker與工行系統(tǒng)發(fā)送Branch Commit指令。此時(shí)Broker中的消息M才可被建行系統(tǒng)看到捂掰;此時(shí)的工行用戶A中的扣款操作才真正被確認(rèn)
- TC接收的若是Global Rollback指令若皱,則向Broker與工行系統(tǒng)發(fā)送Branch Rollback指令。此時(shí)Broker中的消息M將被撤銷(xiāo)尘颓;工行用戶A中的扣款操作將被回滾
以上方案就是為了確保消息投遞與扣款操作能夠在一個(gè)事務(wù)中走触,要成功都成功,有一個(gè)失敗疤苹,就全部回滾互广。
以上方案并不是一個(gè)典型的XA模式。因?yàn)閄A模式中的分支事務(wù)是異步的卧土,而事務(wù)消息方案中的消息預(yù)提交與預(yù)扣款操作間是同步的惫皱。
基礎(chǔ)
分布式事務(wù)
對(duì)于分布式事務(wù),通俗地說(shuō)就是尤莺,一次操作由若干分支操作組成旅敷,這些分支操作分屬不同應(yīng)用,分布在不同服務(wù)器上颤霎。分布式事務(wù)需要保證這些分支操作要么全部成功媳谁,要么全部失敗。分布式事務(wù)與普通事務(wù)一樣友酱,就是為了保證操作結(jié)果的一致性
事務(wù)消息
RocketMQ提供了類(lèi)似X/Open XA的分布式事務(wù)功能晴音,通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致性。XA是一種分布式事務(wù)解決方案缔杉,一種分布式事務(wù)處理模式锤躁。
半事務(wù)消息
咱不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了Broker或详,但是Broker未收到最終確認(rèn)指令系羞,此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),即不能被消費(fèi)者看到霸琴。處于該種狀態(tài)下的消息即半事務(wù)消息椒振。
本地事務(wù)狀態(tài)
Producer回調(diào)操作執(zhí)行的結(jié)果為本地事務(wù)狀態(tài),其會(huì)發(fā)送給TC沈贝,而TC會(huì)再發(fā)送給TM杠人,TM會(huì)根據(jù)TC發(fā)送的本地事務(wù)狀態(tài)來(lái)決定全局事務(wù)確認(rèn)指令。
消息回查
消息回查宋下,即重新查詢本地事務(wù)的執(zhí)行狀態(tài)嗡善。本利就是重新到DB中查看預(yù)扣款操作是否執(zhí)行成功。
注意:消息回查不是重新執(zhí)行回調(diào)操作学歧≌忠回調(diào)操作是進(jìn)行預(yù)扣款操作,而消息回查則是查看預(yù)扣款操作執(zhí)行的結(jié)果
引發(fā)消息回查的原因最常見(jiàn)的有兩個(gè):
1枝笨、 回調(diào)操作返回UNKNOW
2袁铐、TC沒(méi)有接收到TM的最終全局事務(wù)確認(rèn)指令
RocketMQ中的消息回查設(shè)置
關(guān)于消息回查,有三個(gè)常見(jiàn)的屬性設(shè)置横浑。他們都在Broker加載的配置文件中配置剔桨,
- transactionTimeout=20,指定TM在20秒內(nèi)應(yīng)將最終確認(rèn)狀態(tài)發(fā)送給TC徙融,否則引發(fā)消息回查洒缀。默認(rèn)60秒
- transactionCheckMax=5,指定最多回查5次欺冀,吵過(guò)后將丟棄消息并記錄錯(cuò)誤日志树绩。默認(rèn)15次
- transactionCheckInterval=10,指定設(shè)置的多次消息回查的時(shí)間間隔為10秒隐轩,默認(rèn)為60秒
XA模式三劍客
XA協(xié)議
XA(Unix Transaction)是一種分布式事務(wù)解決方案饺饭,一種分布式事務(wù)處理模式,是基于XA協(xié)議的职车。XA協(xié)議有Tuxedo(Transaction for Unix has been Extended for Distributed Operation瘫俊,分布式操作擴(kuò)展之后的Unix事務(wù)系統(tǒng))首先提出,并交給X/Open組織悴灵,作為資源管理器與事務(wù)管理器的接口標(biāo)準(zhǔn)军援。
XA模式中有三個(gè)重要組件:TC、TM称勋、RM
TC
Transaction Coordinator胸哥,事務(wù)協(xié)調(diào)者。維護(hù)全局和分支事務(wù)的狀態(tài)赡鲜,驅(qū)動(dòng)全局事務(wù)提交或回滾
RocketMQ中Broker充當(dāng)著TC
TM
Transaction Manager空厌,事務(wù)管理器。定義全局事務(wù)的范圍:開(kāi)始全局事務(wù)银酬、提交或回滾全局事務(wù)嘲更。它實(shí)際是全局事務(wù)的發(fā)起者
RocketMQ中事務(wù)消息的Producer充當(dāng)著TM
RM
Resource Manager,資源管理器揩瞪。管理分支事務(wù)處理的資源赋朦,與TC交談并注冊(cè)分支事務(wù)和報(bào)告分支事務(wù)的狀態(tài),并驅(qū)動(dòng)分支事務(wù)提交或回滾
RocketMQ中事務(wù)消息的Producer及Broker均是RM
XA模式架構(gòu)
XA模式是一個(gè)典型的2PC,其執(zhí)行原理如下:
1宠哄、 TM向TC發(fā)起指令壹将,開(kāi)啟一個(gè)全局事務(wù)。
2毛嫉、 根據(jù)業(yè)務(wù)要求诽俯,各個(gè)RM會(huì)逐個(gè)向TC注冊(cè)分支事務(wù),然后TC會(huì)逐個(gè)向RM發(fā)出預(yù)執(zhí)行指令承粤。
3暴区、 各個(gè)RM在接收到指令后會(huì)進(jìn)行本地事務(wù)預(yù)執(zhí)行。
4辛臊、 RM將預(yù)執(zhí)行結(jié)果Report給TC仙粱。當(dāng)然,這個(gè)結(jié)果可能成功彻舰,也可能失敗伐割。
5、 TC在接收到各個(gè)RM的Report后會(huì)匯總上報(bào)給TM淹遵,根據(jù)匯總結(jié)果TM會(huì)向TC發(fā)出確認(rèn)指令
6口猜、 TC在接收到指令后再次向RM發(fā)送確認(rèn)指令
事務(wù)消息方案并不是一個(gè)典型的XA模式。因?yàn)閄A模式中的分支事務(wù)是異步的透揣,而事務(wù)消息方案中的消息預(yù)提交與預(yù)扣款操作間是同步的济炎。
注意
- 事務(wù)消息不支持延時(shí)消息
- 對(duì)于事務(wù)消息要做好冪等性檢查,因?yàn)槭聞?wù)消息可能不止一次被消費(fèi)(因?yàn)榇嬖诨貪L后再提交的情況)
代碼
定義工行事務(wù)監(jiān)聽(tīng)器
public class ICBCTransactionListener implements TransactionListener {
// 回調(diào)操作方法
// 消息預(yù)提交成功就會(huì)觸發(fā)該方法的執(zhí)行辐真,用于完成本地事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("預(yù)提交消息成功:" + msg);
// 假設(shè)接收到TAGA的消息就表示扣款操作成功须尚,TAGB的消息表示扣款失敗,
// TAGC表示扣款結(jié)果不清楚侍咱,需要執(zhí)行消息回查
if (StringUtils.equals("TAGA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
// 消息回查方法
// 引發(fā)消息回查的原因最常見(jiàn)的有兩個(gè):
// 1)回調(diào)操作返回UNKNWON
// 2)TC沒(méi)有接收到TM的最終全局事務(wù)確認(rèn)指令
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("執(zhí)行消息回查" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
定義事務(wù)消息生產(chǎn)者
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("tpg");
producer.setNamesrvAddr("rocketmqOS:9876");
/**
* 定義一個(gè)線程池
* @param corePoolSize 線程池中核心線程數(shù)量
* @param maximumPoolSize 線程池中最多線程數(shù)
* @param keepAliveTime 這是一個(gè)時(shí)間耐床。當(dāng)線程池中線程數(shù)量大于核心線程數(shù)量 是,
* 多余空閑線程的存活時(shí)長(zhǎng)
* @param unit 時(shí)間單位
* @param workQueue 臨時(shí)存放任務(wù)的隊(duì)列楔脯,其參數(shù)就是隊(duì)列的長(zhǎng)度
* @param threadFactory 線程工廠
*/
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 為生產(chǎn)者指定一個(gè)線程池
producer.setExecutorService(executorService);
// 為生產(chǎn)者添加事務(wù)監(jiān)聽(tīng)器
producer.setTransactionListener(new ICBCTransactionListener());
producer.start();
String[] tags = {"TAGA", "TAGB", "TAGC"};
for (int i = 0; i < 3; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TTopic", tags[i], body);
// 發(fā)送事務(wù)消息
// 第二個(gè)參數(shù)用于指定在執(zhí)行本地事務(wù)時(shí)要使用的業(yè)務(wù)參數(shù)
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("發(fā)送結(jié)果為:" + sendResult.getSendStatus());
}
}
}