RocketMQ NameServer 深入剖析

image.png

1、NameServer的作用

Name Server 是專為 RocketMQ 設(shè)計(jì)的輕量級(jí)名稱服務(wù),具有簡(jiǎn)單缀拭、可集群橫吐擴(kuò)展、無(wú)狀態(tài)填帽,節(jié)點(diǎn)之間互不通信等特點(diǎn)蛛淋。

整個(gè)Rocketmq集群的工作原理如下圖所示:


在這里插入圖片描述

任何Producer、Consumer篡腌、Broker與所有NameServer通信褐荷,向NameServer請(qǐng)求或者發(fā)送數(shù)據(jù)。而且都是單向的嘹悼,Producer和Consumer請(qǐng)求數(shù)據(jù)叛甫,Broker發(fā)送數(shù)據(jù)。正是因?yàn)檫@種單向的通信杨伙,RocketMQ水平擴(kuò)容變得很容易其监。

Broker集群

Broker用于接收生產(chǎn)者發(fā)送消息,或者消費(fèi)者消費(fèi)消息的請(qǐng)求限匣。一個(gè)Broker集群由多組Master/Slave組成抖苦,Master可寫(xiě)可讀,Slave只可以讀米死,Master將寫(xiě)入的數(shù)據(jù)同步給Slave锌历。每個(gè)Broker節(jié)點(diǎn),在啟動(dòng)時(shí)峦筒,都會(huì)遍歷NameServer列表究西,與每個(gè)NameServer建立長(zhǎng)連接,注冊(cè)自己的信息物喷,之后定時(shí)上報(bào)卤材。

Producer集群

消息的生產(chǎn)者,通過(guò)NameServer集群獲得Topic的路由信息脯丝,包括Topic下面有哪些Queue,這些Queue分布在哪些Broker上等伏伐。Producer只會(huì)將消息發(fā)送到Master節(jié)點(diǎn)上宠进,因此只需要與Master節(jié)點(diǎn)建立連接。

Consumer集群

消息的消費(fèi)者藐翎,通過(guò)NameServer集群獲得Topic的路由信息材蹬,連接到對(duì)應(yīng)的Broker上消費(fèi)消息实幕。注意,由于Master和Slave都可以讀取消息堤器,因此Consumer會(huì)與Master和Slave都建立連接昆庇。

2遗锣、NameServer類結(jié)構(gòu)

在這里插入圖片描述
  • NamesrvStartup: NameServer的啟動(dòng)類锦秒;
  • NamesrvController: NameServer的核心控制類幅狮;
  • KVConfigManager: 讀取或變更NameServer的配置屬性螺垢,加載NamesrvConfig中配置的配置文件到內(nèi)存拯田;
  • KVConfigSerializeWrapper: NameServer配置信息序列化包裝類叶沛;
  • RouteInfoManager: NameServer數(shù)據(jù)的載體瘪弓,記錄Broker劫映,Topic等信息乓旗;
  • DefaultRequestProcessor: NameServer處理請(qǐng)求的請(qǐng)求類府蛇,負(fù)責(zé)處理所有與NameServer交互的請(qǐng)求;
  • BrokerHousekeepingService: BrokerHouseKeepingService實(shí)現(xiàn)ChannelEventListener接口屿愚,可以說(shuō)是通道在發(fā)送異常時(shí)的回調(diào)方法(Nameserver與Broker的連接通道在關(guān)閉汇跨、通道發(fā)送異常、通道空閑時(shí))妆距;
  • NamesrvConfig: NamesrvConfig,主要指定nameserver的相關(guān)配置目錄屬性穷遂;
  • NettyRemotingServer: Netty服務(wù)類;

(1)NameServer啟動(dòng)流程

NameServer的啟動(dòng)是由NamesrvStartup完成的毅厚,啟動(dòng)過(guò)程如下:


在這里插入圖片描述
  1. 獲取并解析配置參數(shù)塞颁,包括NamesrvConfig和NettyServerConfig;
  2. 調(diào)用NamesrvController.initialize()初始化NamesrvController吸耿;若初始化失敗祠锣,則直接關(guān)閉NamesrvController;
  3. 然后調(diào)用NamesrvController.start()方法來(lái)開(kāi)啟NameServer服務(wù)咽安;
  4. 注冊(cè)ShutdownHookThread服務(wù)伴网。在JVM退出之前,調(diào)用NamesrvController.shutdown()來(lái)進(jìn)行關(guān)閉服務(wù)妆棒,釋放資源澡腾;
