Rocketmq源碼-namesrv模塊詳解

Rocketmq 使用 namesrv 來(lái)管理所有的元數(shù)據(jù)信息疯暑,包括主題 Topic 路由信息和 Broker 信息矫夯。
首先我們介紹一下一些基礎(chǔ)概念:

  1. Broker : 儲(chǔ)存消息的服務(wù)器。
    • 分為主從兩種模式欺税,通過(guò) brokerId 來(lái)區(qū)分,目前 brokerId = 0 就表示主節(jié)點(diǎn)。
    • 每個(gè) Broker 啟動(dòng)時(shí)颊埃,會(huì)向 namesrv 注冊(cè)自己的信息,并會(huì)定期發(fā)送心跳信息蝶俱。
  2. Broker 組 : 相同 brokerName 名字的 Broker 服務(wù)器就是一個(gè)組的班利。

    注意: 這里就有一個(gè)小問(wèn)題,如果兩個(gè) Broker 有相同brokerName名字榨呆,而且 brokerId 都是 0 時(shí)罗标,它們都可以向 namesrv 注冊(cè)自己信息,后面覆蓋前面信息积蜻,而且因?yàn)樗鼈兌紩?huì)發(fā)送心跳消息闯割,就會(huì)導(dǎo)致不斷地相互覆蓋。

  3. Broker 集群 : 有相同 clusterName 名字的Broker 服務(wù)器就是同一個(gè)集群的竿拆。
  4. Topic : 主題 Topic 是以 Broker 組進(jìn)行區(qū)分的纽谒。
    • Broker 組有一個(gè) TopicConfigManager 來(lái)管理該 Broker 所擁有的所有主題 Topic 信息,包括主題Topic 的權(quán)限perm如输,讀隊(duì)列數(shù)量readQueueNums鼓黔,writeQueueNums 寫隊(duì)列數(shù)量等等。
    • Broker 組中主 Broker 創(chuàng)建主題 Topic 在這個(gè)Broker 組擁有隊(duì)列文件Queue不见,從 Broker 只是復(fù)制主 Broker澳化。
    • 生產(chǎn)者發(fā)送消息時(shí),也是從主題 Topic 擁有的Broker 組數(shù)組中稳吮,挑選一個(gè)Broker 組缎谷,向這個(gè)Broker 組的主 Broker 發(fā)送消息,然后主 Broker 再將這些數(shù)據(jù)發(fā)送給從 Broker灶似。

一. NamesrvStartup

這個(gè)類是 namesrv 的啟動(dòng)類列林,用來(lái)開啟 namesrv瑞你。

1.1 main 方法

 public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            // 創(chuàng)建 NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 啟動(dòng) NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
  1. 通過(guò) createNamesrvController() 方法創(chuàng)建 NamesrvController 實(shí)例。
  2. 調(diào)用 start(controller) 方法啟動(dòng) NamesrvController希痴。

1.2 createNamesrvController 方法

 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        // 通過(guò) commandLine 解析 args 參數(shù)
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        // namesrv 相關(guān)的配置參數(shù)
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty 相關(guān)的配置參數(shù)
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 默認(rèn)端口 9876者甲, 但是可以通過(guò) -c 傳入的 properties 文件參數(shù)進(jìn)行覆蓋
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 創(chuàng)建 NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }
  1. 通過(guò) commandLine 解析命令行 args 參數(shù)。
  2. 如果指定了配置文件砌创,那么讀取配置文件中的配置項(xiàng)虏缸,并賦值到 namesrvConfignettyServerConfig
  3. 創(chuàng)建 NamesrvController 實(shí)例嫩实,并將配置項(xiàng)數(shù)據(jù) properties 賦值到 Configuration 中刽辙。

1.3 start 方法

    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // 初始化 NamesrvController
        boolean initResult = controller.initialize();
        if (!initResult) {
            // 初始化失敗,就退出
            controller.shutdown();
            System.exit(-3);
        }

        // 添加鉤子函數(shù)甲献,保證 JVM 正常退出時(shí)宰缤,關(guān)閉 NamesrvController
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        // 開啟 NamesrvController
        controller.start();

        return controller;
    }

非常簡(jiǎn)單,先初始化 NamesrvController晃洒;然后添加鉤子函數(shù)撵溃,保證 JVM 正常退出時(shí),關(guān)閉 NamesrvController锥累;最后調(diào)用 start() 方法啟動(dòng)。

二. NamesrvController

