RocketMQ——NameServer 路由規(guī)則

RocketMQ路由注冊是通過Broker與NameServer的心跳功能實現(xiàn)筋讨。Broker啟動時向集群中所有的NameServer發(fā)送心跳包埃叭,每隔30秒向集群中的所有NameServer發(fā)送心跳,NamerServer收到心跳包會更新brokerLiveTable緩存中的BrokerLiveInfo的lastUpdateTimestamp悉罕,然后NameServer每隔十秒掃描brokerLiveTable赤屋,如果連續(xù)120秒沒有收到心跳包,NameServer將移除該broker的路由信息壁袄,同時關閉Socket連接类早。
這里也是為什么不建議線上開啟autoCreateTopic的原因,因為默認緩存在broker中然想,所以開啟后莺奔,多個broker同時默認創(chuàng)建topic,將會導致重復創(chuàng)建broker变泄,最終同步到NameServer后的數(shù)據(jù)重復令哟。

Broker發(fā)送心跳包

org.apache.rocketmq.broker.BrokerController#start

        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

跟蹤代碼,真正執(zhí)行注冊

this.brokerOuterAPI.registerBrokerAll

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        //獲取 NameServer 地址列表
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            //填充requestHeader屬性

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            //填充body屬性
            
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                      //真正注冊
                      RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    }
                });
            }
        }

        return registerBrokerResultList;
    }

注冊邏輯在registerBroker中執(zhí)行妨蛹,邏輯較為簡單屏富,發(fā)送http請求處理返回數(shù)據(jù)即可。

private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

以上完成后蛙卤,返回BrokerController#doRegisterBrokerAll 方法中

//注冊并獲取結(jié)果
 List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(xxxx);
 //更新心跳包返回數(shù)據(jù)
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
              
                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }

獲取返回的結(jié)果狠半,更新對應的masterAddr 以及 topicConfigTable

NameServer處理心跳包

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 負責RocketMQ中處理所有相關的網(wǎng)絡請求噩死,對應的心跳包請求解析器為RequestCode.REGISTER_BROKER

case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        return this.registerBroker(ctx, request);
    }

跟蹤對應的代碼,我們可以看到請求的處理和返回代碼如下:

 RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());

        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());

        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

namesrvController.getRouteInfoManager().registerBroker 負責處理注冊請求神年,處理后 返回體包裝對應的 ServerAddr以及路由信息返回已维。
RouteInfoManager#registerBroker

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
            //寫加鎖
                this.lock.writeLock().lockInterruptibly();
                
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;
                //獲取brokerName,如果為空已日,則注冊
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

路由刪除

如上所示垛耳,broker每隔30秒向NameServer注冊路由信息,那么如果一個broker宕機后飘千,NameServer如何剔除Broker呢堂鲜?同樣的NameServer每隔10秒掃描broker列表,管理broker存活信息护奈。
RocketMQ有兩個出發(fā)點來完成路由刪除:

  1. NameServer定時掃描brokerLiveTable檢測上次心跳包與當前系統(tǒng)的時間差缔莲,如果時間差大于120s,則需要剔除該broker信息
  2. Broker正常關閉的情況下霉旗,執(zhí)行unregisterBroker命令
    RouteInfoManager#scanNotActiveBroker
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

onChannelDestroy 中申請寫鎖痴奏,然后遍歷移除。

路由發(fā)現(xiàn)

RocketMQ的路由發(fā)現(xiàn)不是實時的奖慌,當topic路由發(fā)生變化時抛虫,NameServer不主動推送給客戶端,而是客戶端定時拉取最新的主題信息简僧。根據(jù)主題名拉取路由信息的命令編碼命名為RequestCode.GET_ROUTEINTO_BY_TOPIC建椰。
請求依然由DefaultRequestProcessor處理。

case RequestCode.GET_ROUTEINTO_BY_TOPIC:
     return this.getRouteInfoByTopic(ctx, request);

核心實現(xiàn)

TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;//順序消息配置
    private List<QueueData> queueDatas;//隊列元數(shù)據(jù)
    private List<BrokerData> brokerDatas;//topic分布的broker原數(shù)據(jù)
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;//broker上過濾服務器列表

核心實現(xiàn)則由 namesrvController.getRouteInfoManager().pickupTopicRouteData

總結(jié)

AF8AFD9D88EEEF147E552C3FACBC4979.jpg
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末岛马,一起剝皮案震驚了整個濱河市棉姐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌啦逆,老刑警劉巖伞矩,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異夏志,居然都是意外死亡乃坤,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門沟蔑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來湿诊,“玉大人,你說我怎么就攤上這事瘦材√耄” “怎么了?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵食棕,是天一觀的道長朗和。 經(jīng)常有香客問我错沽,道長,這世上最難降的妖魔是什么眶拉? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任千埃,我火速辦了婚禮,結(jié)果婚禮上镀层,老公的妹妹穿的比我還像新娘镰禾。我一直安慰自己,他們只是感情好唱逢,可當我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著屋休,像睡著了一般坞古。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上劫樟,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天痪枫,我揣著相機與錄音,去河邊找鬼叠艳。 笑死奶陈,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的附较。 我是一名探鬼主播吃粒,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼拒课!你這毒婦竟也來了徐勃?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤早像,失蹤者是張志新(化名)和其女友劉穎僻肖,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體卢鹦,經(jīng)...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡臀脏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了冀自。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片揉稚。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖凡纳,靈堂內(nèi)的尸體忽然破棺而出窃植,到底是詐尸還是另有隱情,我是刑警寧澤荐糜,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布巷怜,位于F島的核電站葛超,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏延塑。R本人自食惡果不足惜绣张,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望关带。 院中可真熱鬧侥涵,春花似錦、人聲如沸宋雏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽磨总。三九已至嗦明,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蚪燕,已是汗流浹背娶牌。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留馆纳,地道東北人诗良。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像鲁驶,于是被迫代替她去往敵國和親鉴裹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內(nèi)容