很多人在學(xué)習(xí)開源的時(shí)候椭懊,無從下手,代碼那么多步势,從哪個(gè)地方開始呢氧猬?我們學(xué)習(xí)nacos,首先去到nocas的github源碼的地方鏈接: https://github.com/alibaba/nacos下載源碼到我們的idea坏瘩,打開example項(xiàng)目盅抚,
進(jìn)入APP,可以看到如下代碼:
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", "21.34.53.5:8848,21.34.53.6:8848");
properties.setProperty("namespace", "quickStart");
NamingService naming = NamingFactory.createNamingService(properties);
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
System.out.println(naming.getAllInstances("nacos.test.3"));
}
這里我們可以看到倔矾,這里構(gòu)建了一個(gè)NamingService實(shí)例妄均,同時(shí)設(shè)置了我們的nacos服務(wù)端的地址和端口,設(shè)置namespace。
我們進(jìn)入createNamingService方法
NamingService
/**
* Create a new naming service.
*
* @param properties naming service properties
* @return new naming service
* @throws NacosException nacos exception
*/
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
這里通過反射創(chuàng)建了一個(gè)NamingService實(shí)例丛晦,實(shí)際的實(shí)現(xiàn)類是在api項(xiàng)目里面的NacosNamingService奕纫,隨后我們進(jìn)入NacosNamingService看看
NacosNamingService
1.先看里面的屬性
namespace 名字空間
endpoint 服務(wù)管理服務(wù)端地址管理服務(wù)器地址,獲取服務(wù)管理服務(wù)端地址(當(dāng) nacos server 集群需要擴(kuò)縮容時(shí)烫沙,客戶端需要有一種能力能夠及時(shí)感知到集群發(fā)生變化,及時(shí)感知到集群的變化是通過 endpoint 來實(shí)現(xiàn)的匹层。也即客戶端會(huì)定時(shí)的向endpoint發(fā)送請(qǐng)求來更新客戶端內(nèi)存中的集群列表。)
serverList 服務(wù)管理服務(wù)端地址锌蓄,可直接配置升筏,或從endpoint獲取
cacheDir 調(diào)用服務(wù)信息本地文件緩存地址
logName 暫未使用
HostReactor 客戶端關(guān)心的服務(wù)的實(shí)例信息,推拉模式的更新,failover服務(wù)實(shí)例信息讀寫管理
BeatReactor 本地實(shí)例信息心跳
EventDispatcher 服務(wù)信息變更監(jiān)聽回調(diào)處理
NamingProxy 服務(wù)管理服務(wù)端地址列表更新管理瘸爽,接口調(diào)用負(fù)載均衡您访,失敗重試
/**
* Each Naming service should have different namespace.
* 名字空間
*/
private String namespace;
/**
* 當(dāng) nacos server 集群需要擴(kuò)縮容時(shí),客戶端需要有一種能力能夠及時(shí)感知到集群發(fā)生變化剪决。
* 及時(shí)感知到集群的變化是通過 endpoint 來實(shí)現(xiàn)的灵汪。也即客戶端會(huì)定時(shí)的向 endpoint 發(fā)送請(qǐng)求來更新客戶端內(nèi)存中的集群列表。
* 服務(wù)管理服務(wù)端地址管理服務(wù)器地址柑潦,獲取服務(wù)管理服務(wù)端地址
*/
private String endpoint;
/**
* 服務(wù)管理服務(wù)端地址管理服務(wù)器地址享言,獲取服務(wù)管理服務(wù)端地址
*/
private String serverList;
/**
* 服務(wù)管理服務(wù)端地址管理服務(wù)器地址,獲取服務(wù)管理服務(wù)端地址
*/
private String cacheDir;
private String logName;
/**
* 客戶端關(guān)心的服務(wù)的實(shí)例信息,推拉模式的更新渗鬼,failover服務(wù)實(shí)例信息讀寫管理
*/
private HostReactor hostReactor;
/**
* 本地實(shí)例信息心跳
*/
private BeatReactor beatReactor;
/**
* 服務(wù)信息變更監(jiān)聽回調(diào)處理
*/
private EventDispatcher eventDispatcher;
/**
* 服務(wù)管理服務(wù)端地址列表更新管理览露,接口調(diào)用負(fù)載均衡,失敗重試
*/
private NamingProxy serverProxy;
了解了相關(guān)字段的意思我們來看看構(gòu)造方法
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
這里其實(shí)就是執(zhí)行init初始化方法
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties); //檢查contextPath格式 可為空
this.namespace = InitUtils.initNamespaceForNaming(properties); //初始化命名空間
//子類實(shí)現(xiàn)類中的靜態(tài)代碼串中已經(jīng)向Jackson進(jìn)行了注冊(cè)譬胎,但是由于classloader的原因差牛,只有當(dāng) 該子類被使用的時(shí)候,才會(huì)加載該類堰乔。
// 這可能會(huì)導(dǎo)致Jackson先進(jìn)性反序列化偏化,再注冊(cè)子類,從而導(dǎo)致 反序列化失敗浩考。
//所以這里將NoneSelector夹孔、ExpressionSelector這兩個(gè)類進(jìn)行注冊(cè)或者銷毀
InitUtils.initSerialization();
//這里進(jìn)行nacos服務(wù)端地址初始化
//這里面會(huì)涉及到是否啟用endpoint
initServerAddr(properties);
//如果應(yīng)用由EDAS部署被盈,則支持阿里云的web上下文
InitUtils.initWebRootContext();
//這里初始化本地緩存的路徑及存放的registerInstance的內(nèi)容
initCacheDir();
//初始化LogName析孽,未設(shè)置用naming.log
initLogName(properties);
/**
*初始化ExecutorService線程池,創(chuàng)建名字為com.alibaba.nacos.naming.client.listener的daemon線程N(yùn)otifier
* EventDispatcher中有一個(gè)LinkedBlockingQueue隊(duì)列只怎,放的是ServiceInfo
* EventDispatcher中有ConcurrentMap<String, List<EventListener>>放入的是EventListener
*Notifier中run方法解析
* 先去隊(duì)列中彈出隊(duì)頂元素(poll方法)
* 如果為空進(jìn)行下一次循環(huán)
* 如果不為空則去ConcurrentMap取listeners
* 去除listener去監(jiān)聽NamingEvent
*
*/
this.eventDispatcher = new EventDispatcher();
/**
* 初始化服務(wù)代理袜瞬,用戶名密碼服務(wù)地址及initRefreshTask任務(wù)的線程池,創(chuàng)建com.alibaba.nacos.client.naming.updater名字的daemon線程
*/
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
/**
* initClientBeatThreadCount(properties):Runtime.getRuntime().availableProcessors()返回到Java虛擬機(jī)的可用的處理器數(shù)量
* 創(chuàng)建一個(gè)此案城池com.alibaba.nacos.naming.beat.sender的daemon線程
*/
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
/**
* 同上
*/
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}
innit方法里面在初始化各個(gè)模塊身堡,具體的步驟是
1.檢查contextPath格式
2.將NoneSelector邓尤、ExpressionSelector這兩個(gè)類進(jìn)行注冊(cè)或者銷毀
3.nacos服務(wù)端地址初始化
4.如果應(yīng)用由EDAS部署,則支持阿里云的web上下文
5.這里初始化本地緩存
6.初始化LogName
7.初始化服務(wù)信息變更監(jiān)聽回調(diào)處理
8.初始化服務(wù)管理服務(wù)端地址列表更新管理,接口調(diào)用負(fù)載均衡汞扎,失敗重試
9.初始化本地實(shí)例信息心跳
10.初始化客戶端關(guān)心的服務(wù)的實(shí)例信息
說明:7-10都會(huì)初始化線程池季稳,創(chuàng)建daemon線程
總的來說,init方法為我們初始化各種本地信息澈魄,下面來看具體初始化方法
ValidatorUtils.checkInitParam(properties)
public static final String CONTEXT_PATH = "contextPath";
private static final Pattern CONTEXT_PATH_MATCH = Pattern.compile("(\\/)\\1+");
public static void checkInitParam(Properties properties) throws NacosException {
checkContextPath(properties.getProperty(PropertyKeyConst.CONTEXT_PATH));
}
/**
* Check context path.
*
* @param contextPath context path
*/
public static void checkContextPath(String contextPath) {
if (contextPath == null) {
return;
}
Matcher matcher = CONTEXT_PATH_MATCH.matcher(contextPath);
if (matcher.find()) {
throw new IllegalArgumentException("Illegal url path expression");
}
}
這里的代碼比較簡(jiǎn)單景鼠,只是檢查了一下contextPath
InitUtils.initNamespaceForNaming(properties)
/**
* Add a difference to the name naming. This method simply initializes the namespace for Naming. Config
* initialization is not the same, so it cannot be reused directly.
*
* 為名稱命名添加差異。此方法簡(jiǎn)單地初始化命名空間以進(jìn)行命名痹扇。配置初始化不一樣铛漓,所以不能直接重用。
*
* @param properties properties
* @return namespace
*/
public static String initNamespaceForNaming(Properties properties) {
String tmpNamespace = null;
String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));//默認(rèn)是true
System.out.println("isUseCloudNamespaceParsing:" + isUseCloudNamespaceParsing);
if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) {
tmpNamespace = TenantUtil.getUserTenantForAns();//這里是ans鲫构,據(jù)說是注冊(cè)中心浓恶,未設(shè)置tenant.id和ans.namespace 返回為空
/**
* 這里檢查是否為空,如果不為空發(fā)返回tmpNamespace结笨,如果為空?qǐng)?zhí)行Callable.call()方法包晰,
* call()方法里面去取ans.namespace屬性,返回namespace
*/
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
return namespace;
}
});
/**
* 這里檢查是否為空炕吸,如果不為空發(fā)返回tmpNamespace杜窄,如果為空?qǐng)?zhí)行Callable.call()方法,
* call()方法里面去取ALIBABA_ALIWARE_NAMESPACE環(huán)境變量
*/
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
return namespace;
}
});
}
/**
* 這里檢查是否為空算途,如果不為空發(fā)返回tmpNamespace塞耕,如果為空?qǐng)?zhí)行Callable.call()方法,
* call()方法里面去取NAMESPACE屬性
*/
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
return namespace;
}
});
if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
/**
* 這里拿到我們外面設(shè)置的namespace
*/
tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
}
/**
* 這里如果前面tmpNamespace都是null嘴瓤,則返回默認(rèn)的NAMESPACE:public
*/
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
return UtilAndComs.DEFAULT_NAMESPACE_ID;
}
});
return tmpNamespace;
}
這個(gè)方法里面先去會(huì)判斷是否使用isUseCloudNamespaceParsing扫外,默認(rèn)是true,然后回去檢查是否用ans廓脆,ALIBABA_ALIWARE筛谚。同時(shí)會(huì)拿到我們最開始設(shè)置的namespace,如果為設(shè)置停忿,則用默認(rèn)的public驾讲。
InitUtils.initSerialization()
/**
* Register subType for serialization.
*
* <p>
* Now these subType implementation class has registered in static code. But there are some problem for classloader.
* The implementation class will be loaded when they are used, which will make deserialize before register.
* </p>
*
* <p>
* 子類實(shí)現(xiàn)類中的靜態(tài)代碼串中已經(jīng)向Jackson進(jìn)行了注冊(cè),但是由于classloader的原因席赂,只有當(dāng) 該子類被使用的時(shí)候吮铭,才會(huì)加載該類。這可能會(huì)導(dǎo)致Jackson先進(jìn)性反序列化颅停,再注冊(cè)子類谓晌,從而導(dǎo)致 反序列化失敗。
* </p>
*/
public static void initSerialization() {
// TODO register in implementation class or remove subType
JacksonUtils.registerSubtype(NoneSelector.class, SelectorType.none.name());
JacksonUtils.registerSubtype(ExpressionSelector.class, SelectorType.label.name());
}
這里很簡(jiǎn)單癞揉,主要是為了防止反序列化失敗
initServerAddr(properties)
private void initServerAddr(Properties properties) {
//這里拿到我們前面填寫的nacos服務(wù)端地址
serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
endpoint = InitUtils.initEndpoint(properties);
if (StringUtils.isNotEmpty(endpoint)) {
serverList = "";
}
}
/**
* Init end point.
*
* @param properties properties
* @return end point
*/
public static String initEndpoint(final Properties properties) {
if (properties == null) {
return "";
}
// Whether to enable domain name resolution rules 是否啟用域名解析規(guī)則
/**
* 這里是去取end point的解析規(guī)則纸肉,即對(duì)傳入的endpoint參數(shù)規(guī)則解析的能力可以是一個(gè)具體的值溺欧,也可以是一個(gè)占位符的形式
* 1.endpoint.options 是一個(gè)具體的變量。支持從系統(tǒng)屬性柏肪,系統(tǒng)環(huán)境變量中讀取姐刁。
* 2.defaultValue 是給出的一個(gè)默認(rèn)值。當(dāng)從具體的變量中沒有被正確初始化時(shí)烦味,會(huì)使用給出的默認(rèn)值來初始化绢淀。
*
*/
String isUseEndpointRuleParsing = properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE)));
//isUseEndpointParsingRule的值決定是否啟用endpoint解析規(guī)則
boolean isUseEndpointParsingRule = Boolean.parseBoolean(isUseEndpointRuleParsing);
String endpointUrl;
if (isUseEndpointParsingRule) {//如果啟用解析規(guī)則
// Get the set domain name information
endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
if (StringUtils.isBlank(endpointUrl)) {
return "";
}
} else {//不啟用
endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
}
if (StringUtils.isBlank(endpointUrl)) {
return "";
}
String endpointPort = TemplateUtils
.stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT),
new Callable<String>() {
@Override
public String call() {
return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
}
});
endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
@Override
public String call() {
return "8080";
}
});
return endpointUrl + ":" + endpointPort;
}
第一部分是設(shè)置serverList為我們最開始設(shè)置的服務(wù)端地址
第二部分設(shè)置我們的endpoint規(guī)則
InitUtils.initWebRootContext()
/**
* Init web root context.
*/
public static void initWebRootContext() {
// support the web context with ali-yun if the app deploy by EDAS
final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
@Override
public void run() {
UtilAndComs.webContext = webContext.indexOf("/") > -1 ? webContext : "/" + webContext;
UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns";
UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance";
}
});
}
這里如果應(yīng)用由EDAS部署矛缨,則支持阿里云的web上下文
initCacheDir()
private void initCacheDir() {
cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
if (StringUtils.isEmpty(cacheDir)) {
cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
}
}
這里初始化本地實(shí)例信息八回,在本地你會(huì)看到這樣的文件
C:\Users\nacos\naming\quickStart
[圖片上傳失敗...(image-bbdedd-1600251861321)]
initLogName(properties)
private void initLogName(Properties properties) {
logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
if (StringUtils.isEmpty(logName)) {
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {
logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
} else {
logName = "naming.log";
}
}
}
這里設(shè)置logname萄焦,目前沒看到哪里用
new EventDispatcher()
public EventDispatcher() {
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
thread.setDaemon(true);
return thread;
}
});
this.executor.execute(new Notifier());
}
我們看到構(gòu)造方法里面初始化了一個(gè)線程池,并且加入了Notifier這個(gè)線程凤瘦,我們來看看Notifier的run方法
@Override
public void run() {
while (!closed) {
ServiceInfo serviceInfo = null;
try {
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}
if (serviceInfo == null) {
continue;
}
try {
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
+ serviceInfo.getClusters(), e);
}
}
}
先去隊(duì)列中彈出隊(duì)頂元素(poll方法)
如果為空進(jìn)行下一次循環(huán)
如果不為空則去ConcurrentMap取listeners
取出listener去監(jiān)聽NamingEvent
new NamingProxy(this.namespace, this.endpoint, this.serverList, properties)
public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
this.securityProxy = new SecurityProxy(properties, nacosRestTemplate);
this.properties = properties;
this.setServerPort(DEFAULT_SERVER_PORT);
this.namespaceId = namespaceId;
this.endpoint = endpoint;
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
if (this.serverList.size() == 1) {
this.nacosDomain = serverList;
}
}
this.initRefreshTask();
}
這里的初始化動(dòng)作很多 我們一個(gè)一個(gè)看
首先看new SecurityProxy(properties, nacosRestTemplate)
/**
* Construct from properties, keeping flexibility.
*
* @param properties a bunch of properties to read
*/
public SecurityProxy(Properties properties, NacosRestTemplate nacosRestTemplate) {
username = properties.getProperty(PropertyKeyConst.USERNAME, StringUtils.EMPTY);
password = properties.getProperty(PropertyKeyConst.PASSWORD, StringUtils.EMPTY);
contextPath = properties.getProperty(PropertyKeyConst.CONTEXT_PATH, "/nacos");
contextPath = contextPath.startsWith("/") ? contextPath : "/" + contextPath;
this.nacosRestTemplate = nacosRestTemplate;
}
這里設(shè)置了用戶名和密碼同時(shí)初始化了nacosRestTemplate宿礁,nacosRestTemplate是客戶端發(fā)送信息到服務(wù)端的類,里面用HttpClient實(shí)現(xiàn)蔬芥,有興趣的可以去看看
再看看initRefreshTask方法
private void initRefreshTask() {
this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.updater");
t.setDaemon(true);
return t;
}
});
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
refreshSrvIfNeed();
}
}, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
securityProxy.login(getServerList());
}
}, 0, securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
refreshSrvIfNeed();
this.securityProxy.login(getServerList());
}
首先初始化一個(gè)線程池梆靖,同時(shí)refreshSrvIfNeed去拿服務(wù)端serverList,同時(shí)securityProxy.login登陸到拿到的服務(wù)端列表笔诵。
new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties))
我們先看看initClientBeatThreadCount方法
public static final int DEFAULT_CLIENT_BEAT_THREAD_COUNT =
Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
private int initClientBeatThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
}
return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
通過 Runtime.getRuntime().availableProcessors()方法拿到Java虛擬機(jī)的可用的處理器數(shù)量返吻,下面我們看看構(gòu)造方法
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
這里只是初始化了線程池,本身這個(gè)BeatReactor有一個(gè)內(nèi)部類BeatTask執(zhí)行本地實(shí)例注冊(cè)到服務(wù)端做心跳檢測(cè)
new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties))
這里先看看isLoadCacheAtStart
private boolean isLoadCacheAtStart(Properties properties) {
boolean loadCacheAtStart = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
loadCacheAtStart = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
}
return loadCacheAtStart;
}
這個(gè)方法比較簡(jiǎn)單乎婿,只是設(shè)置了是否加載本地緩存测僵,下面我們看看構(gòu)造方法
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
// init executorService
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.beatReactor = beatReactor;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
}
這里初始化了一些本地緩存的內(nèi)容,我們主要看看FailoverReactor和PushReceiver
FailoverReactor
public FailoverReactor(HostReactor hostReactor, String cacheDir) {
this.hostReactor = hostReactor;
this.failoverDir = cacheDir + "/failover";
// init executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.failover");
return thread;
}
});
this.init();
}
public void init() {
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
// backup file on startup if failover directory is empty.
executorService.schedule(new Runnable() {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
}
}
}, 10000L, TimeUnit.MILLISECONDS);
}
這里是操作本地實(shí)例信息的一些線程谢翎,F(xiàn)ailoverReactor通過一個(gè)文件配置激活failover模式捍靠。該模式下,會(huì)從本地文件中讀取服務(wù) 列表信息森逮。
PushReceiver
這里主要看run方法
@Override
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJson(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
run方法使用while true循環(huán)來執(zhí)行udpSocket.receive(packet)榨婆,之后將接收到的數(shù)據(jù)解析為PushPacket,然后根據(jù)不同pushPacket.type做不同處理
當(dāng)pushPacket.type為dom或者service的時(shí)候會(huì)調(diào)用hostReactor.processServiceJSON(pushPacket.data)褒侧;當(dāng)pushPacket.type為dump的時(shí)候會(huì)將hostReactor.getServiceInfoMap()序列化到ack中良风,最后將ack返回回去
至此,初始化工作就完成了闷供,下面我們來看看如何注冊(cè)namespace
registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
/**
*ephemeral
*短暫的
*/
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
/**
* BeatTask加入到線程池中烟央,線程池中線程注冊(cè)實(shí)例到服務(wù)端
*
* 利用prometheus監(jiān)控
*/
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
//這里也是注冊(cè)實(shí)例到服務(wù)端,beatReactor里面也是本地實(shí)例心跳
serverProxy.registerService(groupedServiceName, groupName, instance);
}
registerInstance方法第一步獲取GroupedName这吻,然后看instance是否短暫的吊档,如果是執(zhí)行beatReactor.addBeatInfo方法篙议,注冊(cè)及監(jiān)控唾糯,最后也是通過serverProxy注冊(cè)namespace
我們直接來看addBeatInfo方法
/**
* Add beat information.
*
* @param serviceName service name
* @param beatInfo beat information
*/
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
先是buildKey生成key怠硼,查看beatinfo是否存在,不存在put移怯;然后執(zhí)行BeatTask,我們來看看BeatTask的run方法
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
/**
* 注冊(cè)實(shí)例到服務(wù)端 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
*/
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
這個(gè)方法里面最重要兩步就是第一步sendBeat發(fā)送心跳香璃,第二步通過reqApi注冊(cè)實(shí)例到服務(wù)端
然后看看registerService方法
/**
* register a instance to service with specified instance properties.
*
* @param serviceName name of service
* @param groupName group of service
* @param instance instance to register
* @throws NacosException nacos exception
*/
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
這里主要配置參數(shù),然后通過reqApi注冊(cè)實(shí)例到服務(wù)端舟误,注冊(cè)實(shí)例到服務(wù)端代碼如下
/**
* Call server.
*
* @param api api
* @param params parameters
* @param body body
* @param curServer ?
* @param method http method
* @return result
* @throws NacosException nacos exception
*/
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
我們看到葡秒,這里是通過最開始初始化的nacosRestTemplate發(fā)送的。