Part 0: Business Scenario
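In projects we often face a scenario where persisting data and sending a message must either both succeed or both fail. For example, when a user places an order in the trading system, the shopping cart needs to consume the order message to clear the added items, the points system needs to update the user's points, the SMS platform needs to send the buyer a reminder, and so on. The trading system must keep writing the order to the DB and sending the order message consistent. It must not happen that the local transaction rolls back and no order exists, yet an order-created message was sent and downstream systems produce dirty data. Nor may the order be created while downstream systems never learn of it and cannot fulfill it, which hurts the user experience.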
We could of course implement this ourselves. For example, create a message table in the business database to store messages, and write the business data and the message data in the same transaction, so that the database transaction guarantees atomicity. A scheduled job then scans the message table and sends the stored messages.
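The message-table approach can be illustrated with a minimal in-memory sketch. It is not database code: the two lists stand in for the business table and the message table, copying and swapping them stands in for a database transaction, and every name here (OutboxSketch, createOrder, relayOnce) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a database; all names are hypothetical.
class OutboxSketch {
    final List<String> orders = new ArrayList<>();  // business table
    final List<String> outbox = new ArrayList<>();  // message table
    final List<String> sent = new ArrayList<>();    // messages handed to the broker

    // Write the order and its message together: both rows or neither.
    boolean createOrder(String orderId, boolean failBusinessLogic) {
        List<String> pendingOrders = new ArrayList<>(orders);
        List<String> pendingOutbox = new ArrayList<>(outbox);
        try {
            pendingOrders.add(orderId);
            pendingOutbox.add("ORDER_CREATED:" + orderId);
            if (failBusinessLogic) {
                throw new IllegalStateException("business check failed");
            }
            // "Commit": publish both changes at once.
            orders.clear();
            orders.addAll(pendingOrders);
            outbox.clear();
            outbox.addAll(pendingOutbox);
            return true;
        } catch (IllegalStateException e) {
            // "Rollback": the pending copies are simply dropped.
            return false;
        }
    }

    // Scheduled relay: send every stored message, then remove it from the table.
    int relayOnce() {
        int count = outbox.size();
        sent.addAll(outbox);
        outbox.clear();
        return count;
    }
}
```

With a real database the same shape would be one transaction that inserts into both tables, plus a scheduled job that reads the message table and sends what it finds.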
Alternatively, we can use an existing solution: RocketMQ has supported transactional messages since version 4.3.0. We only need to write the local-transaction method executeLocalTransaction and the local-transaction check method checkLocalTransaction, and RocketMQ invokes them automatically. If the local transaction succeeds, downstream consumers can consume the message; if it fails, downstream never sees the message.
Part 1: Usage
Sending a transactional message with RocketMQ differs from sending a normal message only in the send step. See the official example:
// TransactionProducer.java
// A custom TransactionListener is required to run the local transaction (executeLocalTransaction) and to answer check-backs on its result (checkLocalTransaction); its code is shown below
TransactionListener transactionListener = new TransactionListenerImpl();
// Producer for transactional messages
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// Create a thread pool for handling the Broker's check-backs of local transaction state; if none is set here, the RocketMQ client creates one automatically
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg =
new Message(TOPIC, tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// Send the transactional message
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* Execute the local transaction
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
/**
* Check back the state of the local transaction
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
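The local transaction's execution state has three possible results:
- LocalTransactionState.COMMIT_MESSAGE: the transaction succeeded; the Broker makes the message available for downstream consumption
- LocalTransactionState.ROLLBACK_MESSAGE: the transaction was rolled back; the Broker deletes the message and downstream never sees it
- LocalTransactionState.UNKNOW: the outcome is unknown, for example because the transaction is still running; the Broker checks back repeatedly until the maximum time or the maximum number of checks is exceeded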
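In a real service the check-back usually answers from persisted business state rather than from a counter as in the official demo. The sketch below shows one possible mapping onto the three results. It deliberately avoids the RocketMQ dependency: TxState is a local stand-in for LocalTransactionState, and the order store and its "CREATED" / "FAILED" values are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OrderTxChecker {
    // Local stand-in for org.apache.rocketmq.client.producer.LocalTransactionState.
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Hypothetical order store keyed by transaction id; values are "CREATED" or "FAILED".
    final Map<String, String> orderStatusByTxId = new ConcurrentHashMap<>();

    // What a checkLocalTransaction implementation might decide.
    TxState check(String transactionId) {
        String status = orderStatusByTxId.get(transactionId);
        if (status == null) {
            // No record yet: the local transaction may still be running, so ask again later.
            return TxState.UNKNOW;
        }
        return "CREATED".equals(status) ? TxState.COMMIT_MESSAGE : TxState.ROLLBACK_MESSAGE;
    }
}
```

Returning UNKNOW for a missing record keeps the half message alive until a later check; whether a missing record should instead mean rollback depends on how the local transaction records failures.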
Part 2: How It Works
0. Overall flow
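- The Producer sends a transactional message
- On the Broker, SendMessageProcessor receives the message. If it is a transactional message, the Broker saves the original topic and queue id in the message properties, sets the topic to RMQ_SYS_TRANS_HALF_TOPIC, stores the message, and then notifies the Producer
- After the Broker reports that the send succeeded, the Producer starts executing the local transaction
- When the local transaction finishes, the Producer reports the transaction state to the Broker
- On the Broker, EndTransactionProcessor receives the transaction state and fetches the message from RMQ_SYS_TRANS_HALF_TOPIC. If the transaction was committed, it restores the original topic and queue id from the message properties, stores the message under the real topic and queue, and also writes a record to RMQ_SYS_TRANS_OP_HALF_TOPIC; if the transaction was rolled back, it only writes the record to RMQ_SYS_TRANS_OP_HALF_TOPIC
- If the Broker receives no transaction-state notification from the Producer, the Broker-side TransactionalMessageCheckService periodically pulls messages from RMQ_SYS_TRANS_HALF_TOPIC and determines whether any of them need a check-back
- If some do, TransactionalMessageCheckService asks the Producer for the transaction state
- The Producer runs checkLocalTransaction of its TransactionListener to look up the transaction state
- After looking up the local transaction state, the Producer repeats steps 4 and 5 above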
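The flow above can be modelled in a few lines. This is a toy model and not Broker code: a map plays the role of RMQ_SYS_TRANS_HALF_TOPIC, one list plays the OP topic, another plays the real topic that consumers read, and all class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HalfMessageFlowSketch {
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    final Map<Long, String> halfTopic = new HashMap<>();  // offset -> body, models the half topic
    final List<Long> opTopic = new ArrayList<>();         // offsets already resolved, models the OP topic
    final List<String> realTopic = new ArrayList<>();     // what consumers can see
    private long nextOffset = 0;

    // Steps 1-2: store the half message and hand its offset back to the producer.
    long prepare(String body) {
        long offset = nextOffset++;
        halfTopic.put(offset, body);
        return offset;
    }

    // Steps 4-5: apply the verdict reported by the producer.
    void endTransaction(long offset, TxState state) {
        if (state == TxState.UNKNOW || opTopic.contains(offset)) {
            return; // still pending, or already resolved
        }
        if (state == TxState.COMMIT_MESSAGE) {
            realTopic.add(halfTopic.get(offset)); // only a commit reaches consumers
        }
        opTopic.add(offset); // commit and rollback both leave an OP record
    }

    // Steps 6-7: offsets that have no OP record yet and therefore need a check-back.
    List<Long> pendingOffsets() {
        List<Long> pending = new ArrayList<>(halfTopic.keySet());
        pending.removeAll(opTopic);
        return pending;
    }
}
```

The model shows why consumers never observe a rolled-back or unresolved message: nothing is written to the real topic until a commit arrives, either directly or as the answer to a check-back.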
1. Producer sends the message
// Write a TransactionListener implementation to run the local transaction and answer check-backs
TransactionListener transactionListener = new TransactionListenerImpl();
// TransactionMQProducer, the producer dedicated to transactional messages
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// Create a thread pool to handle check-backs from the Broker asynchronously; if none is set here, the producer client (not the Broker) creates one automatically
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg =
new Message(TOPIC, tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// Send the transactional message
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
// Send a transactional message
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// Verify that a TransactionListener is set; sending a transactional message requires one
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
// Transactional messages do not support delayed delivery
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// Mark as a prepare message; the Broker uses this property to recognize a transactional message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// Set the producer group, so that a check-back of the local transaction state can go to any producer picked from this group
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
// ......
// Handling of the send result is analyzed later
}
2. Broker receives the transactional message
// asyncSendMessage method
CompletableFuture<PutMessageResult> putMessageResult = null;
// Whether this is a transactional message is decided by the MessageConst.PROPERTY_TRANSACTION_PREPARED property
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(transFlag)) {
// Handle the transactional message
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// Store the transactional (half) message
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Store a normal message
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// Back up the original topic and queue id
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// Set the new topic to RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// The queue id is hard-coded: there is only one queue, so half messages are processed in order
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
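The topic swap performed by parseHalfMessageInner, and its reversal on commit, can be sketched without the Broker classes. Msg is a hypothetical stand-in for MessageExtBrokerInner, and the property keys are illustrative placeholders for the MessageConst constants.

```java
import java.util.HashMap;
import java.util.Map;

class HalfTopicSwapSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Illustrative property keys; the real ones are defined in MessageConst.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    // Hypothetical stand-in for MessageExtBrokerInner.
    static class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // Hide the message: remember where it was going, then park it in the half topic.
    static void toHalf(Msg m) {
        m.properties.put(REAL_TOPIC, m.topic);
        m.properties.put(REAL_QID, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0; // single queue, as in the Broker code
    }

    // On commit: restore the destination the producer originally asked for.
    static void restore(Msg m) {
        m.topic = m.properties.remove(REAL_TOPIC);
        m.queueId = Integer.parseInt(m.properties.remove(REAL_QID));
    }
}
```

Because consumers subscribe to the real topic, a message parked under the half topic is invisible to them until restore runs.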
3. Producer handles the send result
// sendMessageInTransaction method
try {
// Result of sending the transactional message
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// Handle the result of sending the transactional message
switch (sendResult.getSendStatus()) {
// The send succeeded
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// Deprecated; this branch is no longer taken
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
// The local transaction is executed here
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
// The catch covers the case where the local transaction throws
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
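The switch above boils down to a small mapping from send status to local transaction state. The following sketch restates it with local stand-in enums and a hypothetical LocalTx callback, so it runs without the RocketMQ client on the classpath.

```java
class SendStatusMappingSketch {
    // Local stand-ins for the client's SendStatus and LocalTransactionState enums.
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Hypothetical callback standing in for TransactionListener.executeLocalTransaction.
    interface LocalTx {
        TxState execute() throws Exception;
    }

    static TxState resolve(SendStatus status, LocalTx localTx) {
        TxState state = TxState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    TxState result = localTx.execute();
                    state = (result == null) ? TxState.UNKNOW : result;
                } catch (Throwable t) {
                    // A throwing local transaction leaves the state unknown; the Broker checks back later.
                    state = TxState.UNKNOW;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The half message was not stored reliably: skip the local transaction and roll back.
                state = TxState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }
}
```

Note that the local transaction runs only after SEND_OK; for the other statuses the business code is never invoked.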
4. Producer notifies the Broker of the transaction state
try {
// Wrap up the transactional message: tell the Broker how to finish it
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// Set the local transaction state
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// Send the request to the Broker
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, InterruptedException {
// The END_TRANSACTION request code makes the Broker handle it with EndTransactionProcessor
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
// One-way request: the result of the send is not checked
// So the send may fail; if it does, the Broker checks back later
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
5. Broker handles the transaction-state notification
// processRequest method
// The code above this point (omitted) handles the Producer's reply to a Broker check-back; analyzed later
else {
// Request issued right after the half message was sent
switch (requestHeader.getCommitOrRollback()) {
// If the state is neither commit nor rollback, return immediately and skip the logic below
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}
// If the state is commit, continue with the processing below
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}
// If the state is rollback, log a warning and continue with the processing below
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// Fetch the message under RMQ_SYS_TRANS_HALF_TOPIC by its offset
// In section 2 the Broker returned this offset to the Producer after storing the message, and the Producer passes it back here
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
// The transaction state is commit
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Read the original topic, queue and other fields from the message and build the real message
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// Clear the transaction marker so the message is not processed as a half message again
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Store the real message
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
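// Delete the message from RMQ_SYS_TRANS_HALF_TOPIC
// In practice a record is written to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark it as deleted
// Why write to RMQ_SYS_TRANS_OP_HALF_TOPIC instead of deleting directly? The check service later uses these records to tell which half messages have already been handled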
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// If the state is rollback, delete the message from RMQ_SYS_TRANS_HALF_TOPIC by writing a record to RMQ_SYS_TRANS_OP_HALF_TOPIC that marks it as deleted
// Compared with commit, only the delivery to the real topic is skipped
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
6. Broker actively pulls half messages
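TransactionalMessageCheckService implements Runnable. When the Broker starts, BrokerController's start method is called, which in turn calls TransactionalMessageCheckService's start method to launch the thread. The run method loops until the service is stopped, waking up once per transactionCheckInterval (60 seconds by default in the 4.x BrokerConfig; the 6-second default applies to transactionTimeOut).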
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
The method actually executed in each loop iteration is:
protected void onWaitEnd() {
// Transaction timeout: a half message is checked back only once its stored time plus this timeout is earlier than the current time; otherwise it waits for a later cycle
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// Maximum number of check-backs: if the state is still unknown after this many checks, the Broker stops checking and discards the message, which is equivalent to rolling back
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
// Get all queues under the half-message topic and process them one by one
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
// Consume progress of RMQ_SYS_TRANS_HALF_TOPIC
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Consume progress of RMQ_SYS_TRANS_OP_HALF_TOPIC, which records the commit or rollback requests already received
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
// Starting from the current progress, pull up to 32 messages from the processed queue RMQ_SYS_TRANS_OP_HALF_TOPIC
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
// Number of times an empty message was fetched
int getMessageNullCount = 1;
// Current progress through the half messages
long newOffset = halfOffset;
// Queue offset of the message currently being processed
long i = halfOffset;
while (true) {
// Time budget exceeded; wait for the next scheduled run
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// If this message has already been handled, move on to the next one
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// Fetch the message
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
// Retry limit exceeded: break out and end the check for this queue
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
// No new messages: end the check for this queue
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
// Other causes: pull again from the next offset
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
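// needDiscard: if the message has been checked more than the allowed maximum it is discarded and the transactional message is not committed; each check-back increments a counter in the message properties; the default maximum (transactionCheckMax) is 15
// needSkip: if the half message is older than the file retention time (72 hours by default), skip it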
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
// Age of the message: current system time minus the message's born timestamp
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
// checkImmunityTime: the period during which the message is immune from check-backs. After sending a transactional message the application does not commit its transaction immediately; this value is the assumed time the application needs to commit. During this period RocketMQ assumes the transaction is still uncommitted and should not send check requests to the application
// transactionTimeout: timeout of the transactional message. It is compared against the gap between the timestamp of the last message pulled from the OP queue and the time check() started; if that gap exceeds transactionTimeout, a check request is sent even if the age is below checkImmunityTime
long checkImmunityTime = transactionTimeout;
// PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: per-message immunity time in seconds that the sender may set on the message; when present it replaces transactionTimeout as the period before the first check-back; null by default
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
// If the application's immunity time has not elapsed yet, stop processing this queue for this run
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// Check back if the OP queue has no processed messages and the age already exceeds checkImmunityTime
// or
// the OP queue is not empty and the last message's born timestamp is more than transactionTimeout past startTime
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// The check-back is asynchronous, so before sending it the message is appended to the half queue again for the next check run
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// Run the check-back
listener.resolveHalfMsg(msgExt);
} else {
// If it cannot yet be decided whether to send a check, load more processed (OP) messages and filter again
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
// Save the check progress of the half-message queue
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// Save the OP queue progress
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}
}
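The check method is long, so a reduced version of its per-message decision may help. This sketch keeps only two of the conditions, the check-count limit and the immunity window, and leaves out the OP-queue bookkeeping, the 72-hour skip and the freshly-stored test. The Action enum and the decide signature are invented for the illustration.

```java
class CheckDecisionSketch {
    enum Action { DISCARD, WAIT, CHECK }

    // checkTimes: how often this half message has been checked already
    // checkMax: the configured maximum number of check-backs
    // immunityMillis: time the producer is given to finish before the first check-back
    static Action decide(int checkTimes, int checkMax, long bornTimestamp, long now, long immunityMillis) {
        if (checkTimes >= checkMax) {
            return Action.DISCARD; // give up: treated like a rollback
        }
        long age = now - bornTimestamp;
        if (age >= 0 && age < immunityMillis) {
            return Action.WAIT; // still inside the immunity window
        }
        // Old enough, or the clocks disagree (negative age): ask the producer.
        return Action.CHECK;
    }
}
```

For example, with a 6000 ms immunity window a message born 2000 ms ago yields WAIT, while one born 7000 ms ago yields CHECK.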
7. Broker actively checks back the transaction state
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
// Pick one Producer from the same producer group for the check-back
// So if some machines in the group crash or restart during a release, the check-back is unaffected
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final MessageExt messageExt) throws Exception {
// The request sent to the Producer carries the code CHECK_TRANSACTION_STATE
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}
}
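Why a check-back survives the loss of individual producers can be shown with a small selection routine. This is only a sketch of the idea behind getAvailableChannel, not its implementation: Client is a hypothetical stand-in for a Netty channel, and the round-robin order is an assumption.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerGroupPickSketch {
    // Hypothetical stand-in for a producer connection.
    static class Client {
        final String id;
        final boolean alive;

        Client(String id, boolean alive) {
            this.id = id;
            this.alive = alive;
        }
    }

    private final AtomicInteger counter = new AtomicInteger();

    // Returns the next live client in round-robin order, or null if none is alive.
    Client pick(List<Client> group) {
        if (group == null || group.isEmpty()) {
            return null;
        }
        int start = Math.floorMod(counter.getAndIncrement(), group.size());
        for (int i = 0; i < group.size(); i++) {
            Client candidate = group.get((start + i) % group.size());
            if (candidate.alive) {
                return candidate;
            }
        }
        return null;
    }
}
```

This also explains a practical requirement: every producer in the group must be able to answer checkLocalTransaction for transactions started by any other member, so the state has to live in shared storage rather than in one instance's memory.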
8. Producer checks the local transaction
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
// A check-back from the Broker: look up the local transaction state
return this.checkTransactionState(ctx, request);
// ...... part of the code omitted
}
return null;
}
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
//... part of the code omitted; analyzed below
};
// Runs asynchronously on the thread pool that was set when the TransactionMQProducer was created
this.checkExecutor.submit(request);
}
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
// Check the local transaction
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
// Report the local transaction state to the Broker
// Like the Producer's first notification in section 4, this is a one-way send and may fail
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}