Spring Cloud之Eureka源碼分析2

本章主要介紹Eureka Client端源碼分析辛萍。
客戶端主要是向Server服務(wù)端發(fā)送Http請求喳魏,主要有注冊恒削,心跳續(xù)約池颈,獲取注冊信息等功能

實例配置

在分析源碼之前,需要查看下客戶端配置文件
application.yml

server:
  port: 8020
eureka:
  client:
    serviceUrl:
        defaultZone: http://localhost:8000/eureka/  #eureka服務(wù)端提供的注冊地址 參考服務(wù)端配置的這個路徑

  instance:
    instance-id: client-0 #此實例注冊到eureka服務(wù)端的唯一的實例ID
    prefer-ip-address: true #是否顯示IP地址
    leaseRenewalIntervalInSeconds: 10 #eureka客戶需要多長時間發(fā)送心跳給eureka服務(wù)器钓丰,表明它仍然活著,默認(rèn)為30 秒 (與下面配置的單位都是秒)
    leaseExpirationDurationInSeconds: 30 #Eureka服務(wù)器在接收到實例的最后一次發(fā)出的心跳后躯砰,需要等待多久才可以將此實例刪除,默認(rèn)為90秒

spring:
  application:
    name: service-client #此實例注冊到eureka服務(wù)端的name

訪問服務(wù)端localhost:8000的注冊信息


localhost.png

EurekaClientAutoConfiguration

自動配置類携丁,首先要從依賴包的spring.factories文件看起

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

其中最重要的是EurekaClientAutoConfiguration類


EurekaClientAutoConfiguration.png

只要類中存在EurekaClientConfig類所在的依賴包eureka-client-xx.jar就可以加載這個類


EurekaClientConfig.png

初始化EurekaClientAutoConfiguration類中的方法Bean加載到spring容器中:
1琢歇、EurekaAutoServiceRegistration 用來注冊

2、RefreshableEurekaClientConfiguration 用來開啟定時任務(wù)

注冊功能實現(xiàn)

1梦鉴、EurekaAutoServiceRegistration
實例化EurekaAutoServiceRegistration對象李茫,并放到spring容器中
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration


EurekaAutoServiceRegistration.png

2、start
由于EurekaAutoServiceRegistration類實現(xiàn)了SmartLifecycle,SmartApplicationListener等接口肥橙,所以會在容器初始化完成之后調(diào)用EurekaAutoServiceRegistration#start方法魄宏。


start.png

調(diào)用EurekaServiceRegistry的注冊方法和發(fā)布InstanceRegisteredEvent事件
3、register
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry#register
EurekaServiceRegistry.png

調(diào)用ApplicationInfoManager應(yīng)用信息管理設(shè)置實例初始化狀態(tài)信息initialStatus
4存筏、setInstanceStatus
com.netflix.appinfo.ApplicationInfoManager#setInstanceStatus
設(shè)置實例狀態(tài)信息并調(diào)用監(jiān)聽器notify方法


setInstanceStatus.png

5宠互、notify
com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener#notify
調(diào)用StatusChangeListener狀態(tài)改變監(jiān)聽器的notify方法。
這個對象實現(xiàn)在com.netflix.discovery.DiscoveryClient#initScheduledTasks方法內(nèi)
image.png

6椭坚、onDemandUpdate
com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
image.png

7名秀、run
由于InstanceInfoReplicator 類實現(xiàn)了Runnable接口,所以會調(diào)用這個run方法
image.png

8藕溅、register

com.netflix.discovery.DiscoveryClient#register
這里就到了注冊客戶端的地方


register.png

9、register
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
封裝http請求并調(diào)用到服務(wù)端方法
AbstractJerseyEurekaHttpClient.png

所以继榆,封裝的url是:服務(wù)端的serviceUrl:defaultZone/apps/客戶端應(yīng)用名
調(diào)到Eureka Server端
com.netflix.eureka.resources.ApplicationResource#addInstance
addInstance.png

定時刷新任務(wù)

包括緩存更新與心跳續(xù)約巾表,通過RefreshableEurekaClientConfiguration類開始初始化,并最終通過initScheduledTasks方法開啟定時調(diào)度器任務(wù)

