Eureka源碼分析(2.1.4.Release)
首先源碼切忌一行一行debug驳糯,需先了解eureka主要功能后届搁,再分析其功能如何實(shí)現(xiàn)爆阶。
主要功能
- 服務(wù)注冊(cè):客戶端向eureka服務(wù)端注冊(cè)自己的服務(wù),服務(wù)端將信息存于多級(jí)緩存中缭黔。
- 服務(wù)續(xù)約:服務(wù)注冊(cè)后蒂破,客戶端會(huì)定時(shí)發(fā)送心跳包來保持可用性,避免被剔除寞蚌。每30秒(eureka.instance.leaseRenewallIntervalInSeconds)發(fā)送一次心跳來進(jìn)行服務(wù)續(xù)約钠糊。
- 服務(wù)同步:eureka服務(wù)端之間互相進(jìn)行注冊(cè),構(gòu)建server集群壹哺,不同Server之間會(huì)進(jìn)行服務(wù)同步,用來保證服務(wù)信息的一致性管宵。
- 服務(wù)列表獲取:客戶端請(qǐng)求eureka server獲取注冊(cè)的服務(wù)清單箩朴,并緩存到客戶端本地岗喉。默認(rèn)間隔30秒。(eureka.client.registryFetchIntervalSeconds)
fetchRegistry:true 當(dāng)值為true時(shí)炸庞,會(huì)定期從eureka服務(wù)端獲取列表,集群中的eureka服務(wù)端也會(huì)從其他的eureka節(jié)點(diǎn)中獲取列表)
- 遠(yuǎn)程調(diào)用:服務(wù)調(diào)用方在獲取到服務(wù)清單后查牌,就可以根據(jù)自身的一個(gè)負(fù)載均衡策略找到一個(gè)服務(wù)url滥壕,并對(duì)他發(fā)起請(qǐng)求纸颜。
- 服務(wù)下線:客戶端關(guān)閉或重啟時(shí)绎橘,需告訴server端該服務(wù)已下線。服務(wù)端在收到請(qǐng)求后涮较,就會(huì)把該服務(wù)狀態(tài)置為下線(DOWN)胡岔,并把該下線事件傳播出去
- 服務(wù)剔除:如果客戶端沒有正常下線,需要有剔除機(jī)制靶瘸,將不可用的服務(wù)刪除掉。EurekaServer 在啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)定時(shí)任務(wù)怨咪,每隔默認(rèn)60秒,從當(dāng)前服務(wù)清單中把超時(shí)沒有續(xù)約(默認(rèn)90秒唉匾,eureka.instance.leaseExpirationDurationInSeconds)的服務(wù)剔除
- 自我保護(hù):當(dāng)短時(shí)間內(nèi),統(tǒng)計(jì)續(xù)約失敗的比例巍膘,如果達(dá)到一定閾值,就會(huì)觸發(fā)自我保護(hù)機(jī)制璃饱。在該機(jī)制下肪康,Eureka Server不會(huì)剔除任何微服務(wù)荚恶。等到一分鐘內(nèi)最近續(xù)約個(gè)數(shù)大于一分鐘內(nèi)最小續(xù)約,再退出自我保護(hù)機(jī)制磷支。自我保護(hù)開關(guān)(eureka.server.enable-self-preservation:false)
Eureka保證AP
什么是AP谒撼?
著名的CAP理論指出,一個(gè)分布式系統(tǒng)不可能同時(shí)滿足C(一致性)雾狈、A(可用性)和P(分區(qū)容錯(cuò)性)廓潜。
由于分區(qū)容錯(cuò)性在是分布式系統(tǒng)中必須要保證的,因此我們只能在A和C之間進(jìn)行權(quán)衡箍邮。Eureka選擇了保證AP锭弊。
- 如一個(gè)客戶端因網(wǎng)絡(luò)原因在一個(gè)eureka服務(wù)端無法獲取服務(wù)列表時(shí)擂错,可以從其他eureka服務(wù)端獲取數(shù)據(jù),即使與之前的eureka服務(wù)列表不一致钮呀;
- eureka服務(wù)端有同步機(jī)制爽醋,保證了服務(wù)列表的最終一致性(非強(qiáng)一致性);
客戶端初始化
這里簡(jiǎn)要分析一下
springboot會(huì)讀取spring-cloud-netflix-eureka-client/2.1.1.RELEASE/spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar!/META-INF/spring.factories文件.(詳情請(qǐng)看上一篇springboot自動(dòng)裝載)
自動(dòng)初始化EnableAutoConfiguration下的所有配置類蚂四,內(nèi)容如下:
重點(diǎn)看EurekaClientAutoConfiguration類
//初始化eureka客戶端
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
CloudEurekaClient繼承了DiscoveryClient類遂赠,該類為原生eureka類跷睦,所以springcloud的eureka客戶端實(shí)際是封裝了原生eureka客戶端,分析發(fā)現(xiàn)抑诸,eureka主要功能都在原生類上,所以直接看原生eureka類的初始化代碼就行了奸绷。
簡(jiǎn)要初始化流程如下:
image.png
具體初始化代碼:com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)
代碼片段:
……忽略上部分代碼……
//實(shí)例化一個(gè)線程池(核心線程:2個(gè)健盒,分別對(duì)應(yīng)心跳續(xù)約任務(wù)和獲取服務(wù)列表任務(wù))
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//聲明心跳續(xù)約執(zhí)行器(續(xù)約)
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
//聲明更新服務(wù)列表緩存執(zhí)行器(獲取任務(wù)列表)
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
……忽略部分代碼……
//重點(diǎn)看這里:初始化定時(shí)任務(wù)
initScheduledTasks();
com.netflix.discovery.DiscoveryClient#initScheduledTasks:
private void initScheduledTasks() {
//默認(rèn)為true扣癣,只有eureka服務(wù)端非集群模式時(shí),該設(shè)置可以為false
if (clientConfig.shouldFetchRegistry()) {
//默認(rèn)eureka.client.registryFetchIntervalSeconds=30父虑,延時(shí)30秒執(zhí)行
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
//默認(rèn)eureka.client.cacheRefreshExecutorExponentialBackOffBound=10士嚎,如果發(fā)生超時(shí)的重試延遲的最大乘數(shù)。
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//其中scheduler和cacheRefreshExecutor為之前提到的聲明對(duì)象莱衩,其實(shí)就是初始化了一個(gè)cacheRefresh名字的線程執(zhí)行任務(wù)
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
……忽略下半部分代碼(與服務(wù)注冊(cè)任務(wù)相關(guān))……
在這里可以看到scheduler.schedule方法就是在定義一個(gè)延時(shí)任務(wù)笨蚁,30秒后執(zhí)行。但是按理說應(yīng)該定義一個(gè)定時(shí)任務(wù)定時(shí)獲取服務(wù)列表才對(duì)括细。為什么要這樣定義呢奋单?下面重點(diǎn)看看TimedSupervisorTask
初始化實(shí)際賦值:
//上文提到的線程池
this.scheduler = scheduler;
//上文提到的緩存更新執(zhí)行器
this.executor = cacheRefreshExecutor;
//超時(shí)時(shí)間:上文提到的默認(rèn)30秒
this.timeoutMillis = timeUnit.toMillis(registryFetchIntervalSeconds);
//執(zhí)行的真正任務(wù)
this.task = new CacheRefreshThread();
//延遲時(shí)間:上文提到的默認(rèn)30秒
this.delay = new AtomicLong(timeoutMillis);
//最大延遲時(shí)間:30秒*10=300秒,有什么用呢览濒?看下面解釋
this.maxDelay = timeoutMillis * expBackOffBound
這里說一下TimedSupervisorTask的執(zhí)行邏輯匾七,非固定的執(zhí)行時(shí)間,而是采用動(dòng)態(tài)間隔時(shí)間昨忆,去執(zhí)行獲取服務(wù)列表任務(wù)。(在平常工作中席里,也可以效仿此方法實(shí)現(xiàn)動(dòng)態(tài)設(shè)置任務(wù))
具體實(shí)現(xiàn)流程圖如下:
具體實(shí)現(xiàn)代碼:
com.netflix.discovery.TimedSupervisorTask#run:
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
//執(zhí)行任務(wù)在默認(rèn)超時(shí)時(shí)間內(nèi)奖磁。
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
//如果任務(wù)沒超時(shí),則把延時(shí)時(shí)間設(shè)置為默認(rèn)的超時(shí)時(shí)間30秒
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
//如果超時(shí)
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
//設(shè)置最新延時(shí)時(shí)間:將當(dāng)前超時(shí)時(shí)間*2秕狰,與最大延時(shí)比較躁染,即超時(shí)時(shí)間不能超過300秒
long newDelay = Math.min(maxDelay, currentDelay * 2);
//CAS來控制多線程同步
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
//又調(diào)用一個(gè)延時(shí)任務(wù),延時(shí)時(shí)間為新設(shè)置的延時(shí)時(shí)間
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
服務(wù)列表獲取
下面再來看看CacheRefreshThread的實(shí)現(xiàn)邏輯
com.netflix.discovery.DiscoveryClient#refreshRegistry->
com.netflix.discovery.DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
//獲取本地緩存:localRegionApps.get()
Applications applications = getApplications();
//判斷是全量更新還是增量更新
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
//全量獲取
getAndStoreFullRegistry();
} else {
//增量獲取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
……忽略下部分代碼……
}
全量獲取:
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry->
eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())->
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications->
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal
增量獲取:
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta:
eueka不是每次都是采用全量獲取服務(wù)列表挠羔,而是默認(rèn)采用增量方式埋嵌,獲取變化的實(shí)例。
當(dāng)然為保證獲取到的服務(wù)列表是最新的拌喉,獲取增量的同時(shí)俐银,eureka服務(wù)端也會(huì)把最新的服務(wù)列表hashcode返回端仰,客戶端將此hashcode與更新后的本地緩存hashcode作對(duì)比,如果不相同吱七,則重新進(jìn)行全量獲取鹤竭。
可以學(xué)習(xí)的地方:
- TimeSupervisorTask是一個(gè)動(dòng)態(tài)時(shí)間間隔任務(wù),如果遇到執(zhí)行任務(wù)超時(shí)臀稚,則擴(kuò)大為原來間隔時(shí)間的兩倍,一直到延時(shí)時(shí)間的閾值窜管。一旦新任務(wù)不再超時(shí),間隔時(shí)間又恢復(fù)為初始值幕帆。如服務(wù)列表獲取任務(wù)默認(rèn)為每隔30秒執(zhí)行,隨著執(zhí)行時(shí)間超時(shí)擴(kuò)大為原來的2倍再執(zhí)行常熙,直到到達(dá)閾值碱茁。如果執(zhí)行未超時(shí),則設(shè)置為原來的間隔時(shí)間執(zhí)行彼城。
- 采用CAS來控制多線程同步退个。
- 增量獲取數(shù)據(jù)列表。盡量避免全量獲扔镉(當(dāng)數(shù)據(jù)量很大的時(shí)候)
- 服務(wù)列表保存在本地緩存,每次遠(yuǎn)程調(diào)用代嗤,不需要經(jīng)過注冊(cè)中心缠借,提高執(zhí)行速度。當(dāng)eureka服務(wù)端掛了的時(shí)候硝逢,eureka客戶端還能通過本地緩存的服務(wù)列表正常調(diào)用遠(yuǎn)程服務(wù)绅喉。