2.RocketMQ-事務(wù)源碼解析

RocketMQ-事務(wù)消息

來自官網(wǎng):

rocketmq_design_10.png

上圖說明了事務(wù)消息的大致方案翩隧,其中分為兩個流程:正常事務(wù)消息的發(fā)送及提交惭载、事務(wù)消息的補償流程科侈。

1.事務(wù)消息發(fā)送及提交:

(1) 發(fā)送消息(half消息)已骇。

(2) 服務(wù)端響應(yīng)消息寫入結(jié)果闭翩。

(3) 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗误债,此時half消息對業(yè)務(wù)不可見浸船,本地邏輯不執(zhí)行)。

(4) 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引寝蹈,消息對消費者可見)

2.補償流程:

(1) 對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)李命,從服務(wù)端發(fā)起一次“回查”

(2) Producer收到回查消息,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)

(3) 根據(jù)本地事務(wù)狀態(tài)箫老,重新Commit或者Rollback

其中封字,補償階段用于解決消息Commit或者Rollback發(fā)生超時或者失敗的情況。

上面的圖片添加上數(shù)據(jù)的存儲流轉(zhuǎn):

image.png

RocketMQ 版本4.3.2

1.事務(wù)消息發(fā)送和提交

我們通過官網(wǎng)的例子來查看源碼耍鬓,官網(wǎng)例子如下:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        //設(shè)置線程池
        producer.setExecutorService(executorService);
        //設(shè)置本地事務(wù)的監(jiān)聽器阔籽,便于執(zhí)行相關(guān)邏輯
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    //這里是執(zhí)行本地事務(wù)的方法
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    //這個是服務(wù)端回調(diào)本地事務(wù)成功的方法
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

1)先看客戶端發(fā)送邏輯流程

//客戶端發(fā)送
TransactionMQProducer#sendMessageInTransaction
DefaultMQProducerImpl#sendMessageInTransaction
    -DefaultMQProducerImpl#send
        = NOTOK   ->    state = ROLLBACK_MESSAGE
        = sendOk  ->    state = TransactionListener#executeLocalTransaction
    -DefaultMQProducerImpl#endTransaction
    -MQClientAPIImpl#endTransactionOneway
        -RemotingClient#invokeOneway

核心源碼 在DefaultMQProducerImpl#sendMessageInTransaction中具體看一下

public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    //這里添加了事務(wù)預(yù)消息的標(biāo)識
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        //發(fā)送消息
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: { //發(fā)送到broker的消息發(fā)送ok時
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                //默認(rèn)之類的localTransactionExecuter是空的
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    //這里執(zhí)行本地實現(xiàn)的listerner的本地事務(wù)
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
            break;  //下面其他狀態(tài)設(shè)置rollback
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        
        //這里結(jié)束本地事務(wù)后的處理邏輯
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}



//在看看結(jié)束本地事務(wù)后消息怎么處理
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    //根據(jù)本地事務(wù)執(zhí)行狀態(tài)設(shè)置請求頭提交還是徽標(biāo)標(biāo)識
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    
    //發(fā)送oneWay 單項一次性消息,消息code時事務(wù)的code界斜;(便于不同的處理器處理)
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                                                                   this.defaultMQProducer.getSendMsgTimeout());
}

2)服務(wù)端第一次接受事務(wù)消息

// broker端處理
NettyServerHandler#channelRead0
NettyRemotingAbstract#processMessageReceived
        #processRequestCommand
    -SendMessageProcessor#processRequest
    -SendMessageProcessor#sendMessage
    -TransactionalMessageServiceImpl#prepareMessage
        -TransactionalMessageBridge#putHalfMessage 
            -DefaultMessageStore#putMessage

整個流程我們從SendMessageProcessor看看

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        response.setOpaque(request.getOpaque());
        //...部分邏輯省略
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        PutMessageResult putMessageResult = null;
        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    
        //獲取是否是事務(wù)消息的標(biāo)識
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            //非事務(wù)消息的處理仿耽,直接存儲
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        //結(jié)果處理返回
        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

    }
//TransactionalMessageBridge#putHalfMessage  存儲半消息瞧瞧
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    //存儲消息
    return store.putMessage(parseHalfMessageInner(messageInner));
}

//這里對于消息做了重新的封裝
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    
    //設(shè)置真實topic和queueId的 原始信息
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                                String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    
    //設(shè)置消息的topic為 "RMQ_SYS_TRANS_HALF_TOPIC" 
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0); //隊列使用0號
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

3)服務(wù)端處理二段事務(wù)結(jié)果

//client發(fā)送消息RemoteCommand.setCode(RequestCode.END_TRANSACTION)

// broker端處理
NettyServerHandler#channelRead0
NettyRemotingAbstract#processMessageReceived
        #processRequestCommand
    -EndTransactionProcessor#processRequest


