RocketMQ事務(wù)消息原理簡析

零喉脖、業(yè)務(wù)場景

在項(xiàng)目中,經(jīng)常遇到這樣一個(gè)場景抑月,需要保證數(shù)據(jù)持久化和消息發(fā)送要么同時(shí)成功树叽,要么同時(shí)失敗。比如當(dāng)用戶在交易系統(tǒng)下了一個(gè)訂單谦絮,購物車需要消費(fèi)訂單消息清除加購數(shù)據(jù)题诵、積分系統(tǒng)需要變更用戶積分、短信平臺(tái)需要給買家發(fā)送提醒等层皱,交易系統(tǒng)要將訂單落入DB和發(fā)送訂單消息保證一致性锭,不能本地事務(wù)回滾,訂單沒有生成但是發(fā)送了創(chuàng)建訂單消息叫胖,下游系統(tǒng)產(chǎn)生臟數(shù)據(jù)草冈,也不能訂單已經(jīng)創(chuàng)建,但是下游系統(tǒng)沒有感知繼而無法履約瓮增,影響用戶體驗(yàn)怎棱。
如果讓我們自己實(shí)現(xiàn)的話,當(dāng)然也是有辦法的绷跑。比如在業(yè)務(wù)數(shù)據(jù)庫中建立一張消息表用于存儲(chǔ)消息拳恋,將業(yè)務(wù)數(shù)據(jù)和消息數(shù)據(jù)放在同一個(gè)事務(wù)中進(jìn)行存儲(chǔ),就可以利用數(shù)據(jù)庫事務(wù)保證同時(shí)原子性砸捏。后續(xù)可以定時(shí)掃描消息表谬运,將消息數(shù)據(jù)再發(fā)送出去。
當(dāng)然也可以用現(xiàn)成的解決方案垦藏,RocketMQ從4.3.0版本開始梆暖,支持事務(wù)消息。我們只需要編寫對(duì)應(yīng)的本地事務(wù)執(zhí)行方法executeLocalTransaction和本地事務(wù)執(zhí)行結(jié)果檢查方法checkLocalTransaction掂骏,RocketMQ會(huì)自動(dòng)調(diào)用本地事務(wù)執(zhí)行。如果本地事務(wù)執(zhí)行成功,下游才能消費(fèi)到消息滑废,如果本地事務(wù)執(zhí)行失敗蠕趁,下游是無法感知到這條消息的

一、使用方法

使用RocketMQ發(fā)送事務(wù)消息棚点,只有消息發(fā)送和普通消息發(fā)送有所區(qū)別贬循。參見官方示例:
// TransactionProducer.java
// 需要自定義一個(gè)TransactionListener用于執(zhí)行事務(wù)executeLocalTransaction和事務(wù)執(zhí)行結(jié)果回查checkLocalTransaction 代碼在下面
TransactionListener transactionListener = new TransactionListenerImpl();
// 事務(wù)消息發(fā)送producer
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 創(chuàng)建一個(gè)線程池 用于Broker回查本地事務(wù)執(zhí)行狀態(tài) 如果這里沒有創(chuàng)建葡缰,RocketMQ會(huì)自動(dòng)創(chuàng)建一個(gè)線程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
  Thread thread = new Thread(r);
  thread.setName("client-transaction-msg-check-thread");
  return thread;
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
  try {
    Message msg =
      new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                  ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 發(fā)送事務(wù)消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);

    Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
    e.printStackTrace();
  }
}
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 執(zhí)行本地事務(wù)
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 本地事務(wù)執(zhí)行狀態(tài)回查
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
本地事務(wù)的執(zhí)行狀態(tài),有三種結(jié)果:
  • LocalTransactionState.COMMIT_MESSAGE:事務(wù)執(zhí)行成功,Broker會(huì)處理消息供下游消費(fèi)
  • LocalTransactionState.ROLLBACK_MESSAGE:事務(wù)被回滾嗦嗡,Broker會(huì)刪除消息,下游感知不到消息
  • LocalTransactionState.UNKNOW:事務(wù)的執(zhí)行結(jié)果未知亿乳,比如事務(wù)還在執(zhí)行中,稍后Broker會(huì)回重復(fù)回查,直到超過最大時(shí)間或者最大次數(shù)

二勋磕、原理解析

0妈候、整體流程

