本章主要介紹Eureka Client端源碼分析辛萍。
客戶端主要是向Server服務(wù)端發(fā)送Http請求喳魏,主要有注冊恒削,心跳續(xù)約池颈,獲取注冊信息等功能
實例配置
在分析源碼之前,需要查看下客戶端配置文件
application.yml
server:
port: 8020
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8000/eureka/ #eureka服務(wù)端提供的注冊地址 參考服務(wù)端配置的這個路徑
instance:
instance-id: client-0 #此實例注冊到eureka服務(wù)端的唯一的實例ID
prefer-ip-address: true #是否顯示IP地址
leaseRenewalIntervalInSeconds: 10 #eureka客戶需要多長時間發(fā)送心跳給eureka服務(wù)器钓丰,表明它仍然活著,默認(rèn)為30 秒 (與下面配置的單位都是秒)
leaseExpirationDurationInSeconds: 30 #Eureka服務(wù)器在接收到實例的最后一次發(fā)出的心跳后躯砰,需要等待多久才可以將此實例刪除,默認(rèn)為90秒
spring:
application:
name: service-client #此實例注冊到eureka服務(wù)端的name
訪問服務(wù)端localhost:8000的注冊信息
EurekaClientAutoConfiguration
自動配置類携丁,首先要從依賴包的spring.factories文件看起
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
其中最重要的是EurekaClientAutoConfiguration類
只要類中存在EurekaClientConfig類所在的依賴包eureka-client-xx.jar就可以加載這個類
初始化EurekaClientAutoConfiguration類中的方法Bean加載到spring容器中:
1琢歇、EurekaAutoServiceRegistration 用來注冊
2、RefreshableEurekaClientConfiguration 用來開啟定時任務(wù)
注冊功能實現(xiàn)
1梦鉴、EurekaAutoServiceRegistration
實例化EurekaAutoServiceRegistration對象李茫,并放到spring容器中
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration
2、start
由于EurekaAutoServiceRegistration類實現(xiàn)了SmartLifecycle,SmartApplicationListener等接口肥橙,所以會在容器初始化完成之后調(diào)用EurekaAutoServiceRegistration#start方法魄宏。
調(diào)用EurekaServiceRegistry的注冊方法和發(fā)布InstanceRegisteredEvent事件
3、register
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry#register
調(diào)用ApplicationInfoManager應(yīng)用信息管理設(shè)置實例初始化狀態(tài)信息initialStatus
4存筏、setInstanceStatus
com.netflix.appinfo.ApplicationInfoManager#setInstanceStatus
設(shè)置實例狀態(tài)信息并調(diào)用監(jiān)聽器notify方法
5宠互、notify
com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener#notify
調(diào)用StatusChangeListener狀態(tài)改變監(jiān)聽器的notify方法。
這個對象實現(xiàn)在com.netflix.discovery.DiscoveryClient#initScheduledTasks方法內(nèi)
6椭坚、onDemandUpdate
com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
7名秀、run
由于InstanceInfoReplicator 類實現(xiàn)了Runnable接口,所以會調(diào)用這個run方法
8藕溅、register
com.netflix.discovery.DiscoveryClient#register
這里就到了注冊客戶端的地方
9、register
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
封裝http請求并調(diào)用到服務(wù)端方法
所以继榆,封裝的url是:服務(wù)端的serviceUrl:defaultZone/apps/客戶端應(yīng)用名
調(diào)到Eureka Server端
com.netflix.eureka.resources.ApplicationResource#addInstance
定時刷新任務(wù)
包括緩存更新與心跳續(xù)約巾表,通過RefreshableEurekaClientConfiguration類開始初始化,并最終通過initScheduledTasks方法開啟定時調(diào)度器任務(wù)
RefreshableEurekaClientConfiguration
EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnRefreshScope
protected static class RefreshableEurekaClientConfiguration {
@Autowired
private ApplicationContext context;
@Autowired
private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a
// problem
// when shutdown is called on the CloudEurekaClient where the
// ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the
// object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
}
這個類是程序?qū)崿F(xiàn)定時刷新任務(wù)開始的地方略吨,主要是通過new CloudEurekaClient()方法創(chuàng)建Cloud客戶端類(CloudEurekaClient)
CloudEurekaClient
下面主要查看CloudEurekaClient的調(diào)用鏈
1集币、CloudEurekaClient#CloudEurekaClient
org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient
調(diào)用CloudEurekaClient類的父類,和對applicationInfoManager翠忠、publisher鞠苟、eurekaTransportField等屬性值賦值
2、DiscoveryClient#DiscoveryClient
com.netflix.discovery.DiscoveryClient#DiscoveryClient
3、DiscoveryClient#DiscoveryClient
com.netflix.discovery.DiscoveryClient#DiscoveryClient
構(gòu)造器類調(diào)用当娱,傳入獲取備用服務(wù)器backupRegistryInstance實例的get方法吃既。不在這里調(diào)用
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
4、DiscoveryClient#DiscoveryClient
定義調(diào)度器類跨细、心跳執(zhí)行器和緩存刷新執(zhí)行器等的定義鹦倚。
com.netflix.discovery.DiscoveryClient#DiscoveryClient
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// ........前半部分省略
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
這個方法主要流程:
①、這個方法前半部分是初始化屬性值冀惭。
②震叙、根據(jù)客戶端client配置文件,config.shouldFetchRegistry()是否獲取注冊表信息和
config.shouldRegisterWithEureka()是否注冊到eureka上來對屬性賦值散休,或直接返回
③媒楼、初始化調(diào)度器scheduler、兩個線程池執(zhí)行器heartbeatExecutor(心跳續(xù)約)和cacheRefreshExecutor(緩存刷新戚丸,定時獲取注冊信息表)
④划址、在獲取服務(wù)注冊信息條件下,沒有獲取到信息或異常即fetchRegistry(false)返回false昏滴『秭辏可以從備用服務(wù)器獲取調(diào)用fetchRegistryFromBackup()方法,內(nèi)部實現(xiàn)方法調(diào)用備用服務(wù)器類的get方法backupRegistryProvider.get()
⑤谣殊、初始化調(diào)度器任務(wù)方法initScheduledTasks()
初始化調(diào)度器任務(wù)initScheduledTasks
調(diào)度器任務(wù)包括:
1拂共、定時刷新緩存注冊表信息,分為全量獲取和增量獲取
2姻几、定時向服務(wù)端發(fā)送心跳續(xù)約
3宜狐、狀態(tài)改變監(jiān)聽器執(zhí)行
這里不僅包括這些定時任務(wù),注冊也是在這里調(diào)用狀態(tài)改變監(jiān)聽器StatusChangeListener的notify方法
com.netflix.discovery.DiscoveryClient#initScheduledTasks
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
1蛇捌、心跳續(xù)約
1抚恒、initScheduledTasks
com.netflix.discovery.DiscoveryClient#initScheduledTasks
TimedSupervisorTask繼承了TimerTask,TimerTask實現(xiàn)了Runnable
TimedSupervisorTask類的構(gòu)造方法
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
Monitors.registerObject(name, this);
}
2络拌、HeartbeatThread
執(zhí)行TimedSupervisorTask的task任務(wù)俭驮,在給定的間隔內(nèi)執(zhí)行心跳續(xù)約任務(wù)
com.netflix.discovery.DiscoveryClient.HeartbeatThread
3、renew
續(xù)約任務(wù)春贸,續(xù)約成功更新lastSuccessfulHeartbeatTimestamp參數(shù)混萝。通過REST方式進行續(xù)訂
com.netflix.discovery.DiscoveryClient#renew
4、sendHeartBeat
拼接http請求萍恕,發(fā)送心跳
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat
客戶端appName=SERVICE-CLIENT逸嘀,id是instance-id屬性值client-0
服務(wù)端調(diào)用到renewLease方法續(xù)約,appName和id與客戶端傳過來的相同
com.netflix.eureka.resources.InstanceResource#renewLease
全量拉取和增量拉取
在定時刷新緩存實現(xiàn)獲取注冊信息允粤,分為全量拉取和增量拉取
創(chuàng)建TimedSupervisorTask調(diào)度任務(wù)類崭倘,傳入cacheRefreshExecutor執(zhí)行器翼岁、CacheRefreshThread任務(wù)類、從服務(wù)端獲取注冊信息的時間間隔RegistryFetchIntervalSeconds等參數(shù)信息
1司光、run
定時執(zhí)行CacheRefreshThread類的run方法
2琅坡、refreshRegistry
首先對remoteRegionsModified參數(shù)進行判斷,這樣可以確保對遠(yuǎn)程區(qū)域進行動態(tài)更改時可以獲取數(shù)據(jù)飘庄。如果更改則remoteRegionsModified=true脑蠕,只進行全量拉取
com.netflix.discovery.DiscoveryClient#refreshRegistry
3、fetchRegistry
com.netflix.discovery.DiscoveryClient#fetchRegistry
這個方法是決定了使用那種方式拉取
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
全量拉取條件(任意一個)
①跪削、disable-delta屬性值是true 關(guān)閉增量拉取
②谴仙、registry-refresh-single-vip-address 屬性vip地址的值不為空
③、forceFullRegistryFetch 為true 傳過來的變量值
④碾盐、localRegionApps的applications是null 當(dāng)前區(qū)域應(yīng)用
⑤晃跺、applications的數(shù)量是0
⑥、applications的版本是-1
增量拉取
實現(xiàn)增量拉取的條件是不符合全量拉取毫玖,調(diào)用getAndUpdateDelta方法
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
這個方法實現(xiàn)了增量拉取的請求實現(xiàn)掀虎,及對拉取增量結(jié)果的處理
1、getDelta
eurekaTransport.queryClient.getDelta(remoteRegionsRef.get())的具體實現(xiàn)是通過AbstractJerseyEurekaHttpClient類實現(xiàn)的
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getDelta
拼接apps/delta開頭的請求
2付枫、getApplicationsInternal
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal
拼接的http請求是:http://localhost:8000/eureka/apps/delta然后調(diào)用到Eureka Server服務(wù)端
3烹玉、getContainerDifferential
服務(wù)端通過getContainerDifferential接收
com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
無論是調(diào)用到responseCache.getGZIP(cacheKey)方法,還是responseCache.get(cacheKey)方法阐滩。最終都是調(diào)用到Eureka Server的查詢的三種緩存二打,最終會經(jīng)過讀寫緩存readWriteCacheMap處理。
4掂榔、readWriteCacheMap
調(diào)用到ResponseCacheImpl構(gòu)造方法中readWriteCacheMap讀寫緩存的創(chuàng)建中继效,如果緩存中沒有會調(diào)用generatePayload方法
com.netflix.eureka.registry.ResponseCacheImpl#readWriteCacheMap
5、generatePayload
com.netflix.eureka.registry.ResponseCacheImpl#generatePayload
根據(jù)傳入的Key參數(shù)處理不同的情況
key = { eurekaAccept = "full"装获,entityType = "Application"瑞信,hashKey = "ApplicationALL_APPS_DELTAJSONV2full", requestVersion = "V2"穴豫, requestType = "JSON"凡简, regions = null, entityName = "ALL_APPS_DELTA"}
entityType 可分為Application精肃、VIP潘鲫、SVIP、default等情況
在entityType 是Application情況下EntityName分為:"ALL_APPS"(全量拉壤哒取)和"ALL_APPS_DELTA"(增量拉取)
在全量或增量拉取下有isRemoteRegionRequested參數(shù)判斷是否具有遠(yuǎn)程區(qū)域請求
本次處理是:增量拉取的沒有遠(yuǎn)程區(qū)域請求挖函,并調(diào)用getPayLoad方法加載是否含有增量數(shù)據(jù)
6状植、getApplicationDeltas
獲取增量實例
com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltas
public Applications getApplicationDeltas() {
GET_ALL_CACHE_MISS_DELTA.increment();
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDelta().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
write.lock();
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is : {}",
this.recentlyChangedQueue.size());
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
logger.debug(
"The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
Application app = applicationInstancesMap.get(instanceInfo
.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (!disableTransparentFallback) {
Applications allAppsInLocalRegion = getApplications(false);
for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
Applications applications = remoteRegistry.getApplicationDeltas();
for (Application application : applications.getRegisteredApplications()) {
Application appInLocalRegistry =
allAppsInLocalRegion.getRegisteredApplications(application.getName());
if (appInLocalRegistry == null) {
apps.addApplication(application);
}
}
}
}
Applications allApps = getApplications(!disableTransparentFallback);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
這個方法主要是遍歷recentlyChangedQueue存在的數(shù)據(jù)放入到Applications對象中浊竟。所以recentlyChangedQueue隊列中存在什么數(shù)據(jù)就很重要,因此我們需要了解最新更新隊列recentlyChangedQueue是如何放入的及放入那些數(shù)據(jù)津畸,及其的移除的原理振定。
在這個方法最后 apps.setAppsHashCode設(shè)置了當(dāng)前服務(wù)端所有注冊信息的HashCode,所以這個增量對象存儲了最新的狀態(tài)HashCode值肉拓。
7后频、客戶端獲取增量數(shù)據(jù)的處理
還是在getAndUpdateDelta方法內(nèi),對服務(wù)端傳輸過來數(shù)據(jù)暖途,獲取當(dāng)前服務(wù)端的增量數(shù)據(jù)部分
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
這個方法的主要過程是:
如果增量數(shù)據(jù)部分為空卑惜,則執(zhí)行全量拉取。
對當(dāng)前服務(wù)的注冊信息表執(zhí)行updateDelta(delta)方法驻售,對當(dāng)前注冊實例的增加刪除或修改操作
當(dāng)前更新后的服務(wù)注冊表的HashCode值與增量對象存儲的最新的狀態(tài)HashCode值比較露久,如果不相等 則執(zhí)行全量拉取
recentlyChangedQueue
最新更新隊列ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
com.netflix.eureka.registry.AbstractInstanceRegistry#AbstractInstanceRegistry類在構(gòu)建創(chuàng)建注冊表時創(chuàng)建了recentlyChangedQueue隊列,并創(chuàng)建了一個增量調(diào)度任務(wù)方法getDeltaRetentionTask方法
com.netflix.eureka.registry.AbstractInstanceRegistry#getDeltaRetentionTask
對recentlyChangedQueue隊列中對最近改變的隊列在一定時間范圍retentionTimeInMSInDeltaQueue=180000ms(3分鐘)外的進行定時清除(30s清除一次)
recentlyChangedQueue隊列添加條件:
1欺栗、注冊時register
2毫痕、下線時Cancel
3、statusUpdate
4迟几、deleteStatusOverride
getDeltaRetentionTask進行定時清除
全量拉取
全量拉取與增量拉取過程類似
全量拉取調(diào)用getAndStoreFullRegistry方法
1消请、getAndStoreFullRegistry
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry
2、getApplications
com.netflix.discovery.shared.transport.EurekaHttpClient#getApplications
3类腮、getApplications
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications
拼接的Http請求是:apps/
4臊泰、getApplicationsInternal
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal
5、getContainers
調(diào)用到Eureka Server服務(wù)端的getContainers方法存哲,這里調(diào)用與增量拉取類似因宇。查詢經(jīng)過三層緩存結(jié)構(gòu),具體過程可參考增量查詢與三層緩存結(jié)構(gòu)祟偷。
com.netflix.eureka.resources.ApplicationsResource#getContainers
如果acceptEncoding包含值gzip將會調(diào)用ResponseCacheImpl#getGZIP方法獲取
否則調(diào)用ResponseCacheImpl#get(Key)方法
最終會調(diào)用到讀寫緩存的generatePayload方法處理
6察滑、generatePayload
com.netflix.eureka.registry.ResponseCacheImpl#generatePayload
Key結(jié)構(gòu)中的EntityName是"ALL_APPS"全量查詢。
7修肠、getApplications
獲取注冊表所有的注冊信息
com.netflix.eureka.registry.AbstractInstanceRegistry#getApplications()
8贺辰、結(jié)果處理
放入到當(dāng)前對象localRegionApps緩存中
如果含有獲取遠(yuǎn)程區(qū)域注冊信息FetchingRemoteRegionRegistries,只需要分到不同的索引位置
總結(jié):
Eureka Client客戶端與Eureka Server服務(wù)端是關(guān)聯(lián)的嵌施,不能分開處理饲化。關(guān)鍵點有以下幾點:
客戶端主要了解的是如何向服務(wù)端發(fā)起請求及服務(wù)端接收接口的對應(yīng)
客戶端的定時任務(wù)有哪些:心跳及緩存獲取
客戶端注冊的實現(xiàn),緩存獲取原理
緩存獲取中的增量拉取與全量拉取的區(qū)別
增量拉取在服務(wù)端的實現(xiàn)原理吗伤,由一個狀態(tài)更改隊列實現(xiàn)的