Spring Cloud Alibaba——Nacos服務(wù)注冊(cè)原理

前言

再講Nacos之前型凳,先來(lái)講一下服務(wù)注冊(cè)和發(fā)現(xiàn)心俗。我們知道,現(xiàn)在微服務(wù)架構(gòu)是目前開發(fā)的一個(gè)趨勢(shì)回官。服務(wù)消費(fèi)者要去調(diào)用多個(gè)服務(wù)提供者組成的集群曹宴。這里需要做到以下幾點(diǎn):

  • 1、服務(wù)消費(fèi)者需要在本地配置文件中維護(hù)服務(wù)提供者集群的每個(gè)節(jié)點(diǎn)的請(qǐng)求地址歉提。

  • 2笛坦、服務(wù)提供者集群中如果某個(gè)節(jié)點(diǎn)宕機(jī)区转,服務(wù)消費(fèi)者的本地配置中需要同步刪除這個(gè)節(jié)點(diǎn)的請(qǐng)求地址,防止請(qǐng)求發(fā)送到已經(jīng)宕機(jī)的節(jié)點(diǎn)上造成請(qǐng)求失敗版扩。

因此需要引入服務(wù)注冊(cè)中心废离,它具有以下幾個(gè)功能:

  • 1、服務(wù)地址的管理礁芦。
  • 2蜻韭、服務(wù)注冊(cè)。
  • 3宴偿、服務(wù)動(dòng)態(tài)感知湘捎。

一、Nacos介紹

Nacos致力于解決微服務(wù)中的統(tǒng)一配置窄刘,服務(wù)注冊(cè)和發(fā)現(xiàn)等問題。Nacos集成了注冊(cè)中心和配置中心舷胜。其相關(guān)特性包括:

  • 1娩践、服務(wù)發(fā)現(xiàn)和服務(wù)健康監(jiān)測(cè)。

Nacos支持基于DNS和RPC的服務(wù)發(fā)現(xiàn)烹骨,即服務(wù)消費(fèi)者可以使用DNS或者HTTP的方式來(lái)查找和發(fā)現(xiàn)服務(wù)翻伺。
Nacos提供對(duì)服務(wù)的實(shí)時(shí)的健康檢查,阻止向不健康的主機(jī)或者服務(wù)實(shí)例發(fā)送請(qǐng)求沮焕。Nacos支持傳輸層(Ping/TCP)吨岭、應(yīng)用層(HTTP、Mysql)的健康檢查峦树。

  • 2辣辫、動(dòng)態(tài)配置服務(wù)。

動(dòng)態(tài)配置服務(wù)可以以中心化魁巩、外部化和動(dòng)態(tài)化的方式管理所有環(huán)境的應(yīng)用配置和服務(wù)配置急灭。

  • 3、動(dòng)態(tài)DNS服務(wù)谷遂。

支持權(quán)重路由葬馋,讓開發(fā)者更容易的實(shí)現(xiàn)中間層的負(fù)載均衡、更靈活的路由策略肾扰、流量控制以及DNS解析服務(wù)畴嘶。

  • 4盗痒、服務(wù)和元數(shù)據(jù)管理芭析。

Nacos允許開發(fā)者從微服務(wù)平臺(tái)建設(shè)的視角來(lái)管理數(shù)據(jù)中心的所有服務(wù)和元數(shù)據(jù)。如:服務(wù)的生命周期苔严、靜態(tài)依賴分析甩恼、服務(wù)的健康狀態(tài)蟀瞧、服務(wù)的流量管理沉颂、路由和安全策略等。

二悦污、Nacos注冊(cè)中心實(shí)現(xiàn)原理分析

2.1 Nacos架構(gòu)圖

以下是Nacos的架構(gòu)圖:


其中分為這么幾個(gè)模塊:

  • Provider APP:服務(wù)提供者铸屉。
  • Consumer APP:服務(wù)消費(fèi)者。
  • Name Server:通過Virtual IP或者DNS的方式實(shí)現(xiàn)Nacos高可用集群的服務(wù)路由切端。
  • Nacos Server:Nacos服務(wù)提供者彻坛。
  • Nacos Console:Nacos控制臺(tái)。

Nacos Server其中包含

  • OpenAPI:功能訪問入口踏枣。
  • Config Service昌屉、Naming Service:Nacos提供的配置服務(wù)、名字服務(wù)模塊茵瀑。
  • Consistency Protocol:一致性協(xié)議间驮,用來(lái)實(shí)現(xiàn)Nacos集群節(jié)點(diǎn)的數(shù)據(jù)同步,使用Raft算法實(shí)現(xiàn)马昨。

