Eureka Client源碼筆記

Spring官網(wǎng)的Eureka Client使用教程

一 Maven依賴

對(duì)應(yīng)的版本是2.1.3.RELEASE
默認(rèn)已成功搭建Eureka Client客戶端和Eureka Server注冊中心

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

二 自動(dòng)配置類

EurekaClientAutoConfiguration是Eureka Client客戶端中的自動(dòng)配置類,列舉出其中部分比較重要的Bean

  • EurekaClientConfigBean
  • EurekaInstanceConfigBean
  • ApplicationInfoManager
  • EurekaClient

三 Bean釋義

  • EurekaClientConfigBean
    當(dāng)前服務(wù)作為Eureka Client客戶端的配置信息
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }
  • EurekaInstanceConfigBean
    當(dāng)前服務(wù)作為Eureka Client客戶端實(shí)例的原生配置信息。可以看到嘗試從yaml里取各種配置信息景埃,例如服務(wù)的context-path矩乐、業(yè)務(wù)ip和端口摔认、管理ip和端口原押,以及設(shè)置其他信息哈恰。
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
            ManagementMetadataProvider managementMetadataProvider) {
        String hostname = getProperty("eureka.instance.hostname");
        boolean preferIpAddress = Boolean
                .parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
        String ipAddress = getProperty("eureka.instance.ip-address");
        boolean isSecurePortEnabled = Boolean
                .parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

        String serverContextPath = env.getProperty("server.servlet.context-path", "/");
        int serverPort = Integer
                .valueOf(env.getProperty("server.port", env.getProperty("port", "8080")));

        Integer managementPort = env.getProperty("management.server.port", Integer.class); // nullable.
        // should
        // be
        // wrapped
        // into
        // optional
        String managementContextPath = env
                .getProperty("management.server.servlet.context-path"); // nullable.
                                                                        // should
        // be wrapped into
        // optional
        Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",
                Integer.class); // nullable
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

        instance.setNonSecurePort(serverPort);
        instance.setInstanceId(getDefaultInstanceId(env));
        instance.setPreferIpAddress(preferIpAddress);
        instance.setSecurePortEnabled(isSecurePortEnabled);
        if (StringUtils.hasText(ipAddress)) {
            instance.setIpAddress(ipAddress);
        }

        if (isSecurePortEnabled) {
            instance.setSecurePort(serverPort);
        }

        if (StringUtils.hasText(hostname)) {
            instance.setHostname(hostname);
        }
        String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
        String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

        if (StringUtils.hasText(statusPageUrlPath)) {
            instance.setStatusPageUrlPath(statusPageUrlPath);
        }
        if (StringUtils.hasText(healthCheckUrlPath)) {
            instance.setHealthCheckUrlPath(healthCheckUrlPath);
        }

        ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
                serverContextPath, managementContextPath, managementPort);

        if (metadata != null) {
            instance.setStatusPageUrl(metadata.getStatusPageUrl());
            instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
            if (instance.isSecurePortEnabled()) {
                instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
            }
            Map<String, String> metadataMap = instance.getMetadataMap();
            metadataMap.computeIfAbsent("management.port",
                    k -> String.valueOf(metadata.getManagementPort()));
        }
        else {
            // without the metadata the status and health check URLs will not be set
            // and the status page and health check url paths will not include the
            // context path so set them here
            if (StringUtils.hasText(managementContextPath)) {
                instance.setHealthCheckUrlPath(
                        managementContextPath + instance.getHealthCheckUrlPath());
                instance.setStatusPageUrlPath(
                        managementContextPath + instance.getStatusPageUrlPath());
            }
        }

        setupJmxPort(instance, jmxPort);
        return instance;
    }
  • ApplicationInfoManager
    EurekaClientAutoConfiguration的內(nèi)部配置類RefreshableEurekaClientConfiguration內(nèi)配置的Bean派哲。InstanceInfo作為服務(wù)實(shí)例信息臼氨,由原生配置屬性轉(zhuǎn)化而來(最終會(huì)用以注冊到Eureka Server注冊中心)“沤欤客戶端配置信息與服務(wù)實(shí)例信息再封裝為ApplicationInfoManager储矩。
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public ApplicationInfoManager eurekaApplicationInfoManager(
                EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
  • EurekaClient
    EurekaClient接口只定義了Eureka Client客戶端的簡單行為,其實(shí)現(xiàn)類DiscoveryClient作為核心類來負(fù)責(zé)與Eureka Server注冊中心的交互褂乍,CloudEurekaClient繼承自DiscoveryClient持隧。
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager,
                EurekaClientConfig config, EurekaInstanceConfig instance,
                @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
            // If we use the proxy of the ApplicationInfoManager we could run into a
            // problem
            // when shutdown is called on the CloudEurekaClient where the
            // ApplicationInfoManager bean is
            // requested but wont be allowed because we are shutting down. To avoid this
            // we use the
            // object directly.
            ApplicationInfoManager appManager;
            if (AopUtils.isAopProxy(manager)) {
                appManager = ProxyUtils.getTargetObject(manager);
            }
            else {
                appManager = manager;
            }
            CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
                    config, this.optionalArgs, this.context);
            cloudEurekaClient.registerHealthCheck(healthCheckHandler);
            return cloudEurekaClient;
        }

