RocketMQ事務(wù)消息源碼分析

這是我的第一篇博客符喝,思來想去,決定以RocketMQ(版本4.3.2) 源碼分析開始寫寫甜孤,不定期更新协饲,也可能隨時停更。話不多說缴川,直接開始茉稠。

從RocketMQ 3.0.8 之后 到 4.3.0 之前,在這期間的版本均不支持分布式事務(wù)消息把夸。包括這在期間使用比較廣泛的3.2.6 就是不支持分布式事務(wù)消息战惊。分布式事務(wù)是分布式系統(tǒng)開發(fā)中的一個難點(diǎn),RocketMQ是基于最終一致性的思想實(shí)現(xiàn)的扎即。rocketMQ保證本地事務(wù)成功時吞获,消息一定會發(fā)送成功并被成功消費(fèi),如果本地事務(wù)失敗了谚鄙,消息就不會被發(fā)送各拷。

放一張的官網(wǎng)的事務(wù)處理流程圖:

事務(wù)流程

可以很清晰的看到事務(wù)消息的完整流程

首先介紹下上圖中提到的2個概念

1.half消息是什么?

    half消息指的是暫時無法投遞的消息闷营。當(dāng)消息成功發(fā)送到MQ服務(wù)器烤黍,但服務(wù)器沒有收到來自生產(chǎn)者的消息的第二次確認(rèn)時,該消息被標(biāo)記為“暫時無法投遞”傻盟。此狀態(tài)中的消息稱為half消息速蕊。

2.消息狀態(tài)回查是什么意思?

    由于網(wǎng)絡(luò)斷開或生產(chǎn)者應(yīng)用程序重新啟動可能導(dǎo)致丟失事務(wù)消息的第二次確認(rèn)娘赴。當(dāng)MQ服務(wù)器發(fā)現(xiàn)消息狀態(tài)長時間為half消息時规哲,它將向消息生產(chǎn)者發(fā)送回查請求,檢查生產(chǎn)者上次本地事物的執(zhí)行結(jié)果诽表,以此為基礎(chǔ)進(jìn)行消息的提交或回滾唉锌。

上邊的圖詳細(xì)描述了rocketMQ中分布式事務(wù)的每個階段隅肥,下邊用文字描述一下這個過程:

    1.生產(chǎn)者向MQ服務(wù)器發(fā)送half消息。

    2.half消息發(fā)送成功后袄简,MQ服務(wù)器返回確認(rèn)消息給生產(chǎn)者腥放。

    3.生產(chǎn)者開始執(zhí)行本地事務(wù)。

    4.根據(jù)本地事務(wù)執(zhí)行的結(jié)果(UNKNOW绿语、commit秃症、rollback)向MQ Server發(fā)送提交或回滾消息。

    5.如果錯過了(可能因?yàn)榫W(wǎng)絡(luò)異常吕粹、生產(chǎn)者突然宕機(jī)等導(dǎo)致的異常情況)提交/回滾消息伍纫,則MQ服務(wù)器將向同一組中的每個生產(chǎn)者發(fā)送回查消息以獲取事務(wù)狀態(tài)。

    6.回查生產(chǎn)者本地事物狀態(tài)昂芜。

    7.生產(chǎn)者根據(jù)本地事務(wù)狀態(tài)發(fā)送提交/回滾消息。

    8.MQ服務(wù)器將丟棄回滾的消息赔蒲,但已提交(進(jìn)行過二次確認(rèn)的half消息)的消息將投遞給消費(fèi)者進(jìn)行消費(fèi)泌神。

從上述流程可以知道,事務(wù)消息其實(shí)只是保證了生產(chǎn)者本地事務(wù)和發(fā)送消息的原子性

消費(fèi)者在消費(fèi)事務(wù)消息時舞虱,broker處理事務(wù)消息的消費(fèi)與普通消息是一樣的欢际,若消費(fèi)不成功,則broker會重復(fù)投遞該消息16次矾兜,若仍然不成功則需要人工介入损趋。