小總結(jié):

  • 服務(wù)提供者通過VIP(Virtual IP)訪問Nacos Server高可用集群竞帽,基于OpenAPI完成服務(wù)的注冊(cè)和服務(wù)的查詢。

  • Nacos Server的底層則通過數(shù)據(jù)一致性算法(Raft)來(lái)完成節(jié)點(diǎn)的數(shù)據(jù)同步鸿捧。

2.2 注冊(cè)中心的原理

這里對(duì)其原理做一個(gè)大致的介紹屹篓,在后文則從源碼角度進(jìn)行分析。

首先匙奴,服務(wù)注冊(cè)的功能體現(xiàn)在:

  • 服務(wù)實(shí)例啟動(dòng)時(shí)注冊(cè)到服務(wù)注冊(cè)表堆巧、關(guān)閉時(shí)則注銷(服務(wù)注冊(cè))。
  • 服務(wù)消費(fèi)者可以通過查詢服務(wù)注冊(cè)表來(lái)獲得可用的實(shí)例(服務(wù)發(fā)現(xiàn))泼菌。
  • 服務(wù)注冊(cè)中心需要調(diào)用服務(wù)實(shí)例的健康檢查API來(lái)驗(yàn)證其是否可以正確的處理請(qǐng)求(健康檢查)谍肤。

大致流程:每個(gè)服務(wù)都會(huì)有一個(gè)nacos client,它用來(lái)和nacos server打交道灶轰,用來(lái)具體的服務(wù)注冊(cè)谣沸、查詢等操作,服務(wù)提供者在啟動(dòng)的時(shí)候會(huì)向nacos server注冊(cè)自己笋颤,服務(wù)消費(fèi)者在啟動(dòng)的時(shí)候訂閱nacos server上的服務(wù)提供者乳附。

Nacos服務(wù)注冊(cè)和發(fā)現(xiàn)的實(shí)現(xiàn)原理的圖如下:


Nacos服務(wù)注冊(cè)原理

三、服務(wù)注冊(cè)

首先需要引入spring-cloud-starter-alibaba-nacos-discovery包

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2.2.6.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement> 
  • 根據(jù)spring.factories配置來(lái)完成相關(guān)類的自動(dòng)注冊(cè)伴澄。


  • 重點(diǎn)來(lái)看這幾個(gè)類赋除,看名稱可猜到是用來(lái)服務(wù)注冊(cè)的,NacosServiceRegistryAutoConfiguration用來(lái)注冊(cè)管理這幾個(gè)bean非凌。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }

}
  • NacosServiceRegistry:完成服務(wù)注冊(cè)举农,實(shí)現(xiàn)ServiceRegistry。

  • NacosRegistration:用來(lái)注冊(cè)時(shí)存儲(chǔ)nacos服務(wù)端的相關(guān)信息敞嗡。

  • NacosAutoServiceRegistration 繼承spring中的AbstractAutoServiceRegistration颁糟,AbstractAutoServiceRegistration實(shí)現(xiàn)ApplicationListener<WebServerInitializedEvent>航背,通過事件監(jiān)聽來(lái)發(fā)起服務(wù)注冊(cè),到時(shí)候會(huì)調(diào)用NacosServiceRegistry.register(registration)

來(lái)看具體如何注冊(cè)

/*************************************************NacosServiceRegistry**************************************************/
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    @Override
    public void register(Registration registration) {

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                        registration.toString(), e);
                rethrowRuntimeException(e);
            }
            else {
                log.warn("Failfast is false. {} register failed...{},", serviceId,
                        registration.toString(), e);
            }
        }
    }
}


/**************************************************NacosNamingService************************************************/
public class NacosNamingService implements NamingService {

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            // 添加心跳檢測(cè)
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        // 完成服務(wù)注冊(cè)
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
}


/***************************************************BeatReactor***************************************************/
public class BeatReactor implements Closeable {

    private final ScheduledExecutorService executorService;
    
    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
    
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // 發(fā)起一個(gè)心跳檢測(cè)任務(wù)
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
    
    
/******************************************************BeatTask******************************************************/  
    class BeatTask implements Runnable {
        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                // 向nacos服務(wù)發(fā)起心跳檢測(cè)
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        // 未注冊(cè) 先完成注冊(cè)
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    
            } catch (Exception unknownEx) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                        JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
            } finally {
                // 發(fā)起下一次心跳檢測(cè)
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }       
}   

服務(wù)提供者向nacos server發(fā)起服務(wù)注冊(cè)前棱貌,先向nacos server建立起心跳檢測(cè)機(jī)制玖媚,nacos server那邊也有一個(gè)心跳檢測(cè),服務(wù)提供者不停的向nacos server發(fā)起心跳檢測(cè)婚脱,告知自己的健康狀態(tài)今魔,nacos server發(fā)現(xiàn)該服務(wù)心跳檢測(cè)時(shí)間超時(shí)會(huì)發(fā)布超時(shí)事件來(lái)告知服務(wù)消費(fèi)者。

