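RocketMQ route registration is implemented through the heartbeat between Broker and NameServer. When a Broker starts, it sends a heartbeat to every NameServer in the cluster, and it repeats this every 30 seconds. On receiving a heartbeat, NameServer updates the lastUpdateTimestamp of the matching BrokerLiveInfo in its brokerLiveTable cache. NameServer also scans brokerLiveTable every 10 seconds; if no heartbeat has arrived from a broker for 120 seconds, NameServer removes that broker's routing information and closes the socket connection.
This is also why enabling autoCreateTopic in production is discouraged: topic configuration is cached on the broker by default, so once the option is on, several brokers may each auto-create the same topic, the topic is created repeatedly across brokers, and the duplicated data ends up synchronized to NameServer.
Broker sends the heartbeat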
org.apache.rocketmq.broker.BrokerController#start
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
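The period argument in the call above is clamped, so whatever registerNameServerPeriod is configured to, the heartbeat runs no more often than every 10 seconds and no less often than every 60 seconds. A minimal sketch of that arithmetic follows; the class and method names are illustrative and not part of RocketMQ.

```java
final class HeartbeatPeriod {
    // Mirrors Math.max(10000, Math.min(period, 60000)) from the snippet above.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(1_000L));   // below the floor
        System.out.println(effectivePeriodMillis(30_000L));  // inside the range
        System.out.println(effectivePeriodMillis(120_000L)); // above the ceiling
    }
}
```

A configured value of 30000 passes through unchanged, which matches the 30-second interval described earlier.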
Following the code, the registration itself is performed in
this.brokerOuterAPI.registerBrokerAll
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
//get the list of NameServer addresses
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
//populate the requestHeader fields (elided)
RegisterBrokerBody requestBody = new RegisterBrokerBody();
//populate the requestBody fields and encode them into the byte[] `body` (elided)
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
//the actual registration call
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
}
});
}
}
return registerBrokerResultList;
}
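The listing above is abbreviated: it creates a CountDownLatch but does not show where the latch is counted down or awaited. As a rough illustration of how such a fan-out usually works, here is a self-contained sketch. It is not RocketMQ's code; `FanOutRegistration`, `registerAll`, and the `register` function are hypothetical stand-ins for the per-NameServer call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class FanOutRegistration {
    // Calls `register` once per address in parallel and collects the non-null results.
    // `register` is a hypothetical stand-in for the call to a single NameServer.
    static <R> List<R> registerAll(List<String> addrs, Function<String, R> register,
                                   ExecutorService executor, long timeoutMillis) {
        List<R> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(addrs.size());
        for (String addr : addrs) {
            executor.execute(() -> {
                try {
                    R result = register.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // One failing NameServer must not block the others.
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Wait for every call to finish, but never longer than the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Counting down in `finally` matters here: a NameServer that throws or times out still releases the latch, so the caller waits at most `timeoutMillis` and returns whatever results arrived.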
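The registration logic lives in registerBroker and is fairly simple: it sends a remoting request (a RemotingCommand through the remoting client, not HTTP) and processes the response.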
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
Once this completes, control returns to the BrokerController#doRegisterBrokerAll method
//register and collect the results
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(xxxx);
//apply the data returned by the heartbeat
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
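The returned result is used to update the HA master address, the masterAddr used for slave synchronization, and the order-topic configuration held by the topic config manager.
NameServer handles the heartbeat
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor handles the network requests sent to NameServer; heartbeats arrive with the request code RequestCode.REGISTER_BROKER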
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
Following the corresponding code, the request is processed and the response is built as follows:
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
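namesrvController.getRouteInfoManager().registerBroker processes the registration request. The response header then carries the HA server address and the master address, and the response body carries the order-topic KV configuration.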
RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
//acquire the write lock
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//look up the BrokerData by brokerName; if absent, this is a first registration
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
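The least obvious step in registerBroker is the loop that removes an address already registered under a different brokerId, which is what lets a slave re-register as master without leaving a stale entry behind. A minimal sketch of just that step, using a plain HashMap; the class `BrokerAddrTable` and its methods are hypothetical and greatly simplified compared with RouteInfoManager.

```java
import java.util.HashMap;
import java.util.Map;

final class BrokerAddrTable {
    // brokerId -> address for one broker group; id 0 is the master (MixAll.MASTER_ID).
    private final Map<Long, String> brokerAddrs = new HashMap<>();

    // Returns true if this brokerId had no address before (first registration of the id).
    boolean register(long brokerId, String brokerAddr) {
        // The same address may appear under only one id, so drop stale entries first.
        brokerAddrs.entrySet().removeIf(
            e -> brokerAddr.equals(e.getValue()) && brokerId != e.getKey());
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    // Defensive copy so callers cannot mutate the table.
    Map<Long, String> snapshot() {
        return new HashMap<>(brokerAddrs);
    }
}
```

If the address 10.0.0.2:10911 is first registered with id 1 and later with id 0, the table ends up with a single entry for id 0, mirroring the "remove <1, IP:PORT>, then add <0, IP:PORT>" comment in the original code.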
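Route removal
As shown above, a broker registers its routing information with NameServer every 30 seconds. So if a broker crashes, how does NameServer remove it? NameServer likewise scans the broker list every 10 seconds to keep track of which brokers are alive.
RocketMQ has two triggers for route removal:
- NameServer periodically scans brokerLiveTable and compares the time of the last heartbeat with the current system time; if the difference exceeds 120s, the broker's information is removed
- When a Broker shuts down normally, it executes the unregisterBroker command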
RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
onChannelDestroy acquires the write lock, then iterates over the routing tables and removes the broker's entries.
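The expiry check itself can be isolated in a few lines. The sketch below keeps only a timestamp per broker and takes the current time as a parameter so it can be exercised without waiting; `LivenessTable` and its methods are hypothetical names, and the channel handling and locking of the real code are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class LivenessTable {
    static final long EXPIRED_MILLIS = 120_000L; // 120 s, as described in the text

    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Records a heartbeat from the given broker address.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Removes and returns every broker whose last heartbeat is older than the expiry.
    List<String> scanNotActive(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + EXPIRED_MILLIS < nowMillis) {
                expired.add(e.getKey()); // read the key before removing the entry
                it.remove();
            }
        }
        return expired;
    }
}
```

Because the scan runs every 10 seconds while the expiry is 120 seconds, a crashed broker can stay in the table for up to roughly 130 seconds after its last heartbeat.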
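Route discovery
Route discovery in RocketMQ is not real-time. When a topic's routing changes, NameServer does not push the change to clients; instead, clients periodically pull the latest topic information. The request code for pulling routing information by topic name is RequestCode.GET_ROUTEINTO_BY_TOPIC.
The request is again handled by DefaultRequestProcessor.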
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
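The pull model has a visible consequence on the client side: a cached route stays as it is until the next pull, even if NameServer already holds newer data. A minimal sketch of such a cache follows. `RouteCache` and the `fetchRoute` function are hypothetical; routes are reduced to plain strings, and the timer that would call `refresh` is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class RouteCache {
    private final Map<String, String> routes = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the "get route by topic" request to NameServer.
    private final Function<String, String> fetchRoute;

    RouteCache(Function<String, String> fetchRoute) {
        this.fetchRoute = fetchRoute;
    }

    // Meant to be called on a timer by the client; returns true if the route changed.
    boolean refresh(String topic) {
        String latest = fetchRoute.apply(topic);
        if (latest == null) {
            return false; // keep the previous route when nothing is returned
        }
        String previous = routes.put(topic, latest);
        return !latest.equals(previous);
    }

    // Returns the cached route, which may lag behind NameServer until the next refresh.
    String get(String topic) {
        return routes.get(topic);
    }
}
```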
Core implementation
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;//order-message configuration
private List<QueueData> queueDatas;//queue metadata
private List<BrokerData> brokerDatas;//metadata of the brokers that host the topic
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;//filter server list for each broker
The core implementation is in namesrvController.getRouteInfoManager().pickupTopicRouteData