RefreshableEurekaClientConfiguration

EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration

@Configuration(proxyBeanMethods = false)
    @ConditionalOnRefreshScope
    protected static class RefreshableEurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class,
                search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager,
                EurekaClientConfig config, EurekaInstanceConfig instance,
                @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
            // If we use the proxy of the ApplicationInfoManager we could run into a
            // problem
            // when shutdown is called on the CloudEurekaClient where the
            // ApplicationInfoManager bean is
            // requested but wont be allowed because we are shutting down. To avoid this
            // we use the
            // object directly.
            ApplicationInfoManager appManager;
            if (AopUtils.isAopProxy(manager)) {
                appManager = ProxyUtils.getTargetObject(manager);
            }
            else {
                appManager = manager;
            }
            CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
                    config, this.optionalArgs, this.context);
            cloudEurekaClient.registerHealthCheck(healthCheckHandler);
            return cloudEurekaClient;
        }
}

這個類是程序?qū)崿F(xiàn)定時刷新任務(wù)開始的地方略吨,主要是通過new CloudEurekaClient()方法創(chuàng)建Cloud客戶端類(CloudEurekaClient)

CloudEurekaClient

下面主要查看CloudEurekaClient的調(diào)用鏈
1集币、CloudEurekaClient#CloudEurekaClient
org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient
調(diào)用CloudEurekaClient類的父類,和對applicationInfoManager翠忠、publisher鞠苟、eurekaTransportField等屬性值賦值


CloudEurekaClient.png

2、DiscoveryClient#DiscoveryClient
com.netflix.discovery.DiscoveryClient#DiscoveryClient


DiscoveryClient.png

3、DiscoveryClient#DiscoveryClient
com.netflix.discovery.DiscoveryClient#DiscoveryClient
構(gòu)造器類調(diào)用当娱,傳入獲取備用服務(wù)器backupRegistryInstance實例的get方法吃既。不在這里調(diào)用
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            private volatile BackupRegistry backupRegistryInstance;

            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (IllegalAccessException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (ClassNotFoundException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        }
                    }

                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }

                return backupRegistryInstance;
            }
        }, randomizer);
    }

4、DiscoveryClient#DiscoveryClient
定義調(diào)度器類跨细、心跳執(zhí)行器和緩存刷新執(zhí)行器等的定義鹦倚。
com.netflix.discovery.DiscoveryClient#DiscoveryClient

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        //  ........前半部分省略
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

這個方法主要流程:
①、這個方法前半部分是初始化屬性值冀惭。
②震叙、根據(jù)客戶端client配置文件,config.shouldFetchRegistry()是否獲取注冊表信息和
config.shouldRegisterWithEureka()是否注冊到eureka上來對屬性賦值散休,或直接返回
③媒楼、初始化調(diào)度器scheduler、兩個線程池執(zhí)行器heartbeatExecutor(心跳續(xù)約)和cacheRefreshExecutor(緩存刷新戚丸,定時獲取注冊信息表)
④划址、在獲取服務(wù)注冊信息條件下,沒有獲取到信息或異常即fetchRegistry(false)返回false昏滴『秭辏可以從備用服務(wù)器獲取調(diào)用fetchRegistryFromBackup()方法,內(nèi)部實現(xiàn)方法調(diào)用備用服務(wù)器類的get方法backupRegistryProvider.get()
⑤谣殊、初始化調(diào)度器任務(wù)方法initScheduledTasks()

初始化調(diào)度器任務(wù)initScheduledTasks

調(diào)度器任務(wù)包括:
1拂共、定時刷新緩存注冊表信息,分為全量獲取和增量獲取
2姻几、定時向服務(wù)端發(fā)送心跳續(xù)約
3宜狐、狀態(tài)改變監(jiān)聽器執(zhí)行
這里不僅包括這些定時任務(wù),注冊也是在這里調(diào)用狀態(tài)改變監(jiān)聽器StatusChangeListener的notify方法
com.netflix.discovery.DiscoveryClient#initScheduledTasks

 /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