2.1 成員屬性

    // Namesrv 的配置項(xiàng)
    private final NamesrvConfig namesrvConfig;

    // Netty 服務(wù)端配置項(xiàng)
    private final NettyServerConfig nettyServerConfig;

    // 定時(shí)器集歇,用來(lái)定期檢查是否有不活躍的 broker桶略,以及定期打印 kvConfigManager 中的值
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    // 儲(chǔ)存 KV 的值
    private final KVConfigManager kvConfigManager;
    //  namesrv 所有路由信息的管理器
    private final RouteInfoManager routeInfoManager;

    // 遠(yuǎn)程RPC服務(wù)服務(wù)端,用來(lái)處理遠(yuǎn)程請(qǐng)求命令
    private RemotingServer remotingServer;

    // ChannelEventListener 接口子類诲宇,監(jiān)聽 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件际歼,進(jìn)行對(duì)應(yīng)處理
    private BrokerHousekeepingService brokerHousekeepingService;

    // 處理遠(yuǎn)程請(qǐng)求的線程池執(zhí)行器
    private ExecutorService remotingExecutor;
    // 配置項(xiàng)
    private Configuration configuration;
    // 監(jiān)控 file 變化,主要用于 SSL
    private FileWatchService fileWatchService;
  1. namesrvConfignettyServerConfig: Namesrv 的配置項(xiàng)和 Netty 服務(wù)端配置項(xiàng)姑蓝。
  2. scheduledExecutorService : 定時(shí)器

    用來(lái)定期檢查是否有不活躍的 Broker鹅心,以及定期打印 kvConfigManager 中的值。

  3. kvConfigManager : KV 值的管理器纺荧。
  4. routeInfoManager : 所有路由信息的管理器旭愧。
  5. remotingServer : 遠(yuǎn)程RPC服務(wù)服務(wù)端,用來(lái)處理遠(yuǎn)程請(qǐng)求命令宙暇。
  6. brokerHousekeepingService : ChannelEventListener 接口子類输枯。

    監(jiān)聽 NettyCONNECT, CLOSE, IDLE, EXCEPTION 事件,進(jìn)行對(duì)應(yīng)處理占贫。

  7. remotingExecutor : 處理遠(yuǎn)程請(qǐng)求的線程池執(zhí)行器桃熄。

2.2 initialize 方法

    public boolean initialize() {

        // 從 kvConfig.json 文件中加載之前存儲(chǔ)的 KV 值
        this.kvConfigManager.load();

        // 通過(guò) netty 創(chuàng)建一個(gè)服務(wù)端
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 用于處理請(qǐng)求的線程池 remotingExecutor
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注冊(cè)請(qǐng)求命令處理器
        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // 每隔10秒 檢查是否有不活躍的 broker
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // 每隔10秒打印一下 kvConfigManager 中的值
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // 處理 SSL
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
  1. kvConfig.json 文件中加載之前存儲(chǔ)的 KV 值。
  2. 創(chuàng)建一個(gè)遠(yuǎn)程RPC服務(wù)服務(wù)端型奥,用來(lái)處理遠(yuǎn)程請(qǐng)求命令瞳收。
  3. 用于處理請(qǐng)求的線程池 remotingExecutor碉京。
  4. 注冊(cè)請(qǐng)求命令處理器。
  5. 通過(guò) scheduledExecutorService 每隔10秒檢查是否有不活躍的 Broker螟深,以及每隔10秒打印一下 kvConfigManager 中的值谐宙。
  6. 最后處理 SSL

三. RouteInfoManager

3.1 成員屬性

    // 主題 topic 對(duì)應(yīng)的隊(duì)列相關(guān)信息QueueData血崭,這里是 List 原因是每個(gè) broker 組都有一個(gè) QueueData卧惜。
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // broker 組基礎(chǔ)信息集合,包括 broker 名字夹纫,所屬集群名字和主從 broker 的地址咽瓷。
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // broker 集群集合,包括所有集群名字以及它擁有所有broker 組名字舰讹。
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // 每個(gè) broker 的狀態(tài)信息茅姜,每次收到心跳包時(shí),都會(huì)替換對(duì)應(yīng)數(shù)據(jù)月匣。
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // 每個(gè) broker 對(duì)應(yīng)的 FilterServer 地址列表钻洒,用于類模式消息過(guò)濾。
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  1. topicQueueTable : 所有主題相關(guān)信息集合锄开。
    • 每個(gè)主題 Topic 對(duì)應(yīng)一個(gè) List<QueueData> 集合素标,因?yàn)橐粋€(gè)主題有多個(gè) Broker 組。
    • QueueData 包括 Broker組的名字萍悴,這個(gè) Broker組中當(dāng)期主題Topic對(duì)應(yīng)的可讀隊(duì)列數(shù)量头遭,可寫隊(duì)列數(shù)量,讀寫權(quán)限和同步標(biāo)記癣诱。
  2. brokerAddrTable : Broker 組基礎(chǔ)信息集合计维。

    BrokerData 包括 Broker 組名字,所屬集群名字和主從 Broker 的地址撕予。

  3. clusterAddrTable : Broker 集群集合鲫惶,key 是集群名字,value 是集群擁有所有的Broker組名字实抡。
  4. brokerLiveTable : 每個(gè) Broker 的狀態(tài)信息欠母。
  5. filterServerTable : 每個(gè) Broker 對(duì)應(yīng)的 FilterServer 地址列表,用于類模式消息過(guò)濾吆寨。

