NamesrvStartup
啟動入口
public static NamesrvController main0(String[] args) {
try {
// 創(chuàng)建NamesrvController
NamesrvController controller = createNamesrvController(args);
// 啟動NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
創(chuàng)建NamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 根據(jù)命令行參數(shù)拼余,使用commons-cli命令行工具包解析生成CommandLine對象
// 在parseCmdLine中,如果命令行中有-h選項,執(zhí)行打印幫助文檔的邏輯咙好,然后退出笋熬,不再繼續(xù)
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 初始化了2個配置 NamesrvConfig满俗,NettyServerConfig蚂会,其中NettyServerConfig監(jiān)聽9876是硬編碼的
// 然后通過命令行參數(shù) -c 指定一個配置文件淋样,然后將配置文件中的內(nèi)容解析成NamesrvConfig,NettyServerConfig的配置
// 設(shè)置NamesrvConfig胁住,NettyServerConfig的邏輯是看類中的set方法习蓬,如果set方法后的名字和配置文件中的key匹配,就會設(shè)置對應(yīng)的值
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//如果指定了 -p 選項措嵌,會在控制臺打印配置信息,然后退出芦缰,不再繼續(xù)執(zhí)行
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//將啟動命令行的參數(shù)配置設(shè)置到NamesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 檢查必須設(shè)置RocketMQHome
// 在NamesrvConfig中企巢,可以看到使用系統(tǒng)屬性rocketmq.home.dir,環(huán)境變量ROCKETMQ_HOME和前面的-c指定的配置文件設(shè)置RocketMQHome
// 在mqnamesrv啟動腳本中會自定探測RockerMQ并export ROCKETMQ_HOME
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 加載logback.xml
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 使用logback打印NamesrvConfig让蕾,NettyServerConfig配置信息
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 最后還把-c指定的文件的配置在保存到Configruation中
controller.getConfiguration().registerConfig(properties);
return controller;
}
可以看到NamesrvStartup
只是一個啟動類浪规,主要邏輯都在處理命令行和配置,主要功能都是在NamesrvController
中探孝,而且我們可以看到笋婿,在處理處理配置的時候,真的是對配置文件進(jìn)行反復(fù)鞭尸呀
首先通過-c指定配置文件顿颅,使用MixAll.properties2Object將配置設(shè)置到NamesrvConfig缸濒,NettyServerConfig
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
然后通過命令行參數(shù)設(shè)置到NamesrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
然后初始化NamesrvController,NamesrvController中會初始化一個Configuration類粱腻,Configuration類中又會把NamesrvConfig庇配,NettyServerConfig都merge到allConfigs中
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
// 初始化Configuration
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
public Configuration(InternalLogger log, Object... configObjects) {
this.log = log;
if (configObjects == null || configObjects.length == 0) {
return;
}
// 將NamesrvConfig,NettyServerConfig注冊
for (Object configObject : configObjects) {
registerConfig(configObject);
}
}
public Configuration registerConfig(Object configObject) {
try {
readWriteLock.writeLock().lockInterruptibly();
try {
Properties registerProps = MixAll.object2Properties(configObject);
// 將NamesrvConfig绍些,NettyServerConfig合并到allConfigs
merge(registerProps, this.allConfigs);
configObjectList.add(configObject);
} finally {
readWriteLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("registerConfig lock error");
}
return this;
}
最后還要把-c指定的配置文件和allConfigs進(jìn)行合并
// 最后還把-c指定的文件的配置在保存到Configruation中
controller.getConfiguration().registerConfig(properties);
可以看到-c指定的配置文件讀取進(jìn)來后被拆分為NamesrvConfig捞慌,NettyServerConfig,然后又和Configuration中的allConfigs合并柬批,最后還要再合并一次啸澡,你說這個-c指定的配置文件慘不慘
NamesrvController
初始化完成后袖订,就調(diào)用start(controller),才真正的開始
// 啟動NamesrvController
start(controller);
在start(controller)
方法中最關(guān)鍵的就是下面2個方法
controller.initialize();
controller.start();
NamesrvController
初始化
public boolean initialize() {
// 從NamesrvConfig#KvConfigPath指定的文件中反序列化數(shù)據(jù)到KVConfigManager#configTable中
this.kvConfigManager.load();
// 啟動網(wǎng)絡(luò)通信的Netty服務(wù)
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化一下負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的線程池嗅虏,
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注冊一個默認(rèn)負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的DefaultRequestProcessor洛姑,這個Processor會使用remotingExecutor執(zhí)行
// *劃重點,后面這里會再次提到*
this.registerProcessor();
// 每10s掃描一下失效的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 每10min打印一下前面被反復(fù)蹂躪的配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 設(shè)置TLS旋恼,這塊不太了解吏口,所以省略了,以后用空了再研究一下TLS吧
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
...
}
return true;
}
啟動NamesrvController
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
可以看到邏輯還算比較清晰冰更,關(guān)鍵功能在KVConfigManager产徊,RouteInfoManager和NettyRemotingServer實現(xiàn)
我們先來看看NettyRemotingServer
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
// 初始化2個Semaphore,一個是one-way請求的并發(fā)數(shù)蜀细,一個是asynchronous請求的并發(fā)數(shù)舟铜,可以簡單理解成對2種請求做了限流,至于什么是one-way請求奠衔,什么是asynchronous請求谆刨,分析到了再說吧
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 初始化一個公用的線程池,什么情況下用這個公用的線程池归斤?看后面的分析
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 下面這個就是判斷系統(tǒng)是否支持epoll來初始化相應(yīng)的EventLoopGroup痊夭,如果不是很了解Netty的同學(xué)可以先去學(xué)學(xué)Netty
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup();
this.eventLoopGroupSelector = new EpollEventLoopGroup();
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup();
this.eventLoopGroupSelector = new NioEventLoopGroup();
}
// 這個也是TLS相關(guān)的忽略分析
loadSslContext();
}
這里可以看一下怎么判斷系統(tǒng)是否支持epoll的
private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;
static {
if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
isLinuxPlatform = true;
}
if (OS_NAME != null && OS_NAME.toLowerCase().contains("windows")) {
isWindowsPlatform = true;
}
}
-----
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
還記得在NamesrvController
初始化時注冊一個默認(rèn)負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的DefaultRequestProcessor,DefaultRequestProcessor使用了一個專用的remotingExecutor線程池脏里,我們也可以注冊其他的Processor她我,如果我們注冊Processor時沒有指定線程池就會使用公共的線程池publicExecutor
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
上面只是將Netty的EventLoopGroup進(jìn)行了初始化,卻沒有真正的啟動Netty迫横,真正的啟動還得調(diào)用remotingServer.start();
public void start() throws Exception {
this.remotingServer.start();
// fileWatchService和TLS有關(guān)番舆,大概就是會監(jiān)聽TLS相關(guān)文件的改變,也不仔細(xì)分析了
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
接下來看看NettyRemotingServer的start()方法干了啥
public void start() {
// 初始化一個線程池矾踱,用于執(zhí)行共享的Handler
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 初始化一些共享的Handler恨狈,HandshakeHandler,NettyEncoder呛讲,NettyConnectManageHandler禾怠,NettyServerHandler
prepareSharableHandlers();
// 后面就是一些Netty的設(shè)置,具體看Netty
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
// 我們只需要關(guān)心這里設(shè)置了哪些handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// 這里有一個channelEventListener贝搁,在NamesrvController中channelEventListener就是BrokerHousekeepingService刃宵,BrokerHousekeepingService負(fù)責(zé)在broker斷開連接的時候,移除RouteInfoManager中的路由信息
// NettyEventExecutor會維護(hù)一個NettyEvent的隊列徘公,NettyConnectManageHandler會向NettyEvent的隊列中添加Event牲证,然后由channelEventListener進(jìn)行消費
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 定時掃描responseTable,執(zhí)行超時請求的callback
// 這里有2個疑問关面,是誰向responseTable中put數(shù)據(jù)坦袍?為什么這里只執(zhí)行超時請求的callback十厢,正常結(jié)束的請求在哪處理的?
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
到這里RocketMQ Namesrv的啟動流程就結(jié)束了捂齐,下一篇在來分析具體怎么處理請求數(shù)據(jù)的吧
歡迎關(guān)注我的公眾號