RocketMQ Namesrv啟動流程

NamesrvStartup啟動入口

public static NamesrvController main0(String[] args) {

    try {
        // 創(chuàng)建NamesrvController
        NamesrvController controller = createNamesrvController(args);
        // 啟動NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

創(chuàng)建NamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {

    // 根據(jù)命令行參數(shù)拼余,使用commons-cli命令行工具包解析生成CommandLine對象
    // 在parseCmdLine中,如果命令行中有-h選項,執(zhí)行打印幫助文檔的邏輯咙好,然后退出笋熬,不再繼續(xù)
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // 初始化了2個配置 NamesrvConfig满俗,NettyServerConfig蚂会,其中NettyServerConfig監(jiān)聽9876是硬編碼的
    // 然后通過命令行參數(shù) -c 指定一個配置文件淋样,然后將配置文件中的內(nèi)容解析成NamesrvConfig,NettyServerConfig的配置
    // 設(shè)置NamesrvConfig胁住,NettyServerConfig的邏輯是看類中的set方法习蓬,如果set方法后的名字和配置文件中的key匹配,就會設(shè)置對應(yīng)的值
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    //如果指定了 -p 選項措嵌,會在控制臺打印配置信息,然后退出芦缰,不再繼續(xù)執(zhí)行
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    //將啟動命令行的參數(shù)配置設(shè)置到NamesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // 檢查必須設(shè)置RocketMQHome
    // 在NamesrvConfig中企巢,可以看到使用系統(tǒng)屬性rocketmq.home.dir,環(huán)境變量ROCKETMQ_HOME和前面的-c指定的配置文件設(shè)置RocketMQHome
    // 在mqnamesrv啟動腳本中會自定探測RockerMQ并export ROCKETMQ_HOME
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 加載logback.xml
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // 使用logback打印NamesrvConfig让蕾,NettyServerConfig配置信息
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // 最后還把-c指定的文件的配置在保存到Configruation中
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

可以看到NamesrvStartup只是一個啟動類浪规,主要邏輯都在處理命令行和配置,主要功能都是在NamesrvController中探孝,而且我們可以看到笋婿,在處理處理配置的時候,真的是對配置文件進(jìn)行反復(fù)鞭尸呀

首先通過-c指定配置文件顿颅,使用MixAll.properties2Object將配置設(shè)置到NamesrvConfig缸濒,NettyServerConfig

MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

然后通過命令行參數(shù)設(shè)置到NamesrvConfig

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

然后初始化NamesrvController,NamesrvController中會初始化一個Configuration類粱腻,Configuration類中又會把NamesrvConfig庇配,NettyServerConfig都merge到allConfigs中

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.kvConfigManager = new KVConfigManager(this);
    this.routeInfoManager = new RouteInfoManager();
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    // 初始化Configuration
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
public Configuration(InternalLogger log, Object... configObjects) {
    this.log = log;
    if (configObjects == null || configObjects.length == 0) {
        return;
    }
    // 將NamesrvConfig,NettyServerConfig注冊
    for (Object configObject : configObjects) {
        registerConfig(configObject);
    }
}
public Configuration registerConfig(Object configObject) {
    try {
        readWriteLock.writeLock().lockInterruptibly();

        try {

            Properties registerProps = MixAll.object2Properties(configObject);
                        // 將NamesrvConfig绍些,NettyServerConfig合并到allConfigs
            merge(registerProps, this.allConfigs);

            configObjectList.add(configObject);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("registerConfig lock error");
    }
    return this;
}

最后還要把-c指定的配置文件和allConfigs進(jìn)行合并

// 最后還把-c指定的文件的配置在保存到Configruation中
controller.getConfiguration().registerConfig(properties);

可以看到-c指定的配置文件讀取進(jìn)來后被拆分為NamesrvConfig捞慌,NettyServerConfig,然后又和Configuration中的allConfigs合并柬批,最后還要再合并一次啸澡,你說這個-c指定的配置文件慘不慘

NamesrvController初始化完成后袖订,就調(diào)用start(controller),才真正的開始

// 啟動NamesrvController
start(controller);

start(controller)方法中最關(guān)鍵的就是下面2個方法

controller.initialize();
controller.start();

NamesrvController初始化

public boolean initialize() {
    
    // 從NamesrvConfig#KvConfigPath指定的文件中反序列化數(shù)據(jù)到KVConfigManager#configTable中
    this.kvConfigManager.load();
    // 啟動網(wǎng)絡(luò)通信的Netty服務(wù)
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 初始化一下負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的線程池嗅虏,
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    // 注冊一個默認(rèn)負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的DefaultRequestProcessor洛姑,這個Processor會使用remotingExecutor執(zhí)行
    // *劃重點,后面這里會再次提到*
    this.registerProcessor();

    // 每10s掃描一下失效的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 每10min打印一下前面被反復(fù)蹂躪的配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);


    // 設(shè)置TLS旋恼,這塊不太了解吏口,所以省略了,以后用空了再研究一下TLS吧
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
       ...
    }

    return true;
}

啟動NamesrvController

public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

可以看到邏輯還算比較清晰冰更,關(guān)鍵功能在KVConfigManager产徊,RouteInfoManager和NettyRemotingServer實現(xiàn)

我們先來看看NettyRemotingServer

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    // 初始化2個Semaphore,一個是one-way請求的并發(fā)數(shù)蜀细,一個是asynchronous請求的并發(fā)數(shù)舟铜,可以簡單理解成對2種請求做了限流,至于什么是one-way請求奠衔,什么是asynchronous請求谆刨,分析到了再說吧
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
        
     // 初始化一個公用的線程池,什么情況下用這個公用的線程池归斤?看后面的分析
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // 下面這個就是判斷系統(tǒng)是否支持epoll來初始化相應(yīng)的EventLoopGroup痊夭,如果不是很了解Netty的同學(xué)可以先去學(xué)學(xué)Netty
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup();

        this.eventLoopGroupSelector = new EpollEventLoopGroup();
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup();

        this.eventLoopGroupSelector = new NioEventLoopGroup();
    }
        // 這個也是TLS相關(guān)的忽略分析
    loadSslContext();
}

這里可以看一下怎么判斷系統(tǒng)是否支持epoll的

private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;

static {
    if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
        isLinuxPlatform = true;
    }

    if (OS_NAME != null && OS_NAME.toLowerCase().contains("windows")) {
        isWindowsPlatform = true;
    }
}