1蛇捌、心跳續(xù)約

1抚恒、initScheduledTasks
com.netflix.discovery.DiscoveryClient#initScheduledTasks
TimedSupervisorTask繼承了TimerTask,TimerTask實現(xiàn)了Runnable
TimedSupervisorTask類的構(gòu)造方法

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;
        Monitors.registerObject(name, this);
    }
heartbeatExecutor.png

2络拌、HeartbeatThread
執(zhí)行TimedSupervisorTask的task任務(wù)俭驮,在給定的間隔內(nèi)執(zhí)行心跳續(xù)約任務(wù)
com.netflix.discovery.DiscoveryClient.HeartbeatThread


HeartbeatThread.png

3、renew
續(xù)約任務(wù)春贸,續(xù)約成功更新lastSuccessfulHeartbeatTimestamp參數(shù)混萝。通過REST方式進行續(xù)訂
com.netflix.discovery.DiscoveryClient#renew


renew.png

4、sendHeartBeat
拼接http請求萍恕,發(fā)送心跳
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat
sendHeartBeat.png

客戶端appName=SERVICE-CLIENT逸嘀,id是instance-id屬性值client-0

服務(wù)端調(diào)用到renewLease方法續(xù)約,appName和id與客戶端傳過來的相同
com.netflix.eureka.resources.InstanceResource#renewLease


image.png

全量拉取和增量拉取

在定時刷新緩存實現(xiàn)獲取注冊信息允粤,分為全量拉取和增量拉取
創(chuàng)建TimedSupervisorTask調(diào)度任務(wù)類崭倘,傳入cacheRefreshExecutor執(zhí)行器翼岁、CacheRefreshThread任務(wù)類、從服務(wù)端獲取注冊信息的時間間隔RegistryFetchIntervalSeconds等參數(shù)信息

image.png

1司光、run
定時執(zhí)行CacheRefreshThread類的run方法
CacheRefreshThread.png

2琅坡、refreshRegistry
首先對remoteRegionsModified參數(shù)進行判斷,這樣可以確保對遠(yuǎn)程區(qū)域進行動態(tài)更改時可以獲取數(shù)據(jù)飘庄。如果更改則remoteRegionsModified=true脑蠕,只進行全量拉取
com.netflix.discovery.DiscoveryClient#refreshRegistry
refreshRegistry.png

3、fetchRegistry
com.netflix.discovery.DiscoveryClient#fetchRegistry
這個方法是決定了使用那種方式拉取

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

全量拉取條件(任意一個)
①跪削、disable-delta屬性值是true 關(guān)閉增量拉取
②谴仙、registry-refresh-single-vip-address 屬性vip地址的值不為空
③、forceFullRegistryFetch 為true 傳過來的變量值
④碾盐、localRegionApps的applications是null 當(dāng)前區(qū)域應(yīng)用
⑤晃跺、applications的數(shù)量是0
⑥、applications的版本是-1

增量拉取

實現(xiàn)增量拉取的條件是不符合全量拉取毫玖,調(diào)用getAndUpdateDelta方法
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta

private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

這個方法實現(xiàn)了增量拉取的請求實現(xiàn)掀虎,及對拉取增量結(jié)果的處理
1、getDelta
eurekaTransport.queryClient.getDelta(remoteRegionsRef.get())的具體實現(xiàn)是通過AbstractJerseyEurekaHttpClient類實現(xiàn)的
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getDelta

getDelta.png

拼接apps/delta開頭的請求
2付枫、getApplicationsInternal
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal
image.png

拼接的http請求是:http://localhost:8000/eureka/apps/delta然后調(diào)用到Eureka Server服務(wù)端
3烹玉、getContainerDifferential
服務(wù)端通過getContainerDifferential接收
com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
getContainerDifferential1.png

getContainerDifferential2.png

