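I. Broker transaction message processing flow
- Receive the transaction message and store it in the half topic
  - Receiving a transaction message is much like receiving a normal message, except that the broker replaces the original topic and queueId with those of the transaction message: the topic becomes the half topic, and the queueId becomes the id of a queue inside the half topic.
- Commit the transaction
  - After the local transaction finishes, the producer tells the broker the local result for that transaction message. On commit, the broker reads the message back from the half topic, restores the original topic and queueId, and writes it to the commitlog. Once that write succeeds, it records the outcome in the op half topic, which is how the transaction check service later finds the messages that are still unconfirmed. Once the message is stored under its original topic, the broker's reput thread can write its commitlog position to the corresponding queue.
- Roll back the transaction
  - On rollback, the broker only records the outcome in the op half topic. Consumers never consume the half message, and it waits to be cleaned up.
- Check the local transaction result
  - A background thread on the broker keeps looking for transaction messages whose local result has not been reported, then calls back the producer to ask whether the local transaction committed or rolled back. The thread runs every 60s and checks messages that were written to the half topic more than 6s ago without a reported result.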
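The flow above can be condensed into a small in-memory model. The sketch below is not RocketMQ code: `ToyTransactionBroker`, its fields, and its methods are hypothetical names, a list stands in for the half topic's queue, and a set of offsets stands in for the op half topic. It only illustrates how prepare, commit, rollback, and the timeout check relate to each other.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the broker-side flow. All names are hypothetical.
class ToyTransactionBroker {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    static final class StoredMessage {
        final String topic;
        final int queueId;
        final String realTopic;     // original topic, kept so commit can restore it
        final int realQueueId;      // original queueId
        final String body;
        final long storeTimeMillis;

        StoredMessage(String topic, int queueId, String realTopic, int realQueueId,
                      String body, long storeTimeMillis) {
            this.topic = topic;
            this.queueId = queueId;
            this.realTopic = realTopic;
            this.realQueueId = realQueueId;
            this.body = body;
            this.storeTimeMillis = storeTimeMillis;
        }
    }

    final List<StoredMessage> halfQueue = new ArrayList<>(); // stands in for the half topic
    final Set<Integer> opOffsets = new HashSet<>();          // stands in for the op half topic
    final List<StoredMessage> visible = new ArrayList<>();   // messages consumers can see

    // Store the message under the half topic and return its offset in the half queue.
    int prepare(String topic, int queueId, String body, long nowMillis) {
        halfQueue.add(new StoredMessage(HALF_TOPIC, 0, topic, queueId, body, nowMillis));
        return halfQueue.size() - 1;
    }

    // Commit: restore the original topic/queueId, store the message, then record the outcome.
    void commit(int halfOffset, long nowMillis) {
        StoredMessage half = halfQueue.get(halfOffset);
        visible.add(new StoredMessage(half.realTopic, half.realQueueId,
                half.realTopic, half.realQueueId, half.body, nowMillis));
        opOffsets.add(halfOffset);
    }

    // Rollback: only record the outcome; the half message never becomes visible.
    void rollback(int halfOffset) {
        opOffsets.add(halfOffset);
    }

    // Offsets of half messages with no recorded outcome that are older than the timeout.
    List<Integer> findUnresolved(long nowMillis, long timeoutMillis) {
        List<Integer> result = new ArrayList<>();
        for (int offset = 0; offset < halfQueue.size(); offset++) {
            boolean resolved = opOffsets.contains(offset);
            boolean timedOut = nowMillis - halfQueue.get(offset).storeTimeMillis > timeoutMillis;
            if (!resolved && timedOut) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ToyTransactionBroker broker = new ToyTransactionBroker();
        int first = broker.prepare("OrderTopic", 3, "order-1", 0L);
        broker.prepare("OrderTopic", 1, "order-2", 0L);
        broker.commit(first, 100L);
        System.out.println("visible messages: " + broker.visible.size());
        System.out.println("offsets to check after 7s: " + broker.findUnresolved(7_000L, 6_000L));
    }
}
```

In this model a committed message and a rolled-back message look the same to the checker: both have an entry in `opOffsets`, so only messages with no reported result are returned by `findUnresolved`.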
II. Source code analysis
1. Initializing the broker's transaction components
BrokerController#initialTransaction
private void initialTransaction() {
// Load a TransactionalMessageService implementation via SPI; none is provided by default
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
// TransactionalMessageBridge is what the transaction message service uses to interact with the message store
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
// Load an AbstractTransactionalMessageCheckListener via SPI; none is provided by default
// When the check service finds a message with no local transaction result, it notifies this listener, which handles it accordingly
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
// Transaction message check service: checks messages whose transaction result has not been reported
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
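The "load via SPI, fall back to a default" pattern used twice above can be sketched on its own. This is a minimal illustration that swaps in the JDK's `java.util.ServiceLoader` for RocketMQ's own `ServiceProvider` utility; `CheckListener`, `DefaultCheckListener`, and `SpiWithFallback` are hypothetical names, not RocketMQ classes.

```java
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.function.Supplier;

// Hypothetical extension point, standing in for a pluggable broker component.
interface CheckListener {
    String name();
}

// Hypothetical built-in implementation used when no provider is registered.
class DefaultCheckListener implements CheckListener {
    @Override
    public String name() {
        return "default";
    }
}

class SpiWithFallback {
    // Return the first registered provider of the given type, or the fallback if there is none.
    static <T> T loadOrDefault(Class<T> type, Supplier<T> fallback) {
        Iterator<T> providers = ServiceLoader.load(type).iterator();
        return providers.hasNext() ? providers.next() : fallback.get();
    }

    public static void main(String[] args) {
        // No provider file is registered in this sketch, so the default is used.
        CheckListener listener = loadOrDefault(CheckListener.class, DefaultCheckListener::new);
        System.out.println("using listener: " + listener.name());
    }
}
```

The design choice is the same as in `initialTransaction`: a deployment can replace the component by registering its own implementation, while an unmodified broker gets a working default.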
2. Receiving a transaction message
SendMessageProcessor#asyncSendMessage
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
TransactionalMessageServiceImpl#asyncPrepareMessage
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
TransactionalMessageBridge#asyncPutHalfMessage
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// The REAL_TOPIC property keeps the real topic
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
// The REAL_QID property keeps the real queueId
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// Set the topic to RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// Set the queueId to 0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
- TransactionalMessageBridge#parseHalfMessageInner rewrites the transaction message as a half message
- store.asyncPutMessage(parseHalfMessageInner(messageInner)) saves the rewritten message to the file-based store
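The rewrite is reversible because the original address travels with the message as properties. The sketch below shows that round trip with a plain map; `SimpleMessage`, `toHalf`, and `restore` are hypothetical names, and only the property keys `REAL_TOPIC` and `REAL_QID` and the half topic name are taken from the code above.

```java
import java.util.HashMap;
import java.util.Map;

class HalfMessageRewrite {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    // Hypothetical stand-in for a broker-side message: an address plus string properties.
    static final class SimpleMessage {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // Save the real address in properties, then point the message at the half topic, queue 0.
    static void toHalf(SimpleMessage msg) {
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
    }

    // Build a new message addressed to the real topic and queue recorded in the properties.
    static SimpleMessage restore(SimpleMessage half) {
        SimpleMessage original = new SimpleMessage(
                half.properties.get("REAL_TOPIC"),
                Integer.parseInt(half.properties.get("REAL_QID")));
        original.properties.putAll(half.properties);
        return original;
    }

    public static void main(String[] args) {
        SimpleMessage msg = new SimpleMessage("OrderTopic", 3);
        toHalf(msg);
        System.out.println("stored as: " + msg.topic + "/" + msg.queueId);
        SimpleMessage restored = restore(msg);
        System.out.println("restored to: " + restored.topic + "/" + restored.queueId);
    }
}
```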
3. Committing or rolling back a transaction
EndTransactionProcessor#processRequest
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// Fetch the message from the half topic
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// Validate the fetched message
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Convert the transaction message back into the original message
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Write it to the message store
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// After a successful commit, write a marker for this half message (RMQ_SYS_TRANS_HALF_TOPIC) to RMQ_SYS_TRANS_OP_HALF_TOPIC so it can be cleaned up later
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// Fetch the message
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// Validate the fetched message
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Write a marker for this half message (RMQ_SYS_TRANS_HALF_TOPIC) to RMQ_SYS_TRANS_OP_HALF_TOPIC so it can be cleaned up later
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
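- commitMessage fetches the message previously stored under the topic RMQ_SYS_TRANS_HALF_TOPIC
- endMessageTransaction restores the message to its original form
- sendFinalMessage stores the final message
- deletePrepareMessage creates a new message under the topic RMQ_SYS_TRANS_OP_HALF_TOPIC, tagged as a remove marker, whose body is the queueOffset of the earlier half message, so that it can be cleaned up later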
4. Checking the transaction result
TransactionalMessageCheckService#run
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
TransactionalMessageCheckService#onWaitEnd
@Override
protected void onWaitEnd() {
// Timeout of 6s: a half message still unresolved after this long gets checked
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// Maximum number of checks: 15
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
TransactionalMessageServiceImpl#check
if (isNeedCheck) {
// Put the half message back: it is appended to the commitLog again
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// Send the check request to the producer
listener.resolveHalfMsg(msgExt);
}
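The timeout and checkMax values passed into check can be read as a per-message decision. The sketch below is a simplification under stated assumptions: `CheckDecision`, `Action`, and `decide` are hypothetical names, and the real check method applies further conditions that are not shown in the excerpt above.

```java
class CheckDecision {
    // What to do with one unresolved half message (hypothetical categories).
    enum Action { WAIT, CHECK, DISCARD }

    // ageMillis: time since the half message was stored
    // checksSoFar: how many times the producer has already been asked about it
    static Action decide(long ageMillis, int checksSoFar, long timeoutMillis, int checkMax) {
        if (checksSoFar >= checkMax) {
            return Action.DISCARD;  // asked too many times, give up on this message
        }
        if (ageMillis <= timeoutMillis) {
            return Action.WAIT;     // the producer may still report the result on its own
        }
        return Action.CHECK;        // overdue: ask the producer for the local result
    }

    public static void main(String[] args) {
        long timeoutMillis = 6_000L; // 6s, as in the comment in onWaitEnd
        int checkMax = 15;           // as in the comment in onWaitEnd
        System.out.println(decide(1_000L, 0, timeoutMillis, checkMax));
        System.out.println(decide(7_000L, 0, timeoutMillis, checkMax));
        System.out.println(decide(7_000L, 15, timeoutMillis, checkMax));
    }
}
```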
AbstractTransactionalMessageCheckListener#resolveHalfMsg
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
AbstractTransactionalMessageCheckListener#sendCheckMessage
This method sends a RequestCode.CHECK_TRANSACTION_STATE request to query the result of the local transaction.
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}