3.1.1 QueueData

public class QueueData implements Comparable<QueueData> {
    // broker組的名字
    private String brokerName;
    // 可讀隊(duì)列數(shù)量
    private int readQueueNums;
    // 可寫隊(duì)列數(shù)量
    private int writeQueueNums;
    // 讀寫權(quán)限艺蝴,具體參考 PermName,
    private int perm;
    // 主題Topic 同步標(biāo)記; 參考TopicSysFlag類: FLAG_UNIT = 0x1 << 0, FLAG_UNIT_SUB = 0x1 << 1
    private int topicSynFlag;
}

3.1.2 BrokerData

public class BrokerData implements Comparable<BrokerData> {
    // broker 組所屬集群的名字
    private String cluster;
    // broker 組的名字
    private String brokerName;
    // broker 組中所有 broker 的地址;其中 brokerId = 0 表示主 broker鸟废,其他的都是從 broker猜敢。
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

3.1.3 BrokerLiveInfo

class BrokerLiveInfo {
    // 最近一次更新時(shí)間,用來(lái)判斷這個(gè) broker 是否活躍
    private long lastUpdateTimestamp;
    // 這個(gè) broker 的數(shù)據(jù)版本,可以用來(lái)判斷這個(gè) Broker 的數(shù)據(jù)是否改變過(guò)缩擂。
    private DataVersion dataVersion;
    // 連接這個(gè) broker 的通道channel
    private Channel channel;
    // 該 broker 的 HaServer地址
    private String haServerAddr;
}

3.2 重要方法

3.2.1 registerBroker 方法

    /**
     * 注冊(cè) Broker
     */
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                // 這個(gè) broker 所屬的集群clusterName 是否已經(jīng)在 clusterAddrTable 中
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                // 因?yàn)?brokerNames 是 Set 類型鼠冕,會(huì)自動(dòng)去重,所以這里直接添加
                brokerNames.add(brokerName);

                // 是否第一次注冊(cè)
                boolean registerFirst = false;