OK,知道了rocketMQ的事物處理流程后椅寺,我們根據(jù)官網(wǎng)提供的例子跟下源碼再看看浑槽,源碼下載地址:https://github.com/apache/rocketmq

一、生產(chǎn)者發(fā)送prepare消息

生產(chǎn)者發(fā)送事務(wù)消息過程

先看下TransactionListener 的定義:


public interface TransactionListener {

    /**

    * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.

    * 一旦生產(chǎn)者發(fā)送事務(wù)消息成功返帕,會調(diào)用這個方法提交本地事務(wù)

    *

    * @param msg Half(prepare) message

    * @param arg Custom business parameter

    * @return Transaction state

    */

    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**

    * When no response to prepare(half) message. broker will send check message to check the transaction status, and this

    * method will be invoked to get local transaction status.

    * 當(dāng)prepare發(fā)送后超時沒有返回桐玻,那么MQ服務(wù)器會回調(diào)執(zhí)行這個方法用來檢查上次本地事務(wù)執(zhí)行的狀態(tài)

    *

    * @param msg Check message

    * @return Transaction state

    */

    LocalTransactionState checkLocalTransaction(final MessageExt msg);

}

TransactionListener的實(shí)現(xiàn)類

public class TransactionListenerImpl implements TransactionListener {
    private final AtomicInteger transactionIndex = new AtomicInteger(0);

    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
        // 提交本地事物 偽代碼 提交事務(wù)成功成功之后 將狀態(tài)保存起來 方便事務(wù)狀態(tài)回查
        final int value = this.transactionIndex.getAndIncrement();
        final int status = value % 3;
        this.localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(final MessageExt msg) {
        // 回調(diào)檢查本地事物 偽代碼 
        final Integer status = this.localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

在生產(chǎn)者代碼中會初始化事務(wù)監(jiān)聽對象,并注冊到TransactionMQProducer中荆萤,下面附上生產(chǎn)者的示例代碼

public class TransactionProducer {
    public static void main(final String[] args) throws MQClientException, InterruptedException {
        // 監(jiān)聽對象
        final TransactionListener transactionListener = new TransactionListenerImpl();
        // 創(chuàng)建生產(chǎn)者
        final TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // 初始化線程池
        final ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        // 設(shè)置監(jiān)聽
        producer.setTransactionListener(transactionListener);
        // 啟動生產(chǎn)者
        producer.start();

        final String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                final Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 發(fā)送消息
                final SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

接下來分析一下消息發(fā)送镊靴,生產(chǎn)者調(diào)用sendMessageInTransaction 方法,進(jìn)入這個方法

    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final Object arg) throws MQClientException {
        // transactionListener 對象在生產(chǎn)者代碼初始化完畢
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }

會繼續(xù)調(diào)用DefaultMQProducerImpl 的sendMessageInTransaction方法链韭,DefaultMQProducerImpl 是DefaultMQProducer的成員變量偏竟, 會在 TransactionMQProducer 的父類DefaultMQProducer的構(gòu)造器中初始化:

    /**
     * Constructor specifying both producer group and RPC hook.
     *
     * @param producerGroup Producer group, see the name-sake field.
     * @param rpcHook RPC hook to execute per each remoting command execution.
     */
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

接下來進(jìn)入DefaultMQProducerImpl 的sendMessageInTransaction方法,這個方法非常重要敞峭,實(shí)現(xiàn)了事物消息發(fā)送的關(guān)鍵邏輯(發(fā)送消息->執(zhí)行本地事物->commit/rollback消息)

    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
        // 獲取注冊到 TransactionMQProducer 注冊的 TransactionListener 如果沒有注冊 返回null
        final TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // 驗(yàn)證消息長度 和 topic
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 發(fā)送half 消息
            sendResult = this.send(msg);
        } catch (final Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            // half 消息發(fā)送成功
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    final String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // localTransactionExecutor 傳進(jìn)來為null踊谋,這個分支不會走
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        this.log.debug("Used new transaction API");
                        // 提交本地事物
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    // 事務(wù)提交狀態(tài)判定
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        this.log.info(msg.toString());
                    }
                } catch (final Throwable e) {
                    this.log.info("executeLocalTransactionBranch exception", e);
                    this.log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // 根據(jù)本地事物提交情況 發(fā)送commit or rollback 消息
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (final Exception e) {
            this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        final TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

進(jìn)入send() 方法

    public SendResult send(
            final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 默認(rèn)超時時間是3秒
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
    
    public SendResult send(final Message msg,
                           final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

繼續(xù)調(diào)用sendDefaultImpl()方法

    private SendResult sendDefaultImpl(
            final Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        // 驗(yàn)證topic 和消息長度
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = this.random.nextLong();
        final long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 如果本地緩存中沒有該消息的topic 信息 則會去namesrv 中獲取topic 信息 并更新到本地緩存
        final TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 消息發(fā)送重試次數(shù)
            final int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            final String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                final String lastBrokerName = null == mq ? null : mq.getBrokerName();
                final MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        final long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // 發(fā)送消息代碼
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                                
                        ···省略非關(guān)鍵代碼
                        }
                    }
                }
            }
        }
    }