-----

private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform()
        && nettyServerConfig.isUseEpollNativeSelector()
        && Epoll.isAvailable();
}

還記得在NamesrvController初始化時注冊一個默認(rèn)負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的DefaultRequestProcessor,DefaultRequestProcessor使用了一個專用的remotingExecutor線程池脏里,我們也可以注冊其他的Processor她我,如果我們注冊Processor時沒有指定線程池就會使用公共的線程池publicExecutor

 public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}

上面只是將Netty的EventLoopGroup進(jìn)行了初始化,卻沒有真正的啟動Netty迫横,真正的啟動還得調(diào)用remotingServer.start();

public void start() throws Exception {
    this.remotingServer.start();
        // fileWatchService和TLS有關(guān)番舆,大概就是會監(jiān)聽TLS相關(guān)文件的改變,也不仔細(xì)分析了
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

接下來看看NettyRemotingServer的start()方法干了啥

public void start() {
    // 初始化一個線程池矾踱,用于執(zhí)行共享的Handler
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
        // 初始化一些共享的Handler恨狈,HandshakeHandler,NettyEncoder呛讲,NettyConnectManageHandler禾怠,NettyServerHandler
    prepareSharableHandlers();
        // 后面就是一些Netty的設(shè)置,具體看Netty
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                // 我們只需要關(guān)心這里設(shè)置了哪些handler
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0,          nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
        // 這里有一個channelEventListener贝搁,在NamesrvController中channelEventListener就是BrokerHousekeepingService刃宵,BrokerHousekeepingService負(fù)責(zé)在broker斷開連接的時候,移除RouteInfoManager中的路由信息
    // NettyEventExecutor會維護(hù)一個NettyEvent的隊列徘公,NettyConnectManageHandler會向NettyEvent的隊列中添加Event牲证,然后由channelEventListener進(jìn)行消費
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
        // 定時掃描responseTable,執(zhí)行超時請求的callback
    // 這里有2個疑問关面,是誰向responseTable中put數(shù)據(jù)坦袍?為什么這里只執(zhí)行超時請求的callback十厢,正常結(jié)束的請求在哪處理的?
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

到這里RocketMQ Namesrv的啟動流程就結(jié)束了捂齐,下一篇在來分析具體怎么處理請求數(shù)據(jù)的吧

歡迎關(guān)注我的公眾號

我的公眾號
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蛮放,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子奠宜,更是在濱河造成了極大的恐慌包颁,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件压真,死亡現(xiàn)場離奇詭異娩嚼,居然都是意外死亡,警方通過查閱死者的電腦和手機滴肿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門岳悟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人泼差,你說我怎么就攤上這事贵少。” “怎么了堆缘?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵滔灶,是天一觀的道長。 經(jīng)常有香客問我吼肥,道長宽气,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任潜沦,我火速辦了婚禮,結(jié)果婚禮上绪氛,老公的妹妹穿的比我還像新娘唆鸡。我一直安慰自己,他們只是感情好枣察,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布争占。 她就那樣靜靜地躺著,像睡著了一般序目。 火紅的嫁衣襯著肌膚如雪臂痕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天猿涨,我揣著相機與錄音握童,去河邊找鬼。 笑死叛赚,一個胖子當(dāng)著我的面吹牛澡绩,可吹牛的內(nèi)容都是我干的稽揭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼肥卡,長吁一口氣:“原來是場噩夢啊……” “哼溪掀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起步鉴,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤揪胃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后氛琢,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體喊递,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年艺沼,在試婚紗的時候發(fā)現(xiàn)自己被綠了册舞。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡障般,死狀恐怖调鲸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情挽荡,我是刑警寧澤藐石,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站定拟,受9級特大地震影響于微,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜青自,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一株依、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧延窜,春花似錦恋腕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至获高,卻和暖如春哈肖,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背念秧。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工淤井, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓庄吼,卻偏偏與公主長得像缎除,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子总寻,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一渐行、簡述 這篇內(nèi)容主要講述RocketMQ的NameSrv的啟動流程轰坊,通過它的啟動,也能了解到NameSrv是干什...
    ASD_92f7閱讀 843評論 0 0
  • ORA-00001: 違反唯一約束條件 (.) 錯誤說明:當(dāng)在唯一索引所對應(yīng)的列上鍵入重復(fù)值時祟印,會觸發(fā)此異常肴沫。 O...
    我想起個好名字閱讀 5,320評論 0 9
  • feisky云計算、虛擬化與Linux技術(shù)筆記posts - 1014, comments - 298, trac...
    不排版閱讀 3,855評論 0 5
  • 本來是準(zhǔn)備看一看Spring源碼的蕴忆。然后在知乎上看到來一個帖子颤芬,說有一群**自己連Spring官方文檔都沒有完全讀...
    此魚不得水閱讀 6,935評論 4 21
  • [TOC] ##Assoc 顯示或修改文件擴展名關(guān)聯(lián) Assoc [.Ext[=[Filetype]]] .Ex...
    btijjj閱讀 345評論 0 1