image.png
  1. Producer發(fā)送事務(wù)消息
  2. Broker端SendMessageProcessor收到消息后,判斷如果是一條事務(wù)消息挂滓,會(huì)將消息原來的topic和隊(duì)列id存儲(chǔ)到消息拓展中苦银,設(shè)置新的topic為RMQ_SYS_TRANS_HALF_TOPIC然后 進(jìn)行存儲(chǔ),然后通知Producer
  3. Producer收到Broker消息發(fā)送成功后,開始執(zhí)行本地事務(wù)
  4. 本地事務(wù)執(zhí)行完畢幔虏,Producer將事務(wù)執(zhí)行狀態(tài)通知Broker
  5. Broker端EndTransactionProcessor收到事務(wù)執(zhí)行狀態(tài)纺念,從RMQ_SYS_TRANS_HALF_TOPIC中取出消息。如果事務(wù)執(zhí)行成功想括,則從消息拓展中取出原本的topic和隊(duì)列id柠辞,存儲(chǔ)到真實(shí)的topic和隊(duì)列id中,存儲(chǔ)到RMQ_SYS_TRANS_OP_HALF_TOPIC主題中主胧;如果是事務(wù)回滾,只把消息存儲(chǔ)到RMQ_SYS_TRANS_OP_HALF_TOPIC主題中
  6. 如果Broker沒有收到Producer事務(wù)執(zhí)行狀態(tài)的通知习勤,Broker端TransactionalMessageCheckService會(huì)主動(dòng)定時(shí)從RMQ_SYS_TRANS_HALF_TOPIC中撈取消息踪栋,判斷是否有需要回查的消息
  7. 如果有需要回查的消息,Broker端TransactionalMessageCheckService會(huì)向Producer回查事務(wù)狀態(tài)
  8. Producer執(zhí)行TransactionListener的checkLocalTransaction方法图毕,查詢事務(wù)執(zhí)行狀態(tài)
  9. Producer查詢本地事務(wù)狀態(tài)之后再執(zhí)行上述第4步和第5步

1夷都、Producer發(fā)送消息

// 編寫TransactionListener實(shí)現(xiàn)類用于執(zhí)行本地事務(wù)和本地事務(wù)回查
TransactionListener transactionListener = new TransactionListenerImpl();
// 發(fā)送事務(wù)消息專用的TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);

producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 新建一個(gè)線程池用于異步執(zhí)行從Broker過來的事務(wù)回查 如果這里不新建 Broker也會(huì)自動(dòng)創(chuàng)建一個(gè)
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
    Thread thread = new Thread(r);
    thread.setName("client-transaction-msg-check-thread");
    return thread;
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
    try {
        Message msg =
            new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 發(fā)送事務(wù)消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);

        Thread.sleep(10);
    } catch (MQClientException | UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
// 發(fā)送事務(wù)消息
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // 校驗(yàn)是否設(shè)置TransactionListener 發(fā)送事務(wù)消息必須要有TransactionListener
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        // 事務(wù)消息不支持延遲發(fā)送
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 標(biāo)記prepare消息 Broker根據(jù)這個(gè)判斷是否是一條事務(wù)消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // 設(shè)置消息生產(chǎn)者組 為了查詢事務(wù)消息本地事務(wù)狀態(tài)時(shí) 從該生產(chǎn)者組中隨機(jī)選擇一個(gè)消息生產(chǎn)者即可
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    // ......
    // 對(duì)發(fā)送結(jié)果的處理稍后解析
}

2、Broker接收事務(wù)消息

