RocketMQ架構(gòu)的四個(gè)核心
Producer Cluster
消息的發(fā)送者瓣窄,需要去NameServer去取到對(duì)應(yīng)Topic的信息去建立連接并發(fā)送消息到指定Broker
Consumer Cluster
消息的消費(fèi)者,需要去NameServer去取到對(duì)應(yīng)Topic的信息去建立連接到指定的Broker并取得消息去消費(fèi)
Broker
實(shí)際消息的接收站纳鼎,消息的存儲(chǔ)地俺夕,過(guò)濾功能等。啟動(dòng)會(huì)向NameServer注冊(cè)自己贱鄙。
NameServer
RocketMQ的注冊(cè)中心啥么,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn),保存元數(shù)據(jù)贰逾,干啥都要先找他去做路由悬荣。
NameSrv -
在真實(shí)環(huán)境中,如果有生產(chǎn)者疙剑,消費(fèi)者加入或者掉線氯迂,Broker擴(kuò)容或掉線等各種異常場(chǎng)景,NameSrv的協(xié)調(diào)管理能力正是用于解決此類場(chǎng)景言缤。但相較之下嚼蚀,NameSrv比大多數(shù)分布式協(xié)調(diào)服務(wù)或注冊(cè)中心又輕量的多。比如ZK管挟、Eureka都會(huì)在Node之間做同步轿曙,而NameSrv被設(shè)計(jì)成無(wú)狀態(tài)的,每個(gè)NameSrv節(jié)點(diǎn)互相不通信僻孝。
NameSrv 啟動(dòng)流程
入口
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
createNamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 省略部分
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
// 如果啟動(dòng)命令含有 c 代表指定了配置文件 如 -c /home/rocketmq/conf/namesrv.properties
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
// 讀文件
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 把配置寫(xiě)入namesrvConfig和nettyServerConfig中
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 省略部分
// 日志配置
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 把namesrvConfig和nettyServerConfig構(gòu)建一個(gè)NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
NamesrvController的參數(shù)和他的構(gòu)造函數(shù)
NamesrvController的參數(shù)
// namesrv配置信息
private final NamesrvConfig namesrvConfig;
// nettyServer配置信息
private final NettyServerConfig nettyServerConfig;
// 單個(gè)線程的定時(shí)調(diào)度線程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 鍵值對(duì)管理
private final KVConfigManager kvConfigManager;
// 路由信息管理
private final RouteInfoManager routeInfoManager;
// 實(shí)際啟動(dòng)的netty server
private RemotingServer remotingServer;
// broker管理
private BrokerHousekeepingService brokerHousekeepingService;
// 固定大小的線程池
private ExecutorService remotingExecutor;
// 配置類
private Configuration configuration;
// 暫不知道
private FileWatchService fileWatchService;
可以看到NamesrvController類包含了很多信息导帝,如有一個(gè)NettyServer和相關(guān)的配置,有鍵值對(duì)管理穿铆,有路由信息管理您单,有broker管理,有自己的配置荞雏。
NamesrvController的構(gòu)造器
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
至此虐秦,NamesrvController已實(shí)例化完畢,返回入口再執(zhí)行start凤优。
啟動(dòng) controller
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 1. 初始化controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 2. 注冊(cè)JVM鉤子函數(shù)悦陋,在JVM進(jìn)程關(guān)閉之前,把用到的線程池先關(guān)閉
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 3. 啟用controller筑辨,其實(shí)就是啟動(dòng)netty server
controller.start();
return controller;
}
本質(zhì)上NameServer是一個(gè)tcp server俺驶,啟動(dòng)后用于接收來(lái)自broker,C,P的請(qǐng)求并作出處理。
初始化
public boolean initialize() {
this.kvConfigManager.load();
// 創(chuàng)建netty server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 創(chuàng)建線程池挖垛,默認(rèn)8個(gè)線程痒钝,最后丟給netty server使用
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 為remotingServer綁定processor, 實(shí)際就是用來(lái)處理NettyServer接收到的請(qǐng)求
this.registerProcessor();
// 掃描不活躍的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 打配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 非主線省略
return true;
}
NameServer啟動(dòng)的流程可被簡(jiǎn)單歸類于以下幾個(gè)步驟 -
- 讀配置,分發(fā)在NameServer和Netty的配置中
- 用配置構(gòu)建NamesrvController實(shí)例
- 初始化NamesrvController
- 啟動(dòng)NamesrvController中的netty server用于接收請(qǐng)求痢毒,響應(yīng)請(qǐng)求
至此送矩,NameSrv已經(jīng)成功啟動(dòng)了NettyServer,現(xiàn)在NameSrv是一個(gè)可以被生產(chǎn)者哪替,Broker栋荸,消費(fèi)者連接的一個(gè)狀態(tài)了,并可以源源不斷的處理他們發(fā)過(guò)來(lái)的請(qǐng)求凭舶。在this.registerProcessor();
這一行代碼中晌块,也為NettyServer配置了具體的業(yè)務(wù)處理器。我們后面將在DefaultRequestProcessor
中去探索其可以實(shí)現(xiàn)哪些功能帅霜。
NameSrv的核心數(shù)據(jù)結(jié)構(gòu)
從上文的掃描不活躍的broker來(lái)初探NameSrv的核心數(shù)據(jù)結(jié)構(gòu)
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
// 遍歷存活broker map
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
// 得到上一次更新時(shí)間
long last = next.getValue().getLastUpdateTimestamp();
// 2分鐘沒(méi)發(fā)心跳
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// 關(guān)閉channel
RemotingUtil.closeChannel(next.getValue().getChannel());
// 移出map
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
brokerLiveTable是什么匆背?從上文中看是一個(gè)維護(hù)每個(gè)Broker與NameSrv存活關(guān)系集合。NameSrv靠著一次次的接收心跳請(qǐng)求來(lái)判斷這些Broker有沒(méi)有失活身冀。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
這些就是NameSrv的核心存儲(chǔ)結(jié)構(gòu)
可以從兩方面來(lái)理解钝尸,其最終都是為了拿到某個(gè)broker的地址去建立連接或?qū)roker安置到哪個(gè)Map中。
- 第一種是由topic去
topicQueueTable
中拿到brokerName再去brokerAddrTable
去拿到broker的地址信息搂根。這是能猜測(cè)到的生產(chǎn)者去NameSrv拿broker地址的一種場(chǎng)景珍促。 - 第二種是由集群名去
clusterAddrTable
去拿到此集群下的brokerName。經(jīng)常發(fā)生在注冊(cè)broker剩愧。 -
brokerLiveTable
用于保持brokerAddr的心跳狀態(tài)猪叙,以定期移除不存活的broker。 -
filterServerTable
這里暫不討論
我們下面也將根據(jù)以上2種較為常用的場(chǎng)景對(duì)這些功能進(jìn)行分析仁卷,也可以看到NameSrv是如何基于以上幾個(gè)Map做出各種功能穴翩。
NameSrv的功能
NameSrv所支持的功能都在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
方法的switch中。其主要功能是負(fù)責(zé)處理netty接收到的請(qǐng)求的锦积,根據(jù)請(qǐng)求的類型分別執(zhí)行不同的操作藏否。我們挑RequestCode.REGISTER_BROKER
和RequestCode.GET_ROUTEINFO_BY_TOPIC
來(lái)看看。
路由注冊(cè)
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 根據(jù)集群名拿到集群下的所有brokerName
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
// 如果不存在充包,則創(chuàng)建副签,把集群名,brokerName進(jìn)行映射
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// 把新注冊(cè)的brokerName丟到集群table里
brokerNames.add(brokerName);
boolean registerFirst = false;
// 再根據(jù)brokerName去拿到broker的詳細(xì)信息里查
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// 如果找不到對(duì)應(yīng)的brokerData數(shù)據(jù)基矮,則證明是第一次注冊(cè)
if (null == brokerData) {
registerFirst = true;
// 新建brokerData并放入brokerAddrTable
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// 一個(gè)brokerName對(duì)應(yīng)的BrokerData可能對(duì)應(yīng)多個(gè)broker地址
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
// 此broker的地址之前就在里面了淆储,但此次的brokerId如發(fā)生變化,則代表主從發(fā)生了變化家浇,移除
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
// 放入新的并返回舊的本砰,如果不存在會(huì)返回null
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// oldAddr的值就能確定是不是第一次注冊(cè)
registerFirst = registerFirst || (null == oldAddr);
// 如果是master節(jié)點(diǎn)
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// 如果version不一樣或是第一次注冊(cè)
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
// 創(chuàng)建或更新queueData
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 填充broker狀態(tài)表,以便心跳檢測(cè)
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 如果不是master節(jié)點(diǎn)钢悲,則把master節(jié)點(diǎn)的地址放到haServer和masterAddr中
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
我們將注冊(cè)broker概括為以下幾步
- 先把本次注冊(cè)的broker信息加到
clusterAddrTable
中点额,意為本集群下多了一個(gè)broker - 然后把broker信息注冊(cè)到
brokerAddrTable
中舔株,這一步可能發(fā)生主從變化等 - 在
topicQueueTable
中創(chuàng)建或更新隊(duì)列數(shù)據(jù) - 在
brokerLiveTable
中維護(hù)本次broker的心跳信息 - 如果不是Master節(jié)點(diǎn),則返回Master節(jié)點(diǎn)地址
路由發(fā)現(xiàn)
其實(shí)看過(guò)路由注冊(cè)之后还棱,八九不離十就能猜到路由發(fā)現(xiàn)可能是從topicQueueTable
取到broker的信息载慈,然后再去brokerAddrTable
去獲取每個(gè)broker的詳細(xì)信息。源碼也證實(shí)了這一點(diǎn):
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
this.lock.readLock().lockInterruptibly();
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
// topic隊(duì)列元信息集合
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
// broker的元數(shù)據(jù)集合
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
總結(jié)
理解了NameSrv的核心數(shù)據(jù)結(jié)構(gòu)的用處和他們的層級(jí)關(guān)系珍手,就變相的理解了NameSrv的作用場(chǎng)景办铡。從Processor上看,每個(gè)請(qǐng)求對(duì)應(yīng)的處理皆是對(duì)這幾個(gè)Map進(jìn)行 一些信息的提取和維護(hù)琳要。