nacos-服務(wù)注冊(cè)發(fā)現(xiàn)源碼分析-啟動(dòng)注冊(cè)-02
分析了nacos在springboot 啟動(dòng)后攻谁,通過(guò)事件機(jī)制弯予,通知到啟動(dòng)事件監(jiān)聽(tīng)器熙涤,在事件監(jiān)聽(tīng)器里面完成了服務(wù)的注冊(cè)。現(xiàn)在我們來(lái)分析一下那槽,服務(wù)注冊(cè)的具體訪(fǎng)問(wèn)接口骚灸。
- NacosServiceRegistry 服務(wù)注冊(cè)接口
/**
* @author xiaojing
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
*/
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final NamingService namingService;
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.namingService = nacosDiscoveryProperties.namingServiceInstance(); // 依賴(lài)這個(gè)接口實(shí)現(xiàn)服務(wù)注冊(cè)請(qǐng)求
}
// 服務(wù)注冊(cè)
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance); // 注冊(cè)接口
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
- this.namingService = nacosDiscoveryProperties.namingServiceInstance(); 實(shí)例化
public NamingService namingServiceInstance() {
if (null != namingService) {
return namingService;
}
try { // 繼續(xù)看這個(gè)方法
namingService = NacosFactory.createNamingService(getNacosProperties());
}
catch (Exception e) {
log.error("create naming service error!properties={},e=,", this, e);
return null;
}
return namingService;
}
- namingService = NacosFactory.createNamingService(getNacosProperties());
/**
* Create naming service
*
* @param properties init param
* @return Naming
* @throws NacosException Exception
*/
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
public static NamingService createNamingService(Properties properties) throws NacosException {
try { // 創(chuàng)建的對(duì)象
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
com.alibaba.nacos.client.naming.NacosNamingService
從上面的方法我們可以看出this.namingService = nacosDiscoveryProperties.namingServiceInstance();
這里的namingService
就是com.alibaba.nacos.client.naming.NacosNamingService
。ok 非驮,回國(guó)頭去看看這句話(huà)namingService.registerInstance(serviceId, group, instance);
也就是說(shuō)這個(gè)方法就是com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(serviceId, group, instance);
- registerInstance(serviceId, group, instance);
.......
private NamingProxy serverProxy; //代理對(duì)象劫笙,這個(gè)對(duì)象就是發(fā)送底層http請(qǐng)求的
public NacosNamingService(String serverList) { //構(gòu)造
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
init(properties);
}
public NacosNamingService(Properties properties) { // 構(gòu)造
init(properties); // 調(diào)用下面的方法
}
private void init(Properties properties) {
namespace = InitUtils.initNamespaceForNaming(properties);
initServerAddr(properties);
InitUtils.initWebRootContext();
initCacheDir();
initLogName(properties);
eventDispatcher = new EventDispatcher();
serverProxy = new NamingProxy(namespace, endpoint, serverList); // 初始化 代理對(duì)象`serverList` 注冊(cè)中心服務(wù)地址集合填大,也就是nacos 服務(wù)端集合
serverProxy.setProperties(properties);
beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}
// 負(fù)責(zé)服務(wù)注冊(cè)
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// 添加心跳
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 我們看到這里,就知道注冊(cè)是繼續(xù)調(diào)用這句話(huà)
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
- NamingProxy http 服務(wù)代理 的
registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
// 組裝請(qǐng)求參數(shù)
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp()); //服務(wù)的ip
params.put("port", String.valueOf(instance.getPort())); // 端口
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// 發(fā)送服務(wù)注冊(cè)請(qǐng)求 post 請(qǐng)求
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
- reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
List<String> snapshot = serversFromEndpoint;
if (!CollectionUtils.isEmpty(serverList)) {
snapshot = serverList;
}
return reqAPI(api, params, snapshot, method); // 發(fā)送請(qǐng)求
}
- reqAPI(api, params, snapshot, method);
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size()); // 隨機(jī)取一個(gè)服務(wù)地址進(jìn)行注冊(cè)
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, server, method); // 如果注冊(cè)失敗召耘,就執(zhí)行下面的邏輯,記錄日志收壕,并取 下一個(gè)服務(wù)地址轨蛤,直到注冊(cè)成功,return
} catch (NacosException e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
}
// 如果 上面 注冊(cè)失敗圃验,就再取一個(gè)服務(wù)地址進(jìn)行注冊(cè)
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
上面代碼是服務(wù)注冊(cè)的邏輯,大致的意思是:注冊(cè)請(qǐng)求隨機(jī)發(fā)送的服務(wù)器供常,如果第一次就注冊(cè)成功栈暇,就return,否則繼續(xù)遍歷服務(wù)列表,繼續(xù)注冊(cè)煎源,直到注冊(cè)成功手销。下面看看
callServer
- callServer
public String callServer(String api, Map<String, String> params, String curServer, String method)
throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
checkSignature(params);
List<String> headers = builderHeaders();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api; // 請(qǐng)求地址
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
}
// 發(fā)送http 請(qǐng)求
HttpClient.HttpResult result = HttpClient.request(url, headers, params, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
.observe(end - start);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content; // 請(qǐng)求成功锋拖,返回?cái)?shù)據(jù)
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return StringUtils.EMPTY;
}
throw new NacosException(NacosException.SERVER_ERROR, "failed to req API:"
+ curServer + api + ". code:"
+ result.code + " msg: " + result.content);
}
- 請(qǐng)求接口
UtilAndComs.NACOS_URL_INSTANCE
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// url = UtilAndComs.NACOS_URL_INSTANCE
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
- UtilAndComs.NACOS_URL_INSTANCE
public static String WEB_CONTEXT = "/nacos";
public static String NACOS_URL_BASE = WEB_CONTEXT + "/v1/ns";
public static String NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";
/nacos/v1/ns/instance 接口地址馏慨,請(qǐng)求方式:POST。
ok姑隅,到這里我們就分析完了,最底層服務(wù)注冊(cè)請(qǐng)求邏輯倔撞。主要有:1.隨機(jī)將服務(wù)注冊(cè)到一個(gè)服務(wù)器讲仰,直到注冊(cè)成功。2. 請(qǐng)求接口是:/nacos/v1/ns/instance痪蝇,請(qǐng)求方式是post鄙陡,通信協(xié)議采用http 協(xié)議。
- 總結(jié):
我們使用nacos 的時(shí)候躏啰,發(fā)現(xiàn)我們服務(wù)一啟動(dòng)后,就會(huì)注冊(cè)服務(wù)到注冊(cè)中心给僵,于是我們就分析了一下毫捣,nacos 服務(wù)注冊(cè)的邏輯,主要有一下幾點(diǎn):
- 服務(wù)啟動(dòng)就注冊(cè)帝际,原因是 springboot 啟動(dòng)后發(fā)送啟動(dòng)事件蔓同,然后事件監(jiān)聽(tīng)器里發(fā)送服務(wù)注冊(cè)請(qǐng)求。
- 服務(wù)底層通信蹲诀,采用http 協(xié)議
- 服務(wù)注冊(cè)的時(shí)候斑粱,隨機(jī)注冊(cè)到一個(gè)nacos 服務(wù)器,如果第一次注冊(cè)失敗脯爪,就繼續(xù)取下一服務(wù)器地址则北,發(fā)送請(qǐng)求進(jìn)行注冊(cè),直到注冊(cè)成功痕慢,便返回尚揣。
注意:有一段代碼沒(méi)有分析:
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
如果有興趣的可以 nacosDomain 取得是啥值。上面的邏輯是沒(méi)有獲取到nacos 服務(wù)器地址時(shí)的執(zhí)行邏輯掖举。
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) { // 服務(wù)器地址不為空
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, server, method);
} catch (NacosException e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
}
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
// 服務(wù)端地址為空的時(shí)候惑艇,執(zhí)行這段邏輯
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) { // 循環(huán)注冊(cè)3次
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}