前言
再講Nacos之前型凳,先來(lái)講一下服務(wù)注冊(cè)和發(fā)現(xiàn)心俗。我們知道,現(xiàn)在微服務(wù)架構(gòu)是目前開發(fā)的一個(gè)趨勢(shì)回官。服務(wù)消費(fèi)者要去調(diào)用多個(gè)服務(wù)提供者組成的集群曹宴。這里需要做到以下幾點(diǎn):
1、服務(wù)消費(fèi)者需要在本地配置文件中維護(hù)服務(wù)提供者集群的每個(gè)節(jié)點(diǎn)的請(qǐng)求地址歉提。
2笛坦、服務(wù)提供者集群中如果某個(gè)節(jié)點(diǎn)宕機(jī)区转,服務(wù)消費(fèi)者的本地配置中需要同步刪除這個(gè)節(jié)點(diǎn)的請(qǐng)求地址,防止請(qǐng)求發(fā)送到已經(jīng)宕機(jī)的節(jié)點(diǎn)上造成請(qǐng)求失敗版扩。
因此需要引入服務(wù)注冊(cè)中心废离,它具有以下幾個(gè)功能:
- 1、服務(wù)地址的管理礁芦。
- 2蜻韭、服務(wù)注冊(cè)。
- 3宴偿、服務(wù)動(dòng)態(tài)感知湘捎。
一、Nacos介紹
Nacos致力于解決微服務(wù)中的統(tǒng)一配置窄刘,服務(wù)注冊(cè)和發(fā)現(xiàn)等問題。Nacos集成了注冊(cè)中心和配置中心舷胜。其相關(guān)特性包括:
- 1娩践、服務(wù)發(fā)現(xiàn)和服務(wù)健康監(jiān)測(cè)。
Nacos支持基于DNS和RPC的服務(wù)發(fā)現(xiàn)烹骨,即服務(wù)消費(fèi)者可以使用DNS或者HTTP的方式來(lái)查找和發(fā)現(xiàn)服務(wù)翻伺。
Nacos提供對(duì)服務(wù)的實(shí)時(shí)的健康檢查,阻止向不健康的主機(jī)或者服務(wù)實(shí)例發(fā)送請(qǐng)求沮焕。Nacos支持傳輸層(Ping/TCP)吨岭、應(yīng)用層(HTTP、Mysql)的健康檢查峦树。
- 2辣辫、動(dòng)態(tài)配置服務(wù)。
動(dòng)態(tài)配置服務(wù)可以以中心化魁巩、外部化和動(dòng)態(tài)化的方式管理所有環(huán)境的應(yīng)用配置和服務(wù)配置急灭。
- 3、動(dòng)態(tài)DNS服務(wù)谷遂。
支持權(quán)重路由葬馋,讓開發(fā)者更容易的實(shí)現(xiàn)中間層的負(fù)載均衡、更靈活的路由策略肾扰、流量控制以及DNS解析服務(wù)畴嘶。
- 4盗痒、服務(wù)和元數(shù)據(jù)管理芭析。
Nacos允許開發(fā)者從微服務(wù)平臺(tái)建設(shè)的視角來(lái)管理數(shù)據(jù)中心的所有服務(wù)和元數(shù)據(jù)。如:服務(wù)的生命周期苔严、靜態(tài)依賴分析甩恼、服務(wù)的健康狀態(tài)蟀瞧、服務(wù)的流量管理沉颂、路由和安全策略等。
二悦污、Nacos注冊(cè)中心實(shí)現(xiàn)原理分析
2.1 Nacos架構(gòu)圖
以下是Nacos的架構(gòu)圖:
其中分為這么幾個(gè)模塊:
- Provider APP:服務(wù)提供者铸屉。
- Consumer APP:服務(wù)消費(fèi)者。
- Name Server:通過Virtual IP或者DNS的方式實(shí)現(xiàn)Nacos高可用集群的服務(wù)路由切端。
- Nacos Server:Nacos服務(wù)提供者彻坛。
- Nacos Console:Nacos控制臺(tái)。
Nacos Server其中包含:
- OpenAPI:功能訪問入口踏枣。
- Config Service昌屉、Naming Service:Nacos提供的配置服務(wù)、名字服務(wù)模塊茵瀑。
- Consistency Protocol:一致性協(xié)議间驮,用來(lái)實(shí)現(xiàn)Nacos集群節(jié)點(diǎn)的數(shù)據(jù)同步,使用Raft算法實(shí)現(xiàn)马昨。
小總結(jié):
服務(wù)提供者通過VIP(Virtual IP)訪問Nacos Server高可用集群竞帽,基于OpenAPI完成服務(wù)的注冊(cè)和服務(wù)的查詢。
Nacos Server的底層則通過數(shù)據(jù)一致性算法(Raft)來(lái)完成節(jié)點(diǎn)的數(shù)據(jù)同步鸿捧。
2.2 注冊(cè)中心的原理
這里對(duì)其原理做一個(gè)大致的介紹屹篓,在后文則從源碼角度進(jìn)行分析。
首先匙奴,服務(wù)注冊(cè)的功能體現(xiàn)在:
- 服務(wù)實(shí)例啟動(dòng)時(shí)注冊(cè)到服務(wù)注冊(cè)表堆巧、關(guān)閉時(shí)則注銷(服務(wù)注冊(cè))。
- 服務(wù)消費(fèi)者可以通過查詢服務(wù)注冊(cè)表來(lái)獲得可用的實(shí)例(服務(wù)發(fā)現(xiàn))泼菌。
- 服務(wù)注冊(cè)中心需要調(diào)用服務(wù)實(shí)例的健康檢查API來(lái)驗(yàn)證其是否可以正確的處理請(qǐng)求(健康檢查)谍肤。
大致流程:每個(gè)服務(wù)都會(huì)有一個(gè)nacos client,它用來(lái)和nacos server打交道灶轰,用來(lái)具體的服務(wù)注冊(cè)谣沸、查詢等操作,服務(wù)提供者在啟動(dòng)的時(shí)候會(huì)向nacos server注冊(cè)自己笋颤,服務(wù)消費(fèi)者在啟動(dòng)的時(shí)候訂閱nacos server上的服務(wù)提供者乳附。
Nacos服務(wù)注冊(cè)和發(fā)現(xiàn)的實(shí)現(xiàn)原理的圖如下:
三、服務(wù)注冊(cè)
首先需要引入spring-cloud-starter-alibaba-nacos-discovery包
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.6.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
-
根據(jù)spring.factories配置來(lái)完成相關(guān)類的自動(dòng)注冊(cè)伴澄。
重點(diǎn)來(lái)看這幾個(gè)類赋除,看名稱可猜到是用來(lái)服務(wù)注冊(cè)的,NacosServiceRegistryAutoConfiguration用來(lái)注冊(cè)管理這幾個(gè)bean非凌。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
NacosServiceRegistry:完成服務(wù)注冊(cè)举农,實(shí)現(xiàn)ServiceRegistry。
NacosRegistration:用來(lái)注冊(cè)時(shí)存儲(chǔ)nacos服務(wù)端的相關(guān)信息敞嗡。
NacosAutoServiceRegistration 繼承spring中的AbstractAutoServiceRegistration颁糟,AbstractAutoServiceRegistration實(shí)現(xiàn)ApplicationListener<WebServerInitializedEvent>航背,通過事件監(jiān)聽來(lái)發(fā)起服務(wù)注冊(cè),到時(shí)候會(huì)調(diào)用NacosServiceRegistry.register(registration)
來(lái)看具體如何注冊(cè)
/*************************************************NacosServiceRegistry**************************************************/
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
}
/**************************************************NacosNamingService************************************************/
public class NacosNamingService implements NamingService {
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 添加心跳檢測(cè)
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 完成服務(wù)注冊(cè)
serverProxy.registerService(groupedServiceName, groupName, instance);
}
}
/***************************************************BeatReactor***************************************************/
public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 發(fā)起一個(gè)心跳檢測(cè)任務(wù)
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
/******************************************************BeatTask******************************************************/
class BeatTask implements Runnable {
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
// 向nacos服務(wù)發(fā)起心跳檢測(cè)
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
// 未注冊(cè) 先完成注冊(cè)
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
// 發(fā)起下一次心跳檢測(cè)
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
}
服務(wù)提供者向nacos server發(fā)起服務(wù)注冊(cè)前棱貌,先向nacos server建立起心跳檢測(cè)機(jī)制玖媚,nacos server那邊也有一個(gè)心跳檢測(cè),服務(wù)提供者不停的向nacos server發(fā)起心跳檢測(cè)婚脱,告知自己的健康狀態(tài)今魔,nacos server發(fā)現(xiàn)該服務(wù)心跳檢測(cè)時(shí)間超時(shí)會(huì)發(fā)布超時(shí)事件來(lái)告知服務(wù)消費(fèi)者。
服務(wù)發(fā)現(xiàn)
服務(wù)發(fā)現(xiàn)由NacosWatch完成障贸,它實(shí)現(xiàn)了Spring的Lifecycle接口错森,容器啟動(dòng)和銷毀時(shí)會(huì)調(diào)用對(duì)應(yīng)的start()和stop()方法。
來(lái)看對(duì)應(yīng)源碼
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
@Override
public void start() {
// cas設(shè)置運(yùn)行狀態(tài)為true
if (this.running.compareAndSet(false, true)) {
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event)
.getInstances();
Optional<Instance> instanceOptional = selectCurrentInstance(
instances);
instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance);
});
}
}
});
// 獲取nacos server上最新的服務(wù)提供者們
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
try {
// 訂閱服務(wù) 并對(duì)每個(gè)服務(wù)都添加一個(gè)心跳檢測(cè)監(jiān)聽
namingService.subscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}
// 延時(shí)執(zhí)行一個(gè)服務(wù)發(fā)現(xiàn)任務(wù)
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
@Override
public void stop(Runnable callback) {
this.stop();
callback.run();
}
@Override
public void stop() {
// 設(shè)置運(yùn)行狀態(tài)為false 然后取消正在執(zhí)行的任務(wù)
if (this.running.compareAndSet(true, false)) {
if (this.watchFuture != null) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
this.taskScheduler.shutdown();
this.watchFuture.cancel(true);
}
EventListener eventListener = listenerMap.get(buildKey());
try {
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
// 取消已經(jīng)下線的服務(wù)訂閱篮洁,發(fā)起取消訂閱操作并刪除訂閱監(jiān)聽
namingService.unsubscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService unsubscribe failed, properties:{}", properties,
e);
}
}
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
// nacos不支持立即通知涩维,每30秒發(fā)布一個(gè)事件
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
}
大致流程:nacos client這邊在spring容器啟動(dòng)后執(zhí)行一個(gè)服務(wù)訂閱操作的延時(shí)任務(wù),這個(gè)任務(wù)執(zhí)行時(shí)先拉取nacos server那邊最新的服務(wù)列表嘀粱,然后與本地緩存的服務(wù)列表進(jìn)行比較激挪,取消訂閱下線的服務(wù),然后每隔30秒向nacos server發(fā)起訂閱操作锋叨,訂閱所有服務(wù)。
服務(wù)消費(fèi)者如何實(shí)時(shí)感知服務(wù)提供者的狀態(tài)信息呢宛篇?
- 1娃磺、服務(wù)消費(fèi)者訂閱后會(huì)執(zhí)行一個(gè)輪詢?nèi)蝿?wù)(每10s執(zhí)行一次)用來(lái)拉取最新的服務(wù)提供者信息并實(shí)時(shí)更新,實(shí)現(xiàn)在HostReactor中的UpdateTask完成,下面來(lái)看代碼
public class HostReactor implements Closeable {
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private final String clusters;
private final String serviceName;
/**
* the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
*/
private int failCount = 0;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
private void incFailCount() {
int limit = 6;
if (failCount == limit) {
return;
}
failCount++;
}
private void resetFailCount() {
failCount = 0;
}
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
// 拿到當(dāng)前的服務(wù)信息
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
//如果為null,說明本地沒有叫倍,需要從服務(wù)端獲取
if (serviceObj == null) {
//拉取最新的服務(wù)列表隨后更新
updateService(serviceName, clusters);
return;
}
// 當(dāng)前服務(wù)未及時(shí)更新 進(jìn)行更新操作
//判斷服務(wù)是否已過期偷卧,當(dāng)前服務(wù)的最后一次更新時(shí)間 <= 全局的最后一次更新
if (serviceObj.getLastRefTime() <= lastRefTime) {
//調(diào)用updateService從服務(wù)端獲取地址列表,更新服務(wù)列表
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
//如果服務(wù)已經(jīng)被基于push機(jī)制的情況下做了更新吆倦,那么我們不需要覆蓋本地服務(wù)听诸。
//因?yàn)閜ush過來(lái)的數(shù)據(jù)和pull數(shù)據(jù)不同,所以這里只是調(diào)用請(qǐng)求去刷新服務(wù)
refreshOnly(serviceName, clusters);
}
// 設(shè)置服務(wù)最新的更新時(shí)間
lastRefTime = serviceObj.getLastRefTime();
// 訂閱被取消蚕泽,如果沒有實(shí)現(xiàn)訂閱或者futureMap中不包含指定服務(wù)信息晌梨,則中斷更新請(qǐng)求
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
// 繼續(xù)下一次輪詢 延后10s執(zhí)行,實(shí)現(xiàn)重復(fù)輪詢
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
}
}
- 2须妻、上面服務(wù)注冊(cè)時(shí)說過仔蝌,服務(wù)提供者注冊(cè)時(shí)nacos服務(wù)端也有一個(gè)相應(yīng)的心跳檢測(cè),當(dāng)心跳檢測(cè)超時(shí)也就是未及時(shí)收到服務(wù)提供者的心跳包荒吏,nacos server判定該服務(wù)狀態(tài)異常敛惊,隨后通過UDP推送服務(wù)信息用來(lái)告知對(duì)應(yīng)服務(wù)消費(fèi)者,服務(wù)消費(fèi)者通過PushReceiver來(lái)處理udp協(xié)議绰更,HostReactor.processServiceJson(String json)來(lái)更新本地服務(wù)列表瞧挤。
public class PushReceiver implements Runnable, Closeable {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private static final int UDP_MSS = 64 * 1024;
private ScheduledExecutorService executorService;
private DatagramSocket udpSocket;
private HostReactor hostReactor;
private volatile boolean closed = false;
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
//開啟一個(gè)線程池锡宋,不斷接受服務(wù)端傳遞過來(lái)的數(shù)據(jù)
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
//調(diào)用run方法
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
@Override
public void run() {
//通過while循環(huán)不斷監(jiān)聽客戶端Nacos Server傳遞過來(lái)的數(shù)據(jù),實(shí)現(xiàn)一個(gè)Push的機(jī)制
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
//初始化一個(gè)監(jiān)聽特恬,不斷接受客戶端Nacos Server傳遞過來(lái)的數(shù)據(jù)
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// 處理變更信息
hostReactor.processServiceJson(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
}
參考:
https://www.cnblogs.com/zzz-blogs/p/14243912.html
https://blog.csdn.net/xingxinggua9620/article/details/113403062