本文通過閱讀Eureka源碼艾帐,分享Eureka的實現(xiàn)原理定躏。
本文主要梳理Eureka整體設計及實現(xiàn),并不一一列舉Eureka源碼細節(jié)临梗。
源碼分析基于Spring Cloud Hoxton胞得,Eureka版本為1.9
Eureka分為Eureka Client荧止,Eureka Server,多個Eureka Server節(jié)點組成一個Eureka集群阶剑,服務通過Eureka Client注冊到Eureka Server跃巡。
CAP理論指出,一個分布式系統(tǒng)不可能同時滿足C(一致性)牧愁、A(可用性)和P(分區(qū)容錯性)素邪。
由于分布式系統(tǒng)中必須保證分區(qū)容錯性,因此我們只能在A和C之間進行權衡猪半。
Zookeeper保證的是CP, 而Eureka則是保證AP兔朦。
為什么呢?
在注冊中心這種場景中磨确,可用性比一致性更重要沽甥。
作為注冊中心,其實數(shù)據(jù)是不經常變更的乏奥,只有服務發(fā)布摆舟,機器上下線,服務擴縮容時才變更邓了。
因此Eureka選擇AP恨诱,即使出問題了,也返回舊數(shù)據(jù)骗炉,保證服務能(最大程度)正常調用照宝, 避免出現(xiàn)因為注冊中心的問題導致服務不可用這種得不償失的情況。
所以痕鳍,Eureka各個節(jié)點都是平等的(去中心化的架構硫豆,無master/slave區(qū)分)龙巨,掛掉的節(jié)點不會影響正常節(jié)點的工作笼呆,剩余的節(jié)點依然可以提供注冊和查詢服務熊响。
Eureka Client
Eureka 1.9只要引入spring-cloud-starter-netflix-eureka-client依賴,即使不使用@EnableDiscoveryClient或@EnableEurekaClient注解诗赌,服務也會注冊到Eureka集群汗茄。
client主要邏輯在com.netflix.discovery.DiscoveryClient實現(xiàn),EurekaClientAutoConfiguration中構建了其子類CloudEurekaClient铭若。
定時任務
DiscoveryClient#initScheduledTasks方法設置定時任務洪碳,主要有CacheRefreshThread,HeartbeatThread叼屠,以及InstanceInfoReplicator瞳腌。
同步
服務注冊信息緩存在DiscoveryClient#localRegionApps變量中,CacheRefreshThread負責定時從Eureka Server讀取最新的服務注冊信息镜雨,更新到本地緩存嫂侍。
CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry
當存在多個Eureka Server節(jié)點時,Client會與eureka.client.serviceUrl.defaultZone配置的第一個Server節(jié)點同步數(shù)據(jù)荚坞,當?shù)谝粋€Server節(jié)點同步失敗挑宠,才會同步第二個節(jié)點,以此類推颓影。
從DiscoveryClient#fetchRegistry可以看到各淀,同步數(shù)據(jù)有兩個方法
(1)全量同步
由DiscoveryClient#getAndStoreFullRegistry方法實現(xiàn),通過Http Get調用Server接口apps/
诡挂,
獲取Server節(jié)點中所有服務注冊信息替換DiscoveryClient#localRegionApps
注意:Client請求Server端的服務碎浇,都是通過EurekaHttpClient接口發(fā)起,該接口實現(xiàn)類EurekaHttpClientDecorator通過RequestExecutor接口將請求委托給其他EurekaHttpClient實現(xiàn)類璃俗,并提供execute方法給子類實現(xiàn)擴展處理(該擴展處理可以針對每一個EurekaHttpClient方法南捂,類似AOP)。子類RetryableEurekaHttpClient#execute中旧找,會獲取eureka.client.service-url.defaultZone中配置的地址溺健,通過TransportClientFactory#newClient,構造一個RestTemplateTransportClientFactory钮蛛,再真正發(fā)起請求鞭缭。
(2)增量同步
由DiscoveryClient#getAndUpdateDelta方法實現(xiàn),通過Http Get調用Server接口apps/delta
魏颓,獲取最新ADDED岭辣、MODIFIED,DELETED操作甸饱,更新本地緩存沦童。
如果獲取最新操作失敗仑濒,則會發(fā)起全量同步。
配置:
eureka.client.fetch-registry偷遗,是否定時同步信息墩瞳,默認true
eureka.client.registry-fetch-interval-seconds,間隔多少秒同步一次服務注冊信息氏豌,默認30
心跳
HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat
通過Http Put調用Server接口apps/{appName}/{instanceId}
appName是服務的spring.application.name喉酌,instanceId是服務IP加服務端口。
注意:如果Server返回NOT_FOUND狀態(tài)泵喘,則重新注冊泪电。
配置:
eureka.client.register-with-eureka,當前應用是否注冊到Eureka集群纪铺,默認true
eureka.instance.lease-renewal-interval-in-seconds相速,間隔多少秒發(fā)送一次心跳,默認30
注冊
DiscoveryClient#構造函數(shù) -> DiscoveryClient#register
通過Http Post調用Server接口apps/{appName}
鲜锚,發(fā)送當前應用的注冊信息到Server突诬。
配置:
eureka.client.register-with-eureka,當前應用是否注冊到Eureka集群烹棉,默認true
eureka.client.should-enforce-registration-at-init攒霹,是否在初始化時注冊,默認false
InstanceInfoReplicator
InstanceInfoReplicator任務會去監(jiān)測應用自身的IP信息以及配置信息是否發(fā)生改變浆洗,如果發(fā)生改變催束,則會重新發(fā)起注冊。
配置:
eureka.client.initial-instance-info-replication-interval-seconds伏社,間隔多少秒檢查一次自身信息抠刺,默認40
下線
EurekaClientAutoConfiguration配置了CloudEurekaClient的銷毀方法
@Bean(destroyMethod = "shutdown")
DiscoveryClient#shutdown方法完成下線的處理工作,包括取消定時任務摘昌,調用unregister方法(通過Http Delete調用Server接口apps/{appName}/{id}
)速妖,取消監(jiān)控任務等
Eureka Server
@EnableEurekaServer引入EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration構建EurekaServerMarkerConfiguration.Marker聪黎。
EurekaServerAutoConfiguration會在Spring上下文中存在EurekaServerMarkerConfiguration.Marker時生效罕容,構造Server端組件類。
Eureka Server也要使用DiscoveryClient稿饰,拉取其他Server節(jié)點的服務注冊信息或者將自身注冊到Eureka集群中锦秒。
啟動同步
Server啟動時,需要從相鄰Server節(jié)點獲取服務注冊信息喉镰,同步到自身內存旅择。
Server的服務注冊信息存放在AbstractInstanceRegistry#registry變量中,類型為ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>侣姆。
外層Map Key為appName生真,外層Map Key為instanceId沉噩,Lease代表Client與Server之間維持的一個契約。InstanceInfo保存具體的服務注冊信息柱蟀,如instanceId川蒙,appName,ipAddr产弹,port等派歌。
EurekaServerBootstrap是Server端的啟動引導類弯囊,EurekaServerInitializerConfiguration實現(xiàn)了Lifecycle接口痰哨,start方法調用eurekaServerBootstrap.contextInitialized完成Server端初始化。
eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register
PeerAwareInstanceRegistryImpl#syncUp調用DiscoveryClient#getApplications方法匾嘱,獲取相鄰server節(jié)點的所有服務注冊信息斤斧,再調用AbstractInstanceRegistry#register方法,注冊到AbstractInstanceRegistry#registry變量中霎烙。
AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
...
// #1
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
...
// #2
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// #3
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// #4
gMap.put(registrant.getId(), lease);
...
registrant.setActionType(ActionType.ADDED);
// #5
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
#1
通過appName撬讽,instanceId查詢已有的Lease
#2
如果該服務已存在Lease,并且LastDirtyTimestamp的值更大悬垃,使用已存在的Lease游昼。
#3
更新numberOfRenewsPerMinThreshold,該值用于自我保護模式尝蠕。
#4
構建一個新的Lease烘豌,添加到AbstractInstanceRegistry#registry緩存中。
#5
添加recentlyChangedQueue看彼,apps/delta
接口從中獲取最新變更操作廊佩。
提供服務
Server通過ApplicationsResource/ApplicationResource/InstanceResource對外提供Http服務。
AbstractInstanceRegistry負責實現(xiàn)cancle靖榕,register标锄,renew,statusUpdate茁计,deleteStatusOverride等操作的業(yè)務邏輯料皇。
PeerAwareInstanceRegistryImpl通過replicateToPeers方法將操作同步到其他節(jié)點,以保證集群節(jié)點數(shù)據(jù)同步星压。
PeerAwareInstanceRegistryImpl#replicateToPeers方法最后一個參數(shù)isReplication践剂,決定是否需要進行同步。
如果Server節(jié)點接收到其他Server節(jié)點發(fā)送的同步操作租幕,是不需要再繼續(xù)向其他Server同步的舷手,否則會引起循環(huán)更新。
該參數(shù)通過Http Requst的Header參數(shù)x-netflix-discovery-replication決定(只有Client發(fā)送的請求該參數(shù)才為true)劲绪。
數(shù)據(jù)一致
PeerAwareInstanceRegistryImpl#replicateToPeers方法通過PeerEurekaNodes#getPeerEurekaNodes獲取其他server節(jié)點地址男窟,
PeerEurekaNodes#peerEurekaNodes變量維護了所有的Server節(jié)點信息盆赤。
PeerEurekaNodes通過peersUpdateTask任務定時從DNS或配置文件獲取最新的Server節(jié)點地址列表,并更新PeerEurekaNodes#peerEurekaNodes歉眷。
配置:
eureka.server.peer-eureka-nodes-update-interval-ms牺六,間隔多少分鐘拉取一次Server節(jié)點地址列表,默認10
PeerEurekaNode管理具體一個Server節(jié)點汗捡,并負責向該Server節(jié)點同步register淑际,cancel,heartbeat等操作扇住。
PeerEurekaNode通過定時任務的方式同步這些操作春缕。它維護了兩個TaskDispatcher,批處理調度器batchingDispatcher和非批處理調度器nonBatchingDispatcher艘蹋。
PeerEurekaNode#構造方法調用TaskDispatchers#createBatchingTaskDispatcher構造TaskDispatcher
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
int maxBufferSize,
int workloadSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
);
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
return new TaskDispatcher<ID, T>() {
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
TaskDispatcher負責任務分發(fā)锄贼,過期任務會被拋棄,如果兩個任務有相同id女阀,則前一個任務則會被刪除宅荤。
AcceptorExecutor負責整合任務,將任務放入批次中浸策。
TaskExecutors將整合好的任務(批次)分給TaskProcessor處理冯键,實際處理任務的是ReplicationTaskProcessor。
ReplicationTaskProcessor可以重復執(zhí)行失敗的任務庸汗,ReplicationTaskProcessor#process(List<ReplicationTask> tasks)
處理批次任務惫确,將tasks合并到一個請求,發(fā)送到下游Server接口peerreplication/batch/
夫晌。
任務類為ReplicationTask雕薪,它提供了handleFailure方法,當下游Server接口返回statusCode不在[200,300)區(qū)間晓淀,則調用該方法所袁。
從TaskExecutors#BatchWorkerRunnable的run方法可以看到,
調用下游Server接口時凶掰,如果下游返回503狀態(tài)或發(fā)生IO異常燥爷,會通過taskDispatcher.reprocess重新執(zhí)行任務,以保證最終一致性懦窘。
如果發(fā)生其他異常前翎,只打印日志,不重復執(zhí)行任務畅涂。
配置:
eureka.server.max-elements-in-peer-replication-pool港华,等待執(zhí)行任務最大數(shù)量,默認為10000
需要注意一下PeerEurekaNode#heartbeat方法午衰,心跳任務實現(xiàn)了handleFailure方法
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
}
...
}
如果下游server節(jié)點沒有找到服務注冊信息立宜,就返回404狀態(tài)冒萄,這時需要重新注冊該服務。這點很重要橙数,它可以保證不同Server節(jié)點保持數(shù)據(jù)一致尊流。
假設有一個client,注冊到Eureka集群server1,server2,server3灯帮。下面來分析兩個場景
場景1. client啟動時崖技,server1接收帶client的注冊信息,但同步給server2前宕機了钟哥,怎么辦迎献?
這時,client定時發(fā)起心跳瞪醋,但它與server1心跳操作失敗忿晕,只能向server2發(fā)起心跳装诡,server2返回404(NOT_FOUND狀態(tài))银受,client重新注冊。
場景2. server3與其他機器server1,server2之間出現(xiàn)了網絡分區(qū)鸦采,這時client注冊到eureka集群宾巍。然后網絡恢復了,server3怎么同步數(shù)據(jù)呢渔伯?
當server1向server3同步心跳時顶霞,server3返回404,于是server1重新向server3注冊client信息锣吼,數(shù)據(jù)最終保持一致选浑。
主動失效
AbstractInstanceRegistry#deltaRetentionTimer任務會定時移除recentlyChangedQueue中過期的增量操作信息
配置:
eureka.server.delta-retention-timer-interval-in-ms,間隔多少秒清理一次過期的增量操作信息玄叠,默認30
eureka.server.retention-time-in-m-s-in-delta-queue古徒,增量操作保留多少分鐘,默認3
AbstractInstanceRegistry#evictionTimer任務會定時剔除AbstractInstanceRegistry#registry中已經過期的(太久沒收到心跳)服務注冊信息读恃。
計算服務失效時間時還要加上補償時間隧膘,即計算本次任務執(zhí)行的時間和上次任務執(zhí)行的時間差,若超過eviction-interval-timer-in-ms配置值則加上超出時間差作為補償時間寺惫。
每次剔除服務的數(shù)量都有一個上限疹吃,為注冊服務數(shù)量*renewal-percent-threshold,Eureka會隨機剔除過期的服務西雀。
配置:
eureka.server.eviction-interval-timer-in-ms萨驶,間隔多少秒清理一次過期的服務,默認60
eureka.instance.lease-expiration-duration-in-seconds艇肴,間隔多少秒沒收到心跳則判定服務過期腔呜,默認90
eureka.server.renewal-percent-threshold判莉,自我保護閥值因子,默認0.85
自我保護機制
PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask育谬,定時更新numberOfRenewsPerMinThreshold券盅,該值用于判定是否進入自我保護模式,在自我保護模式下膛檀,AbstractInstanceRegistry#evictionTimer任務直接返回锰镀,不剔除過期服務。
numberOfRenewsPerMinThreshold計算在PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
expectedNumberOfClientsSendingRenews
-> 已注冊服務總數(shù)
60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()
-> expected-client-renewal-interval-seconds配置了Client間隔多少秒發(fā)一次心跳咖刃,這里計算一個Client每分鐘發(fā)送心跳數(shù)量泳炉。
RenewalPercentThreshold 自我保護閥值因子。
可以看到嚎杨,numberOfRenewsPerMinThreshold表示一分鐘內Server接收心跳最低次數(shù)花鹅,實際數(shù)量少于該值則進入自我保護模式。
此時Eureka認為客戶端與注冊中心出現(xiàn)了網絡故障(比如網絡故障或頻繁的啟動關閉客戶端)枫浙,不再剔除任何服務刨肃,它要等待網絡故障恢復后,再退出自我保護模式箩帚。這樣可以最大程度保證服務間正常調用真友。
PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled方法判定當前是否處于自我保護模式。該方法比較renewsLastMin中的值是否大于numberOfRenewsPerMinThreshold紧帕,AbstractInstanceRegistry#renewsLastMin統(tǒng)計一分鐘內心跳次數(shù)盔然。
配置:
eureka.server.enable-self-preservation,是否啟用自我保護機制是嗜,默認為true
eureka.server.expected-client-renewal-interval-seconds愈案,Client間隔多少秒發(fā)送一次心跳
eureka.server.renewal-percent-threshold,自我保護閥值因子鹅搪,默認0.85
狀態(tài)更新
InstanceInfo維護了狀態(tài)變量status和覆蓋狀態(tài)變量overriddenStatus站绪。
status是Eureka Client本身發(fā)布的狀態(tài)。
overriddenstatus是手動或通過工具強制執(zhí)行的狀態(tài)涩嚣。
Server端提供服務apps/{appName}/{instanceId}/status
崇众,可以變更服務實例status以及overriddenStatus,從而主動變更服務狀態(tài)航厚。
注意顷歌,并不會修改Client端的服務狀態(tài),而是修改Server段服務注冊信息中保存的服務狀態(tài)幔睬。
而Server處理Client注冊或心跳時眯漩,會使用overriddenstatus覆蓋status。
Eureka Client在獲取到注冊信息時,會調用DiscoveryClient#shuffleInstances方法赦抖,過濾掉非InstanceStatus.UP狀態(tài)的服務實例舱卡,從而避免調動該實例,以達到服務實例的暫停服務队萤,而無需關閉服務實例轮锥。
InstanceInfo還維護了lastDirtyTimestamp變量,代表服務注冊信息最后更新時間要尔。
從InstanceResource可以看到舍杜,更新狀態(tài)statusUpdate或者刪除狀態(tài)deleteStatusUpdate時都可以提供lastDirtyTimestamp,
而處理心跳的renewLease方法赵辕,必須有l(wèi)astDirtyTimestamp參數(shù)既绩,validateDirtyTimestamp方法負責檢驗lastDirtyTimestamp參數(shù)
- 當lastDirtyTimestamp參數(shù)等于當前注冊信息中的lastDirtyTimestamp,返回處理成功还惠。
- 當lastDirtyTimestamp參數(shù)大于當前注冊信息中的lastDirtyTimestamp饲握,返回NOT_FOUND狀態(tài),表示Client的信息已經過期蚕键,需要重新注冊救欧。
- 當lastDirtyTimestamp參數(shù)小于當前注冊信息中的lastDirtyTimestamp,返回CONFLICT(409)狀態(tài)嚎幸,表示數(shù)據(jù)沖突颜矿,并返回當前節(jié)點中該服務的注冊信息。
這時如果心跳是Client發(fā)起的嫉晶,Client會忽略409的返回狀態(tài)(DiscoveryClient#renew),但如果是其他Server節(jié)點同步過來的田篇,發(fā)送心跳的Server節(jié)點會使用返回的服務注冊信息更新本節(jié)點的注冊信息(PeerEurekaNode#heartbeat)替废。
配置:
eureka.client.filter-only-up-instances,獲取實例時是否只保留UP狀態(tài)的實例泊柬,默認為true
eureka.server.sync-when-timestamp-differs椎镣,當時間戳不一致時,是否進行同步數(shù)據(jù)兽赁,默認為true
文本關于Eureka的分享就到這里状答,我們可以Eureka設計和實現(xiàn)都比較簡單,但是非常實用刀崖。
我在深入閱讀Eureka源碼前猶豫了一段時間(畢竟Eureka 2.0 開源流產)惊科,不過經過一段時間深入學習,收獲不少亮钦,希望這篇文章也可以給對Eureka感興趣的同學提供一個深入學習思路馆截。
如果您覺得本文不錯,歡迎關注我的微信公眾號,您的關注是我堅持的動力蜡娶!