四 Eureka Client客戶端啟動(dòng)與注冊中心的交互

DiscoveryClient作為EurekaClient的實(shí)現(xiàn)類,是開啟Eureka Client客戶端與Eureka Server注冊中心交互的入口逃片。上面托管給容器時(shí)子類CloudEurekaClient在其構(gòu)造器中調(diào)用了父類DiscoveryClient的構(gòu)造方法屡拨,調(diào)用到的核心構(gòu)造方法如下,限于篇幅只展示構(gòu)造器中的部分核心代碼塊。

  1. 在try代碼塊中分別創(chuàng)建了一個(gè)Schedule線程池和兩個(gè)普通線程池呀狼。
  2. 根據(jù)后面兩個(gè)線程池名稱可以看出其用途分別為心跳檢測和緩存裂允。
  3. 最后執(zhí)行initScheduledTasks方法
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {

        // 有省略部分代碼

        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

        // 有省略部分代碼

    }

在方法initScheduledTasks中

  1. clientConfig是自動(dòng)配置類中托管給容器的EurekaClientConfigBean
  2. -- 默認(rèn)開啟從注冊中心獲取服務(wù)開關(guān)
    -- 默認(rèn)獲取服務(wù)間隔registryFetchIntervalSeconds為30S
    -- 默認(rèn)獲取服務(wù)間隔超時(shí)次數(shù)expBackOffBound為10次
    -- 默認(rèn)開啟向注冊中心注冊服務(wù)開關(guān)
    -- 默認(rèn)續(xù)約間隔renewalIntervalInSecs為30S
    -- 默認(rèn)續(xù)約間隔超時(shí)次數(shù)expBackOffBound為10次
  3. 向一開始創(chuàng)建的Schedule線程池提交兩個(gè)TimedSupervisorTask任務(wù)(父類TimerTask有實(shí)現(xiàn)Runnable)。
  4. Schedule線程池最終調(diào)度到的任務(wù)分別是CacheRefreshThreadHeartbeatThread(兩個(gè)類均有實(shí)現(xiàn)Runnable)
  5. 兩個(gè)任務(wù)的延遲執(zhí)行時(shí)間哥艇、執(zhí)行周期間隔時(shí)間和超時(shí)重試次數(shù)為上述配置信息绝编。
  6. 創(chuàng)建當(dāng)前客戶端實(shí)例的副本InstanceInfoReplicator,在配置了事件發(fā)布器之后貌踏,執(zhí)行了其start方法十饥。
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

五 Eureka Client客戶端與注冊中心的交互

  • 服務(wù)獲取

任務(wù)CacheRefreshThread完成了從Eureka Server注冊中心獲取所有服務(wù)并緩存到本地的功能。
從run()方法開始哩俭,依次的調(diào)用鏈為:refreshRegistry() > fetchRegistry() > getAndUpdateDelta()

  1. getAndUpdateDelta的入?yún)楸镜鼐彺娴乃蟹?wù),第一次執(zhí)行該任務(wù)時(shí)拳恋,只有當(dāng)前服務(wù)自身凡资。
  2. 使用EurekaHttpClient向注冊中心發(fā)起獲取所有服務(wù)的請(qǐng)求,最終由AbstractJerseyEurekaHttpClient發(fā)起(具體套娃式的Http請(qǐng)求過程不再分析谬运,用到了Jersey隙赁,有興趣可以了解)
  3. 注冊中心響應(yīng)請(qǐng)求并返回結(jié)果,從響應(yīng)httpResponse中拿到注冊中心返回的所有服務(wù)信息梆暖,在updateDelta中完成對(duì)本地緩存的服務(wù)更新
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    // 有省略部分代碼

    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
  1. 入?yún)閺淖灾行哪玫降乃蟹?wù)
  2. delta中保存了多個(gè)服務(wù)信息伞访,單個(gè)服務(wù)Application通常會(huì)包含多個(gè)節(jié)點(diǎn)(保證高可用性),InstanceInfo即為單個(gè)節(jié)點(diǎn)的信息轰驳。
  3. 節(jié)點(diǎn)信息不同的ActionType會(huì)有不同的處理方式
  4. 最終所有服務(wù)保存在Applications的屬性Map<String, Application> appNameApplicationMap上厚掷,每個(gè)服務(wù)Application內(nèi)又使用Map<String, InstanceInfo> instancesMap保存了服務(wù)的所有實(shí)例信息。
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }

                ++deltaCount;
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());

                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

                } else if (ActionType.DELETED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp != null) {
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        existingApp.removeInstance(instance);
                        /*
                         * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                         * if instance list is empty, we remove the application.
                         */
                        if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                            applications.removeApplication(existingApp);
                        }
                    }
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

        getApplications().setVersion(delta.getVersion());
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
  • 服務(wù)續(xù)約

