一、案例
/**
 * Demo producer that sends ten transactional messages and then keeps the process alive so the
 * broker has time to call back and check the outcome of each local transaction.
 */
public class TransactionProducer {
    /**
     * Configures and starts a {@code TransactionMQProducer}, sends the sample messages, waits,
     * and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Listener that runs the local transaction and answers the broker's status checks.
        TransactionListener transactionListener = new TransactionListenerImpl();
        // Producer used to send the transactional messages.
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // Thread pool on which the transaction status checks are executed.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
        // Register the check thread pool and the listener before starting.
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        try {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    // Build the message; tags are assigned round-robin.
                    Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Send it as a transactional message.
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            // Keep the process alive so the broker's check requests can still be answered.
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
        } finally {
            // Always release the started producer, even when a sleep above is interrupted.
            producer.shutdown();
        }
    }
}
- 創建事務監聽器，用來執行事務，以及檢查事務執行的結果；事務監聽器需要實現TransactionListener接口
/**
 * Sample {@code TransactionListener} that simulates local transaction outcomes.
 *
 * <p>Each executed transaction is assigned a status code (running counter modulo 3) keyed by
 * the message's transaction id; the code is later translated into a transaction state when the
 * status is checked.
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Invoked once the prepared (half) message has been sent successfully, to run the local
     * transaction. Records a simulated status code and always reports {@code UNKNOW}.
     *
     * @param msg the message that was sent
     * @param arg caller-supplied argument; unused here
     * @return always {@code LocalTransactionState.UNKNOW}
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        final int simulatedStatus = transactionIndex.getAndIncrement() % 3;
        localTrans.put(msg.getTransactionId(), simulatedStatus);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Invoked to look up the transaction state of a prepared (half) message that has not been
     * resolved yet.
     *
     * @param msg the message whose transaction is being checked
     * @return {@code UNKNOW} for code 0, {@code ROLLBACK_MESSAGE} for code 2, and
     *         {@code COMMIT_MESSAGE} for any other code or when no code was recorded
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        final Integer recorded = localTrans.get(msg.getTransactionId());
        if (recorded == null) {
            // Nothing recorded for this transaction id: commit.
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        final int code = recorded;
        if (code == 0) {
            return LocalTransactionState.UNKNOW;
        }
        if (code == 2) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Code 1 and every other value commit.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
- 創建的線程池是用來執行TransactionListenerImpl#checkLocalTransaction
二、發送事務消息
TransactionMQProducer#sendMessageInTransaction
/**
 * Sends a transactional message through the configured transaction listener.
 *
 * <p>The message topic is rewritten with the producer's namespace before the call is handed to
 * {@code defaultMQProducerImpl}, with no local transaction executer.
 *
 * @param msg message to send; its topic is replaced by the namespace-wrapped topic
 * @param arg argument forwarded unchanged to the underlying implementation
 * @return the result returned by the underlying implementation
 * @throws MQClientException if no transaction listener has been set
 */
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (this.transactionListener == null) {
        throw new MQClientException("TransactionListener is null", null);
    }