//總結(jié):事務(wù)消息commit相對rollback 就多一個添加到原來的topic消息的邏輯
// 查看EndTransactionProcessor#processRequest

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.info("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
    //... 省略掉不必要的代碼
    OperationResult result = new OperationResult();
    
    //消息提交
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        //獲取之前存儲的半消息
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            //校驗消息是否正確
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                //構(gòu)建原來topic的消息
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                //消息存儲
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    //”刪除“ 原來的半消息
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    // 消息回滾    
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        //獲取原來的半消息
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
             //校驗消息是否正確
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                  //”刪除“ 原來的半消息 
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}


//TransactionalMessageServiceImpl# deletePrepareMessage

public boolean deletePrepareMessage(MessageExt msgExt) {
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

// TransactionalMessageBridge#putOpMessage
public boolean putOpMessage(MessageExt messageExt, String opType) {
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
                                                 this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}

private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    
    //構(gòu)建 ”RMQ_SYS_TRANS_OP_HALF_TOPIC“ topic的消息,值為queueOffset
    Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
                                  String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    writeOp(message, messageQueue);
    return true;
}

private void writeOp(Message message, MessageQueue mq) {
    MessageQueue opQueue;
    if (opQueueMap.containsKey(mq)) {
        opQueue = opQueueMap.get(mq);
    } else {
        opQueue = getOpQueueByHalf(mq);
        MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
        if (oldQueue != null) {
            opQueue = oldQueue;
        }
    }
    if (opQueue == null) {
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    //存儲消息
    putMessage(makeOpMessageInner(message, opQueue));
}

2.補償流程

補償流程主要是broker端通過定時任務(wù) TransactionalMessageCheckService 來回查事務(wù)狀態(tài)

//入手流程
BrokerController#initialize 構(gòu)建TransactionalMessageCheckService 
BrokerController#start      啟動TransactionalMessageCheckService#run方法
TransactionalMessageCheckService#check 真實的業(yè)務(wù)邏輯
@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTradiaonsactionCheckInterval();
    while (!this.isStopped()) {
        //這個調(diào)用父類方法各薇,然后父類使用模板調(diào)用如下onWaitEnd方法
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

這里先看看主要的邏輯

@Override
public void check(long transactionTimeout, int transactionCheckMax,
                  AbstractTransactionalMessageCheckListener listener) {
    try {
        //獲取半消息的隊列
        String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.info("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            //根據(jù)半消息隊列项贺,組裝獲取op_半消息(刪除的標(biāo)識) 隊列
            MessageQueue opQueue = getOpQueue(messageQueue);
            //分別獲取兩者的消費偏移量
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                          halfOffset, opOffset);
                continue;
            }

            List<Long> doneOpOffset = new ArrayList<>();
            HashMap<Long, Long> removeMap = new HashMap<>();
            //將需要移除的放入 removeMap,已經(jīng)過期或者完成的消息放入doneOpOffSet中峭判, key是半消息的offset
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                          messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            int getMessageNullCount = 1;
            long newOffset = halfOffset;
            long i = halfOffset;
            //具體的業(yè)務(wù)邏輯
            while (true) {...}
            
            //更新半消息的消費offset
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            //更新op_半消息的消費offset
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.error("Check error", e);
    }

}

我們繼續(xù)看while里面的具體業(yè)務(wù)邏輯

while (true) {
    //處理時間超時退出處理
    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
        break;
    }
    //removeMap中存儲的消息是已經(jīng)被commit或者rollback,有刪除標(biāo)識的开缎,所以這里存在需要移除
    if (removeMap.containsKey(i)) {
        log.info("Half offset {} has been committed/rolled back", i);
        removeMap.remove(i);
    } else {
        GetResult getResult = getHalfMsg(messageQueue, i);
        MessageExt msgExt = getResult.getMsg();
        if (msgExt == null) {
            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                break;
            }
            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                         messageQueue, getMessageNullCount, getResult.getPullResult());
                break;
            } else {
                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                         i, messageQueue, getMessageNullCount, getResult.getPullResult());
                i = getResult.getPullResult().getNextBeginOffset();
                newOffset = i;
                continue;
            }
        }
        //needDiscard 查過最多幾次檢查次數(shù)需要拋棄,needSkip消息存儲超過三天也是需要拋棄
        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
            listener.resolveDiscardMsg(msgExt);
            newOffset = i + 1;
            i++;
            continue;
        }
        //如果消息存儲時間超過當(dāng)前時間林螃,那么退出當(dāng)前邏輯
        if (msgExt.getStoreTimestamp() >= startTime) {
            log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
                     new Date(msgExt.getStoreTimestamp()));
            break;
        }
        
        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
        long checkImmunityTime = transactionTimeout;
        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
        if (null != checkImmunityTimeStr) {
            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) {
                    newOffset = i + 1;
                    i++;
                    continue;
                }
            }
        } else {
            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                         checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                break;
            }
        }
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        //判斷當(dāng)前消息是否可以回查奕删,有三個條件
        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
            || (valueOfCurrentMinusBorn <= -1);

        if (isNeedCheck) {
            //將Half消息再次寫入到commitlog中
            if (!putBackHalfMsgQueue(msgExt, i)) {
                continue;
            }
            //處理半消息,回查請求疗认,單獨的線程池處理
            listener.resolveHalfMsg(msgExt);
        } else {
            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                     messageQueue, pullResult);
            continue;
        }
    }
    //繼續(xù)處理下一條消息
    newOffset = i + 1;
    i++;
}
后續(xù)的邏輯就是
服務(wù)端:
1.AbstractTransactionalMessageCheckListener#resolveHalfMsg //處理完残,向客戶端發(fā)起請求
2.AbstractTransactionalMessageCheckListener#sendCheckMessage  
3.Broker2Client#checkProducerTransactionState              //通過invokeOneWay 發(fā)起請求check
    
