本章主要介紹Eureka server端源碼分析。
在分析eureka源碼源碼之前经宏,需要首先了解eureka的作用犀暑。
主要包括服務(wù)注冊(cè)與發(fā)現(xiàn),心跳續(xù)約烁兰,自動(dòng)保護(hù)機(jī)制耐亏,服務(wù)剔除,集群同步等功能沪斟。
@EnableEurekaServer注解
Eureka的服務(wù)端開(kāi)啟是通過(guò)這個(gè)注解广辰。
這個(gè)注解創(chuàng)建了EurekaServerMarkerConfiguration類(lèi)
EurekaServerMarkerConfiguration類(lèi)聲明這個(gè)類(lèi)是個(gè)配置類(lèi),并通過(guò)@Bean創(chuàng)建一個(gè)Marker類(lèi)用來(lái)進(jìn)行標(biāo)記主之。在spring自動(dòng)裝配過(guò)程中择吊,將會(huì)對(duì)EurekaServerAutoConfiguration類(lèi)進(jìn)行裝配條件判斷
EurekaServerAutoConfiguration
加載條件是判斷檢查標(biāo)記類(lèi)Marker是否存在。
如果存在槽奕,就表明加了@EnableEurekaServer注解几睛。
EurekaServerAutoConfiguration類(lèi)的作用是:
1、導(dǎo)入EurekaServerInitializerConfiguration類(lèi)粤攒。
服務(wù)剔除所森、自我保護(hù)機(jī)制囱持、初始化自我保護(hù)機(jī)制閾值
2、創(chuàng)建類(lèi)并注入spring容器中
EurekaServerAutoConfiguration配置類(lèi)需要?jiǎng)?chuàng)建的類(lèi)包括PeerAwareInstanceRegistry焕济、PeerEurekaNodes洪唐、EurekaServerContext、EurekaServerBootstrap吼蚁、FilterRegistrationBean凭需、javax.ws.rs.core.Application
其中FilterRegistrationBean和Application:查找資源文件并設(shè)置到Jersey過(guò)濾器中。
Jersey過(guò)濾器的作用是在"com.netflix.discovery","com.netflix.eureka" 兩個(gè)包下找到所有加了Path或Provider的注解肝匆,并返回的資源為Application實(shí)例對(duì)象粒蜈。
FilterRegistrationBean設(shè)置的過(guò)濾器對(duì)象是Application實(shí)例
所以,在eureka client與server端交互就是通過(guò)這些資源文件Application方法調(diào)用的旗国。這些資源訪問(wèn)與MVC模型中Controller層的訪問(wèn)原理類(lèi)似枯怖。
服務(wù)注冊(cè)
由客戶(hù)端發(fā)起Application資源ApplicationResource類(lèi)的調(diào)用
1、addInstance
com.netflix.eureka.resources.ApplicationResource#addInstance方法
首先檢查添加實(shí)例信息是否完整能曾,然后調(diào)用registry.register方法進(jìn)行實(shí)例注冊(cè)
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
//驗(yàn)證一些實(shí)例數(shù)據(jù)是否完整....
// handle cases where clients may be registering with bad DataCenterInfo with missing data
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
在查看添加實(shí)例具體的調(diào)用鏈之前度硝,需要了解下InstanceRegistry類(lèi)的繼承關(guān)系
2、register
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, int, boolean)
2.1寿冕、首先通過(guò)handleRegistration方法發(fā)布EurekaInstanceRegisteredEvent事件
2.2蕊程、調(diào)用父類(lèi)register方法
3、register
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
注冊(cè)后進(jìn)行同步驼唱,通過(guò)isReplication參數(shù)控制藻茂。如果注冊(cè)是從其他復(fù)制節(jié)點(diǎn)進(jìn)行復(fù)制,則不同步復(fù)制
4玫恳、register
com.netflix.eureka.registry.AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
在這個(gè)方法中:
使用ReentrantReadWriteLock的讀鎖進(jìn)行加鎖辨赐,所以在客戶(hù)端調(diào)用資源進(jìn)行注冊(cè)時(shí),可能不止一個(gè)線程調(diào)用到這里
注冊(cè)信息registry使用ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 結(jié)構(gòu)京办,表示的意義是對(duì)于一個(gè)高可用微服務(wù)集群掀序。如下圖對(duì)于服務(wù)a有a1、a2惭婿、a3相同的微服務(wù)不恭。則ConcurrentHashMap中的String表示微服務(wù)組名稱(chēng)a,Map<String, Lease<InstanceInfo>>>是三個(gè)微服務(wù)實(shí)例审孽,其中Map中的String表示a1或a2或a3微服務(wù)名稱(chēng)县袱,InstanceInfo表示三個(gè)服務(wù)實(shí)例信息
根據(jù)instanceId查找existingLease實(shí)例對(duì)象浑娜,跟當(dāng)前registrant注冊(cè)實(shí)例比較佑力。使用時(shí)間戳較大的實(shí)例對(duì)registrant變量賦值。并設(shè)置實(shí)例的serviceUpTimestamp服務(wù)上線時(shí)間戳筋遭。
放入到當(dāng)前微服務(wù)集合中g(shù)Map.put(registrant.getId(), lease);到這里則完成服務(wù)注冊(cè)
總之打颤,執(zhí)行注冊(cè)實(shí)例的結(jié)果就是把a(bǔ)ppName的微服務(wù)加入到ConcurrentHashMap集合中暴拄。
心跳續(xù)約
客戶(hù)端更新續(xù)約通過(guò)put方法。通過(guò)客戶(hù)端發(fā)起請(qǐng)求
com.netflix.eureka.resources.InstanceResource#renewLease
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
調(diào)用父類(lèi)續(xù)約编饺,續(xù)約成功則同步集群
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatus(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
獲取注冊(cè)信息中的具體實(shí)例租債器對(duì)象乖篷,執(zhí)行租債器的renew()方法
com.netflix.eureka.lease.Lease#renew
續(xù)約成功,則更改lastUpdateTimestamp的值是當(dāng)前系統(tǒng)時(shí)間戳與duration的和
服務(wù)下架
服務(wù)剔除是eureka服務(wù)端對(duì)過(guò)期的服務(wù)(長(zhǎng)時(shí)間沒(méi)有心跳的服務(wù))進(jìn)行定時(shí)清除透且。是服務(wù)端開(kāi)啟的定時(shí)任務(wù)撕蔼,具體源碼實(shí)現(xiàn)是在導(dǎo)入EurekaServerInitializerConfiguration類(lèi)中實(shí)現(xiàn)。服務(wù)下架是客戶(hù)端主動(dòng)下架
EurekaServerInitializerConfiguration實(shí)現(xiàn)了ServletContextAware, SmartLifecycle, Ordered接口秽誊。
start方法執(zhí)行時(shí)機(jī)
SmartLifecycle的start方法調(diào)用鏈(在spring的生命周期中):
AbstractApplicationContext#refresh
AbstractApplicationContext#finishRefresh
DefaultLifecycleProcessor#onRefresh
1鲸沮、start
EurekaServerInitializerConfiguration#start
在一個(gè)線程內(nèi)執(zhí)行這個(gè)方法,原因是執(zhí)行eureka上下文環(huán)境不會(huì)影響spring boot正常啟動(dòng)過(guò)程锅论。
這個(gè)方法功能是eureka服務(wù)端上下文的初始化讼溺,發(fā)布了EurekaRegistryAvailableEvent和EurekaServerStartedEvent事件。
2最易、contextInitialized
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized
3怒坯、initEurekaServerContext
EurekaServerBootstrap#initEurekaServerContext
包括從鄰近節(jié)點(diǎn)復(fù)制注冊(cè)表
4、openForTraffic
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#openForTraffic
5藻懒、openForTraffic
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
6剔猿、postInit
com.netflix.eureka.registry.AbstractInstanceRegistry#postInit
7、EvictionTask
com.netflix.eureka.registry.AbstractInstanceRegistry.EvictionTask
創(chuàng)建EvictionTask類(lèi)嬉荆,EvictionTask繼承了TimerTask定時(shí)器任務(wù)類(lèi)
8艳馒、evict
com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
首先判斷是否開(kāi)啟了自我保護(hù)機(jī)制,遍歷注冊(cè)表類(lèi)并判斷每個(gè)租債器實(shí)例調(diào)用isExpired方法判斷是否過(guò)期员寇。對(duì)應(yīng)過(guò)期的實(shí)例加入到expiredLeases集合中弄慰。isExpired方法根據(jù)當(dāng)前時(shí)間戳與最后更新時(shí)間與duration和additionalLeaseMs比較得到。lastUpdateTimestamp最后更新時(shí)間是上次系統(tǒng)時(shí)間與duration 的和蝶锋,最后更新時(shí)間注冊(cè)陆爽、心跳續(xù)約、服務(wù)下架都會(huì)對(duì)這個(gè)參數(shù)進(jìn)行更新扳缕。
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
對(duì)于需要剔除的expiredLeases實(shí)例集合慌闭,并不會(huì)一下把所有的都會(huì)刪除。會(huì)根據(jù)registrySize和RenewalPercentThreshold計(jì)算出一個(gè)evictionLimit剔除最大數(shù)量躯舔。例如一共有registrySize=100驴剔。RenewalPercentThreshold=0.85,則evictionLimit=100-100*0.85=15個(gè)粥庄。如果expiredLeases超過(guò)15個(gè)則會(huì)任意選出15個(gè)進(jìn)行剔除丧失,對(duì)于剩下的下次剔除任務(wù)中刪除。如果下次還會(huì)超過(guò)則會(huì)觸發(fā)自動(dòng)保護(hù)機(jī)制isLeaseExpirationEnabled惜互,直接返回布讹。
集群同步
調(diào)用方法replicateToPeers
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
當(dāng)前實(shí)例的行為包括Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride琳拭;對(duì)當(dāng)前實(shí)例的這些行為操作成功后,則需要把這些狀態(tài)行為的操作信息實(shí)例同步到鄰近的eureka服務(wù)節(jié)點(diǎn)描验,需不需要集群同步主要看isReplication參數(shù)白嘁,默認(rèn)是false需要同步,能夠執(zhí)行到replicateInstanceActionsToPeers方法膘流。
同步操作
1絮缅、注冊(cè)后同步register
2、續(xù)約后同步renew
3呼股、狀態(tài)更新后同步statusUpdate
4盟蚣、下架后同步cancel
5、刪除狀態(tài)后DeleteStatusOverride
syncUp
從鄰近的節(jié)點(diǎn)復(fù)制節(jié)點(diǎn)信息
com.netflix.eureka.registry.PeerAwareInstanceRegistry#syncUp
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
在注冊(cè)同步重試次數(shù)RegistrySyncRetries和注冊(cè)同步等待時(shí)間RegistrySyncRetryWaitMs內(nèi)卖怜,對(duì)eureka集群的其他對(duì)等節(jié)點(diǎn)注冊(cè)的eurekaclient實(shí)例同步注冊(cè)到當(dāng)前節(jié)點(diǎn)屎开。
自我保護(hù)機(jī)制
在服務(wù)剔除時(shí),會(huì)檢查是否開(kāi)啟自我保護(hù)機(jī)制及是否超過(guò)閾值
PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
這個(gè)方法首先檢查enableSelfPreservation參數(shù)是否開(kāi)啟(默認(rèn)是true 開(kāi)啟的)马靠,然后判斷當(dāng)前得到的心跳數(shù)與自我保護(hù)閾值比較奄抽。
自我保護(hù)閾值計(jì)算
numberOfRenewsPerMinThreshold =expectedNumberOfRenewsPerMin*
renewalPercentThreshold
自我保護(hù)閾值=每分鐘期望續(xù)約的心跳數(shù)(所有注冊(cè)上的實(shí)例*每分鐘觸發(fā)的心跳連接次數(shù))*自我保護(hù)機(jī)制的觸發(fā)百分比(85%)
假如一共有100個(gè)實(shí)例,服務(wù)端在默認(rèn)情況下每分鐘連接刷新時(shí)間(expectedClientRenewalIntervalSeconds)是30s甩鳄,所以一分鐘有2次逞度。
numberOfRenewsPerMinThreshold = 100*2*0.85=170個(gè)
所以,觸發(fā)保護(hù)機(jī)制 當(dāng)前得到的心跳連接數(shù)小于85%時(shí)妙啃,會(huì)觸發(fā)
另一種說(shuō)法档泽,在服務(wù)剔除的時(shí)候當(dāng)剔除的數(shù)量大于15%時(shí),會(huì)觸發(fā)
如果客戶(hù)端續(xù)約刷新間隔與服務(wù)端續(xù)約刷新間隔不一致揖赴,則會(huì)造成自我保護(hù)機(jī)制不能正常工作
閾值更改時(shí)機(jī)
1馆匿、15分鐘自動(dòng)更改
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask
2、注冊(cè)
3燥滑、服務(wù)下架
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
4渐北、服務(wù)初始化
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
為什么初始化和15分鐘自動(dòng)更改是數(shù)量的2倍,注冊(cè)和下架是+2或-2铭拧。因?yàn)槟J(rèn)情況下服務(wù)端一次刷新心跳是30s赃蛛,所以在60s內(nèi)是2次。注冊(cè)和下架是一個(gè)一個(gè)操作的
服務(wù)發(fā)現(xiàn)
查詢(xún)服務(wù)時(shí)搀菩,涉及服務(wù)端的三層緩存架構(gòu)呕臂。
1、只讀緩存層readOnlyCacheMap(ConcurrentMap類(lèi)型)
2肪跋、讀寫(xiě)緩存層readWriteCacheMap(LoadingCache類(lèi)型歧蒋,使用google的guava框架)
3、真實(shí)數(shù)據(jù)層
com.netflix.eureka.resources.ApplicationResource#getApplication
@GET
public Response getApplication(@PathParam("version") String version,
@HeaderParam("Accept") final String acceptHeader,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
if (!registry.shouldAllowAccess(false)) {
return Response.status(Status.FORBIDDEN).build();
}
EurekaMonitors.GET_APPLICATION.increment();
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = Key.KeyType.XML;
}
Key cacheKey = new Key(
Key.EntityType.Application,
appName,
keyType,
CurrentRequestVersion.get(),
EurekaAccept.fromString(eurekaAccept)
);
String payLoad = responseCache.get(cacheKey);
if (payLoad != null) {
logger.debug("Found: {}", appName);
return Response.ok(payLoad).build();
} else {
logger.debug("Not Found: {}", appName);
return Response.status(Status.NOT_FOUND).build();
}
}
com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)
public String get(final Key key) {
return get(key, shouldUseReadOnlyResponseCache);
}
@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key :" + key, t);
}
return payload;
}
首先查看只讀緩存層是否打開(kāi)shouldUseReadOnlyResponseCache,與之對(duì)應(yīng)的是getValue方法的useReadOnlyCache變量值疏尿。
如果打開(kāi),則先從只讀緩存readOnlyCacheMap查詢(xún)易桃,如果查詢(xún)不到則從讀寫(xiě)緩存是readWriteCacheMap查詢(xún)褥琐,并放入到只讀緩存
緩存更新原理
1、只讀緩存
只讀緩存沒(méi)有提供API更新晤郑,只能通過(guò)定時(shí)任務(wù)或查詢(xún)時(shí)從讀寫(xiě)緩存寫(xiě)到只讀緩存
在創(chuàng)建ResponseCacheImpl類(lèi)時(shí)敌呈,開(kāi)啟定時(shí)任務(wù)
如果從讀寫(xiě)緩存得到的cacheValue與從只讀緩存得到的currentCacheValue不相同,則把cacheValue替換到只讀緩存中
2造寝、讀寫(xiě)緩存
2.1磕洪、構(gòu)建ResponseCacheImpl類(lèi)
如果緩存不存在,則通過(guò)generatePayload方法重新加載
/*
* Generate pay load for the given key.
*/
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: " + key.getEntityType() + " found in the cache key.");
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
private String getPayLoad(Key key, Applications apps) {
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
result = encoderWrapper.encode(apps);
} catch (Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
}
if(logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
}
return result;
}
2.2诫龙、調(diào)用invalidateCache方法析显,使緩存失效
com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache
com.netflix.eureka.registry.ResponseCacheImpl#invalidate
com.netflix.eureka.registry.ResponseCacheImpl#invalidate(com.netflix.eureka.registry.Key...)
調(diào)用invalidateCache方法時(shí)機(jī)
1、注冊(cè)后更新緩存
com.netflix.eureka.registry.AbstractInstanceRegistry#register
2签赃、服務(wù)下架
com.netflix.eureka.registry.AbstractInstanceRegistry#cancel
3谷异、statusUpdate
com.netflix.eureka.registry.AbstractInstanceRegistry#statusUpdate
4、deleteStatusOverride
com.netflix.eureka.registry.AbstractInstanceRegistry#deleteStatusOverride
總結(jié):
主要就是服務(wù)端使用的框架調(diào)用原理和Eureka Server端涉及的功能
那些功能使用客戶(hù)端調(diào)用(有注冊(cè)锦聊,續(xù)約歹嘹,下架)
服務(wù)下架與服務(wù)剔除的區(qū)別
eureka服務(wù)端涉及的服務(wù)同步,自我保護(hù)閾值
服務(wù)查詢(xún)的三級(jí)緩存使用原理