public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            // 創(chuàng)建NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 啟動(dòng)NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        // 構(gòu)建命令行
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }
        // nameServer配置參數(shù)
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty server 配置參數(shù)
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        // 命令行參數(shù)是否包含配置文件
        if (commandLine.hasOption('c')) {
            // 獲取配置文件路徑
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        // 是否打印參數(shù)
        if (commandLine.hasOption('p')) {
            // 都不打印
            MixAll.printObjectProperties(null, namesrvConfig);
            MixAll.printObjectProperties(null, nettyServerConfig);
            System.exit(0);
        }
        // 設(shè)置命令行的參數(shù),優(yōu)先級(jí)高(會(huì)覆蓋掉配置文件的配置項(xiàng))
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        // 未設(shè)置 rocketMQ home
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // 配置Logger
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        // 控制臺(tái)打印參數(shù)
        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 創(chuàng)建 NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // 注冊(cè)配置參數(shù)糕珊,防止丟失
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        return controller;
    }

    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // 初始化NamesrvController
        boolean initResult = controller.initialize();
        // 初始化失敗
        if (!initResult) {
            // 關(guān)閉NamesrvController
            controller.shutdown();
            // 關(guān)閉JVM
            System.exit(-3);
        }
        // 注冊(cè)關(guān)閉鉤子方法:當(dāng)JVM關(guān)閉的時(shí)候动分,先關(guān)閉NamesrvController
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                // 關(guān)閉NamesrvController
                controller.shutdown();
                return null;
            }
        }));

        // 啟動(dòng)NamesrvController
        controller.start();
        return controller;
    }

    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);
        return options;
    }
}

調(diào)用NamesrvController.initialize()初始化NamesrvController

public boolean initialize() {
        // 加載KV配置
        this.kvConfigManager.load();

        // 初始化通信層
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 初始化線程池
        this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
                    new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // 增加定時(shí)任務(wù)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        //
        // @Override
        // public void run() {
        // NamesrvController.this.routeInfoManager.printAllPeriodically();
        // }
        // }, 1, 5, TimeUnit.MINUTES);

        return true;
    }

加載KV配置,創(chuàng)建NettyServer網(wǎng)絡(luò)處理對(duì)象红选,然后開(kāi)啟兩個(gè)定時(shí)任務(wù)澜公,此類定時(shí)任務(wù)統(tǒng)稱為心跳檢測(cè)

  1. NameServer每隔10秒掃描一次Broker,移除處于不激活狀態(tài)的Broker
  2. NameServer每隔10分鐘打印一次KV配置

3喇肋、NameServer如何保證數(shù)據(jù)的最終一致

NameServer作為一個(gè)名稱服務(wù)坟乾,需要提供服務(wù)注冊(cè)迹辐、服務(wù)剔除、服務(wù)發(fā)現(xiàn)這些基本功能甚侣,但是NameServer節(jié)點(diǎn)之間并不通信明吩,在某個(gè)時(shí)刻各個(gè)節(jié)點(diǎn)數(shù)據(jù)可能不一致的情況下,如何保證客戶端可以最終拿到正確的數(shù)據(jù)殷费。下面分別從路由元信息印荔、路由注冊(cè)、路由剔除宗兼,路由發(fā)現(xiàn)四個(gè)角度進(jìn)行介紹躏鱼。

路由元信息

NameServer路由實(shí)現(xiàn)類: RoutelnfoManager

RouteInfoManager作為NameServer數(shù)據(jù)的載體,記錄Broker殷绍、Topic染苛、QueueData等信息。

Broker在啟動(dòng)時(shí)會(huì)將Broker信息主到、Topic信息茶行、QueueData信息注冊(cè)到所有的NameServer上,并和所有NameServer節(jié)點(diǎn)保持長(zhǎng)連接登钥,之后也會(huì)定時(shí)注冊(cè)信息畔师;