服務(wù)發(fā)現(xiàn)

服務(wù)發(fā)現(xiàn)由NacosWatch完成障贸,它實(shí)現(xiàn)了Spring的Lifecycle接口错森,容器啟動(dòng)和銷毀時(shí)會(huì)調(diào)用對(duì)應(yīng)的start()和stop()方法。

來(lái)看對(duì)應(yīng)源碼

public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {

    @Override
    public void start() {
        // cas設(shè)置運(yùn)行狀態(tài)為true
        if (this.running.compareAndSet(false, true)) {
            EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                    event -> new EventListener() {
                        @Override
                        public void onEvent(Event event) {
                            if (event instanceof NamingEvent) {
                                List<Instance> instances = ((NamingEvent) event)
                                        .getInstances();
                                Optional<Instance> instanceOptional = selectCurrentInstance(
                                        instances);
                                instanceOptional.ifPresent(currentInstance -> {
                                    resetIfNeeded(currentInstance);
                                });
                            }
                        }
                    });
            // 獲取nacos server上最新的服務(wù)提供者們
            NamingService namingService = nacosServiceManager
                    .getNamingService(properties.getNacosProperties());
            try {
                // 訂閱服務(wù) 并對(duì)每個(gè)服務(wù)都添加一個(gè)心跳檢測(cè)監(jiān)聽
                namingService.subscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (Exception e) {
                log.error("namingService subscribe failed, properties:{}", properties, e);
            }

             // 延時(shí)執(zhí)行一個(gè)服務(wù)發(fā)現(xiàn)任務(wù)
            this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                    this::nacosServicesWatch, this.properties.getWatchDelay());
        }
    }
    
    @Override
    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    @Override
    public void stop() {
        // 設(shè)置運(yùn)行狀態(tài)為false 然后取消正在執(zhí)行的任務(wù)
        if (this.running.compareAndSet(true, false)) {
            if (this.watchFuture != null) {
                // shutdown current user-thread,
                // then the other daemon-threads will terminate automatic.
                this.taskScheduler.shutdown();
                this.watchFuture.cancel(true);
            }

            EventListener eventListener = listenerMap.get(buildKey());
            try {
                NamingService namingService = nacosServiceManager
                        .getNamingService(properties.getNacosProperties());
                        
                // 取消已經(jīng)下線的服務(wù)訂閱篮洁,發(fā)起取消訂閱操作并刪除訂閱監(jiān)聽      
                namingService.unsubscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (Exception e) {
                log.error("namingService unsubscribe failed, properties:{}", properties,
                        e);
            }
        }
    }
    
    public void nacosServicesWatch() {

        // nacos doesn't support watch now , publish an event every 30 seconds.
        // nacos不支持立即通知涩维,每30秒發(fā)布一個(gè)事件
        this.publisher.publishEvent(
                new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));

    }   
    
}

大致流程:nacos client這邊在spring容器啟動(dòng)后執(zhí)行一個(gè)服務(wù)訂閱操作的延時(shí)任務(wù),這個(gè)任務(wù)執(zhí)行時(shí)先拉取nacos server那邊最新的服務(wù)列表嘀粱,然后與本地緩存的服務(wù)列表進(jìn)行比較激挪,取消訂閱下線的服務(wù),然后每隔30秒向nacos server發(fā)起訂閱操作锋叨,訂閱所有服務(wù)。

服務(wù)消費(fèi)者如何實(shí)時(shí)感知服務(wù)提供者的狀態(tài)信息呢宛篇?

  • 1娃磺、服務(wù)消費(fèi)者訂閱后會(huì)執(zhí)行一個(gè)輪詢?nèi)蝿?wù)(每10s執(zhí)行一次)用來(lái)拉取最新的服務(wù)提供者信息并實(shí)時(shí)更新,實(shí)現(xiàn)在HostReactor中的UpdateTask完成,下面來(lái)看代碼
public class HostReactor implements Closeable {

    public class UpdateTask implements Runnable {
        
        long lastRefTime = Long.MAX_VALUE;
        
        private final String clusters;
        
        private final String serviceName;
        
