微信公眾號「后端進階」莉钙,專注后端技術(shù)分享:Java、Golang筛谚、WEB框架磁玉、分布式中間件、服務(wù)治理等等驾讲。
老司機傾囊相授蚊伞,帶你一路進階,來不及解釋了快上車吮铭!
早期的rocketmq版本的路由功能是使用zookeeper實現(xiàn)的时迫,后來rocketmq為了追求性能,自己實現(xiàn)了一個性能更高效且實現(xiàn)簡單的路由中心NameServer沐兵,而且可以通過部署多個路由節(jié)點實現(xiàn)高可用别垮,但它們之間并不能互相通信,這也就會導(dǎo)致在某一個時刻各個路由節(jié)點間的數(shù)據(jù)并不完全相同扎谎,但數(shù)據(jù)某個時刻不一致并不會導(dǎo)致消息發(fā)送不了碳想,這也是rocketmq追求簡單高效的一個做法。
路由啟動
看了Nameserver的源碼后大呼驚嘆毁靶,整個NameServer總共就由這么幾個類類組成:
其中NamesrvStartup為啟動類胧奔,NamesrvController為核心控制器,RouteInfoManager為路由信息表预吆。
知道了這幾個類的功能之后龙填,我們就直接定位到NamesrvStartup啟動類的啟動方法:
org.apache.rocketmq.namesrv.NamesrvStartup#main0:
public static NamesrvController main0(String[] args) {
try {
// 步驟一
NamesrvController controller = createNamesrvController(args);
// 步驟二
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
整個NameServer服務(wù)啟動的流程代碼都在main0(String[] args)方法了,看起來是不是很簡單拐叉,我們繼續(xù)往下擼它的具體實現(xiàn):
步驟一:
org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController:
這個方法的代碼有點多岩遗,下面我會拆分成幾段進行分析:
// 創(chuàng)建命令行參數(shù)對象,這里定義了 -h 和 -n參數(shù)
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 根據(jù)Options和運行時參數(shù)args生成命令行對象凤瘦,buildCommandlineOptions定義了-c參數(shù)(Name server config properties file)和-p參數(shù)(Print all config item)
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
這里使用了Apache Commons CLI 命令行解析工具宿礁,它可以幫助開發(fā)者快速構(gòu)建啟動命令内狗,并且?guī)椭憬M織命令的參數(shù)泌霍、以及輸出列表等。
這段主要是根據(jù)運行時傳遞的參數(shù)生成commandLine命令行對象在抛,用于解析運行時類似于 -c 指定文件路徑,然后填充到namesrvConfig和nettyServerConfig對象中返吻。
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 讀取命令行-c參數(shù)指定的配置文件
String file = commandLine.getOptionValue('c');
if (file != null) {
// 將文件轉(zhuǎn)成輸入流
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
// 加載到屬性對象
properties.load(in);
// 裝載配置
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
}
}
這段是createNamesrvController(String[] args)方法最為核心的代碼姑子,從代碼知道先創(chuàng)建NamesrvConfig和NettyServerConfig對象,接著利用commandLine命令行工具讀取-c指定的配置文件路徑测僵,然后將其讀取到流中街佑,生成properties對象,最后將namesrvConfig和nettyServerConfig對象進行初始化恨课。
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
到這里就水到渠成地利用namesrvConfig和nettyServerConfig對象創(chuàng)建NamesrvController對象舆乔,然后在注冊一遍properties防止丟失。
createNamesrvController(String[] args)這一步也是啟動nameserver最為關(guān)鍵的操作剂公,它為我們啟動時提供了namesrvConfig和nettyServerConfig配置對象希俩,以及創(chuàng)建NamesrvController核心控制器。
步驟二:
org.apache.rocketmq.namesrv.NamesrvStartup#start:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 對核心控制器進行初始化操作
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注冊一個鉤子函數(shù)纲辽,用于JVM進程關(guān)閉時颜武,優(yōu)雅地釋放netty服務(wù)、線程池等資源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 核心控制器啟動操作
controller.start();
return controller;
}
步驟二也是一個次級的啟動流程控制方法拖吼,該方法主要對核心控制器進行初始化操作鳞上,同時注冊一個鉤子函數(shù),用于JVM進程關(guān)閉時吊档,優(yōu)雅地釋放netty服務(wù)篙议、線程池等資源,最后對核心控制器進行啟動操作怠硼,接下來我們繼續(xù)擼詳細實現(xiàn):
org.apache.rocketmq.namesrv.NamesrvController#initialize:
public boolean initialize() {
// 加載KV配置
this.kvConfigManager.load();
// 創(chuàng)建Netty網(wǎng)絡(luò)服務(wù)對象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 創(chuàng)建定時任務(wù)--每個10s掃描一次Broker鬼贱,并定時剔除不活躍的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 創(chuàng)建定時任務(wù)--每個10分鐘打印一遍KV配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// ...
return true;
}
該方法主要是對核心控制器進行啟動前的一些初始化操作,包括根據(jù)NamesrvConfig的kvConfigPath存儲KV配置屬性的路徑加載KV配置香璃,創(chuàng)建定時任務(wù):每個10s掃描一次Broker这难,并定時剔除不活躍的Broker;每個10分鐘打印一遍KV配置葡秒。
這里的每個10s掃描一次Broker姻乓,并定時剔除不活躍的Broker,這里是路由刪除的一些邏輯眯牧,后面會講到蹋岩。
org.apache.rocketmq.namesrv.NamesrvController#start:
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
到這里對啟動進行最后一步操作,即創(chuàng)建Netty服務(wù)学少,我們知道rocketmq是通過netty來進行通信剪个,對應(yīng)netty的一些細節(jié)這里就不展開講了,后面我也會計劃寫一些關(guān)于netty的系列文章旱易,敬請期待禁偎。
路由啟動時序圖:
路由注冊
路由注冊即是Broker向Nameserver注冊的過程,它們是通過Broker的心跳功能實現(xiàn)的阀坏,那么既然Nameserver是用來存儲Broker的注冊信息如暖,那么我們就先來看看Nameserver到底存儲了哪些信息,回到文章最開始的那張結(jié)構(gòu)圖忌堂,我們知道RouteInfoManager為路由信息表:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager:
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
- topicQueueTable:Topic消息隊列路由信息盒至,包括topic所在的broker名稱,讀隊列數(shù)量士修,寫隊列數(shù)量枷遂,同步標記等信息,rocketmq根據(jù)topicQueueTable的信息進行負載均衡消息發(fā)送棋嘲。
- brokerAddrTable:Broker節(jié)點信息酒唉,包括brokername,所在集群名稱沸移,還有主備節(jié)點信息痪伦。
- clusterAddrTable:Broker集群信息,存儲了集群中所有的Brokername雹锣。
- brokerLiveTable:Broker狀態(tài)信息网沾,Nameserver每次收到Broker的心跳包就會更新該信息。
這里也先講一下rocketmq是基于訂閱發(fā)布機制蕊爵,我之前也寫過一篇文章《rocketmq的消費模式》辉哥,我們可知一個Topic擁有多個消息隊列,如果不指定隊列的數(shù)量攒射,一個Broker會為每個Topic創(chuàng)建4個讀隊列和4個寫隊列醋旦,多個Broker組成集群,Broker會通過發(fā)送心跳包將自己的信息注冊到路由中心匆篓,路由中心brokerLiveTable存儲Broker的狀態(tài)浑度,它會根據(jù)Broker的心跳包更新Broker狀態(tài)信息。
步驟一:Broker發(fā)送心跳包
org.apache.rocketmq.broker.BrokerController#start:
public void start() throws Exception {
// 初次啟動鸦概,這里會強制執(zhí)行發(fā)送心跳包
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
Broker在核心控制器啟動時箩张,會強制發(fā)送一次心跳包,接著創(chuàng)建一個定時任務(wù)窗市,定時向路由中心發(fā)送心跳包先慷。
org.apache.rocketmq.broker.BrokerController#registerBrokerAll:
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 創(chuàng)建一個topic包裝類
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 這里比較有趣,如果該broker沒有讀寫權(quán)限咨察,那么會新建一個臨時的topicConfigTable论熙,再set進包裝類
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 判斷是否該Broker是否需要發(fā)送心跳包
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 執(zhí)行發(fā)送心跳包
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
該方法是Broker執(zhí)行發(fā)送心跳包的核心控制方法,它主要做了topic的包裝類封裝操作摄狱,判斷Broker此時是否需要執(zhí)行發(fā)送心跳包脓诡,但我查了下org.apache.rocketmq.common.BrokerConfig#forceRegister字段的值永遠等于true无午,也就是該判斷永遠為true,即每次都需要發(fā)送心跳包祝谚。
我們定位到needRegister遠程調(diào)用到路由中心的方法:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged:
public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
return null == prev || !prev.equals(dataVersion);
}
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
發(fā)現(xiàn)宪迟,Broker是否需要發(fā)送心跳包由該Broker在路由中心org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion決定,如果dataVersion為空或者當前dataVersion不等于brokerLiveTable存儲的brokerLiveTable交惯,Broker就需要發(fā)送心跳包次泽。
步驟二:Nameserver處理心跳包
Nameserver的netty服務(wù)監(jiān)聽收到心跳包之后,會調(diào)用到路由中心以下方法進行處理:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 獲取集群下所有的Broker席爽,并將當前Broker加入clusterAddrTable意荤,由于brokerNames是Set結(jié)構(gòu),并不會重復(fù)
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 獲取Broker信息只锻,如果是首次注冊玖像,那么新建一個BrokerData并加入brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// 這里判斷Broker是否是已經(jīng)注冊過
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 如果是Broker是Master節(jié)點嗎,并且Topic信息更新或者是首次注冊炬藤,那么創(chuàng)建更新topic隊列信息
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新BrokerLiveInfo狀態(tài)信息
BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
該方法是處理Broker心跳包的最核心方法御铃,它主要做了對RouteInfoManager路由信息的一些更新操作,包括對clusterAddrTable沈矿、brokerAddrTable上真、topicQueueTable、brokerLiveTable等路由信息羹膳。
路由注冊時序圖:
路由刪除
前面部分我們分析了Nameserver啟動時會創(chuàng)建一個定時任務(wù)睡互,定時剔除不活躍的Broker。
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker:
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果當前時間大于最后修改時間加上Broker過期時間陵像,那么就剔除該Broker
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// 關(guān)閉Broker對應(yīng)的channel
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 從brokerLiveTable就珠、brokerAddrTable、topicQueueTable移除Broker相關(guān)信息
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
剔除Broker信息的邏輯比較簡單醒颖,首先從BrokerLiveInfo獲取狀態(tài)信息妻怎,判斷Broker的心跳時間是否已超過限定值,若超過之后就執(zhí)行剔除邏輯泞歉。
分析完了rocketmq自帶的路由中心源碼逼侦,其實我們自己實現(xiàn)一個路由中心貌似也不難。有時候我們發(fā)現(xiàn)公司有些項目可以獨立拆分出來做成中間件的形式腰耙,也就是單獨部署榛丢,其它業(yè)務(wù)依賴client包調(diào)用中間件服務(wù),比如短信挺庞、推送晰赞、郵件、配置等模塊。如果我們把這些中間件做成高可用集群部署掖鱼,也可以考慮自己實現(xiàn)一個路由中心然走。