    final String namespacedTopic = NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic());
    msg.setTopic(namespacedTopic);
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
DefaultMQProducerImpl#sendMessageInTransaction
/**
 * Sends {@code msg} as a prepared (half) transactional message, runs the local transaction
 * through the executer or the listener, and then reports the outcome via
 * {@code endTransaction}.
 *
 * @param msg message to send; a non-zero delay level is cleared and transaction properties
 *        are added to it
 * @param localTransactionExecuter executer used to run the local transaction when non-null
 * @param arg argument passed through to the executer or listener
 * @return a result copying status, queue, message id, queue offset and transaction id from
 *         the send result, plus the local transaction state
 * @throws MQClientException if both the executer and the listener are null, or if sending
 *         the message throws (message validation may presumably throw as well; not visible here)
 */
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// Listener used to run the local transaction when no executer is supplied.
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// Mark the message as a prepared transactional message and record the producer group on it.
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// Send the prepared message; any failure is rethrown as MQClientException with the cause attached.
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
// Starts as UNKNOW and is only changed by the SEND_OK branch or the rollback branch below.
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
// Carry the transaction id from the send result, if any, as a user property.
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
// Use the UNIQ_CLIENT_MESSAGE_ID_KEYIDX property value, when non-empty, as the transaction id.
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// The executer takes precedence; otherwise the listener runs the local transaction.
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
// A null result is treated as UNKNOW.
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
// The failure is not rethrown; it is recorded and handed to endTransaction below.
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
// For these statuses the local transaction is not run and the state is set to rollback.
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// Report the outcome; a failure here is only logged and the result is still returned.
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// Copy the send result fields into the transactional result and attach the local state.
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
- this.send(msg)，給broker發送消息，其中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true")
告訴broker是事務消息，這時候，該消息不能被消費者消費
- 消息發送成功後，開始執行事務
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
- 事務執行完後，向broker發送事務執行結果，若執行成功，消費者可以消費該事務消息；若執行失敗，消費者不能消費該事務消息
三、發送事務執行結果
DefaultMQProducerImpl#endTransaction
/**
 * Reports the outcome of a local transaction for a previously sent message by issuing a
 * one-way end-transaction request to the broker address resolved for the message's queue.
 *
 * @param sendResult result of sending the message; supplies the ids, queue and queue offset
 * @param localTransactionState outcome of the local transaction; selects the commit,
 *        rollback or "not" type flag on the request
 * @param localException exception raised by the local transaction, or {@code null}; when
 *        present its text is sent along as the request remark
 * @throws RemotingException declared; presumably raised by the remoting call
 * @throws MQBrokerException declared; not raised directly in this method
 * @throws InterruptedException declared; presumably raised by the remoting call
 * @throws UnknownHostException declared; presumably raised while decoding the message id
 */
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    // Decode the offset message id when available, otherwise the plain message id.
    final MessageId id = MessageDecoder.decodeMessageId(
        sendResult.getOffsetMsgId() != null ? sendResult.getOffsetMsgId() : sendResult.getMsgId());

    final String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

    final EndTransactionRequestHeader header = new EndTransactionRequestHeader();
    header.setTransactionId(transactionId);
    header.setCommitLogOffset(id.getOffset());

    // Translate the local state into the request's commit/rollback flag.
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            header.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            header.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            header.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    header.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    header.setTranStateTableOffset(sendResult.getQueueOffset());
    header.setMsgId(sendResult.getMsgId());

    // Only attach a remark when the local transaction failed with an exception.
    String remark = null;
    if (localException != null) {
        remark = "executeLocalTransactionBranch exception: " + localException.toString();
    }

    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, header, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
四、總結
- 事務消息一開始並不是直接發送到指定的那個topic 對應的隊列裡面的，而是發送到RMQ_SYS_TRANS_HALF_TOPIC topic裡面，防止消費者消費；然後broker響應生產者，生產者執行executeLocalTransaction 方法來執行本地事務
- 當本地事務執行成功，返回 commit提交事務，broker會先從RMQ_SYS_TRANS_HALF_TOPIC topic裡面找到消息，恢復原來的樣子，存儲到生產者設置的topic；存儲成功之後生成一條刪除消息並放到RMQ_SYS_TRANS_OP_HALF_TOPIC topic 裡面
- 如果本地事務失敗，就要rollback，broker會從RMQ_SYS_TRANS_HALF_TOPIC topic裡面找到那個消息，生成一條刪除消息放到RMQ_SYS_TRANS_OP_HALF_TOPIC topic 裡面
五、異常情況處理
1、提交消息失敗
提交消息失敗，直接拋異常，不用執行事務
2、事務執行完後，向broker提交事務執行結果失敗
broker 會有個事務服務線程，隔一段時間就掃描RMQ_SYS_TRANS_HALF_TOPIC topic 裡面沒有提交或者回滾的消息，然後它發送消息給生產者，生產者執行檢查事務的方法TransactionListener#checkLocalTransaction，詢問事務的執行狀態，默認最多回查15次