RocketMq源碼篇-事務(wù)消息之原理

一、案例

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 事務(wù)監(jiān)聽器
        TransactionListener transactionListener = new TransactionListenerImpl();
        // 創(chuàng)建生產(chǎn)者來(lái)發(fā)送事務(wù)消息
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // 檢查事務(wù)的線程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        // 設(shè)置檢查事務(wù)線程池
        producer.setExecutorService(executorService);
        // 設(shè)置事務(wù)監(jiān)聽器
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                // 創(chuàng)建消息
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 發(fā)送消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
  • 創(chuàng)建事務(wù)監(jiān)聽器仲墨,用來(lái)執(zhí)行事務(wù),以及檢查事務(wù)執(zhí)行的結(jié)果疏遏;事務(wù)監(jiān)聽器需要實(shí)現(xiàn)TransactionListener接口
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // 當(dāng)發(fā)送事務(wù)準(zhǔn)備(half)消息成功時(shí)脉课,將調(diào)用此方法執(zhí)行本地事務(wù)
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    // 當(dāng)沒有回應(yīng)準(zhǔn)備(半)消息,代理將發(fā)送檢查消息來(lái)檢查事務(wù)狀態(tài)财异,并執(zhí)行此操作
    // 方法被調(diào)用以獲取事務(wù)狀態(tài)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
  • 創(chuàng)建的線程池是用來(lái)執(zhí)行TransactionListenerImpl#checkLocalTransaction

二倘零、發(fā)送事務(wù)消息

TransactionMQProducer#sendMessageInTransaction

    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }

TransactionMQProducer#sendMessageInTransaction

 public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // ignore DelayTimeLevel parameter
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }
  • this.send(msg),給broker發(fā)送消息戳寸,其中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true")

告訴broker是事務(wù)消息呈驶,這時(shí)候,該消息不能被消費(fèi)者消費(fèi)

  • 消息發(fā)送成功后疫鹊,開始執(zhí)行事務(wù)
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
  • 事務(wù)執(zhí)行完后袖瞻,向broker發(fā)送事務(wù)執(zhí)行結(jié)果,若執(zhí)行成功拆吆,消費(fèi)者可以執(zhí)行該事務(wù)消息聋迎;若執(zhí)行失敗,消費(fèi)者不能消費(fèi)該事務(wù)消息

三枣耀、發(fā)送事務(wù)執(zhí)行結(jié)果

DefaultMQProducerImpl#endTransaction

public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

四霉晕、總結(jié)

  • 事務(wù)消息一開始并不是直接發(fā)送到指定的那個(gè)topic 對(duì)應(yīng)的隊(duì)列里面的,而是發(fā)送到RMQ_SYS_TRANS_HALF_TOPIC topic里面,防止消費(fèi)者消費(fèi)牺堰;然后broker響應(yīng)生產(chǎn)者拄轻,執(zhí)行executeLocalTransaction 方法來(lái)執(zhí)行本地事務(wù)
  • 當(dāng)本地事務(wù)執(zhí)行成功,返回 commit提交事務(wù)伟葫,broker會(huì)先從RMQ_SYS_TRANS_HALF_TOPIC topic里面找到消息恨搓,恢復(fù)原來(lái)的樣子,存儲(chǔ)到生產(chǎn)者設(shè)置的topic扒俯;存儲(chǔ)成功之后生成一條刪除消息 并放到RMQ_SYS_TRANS_OP_HALF_TOPIC topic 里面
  • 如果本地事務(wù)失敗奶卓,就要rollback ,會(huì)從RMQ_SYS_TRANS_HALF_TOPIC topic里面找到你那個(gè)消息撼玄,生成一條刪除消息放到RMQ_SYS_TRANS_OP_HALF_TOPIC topic 里面

五夺姑、異常情況處理

1、提交信息失敗

提交消息失敗掌猛,直接拋異常盏浙,不用執(zhí)行事務(wù)

2、事務(wù)執(zhí)行完后荔茬,向broker提交事務(wù)執(zhí)行結(jié)果失敗

broker 會(huì)有個(gè)事務(wù)服務(wù)線程废膘,隔一段時(shí)間就掃描RMQ_SYS_TRANS_HALF_TOPIC topic 里面沒有提交或者回滾的消息,然后它發(fā)送消息給生產(chǎn)者慕蔚,生產(chǎn)者執(zhí)行檢查事務(wù)的方法TransactionListener#checkLocalTransaction丐黄,詢問事務(wù)的執(zhí)行狀態(tài),默認(rèn)訪問15次

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末孔飒,一起剝皮案震驚了整個(gè)濱河市灌闺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌坏瞄,老刑警劉巖桂对,帶你破解...
    沈念sama閱讀 222,464評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異鸠匀,居然都是意外死亡蕉斜,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門缀棍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)宅此,“玉大人,你說我怎么就攤上這事爬范》塘瑁” “怎么了?”我有些...
    開封第一講書人閱讀 169,078評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵坦敌,是天一觀的道長(zhǎng)侣诵。 經(jīng)常有香客問我痢法,道長(zhǎng),這世上最難降的妖魔是什么杜顺? 我笑而不...
    開封第一講書人閱讀 59,979評(píng)論 1 299
  • 正文 為了忘掉前任财搁,我火速辦了婚禮,結(jié)果婚禮上躬络,老公的妹妹穿的比我還像新娘尖奔。我一直安慰自己,他們只是感情好穷当,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,001評(píng)論 6 398
  • 文/花漫 我一把揭開白布提茁。 她就那樣靜靜地躺著,像睡著了一般馁菜。 火紅的嫁衣襯著肌膚如雪茴扁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,584評(píng)論 1 312
  • 那天汪疮,我揣著相機(jī)與錄音峭火,去河邊找鬼。 笑死智嚷,一個(gè)胖子當(dāng)著我的面吹牛卖丸,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播盏道,決...
    沈念sama閱讀 41,085評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼稍浆,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了猜嘱?” 一聲冷哼從身側(cè)響起粹湃,我...
    開封第一講書人閱讀 40,023評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎泉坐,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體裳仆,經(jīng)...
    沈念sama閱讀 46,555評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡腕让,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,626評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了歧斟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纯丸。...
    茶點(diǎn)故事閱讀 40,769評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖静袖,靈堂內(nèi)的尸體忽然破棺而出觉鼻,到底是詐尸還是另有隱情,我是刑警寧澤队橙,帶...
    沈念sama閱讀 36,439評(píng)論 5 351
  • 正文 年R本政府宣布坠陈,位于F島的核電站萨惑,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏仇矾。R本人自食惡果不足惜庸蔼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,115評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贮匕。 院中可真熱鬧姐仅,春花似錦、人聲如沸刻盐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,601評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)敦锌。三九已至馒疹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間供屉,已是汗流浹背行冰。 一陣腳步聲響...
    開封第一講書人閱讀 33,702評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留伶丐,地道東北人悼做。 一個(gè)月前我還...
    沈念sama閱讀 49,191評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像哗魂,于是被迫代替她去往敵國(guó)和親肛走。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,781評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容