Eureka Server作為一個開箱即用的服務(wù)中心,主要有以下功能:
- 服務(wù)注冊拉鹃;
- 接收服務(wù)心跳(續(xù)租)膏燕;
- 服務(wù)剔除煌寇;
- 服務(wù)下線阀溶;
- 集群同步;
- 獲取注冊表中服務(wù)實例信息击纬。
需要注意的是更振,Eureka Server本身也是一個Eureka Client肯腕,在不禁止其客戶端行為時实撒,他會向其他Eureka Server執(zhí)行注冊知态、發(fā)送心跳等操作贡茅。
源碼和配置
其中重要的類為com.netflix.eureka.registry.AbstractInstanceRegistry
類顶考。該類主管該Server下的Client的活動信息村怪。另外一個接口com.netflix.eureka.registry.PeerAwareInstanceRegistry
,只有一個實現(xiàn)類,主管和peer之間的同步等操作搅轿。
數(shù)據(jù)結(jié)構(gòu)
注冊表是Eureka中的主要部分璧坟,在此簡單分析一下他的數(shù)據(jù)結(jié)構(gòu)。
服務(wù)注冊
服務(wù)注冊的默認租約時間leaseDuration為90s黎茎。主要實現(xiàn)位于com.netflix.eureka.registry.AbstractInstanceRegistry#register
方法中傅瞻,其實現(xiàn)如下(以下所有的代碼刪除了logger):
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
//首先獲取讀鎖嗅骄,防止其他線程修改數(shù)據(jù)掸读,但是可以讀取數(shù)據(jù)
read.lock();
//registry 全局變量儿惫,ConcurrentHashMap類型留搔,總注冊信息
//通過APPName來獲取服務(wù)集群的租約集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
//此處使用putIfAbsent而不是put隔显,防止多線程獲取readLock之后括眠,覆蓋別的線程添加的屬性
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//獲取Client實例的租約信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
//租約存在,且租約中的Client實例不為空
if (existingLease != null && (existingLease.getHolder() != null)) {
//舊租約的修改時間
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
//新租約的修改時間
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
//因為是多線程操作当船,所以新租約的時間不一定大于舊租約的時間
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
// 租約不存在
synchronized (lock) {
//自我保護機制[1]
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
//繼承上次的服務(wù)啟動時間
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
//把不是UNKUOWN的缩幸,且新的實例放到overriddenInstanceStatusMap中
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// 根據(jù)覆蓋狀態(tài)規(guī)則設(shè)置服務(wù)的狀態(tài)
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
//如果實例為UP狀態(tài)钞护,則設(shè)置服務(wù)啟動時間(僅第一次有效)
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
//recentlyChangedQueue用于Client增量式獲取注冊表信息
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
//設(shè)置responseCache緩存過期患亿,用于Client全量獲取注冊表信息
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
在注冊方法中有諸多同步或安全的操作惦界,下面簡析一二:
開始read.lock()沾歪,使用read的原因,因為readLock是共享鎖狂窑,所以有多線程注冊多個服務(wù)時泉哈,并行執(zhí)行丛晦。對應(yīng)的writeLock排它鎖只用在獲取增量數(shù)據(jù)時,因為獲取增量數(shù)據(jù)時要保證數(shù)據(jù)此時不可更改锌蓄,用writeLock禁止其他線程的read操作煤率。readLock都是對內(nèi)操作,writeLock是對外要保持統(tǒng)一口徑辆沦。
代碼中涉及的全局變量Map都是ConcurrentMap。保證線程安全蔚晨,且全局共享银择。
在向registry中存值的時候累舷,并沒有強制性覆蓋浩考,用
putIfAbsent
代替put
方法,兩者都會返回key對應(yīng)的舊值被盈,但前者并不會覆蓋析孽。然后用之前的值來進行操作(因為是根據(jù)實例名獲取的,所以后面只對其租約信息進行修改)只怎。自我保護機制處使用了synchronized+volatile:
- 對于多個修改變量袜瞬,采用synchronized空對象身堡;
- 因為變量要在別的方法中進行計算安全機制吞滞,所以需要volatile關(guān)鍵字,保證該字段隨時可見盾沫。
(此處的字段不用CAS裁赠,因為涉及乘法,CAS對乘法不太友好赴精。)
- 對recentRegisteredQueue的修改佩捞,鎖定該對象,防止所有線程在別的方法中進行操作該值蕾哟。
- 緩存類變量基本都是由guava的CacheBuilder來創(chuàng)建的一忱,保證時效和容量等問題。
Tip:[1] 自我保護機制:隨著Client注冊數(shù)量的增加谭确,期待每分鐘維持心跳的數(shù)量也增加帘营,eureka Server檢查到每分鐘續(xù)約的次數(shù)少于期待數(shù)量的閾值(默認0.85)時,就會認為出現(xiàn)異常情況逐哈,從而進入自我保護機制芬迄。默認2次/min/服務(wù)(基于心跳時間)。Eureka會保護這些實例昂秃,認為他們是健康的禀梳,不把他們移出注冊列表。Eureka Server會輸出紅色的文字:
EMERGENCY!EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEGING EXPIRED JUST TO BE SAFE.
接受心跳服務(wù)
Client默認每隔30s向Server發(fā)起一次續(xù)租肠骆,也叫心跳服務(wù)算途。核心代碼為com.netflix.eureka.registry.AbstractInstanceRegistry#renew
。
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
//如果沒有發(fā)現(xiàn)租約蚀腿,返回false
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
//獲取租約中的實例信息
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
//獲取服務(wù)的最終狀態(tài)
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
//如果實例的狀態(tài)和最終的覆蓋狀態(tài)不一致嘴瓤,設(shè)置新的狀態(tài)
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
//最后一分鐘的續(xù)租數(shù)(該續(xù)租數(shù)每60秒調(diào)度自動更新為0)
renewsLastMin.increment();
//修改最后續(xù)租時間
leaseToRenew.renew();
return true;
}
}
這部分有很多東西都是在register方法里出現(xiàn)過的,所以不再贅述。
服務(wù)剔除
核心代碼是com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)
廓脆。該入口參數(shù)additionalLeaseMs 一般為0畏浆。此值是補償時間,即如果出現(xiàn)GC或者時鐘偏移等情況而補償?shù)臅r間狞贱。用來判斷是否過期刻获。
@Override
public void evict() {
evict(0l);
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
//是否開啟租約保護[1]
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// 遍歷所有節(jié)點,找到租約過期的節(jié)點
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
//可剔除的節(jié)點數(shù) = 本地節(jié)點總數(shù) - 閾值節(jié)點數(shù)
int evictionLimit = registrySize - registrySizeThreshold;
//保護機制 - 保證剔除最少量的節(jié)點
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
//隨機獲取一個節(jié)點瞎嬉,并打亂原有順序(交換對應(yīng)節(jié)點)
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
//服務(wù)下線
internalCancel(appName, id, false);
}
}
}
在打亂節(jié)點順序部分蝎毡,采用隨機,是避免同一時間同一服務(wù)集群全部過期然后被剔除的現(xiàn)象氧枣,從而引發(fā)程序崩潰沐兵。
Tips:
[1] 是否開啟租約保護:該值有兩種情況:
- 與
eureka.server.enable-self-preservation
配置有關(guān),默認為true便监,即不開啟租約保護扎谎。為false時開啟租約保護。 - 上述值為true時烧董,期望每分鐘的續(xù)租閾值數(shù)大于0且最后一分鐘的續(xù)租數(shù)大于該期望值毁靶,即每分鐘的續(xù)租數(shù)不能小于閾值數(shù),則為true逊移,否則開啟保護機制预吆,不準服務(wù)剔除。默認的閾值為0.85胳泉,通過
eureka.server.renewal.percent.threshold
來設(shè)置(必為double類型)拐叉。
服務(wù)下線
核心代碼是com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel
。
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
//獲取讀鎖
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
//recentCanceledQueue和之前的recentRegisteredQueue都是統(tǒng)計隊列
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
//如果租約為空扇商,直接返回false
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
return false;
} else {
//設(shè)置下線時間(也叫驅(qū)逐時間)
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
//添加到最近租約變更隊列
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
//虛擬網(wǎng)絡(luò)協(xié)議主機地址
vip = instanceInfo.getVIPAddress();
//安全的虛擬網(wǎng)絡(luò)協(xié)議主機地址
svip = instanceInfo.getSecureVipAddress();
}
//清理緩存 - 使responseCache中指定屬性移除
invalidateCache(appName, vip, svip);
return true;
}
} finally {
read.unlock();
}
}
獲取注冊表
該功能涉及多個Eureka Server凤瘦。
從Client部分緩存注冊表也了解到注冊表有全量和增量兩種方式。
全量獲取注冊表
該部分的核心代碼是com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationsFromMultipleRegions
案铺。
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
//查看是否有遠程區(qū)域
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
//從本地registry上獲取實例信息
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
//decorateInstanceInfo方法就是講lease中holder轉(zhuǎn)換為Instance
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
//從遠程注冊表上獲取實例信息
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
入?yún)emoteRegions為指定區(qū)域的數(shù)組集合蔬芥。來源于配置eureka.server.remote-region-urls-with-name
,默認為空红且。
從多地區(qū)增量式獲取注冊表
增量獲取的要義還是從最近修改的隊列中坝茎,獲取到最近修改的instanceInfo涤姊,從全局map中獲取到這些實例的完整信息暇番,然后將這些instanceInfo打包過去。
該部分的核心代碼com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions
思喊。
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
//獲取寫鎖壁酬,排它鎖
write.lock();
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(decorateInstanceInfo(lease));
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
}
}
}
}
}
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
集群同步
該功能涉及多個Eureka Server。
Eureka Server初始化本地注冊表
這部分主要通過代碼com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
來完成。主要執(zhí)行:
- 它會先從peer(同伴)節(jié)點上拉取注冊信息舆乔。
- 并將其中的服務(wù)實例注冊到本地注冊表中岳服。
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
//注冊表重試等待時間
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//此處走com.netflix.eureka.registry.AbstractInstanceRegistry#register方法
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
在初始化本地注冊表時,EurekaServer不會接受來自Client的通信請求(如注冊希俩、續(xù)租吊宋、獲取注冊表等)。在同步結(jié)束后颜武,通過com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
方法來允許該Server接受流量璃搜。關(guān)于這兩部分的調(diào)用邏輯,在com.netflix.eureka.EurekaBootStrap#initEurekaServerContext
代碼中鳞上。而該方法調(diào)用時是EurekaBootStrap實現(xiàn)javax.servlet.ServletContextListener#contextInitialized
的內(nèi)部这吻。
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// 初始化自我保護機制統(tǒng)計參數(shù)
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
//判斷是否是AWS環(huán)境,一般我們的環(huán)境是MyOwn環(huán)境
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
//修改服務(wù)實例狀態(tài)為上線狀態(tài)
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
//驅(qū)逐任務(wù)以及測試每分鐘心率(租約率)的任務(wù)
super.postInit();
}
Eureka Server 之間注冊信息表信息的同步復(fù)制
這部分代碼核心在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
中篙议,其包括下線唾糯,注冊和續(xù)約等等。
下線
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
//調(diào)用 AbstractInstanceRegistry 的cancel
if (super.cancel(appName, id, isReplication)) {
//復(fù)制(同步)下線狀態(tài)到同伴
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
//鎖定空對象
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
注冊方法
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
//默認租約時間90s
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//如果租約為空鬼贱,則用默認時間移怯;若租約不為空,注冊租約該有的剩余時間
//調(diào)用 AbstractInstanceRegistry 的renew
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
續(xù)租方法
public boolean renew(final String appName, final String id, final boolean isReplication) {
//調(diào)用 AbstractInstanceRegistry 的renew
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
同步到同伴節(jié)點
下面是復(fù)制到同伴的入口方法这难,代碼是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
芋酌。
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
//Stopwatch是測試執(zhí)行某些代碼的時間
Stopwatch tracer = action.getTimer().start();
try {
//是否是復(fù)制的
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// 如果peer集群為空,或者本來就是復(fù)制操作雁佳,則不再執(zhí)行脐帝,防止造成污染復(fù)制
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
//同步Instance到同伴節(jié)點
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
peerEurekaNodes是同伴Eureka Server節(jié)點。每個Eureka Server會向同伴同步數(shù)據(jù)糖权。
同步到同伴的代碼為com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers
搁拙。根據(jù)同步的Action操作,來分類同步亡电。
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
同步操作只有以下幾種:
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
......
}
上面的請求在com.netflix.eureka.transport.JerseyReplicationClient
中髓迎。如HeartBeat的請求在com.netflix.eureka.transport.JerseyReplicationClient#sendHeartBeat
中〗耍可以查看路徑等腿堤。這部分路徑就和client中處理對應(yīng)功能是一樣的。