                // 通過(guò) brokerName 從 brokerAddrTable 中獲取對(duì)應(yīng)的 BrokerData胯盯。
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    // 如果不存在懈费,就創(chuàng)建新的,并存入 brokerAddrTable 中博脑。
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                // 處理 slave 變成 master 的情況
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    // 如果 brokerAddr 相同憎乙,但是 brokerId 不一樣,說(shuō)明這個(gè) broker 修改 brokerId叉趣,
                    // 那么就先把它從 brokerAddrsMap 集合中移除泞边。
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                /**
                 * 只有當(dāng) topicConfigWrapper 不為null且必須是master節(jié)點(diǎn),才能進(jìn)入
                 */
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    // 當(dāng) topicConfigWrapper 的數(shù)據(jù)版本dataVersion 和當(dāng)前儲(chǔ)存值不一樣疗杉,或者是第一次注冊(cè)時(shí)阵谚;
                    // 都需要處理 Topic
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            // 遍歷 Broker 上所有的 topic 配置,改變 topicQueueTable 集合數(shù)據(jù)
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                // 只有 Broker 組中的主節(jié)點(diǎn)才有可能調(diào)用到這個(gè)方法烟具。
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // 更新 brokerLiveTable 中該 Broker 地址對(duì)應(yīng)狀態(tài)信息梢什,表示該 Broker 地址是活躍的。
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                if (MixAll.MASTER_ID != brokerId) {
                    // 如果這個(gè)Broker 是 slave 節(jié)點(diǎn)朝聋, 那么給它設(shè)置主節(jié)點(diǎn)的地址和 HaServer的地址
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

注冊(cè) Broker 信息, 每個(gè) Broker 會(huì)定時(shí)向 namesrv 發(fā)送自身的數(shù)據(jù)嗡午,就會(huì)調(diào)用到這個(gè)方法。方法流程:

  1. 先將這個(gè) BrokerbrokerName 添加到集群集合 clusterAddrTable 中冀痕。
  2. 將這個(gè) Broker 的相關(guān)信息添加到 brokerAddrTable 集合中荔睹,并判斷這個(gè) Broker 是否第一次注冊(cè) registerFirst
  3. 當(dāng)這個(gè) Broker 是主節(jié)點(diǎn)金度,topicConfigWrapper 的數(shù)據(jù)版本dataVersion 和當(dāng)前儲(chǔ)存值不一樣,或者是第一次注冊(cè)時(shí)严沥;都需要將該 Broker的主題信息 topicConfigWrapper 添加到 topicQueueTable 中猜极。
  4. 更新 brokerLiveTable 中該 Broker 地址對(duì)應(yīng)狀態(tài)信息,表示該 Broker 地址是活躍的消玄。
  5. 如果這個(gè) Brokerslave 節(jié)點(diǎn)跟伏, 那么給它設(shè)置主節(jié)點(diǎn)的地址和 HaServer的地址。

總結(jié)一下:

就是按照順序翩瓜,分別改變 clusterAddrTable, brokerAddrTable, topicQueueTable,brokerLiveTablefilterServerTable 的數(shù)據(jù)受扳。

3.2.2 unregisterBroker 方法

 /**
     * 取消注冊(cè)Broker
     */
    public void unregisterBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // 刪除的時(shí)候,先刪除簡(jiǎn)單的兔跌。
                // 1. 刪除 brokerLiveTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                    brokerLiveInfo != null ? "OK" : "Failed",
                    brokerAddr
                );
                // 2. 刪除 filterServerTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)
                this.filterServerTable.remove(brokerAddr);

                // 3. 再根據(jù) brokerName 處理 brokerAddrTable 集合中brokerData 數(shù)據(jù)
                boolean removeBrokerName = false;
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                        addr != null ? "OK" : "Failed",
                        brokerAddr
                    );

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        // 如果 brokerName 只包含這一個(gè) brokerId勘高,被刪除了;
                        // 那么也要從 brokerAddrTable 集合中刪除這個(gè) brokerName
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );

                        removeBrokerName = true;
                    }
                }

                if (removeBrokerName) {
                    // brokerName 被刪除了,要更新 clusterAddrTable 集合數(shù)據(jù)
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);

                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                clusterName
                            );
                        }
                    }
                    // 只有當(dāng) brokerName 被刪除了华望,那么就要更新 topicQueueTable 集合了蕊蝗。
                    this.removeTopicByBrokerName(brokerName);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
    }
  1. 先刪除 brokerLiveTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)。
  2. 刪除 filterServerTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)赖舟。
  3. 再根據(jù) brokerName 處理 brokerAddrTable 集合中 BrokerData 數(shù)據(jù)蓬戚。

    如果 BrokerData 中只包含當(dāng)前這個(gè) Broker ,那么當(dāng)它被刪除后宾抓,那么就要從 brokerAddrTable 刪除這個(gè) brokerName 鍵子漩。表示這個(gè) Broker 組已經(jīng)不存在了。

  4. 當(dāng)一個(gè)Broker 組被刪除后石洗,那么就需要改變 clusterAddrTabletopicQueueTable 的數(shù)據(jù)了幢泼。
    • clusterAddrTable 中刪除這個(gè)Broker 組名字 brokerName , 如果這個(gè)集群只有這一個(gè)Broker 組,那么這個(gè)集群也要從 clusterAddrTable 中刪除劲腿。
    • 通過(guò) removeTopicByBrokerName() 方法旭绒,更新 topicQueueTable 集合。

總結(jié)一下:

就是先刪除 brokerLiveTablefilterServerTable 中的數(shù)據(jù)焦人,因?yàn)樗鼈冎械臄?shù)據(jù)是比較獨(dú)立的挥吵;然后修改brokerAddrTable 集合中數(shù)據(jù);最后根據(jù) Broker 組是否被刪除花椭,來(lái)決定是否修改clusterAddrTabletopicQueueTable 集合中的數(shù)據(jù)忽匈。

