rocketmq發(fā)送消息之尋找主題路由信息

1壹店、rocket發(fā)送消息整個過程可以概括為:檢查消息->尋找主題路由信息->選擇消息隊列->發(fā)送消息猜丹,本文主要解析尋找主題路由信息部分。
2硅卢、以rocketMq源碼中的示例開始解讀射窒,入口是excample模塊中的Producer#send()方法

SendResult sendResult = producer.send(msg);

DefaultMQProducerImpl#sendDefaultImpl方法

Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = 
#尋找主題路由信息入口,如果尋找不到路由信息将塑,直接拋出異常信息
this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;

DefaultMQProducerImpl#tryToFindTopicPublishInfo方法,第一次發(fā)送消息時脉顿,先從緩存中獲取路由信息,如果沒有獲取到点寥,則從NameServer查找主題信息艾疟,如果獲取則返回,并且將主題信息放入緩存敢辩,如果還未獲取到則從NameServer查詢默認的主題信息

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

MQClientInstance#updateTopicRouteInfoFromNameServer方法從NameServer查詢主題信息

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    #獲取默認主題
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        #從NameSever獲取指定的主題路由信息蔽莱,底層使用Netty
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        #因為之前在DefaultMQProducerImpl中已經緩存了主題路由信息,包括空的主題路由信息戚长,因此這里需要檢查是否要更新
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        #比較時是將兩個目標對象深度克隆后進行比對盗冷,防止影響原來的目標對象
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        #如果需要更新,這更新DefaultMQProducerImpl
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            #更新topic的broker地址信息
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info 更新DefaultMQProducerImpl中的topicPublishInfoTable緩存信息
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info更新消費者信息
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            #將原始的主題路由信息放入緩存
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (MQClientException e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } catch (RemotingException e) {
                    log.error("updateTopicRouteInfoFromNameServer Exception", e);
                    throw new IllegalStateException(e);
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }

MQClientInstance#topicRouteData2TopicPublishInfo方法從TopicRouteData路由信息中獲取主題發(fā)布相關信息

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        #設置有序broker信息list
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            String[] brokers = route.getOrderTopicConf().split(";");
            for (String broker : brokers) {
                String[] item = broker.split(":");
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; i++) {
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }

            info.setOrderTopic(true);
        } else {
            List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);
            for (QueueData qd : qds) {
                #只獲取有寫權限的隊列
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }
                    #只使用master節(jié)點
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }

                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }

            info.setOrderTopic(false);
        }

        return info;
    }

具體請求NameServer的主題路由信息省略同廉,到此仪糖,第一次發(fā)送消息獲取主題路由信息就已完成。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末迫肖,一起剝皮案震驚了整個濱河市锅劝,隨后出現的幾起案子,更是在濱河造成了極大的恐慌蟆湖,老刑警劉巖故爵,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異帐姻,居然都是意外死亡稠集,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進店門饥瓷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來剥纷,“玉大人,你說我怎么就攤上這事呢铆』扌” “怎么了?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵棺克,是天一觀的道長悠垛。 經常有香客問我,道長娜谊,這世上最難降的妖魔是什么确买? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮纱皆,結果婚禮上湾趾,老公的妹妹穿的比我還像新娘。我一直安慰自己派草,他們只是感情好搀缠,可當我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著近迁,像睡著了一般艺普。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上鉴竭,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天歧譬,我揣著相機與錄音,去河邊找鬼拓瞪。 笑死缴罗,一個胖子當著我的面吹牛,可吹牛的內容都是我干的祭埂。 我是一名探鬼主播面氓,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蛆橡!你這毒婦竟也來了舌界?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤泰演,失蹤者是張志新(化名)和其女友劉穎呻拌,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體睦焕,經...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡藐握,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年靴拱,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猾普。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡袜炕,死狀恐怖,靈堂內的尸體忽然破棺而出初家,到底是詐尸還是另有隱情偎窘,我是刑警寧澤,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布溜在,位于F島的核電站陌知,受9級特大地震影響,放射性物質發(fā)生泄漏掖肋。R本人自食惡果不足惜仆葡,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望培遵。 院中可真熱鬧浙芙,春花似錦、人聲如沸籽腕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽皇耗。三九已至南窗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間郎楼,已是汗流浹背万伤。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留呜袁,地道東北人敌买。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像阶界,于是被迫代替她去往敵國和親虹钮。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內容