什么是事務(wù)消息
事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時(shí)成功,要么同時(shí)失敗窗声。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能仔引,通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
事務(wù)消息所對(duì)應(yīng)的場(chǎng)景
在一些對(duì)數(shù)據(jù)一致性有強(qiáng)需求的場(chǎng)景芜赌,可以用 Apache RocketMQ 事務(wù)消息來(lái)解決仰挣,從而保證上下游數(shù)據(jù)的一致性伴逸。
以秒殺購(gòu)物商城的商品下單交易場(chǎng)景為例,用戶支付訂單這一核心操作的同時(shí)會(huì)涉及到下游物流發(fā)貨膘壶、庫(kù)存變更错蝴、購(gòu)物車狀態(tài)清空等多個(gè)子系統(tǒng)的變更。
事務(wù)性業(yè)務(wù)的處理分支包括:
- 主分支訂單系統(tǒng)狀態(tài)更新:由未支付變更為支付成功颓芭。
- 調(diào)用第三方物流系統(tǒng)狀態(tài)新增:新增待發(fā)貨物流記錄顷锰,創(chuàng)建訂單物流記錄。
- 積分系統(tǒng)狀態(tài)變更:變更用戶積分亡问,更新用戶積分表官紫。
- 購(gòu)物車系統(tǒng)狀態(tài)變更:清空購(gòu)物車,更新用戶購(gòu)物車記錄州藕。
RocketMQ的事務(wù)消息
Apache RocketMQ在4.3.0版的時(shí)候已經(jīng)支持分布式事務(wù)消息束世,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息床玻。
RocketMQ事務(wù)消息流程
針對(duì)于事務(wù)消息的總體運(yùn)作流程毁涉,主要分為兩個(gè)部分:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程锈死。
事務(wù)消息發(fā)送及提交基本流程概要(后面會(huì)詳細(xì)分析原理)
事務(wù)消息發(fā)送步驟如下
- 消息發(fā)送者:生產(chǎn)者將半事務(wù)消息發(fā)送至RocketMQ Broker贫堰。
- Broker服務(wù)端:RocketMQ Broker 將消息持久化成功之后,向生產(chǎn)者返回 Ack 確認(rèn)消息已經(jīng)發(fā)送成功待牵,此時(shí)消息暫不能投遞其屏,為半事務(wù)消息。
- 業(yè)務(wù)系統(tǒng):生產(chǎn)者開(kāi)始執(zhí)行本地事務(wù)邏輯缨该。
- 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback)偎行。
- 如果本地操作成功,Commit操作生成消息索引压彭,消息對(duì)消費(fèi)者可見(jiàn)
- 如果本地操作失敗睦优,此時(shí)對(duì)應(yīng)的half消息對(duì)業(yè)務(wù)不可見(jiàn),本地邏輯不執(zhí)行壮不,Rollback均進(jìn)行回滾汗盘。
服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下
- 確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者询一。
- 確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù)隐孽,不會(huì)將半事務(wù)消息投遞給消費(fèi)者癌椿。
消息出現(xiàn)異常情況的補(bǔ)償流程如下
在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果(Commit/Rollback)或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài)菱阵,經(jīng)過(guò)固定時(shí)間后踢俄,服務(wù)端將對(duì)消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。
注意:服務(wù)端僅僅會(huì)按照參數(shù)嘗試指定次數(shù)晴及,超過(guò)次數(shù)后事務(wù)會(huì)強(qiáng)制回滾都办,因此未決事務(wù)的回查時(shí)效性非常關(guān)鍵,需要按照業(yè)務(wù)的實(shí)際風(fēng)險(xiǎn)來(lái)設(shè)置
事務(wù)消息回查步驟如下
- 生產(chǎn)者收到消息回查后虑稼,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果琳钉。
- 生產(chǎn)者根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行處理蛛倦。
補(bǔ)償總結(jié)
- 對(duì)沒(méi)有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)歌懒,Broker服務(wù)端會(huì)發(fā)起一次“回查”。
- 生產(chǎn)者Producer收到回查消息溯壶,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)及皂。
- 生產(chǎn)者根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback且改。
補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況
RocketMQ事務(wù)消息實(shí)現(xiàn)原理
事務(wù)消息在一階段對(duì)用戶不可見(jiàn)
在RocketMQ事務(wù)消息的主要流程中验烧,一階段的消息如何對(duì)用戶不可見(jiàn)。
實(shí)現(xiàn)技術(shù)要點(diǎn)一:事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶是不可見(jiàn)的。如何做到寫入消息但是對(duì)用戶不可見(jiàn)呢?
RocketMQ事務(wù)消息的做法是:如果消息是half消息吁峻,將備份原消息的Topic與消息消費(fèi)隊(duì)列庆揩,然后,改變Topic為RMQ_SYS_TRANS_HALF_TOPIC。
由于消費(fèi)組未訂閱該主題,故消費(fèi)端無(wú)法消費(fèi)half類型的消息,然后RocketMQ會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù)浩习,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個(gè)服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請(qǐng)求济丘,根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾消息谱秽。
RocketMQ中,消息在服務(wù)端的存儲(chǔ)結(jié)構(gòu)如下摹迷,每條消息都會(huì)有對(duì)應(yīng)的索引信息疟赊,Consumer通過(guò)ConsumeQueue這個(gè)二級(jí)索引來(lái)讀取消息實(shí)體內(nèi)容,其流程如下:
RocketMQ的底層實(shí)現(xiàn)原理
- 寫入的如果事務(wù)消息峡碉,對(duì)消息的Topic和Queue等屬性進(jìn)行替換近哟,同時(shí)將原來(lái)的Topic和Queue信息存儲(chǔ)到消息的屬性中,正因?yàn)橄⒅黝}被替換鲫寄,故消息并不會(huì)轉(zhuǎn)發(fā)到該原主題的消息消費(fèi)隊(duì)列吉执。
- 由于沒(méi)有直接發(fā)送到目標(biāo)的topic的隊(duì)列里面疯淫,故此消費(fèi)者無(wú)法感知消息的存在,不會(huì)消費(fèi)戳玫,其實(shí)改變消息主題是RocketMQ的常用“套路”熙掺,回想一下延時(shí)消息的實(shí)現(xiàn)機(jī)制。
發(fā)送一個(gè)半事務(wù)消息
半事務(wù)消息是指暫不能投遞的消息咕宿,生產(chǎn)者已經(jīng)成功地將消息發(fā)送到了 Broker币绩,但是Broker未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞(pending)”狀態(tài)荠列,如果發(fā)送成功則執(zhí)行本地事務(wù)类浪,并根據(jù)本地事務(wù)執(zhí)行成功與否载城,向Broker半事務(wù)消息狀態(tài)(commit或者rollback)肌似,半事務(wù)消息只有commit狀態(tài)才會(huì)真正向下游投遞。
Commit和Rollback操作以及Op消息的底層實(shí)現(xiàn)原理
Rollback的情況诉瓦,對(duì)于Rollback川队,本身一階段的消息對(duì)用戶是不可見(jiàn)的,其實(shí)不需要真正撤銷消息(實(shí)際上RocketMQ也無(wú)法去真正的刪除一條消息睬澡,因?yàn)槭琼樞驅(qū)懳募模?/p>
但是區(qū)別于這條消息沒(méi)有確定狀態(tài)(Pending狀態(tài)固额,事務(wù)懸而未決),需要一個(gè)操作來(lái)標(biāo)識(shí)這條消息的最終狀態(tài)煞聪。RocketMQ事務(wù)消息方案中引入了Op消息的概念斗躏,用Op消息標(biāo)識(shí)事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。
如果一條事務(wù)消息沒(méi)有對(duì)應(yīng)的Op消息昔脯,說(shuō)明這個(gè)事務(wù)的狀態(tài)還無(wú)法確定(可能是二階段失敗了)啄糙。引入Op消息后,事務(wù)消息無(wú)論是Commit或者Rollback都會(huì)記錄一個(gè)Op操作云稚。Commit相對(duì)于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引隧饼。
Op消息的存儲(chǔ)和對(duì)應(yīng)關(guān)系
Op消息寫入到全局特定的Topic中通過(guò)源碼中的方法
TransactionalMessageUtil.buildOpTopic();
這個(gè)Topic是一個(gè)內(nèi)部的Topic(像Half消息的Topic一樣)静陈,不會(huì)被用戶消費(fèi)燕雁。Op消息的內(nèi)容為對(duì)應(yīng)的Half消息的存儲(chǔ)的Offset,這樣通過(guò)Op消息能索引到Half消息進(jìn)行后續(xù)的回查操作鲸拥。
Half消息的索引構(gòu)建
執(zhí)行二階段Commit操作時(shí)拐格,需要構(gòu)建出Half消息的索引。
- 一階段的Half消息由于是寫到一個(gè)特殊的Topic刑赶,
- 二階段構(gòu)建索引時(shí)需要讀取出Half消息捏浊,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過(guò)一次普通消息的寫入操作來(lái)生成一條對(duì)用戶可見(jiàn)的消息角撞。
所以呛伴,RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲(chǔ)的消息的內(nèi)容勃痴,在二階段時(shí)恢復(fù)出一條完整的普通消息,然后走一遍消息寫入流程热康。
補(bǔ)償控制要點(diǎn)
如果由于網(wǎng)絡(luò)閃斷沛申、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失姐军,Broker端會(huì)通過(guò)掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于"半事務(wù)消息"時(shí)铁材,需要主動(dòng)向消息生產(chǎn)者詢問(wèn)該消息的最終狀態(tài)(Commit或是Rollback)。
這樣最終保證了本地事務(wù)執(zhí)行成功奕锌,下游就能收到消息著觉,本地事務(wù)執(zhí)行失敗,下游就收不到消息惊暴”穑總而保證了上下游數(shù)據(jù)的一致性。
注意:事務(wù)消息的生產(chǎn)組名稱 ProducerGroupName不能隨意設(shè)置辽话。事務(wù)消息有回查機(jī)制肄鸽,回查時(shí)Broker端如果發(fā)現(xiàn)原始生產(chǎn)者已經(jīng)崩潰,則會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例回查本地事務(wù)執(zhí)行情況以Commit或Rollback半事務(wù)消息油啤。
RocketMQ的回查功能實(shí)現(xiàn)原理
如果在RocketMQ事務(wù)消息的二階段過(guò)程中失敗了典徘,例如在做Commit操作時(shí),出現(xiàn)網(wǎng)絡(luò)問(wèn)題導(dǎo)致Commit失敗益咬,那么需要通過(guò)一定的策略使這條消息最終被Commit逮诲。RocketMQ采用了一種補(bǔ)償機(jī)制,稱為“回查”幽告。
-
回查次數(shù)的配置化
Broker端對(duì)未確定狀態(tài)的消息發(fā)起回查梅鹦,將消息發(fā)送到對(duì)應(yīng)的Producer端(同一個(gè)Group的Producer),由Producer根據(jù)消息來(lái)檢查本地事務(wù)的狀態(tài)评腺,進(jìn)而執(zhí)行Commit或者Rollback帘瞭。Broker端通過(guò)對(duì)比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。
為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積蒿讥,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次蝶念,但是用戶可以通過(guò) Broker 配置文件的 transactionCheckMax參數(shù)來(lái)修改此限制
如果已經(jīng)檢查某條消息超過(guò) N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯(cuò)誤日志芋绸,執(zhí)行回滾Rollback操作媒殉。
-
回查行為的定制化d
- 此外用戶可以通過(guò)重寫AbstractTransactionalMessageCheckListener 類來(lái)修改這個(gè)Rollback的行為,比如改寫為Commit摔敛,或者其他的記錄日志或者發(fā)送消息郵件推送給指定人進(jìn)行人工跟進(jìn)廷蓉。
回查觸發(fā)時(shí)間定制化
事務(wù)消息將在 Broker配置文件中的參數(shù)transactionTimeout 這樣的特定時(shí)間長(zhǎng)度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí)马昙,用戶還可以通過(guò)設(shè)置用戶屬性CHECK_IMMUNITY_TIME_IN_SECONDS 來(lái)改變這個(gè)限制桃犬,該參數(shù)優(yōu)先于 transactionTimeout 參數(shù)刹悴。
事務(wù)性消息可能不止一次被檢查或消費(fèi)。
發(fā)送給用戶的目標(biāo)topic消息可能會(huì)失敗攒暇,目前這依日志的記錄而定土匀。它的高可用性通過(guò) RocketMQ 本身的高可用性機(jī)制來(lái)保證,如果希望確保事務(wù)消息不丟失形用、并且事務(wù)完整性得到保證就轧,建議使用同步的雙重寫入機(jī)制。
事務(wù)消息的生產(chǎn)者 ID 不能與其他類型消息的生產(chǎn)者 ID 共享田度。與其他類型的消息不同妒御,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過(guò)它們的生產(chǎn)者 ID 查詢到消費(fèi)者镇饺。
消息事務(wù)樣例
事務(wù)消息共有三種狀態(tài)乎莉,提交狀態(tài)、回滾狀態(tài)兰怠、中間狀態(tài)梦鉴。
- TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費(fèi)者消費(fèi)此消息揭保。
- TransactionStatus.RollbackTransaction: 回滾事務(wù),它代表該消息將被刪除魄宏,不允許被消費(fèi)秸侣。
- TransactionStatus.Unknown: 中間狀態(tài)(Pending),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)宠互。
開(kāi)發(fā)實(shí)現(xiàn)案例
發(fā)送事務(wù)消息樣例
創(chuàng)建事務(wù)性生產(chǎn)者
使用 TransactionMQProducer類創(chuàng)建生產(chǎn)者味榛,并指定唯一的 ProducerGroup,就可以設(shè)置自定義線程池來(lái)處理這些檢查請(qǐng)求予跌。執(zhí)行本地事務(wù)后搏色、需要根據(jù)執(zhí)行結(jié)果對(duì)消息隊(duì)列進(jìn)行回復(fù)。
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new
TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
實(shí)現(xiàn)事務(wù)的監(jiān)聽(tīng)接口
TransactionListener接口的定義如下:
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
當(dāng)發(fā)送半消息成功時(shí)券册,我們使用 executeLocalTransaction 方法來(lái)執(zhí)行本地事務(wù)频轿。它返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。checkLocalTransaction 方法用于檢查本地事務(wù)狀態(tài)烁焙,并回應(yīng)消息隊(duì)列的檢查請(qǐng)求航邢。它也是返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
executeLocalTransaction 是半事務(wù)消息發(fā)送成功后骄蝇,執(zhí)行本地事務(wù)的方法膳殷,具體執(zhí)行完本地事務(wù)后,可以在該方法中返回以下三種狀態(tài):
- LocalTransactionState.COMMIT_MESSAGE:提交事務(wù)九火,允許消費(fèi)者消費(fèi)該消息
- LocalTransactionState.ROLLBACK_MESSAGE:回滾事務(wù)赚窃,消息將被丟棄不允許消費(fèi)册招。
- LocalTransactionState.UNKNOW:暫時(shí)無(wú)法判斷狀態(tài),等待固定時(shí)間以后Broker端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查勒极。
checkLocalTransaction是由于二次確認(rèn)消息沒(méi)有收到跨细,Broker端回查事務(wù)狀態(tài)的方法『又剩回查規(guī)則:本地事務(wù)執(zhí)行完成后冀惭,若Broker端收到的本地事務(wù)返回狀態(tài)為L(zhǎng)ocalTransactionState.UNKNOW,或生產(chǎn)者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)掀鹅。則Broker端會(huì)向消息生產(chǎn)者發(fā)起事務(wù)回查散休,第一次回查后仍未獲取到事務(wù)狀態(tài),則之后每隔一段時(shí)間會(huì)再次回查乐尊。
事務(wù)消息使用上的限制
事務(wù)消息不支持延時(shí)消息和批量消息戚丸。