sendKernelImpl()方法中會設(shè)置必要屬性,并調(diào)用getMQClientAPIImpl().sendMessage方法

    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        final long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            final byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    final CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    final String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                final SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    final String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    final String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        final long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                tmpMessage,
                                requestHeader,
                                timeout - costTimeAsync,
                                communicationMode,
                                sendCallback,
                                topicPublishInfo,
                                this.mQClientFactory,
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                                context,
                                this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        // // 參數(shù)為CommunicationMode.SYNC 走這個分支
                        final long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // 發(fā)送消息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                msg,
                                requestHeader,
                                timeout - costTimeSync,
                                communicationMode,
                                context,
                                this);
                        break;
                    default:
                        assert false;
                        break;
                }
            }
        }
    }

我們接著MQClientAPIImpl組件的sendMessage()方法繼續(xù)往下看旋讹,SYNC分支

    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
    }

    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                // 判斷超時時間
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                // 發(fā)送消息
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

sendMessageSync方法褪子,調(diào)用NettyRemotingClient組件的invokeSync()方法進(jìn)行處理

    private SendResult sendMessageSync(
            final String addr,
            final String brokerName,
            final Message msg,
            final long timeoutMillis,
            final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }

invokeSync()方法中會通過nio的方式把消息發(fā)送到MQ服務(wù)器端broker量淌,invokeSync()方法中會繼續(xù)調(diào)用invokeSyncImpl()方法

    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
    
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // netty
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

MQ服務(wù)器端broker處理完發(fā)送過來的消息之后會給生產(chǎn)者DefaultMQProducerImpl一個返回值SendResult。

到這里為止我們執(zhí)行到事物消息的哪個階段了呢嫌褪?

現(xiàn)在我們回到關(guān)鍵流程中去呀枢,關(guān)鍵流程就在DefaultMQProducerImpl組件的sendMessageInTransaction()方法中

然后我們發(fā)現(xiàn)其實(shí)我們剛把這段代碼sendResult = this.send(msg)執(zhí)行完畢并拿到了一個sendResult返回值,這個返回值代表著我們發(fā)送這條消息的一個結(jié)果(發(fā)送成功還是失斄础)

    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
        // 獲取注冊到 TransactionMQProducer 注冊的 TransactionListener 如果沒有注冊 返回null
        final TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // 驗(yàn)證消息長度 和 topic
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        ···省略
        try {
            // 發(fā)送half 消息
            sendResult = this.send(msg);
        } catch (final Exception e) {
            throw new MQClientException("send message Exception", e);
        }
    }

接下來會根據(jù)sendResult返回值來執(zhí)行不同的邏輯處理

如果sendResult為SEND_OK裙秋,即發(fā)送prepare消息成功,那么就開始執(zhí)行本地事物(即TransactionListenerImpl組件的executeLocalTransaction()方法)

本地事物返回值是一個枚舉localTransactionState(有3種取值 COMMIT_MESSAGE , ROLLBACK_MESSAGE , UNKNOW)