// asyncSendMessage方法
CompletableFuture<PutMessageResult> putMessageResult = null;
// 依據(jù)消息是否有MessageConst.PROPERTY_TRANSACTION_PREPARED判斷是否事務(wù)消息
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(transFlag)) {
    // 處理事務(wù)消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    // 保存事務(wù)消息
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    // 保存消息
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 備份原本的topic和隊(duì)列
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // 設(shè)置新的topic為RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // 隊(duì)列是寫死的 只有一個(gè) 也就是說是順序處理的
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

3予颤、Producer處理發(fā)送消息結(jié)果

// sendMessageInTransaction方法
try {
    // 事務(wù)消息發(fā)送結(jié)果
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// 處理事務(wù)消息發(fā)送結(jié)果
switch (sendResult.getSendStatus()) {
    // 發(fā)送成功的情況
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                // 這個(gè)已經(jīng)廢棄了 不會(huì)進(jìn)入這里
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                // 這里執(zhí)行本地事務(wù)
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }

            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            // 有catch邏輯 是考慮到事務(wù)執(zhí)行異常的場景
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

4囤官、Producer通知Broker事務(wù)執(zhí)行狀態(tài)

try {
    // 事務(wù)消息收尾工作 通知Broker干活
    this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // 設(shè)置本地事務(wù)執(zhí)行狀態(tài)
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 發(fā)送消息給Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
public void endTransactionOneway(
    final String addr,
    final EndTransactionRequestHeader requestHeader,
    final String remark,
    final long timeoutMillis
) throws RemotingException, InterruptedException {
    // 指定Broker使用EndTransactionProcessor處理
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

    request.setRemark(remark);
    // 單向消息 不考慮發(fā)送結(jié)果 
    // 也就是說 是可能發(fā)送失敗的 發(fā)送失敗之后Broker會(huì)回查
    this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}

5、Broker處理事務(wù)執(zhí)行狀態(tài)通知

// processRequest方法
// 上面的代理是Broker向Producer回查事務(wù)后的處理 稍后解析
else {
    // 發(fā)送半消息之后產(chǎn)生的調(diào)用
    switch (requestHeader.getCommitOrRollback()) {
        // 如果事務(wù)執(zhí)行不是commit或者rollback 直接返回 不再進(jìn)行下面的邏輯
        case MessageSysFlag.TRANSACTION_NOT_TYPE: {
            LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                    + "RequestHeader: {} Remark: {}",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.toString(),
                request.getRemark());
            return null;
        }

        // 如果事務(wù)執(zhí)行是commit 接著下面的處理
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
            break;
        }
        // 如果事務(wù)執(zhí)行是rollback 打印異常日志 接著下面的處理
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
            LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                    + "RequestHeader: {} Remark: {}",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.toString(),
                request.getRemark());
            break;
        }
        default:
            return null;
    }
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 根據(jù)偏移量 取出topic是RMQ_SYS_TRANS_HALF_TOPIC的消息
    // 第2步Broker保存消息之后 會(huì)把偏移量通知Producer Producer再傳到這里
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    // 事務(wù)執(zhí)行狀態(tài)是commit
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 從消息中取出原來的topic和隊(duì)列等信息 構(gòu)建真實(shí)消息
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // 清除事務(wù)消息相關(guān)標(biāo)記 防止循環(huán)處理
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // 保存真實(shí)消息
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 刪除消息RMQ_SYS_TRANS_HALF_TOPIC
                // 實(shí)際上是投遞到RMQ_SYS_TRANS_OP_HALF_TOPIC中 并標(biāo)記刪除 
                // 這里為什么還要投遞到RMQ_SYS_TRANS_OP_HALF_TOPIC中 不直接刪除呢 后面還需要根據(jù)這個(gè)判斷是否是重復(fù)處理等
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // 如果事務(wù)狀態(tài)是rollback 刪除消息RMQ_SYS_TRANS_HALF_TOPIC 投遞到RMQ_SYS_TRANS_OP_HALF_TOPIC中并標(biāo)記刪除
    // 比commit少了一個(gè)投遞真實(shí)主題的步驟
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

6蛤虐、Broker主動(dòng)撈取消息

TransactionalMessageCheckService類實(shí)現(xiàn)Runnable接口党饮,在Broker啟動(dòng)的時(shí)候,回調(diào)用BrokerController的start方法驳庭,在start方法中刑顺,會(huì)調(diào)用TransactionalMessageCheckService的start方法啟動(dòng)線程,run方法是一個(gè)死循環(huán)饲常,默認(rèn)每6秒執(zhí)行一次