        /**
         * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
         */
        private int failCount = 0;
        
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }
        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }
        
        private void resetFailCount() {
            failCount = 0;
        }
        
        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            
            try {
                // 拿到當(dāng)前的服務(wù)信息
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                
                //如果為null,說明本地沒有叫倍,需要從服務(wù)端獲取
                if (serviceObj == null) {
                    //拉取最新的服務(wù)列表隨后更新
                    updateService(serviceName, clusters);
                    return;
                }
                
                // 當(dāng)前服務(wù)未及時(shí)更新 進(jìn)行更新操作
                //判斷服務(wù)是否已過期偷卧,當(dāng)前服務(wù)的最后一次更新時(shí)間 <= 全局的最后一次更新
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    //調(diào)用updateService從服務(wù)端獲取地址列表,更新服務(wù)列表
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    //如果服務(wù)已經(jīng)被基于push機(jī)制的情況下做了更新吆倦,那么我們不需要覆蓋本地服務(wù)听诸。            
                    //因?yàn)閜ush過來(lái)的數(shù)據(jù)和pull數(shù)據(jù)不同,所以這里只是調(diào)用請(qǐng)求去刷新服務(wù) 
                    refreshOnly(serviceName, clusters);
                }
                
                // 設(shè)置服務(wù)最新的更新時(shí)間
                lastRefTime = serviceObj.getLastRefTime();
                // 訂閱被取消蚕泽,如果沒有實(shí)現(xiàn)訂閱或者futureMap中不包含指定服務(wù)信息晌梨,則中斷更新請(qǐng)求
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                // 繼續(xù)下一次輪詢 延后10s執(zhí)行,實(shí)現(xiàn)重復(fù)輪詢 
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}       
  • 2须妻、上面服務(wù)注冊(cè)時(shí)說過仔蝌,服務(wù)提供者注冊(cè)時(shí)nacos服務(wù)端也有一個(gè)相應(yīng)的心跳檢測(cè),當(dāng)心跳檢測(cè)超時(shí)也就是未及時(shí)收到服務(wù)提供者的心跳包荒吏,nacos server判定該服務(wù)狀態(tài)異常敛惊,隨后通過UDP推送服務(wù)信息用來(lái)告知對(duì)應(yīng)服務(wù)消費(fèi)者,服務(wù)消費(fèi)者通過PushReceiver來(lái)處理udp協(xié)議绰更,HostReactor.processServiceJson(String json)來(lái)更新本地服務(wù)列表瞧挤。
public class PushReceiver implements Runnable, Closeable {

    private static final Charset UTF_8 = Charset.forName("UTF-8");
    
    private static final int UDP_MSS = 64 * 1024;
    
    private ScheduledExecutorService executorService;
    
    private DatagramSocket udpSocket;
    
    private HostReactor hostReactor;
    
    private volatile boolean closed = false;
    
    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            String udpPort = getPushReceiverUdpPort();
            if (StringUtils.isEmpty(udpPort)) {
                this.udpSocket = new DatagramSocket();
            } else {
                this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
            }
            //開啟一個(gè)線程池锡宋,不斷接受服務(wù)端傳遞過來(lái)的數(shù)據(jù)
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            
            //調(diào)用run方法
            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }   

    @Override
    public void run() {
        //通過while循環(huán)不斷監(jiān)聽客戶端Nacos Server傳遞過來(lái)的數(shù)據(jù),實(shí)現(xiàn)一個(gè)Push的機(jī)制
        while (!closed) {
            try {
                
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                //初始化一個(gè)監(jiān)聽特恬,不斷接受客戶端Nacos Server傳遞過來(lái)的數(shù)據(jù)
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    // 處理變更信息
                    hostReactor.processServiceJson(pushPacket.data);
                    
                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
}

參考:
https://www.cnblogs.com/zzz-blogs/p/14243912.html

https://blog.csdn.net/xingxinggua9620/article/details/113403062

https://blog.csdn.net/Zong_0915/article/details/113001226

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末执俩,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鸵鸥,更是在濱河造成了極大的恐慌奠滑,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件妒穴,死亡現(xiàn)場(chǎng)離奇詭異宋税,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)讼油,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門杰赛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人矮台,你說我怎么就攤上這事乏屯。” “怎么了瘦赫?”我有些...
    開封第一講書人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵辰晕,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我确虱,道長(zhǎng)含友,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任校辩,我火速辦了婚禮窘问,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘宜咒。我一直安慰自己惠赫,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開白布故黑。 她就那樣靜靜地躺著儿咱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪倍阐。 梳的紋絲不亂的頭發(fā)上概疆,一...
    開封第一講書人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音峰搪,去河邊找鬼岔冀。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的使套。 我是一名探鬼主播罐呼,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼侦高!你這毒婦竟也來(lái)了嫉柴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤奉呛,失蹤者是張志新(化名)和其女友劉穎计螺,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瞧壮,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡登馒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了咆槽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片陈轿。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖秦忿,靈堂內(nèi)的尸體忽然破棺而出麦射,到底是詐尸還是另有隱情,我是刑警寧澤灯谣,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布潜秋,位于F島的核電站,受9級(jí)特大地震影響胎许,放射性物質(zhì)發(fā)生泄漏半等。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一呐萨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧莽囤,春花似錦谬擦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至话肖,卻和暖如春北秽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背最筒。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工贺氓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人床蜘。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓辙培,卻偏偏與公主長(zhǎng)得像蔑水,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扬蕊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容