客戶端:
2.ClientRemotingProcessor#checkTransactionState  
3.DefaultMQProducerImpl#checkTransactionState   //本地邏輯處理
4.transactionListener.checkLocalTransaction     //checkExecutor線程池 執(zhí)行本地listener 事務(wù)結(jié)果
5.MQClientAPIImpl#endTransactionOneway          //本地事務(wù)狀態(tài)結(jié)果再次向服務(wù)端發(fā)起回傳,服務(wù)端再重復(fù)之前的事務(wù)執(zhí)行邏輯
//服務(wù)端代碼

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                //業(yè)務(wù)線程執(zhí)行
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    //從消息中獲取生產(chǎn)組的一員發(fā)起請i去
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}


//Broker2Client#checkProducerTransactionState

public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt) throws Exception {
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        //單次發(fā)起請求處理
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("Check transaction failed because invoke producer exception. group={}, msgId={}", group, messageExt.getMsgId(), e.getMessage());
    }
}

客戶端的簡單邏輯代碼如下:

ClientRemotingProcessor#checkTransactionState
//
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
                                                 RemotingCommand request) throws RemotingCommandException {
    final CheckTransactionStateRequestHeader requestHeader =
        (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            //找到當(dāng)前生產(chǎn)組實例 做本地check           
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        } else {
            log.warn("checkTransactionState, pick producer group failed");
        }
    } else {
        log.warn("checkTransactionState, decode message failed");
    }

    return null;
}


//DefaultMQProductImpl#checkTransactionState
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
                                  final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

        @Override
        public void run() {
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            TransactionListener transactionListener = getCheckListener();
            if (transactionCheckListener != null || transactionListener != null) {
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable exception = null;
                try {
                    
                    //調(diào)用本地自定義的listerner 方法執(zhí)行
                    if (transactionCheckListener != null) {
                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                    } else if (transactionListener != null) {
                        log.debug("Used new check API in transaction message");
                        localTransactionState = transactionListener.checkLocalTransaction(message);
                    } else {
                        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                    }
                } catch (Throwable e) {
                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                    exception = e;
                }
                //通過查詢事務(wù)執(zhí)行結(jié)果做后續(xù)處理
                this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);
            } else {
                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }

        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
            thisHeader.setProducerGroup(producerGroup);
            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
            thisHeader.setFromTransactionCheck(true);

            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (uniqueKey == null) {
                uniqueKey = message.getMsgId();
            }
            thisHeader.setMsgId(uniqueKey);
            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
            switch (localTransactionState) {
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                    break;
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                    break;
                default:
                    break;
            }

            String remark = null;
            if (exception != null) {
                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
            }

            try {
                //繼續(xù)通過獲取到的事務(wù)結(jié)果 再次向服務(wù)端發(fā)起請求
                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                                                                                                     3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };
    //業(yè)務(wù)線程 處理本地事務(wù)結(jié)果查詢
    this.checkExecutor.submit(request);
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末横漏,一起剝皮案震驚了整個濱河市谨设,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌缎浇,老刑警劉巖扎拣,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡二蓝,警方通過查閱死者的電腦和手機(jī)誉券,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刊愚,“玉大人踊跟,你說我怎么就攤上這事“偻兀” “怎么了琴锭?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵晰甚,是天一觀的道長衙传。 經(jīng)常有香客問我,道長厕九,這世上最難降的妖魔是什么蓖捶? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮扁远,結(jié)果婚禮上俊鱼,老公的妹妹穿的比我還像新娘。我一直安慰自己畅买,他們只是感情好并闲,可當(dāng)我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谷羞,像睡著了一般帝火。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上湃缎,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天犀填,我揣著相機(jī)與錄音,去河邊找鬼嗓违。 笑死九巡,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蹂季。 我是一名探鬼主播冕广,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼偿洁!你這毒婦竟也來了撒汉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤父能,失蹤者是張志新(化名)和其女友劉穎神凑,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡溉委,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年鹃唯,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓣喊。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡坡慌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出藻三,到底是詐尸還是另有隱情洪橘,我是刑警寧澤,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布棵帽,位于F島的核電站熄求,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏逗概。R本人自食惡果不足惜弟晚,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望逾苫。 院中可真熱鬧卿城,春花似錦、人聲如沸铅搓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽星掰。三九已至多望,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蹋偏,已是汗流浹背便斥。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留威始,地道東北人棒搜。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓护锤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子煞额,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,585評論 2 359