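I. Preface
This article explains why Eureka clients are slow to notice changes on the Eureka server, walks through the source code behind each delay point, and shows how to let callers observe instances going online and offline smoothly.
II. Service Online
- The service registers with the server (immediately after startup): 0s
- Server-side read-only/read-write cache sync interval: responseCacheUpdateIntervalMs=30s
- Client registry fetch interval: registryFetchIntervalSeconds=30s
- Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s
In the worst case, a client takes 90s to notice that a service has come online.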
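III. Service Offline
Graceful shutdown
- On shutdown the service sends a cancel request to the server (immediately, before the service stops): 0s
- Server-side read-only/read-write cache sync interval: responseCacheUpdateIntervalMs=30s
- Client registry fetch interval: registryFetchIntervalSeconds=30s
- Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s
In the worst case, a client takes 90s to notice a graceful shutdown; the delay points are exactly the same as for going online.
Abnormal shutdown
An abnormal shutdown is one where the service goes away without sending a cancel request, for example after kill -9 or a machine crash.
- Execution interval of the server's eviction (evict) task: evictionIntervalTimerInMs=60s
- The eviction task evicts instances that have not renewed their lease within 90s: leaseExpirationDurationInSeconds=90s
- Server-side read-only/read-write cache sync interval: responseCacheUpdateIntervalMs=30s
- Client registry fetch interval: registryFetchIntervalSeconds=30s
- Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s
In the worst case, a client takes 240s to notice an abnormal shutdown.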
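The worst-case figures above are simply the sum of the individual intervals. The following is a minimal sketch of that arithmetic; the class and method names are made up for illustration and are not part of Eureka.

```java
import java.util.Arrays;

public class PerceptionDelay {
    // Sums the worst-case delay contributed by each stage, in seconds.
    public static int worstCaseSeconds(int... stageSeconds) {
        return Arrays.stream(stageSeconds).sum();
    }

    public static void main(String[] args) {
        int responseCacheUpdate = 30; // responseCacheUpdateIntervalMs
        int registryFetch = 30;       // registryFetchIntervalSeconds
        int ribbonRefresh = 30;       // ServerListRefreshInterval
        int evictionInterval = 60;    // evictionIntervalTimerInMs
        int leaseExpiration = 90;     // leaseExpirationDurationInSeconds

        // Online and graceful offline share the same delay points.
        int graceful = worstCaseSeconds(0, responseCacheUpdate, registryFetch, ribbonRefresh);
        // Abnormal offline additionally waits for lease expiry and the eviction task.
        int abnormal = worstCaseSeconds(evictionInterval, leaseExpiration,
                responseCacheUpdate, registryFetch, ribbonRefresh);

        System.out.println("online / graceful offline: " + graceful + "s");
        System.out.println("abnormal offline: " + abnormal + "s");
    }
}
```

Plugging other interval values into `worstCaseSeconds` gives the worst case for any tuned configuration.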
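The sections below look at where each step lives in the source code and how it is implemented.
IV. Source Code Analysis
Server-side read-only/read-write cache synchronization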
ResponseCacheImpl
// responseCacheUpdateIntervalMs defaults to 30s
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
for (Key key : readOnlyCacheMap.keySet()) {
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
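The task returned by getCacheUpdateTask is scheduled every 30s. It iterates over every key in readOnlyCacheMap, reads the latest value from readWriteCacheMap, and writes it into readOnlyCacheMap when the two differ.
If the key is new and readOnlyCacheMap has no entry for it yet, the entry is populated in getValue.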
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
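The interaction between the two caches can be reproduced with plain maps. The sketch below is a simplified model, not Eureka's implementation: it assumes string keys and values, replaces the real read-write cache with a ConcurrentHashMap, and calls the sync step by hand instead of from a timer. All names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TwoLevelCacheSketch {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();

    // A registry change only touches the read-write level.
    public void write(String key, String value) {
        readWrite.put(key, value);
    }

    // Reads prefer the read-only level and fill it lazily on a miss.
    public String read(String key) {
        String cached = readOnly.get(key);
        if (cached != null) {
            return cached;
        }
        String fresh = readWrite.get(key);
        if (fresh != null) {
            readOnly.put(key, fresh);
        }
        return fresh;
    }

    // Stands in for the periodic cache update task.
    public void sync() {
        for (String key : readOnly.keySet()) {
            String latest = readWrite.get(key);
            if (latest != null && !latest.equals(readOnly.get(key))) {
                readOnly.put(key, latest);
            }
        }
    }

    public static void main(String[] args) {
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch();
        cache.write("apps", "v1");
        System.out.println(cache.read("apps")); // first read fills the read-only level
        cache.write("apps", "v2");
        System.out.println(cache.read("apps")); // still the old value until the next sync
        cache.sync();
        System.out.println(cache.read("apps")); // now the new value
    }
}
```

The stale read between `write` and `sync` is the delay that responseCacheUpdateIntervalMs controls.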
Client registry fetch task
DiscoveryClient
/**
* Initializes all scheduled tasks.
*/
// registryFetchIntervalSeconds defaults to 30s
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
...
}
During client startup, the DiscoveryClient constructor initializes several scheduled tasks. One of them periodically fetches the registry from the server, every 30s. The task it runs is new CacheRefreshThread(), and the fetched registry is stored in the local cache localRegionApps.
AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>()
Client-side Ribbon serverList cache refresh
PollingServerListUpdater
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// initialDelayMs defaults to 1s
// refreshIntervalMs defaults to 30s
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
During Ribbon startup, the DynamicServerListLoadBalancer constructor calls serverListUpdater.start(updateAction);
which starts a periodic task that refreshes the serverList every 30s.
The task it runs is doUpdate()
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
When fetching the latest server list here, Ribbon reads the values that Eureka has already cached, localRegionApps or remoteRegionVsApps, and does not issue a remote registry fetch. The updated serverList is cached in the parent class BaseLoadBalancer.
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
When writing a custom Ribbon rule, extend AbstractLoadBalancerRule; the current serverList is then available directly through getLoadBalancer().
Server-side eviction (evict) task for expired leases
AbstractInstanceRegistry
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
}
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
// evictionIntervalTimerInMs defaults to 60s
evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs());
}
While EurekaBootStrap initializes the Eureka context, it calls AbstractInstanceRegistry.postInit, which starts EvictionTask and runs the eviction every 60s. Each run computes a compensation time (compensationTimeMs) to absorb errors caused by GC pauses, clock adjustments, and similar factors.
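The idea behind the compensation time is that a run which starts later than scheduled extends every lease by the amount of the delay. The following is a minimal sketch of that calculation, assuming millisecond timestamps are passed in by the caller and that 0 marks the first run; the class and method names are hypothetical.

```java
public class CompensationTimeSketch {
    // Returns how much later than scheduled the current run started, never negative.
    public static long compensationMs(long lastRunMs, long nowMs, long evictionIntervalMs) {
        if (lastRunMs == 0L) {
            return 0L; // first run: nothing to compensate
        }
        long elapsedMs = nowMs - lastRunMs;
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    public static void main(String[] args) {
        // The previous run was at t=1s and this one started at t=66s with a 60s interval,
        // so the task is 5s late and leases get 5s of extra grace.
        System.out.println(compensationMs(1_000L, 66_000L, 60_000L));
    }
}
```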
The eviction task evicts instances that have not renewed within 90s
AbstractInstanceRegistry
public void evict(long additionalLeaseMs) {
// ...
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// This is where expiry is decided
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// ... the expired instances are then evicted in random order, capped at 15% of all instances.
}
// Decides whether the lease has expired; additionalLeaseMs is the compensation time
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
The expiry check shows that the gap between two renewals must stay below (duration + additionalLeaseMs). Ignoring compensation, the gap between renewals must stay below duration.
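A worked example makes the boundary concrete. The sketch below keeps only the time comparison from the check above, leaves out the evictionTimestamp branch, and takes all timestamps as parameters; the class name is hypothetical.

```java
public class LeaseExpirySketch {
    // True once "now" is past the last renewal plus the lease duration and the compensation.
    public static boolean isExpired(long nowMs, long lastUpdateMs, long durationMs, long additionalLeaseMs) {
        return nowMs > lastUpdateMs + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        long durationMs = 90_000L; // 90s lease
        System.out.println(isExpired(90_000L, 0L, durationMs, 0L));     // exactly at the limit
        System.out.println(isExpired(91_000L, 0L, durationMs, 0L));     // one second past the limit
        System.out.println(isExpired(91_000L, 0L, durationMs, 5_000L)); // saved by 5s of compensation
    }
}
```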
Next, where does duration come from?
public Lease(T r, int durationInSecs) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = (durationInSecs * 1000);
}
First, duration is assigned in the Lease constructor. Lease objects are read from the registry cache, and the registry cache is filled by the instance registration method.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
//...
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
//...
}
The registration method shows that the expiry duration is defined in InstanceInfo; if it is not set, the default Lease.DEFAULT_DURATION_IN_SECS, which is 90s, is used.
InstanceInfo is sent by the client as part of the registration, so the next step is to see how the client builds InstanceInfo.
Client-side registration is implemented mainly in DiscoveryClient.
InstanceInfo myInfo = applicationInfoManager.getInfo();
// ...
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
EurekaClientAutoConfiguration
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
The instanceInfo registration argument is stored in ApplicationInfoManager during bean wiring. When ApplicationInfoManager is created, an InstanceInfo is built by the InstanceInfoFactory factory, and duration is set inside the factory's create method.
public class InstanceInfoFactory {
public InstanceInfo create(EurekaInstanceConfig config) {
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
// Builder the instance information to be registered with eureka server
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
// ... build the remaining parameters
InstanceInfo instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
return instanceInfo;
}
// ...
}
As shown, config.getLeaseExpirationDurationInSeconds() supplies the value of duration. EurekaInstanceConfigBean configures it with a default of 90s.
private int leaseExpirationDurationInSeconds = 90;
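This confirms that the client uses 90s by default as the expiry time for instance eviction.
V. Configuration Tuning
The analysis above covers every delay point in each scenario and how each one is implemented. On that basis, the Eureka parameters can be tuned for real-world use.
There are two main problems to solve:
- Reduce the time clients take to notice changes
- Make sure requests that still reach an instance already taken offline from Eureka are served correctly, rather than failing because the caller's cache has not been refreshed yet
1. Reduce the time clients take to notice changes
Eureka server configuration
Shorten the server's read-only/read-write cache sync interval to 10s. The sync only copies between two in-memory maps, so the interval can be cut substantially (30s -> 10s)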
eureka.server.response-cache-update-interval-ms=10000
Eureka client configuration
Shorten the client's registry fetch interval to 10s. The client only pulls incremental (delta) updates on each fetch, so this interval can be shortened as well (30s -> 10s)
eureka.client.registry-fetch-interval-seconds=10
Shorten the client-side Ribbon serverList refresh interval to 5s. Ribbon's refresh is a purely in-memory copy, so the interval can be cut substantially (30s -> 5s)
ribbon.ServerListRefreshInterval=5000
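With these settings, the worst-case time to notice a service coming online is 25s (10s + 10s + 5s).
2. Keep the old instance usable after it goes offline from the Eureka server (smooth restart)
To keep calls efficient and highly available, Eureka adds several levels of cache to its internal model (Ribbon included). As a result, if a client has not yet removed an old instance's address after that instance goes offline, requests can still be routed to the offline instance and fail.
Building on the analysis above, a smooth restart needs the following steps:
- Before the old instance shuts down, the Eureka client must detect that the instance is about to close and promptly tell the Eureka server that it is going offline
- Callers must notice the old instance's status change as quickly as possible
- The window between the old instance sending the offline notice and fully shutting down must be stretched, so that requests reaching the instance before clients refresh their caches can still be handled.
The following sections use the source code to show how each of these three steps can be implemented.
2.1 The Eureka client detects that the instance is about to close
Spring registers a shutdown hook during startup; before the instance closes, it publishes a ContextClosedEvent.
For details of how Spring handles shutdown, see "Spring Source Code Analysis: Graceful Service Shutdown".
Eureka listens for ContextClosedEvent to notify the server that the instance is going offline.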
EurekaAutoServiceRegistration
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof WebServerInitializedEvent) {
onApplicationEvent((WebServerInitializedEvent) event);
}
else if (event instanceof ContextClosedEvent) {
onApplicationEvent((ContextClosedEvent) event);
}
}
public void onApplicationEvent(ContextClosedEvent event) {
if (event.getApplicationContext() == context) {
stop();
}
}
@Override
public void stop() {
this.serviceRegistry.deregister(this.registration);
this.running.set(false);
}
}
EurekaAutoServiceRegistration implements the SmartApplicationListener interface to listen for ContextClosedEvent, and ultimately calls deregister.
In addition, as analyzed in "Spring Source Code Analysis: Graceful Service Shutdown", Spring's shutdown hook not only publishes ContextClosedEvent but also calls the stop method of every lifecycle bean to shut them all down. The stop method here is therefore called again after the event has been handled, and it also ends up calling deregister.
@Override
public void deregister(EurekaRegistration reg) {
if (reg.getApplicationInfoManager().getInfo() != null) {
if (log.isInfoEnabled()) {
log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status DOWN");
}
reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
}
}
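The deregister method sets the instance status held by ApplicationInfoManager to DOWN. Eureka listens for the status change event internally and sends it to the Eureka server, so the server can promptly update the instance status to DOWN.
2.2 Callers notice the new instance status on the server as soon as possible
As described above, a calling service has two levels of cache: the localRegionApps cache that the Eureka client fills from the server's registry, and Ribbon's serverList cache. Their refresh intervals were therefore changed to 10s and 5s respectively.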
eureka.client.registry-fetch-interval-seconds=10
ribbon.ServerListRefreshInterval=5000
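With these settings, the worst-case time for the client to notice is 15s. Requests to the old instance must not fail during these 15s, so the old instance's shutdown has to be stretched out.
2.3 Stretch the old instance's shutdown time
Stretching the shutdown is easy, a sleep is enough, but there is a precondition: the sleep is only meaningful while the instance status on the server is already DOWN and the servlet container has not yet been stopped.
Getting the timing of the sleep right requires a closer look at Eureka's implementation.
Choosing when to sleep
As mentioned earlier, EurekaAutoServiceRegistration implements both SmartLifecycle and SmartApplicationListener, so there are two entry points that observe shutdown: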
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof WebServerInitializedEvent) {
onApplicationEvent((WebServerInitializedEvent) event);
}
else if (event instanceof ContextClosedEvent) {
onApplicationEvent((ContextClosedEvent) event);
}
}
In which order are these two entry points called?
As noted in "Spring Source Code Analysis: Graceful Service Shutdown", AbstractApplicationContext#doClose() first publishes ContextClosedEvent and then calls stop on every lifecycle bean through lifecycleProcessor.
AbstractApplicationContext
protected void doClose() {
// ...
try {
// Publish shutdown event.
// Publish the ContextClosedEvent
publishEvent(new ContextClosedEvent(this));
}
catch (Throwable ex) {
logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
}
// Stop all Lifecycle beans, to avoid delays during individual destruction.
// Call the stop method of every Lifecycle bean
if (this.lifecycleProcessor != null) {
try {
this.lifecycleProcessor.onClose();
}
catch (Throwable ex) {
logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
}
}
// ...
}
So listeners are handled before lifecycle beans.
With Eureka's stop timing understood, the next question is when the servlet container is stopped.
When the servlet container (Tomcat) is stopped
The class hierarchy diagram shows that, besides EurekaAutoServiceRegistration, two WebServer beans, WebServerGracefulShutdownLifecycle and WebServerStartStopLifecycle, also implement SmartLifecycle.
WebServerStartStopLifecycle starts and stops the webServer.
WebServerGracefulShutdownLifecycle only handles the graceful shutdown of the webServer (not performed by default).
class WebServerStartStopLifecycle implements SmartLifecycle {
private final ServletWebServerApplicationContext applicationContext;
private final WebServer webServer;
private volatile boolean running;
WebServerStartStopLifecycle(ServletWebServerApplicationContext applicationContext, WebServer webServer) {
this.applicationContext = applicationContext;
this.webServer = webServer;
}
@Override
public void start() {
this.webServer.start();
this.running = true;
this.applicationContext
.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));
}
@Override
public void stop() {
this.webServer.stop();
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE - 1;
}
}
class WebServerGracefulShutdownLifecycle implements SmartLifecycle {
private final WebServer webServer;
private volatile boolean running;
WebServerGracefulShutdownLifecycle(WebServer webServer) {
this.webServer = webServer;
}
@Override
public void start() {
this.running = true;
}
@Override
public void stop() {
throw new UnsupportedOperationException("Stop must not be invoked directly");
}
@Override
public void stop(Runnable callback) {
this.running = false;
this.webServer.shutDownGracefully((result) -> callback.run());
}
@Override
public boolean isRunning() {
return this.running;
}
}
In what order do SmartLifecycle implementations run?
Their order is determined by the value of getPhase. During Spring startup they start in ascending phase order; during shutdown they stop in descending phase order (note the reversal). The implementation is in DefaultLifecycleProcessor.
SmartLifecycle | phase |
---|---|
EurekaAutoServiceRegistration | 0 |
WebServerStartStopLifecycle | Integer.MAX_VALUE - 1 |
WebServerGracefulShutdownLifecycle | Integer.MAX_VALUE |
Start order: EurekaAutoServiceRegistration -> WebServerStartStopLifecycle -> WebServerGracefulShutdownLifecycle
Stop order: WebServerGracefulShutdownLifecycle -> WebServerStartStopLifecycle -> EurekaAutoServiceRegistration
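The two orders follow mechanically from the phase values in the table. The sketch below only sorts bean names by phase; it is not how DefaultLifecycleProcessor is written, and the class and method names are hypothetical.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PhaseOrderSketch {
    // Phase values taken from the table above, deliberately inserted out of order.
    public static Map<String, Integer> phases() {
        Map<String, Integer> phases = new LinkedHashMap<>();
        phases.put("WebServerGracefulShutdownLifecycle", Integer.MAX_VALUE);
        phases.put("EurekaAutoServiceRegistration", 0);
        phases.put("WebServerStartStopLifecycle", Integer.MAX_VALUE - 1);
        return phases;
    }

    // Ascending phase when starting, descending phase when stopping.
    public static List<String> order(Map<String, Integer> phases, boolean stopping) {
        Comparator<Map.Entry<String, Integer>> byPhase = Map.Entry.comparingByValue();
        if (stopping) {
            byPhase = byPhase.reversed();
        }
        return phases.entrySet().stream()
                .sorted(byPhase)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("start: " + order(phases(), false));
        System.out.println("stop:  " + order(phases(), true));
    }
}
```

A custom SmartLifecycle bean can be added to the map with its own phase to see where it would land in the stop order.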
Putting the analysis together, the overall shutdown sequence is shown in the figure below:
The sleep can therefore be placed at insertion point 1 or insertion point 2 in the figure.
However, WebServerGracefulShutdownLifecycle already has the highest phase. If graceful shutdown is not enabled (the default), insertion point 2 can be used by implementing SmartLifecycle with the highest phase; otherwise, for safety and extensibility, insertion point 1 is the better choice.
Insertion point 1: SmartApplicationListener implementation
@Slf4j
public class UnawareBootListener implements SmartApplicationListener {
// Sync interval of the server's read-only/read-write cache, in seconds
public static final Integer EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
// Fallback registry fetch interval, in seconds
private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
// Fallback Ribbon serverList refresh interval, in milliseconds
private static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30_000;
// eureka.client configuration
private final EurekaClientOptimizeConfigBean eurekaConfig;
// ribbon configuration
private final RibbonOptimizeConfigBean ribbonConfig;
public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
this.eurekaConfig = eurekaConfig;
this.ribbonConfig = ribbonConfig;
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return ContextClosedEvent.class.isAssignableFrom(eventType);
}
// EurekaAutoServiceRegistration uses order 0; order 1 runs this listener right after it
@Override
public int getOrder() {
return 1;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
}
// ServerListRefreshInterval is configured in milliseconds
Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
}
int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000)
+ EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
try {
Thread.sleep(shutDownWaitTime * 1000L);
} catch (InterruptedException e) {
log.warn("UnawareBootListener wait to shutdown interrupted");
// Restore the interrupt flag for the rest of the shutdown sequence
Thread.currentThread().interrupt();
}
log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
}
}
Insertion point 2: Lifecycle implementation
@Slf4j
public class UnawareBoot implements SmartLifecycle {
// Sync interval of the server's read-only/read-write cache, in seconds
public static final Integer EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
// Fallback registry fetch interval, in seconds
private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
// Fallback Ribbon serverList refresh interval, in milliseconds
private static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30_000;
// eureka.client configuration
private final EurekaClientOptimizeConfigBean eurekaConfig;
// ribbon configuration
private final RibbonOptimizeConfigBean ribbonConfig;
private volatile boolean running = false;
public UnawareBoot(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
this.eurekaConfig = eurekaConfig;
this.ribbonConfig = ribbonConfig;
}
@Override
public void start() {
running = true;
}
@Override
public void stop() {
running = false;
Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
}
// ServerListRefreshInterval is configured in milliseconds
Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
}
int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000) + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
try {
Thread.sleep(shutDownWaitTime * 1000L);
} catch (InterruptedException e) {
log.warn("UnawareBoot wait to shutdown interrupted");
// Restore the interrupt flag for the rest of the shutdown sequence
Thread.currentThread().interrupt();
}
log.info("UnawareBoot wait to shutdown seconds: {}s finish", shutDownWaitTime);
}
/**
* Use the highest phase so that this bean blocks first during stop
*/
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public boolean isRunning() {
return running;
}
}
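This looks complete, and the approach does solve the smooth-restart problem, but it overlooks one thing: the Eureka client's heartbeat renewal task. If the client sends a renewal request after the status has been set to DOWN, for example while this code is sleeping, the status synced to the server changes from DOWN back to UP.
Fixing the flaw
Heartbeat renewal task implementation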
InstanceInfoReplicator
public void run() {
try {
// Refresh the instance status; this is the call that turns the earlier DOWN back into UP
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
DiscoveryClient
void refreshInstanceInfo() {
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
The InstanceInfoReplicator heartbeat renewal task runs every replicationIntervalSeconds (30s by default) and syncs the current status to the server. Before syncing, it recomputes the latest status, which is done by DiscoveryClient.getHealthCheckHandler().getStatus().
EurekaHealthCheckHandler
@Override
public InstanceStatus getStatus(InstanceStatus instanceStatus) {
return getHealthStatus();
}
protected InstanceStatus getHealthStatus() {
final Status status;
// statusAggregator is injected at initialization by default
if (statusAggregator != null) {
status = getStatus(statusAggregator);
}
else {
status = getStatus(getHealthIndicator());
}
return mapToInstanceStatus(status);
}
protected Status getStatus(StatusAggregator statusAggregator) {
Status status;
Set<Status> statusSet = new HashSet<>();
if (healthIndicators != null) {
statusSet.addAll(
healthIndicators.values().stream().map(HealthIndicator::health)
.map(Health::getStatus).collect(Collectors.toSet()));
}
if (reactiveHealthIndicators != null) {
statusSet.addAll(reactiveHealthIndicators.values().stream()
.map(ReactiveHealthIndicator::health).map(Mono::block)
.filter(Objects::nonNull).map(Health::getStatus)
.collect(Collectors.toSet()));
}
// 這個(gè)方法會(huì)將set集合中的每個(gè)status進(jìn)行排序浮还,返回order最低的一個(gè)set
status = statusAggregator.getAggregateStatus(statusSet);
return status;
}
這個(gè)getStatus
會(huì)計(jì)算當(dāng)前最新的狀態(tài)竟坛,計(jì)算的方式遍歷所有的healthIndicators
,基于當(dāng)前實(shí)例的各種狀態(tài)钧舌、參數(shù)担汤、數(shù)據(jù)庫狀態(tài)等分別計(jì)算Status,構(gòu)成一個(gè)Set<Status>集合洼冻。
SimpleStatusAggregator
@Override
public Status getAggregateStatus(Set<Status> statuses) {
return statuses.stream().filter(this::contains).min(this.comparator).orElse(Status.UNKNOWN);
}
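SimpleStatusAggregator sorts the Set and returns the status with the lowest order. The default order from lowest to highest is
DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN
If everything is healthy, the result of the comparison is UP. It is written back to InstanceInfo, the change event sends it to the server, and the instance status on the server is updated to UP.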
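The "lowest order wins" rule can be tried out without Spring Boot. The sketch below models statuses as plain strings and hard-codes the default order listed above; it is an illustration only, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StatusAggregatorSketch {
    // Default order from lowest to highest, as listed above.
    private static final List<String> ORDER = Arrays.asList("DOWN", "OUT_OF_SERVICE", "UP", "UNKNOWN");

    // Returns the known status with the lowest order, or UNKNOWN when there is none.
    public static String aggregate(Set<String> statuses) {
        return statuses.stream()
                .filter(ORDER::contains)
                .min(Comparator.comparingInt((String s) -> ORDER.indexOf(s)))
                .orElse("UNKNOWN");
    }

    public static void main(String[] args) {
        // All indicators healthy: the aggregate is UP, which overwrites an earlier DOWN.
        System.out.println(aggregate(new HashSet<>(Arrays.asList("UP"))));
        // A single DOWN indicator is enough to make the aggregate DOWN.
        System.out.println(aggregate(new HashSet<>(Arrays.asList("UP", "DOWN"))));
    }
}
```

Note that the aggregate depends only on the health indicators, not on the status that deregister set earlier, which is why a healthy instance reports UP again.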
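With the heartbeat renewal process explained, it is clear that relying only on the change event to sync the DOWN status to the server is not reliable. The Eureka client has to be shut down completely.
Shutting down the Eureka client
First, how does Eureka itself implement shutdown?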
EurekaClientAutoConfiguration
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
The EurekaClient bean definition sets the destroyMethod attribute, so this method is called when the bean is destroyed.
DiscoveryClient
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
// Remove the status change listener from applicationInfoManager
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// Cancel all scheduled tasks
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) {
// Set the status to DOWN and send the cancel request to the server directly, without relying on the listener
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
// Shut down the transport client
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
// Shut down the monitors
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
// Stop the InstanceInfoReplicator task
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
// Stop the heartbeat executor
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
// Stop the executor that periodically fetches the server registry
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
// Stop the ScheduledExecutorService
if (scheduler != null) {
scheduler.shutdownNow();
}
// Stop the task that periodically fetches the server registry
if (cacheRefreshTask != null) {
cacheRefreshTask.cancel();
}
// Stop the heartbeat task
if (heartbeatTask != null) {
heartbeatTask.cancel();
}
}
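shutdown does the following: it removes the status change listener, stops all scheduled tasks, sets the instance status to DOWN, sends the cancel request, and closes the transport client.
With the whole shutdown process understood, making sure the server's registry no longer reports the instance as UP only requires calling DiscoveryClient.shutdown() manually.
Strictly speaking, after shutdown the server's registry has already removed the instance, so it no longer shows up with status DOWN.
VI. Complete Smooth-Restart Implementation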
@Slf4j
public class UnawareBootListener implements SmartApplicationListener, ApplicationContextAware {
// Sync interval of the server's read-only/read-write cache, in seconds
public static final Integer EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
// Fallback registry fetch interval, in seconds
private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
// Fallback Ribbon serverList refresh interval, in milliseconds
private static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30_000;
// eureka.client configuration
private final EurekaClientOptimizeConfigBean eurekaConfig;
// ribbon configuration
private final RibbonOptimizeConfigBean ribbonConfig;
// Set through setApplicationContext
private ApplicationContext applicationContext;
public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
this.eurekaConfig = eurekaConfig;
this.ribbonConfig = ribbonConfig;
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return ContextClosedEvent.class.isAssignableFrom(eventType);
}
// The explicit order used earlier is no longer needed: the client is shut down here directly, so listener ordering does not matter
@Override
public void onApplicationEvent(ApplicationEvent event) {
// com.netflix.discovery.DiscoveryClient, the class whose shutdown() is shown above
DiscoveryClient discoveryClient = applicationContext.getBean(DiscoveryClient.class);
// Trigger the Eureka client shutdown explicitly
discoveryClient.shutdown();
Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
}
// ServerListRefreshInterval is configured in milliseconds
Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
}
int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000)
+ EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
try {
Thread.sleep(shutDownWaitTime * 1000L);
} catch (InterruptedException e) {
log.warn("UnawareBootListener wait to shutdown interrupted");
// Restore the interrupt flag for the rest of the shutdown sequence
Thread.currentThread().interrupt();
}
log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
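VII. Summary
This article analyzed, at the source level, how Eureka notices instances going online and offline and how it refreshes its caches, and presented a best practice for smooth restarts.
A closing note:
The final code is only a few dozen lines, but writing it required understanding at least a hundred times as much code: how things start, how they refresh, how they stop, and how the surrounding components depend on one another. It took a month from start to finish and more than ten hours of work to arrive at these few dozen lines.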