如果發(fā)送prepare消息失敗缨伊,則直接設(shè)置localTransactionState的值為ROLLBACK_MESSAGE摘刑。最后執(zhí)行endTransaction()方法進(jìn)行prepare消息的二次確認(rèn)。源碼如下刻坊,這段源碼上面也放上去了

    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
        // 獲取注冊到 TransactionMQProducer 注冊的 TransactionListener 如果沒有注冊 返回null
        final TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // 驗(yàn)證消息長度 和 topic
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 發(fā)送half 消息
            sendResult = this.send(msg);
        } catch (final Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            // half 消息發(fā)送成功
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    final String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // localTransactionExecutor 傳進(jìn)來為null枷恕,這個分支不會走
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        this.log.debug("Used new transaction API");
                        // 提交本地事物
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    // 事務(wù)提交狀態(tài)判定
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        this.log.info(msg.toString());
                    }
                } catch (final Throwable e) {
                    this.log.info("executeLocalTransactionBranch exception", e);
                    this.log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // 根據(jù)本地事物提交情況 發(fā)送commit or rollback 消息
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (final Exception e) {
            this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        final TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

看下endTransaction()方法

    public void endTransaction(
            final SendResult sendResult,
            final LocalTransactionState localTransactionState,
            final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        final String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        final EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        //根據(jù)本地事務(wù)執(zhí)行的結(jié)果去設(shè)置請求頭信息commitOrRollback
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        final String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                this.defaultMQProducer.getSendMsgTimeout());
    }
    
    public void endTransactionOneway(
            final String addr,
            final EndTransactionRequestHeader requestHeader,
            final String remark,
            final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

        request.setRemark(remark);

        //將request寫入channel中,通過socket把消息發(fā)送給消費(fèi)端谭胚,后邊主要是netty的事兒了
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }
    
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                this.invokeOnewayImpl(channel, request, timeoutMillis);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

最終調(diào)用NettyRemotingClient組件的invokeOneway()方法完成prepare消息的二次確認(rèn)徐块,如果localTransactionState的值為COMMIT_MESSAGE時則MQ服務(wù)端會將消息投遞給消費(fèi)者進(jìn)行消費(fèi);

但是如果localTransactionState的值為ROLLBACK_MESSAGE時則MQ服務(wù)端會刪除已經(jīng)存儲的prepare消息灾而,此時消費(fèi)者將沒有機(jī)會消費(fèi)到這條消息胡控。

二、Broker端對消息的處理

(1)Broker處理prepare消息

最終調(diào)用SendMessageProcessor組件的sendMessage方法處理事務(wù)消息旁趟,代碼如下

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

        response.setOpaque(request.getOpaque());

        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

        log.debug("receive SendMessage request command, {}", request);

        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        final MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        PutMessageResult putMessageResult = null;
        final Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        //判斷是否是事務(wù)消息 如果是事務(wù)消息則用事務(wù)消息的邏輯處理
        final String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return response;
            }
            // 處理prepare消息
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }

        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

    }

TransactionalMessageService的prepareMessage()方法

    public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.putHalfMessage(messageInner);
    }

TransactionalMessageBridge的putHalfMessage()方法進(jìn)行調(diào)用保存方法

    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        return store.putMessage(parseHalfMessageInner(messageInner));
    }
(2)Broker處理prepare消息的二次確認(rèn)昼激,即結(jié)束事務(wù)消息的處理

2.1 會判斷本次事務(wù)的最終狀態(tài),如果是Commit就改變事物消息狀態(tài)锡搜,使消費(fèi)者可見橙困,此時消費(fèi)者就可以消費(fèi)消息了