3.2.3 onChannelDestroy 方法

    /**
     * 通道 Channel 銷毀時(shí)的處理
     */
    public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        if (channel != null) {
            // 根據(jù) channel,從 brokerLiveTable 中查找對(duì)應(yīng)的 brokerAddr 值矿辽。
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    // 從 brokerLiveTable 中丹允,根據(jù) channel 查找對(duì)應(yīng)的Broker地址 brokerAddrFound
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

            // 根據(jù) brokerAddr 的值,進(jìn)行移除操作
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    // 先移除 brokerLiveTable 和 filterServerTable 集合中袋倔,
                    // brokerAddrFound 對(duì)應(yīng)數(shù)據(jù)
                    this.brokerLiveTable.remove(brokerAddrFound);
                    this.filterServerTable.remove(brokerAddrFound);
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    // 根據(jù)Broker地址 brokerAddrFound 從brokerAddrTable 中刪除這個(gè) Broker
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                // 找到了雕蔽,就刪除這個(gè) Broker 地址
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break;
                            }
                        }

                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                        }
                    }

                    if (brokerNameFound != null && removeBrokerName) {
                        // 如果 brokerName 被刪除了,那么就要改變 clusterAddrTable 集合了
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }

                    if (removeBrokerName) {
                        // 刪除了 brokerName宾娜,那么就要?jiǎng)h除 topicQueueTable 中所有這個(gè) brokerName 對(duì)應(yīng)的QueueData
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                                }
                            }

                            if (queueDataList.isEmpty()) {
                                // 如果刪除后批狐,這個(gè) queueDataList 為空,
                                // 說(shuō)明這個(gè) Topic 沒有對(duì)應(yīng)的 QueueData前塔,也應(yīng)該刪除嚣艇。
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

這個(gè)方法和 unregisterBroker 很像,區(qū)別是:

  1. 通過(guò) channelbrokerLiveTable 找到對(duì)應(yīng)的 Broker 地址 brokerAddr 华弓。
  2. 刪除 brokerAddrTable 集合數(shù)據(jù)時(shí)食零,是通過(guò) brokerAddr 進(jìn)行匹配的,而不是 brokerId寂屏;
  3. 其他的操作流程和 unregisterBroker 一樣贰谣。

3.2.4 scanNotActiveBroker 方法

  public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 如果某個(gè) Broker 超過(guò) BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 沒有接收到信息娜搂,
            // 那么我們就認(rèn)為這個(gè) Broker 已經(jīng)出現(xiàn)問(wèn)題,刪除它
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

掃描不活躍的 Broker冈爹,并將它進(jìn)行關(guān)閉涌攻。

如果某個(gè) Broker 超過(guò) BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 沒有接收到信息。那么我們就認(rèn)為這個(gè) Broker 已經(jīng)出現(xiàn)問(wèn)題频伤,就關(guān)閉它恳谎。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市憋肖,隨后出現(xiàn)的幾起案子因痛,更是在濱河造成了極大的恐慌,老刑警劉巖岸更,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸵膏,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡怎炊,警方通過(guò)查閱死者的電腦和手機(jī)谭企,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)评肆,“玉大人债查,你說(shuō)我怎么就攤上這事」贤欤” “怎么了盹廷?”我有些...
    開封第一講書人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)久橙。 經(jīng)常有香客問(wèn)我俄占,道長(zhǎng),這世上最難降的妖魔是什么淆衷? 我笑而不...
    開封第一講書人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任缸榄,我火速辦了婚禮,結(jié)果婚禮上祝拯,老公的妹妹穿的比我還像新娘甚带。我一直安慰自己,他們只是感情好鹿驼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開白布欲低。 她就那樣靜靜地躺著辕宏,像睡著了一般畜晰。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瑞筐,一...
    開封第一講書人閱讀 51,443評(píng)論 1 302
  • 那天凄鼻,我揣著相機(jī)與錄音,去河邊找鬼。 笑死块蚌,一個(gè)胖子當(dāng)著我的面吹牛闰非,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播峭范,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼财松,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了纱控?” 一聲冷哼從身側(cè)響起辆毡,我...
    開封第一講書人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎甜害,沒想到半個(gè)月后舶掖,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡尔店,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年眨攘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嚣州。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鲫售,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出避诽,到底是詐尸還是另有隱情龟虎,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布沙庐,位于F島的核電站鲤妥,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏拱雏。R本人自食惡果不足惜棉安,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望铸抑。 院中可真熱鬧贡耽,春花似錦、人聲如沸鹊汛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)刁憋。三九已至滥嘴,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間至耻,已是汗流浹背若皱。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工镊叁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人走触。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓晦譬,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親互广。 傳聞我的和親對(duì)象是個(gè)殘疾皇子敛腌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容