Complete flow
Structure of the Nacos service registry: Map<namespace, Map<group::serviceName, Service>>
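Here is a minimal Java sketch of that two-level map. It is not Nacos code. ToyRegistry and its methods are hypothetical, and a generic value type stands in for Service. The "@@" separator between group and service name is an assumption based on how Nacos 1.x builds grouped service names. The comment in ServiceManager writes this key as group::serviceName.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy registry: Map<namespace, Map<group + SEP + serviceName, S>>.
class ToyRegistry<S> {
    // Assumed separator between group and service name.
    static final String SEP = "@@";

    private final Map<String, Map<String, S>> serviceMap = new ConcurrentHashMap<>();

    static String groupedName(String group, String serviceName) {
        return group + SEP + serviceName;
    }

    void put(String namespace, String group, String serviceName, S service) {
        // create the inner map for a namespace on first use
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                .put(groupedName(group, serviceName), service);
    }

    S get(String namespace, String group, String serviceName) {
        Map<String, S> services = serviceMap.get(namespace);
        return services == null ? null : services.get(groupedName(group, serviceName));
    }
}
```

A lookup always goes namespace first and grouped name second. The same service name in two namespaces therefore refers to two unrelated entries.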
I. Service registration
Client side: where to start reading
<!-- Nacos client -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
1. Check which classes the spring.factories file auto-configures for us.
2. Look at the auto-configuration class:
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
// returns new NacosAutoServiceRegistration()
3. The call chain inside NacosAutoServiceRegistration
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
// returns new NacosAutoServiceRegistration()
// when Spring's WebServerInitializedEvent (startup event) is received:
->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#bind
->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start
->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
->com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
->com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(String serviceName, Instance instance)
// it does two things:
// 1. for ephemeral instances, calls com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo(String serviceName, BeatInfo beatInfo) to add heartbeat info
// 2. calls the proxy to perform the registration: com.alibaba.nacos.client.naming.net.NamingProxy#registerService(String serviceName, String groupName, Instance instance)
->com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI()
Summary: in essence, the client simply calls the server's registration endpoint over HTTP.
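To make "an HTTP call to the registration endpoint" concrete, the sketch below builds such a request with the JDK's java.net.http API but does not send it. It assumes the Nacos v1 Open API path POST /nacos/v1/ns/instance, with serviceName, ip, port and ephemeral passed as query parameters. The class and method names are hypothetical, and the real client sends more parameters than shown here.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: build (but do not send) a registration request.
// Assumes the Nacos v1 Open API endpoint POST /nacos/v1/ns/instance.
class RegisterRequestSketch {

    static HttpRequest build(String serverAddr, String serviceName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", "true"); // ephemeral instances are kept alive by heartbeats
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

Against a running server, the request could be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).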
--
Server side
com.alibaba.nacos.naming.controllers.InstanceController#register
->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
->com.alibaba.nacos.naming.core.ServiceManager#addInstance
->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask
which adds the task to a queue:
// inner class of DistroConsistencyServiceImpl
public class Notifier implements Runnable {

    // keys that already have a pending CHANGE task (used for de-duplication)
    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

    private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

    /**
     * Add new notify task to queue.
     *
     * @param datumKey data key
     * @param action action for data
     */
    public void addTask(String datumKey, DataOperation action) {
        // skip a CHANGE if one for the same key is already queued
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        // offer() does not block; it returns false (and the task is dropped) if the queue is full
        tasks.offer(Pair.with(datumKey, action));
    }

    public int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        // consume forever: take() blocks until a task is available
        for (; ; ) {
            try {
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

    private void handle(Pair<String, DataOperation> pair) {
        try {
            String datumKey = pair.getValue0();
            DataOperation action = pair.getValue1();
            services.remove(datumKey);
            int count = 0;
            // listeners and dataStore are fields of the enclosing DistroConsistencyServiceImpl
            if (!listeners.containsKey(datumKey)) {
                return;
            }
            for (RecordListener listener : listeners.get(datumKey)) {
                count++;
                try {
                    if (action == DataOperation.CHANGE) {
                        // for a Service listener this leads to Service#onChange
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                }
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO
                        .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                datumKey, count, action.name());
            }
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
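The de-duplication in addTask() is easy to miss. Below is a hypothetical, simplified Java version of the producer side. DedupTaskQueue, Op and Task are illustrative names, not Nacos classes. This sketch differs from the original on purpose in two ways. First, it uses putIfAbsent() so that checking and marking a key happen atomically, where the original calls containsKey() and then put(). Second, it clears the mark if offer() fails.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified sketch of Notifier's producer side (not Nacos code).
class DedupTaskQueue {
    enum Op { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Op op;

        Task(String key, Op op) {
            this.key = key;
            this.op = op;
        }
    }

    // keys that currently have a CHANGE task waiting in the queue
    private final Map<String, Boolean> pendingChange = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks;

    DedupTaskQueue(int capacity) {
        tasks = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns false if the task was skipped as a duplicate CHANGE or rejected because the queue is full. */
    boolean addTask(String key, Op op) {
        if (op == Op.CHANGE && pendingChange.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // a CHANGE for this key is already queued
        }
        boolean accepted = tasks.offer(new Task(key, op)); // non-blocking; false when full
        if (!accepted && op == Op.CHANGE) {
            pendingChange.remove(key); // do not keep a mark for a task that was never queued
        }
        return accepted;
    }

    /** Consumer side: blocks for the next task and clears the key's pending mark, as handle() does. */
    Task take() throws InterruptedException {
        Task task = tasks.take();
        pendingChange.remove(task.key);
        return task;
    }

    int size() {
        return tasks.size();
    }
}
```

When Notifier handles a CHANGE, it reads dataStore.get(datumKey) at that moment. Skipping a duplicate CHANGE therefore loses nothing, because a single pass reflects every change made so far.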
Call chain on the consumer side:
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#handle
->com.alibaba.nacos.naming.core.Service#onChange
->com.alibaba.nacos.naming.core.Service#updateIPs
->com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
    // ...
    toUpdateInstances = new HashSet<>(ips);
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
ephemeralInstances is a member field of the Cluster class:
private Set<Instance> ephemeralInstances = new HashSet<>();
When is run() triggered?
An @PostConstruct method submits the notifier to a thread pool when the bean is initialized:
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    // ...
    private volatile Notifier notifier = new Notifier();

    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    // ...
}
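Summary: a blocking queue decouples accepting a registration from applying it to the registry, which improves concurrency. But can the queue overflow, and how long does it take before a registration takes effect?
The queue capacity is 1024 * 1024 (1,048,576) tasks:
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
Truly simultaneous registrations should be rare. In addition, addTask() skips a CHANGE whose key already has one pending, so repeated changes to the same key collapse into one task. Consuming a task is only an in-memory write, so the queue should drain quickly. Note, however, that addTask() uses offer(). When the queue is full, offer() returns false instead of blocking, and addTask() ignores that result, so a task would be dropped silently.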
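What "overflow" means for an ArrayBlockingQueue can be shown directly. offer() never blocks: once the queue is full it returns false. By contrast, put() would block and add() would throw IllegalStateException. BoundedQueueDemo and offerMany are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo: offer() on a full ArrayBlockingQueue returns false instead of blocking or throwing.
class BoundedQueueDemo {

    /** Offers n items to a fresh queue of the given capacity; returns how many were accepted. */
    static int offerMany(int capacity, int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }
}
```

With a capacity of 4, only 4 of 10 offers succeed. With Nacos's 1024 * 1024 capacity, all 10 are accepted.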
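But if a consumer reads from the registry before a registration has finished, could it see dirty data?
The usual approach would be to take a lock and allow reads only after the write completes, but that inevitably hurts throughput.
Let's see how Nacos handles this.
Back to Cluster#updateIps: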
public void updateIps(List<Instance> ips, boolean ephemeral) {
    // pick the current set (ephemeral or persistent); it is copied below, not modified in place
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    // copy the current instances into oldIpMap
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // compare the incoming ips against oldIpMap, i.e. against the copy
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    // ...
    // finally build a new set and assign it to ephemeralInstances or persistentInstances in one step
    toUpdateInstances = new HashSet<>(ips);
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
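This is copy-on-write, i.e. separating reads from writes. The live set is never modified in place, so a reader sees either the complete old set or the complete new one, never a half-updated set.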
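Here is a minimal copy-on-write sketch in Java. It assumes a single writer thread and uses plain "ip:port" strings in place of Instance objects. CowInstanceSet is a hypothetical name. Two details are choices made for this sketch and do not come from the Nacos snippet above: the field is volatile, and each published set is wrapped with Collections.unmodifiableSet.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical copy-on-write holder for one cluster's instances (not Nacos code).
// Assumes a single writer thread; readers never lock.
class CowInstanceSet {
    // volatile so readers on other threads see the newly published set
    private volatile Set<String> instances = Collections.emptySet();

    /** Reader: returns the current immutable snapshot without locking. */
    Set<String> snapshot() {
        return instances;
    }

    /** Writer: copy, modify the copy, then publish it with a single reference assignment. */
    void add(String instance) {
        Set<String> copy = new HashSet<>(instances);
        copy.add(instance);
        instances = Collections.unmodifiableSet(copy);
    }

    void remove(String instance) {
        Set<String> copy = new HashSet<>(instances);
        copy.remove(instance);
        instances = Collections.unmodifiableSet(copy);
    }
}
```

A reader that grabbed a snapshot before a write keeps its old, consistent view. Later readers see the new set.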
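Could several registrations write at the same time and overwrite one another?
Recall the initialization logic above: init() runs only once. A single notifier thread therefore keeps taking tasks from the queue and running the registration logic, so writes are applied one at a time and cannot overwrite each other.
@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}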
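A small hypothetical demo shows why a single consumer thread rules out lost updates. Many producer threads hand their writes to one single-thread executor, which stands in for the notifier thread. As a result, even a plain, non-thread-safe HashMap ends up with exactly the expected count. The names are illustrative. Nacos itself uses the blocking queue plus one Notifier loop rather than this executor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the single-writer idea (not Nacos code).
class SingleWriterDemo {

    /** Each producer thread requests writesPerProducer increments of one key; returns the final map. */
    static Map<String, Integer> run(int producers, int writesPerProducer) throws Exception {
        Map<String, Integer> registry = new HashMap<>(); // not thread-safe: only the writer thread touches it
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ExecutorService producerPool = Executors.newFixedThreadPool(producers);
        try {
            for (int p = 0; p < producers; p++) {
                producerPool.execute(() -> {
                    for (int i = 0; i < writesPerProducer; i++) {
                        // producers only enqueue; the single writer applies the update
                        writer.execute(() -> registry.merge("svc-A", 1, Integer::sum));
                    }
                });
            }
            producerPool.shutdown();
            if (!producerPool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish in time");
            }
            // queued after every write, so it runs last; Future.get() hands the copy over safely
            Callable<Map<String, Integer>> copy = () -> new HashMap<>(registry);
            return writer.submit(copy).get();
        } finally {
            producerPool.shutdownNow();
            writer.shutdown();
        }
    }
}
```

If each producer updated the HashMap directly, concurrent merge() calls could lose increments or corrupt the map. Funnelling every write through one thread avoids that without any locks on the map itself.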
Doesn't copy-on-write take up a lot of memory?
Only the Set<Instance> inside a single Cluster is copied (see the full Nacos storage model below), not the whole registry. So when you apply copy-on-write, consider the granularity of what you copy.
Next, let's look at Nacos's underlying storage model.
/**
 * Core manager storing all services in Nacos.
 *
 * @author nkorange
 */
@Component
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}
Service
As its Javadoc says, Nacos uses a 'service --> cluster --> instance' model, in which a service stores a list of clusters, each containing a list of instances.
In other words, a service may be deployed as one or more clusters, and each cluster may have multiple instances.
/**
 * Service of Nacos server side
 *
 * <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
 * contain a list of instances.
 *
 * <p>This class inherits from Service in API module and stores some fields that do not have to expose to client.
 *
 * @author nkorange
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    // cluster name -> Cluster
    private Map<String, Cluster> clusterMap = new HashMap<>();
}
Cluster
It holds the persistentInstances and ephemeralInstances sets that updateIps() ultimately replaces:
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
}
Summary: the data model is namespace → group → service → cluster → instance. A namespace can contain several service groups, and a group can contain multiple services. A service can have one or more clusters, and a cluster can have multiple instances.
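The hierarchy can be summarized in a few Java types. This is a hypothetical, heavily simplified model. Only clusterMap, persistentInstances, ephemeralInstances and serviceMap echo the Nacos snippets above. NamingModel, instanceCount(), the Instance fields and the "DEFAULT_GROUP@@order-service" key format are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified model: namespace -> grouped service name -> Service -> Cluster -> Instance.
class NamingModel {

    static final class Instance {
        final String ip;
        final int port;

        Instance(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }

        // equality by ip:port, so a Set holds each endpoint once
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Instance)) return false;
            Instance other = (Instance) o;
            return port == other.port && ip.equals(other.ip);
        }

        @Override
        public int hashCode() {
            return Objects.hash(ip, port);
        }
    }

    static final class Cluster {
        final String name;
        // not final: copy-on-write replaces these sets wholesale
        Set<Instance> persistentInstances = new HashSet<>();
        Set<Instance> ephemeralInstances = new HashSet<>();

        Cluster(String name) {
            this.name = name;
        }

        int size() {
            return persistentInstances.size() + ephemeralInstances.size();
        }
    }

    static final class Service {
        // cluster name -> Cluster
        final Map<String, Cluster> clusterMap = new HashMap<>();

        Cluster cluster(String name) {
            return clusterMap.computeIfAbsent(name, Cluster::new);
        }

        int instanceCount() {
            return clusterMap.values().stream().mapToInt(Cluster::size).sum();
        }
    }

    // Map<namespace, Map<group@@serviceName, Service>>
    final Map<String, Map<String, Service>> serviceMap = new HashMap<>();

    Service service(String namespace, String groupedServiceName) {
        return serviceMap.computeIfAbsent(namespace, ns -> new HashMap<>())
                .computeIfAbsent(groupedServiceName, key -> new Service());
    }
}
```

Copying one Cluster's instance set touches only a few entries. Copying serviceMap would duplicate the whole tree, which is the granularity point made above.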
Sending heartbeats
com.alibaba.nacos.client.naming.NacosNamingService#registerInstance()
->com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo
addBeatInfo() schedules a BeatTask:
// inner class of BeatReactor (note BeatReactor.this below)
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        // ...
        // 1. send the heartbeat
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        // ...
        // 2. schedule the next run; the task keeps re-scheduling itself
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
So the client ultimately sends HTTP requests to the server endpoint /instance/beat.
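Here is the self-rescheduling pattern used by BeatTask as a standalone, hypothetical Java sketch. Instead of using scheduleAtFixedRate, each run sends one beat and then schedules the next run. This lets the delay be decided anew every time, which matters because the snippet computes nextTime inside run(). SelfReschedulingBeat and the Runnable that stands in for sendBeat are illustrative names, not Nacos APIs. The try/catch is a choice of this sketch, so that one failed beat does not end the chain.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the BeatTask pattern: each run sends one beat, then schedules the next run.
class SelfReschedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    private final long periodMillis;
    private final Runnable sendBeat; // stand-in for serverProxy.sendBeat(...)
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    SelfReschedulingBeat(ScheduledExecutorService executor, long periodMillis, Runnable sendBeat) {
        this.executor = executor;
        this.periodMillis = periodMillis;
        this.sendBeat = sendBeat;
    }

    // plays the role of beatInfo.isStopped(): the chain ends at the next run
    void stop() {
        stopped.set(true);
    }

    @Override
    public void run() {
        if (stopped.get()) {
            return;
        }
        try {
            sendBeat.run();
        } catch (RuntimeException e) {
            // a failed beat must not break the chain; the next one is still scheduled
        }
        executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

Calling stop() does not cancel anything that is already scheduled. The next run simply returns without rescheduling, which is how beatInfo.isStopped() ends the loop.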
Server-side heartbeat check
The heartbeat check is started in the service's init method, com.alibaba.nacos.naming.core.Service#init, which calls
com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask)
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
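Default heartbeat timeout
15 seconds by default: an instance that has not sent a heartbeat for longer than this is marked unhealthy.
See instance.getInstanceHeartBeatTimeOut() in ClientBeatCheckTask.run().
Default delete timeout
30 seconds by default: an instance that has not sent a heartbeat for longer than this is removed.
See instance.getIpDeleteTimeout() in ClientBeatCheckTask.run().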
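The two timeouts amount to a simple decision per instance, based on how long it has been since the last heartbeat. The hypothetical sketch below folds both checks into one function and uses the defaults above (15 s and 30 s). The strict "greater than" comparison is an assumption of this sketch. If the client beats every 5 seconds (assumed here to be the Nacos 1.x default), 15 s corresponds to roughly three missed beats.

```java
// Hypothetical sketch of the per-instance decision behind ClientBeatCheckTask (not Nacos code).
class BeatCheckSketch {
    enum Verdict { HEALTHY, UNHEALTHY, DELETE }

    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // default heartbeat timeout
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // default delete timeout

    /** Classifies an instance by the time elapsed since its last heartbeat. */
    static Verdict check(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return Verdict.DELETE;
        }
        if (silence > HEART_BEAT_TIMEOUT_MS) {
            return Verdict.UNHEALTHY;
        }
        return Verdict.HEALTHY;
    }
}
```

Between 15 s and 30 s of silence, an instance is still registered but is reported as unhealthy. After 30 s it is removed from the registry.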