任務(wù)HeartbeatThread完成了向注冊中心續(xù)約服務(wù)的功能级解。

  1. 使用當(dāng)前客戶端實(shí)例信息冒黑,向注冊中心發(fā)起服務(wù)續(xù)約請(qǐng)求
  2. 正常情況續(xù)約成功
  3. 續(xù)約接口404的情況下,使用注冊服務(wù)接口代替勤哗。
  4. 續(xù)約成功的前提下抡爹,客戶端更新自身維護(hù)的最后一次成功的心跳時(shí)間
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
  • 服務(wù)注冊

任務(wù)InstanceInfoReplicator中保存了客戶端服務(wù)的實(shí)例信息

  1. 構(gòu)造器中初始化了Schedule線程池,隨著start方法被調(diào)用芒划,任務(wù)會(huì)被Schedule線程池調(diào)度冬竟。(這里的initialDelayMs為EurekaClientConfigBean中的默認(rèn)配置initialInstanceInfoReplicationIntervalSeconds = 40,即延遲40S后再向注冊中心發(fā)起服務(wù)注冊請(qǐng)求)
  2. run方法中實(shí)際執(zhí)行的是DiscoveryClient的register方法民逼,以完成客戶端服務(wù)向注冊中心的注冊
InstanceInfoReplicator

    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<Future>();

        this.started = new AtomicBoolean(false);
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

DiscoveryClient類的register方法內(nèi)泵殴,向注冊中心發(fā)起了注冊請(qǐng)求。

DiscoveryClient

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
  • 服務(wù)卸載

DiscoveryClient類的shutdown方法中還提供了通知注冊中心卸載服務(wù)的能力拼苍。

  1. 對(duì)@PreDestroy的處理在InitDestroyAnnotationBeanPostProcessor的postProcessBeforeDestruction中袋狞,屬于Spring的能力。
  2. 關(guān)閉服務(wù)時(shí)shutdown方法會(huì)被調(diào)用,cancelScheduledTasks中關(guān)閉了上述服務(wù)注冊苟鸯、服務(wù)續(xù)約和獲取所有服務(wù)中創(chuàng)建的線程池
  3. unregister中向注冊中心發(fā)起了卸載實(shí)例的請(qǐng)求
    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

    private void cancelScheduledTasks() {
        if (instanceInfoReplicator != null) {
            instanceInfoReplicator.stop();
        }
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdownNow();
        }
        if (cacheRefreshExecutor != null) {
            cacheRefreshExecutor.shutdownNow();
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }

至此同蜻,服務(wù)注冊、續(xù)約早处、緩存注冊中心所有服務(wù)和服務(wù)卸載都以完成湾蔓,但作為注冊中心,如何處理這些客戶端請(qǐng)求砌梆,以及注冊中心集群間如何交互默责,還需要再進(jìn)一步了解Eureka Server注冊中心。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末咸包,一起剝皮案震驚了整個(gè)濱河市桃序,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌烂瘫,老刑警劉巖媒熊,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異坟比,居然都是意外死亡芦鳍,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門葛账,熙熙樓的掌柜王于貴愁眉苦臉地迎上來柠衅,“玉大人,你說我怎么就攤上這事籍琳》蒲纾” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵趋急,是天一觀的道長裙顽。 經(jīng)常有香客問我,道長宣谈,這世上最難降的妖魔是什么愈犹? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮闻丑,結(jié)果婚禮上漩怎,老公的妹妹穿的比我還像新娘。我一直安慰自己嗦嗡,他們只是感情好勋锤,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著侥祭,像睡著了一般叁执。 火紅的嫁衣襯著肌膚如雪茄厘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天谈宛,我揣著相機(jī)與錄音次哈,去河邊找鬼。 笑死吆录,一個(gè)胖子當(dāng)著我的面吹牛窑滞,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播恢筝,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼哀卫,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了撬槽?” 一聲冷哼從身側(cè)響起此改,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎侄柔,沒想到半個(gè)月后共啃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡勋拟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年勋磕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了妈候。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片敢靡。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖苦银,靈堂內(nèi)的尸體忽然破棺而出啸胧,到底是詐尸還是另有隱情,我是刑警寧澤幔虏,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布纺念,位于F島的核電站,受9級(jí)特大地震影響想括,放射性物質(zhì)發(fā)生泄漏陷谱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一瑟蜈、第九天 我趴在偏房一處隱蔽的房頂上張望烟逊。 院中可真熱鬧,春花似錦铺根、人聲如沸宪躯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽访雪。三九已至详瑞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間臣缀,已是汗流浹背坝橡。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留肝陪,地道東北人驳庭。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像氯窍,于是被迫代替她去往敵國和親饲常。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345