Producer、Consumer也會(huì)和其中一個(gè)NameServer節(jié)點(diǎn)保持長(zhǎng)連接牧牢,定時(shí)從NameServer中獲取Topic路由信息看锉;

   private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  • topicQueueTable: Topic 消息隊(duì)列路由信息,消息發(fā)送時(shí)根據(jù)路由表進(jìn)行負(fù) 載均衡 塔鳍。
  • brokerAddrTable : Broker 基礎(chǔ)信息伯铣, 包含 brokerName、 所屬集群名稱 轮纫、 主備 Broker
    地址腔寡。
  • clusterAddrTable: Broker 集群信息,存儲(chǔ)集群中所有 Broker 名稱 掌唾。
  • brokerLiveTable: Broker 狀態(tài)信息 放前。 NameServer 每次 收到心跳包時(shí)會(huì) 替換該信 息 。
  • filterServerTable : Broker上的 FilterServer列表糯彬,用于類模式消息過(guò)濾凭语,

RocketMQ 基于訂閱發(fā)布機(jī)制 , 一個(gè) Topic 擁有 多 個(gè)消息隊(duì) 列 撩扒,一 個(gè) Broker 為每一主 題默 認(rèn)創(chuàng)建 4 個(gè)讀隊(duì)列 4 個(gè)寫(xiě)隊(duì)列 似扔。 多個(gè) Broker 組成 一個(gè)集群 , BrokerName 由相同的多臺(tái) Broker 組 成 Master-Slave 架構(gòu) , brokerId 為 0 代表 Master虫几, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存儲(chǔ)上次收到 Broker 心跳包的時(shí)間 挽拔。

路由注冊(cè)

對(duì)于Zookeeper辆脸、Etcd這樣強(qiáng)一致性組件,數(shù)據(jù)只要寫(xiě)到主節(jié)點(diǎn)螃诅,內(nèi)部會(huì)通過(guò)狀態(tài)機(jī)將數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn)啡氢,Zookeeper使用的是Zab協(xié)議,etcd使用的是raft協(xié)議术裸。

但是NameServer節(jié)點(diǎn)之間是互不通信的倘是,無(wú)法進(jìn)行數(shù)據(jù)復(fù)制。RocketMQ采取的策略是袭艺,在Broker節(jié)點(diǎn)在啟動(dòng)的時(shí)候搀崭,輪訓(xùn)NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接猾编,發(fā)起注冊(cè)請(qǐng)求瘤睹。NameServer內(nèi)部會(huì)維護(hù)一個(gè)Broker表,用來(lái)動(dòng)態(tài)存儲(chǔ)Broker的信息答倡。

同時(shí)轰传,Broker節(jié)點(diǎn)為了證明自己是存活的,會(huì)將最新的信息上報(bào)給NameServer瘪撇,然后每隔30秒向NameServer發(fā)送心跳包获茬,心跳包中包含 BrokerId、Broker地址倔既、Broker名稱恕曲、Broker所屬集群名稱等等,然后NameServer接收到心跳包后叉存,會(huì)更新時(shí)間戳码俩,記錄這個(gè)Broker的最新存活時(shí)間。

NameServer在處理心跳包的時(shí)候歼捏,存在多個(gè)Broker同時(shí)操作一張Broker表稿存,為了防止并發(fā)修改Broker表導(dǎo)致不安全,路由注冊(cè)操作引入了ReadWriteLock讀寫(xiě)鎖瞳秽,這個(gè)設(shè)計(jì)亮點(diǎn)允許多個(gè)消息生產(chǎn)者并發(fā)讀瓣履,保證了消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻N(yùn)ameServer只能處理一個(gè)Broker心跳包练俐,多個(gè)心跳包串行處理袖迎。這也是讀寫(xiě)鎖的經(jīng)典使用場(chǎng)景,即讀多寫(xiě)少。

Broker端心跳包發(fā)送

Broker端心跳包發(fā)送( BrokerController#start)

 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                }
                catch (Exception e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

該方法主要是遍歷 NameServer列表燕锥, Broker消息服務(wù)器依次向 NameServer發(fā)送心跳包辜贵。

public RegisterBrokerResult registerBrokerAll(//
            final String clusterName,// 1
            final String brokerAddr,// 2
            final String brokerName,// 3
            final long brokerId,// 4
            final String haServerAddr,// 5
            final TopicConfigSerializeWrapper topicConfigWrapper,// 6
            final List<String> filterServerList,// 7
            final boolean oneway// 8
    ) {
        RegisterBrokerResult registerBrokerResult = null;

        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    RegisterBrokerResult result =
                            this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                                haServerAddr, topicConfigWrapper, filterServerList, oneway);
                    if (result != null) {
                        registerBrokerResult = result;
                    }

                    log.info("register broker to name server {} OK", namesrvAddr);
                }
                catch (Exception e) {
                    log.warn("registerBroker Exception, " + namesrvAddr, e);
                }
            }
        }