無論是調(diào)用到responseCache.getGZIP(cacheKey)方法,還是responseCache.get(cacheKey)方法阐滩。最終都是調(diào)用到Eureka Server的查詢的三種緩存二打,最終會經(jīng)過讀寫緩存readWriteCacheMap處理。
4掂榔、readWriteCacheMap
調(diào)用到ResponseCacheImpl構(gòu)造方法中readWriteCacheMap讀寫緩存的創(chuàng)建中继效,如果緩存中沒有會調(diào)用generatePayload方法
com.netflix.eureka.registry.ResponseCacheImpl#readWriteCacheMap
image.png

5、generatePayload
com.netflix.eureka.registry.ResponseCacheImpl#generatePayload
根據(jù)傳入的Key參數(shù)處理不同的情況
generatePayload.png

key = { eurekaAccept = "full"装获,entityType = "Application"瑞信,hashKey = "ApplicationALL_APPS_DELTAJSONV2full", requestVersion = "V2"穴豫, requestType = "JSON"凡简, regions = null, entityName = "ALL_APPS_DELTA"}
entityType 可分為Application精肃、VIP潘鲫、SVIP、default等情況
在entityType 是Application情況下EntityName分為:"ALL_APPS"(全量拉壤哒取)和"ALL_APPS_DELTA"(增量拉取)
在全量或增量拉取下有isRemoteRegionRequested參數(shù)判斷是否具有遠(yuǎn)程區(qū)域請求
本次處理是:增量拉取的沒有遠(yuǎn)程區(qū)域請求挖函,并調(diào)用getPayLoad方法加載是否含有增量數(shù)據(jù)
6状植、getApplicationDeltas
獲取增量實例
com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltas

public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            write.lock();
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is : {}",
                    this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                logger.debug(
                        "The instance id {} is found with status {} and actiontype {}",
                        instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);

                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }

這個方法主要是遍歷recentlyChangedQueue存在的數(shù)據(jù)放入到Applications對象中浊竟。所以recentlyChangedQueue隊列中存在什么數(shù)據(jù)就很重要,因此我們需要了解最新更新隊列recentlyChangedQueue是如何放入的及放入那些數(shù)據(jù)津畸,及其的移除的原理振定。
在這個方法最后 apps.setAppsHashCode設(shè)置了當(dāng)前服務(wù)端所有注冊信息的HashCode,所以這個增量對象存儲了最新的狀態(tài)HashCode值肉拓。
7后频、客戶端獲取增量數(shù)據(jù)的處理
還是在getAndUpdateDelta方法內(nèi),對服務(wù)端傳輸過來數(shù)據(jù)暖途,獲取當(dāng)前服務(wù)端的增量數(shù)據(jù)部分
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta


getAndUpdateDelta.png

這個方法的主要過程是:
如果增量數(shù)據(jù)部分為空卑惜,則執(zhí)行全量拉取。
對當(dāng)前服務(wù)的注冊信息表執(zhí)行updateDelta(delta)方法驻售,對當(dāng)前注冊實例的增加刪除或修改操作
當(dāng)前更新后的服務(wù)注冊表的HashCode值與增量對象存儲的最新的狀態(tài)HashCode值比較露久,如果不相等 則執(zhí)行全量拉取

recentlyChangedQueue

最新更新隊列ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
com.netflix.eureka.registry.AbstractInstanceRegistry#AbstractInstanceRegistry類在構(gòu)建創(chuàng)建注冊表時創(chuàng)建了recentlyChangedQueue隊列,并創(chuàng)建了一個增量調(diào)度任務(wù)方法getDeltaRetentionTask方法


AbstractInstanceRegistry.png

com.netflix.eureka.registry.AbstractInstanceRegistry#getDeltaRetentionTask
對recentlyChangedQueue隊列中對最近改變的隊列在一定時間范圍retentionTimeInMSInDeltaQueue=180000ms(3分鐘)外的進行定時清除(30s清除一次)


getDeltaRetentionTask.png

recentlyChangedQueue隊列添加條件:
1欺栗、注冊時register
2毫痕、下線時Cancel

3、statusUpdate
4迟几、deleteStatusOverride
getDeltaRetentionTask進行定時清除

全量拉取

全量拉取與增量拉取過程類似
全量拉取調(diào)用getAndStoreFullRegistry方法
1消请、getAndStoreFullRegistry
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry


getAndStoreFullRegistry.png

2、getApplications
com.netflix.discovery.shared.transport.EurekaHttpClient#getApplications


EurekaHttpClient.png

3类腮、getApplications
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications
getApplications.png

拼接的Http請求是:apps/
4臊泰、getApplicationsInternal

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal


getApplicationsInternal.png

5、getContainers
調(diào)用到Eureka Server服務(wù)端的getContainers方法存哲,這里調(diào)用與增量拉取類似因宇。查詢經(jīng)過三層緩存結(jié)構(gòu),具體過程可參考增量查詢與三層緩存結(jié)構(gòu)祟偷。
com.netflix.eureka.resources.ApplicationsResource#getContainers
getContainers.png

如果acceptEncoding包含值gzip將會調(diào)用ResponseCacheImpl#getGZIP方法獲取
ResponseCacheImpl.png

否則調(diào)用ResponseCacheImpl#get(Key)方法
最終會調(diào)用到讀寫緩存的generatePayload方法處理
6察滑、generatePayload
com.netflix.eureka.registry.ResponseCacheImpl#generatePayload
Key結(jié)構(gòu)中的EntityName是"ALL_APPS"全量查詢。
generatePayload.png

7修肠、getApplications
獲取注冊表所有的注冊信息

com.netflix.eureka.registry.AbstractInstanceRegistry#getApplications()


getApplications.png

8贺辰、結(jié)果處理
放入到當(dāng)前對象localRegionApps緩存中
image.png

如果含有獲取遠(yuǎn)程區(qū)域注冊信息FetchingRemoteRegionRegistries,只需要分到不同的索引位置
image.png

總結(jié):

Eureka Client客戶端與Eureka Server服務(wù)端是關(guān)聯(lián)的嵌施,不能分開處理饲化。關(guān)鍵點有以下幾點:
客戶端主要了解的是如何向服務(wù)端發(fā)起請求及服務(wù)端接收接口的對應(yīng)
客戶端的定時任務(wù)有哪些:心跳及緩存獲取
客戶端注冊的實現(xiàn),緩存獲取原理
緩存獲取中的增量拉取與全量拉取的區(qū)別
增量拉取在服務(wù)端的實現(xiàn)原理吗伤,由一個狀態(tài)更改隊列實現(xiàn)的

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載吃靠,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末足淆,一起剝皮案震驚了整個濱河市巢块,隨后出現(xiàn)的幾起案子礁阁,更是在濱河造成了極大的恐慌,老刑警劉巖族奢,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件姥闭,死亡現(xiàn)場離奇詭異,居然都是意外死亡越走,警方通過查閱死者的電腦和手機棚品,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來廊敌,“玉大人铜跑,你說我怎么就攤上這事⊥ザ兀” “怎么了疼进?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵急膀,是天一觀的道長聚请。 經(jīng)常有香客問我,道長涡拘,這世上最難降的妖魔是什么疼电? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任嚼锄,我火速辦了婚禮,結(jié)果婚禮上蔽豺,老公的妹妹穿的比我還像新娘区丑。我一直安慰自己,他們只是感情好修陡,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布沧侥。 她就那樣靜靜地躺著,像睡著了一般魄鸦。 火紅的嫁衣襯著肌膚如雪宴杀。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天拾因,我揣著相機與錄音旺罢,去河邊找鬼。 笑死绢记,一個胖子當(dāng)著我的面吹牛扁达,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蠢熄,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼跪解,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了签孔?” 一聲冷哼從身側(cè)響起惠遏,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤砾跃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后节吮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡判耕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年透绩,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片壁熄。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡帚豪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出草丧,到底是詐尸還是另有隱情狸臣,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布昌执,位于F島的核電站烛亦,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏懂拾。R本人自食惡果不足惜煤禽,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望岖赋。 院中可真熱鬧檬果,春花似錦、人聲如沸唐断。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽脸甘。三九已至恳啥,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間斤程,已是汗流浹背角寸。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留忿墅,地道東北人扁藕。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像疚脐,于是被迫代替她去往敵國和親亿柑。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354