public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
循環(huán)中實(shí)際執(zhí)行的是這個(gè)方法
protected void onWaitEnd() {
    // 事務(wù)過期時(shí)間 只有當(dāng)消息存儲(chǔ)時(shí)間加上這個(gè)過期時(shí)間大于系統(tǒng)當(dāng)前時(shí)間 才對(duì)消息執(zhí)行事務(wù)回查 否則在下一次周期中執(zhí)行事務(wù)回查操作
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // 事務(wù)回查最大檢測次數(shù) 如果超過最大檢測次數(shù)還是無法獲知消息的事務(wù)狀態(tài) 不會(huì)再會(huì)回查 直接丟棄相當(dāng)于回滾事務(wù)
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
    try {
        // 獲取事務(wù)半消息主題下的全部隊(duì)列 然后依次處理
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            MessageQueue opQueue = getOpQueue(messageQueue);
            // RMQ_SYS_TRANS_HALF_TOPIC消息消費(fèi)進(jìn)度
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // 收到事務(wù)消息提交或者回滾請(qǐng)求后的MQ_SYS_TRANS_OP_HALF_TOPIC中消息消費(fèi)進(jìn)度
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }

            List<Long> doneOpOffset = new ArrayList<>();
            HashMap<Long, Long> removeMap = new HashMap<>();
            // 根據(jù)當(dāng)前的處理進(jìn)度 依次從已處理隊(duì)列MQ_SYS_TRANS_OP_HALF_TOPIC拉取32條消息
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            // 獲取空消息的次數(shù)
            int getMessageNullCount = 1;
            // 當(dāng)前處理半消息的進(jìn)度
            long newOffset = halfOffset;
            // 當(dāng)前處理消息的隊(duì)列偏移量
            long i = halfOffset;
            while (true) {
                // 超過時(shí)常等待下次調(diào)度
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // 如果該消息已被處理 繼續(xù)處理下一條消息
                if (removeMap.containsKey(i)) {
                    log.debug("Half offset {} has been committed/rolled back", i);
                    Long removedOpOffset = removeMap.remove(i);
                    doneOpOffset.add(removedOpOffset);
                } else {
                    // 獲取消息
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    if (msgExt == null) {
                        // 超過重試次數(shù) 直接跳出 結(jié)束該消息隊(duì)列的事務(wù)回查狀態(tài)
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        // 沒有新的消息而返回 結(jié)束該消息隊(duì)列的事務(wù)回查
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        } else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            // 其它原因 重新拉取
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }

                    // needDiscard 如果該消息回查的次數(shù)超過允許回查的最大次數(shù) 該消息將被丟棄 事務(wù)消息提交失敗 每回查一次 在消息屬性中+1 默認(rèn)回查最大次數(shù)為5
                    // needSkip 如果事務(wù)消息超過文件的過期時(shí)間 默認(rèn)72小時(shí) 則跳過該消息
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }

                    // 消息已存儲(chǔ)時(shí)間 當(dāng)前系統(tǒng)時(shí)間減去消息存儲(chǔ)時(shí)間
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    // checkImmunityTime 立即檢測事務(wù)消息的時(shí)間 設(shè)計(jì)的意義是 應(yīng)用程序在發(fā)送事務(wù)消息后 事務(wù)不會(huì)馬上提交 該時(shí)間就是假設(shè)事務(wù)消息發(fā)送成功后 應(yīng)用程序事務(wù)提交時(shí)間 在這段時(shí)間內(nèi) RocketMQ任務(wù)事務(wù)未提交 不應(yīng)該在這個(gè)時(shí)間段內(nèi)向應(yīng)用程序發(fā)送回查請(qǐng)求
                    // transactionTimeout 事務(wù)消息的超時(shí)時(shí)間 這個(gè)時(shí)間是從OP拉取消息的最后一條消息存儲(chǔ)時(shí)間與check方法開始的時(shí)間 如果時(shí)間差超過了transactionTimeout 就算時(shí)間小于checkImmunityTime 也發(fā)送事務(wù)回查指令
                    long checkImmunityTime = transactionTimeout;
                    // PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS 消息事務(wù)消息回查的最晚時(shí)間 單位為秒 指的是程序發(fā)送事務(wù)消息 可以指定該事務(wù)消息的有效時(shí)間 只有在這個(gè)時(shí)間內(nèi)收到回查消息才有效 默認(rèn)為null
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) {
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    } else {
                        // 如果當(dāng)前時(shí)間沒過應(yīng)用程序事務(wù)結(jié)束時(shí)間 跳出本次處理
                        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
                    // 如果OP隊(duì)列中沒有已處理消息并且已經(jīng)超過應(yīng)用程序事務(wù)結(jié)束時(shí)間transactionTimeout
                    // 或者
                    // 操作隊(duì)列不為空并且最后一條消息的存儲(chǔ)時(shí)間已經(jīng)超過transactionTimeout
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);

                    if (isNeedCheck) {
                        // 這里回查是異步處理的 所以在回查之前 需要把消息重新投遞到隊(duì)列中 以便下次check
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // 執(zhí)行回查邏輯
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        // 如果無法判斷是否發(fā)送回查消息 則加載更多的已處理消息進(jìn)行篩選
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
            if (newOffset != halfOffset) {
                // 保存半消息的回查進(jìn)度
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                // 保存OP進(jìn)度
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }

}