NameServer端處理心跳包

Step1:路由 注冊(cè)需要加寫(xiě)鎖 ,防止并發(fā)修改 RoutelnfoManager 中的路由 表 归形。 Broker 所屬 集群是否存在托慨, 如果不存在,則創(chuàng) 建暇榴,然 后將 broker 名加入到集群 合中厚棵。

Step2 :維護(hù) BrokerData信息,首先從 brokerAddrTable根據(jù) BrokerName嘗試獲取 Broker信息蔼紧,如果不存在婆硬, 則新建 BrokerData并放入到 brokerAddrTable, registerFirst設(shè) 置為 true;如果存在 , 直接替換原先的奸例, registerFirst設(shè)置為 false彬犯,表示非第一次注冊(cè) 。

Step3 :如果Broker為Master查吊,并且BrokerTopic配置信息發(fā)生變化或者是初次注冊(cè)躏嚎, 則需要?jiǎng)?chuàng)建或更新 Topic路由元數(shù)據(jù),填充 topicQueueTable菩貌, 其實(shí)就是為默認(rèn)主題自動(dòng)注 冊(cè)路由信息卢佣,其中包含 MixAII.DEFAULT TOPIC 的路由信息。 當(dāng)消息生產(chǎn)者發(fā)送主題時(shí)箭阶, 如果該主題未創(chuàng)建并且BrokerConfig的autoCreateTopicEnable為true時(shí)虚茶, 將返回MixAII. DEFAULT TOPIC的路由信息。

路由剔除

正常情況下仇参,如果Broker關(guān)閉嘹叫,則會(huì)與NameServer斷開(kāi)長(zhǎng)連接,Netty的通道關(guān)閉監(jiān)聽(tīng)器會(huì)監(jiān)聽(tīng)到連接斷開(kāi)事件诈乒,然后會(huì)將這個(gè)Broker信息剔除掉罩扇。

Broker 每隔 30s 向 NameServer 發(fā)送一個(gè)心跳包,心跳包中包含 BrokerId怕磨、Broker地址喂饥、Broker名稱、 Broker所屬集群名稱肠鲫、Broker關(guān)聯(lián)的 FilterServer列表员帮。 但是如果 Broker若機(jī) , NameServer無(wú)法收到心跳包导饲,此時(shí) NameServer如何來(lái)剔除這些失 效的 Broker 呢? Name Server會(huì)每隔 IOs 掃描 brokerLiveTable狀態(tài)表捞高,如果 BrokerLive 的 lastUpdateTimestamp 的時(shí)間戳距當(dāng)前時(shí)間超過(guò) 120s氯材,則認(rèn)為 Broker失效,移除該 Broker, 關(guān)閉與Broker連接硝岗,并同時(shí)更新topicQueueTable氢哮、 brokerAddrTable、 brokerLiveTable型檀、 filterServerTable命浴。

RocktMQ 有兩個(gè)觸發(fā)點(diǎn)來(lái)觸發(fā)路由刪除 。

  1. NameServer定時(shí)掃描 brokerLiveTable檢測(cè)上次心跳包與 當(dāng)前系統(tǒng)時(shí)間的時(shí)間差贱除, 如果時(shí)間戳大于 120s,則需要移除該 Broker 信息 媳溺。
  2. Broker在正常被關(guān)閉的情況下月幌,會(huì)執(zhí)行 unregisterBroker指令。

路由發(fā)現(xiàn)

RocketMQ 路由發(fā)現(xiàn)是非實(shí)時(shí)的悬蔽,當(dāng) Topic 路由出現(xiàn)變化后扯躺, NameServer不主動(dòng)推送給客戶端 , 而 是由客戶端定時(shí)拉取主題最新的路由 蝎困。