2.2 如果是Rollback,那么就刪除在broker端存儲的事物消息耕餐,此時消費(fèi)者就永遠(yuǎn)消費(fèi)不到這條消息

    public RemotingCommand processRequest(final ChannelHandlerContext ctx, final RemotingCommand request) throws
            RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
                (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.info("Transaction request:{}", requestHeader);
        if (BrokerRole.SLAVE == this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }

        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                                    + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                                    + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                                    + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                                    + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                                    + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        OperationResult result = new OperationResult();
        // 如果收到的是Commit事務(wù)消息
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 從commitLog中查出原始的prepared消息
            // 這要求了Producer在發(fā)送最終的Commit消息的時候一定要指定是同一個Broker
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 檢查獲取到的消息是否和當(dāng)前消息匹配
                final RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 使用原始的prepared消息屬性纷宇,構(gòu)建最終發(fā)給consumer的消息
                    final MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // 調(diào)用MessageStore的消息存儲接口提交消息,使用真正的topic和queueId
                    final RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // 設(shè)置Prepared消息的標(biāo)記位為delete
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // 如果收到的是Rollback事務(wù)消息
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                final RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 設(shè)置Prepared消息的標(biāo)記位為delete
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }

消息Commit/Rollback后蛾方,理論上需要將原來的Prepared消息刪除像捶,這樣Broker就能知道哪些消息一直沒收到Commit/Rollback,需要去Producer回查狀態(tài)桩砰。但是如果直接修改CommitLog文件拓春,這個代價是很大的,所以RocketMQ是通過生成一個新的delete消息來標(biāo)記的亚隅。這樣硼莽,Broker在檢查的時候只需要看下Prepared消息有沒有對應(yīng)的delete消息就可以了,我們看看這個這個操作:

    public boolean deletePrepareMessage(MessageExt msgExt) {
        // 刪除標(biāo)志
        if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
            log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
            return true;
        } else {
            log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
            return false;
        }
    }

三、事務(wù)消息是如何處理回查的懂鸵?

broker在啟動時會啟動線程回查的服務(wù)偏螺,在TransactionMessageCheckService的run方法中,該方法會執(zhí)行到onWaitEnd方法:

    public void run() {
        log.info("Start transaction check service thread!");
        final long checkInterval = this.brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(checkInterval);
        }
        log.info("End transaction check service thread!");
    }
    
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

    protected void onWaitEnd() {
        // 超時時間
        final long timeout = this.brokerController.getBrokerConfig().getTransactionTimeOut();
        // 最大檢查次數(shù)
        final int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
        // 當(dāng)前時間
        final long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        // 開始檢查
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

通過netty傳遞消息最終調(diào)用到TransactionListenerImpl組件的checkLocalTransaction()方法來檢查本地事物的狀態(tài)

四匆光、總結(jié)

本篇文章從生產(chǎn)者發(fā)送prepare消息套像、Broker端對消息的處理以及事務(wù)消息是如何處理回查的3個階段詳細(xì)分析了rocketMQ的源碼,按照上述代碼執(zhí)行順序大家完全可以跟著走讀下源碼终息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末夺巩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子周崭,更是在濱河造成了極大的恐慌柳譬,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件续镇,死亡現(xiàn)場離奇詭異美澳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)摸航,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進(jìn)店門制跟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人忙厌,你說我怎么就攤上這事〗龋” “怎么了逢净?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長歼指。 經(jīng)常有香客問我爹土,道長,這世上最難降的妖魔是什么踩身? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任胀茵,我火速辦了婚禮,結(jié)果婚禮上挟阻,老公的妹妹穿的比我還像新娘琼娘。我一直安慰自己,他們只是感情好附鸽,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布脱拼。 她就那樣靜靜地躺著,像睡著了一般坷备。 火紅的嫁衣襯著肌膚如雪熄浓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天省撑,我揣著相機(jī)與錄音赌蔑,去河邊找鬼俯在。 笑死,一個胖子當(dāng)著我的面吹牛娃惯,可吹牛的內(nèi)容都是我干的跷乐。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼石景,長吁一口氣:“原來是場噩夢啊……” “哼劈猿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起潮孽,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤揪荣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后往史,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仗颈,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年椎例,在試婚紗的時候發(fā)現(xiàn)自己被綠了挨决。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,992評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡订歪,死狀恐怖脖祈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情刷晋,我是刑警寧澤盖高,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站眼虱,受9級特大地震影響喻奥,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜捏悬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一撞蚕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦、人聲如沸掘宪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至摧莽,卻和暖如春庙洼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工油够, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蚁袭,地道東北人。 一個月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓石咬,卻偏偏與公主長得像揩悄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子鬼悠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評論 2 355