7蹲堂、Broker主動(dòng)回查事務(wù)狀態(tài)

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // 從同一個(gè)生產(chǎn)者組中選擇一個(gè)Producer進(jìn)行回查
    // 所以同一個(gè)生產(chǎn)者組中如果部分機(jī)器出現(xiàn)宕機(jī)、發(fā)布重啟等問題 也不會(huì)影響回查
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt) throws Exception {
    // 給Producer發(fā)送消息時(shí) 指定類型是CHECK_TRANSACTION_STATE
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
                group, messageExt.getMsgId(), e.toString());
    }
}

9贝淤、Producer本地事務(wù)回查

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            // 判斷是Broker事務(wù)回查 檢查本地事務(wù)執(zhí)行狀態(tài)
            return this.checkTransactionState(ctx, request);
        // ......省略部分代碼
    }
    return null;
}
public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        //...省略部分代碼 下文解析
    };

    // 這里正是用新建TransactionMQProducer時(shí)創(chuàng)建的線程池異步執(zhí)行提高效率
    this.checkExecutor.submit(request);
}
public void run() {
    TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
    TransactionListener transactionListener = getCheckListener();
    if (transactionCheckListener != null || transactionListener != null) {
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable exception = null;
        try {
            if (transactionCheckListener != null) {
                localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
            } else if (transactionListener != null) {
                // 檢查本地事務(wù)
                log.debug("Used new check API in transaction message");
                localTransactionState = transactionListener.checkLocalTransaction(message);
            } else {
                log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
            }
        } catch (Throwable e) {
            log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
            exception = e;
        }

        // 將本地事務(wù)狀態(tài)通知Broker
        // 和第四步Producer第一次嘗試通知Broker一樣 也是單向發(fā)送 可能發(fā)送失敗
        this.processTransactionState(
            localTransactionState,
            group,
            exception);
    } else {
        log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末柒竞,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子播聪,更是在濱河造成了極大的恐慌朽基,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,378評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件离陶,死亡現(xiàn)場離奇詭異踩晶,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)枕磁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門渡蜻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事茸苇∨挪裕” “怎么了?”我有些...
    開封第一講書人閱讀 168,983評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵学密,是天一觀的道長淘衙。 經(jīng)常有香客問我,道長腻暮,這世上最難降的妖魔是什么彤守? 我笑而不...
    開封第一講書人閱讀 59,938評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮哭靖,結(jié)果婚禮上具垫,老公的妹妹穿的比我還像新娘。我一直安慰自己试幽,他們只是感情好筝蚕,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,955評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著铺坞,像睡著了一般起宽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上济榨,一...
    開封第一講書人閱讀 52,549評(píng)論 1 312
  • 那天坯沪,我揣著相機(jī)與錄音,去河邊找鬼擒滑。 笑死屏箍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的橘忱。 我是一名探鬼主播赴魁,決...
    沈念sama閱讀 41,063評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼钝诚!你這毒婦竟也來了颖御?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,991評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤凝颇,失蹤者是張志新(化名)和其女友劉穎潘拱,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拧略,經(jīng)...
    沈念sama閱讀 46,522評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡芦岂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,604評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了垫蛆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片禽最。...
    茶點(diǎn)故事閱讀 40,742評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡腺怯,死狀恐怖阶祭,靈堂內(nèi)的尸體忽然破棺而出屎即,到底是詐尸還是另有隱情光督,我是刑警寧澤唉窃,帶...
    沈念sama閱讀 36,413評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站击你,受9級(jí)特大地震影響千诬,放射性物質(zhì)發(fā)生泄漏仙粱。R本人自食惡果不足惜仅叫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,094評(píng)論 3 335
  • 文/蒙蒙 一帜篇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧诫咱,春花似錦笙隙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽婴渡。三九已至幻锁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間边臼,已是汗流浹背哄尔。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評(píng)論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留柠并,地道東北人岭接。 一個(gè)月前我還...
    沈念sama閱讀 49,159評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像臼予,于是被迫代替她去往敵國和親鸣戴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,747評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容