com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor#pullMessageForward

  private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
            throws Exception {
        final RemotingCommand response =
                RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader =
                (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        // 由于異步返回录语,所以必須要設(shè)置
        response.setOpaque(request.getOpaque());

        DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
        final FilterClassInfo findFilterClass =
                this.filtersrvController.getFilterClassManager().findFilterClass(
                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
        if (null == findFilterClass) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, not registered");
            return response;
        }

        if (null == findFilterClass.getMessageFilter()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, registered but no class");
            return response;
        }

        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

        // 構(gòu)造從Broker拉消息的參數(shù)
        MessageQueue mq = new MessageQueue();
        mq.setTopic(requestHeader.getTopic());
        mq.setQueueId(requestHeader.getQueueId());
        mq.setBrokerName(this.filtersrvController.getBrokerName());
        long offset = requestHeader.getQueueOffset();
        int maxNums = requestHeader.getMaxMsgNums();

        final PullCallback pullCallback = new PullCallback() {

            @Override
            public void onSuccess(PullResult pullResult) {
                responseHeader.setMaxOffset(pullResult.getMaxOffset());
                responseHeader.setMinOffset(pullResult.getMinOffset());
                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
                response.setRemark(null);

                switch (pullResult.getPullStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);

                    List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                    try {
                        for (MessageExt msg : pullResult.getMsgFoundList()) {
                            boolean match = findFilterClass.getMessageFilter().match(msg);
                            if (match) {
                                msgListOK.add(msg);
                            }
                        }

                        // 有消息返回
                        if (!msgListOK.isEmpty()) {
                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                                response, msgListOK);
                            return;
                        }
                        // 全部都被過(guò)濾掉了
                        else {
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        }
                    }
                    // 只要拋異常,就終止過(guò)濾禾乘,并返回客戶端異常
                    catch (Throwable e) {
                        final String error =
                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
                        log.error(error, e);

                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                        returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                            response, null);
                        return;
                    }

                    break;
                case NO_MATCHED_MSG:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_NEW_MSG:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_ILLEGAL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    break;
                default:
                    break;
                }

                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
                    null);
            }


            @Override
            public void onException(Throwable e) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
                    null);
                return;
            }
        };

        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

        return null;
    }
  1. 調(diào)用 RouterlnfoManager 的方法澎埠,從路由 表 topicQueueTable、 brokerAddrTable始藕、 filterServerTable中分別填充TopicRouteData中的List<Queu巳Data>蒲稳、List<BrokerData>和 filterServer 地址表 。
  2. 如果找到主題對(duì)應(yīng)的路由信息并且該主題為順序消息伍派,則從 NameServer KVconfig 中獲取關(guān)于順序消息相關(guān) 的配置填充路由信息 江耀。

如果找不到路由信息 CODE 則使用 TOPIC NOT_EXISTS ,表示沒(méi)有找到對(duì)應(yīng)的路由 诉植。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末祥国,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子晾腔,更是在濱河造成了極大的恐慌舌稀,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件灼擂,死亡現(xiàn)場(chǎng)離奇詭異扩借,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)缤至,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)潮罪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)康谆,“玉大人,你說(shuō)我怎么就攤上這事嫉到∥职担” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵何恶,是天一觀的道長(zhǎng)孽锥。 經(jīng)常有香客問(wèn)我,道長(zhǎng)细层,這世上最難降的妖魔是什么惜辑? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮疫赎,結(jié)果婚禮上盛撑,老公的妹妹穿的比我還像新娘。我一直安慰自己捧搞,他們只是感情好抵卫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著胎撇,像睡著了一般介粘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上晚树,一...
    開(kāi)封第一講書(shū)人閱讀 51,198評(píng)論 1 299
  • 那天姻采,我揣著相機(jī)與錄音,去河邊找鬼爵憎。 笑死偎谁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的纲堵。 我是一名探鬼主播巡雨,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼席函!你這毒婦竟也來(lái)了铐望?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤茂附,失蹤者是張志新(化名)和其女友劉穎正蛙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體营曼,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡乒验,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蒂阱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锻全。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡狂塘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鳄厌,到底是詐尸還是另有隱情荞胡,我是刑警寧澤,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布了嚎,位于F島的核電站泪漂,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏歪泳。R本人自食惡果不足惜萝勤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望呐伞。 院中可真熱鬧敌卓,春花似錦、人聲如沸荸哟。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)鞍历。三九已至,卻和暖如春肪虎,著一層夾襖步出監(jiān)牢的瞬間劣砍,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工扇救, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留刑枝,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓迅腔,卻偏偏與公主長(zhǎng)得像装畅,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子沧烈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容