Rocketmq
使用 namesrv
來(lái)管理所有的元數(shù)據(jù)信息疯暑,包括主題 Topic
路由信息和 Broker
信息矫夯。
首先我們介紹一下一些基礎(chǔ)概念:
-
Broker
: 儲(chǔ)存消息的服務(wù)器。- 分為主從兩種模式欺税,通過(guò)
brokerId
來(lái)區(qū)分,目前brokerId = 0
就表示主節(jié)點(diǎn)。 - 每個(gè)
Broker
啟動(dòng)時(shí)颊埃,會(huì)向namesrv
注冊(cè)自己的信息,并會(huì)定期發(fā)送心跳信息蝶俱。
- 分為主從兩種模式欺税,通過(guò)
-
Broker
組 : 相同brokerName
名字的Broker
服務(wù)器就是一個(gè)組的班利。注意: 這里就有一個(gè)小問(wèn)題,如果兩個(gè)
Broker
有相同brokerName
名字榨呆,而且brokerId
都是0
時(shí)罗标,它們都可以向namesrv
注冊(cè)自己信息,后面覆蓋前面信息积蜻,而且因?yàn)樗鼈兌紩?huì)發(fā)送心跳消息闯割,就會(huì)導(dǎo)致不斷地相互覆蓋。 -
Broker
集群 : 有相同clusterName
名字的Broker
服務(wù)器就是同一個(gè)集群的竿拆。 -
Topic
: 主題Topic
是以Broker
組進(jìn)行區(qū)分的纽谒。-
Broker
組有一個(gè)TopicConfigManager
來(lái)管理該Broker
所擁有的所有主題Topic
信息,包括主題Topic
的權(quán)限perm
如输,讀隊(duì)列數(shù)量readQueueNums
鼓黔,writeQueueNums
寫隊(duì)列數(shù)量等等。 -
Broker
組中主Broker
創(chuàng)建主題Topic
在這個(gè)Broker
組擁有隊(duì)列文件Queue
不见,從Broker
只是復(fù)制主Broker
澳化。 - 生產(chǎn)者發(fā)送消息時(shí),也是從主題
Topic
擁有的Broker
組數(shù)組中稳吮,挑選一個(gè)Broker
組缎谷,向這個(gè)Broker
組的主Broker
發(fā)送消息,然后主Broker
再將這些數(shù)據(jù)發(fā)送給從Broker
灶似。
-
一. NamesrvStartup
類
這個(gè)類是 namesrv
的啟動(dòng)類列林,用來(lái)開啟 namesrv
瑞你。
1.1 main
方法
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 創(chuàng)建 NamesrvController
NamesrvController controller = createNamesrvController(args);
// 啟動(dòng) NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
- 通過(guò)
createNamesrvController()
方法創(chuàng)建NamesrvController
實(shí)例。 - 調(diào)用
start(controller)
方法啟動(dòng)NamesrvController
希痴。
1.2 createNamesrvController
方法
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 通過(guò) commandLine 解析 args 參數(shù)
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// namesrv 相關(guān)的配置參數(shù)
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// netty 相關(guān)的配置參數(shù)
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 默認(rèn)端口 9876者甲, 但是可以通過(guò) -c 傳入的 properties 文件參數(shù)進(jìn)行覆蓋
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 創(chuàng)建 NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
- 通過(guò)
commandLine
解析命令行args
參數(shù)。 - 如果指定了配置文件砌创,那么讀取配置文件中的配置項(xiàng)虏缸,并賦值到
namesrvConfig
和nettyServerConfig
。 - 創(chuàng)建
NamesrvController
實(shí)例嫩实,并將配置項(xiàng)數(shù)據(jù)properties
賦值到Configuration
中刽辙。
1.3 start
方法
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化 NamesrvController
boolean initResult = controller.initialize();
if (!initResult) {
// 初始化失敗,就退出
controller.shutdown();
System.exit(-3);
}
// 添加鉤子函數(shù)甲献,保證 JVM 正常退出時(shí)宰缤,關(guān)閉 NamesrvController
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 開啟 NamesrvController
controller.start();
return controller;
}
非常簡(jiǎn)單,先初始化 NamesrvController
晃洒;然后添加鉤子函數(shù)撵溃,保證 JVM
正常退出時(shí),關(guān)閉 NamesrvController
锥累;最后調(diào)用 start()
方法啟動(dòng)。
二. NamesrvController
類
2.1 成員屬性
// Namesrv 的配置項(xiàng)
private final NamesrvConfig namesrvConfig;
// Netty 服務(wù)端配置項(xiàng)
private final NettyServerConfig nettyServerConfig;
// 定時(shí)器集歇,用來(lái)定期檢查是否有不活躍的 broker桶略,以及定期打印 kvConfigManager 中的值
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 儲(chǔ)存 KV 的值
private final KVConfigManager kvConfigManager;
// namesrv 所有路由信息的管理器
private final RouteInfoManager routeInfoManager;
// 遠(yuǎn)程RPC服務(wù)服務(wù)端,用來(lái)處理遠(yuǎn)程請(qǐng)求命令
private RemotingServer remotingServer;
// ChannelEventListener 接口子類诲宇,監(jiān)聽 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件际歼,進(jìn)行對(duì)應(yīng)處理
private BrokerHousekeepingService brokerHousekeepingService;
// 處理遠(yuǎn)程請(qǐng)求的線程池執(zhí)行器
private ExecutorService remotingExecutor;
// 配置項(xiàng)
private Configuration configuration;
// 監(jiān)控 file 變化,主要用于 SSL
private FileWatchService fileWatchService;
-
namesrvConfig
和nettyServerConfig
:Namesrv
的配置項(xiàng)和Netty
服務(wù)端配置項(xiàng)姑蓝。 -
scheduledExecutorService
: 定時(shí)器用來(lái)定期檢查是否有不活躍的
Broker
鹅心,以及定期打印kvConfigManager
中的值。 -
kvConfigManager
:KV
值的管理器纺荧。 -
routeInfoManager
: 所有路由信息的管理器旭愧。 -
remotingServer
: 遠(yuǎn)程RPC
服務(wù)服務(wù)端,用來(lái)處理遠(yuǎn)程請(qǐng)求命令宙暇。 -
brokerHousekeepingService
:ChannelEventListener
接口子類输枯。監(jiān)聽
Netty
的CONNECT
,CLOSE
,IDLE
,EXCEPTION
事件,進(jìn)行對(duì)應(yīng)處理占贫。 -
remotingExecutor
: 處理遠(yuǎn)程請(qǐng)求的線程池執(zhí)行器桃熄。
2.2 initialize
方法
public boolean initialize() {
// 從 kvConfig.json 文件中加載之前存儲(chǔ)的 KV 值
this.kvConfigManager.load();
// 通過(guò) netty 創(chuàng)建一個(gè)服務(wù)端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 用于處理請(qǐng)求的線程池 remotingExecutor
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注冊(cè)請(qǐng)求命令處理器
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 每隔10秒 檢查是否有不活躍的 broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 每隔10秒打印一下 kvConfigManager 中的值
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 處理 SSL
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
- 從
kvConfig.json
文件中加載之前存儲(chǔ)的KV
值。 - 創(chuàng)建一個(gè)遠(yuǎn)程
RPC
服務(wù)服務(wù)端型奥,用來(lái)處理遠(yuǎn)程請(qǐng)求命令瞳收。 - 用于處理請(qǐng)求的線程池
remotingExecutor
碉京。 - 注冊(cè)請(qǐng)求命令處理器。
- 通過(guò)
scheduledExecutorService
每隔10
秒檢查是否有不活躍的Broker
螟深,以及每隔10
秒打印一下kvConfigManager
中的值谐宙。 - 最后處理
SSL
。
三. RouteInfoManager
類
3.1 成員屬性
// 主題 topic 對(duì)應(yīng)的隊(duì)列相關(guān)信息QueueData血崭,這里是 List 原因是每個(gè) broker 組都有一個(gè) QueueData卧惜。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker 組基礎(chǔ)信息集合,包括 broker 名字夹纫,所屬集群名字和主從 broker 的地址咽瓷。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// broker 集群集合,包括所有集群名字以及它擁有所有broker 組名字舰讹。
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 每個(gè) broker 的狀態(tài)信息茅姜,每次收到心跳包時(shí),都會(huì)替換對(duì)應(yīng)數(shù)據(jù)月匣。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 每個(gè) broker 對(duì)應(yīng)的 FilterServer 地址列表钻洒,用于類模式消息過(guò)濾。
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-
topicQueueTable
: 所有主題相關(guān)信息集合锄开。- 每個(gè)主題
Topic
對(duì)應(yīng)一個(gè)List<QueueData>
集合素标,因?yàn)橐粋€(gè)主題有多個(gè)Broker
組。 -
QueueData
包括Broker
組的名字萍悴,這個(gè)Broker
組中當(dāng)期主題Topic
對(duì)應(yīng)的可讀隊(duì)列數(shù)量头遭,可寫隊(duì)列數(shù)量,讀寫權(quán)限和同步標(biāo)記癣诱。
- 每個(gè)主題
-
brokerAddrTable
:Broker
組基礎(chǔ)信息集合计维。BrokerData
包括Broker
組名字,所屬集群名字和主從Broker
的地址撕予。 -
clusterAddrTable
:Broker
集群集合鲫惶,key
是集群名字,value
是集群擁有所有的Broker
組名字实抡。 -
brokerLiveTable
: 每個(gè)Broker
的狀態(tài)信息欠母。 -
filterServerTable
: 每個(gè)Broker
對(duì)應(yīng)的FilterServer
地址列表,用于類模式消息過(guò)濾吆寨。
3.1.1 QueueData
類
public class QueueData implements Comparable<QueueData> {
// broker組的名字
private String brokerName;
// 可讀隊(duì)列數(shù)量
private int readQueueNums;
// 可寫隊(duì)列數(shù)量
private int writeQueueNums;
// 讀寫權(quán)限艺蝴,具體參考 PermName,
private int perm;
// 主題Topic 同步標(biāo)記; 參考TopicSysFlag類: FLAG_UNIT = 0x1 << 0, FLAG_UNIT_SUB = 0x1 << 1
private int topicSynFlag;
}
3.1.2 BrokerData
類
public class BrokerData implements Comparable<BrokerData> {
// broker 組所屬集群的名字
private String cluster;
// broker 組的名字
private String brokerName;
// broker 組中所有 broker 的地址;其中 brokerId = 0 表示主 broker鸟废,其他的都是從 broker猜敢。
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
3.1.3 BrokerLiveInfo
類
class BrokerLiveInfo {
// 最近一次更新時(shí)間,用來(lái)判斷這個(gè) broker 是否活躍
private long lastUpdateTimestamp;
// 這個(gè) broker 的數(shù)據(jù)版本,可以用來(lái)判斷這個(gè) Broker 的數(shù)據(jù)是否改變過(guò)缩擂。
private DataVersion dataVersion;
// 連接這個(gè) broker 的通道channel
private Channel channel;
// 該 broker 的 HaServer地址
private String haServerAddr;
}
3.2 重要方法
3.2.1 registerBroker
方法
/**
* 注冊(cè) Broker
*/
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 這個(gè) broker 所屬的集群clusterName 是否已經(jīng)在 clusterAddrTable 中
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// 因?yàn)?brokerNames 是 Set 類型鼠冕,會(huì)自動(dòng)去重,所以這里直接添加
brokerNames.add(brokerName);
// 是否第一次注冊(cè)
boolean registerFirst = false;
// 通過(guò) brokerName 從 brokerAddrTable 中獲取對(duì)應(yīng)的 BrokerData胯盯。
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
// 如果不存在懈费,就創(chuàng)建新的,并存入 brokerAddrTable 中博脑。
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 處理 slave 變成 master 的情況
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
// 如果 brokerAddr 相同憎乙,但是 brokerId 不一樣,說(shuō)明這個(gè) broker 修改 brokerId叉趣,
// 那么就先把它從 brokerAddrsMap 集合中移除泞边。
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
/**
* 只有當(dāng) topicConfigWrapper 不為null且必須是master節(jié)點(diǎn),才能進(jìn)入
*/
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// 當(dāng) topicConfigWrapper 的數(shù)據(jù)版本dataVersion 和當(dāng)前儲(chǔ)存值不一樣疗杉,或者是第一次注冊(cè)時(shí)阵谚;
// 都需要處理 Topic
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
// 遍歷 Broker 上所有的 topic 配置,改變 topicQueueTable 集合數(shù)據(jù)
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 只有 Broker 組中的主節(jié)點(diǎn)才有可能調(diào)用到這個(gè)方法烟具。
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新 brokerLiveTable 中該 Broker 地址對(duì)應(yīng)狀態(tài)信息梢什,表示該 Broker 地址是活躍的。
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
// 如果這個(gè)Broker 是 slave 節(jié)點(diǎn)朝聋, 那么給它設(shè)置主節(jié)點(diǎn)的地址和 HaServer的地址
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
注冊(cè) Broker
信息, 每個(gè) Broker
會(huì)定時(shí)向 namesrv
發(fā)送自身的數(shù)據(jù)嗡午,就會(huì)調(diào)用到這個(gè)方法。方法流程:
- 先將這個(gè)
Broker
的brokerName
添加到集群集合clusterAddrTable
中冀痕。 - 將這個(gè)
Broker
的相關(guān)信息添加到brokerAddrTable
集合中荔睹,并判斷這個(gè)Broker
是否第一次注冊(cè)registerFirst
。 - 當(dāng)這個(gè)
Broker
是主節(jié)點(diǎn)金度,topicConfigWrapper
的數(shù)據(jù)版本dataVersion
和當(dāng)前儲(chǔ)存值不一樣,或者是第一次注冊(cè)時(shí)严沥;都需要將該Broker
的主題信息topicConfigWrapper
添加到topicQueueTable
中猜极。 - 更新
brokerLiveTable
中該Broker
地址對(duì)應(yīng)狀態(tài)信息,表示該Broker
地址是活躍的消玄。 - 如果這個(gè)
Broker
是slave
節(jié)點(diǎn)跟伏, 那么給它設(shè)置主節(jié)點(diǎn)的地址和HaServer
的地址。
總結(jié)一下:
就是按照順序翩瓜,分別改變
clusterAddrTable
,brokerAddrTable
,topicQueueTable
,brokerLiveTable
和filterServerTable
的數(shù)據(jù)受扳。
3.2.2 unregisterBroker
方法
/**
* 取消注冊(cè)Broker
*/
public void unregisterBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId) {
try {
try {
this.lock.writeLock().lockInterruptibly();
// 刪除的時(shí)候,先刪除簡(jiǎn)單的兔跌。
// 1. 刪除 brokerLiveTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
brokerLiveInfo != null ? "OK" : "Failed",
brokerAddr
);
// 2. 刪除 filterServerTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)
this.filterServerTable.remove(brokerAddr);
// 3. 再根據(jù) brokerName 處理 brokerAddrTable 集合中brokerData 數(shù)據(jù)
boolean removeBrokerName = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
String addr = brokerData.getBrokerAddrs().remove(brokerId);
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
addr != null ? "OK" : "Failed",
brokerAddr
);
if (brokerData.getBrokerAddrs().isEmpty()) {
// 如果 brokerName 只包含這一個(gè) brokerId勘高,被刪除了;
// 那么也要從 brokerAddrTable 集合中刪除這個(gè) brokerName
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
brokerName
);
removeBrokerName = true;
}
}
if (removeBrokerName) {
// brokerName 被刪除了,要更新 clusterAddrTable 集合數(shù)據(jù)
Set<String> nameSet = this.clusterAddrTable.get(clusterName);
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
removed ? "OK" : "Failed",
brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
clusterName
);
}
}
// 只有當(dāng) brokerName 被刪除了华望,那么就要更新 topicQueueTable 集合了蕊蝗。
this.removeTopicByBrokerName(brokerName);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("unregisterBroker Exception", e);
}
}
- 先刪除
brokerLiveTable
集合中brokerAddr
對(duì)應(yīng)數(shù)據(jù)。 - 刪除
filterServerTable
集合中brokerAddr
對(duì)應(yīng)數(shù)據(jù)赖舟。 - 再根據(jù)
brokerName
處理brokerAddrTable
集合中BrokerData
數(shù)據(jù)蓬戚。如果
BrokerData
中只包含當(dāng)前這個(gè)Broker
,那么當(dāng)它被刪除后宾抓,那么就要從brokerAddrTable
刪除這個(gè)brokerName
鍵子漩。表示這個(gè)Broker
組已經(jīng)不存在了。 - 當(dāng)一個(gè)
Broker
組被刪除后石洗,那么就需要改變clusterAddrTable
和topicQueueTable
的數(shù)據(jù)了幢泼。- 從
clusterAddrTable
中刪除這個(gè)Broker
組名字brokerName
, 如果這個(gè)集群只有這一個(gè)Broker
組,那么這個(gè)集群也要從clusterAddrTable
中刪除劲腿。 - 通過(guò)
removeTopicByBrokerName()
方法旭绒,更新topicQueueTable
集合。
- 從
總結(jié)一下:
就是先刪除
brokerLiveTable
和filterServerTable
中的數(shù)據(jù)焦人,因?yàn)樗鼈冎械臄?shù)據(jù)是比較獨(dú)立的挥吵;然后修改brokerAddrTable
集合中數(shù)據(jù);最后根據(jù)Broker
組是否被刪除花椭,來(lái)決定是否修改clusterAddrTable
和topicQueueTable
集合中的數(shù)據(jù)忽匈。
3.2.3 onChannelDestroy
方法
/**
* 通道 Channel 銷毀時(shí)的處理
*/
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
// 根據(jù) channel,從 brokerLiveTable 中查找對(duì)應(yīng)的 brokerAddr 值矿辽。
try {
try {
this.lock.readLock().lockInterruptibly();
// 從 brokerLiveTable 中丹允,根據(jù) channel 查找對(duì)應(yīng)的Broker地址 brokerAddrFound
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
// 根據(jù) brokerAddr 的值,進(jìn)行移除操作
try {
try {
this.lock.writeLock().lockInterruptibly();
// 先移除 brokerLiveTable 和 filterServerTable 集合中袋倔,
// brokerAddrFound 對(duì)應(yīng)數(shù)據(jù)
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
// 根據(jù)Broker地址 brokerAddrFound 從brokerAddrTable 中刪除這個(gè) Broker
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
if (brokerAddr.equals(brokerAddrFound)) {
// 找到了雕蔽,就刪除這個(gè) Broker 地址
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
if (brokerNameFound != null && removeBrokerName) {
// 如果 brokerName 被刪除了,那么就要改變 clusterAddrTable 集合了
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
break;
}
}
}
if (removeBrokerName) {
// 刪除了 brokerName宾娜,那么就要?jiǎng)h除 topicQueueTable 中所有這個(gè) brokerName 對(duì)應(yīng)的QueueData
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
if (queueDataList.isEmpty()) {
// 如果刪除后批狐,這個(gè) queueDataList 為空,
// 說(shuō)明這個(gè) Topic 沒有對(duì)應(yīng)的 QueueData前塔,也應(yīng)該刪除嚣艇。
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
這個(gè)方法和 unregisterBroker
很像,區(qū)別是:
- 通過(guò)
channel
從brokerLiveTable
找到對(duì)應(yīng)的Broker
地址brokerAddr
华弓。 - 刪除
brokerAddrTable
集合數(shù)據(jù)時(shí)食零,是通過(guò)brokerAddr
進(jìn)行匹配的,而不是brokerId
寂屏; - 其他的操作流程和
unregisterBroker
一樣贰谣。
3.2.4 scanNotActiveBroker
方法
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果某個(gè) Broker 超過(guò) BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 沒有接收到信息娜搂,
// 那么我們就認(rèn)為這個(gè) Broker 已經(jīng)出現(xiàn)問(wèn)題,刪除它
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
掃描不活躍的 Broker
冈爹,并將它進(jìn)行關(guān)閉涌攻。
如果某個(gè)
Broker
超過(guò)BROKER_CHANNEL_EXPIRED_TIME
(1000 * 60 * 2
) 沒有接收到信息。那么我們就認(rèn)為這個(gè)Broker
已經(jīng)出現(xiàn)問(wèn)題频伤,就關(guān)閉它恳谎。