前言
簡(jiǎn)單介紹一下RocketMQ的背景浮创,RocketMQ是阿里開(kāi)源的消息中間件哭尝,根據(jù)官網(wǎng)描述斥杜,RocketMQ其實(shí)是阿里發(fā)現(xiàn)ActiveMQ和Kafka無(wú)法滿足業(yè)務(wù)需求才開(kāi)發(fā)的涕俗。其中很多思想有參考其他消息中間件妹蔽,具體的幾個(gè)消息中間件之間的功能特點(diǎn)比較可以參考apache rocketMQ的官方文檔莱革,這里就不贅述了。
RocketMQ在官網(wǎng)的整體架構(gòu)如下:
可以看到讹开,RocketMQ主要包含四大組件:NameServer盅视、Broker、Producer旦万、Consumer闹击。每一個(gè)組件都支持水平擴(kuò)展以預(yù)防單點(diǎn)故障。NameServer可以簡(jiǎn)單理解為注冊(cè)中心成艘,可用于路由發(fā)現(xiàn)和集群管理赏半;Broker則是我們通常意義上的服務(wù)端,負(fù)責(zé)消息的轉(zhuǎn)發(fā)淆两、存儲(chǔ)断箫、搜索等功能;Producer和Consumer屬于客戶端秋冰,負(fù)責(zé)消息的發(fā)送和接收仲义。
大多數(shù)情況下,我們對(duì)于RocketMQ的使用停留在服務(wù)搭建和簡(jiǎn)單的消息收發(fā)上(如果服務(wù)搭建由運(yùn)維完成的話,這一步也省掉了)埃撵。那RocketMQ在NameServer和Broker啟動(dòng)的時(shí)候做了什么赵颅?Producer又是如何將消息發(fā)送到Broker?Consumer的消費(fèi)流程呢暂刘?
本文將基于RocketMQ的4.5.0版本源碼饺谬,以其中官方提供的example為例,分析一下RocketMQ服務(wù)啟動(dòng)和消息發(fā)送和接收的流程谣拣。
NameServer啟動(dòng)
nameServer的啟動(dòng)入口源碼如下募寨,兩步走,第一步創(chuàng)建控制器森缠,第二步啟動(dòng)控制器
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
創(chuàng)建控制器
看一下創(chuàng)建控制器的源碼拔鹰,有幾個(gè)點(diǎn)需要注意一下:
- 默認(rèn)會(huì)讀取ROCKETMQ_HOME系統(tǒng)變量,如果未設(shè)置辅鲸,需要手工指定或設(shè)置格郁,否則無(wú)法啟動(dòng)
- 可以看到NameServer配置來(lái)源有三種:默認(rèn)腹殿、配置文件独悴、啟動(dòng)項(xiàng)指定,優(yōu)先級(jí)依次上升(因?yàn)楹竺娴呐渲脮?huì)覆蓋前面的配置)
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 設(shè)置版本號(hào)為系統(tǒng)屬性
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 命令行參數(shù)解析的過(guò)程
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 初始化NameServer的配置锣尉,關(guān)鍵的幾個(gè)默認(rèn)配置見(jiàn)$1
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// 初始化NettyServer的配置刻炒,因?yàn)镽ocketMQ各個(gè)組件是使用Netty通信的,主要的默認(rèn)配置見(jiàn)$2
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// NameServer默認(rèn)的監(jiān)聽(tīng)端口是9876
nettyServerConfig.setListenPort(9876);
// 如果通過(guò) -c 的選項(xiàng)指定了配置配置自沧,會(huì)把文件中對(duì)應(yīng)的屬性讀取到NameServer和NettyServer的配置對(duì)象中
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 如果指定了 -p 選項(xiàng)坟奥,會(huì)打印NameServer和NettyServer的配置,然后正常退出
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 如果啟動(dòng)時(shí)指定了某個(gè)配置拇厢,那放入NameServer的配置對(duì)象
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 初始化logback的配置
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 使用NameServer和NettyServer的配置初始化控制器爱谁,同時(shí)會(huì)初始化控制器中一些其他屬性,如KVConfigManager孝偎、RouteInfoManager访敌、BrokerHousekeepingService等
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
// 將所有配置信息注冊(cè)到一個(gè)大的Configuration對(duì)象中
// 其實(shí)在NamesrvController構(gòu)造方法中,NameServer和NettyServer的配置已經(jīng)注冊(cè)到Configuration了衣盾,而properties包含其中寺旺,不知道為什么要再次注冊(cè)一遍
controller.getConfiguration().registerConfig(properties);
return controller;
}
$1 默認(rèn)的NameServer配置
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// kv配置文件路徑
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// 全局配置文件路徑
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 默認(rèn)不開(kāi)啟順序消息
private boolean orderMessageEnable = false;
$2 默認(rèn)的NettyServer配置
// netty的監(jiān)聽(tīng)端口(根據(jù)代碼,會(huì)被9876的默認(rèn)值覆蓋)
private int listenPort = 8888;
// Netty默認(rèn)事件處理線程池势决,處理如broker注冊(cè)阻塑,topic路由信息查詢、topic刪除等
private int serverWorkerThreads = 8;
// Netty服務(wù)異步回調(diào)線程池線程數(shù)量
private int serverCallbackExecutorThreads = 0;
// Selector線程數(shù)量
private int serverSelectorThreads = 3;
// 控制單向的信號(hào)量
private int serverOnewaySemaphoreValue = 256;
// 控制異步信號(hào)量
private int serverAsyncSemaphoreValue = 64;
// 服務(wù)空閑心跳檢測(cè)時(shí)間間隔 單位秒
private int serverChannelMaxIdleTimeSeconds = 120;
// 發(fā)送緩沖區(qū)大小
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 接受緩沖區(qū)大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// 是否使用Netty內(nèi)存池
private boolean serverPooledByteBufAllocatorEnable = true;
// 是否啟用Epoll IO模型果复,Linux環(huán)境建議開(kāi)啟
private boolean useEpollNativeSelector = false;
接著就是控制器的啟動(dòng)了陈莽,start方法如下,==省略了一些基礎(chǔ)校驗(yàn)和簡(jiǎn)單邏輯==,其中最重要的就是controller.initialize()方法
public static NamesrvController start(final NamesrvController controller) throws Exception {
// controller初始化
boolean initResult = controller.initialize();
// controller初始化失敗就退出
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注冊(cè)鉤子函數(shù)传透,系統(tǒng)正常退出會(huì)shutdown掉controller
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 其實(shí)就是控制器內(nèi)部remotingServer的啟動(dòng)耘沼,同時(shí)異步起一個(gè)文件監(jiān)聽(tīng)服務(wù),注冊(cè)監(jiān)聽(tīng)器朱盐,詳見(jiàn)$5
controller.start();
return controller;
}
這里有幾個(gè)關(guān)鍵點(diǎn):
- 請(qǐng)求處理器有兩種群嗤,DefaultRequestProcessor和ClusterTestRequestProcessor,由NameServer中的clusterTest字段決定兵琳,ClusterTestRequestProcessor繼承自DefaultRequestProcessor狂秘,唯一的不同就是無(wú)法從本地讀取路由信息時(shí)會(huì)從集群中讀取
- 這里創(chuàng)建了很多的線程池,具體每個(gè)線程池的作用見(jiàn)$6
public boolean initialize() {
// 如果kvConfigPath路徑下存在相關(guān)文件躯肌,會(huì)把文件加載到kvConfigManager的配置表中
this.kvConfigManager.load();
// 初始化remotingServer者春,包含了很多nettyServer的相關(guān)配置,詳見(jiàn)$3
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 創(chuàng)建一個(gè)固定大小的線程池清女,就是Netty中的work線程池钱烟,用于處理請(qǐng)求的
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 這里會(huì)注冊(cè)一個(gè)默認(rèn)的處理器DefaultRequestProcessor,綁定上面創(chuàng)建的線程池嫡丙,具體能處理哪些請(qǐng)求見(jiàn)$4
this.registerProcessor();
// 定時(shí)掃描Broker信息并清除失效的Broker路由信息拴袭,周期是10秒
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定時(shí)日志輸出kv配置信息,周期10秒
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 如果沒(méi)有關(guān)閉TLS曙博,那么會(huì)開(kāi)啟一個(gè)異步文件監(jiān)聽(tīng)線程拥刻,監(jiān)聽(tīng)TLS相關(guān)配置文件的變化,默認(rèn)可選的
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
// 略
}
return true;
}
$3 NettyRemotingServer構(gòu)造方法
這里的NettyRemotingServer可以認(rèn)為就是Netty中的Server父泳,內(nèi)部其實(shí)初始化的基本都是Netty的配置般哼。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
// 前面的默認(rèn)配置是0,如果未曾手動(dòng)設(shè)置惠窄,這里會(huì)設(shè)置為4
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 新建一個(gè)公用的線程池蒸眠,如果某種消息處理沒(méi)有注冊(cè)其他線程則會(huì)使用這個(gè)
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
// 略
}
});
// 根據(jù)是否啟用epoll來(lái)初始化Netty中的EventLoopGroup
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
// 略
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
// 略
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
// 略
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
// 略
});
}
// 加載TLS的上下文信息
loadSslContext();
}
$4 DefaultRequestProcessor對(duì)請(qǐng)求的處理
RemotingCommand有一個(gè)code字段用來(lái)區(qū)分命令類型,可以看到杆融,DefaultRequestProcessor針對(duì)不同的RemotingCommand有不同的處理(這里只是一部分命令類型楞卡,還有其他命令由其他處理器處理)。常量的名稱比較直觀擒贸,就不一一細(xì)說(shuō)了臀晃。==注意,這里用的線程池就是NamesrvController.remotingExecutor==
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
$5 remotingServer的啟動(dòng)(start方法)
public void start() {
// 創(chuàng)建線程池處理特定的channel事件(編解碼介劫、空閑檢測(cè)徽惋、連接管理等,查看下面具體的Handler)
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 設(shè)置Netty啟動(dòng)類的一些參數(shù)
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
// 內(nèi)存池的配置
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// 這里nettyEventExecutor可以認(rèn)為是一個(gè)線程
// 會(huì)有專門的監(jiān)聽(tīng)器處理channel事件座韵,這里的監(jiān)聽(tīng)器就是BrokerHousekeepingService险绘,用來(lái)處理Broker的上下線
// 這里處理的隊(duì)列任務(wù)來(lái)源就是NettyConnectManageHandler踢京,NettyConnectManageHandler會(huì)針對(duì)不同的channel事件將不同任務(wù)放入nettyEventExecutor的隊(duì)列
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 定時(shí)掃描和釋放無(wú)效的responseFuture
// responseFuture是一個(gè)響應(yīng)的占位符,如果超時(shí)未收到response則清除這個(gè)占位符
// 但是依然會(huì)執(zhí)行響應(yīng)綁定的回調(diào)函數(shù)(可能含有超時(shí)的處理邏輯)
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
$6 每個(gè)線程池的作用(包含Netty線程池)
// 線程數(shù)NettyServerConfig.serverWorkerThreads宦棺,處理Broker注冊(cè)瓣距、kv配置增刪等事件
NamesrvController.remotingExecutor
// 線程數(shù)NettyServerConfig.serverCallbackExecutorThreads,從代碼來(lái)看是Broker處理的時(shí)候會(huì)用到代咸,如果某個(gè)業(yè)務(wù)處理器未關(guān)聯(lián)線程池就會(huì)用這個(gè)
NettyRemotingServer.publicExecutor
// 線程數(shù)1蹈丸,Netty中的Boss線程池
NettyRemotingServer.eventLoopGroupBoss
// 線程數(shù)NettyServerConfig.serverSelectorThreads,處理channel IO事件
NettyRemotingServer.eventLoopGroupSelector
// 線程數(shù)NettyServerConfig.serverWorkerThreads呐芥,業(yè)務(wù)處理線程逻杖,很多ChannelHandler綁定的線程池,具體命令處理還會(huì)往后提交給其他線程池
NettyRemotingServer.defaultEventExecutorGroup
整個(gè)NameServer的啟動(dòng)流程如下:
未完待續(xù)思瘟。荸百。。