Spring Cloud之Eureka源碼分析

本章主要介紹Eureka server端源碼分析。
在分析eureka源碼源碼之前经宏,需要首先了解eureka的作用犀暑。
主要包括服務(wù)注冊(cè)與發(fā)現(xiàn),心跳續(xù)約烁兰,自動(dòng)保護(hù)機(jī)制耐亏,服務(wù)剔除,集群同步等功能沪斟。

@EnableEurekaServer注解

Eureka的服務(wù)端開(kāi)啟是通過(guò)這個(gè)注解广辰。


image.png

這個(gè)注解創(chuàng)建了EurekaServerMarkerConfiguration類(lèi)


image.png

EurekaServerMarkerConfiguration類(lèi)聲明這個(gè)類(lèi)是個(gè)配置類(lèi),并通過(guò)@Bean創(chuàng)建一個(gè)Marker類(lèi)用來(lái)進(jìn)行標(biāo)記主之。在spring自動(dòng)裝配過(guò)程中择吊,將會(huì)對(duì)EurekaServerAutoConfiguration類(lèi)進(jìn)行裝配條件判斷
image.png

EurekaServerAutoConfiguration

加載條件是判斷檢查標(biāo)記類(lèi)Marker是否存在。


image.png

如果存在槽奕,就表明加了@EnableEurekaServer注解几睛。

EurekaServerAutoConfiguration類(lèi)的作用是:

1、導(dǎo)入EurekaServerInitializerConfiguration類(lèi)粤攒。
服務(wù)剔除所森、自我保護(hù)機(jī)制囱持、初始化自我保護(hù)機(jī)制閾值
2、創(chuàng)建類(lèi)并注入spring容器中
EurekaServerAutoConfiguration配置類(lèi)需要?jiǎng)?chuàng)建的類(lèi)包括PeerAwareInstanceRegistry焕济、PeerEurekaNodes洪唐、EurekaServerContext、EurekaServerBootstrap吼蚁、FilterRegistrationBean凭需、javax.ws.rs.core.Application
其中FilterRegistrationBean和Application:查找資源文件并設(shè)置到Jersey過(guò)濾器中。
Jersey過(guò)濾器的作用是在"com.netflix.discovery","com.netflix.eureka" 兩個(gè)包下找到所有加了Path或Provider的注解肝匆,并返回的資源為Application實(shí)例對(duì)象粒蜈。


image.png

image.png

FilterRegistrationBean設(shè)置的過(guò)濾器對(duì)象是Application實(shí)例


image.png

所以,在eureka client與server端交互就是通過(guò)這些資源文件Application方法調(diào)用的旗国。這些資源訪問(wèn)與MVC模型中Controller層的訪問(wèn)原理類(lèi)似枯怖。

服務(wù)注冊(cè)

由客戶(hù)端發(fā)起Application資源ApplicationResource類(lèi)的調(diào)用

1、addInstance

com.netflix.eureka.resources.ApplicationResource#addInstance方法
首先檢查添加實(shí)例信息是否完整能曾,然后調(diào)用registry.register方法進(jìn)行實(shí)例注冊(cè)

   @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
       //驗(yàn)證一些實(shí)例數(shù)據(jù)是否完整....
        // handle cases where clients may be registering with bad DataCenterInfo with missing data

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

在查看添加實(shí)例具體的調(diào)用鏈之前度硝,需要了解下InstanceRegistry類(lèi)的繼承關(guān)系


InstanceRegistry.png
2、register

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, int, boolean)


register.png

2.1寿冕、首先通過(guò)handleRegistration方法發(fā)布EurekaInstanceRegisteredEvent事件


image.png

2.2蕊程、調(diào)用父類(lèi)register方法
3、register

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
注冊(cè)后進(jìn)行同步驼唱,通過(guò)isReplication參數(shù)控制藻茂。如果注冊(cè)是從其他復(fù)制節(jié)點(diǎn)進(jìn)行復(fù)制,則不同步復(fù)制


image.png
4玫恳、register

com.netflix.eureka.registry.AbstractInstanceRegistry#register

 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

在這個(gè)方法中:
使用ReentrantReadWriteLock的讀鎖進(jìn)行加鎖辨赐,所以在客戶(hù)端調(diào)用資源進(jìn)行注冊(cè)時(shí),可能不止一個(gè)線程調(diào)用到這里
注冊(cè)信息registry使用ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 結(jié)構(gòu)京办,表示的意義是對(duì)于一個(gè)高可用微服務(wù)集群掀序。如下圖對(duì)于服務(wù)a有a1、a2惭婿、a3相同的微服務(wù)不恭。則ConcurrentHashMap中的String表示微服務(wù)組名稱(chēng)a,Map<String, Lease<InstanceInfo>>>是三個(gè)微服務(wù)實(shí)例审孽,其中Map中的String表示a1或a2或a3微服務(wù)名稱(chēng)县袱,InstanceInfo表示三個(gè)服務(wù)實(shí)例信息


image.png

根據(jù)instanceId查找existingLease實(shí)例對(duì)象浑娜,跟當(dāng)前registrant注冊(cè)實(shí)例比較佑力。使用時(shí)間戳較大的實(shí)例對(duì)registrant變量賦值。并設(shè)置實(shí)例的serviceUpTimestamp服務(wù)上線時(shí)間戳筋遭。
放入到當(dāng)前微服務(wù)集合中g(shù)Map.put(registrant.getId(), lease);到這里則完成服務(wù)注冊(cè)
總之打颤,執(zhí)行注冊(cè)實(shí)例的結(jié)果就是把a(bǔ)ppName的微服務(wù)加入到ConcurrentHashMap集合中暴拄。

心跳續(xù)約

客戶(hù)端更新續(xù)約通過(guò)put方法。通過(guò)客戶(hù)端發(fā)起請(qǐng)求
com.netflix.eureka.resources.InstanceResource#renewLease


image.png

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
調(diào)用父類(lèi)續(xù)約编饺,續(xù)約成功則同步集群


image.png

com.netflix.eureka.registry.AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

獲取注冊(cè)信息中的具體實(shí)例租債器對(duì)象乖篷,執(zhí)行租債器的renew()方法
com.netflix.eureka.lease.Lease#renew


image.png

續(xù)約成功,則更改lastUpdateTimestamp的值是當(dāng)前系統(tǒng)時(shí)間戳與duration的和

服務(wù)下架

服務(wù)剔除是eureka服務(wù)端對(duì)過(guò)期的服務(wù)(長(zhǎng)時(shí)間沒(méi)有心跳的服務(wù))進(jìn)行定時(shí)清除透且。是服務(wù)端開(kāi)啟的定時(shí)任務(wù)撕蔼,具體源碼實(shí)現(xiàn)是在導(dǎo)入EurekaServerInitializerConfiguration類(lèi)中實(shí)現(xiàn)。服務(wù)下架是客戶(hù)端主動(dòng)下架
EurekaServerInitializerConfiguration實(shí)現(xiàn)了ServletContextAware, SmartLifecycle, Ordered接口秽誊。

start方法執(zhí)行時(shí)機(jī)

SmartLifecycle的start方法調(diào)用鏈(在spring的生命周期中):
AbstractApplicationContext#refresh
AbstractApplicationContext#finishRefresh
DefaultLifecycleProcessor#onRefresh

1鲸沮、start

EurekaServerInitializerConfiguration#start
在一個(gè)線程內(nèi)執(zhí)行這個(gè)方法,原因是執(zhí)行eureka上下文環(huán)境不會(huì)影響spring boot正常啟動(dòng)過(guò)程锅论。
這個(gè)方法功能是eureka服務(wù)端上下文的初始化讼溺,發(fā)布了EurekaRegistryAvailableEvent和EurekaServerStartedEvent事件。


start.png
2最易、contextInitialized

org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized


contextInitialized.png
3怒坯、initEurekaServerContext

EurekaServerBootstrap#initEurekaServerContext
包括從鄰近節(jié)點(diǎn)復(fù)制注冊(cè)表


initEurekaServerContext.png
4、openForTraffic

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#openForTraffic


openForTraffic.png
5藻懒、openForTraffic

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic


openForTraffic.png
6剔猿、postInit

com.netflix.eureka.registry.AbstractInstanceRegistry#postInit


postInit.png
7、EvictionTask

com.netflix.eureka.registry.AbstractInstanceRegistry.EvictionTask
創(chuàng)建EvictionTask類(lèi)嬉荆,EvictionTask繼承了TimerTask定時(shí)器任務(wù)類(lèi)


EvictionTask.png
8艳馒、evict

com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }

首先判斷是否開(kāi)啟了自我保護(hù)機(jī)制,遍歷注冊(cè)表類(lèi)并判斷每個(gè)租債器實(shí)例調(diào)用isExpired方法判斷是否過(guò)期员寇。對(duì)應(yīng)過(guò)期的實(shí)例加入到expiredLeases集合中弄慰。isExpired方法根據(jù)當(dāng)前時(shí)間戳與最后更新時(shí)間與duration和additionalLeaseMs比較得到。lastUpdateTimestamp最后更新時(shí)間是上次系統(tǒng)時(shí)間與duration 的和蝶锋,最后更新時(shí)間注冊(cè)陆爽、心跳續(xù)約、服務(wù)下架都會(huì)對(duì)這個(gè)參數(shù)進(jìn)行更新扳缕。

public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

對(duì)于需要剔除的expiredLeases實(shí)例集合慌闭,并不會(huì)一下把所有的都會(huì)刪除。會(huì)根據(jù)registrySize和RenewalPercentThreshold計(jì)算出一個(gè)evictionLimit剔除最大數(shù)量躯舔。例如一共有registrySize=100驴剔。RenewalPercentThreshold=0.85,則evictionLimit=100-100*0.85=15個(gè)粥庄。如果expiredLeases超過(guò)15個(gè)則會(huì)任意選出15個(gè)進(jìn)行剔除丧失,對(duì)于剩下的下次剔除任務(wù)中刪除。如果下次還會(huì)超過(guò)則會(huì)觸發(fā)自動(dòng)保護(hù)機(jī)制isLeaseExpirationEnabled惜互,直接返回布讹。

集群同步

調(diào)用方法replicateToPeers
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers

/**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

當(dāng)前實(shí)例的行為包括Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride琳拭;對(duì)當(dāng)前實(shí)例的這些行為操作成功后,則需要把這些狀態(tài)行為的操作信息實(shí)例同步到鄰近的eureka服務(wù)節(jié)點(diǎn)描验,需不需要集群同步主要看isReplication參數(shù)白嘁,默認(rèn)是false需要同步,能夠執(zhí)行到replicateInstanceActionsToPeers方法膘流。

同步操作

1絮缅、注冊(cè)后同步register


register.png

2、續(xù)約后同步renew


renew.png

3呼股、狀態(tài)更新后同步statusUpdate
statusUpdate.png

4盟蚣、下架后同步cancel


cancel.png

5、刪除狀態(tài)后DeleteStatusOverride
DeleteStatusOverride.png
syncUp

從鄰近的節(jié)點(diǎn)復(fù)制節(jié)點(diǎn)信息
com.netflix.eureka.registry.PeerAwareInstanceRegistry#syncUp


syncUp.png
public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

在注冊(cè)同步重試次數(shù)RegistrySyncRetries和注冊(cè)同步等待時(shí)間RegistrySyncRetryWaitMs內(nèi)卖怜,對(duì)eureka集群的其他對(duì)等節(jié)點(diǎn)注冊(cè)的eurekaclient實(shí)例同步注冊(cè)到當(dāng)前節(jié)點(diǎn)屎开。

自我保護(hù)機(jī)制

在服務(wù)剔除時(shí),會(huì)檢查是否開(kāi)啟自我保護(hù)機(jī)制及是否超過(guò)閾值


evict.png

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
這個(gè)方法首先檢查enableSelfPreservation參數(shù)是否開(kāi)啟(默認(rèn)是true 開(kāi)啟的)马靠,然后判斷當(dāng)前得到的心跳數(shù)與自我保護(hù)閾值比較奄抽。


isLeaseExpirationEnabled.png
自我保護(hù)閾值計(jì)算

numberOfRenewsPerMinThreshold =expectedNumberOfRenewsPerMin*
renewalPercentThreshold
自我保護(hù)閾值=每分鐘期望續(xù)約的心跳數(shù)(所有注冊(cè)上的實(shí)例*每分鐘觸發(fā)的心跳連接次數(shù))*自我保護(hù)機(jī)制的觸發(fā)百分比(85%)

假如一共有100個(gè)實(shí)例,服務(wù)端在默認(rèn)情況下每分鐘連接刷新時(shí)間(expectedClientRenewalIntervalSeconds)是30s甩鳄,所以一分鐘有2次逞度。
numberOfRenewsPerMinThreshold = 100*2*0.85=170個(gè)

所以,觸發(fā)保護(hù)機(jī)制 當(dāng)前得到的心跳連接數(shù)小于85%時(shí)妙啃,會(huì)觸發(fā)
另一種說(shuō)法档泽,在服務(wù)剔除的時(shí)候當(dāng)剔除的數(shù)量大于15%時(shí),會(huì)觸發(fā)
如果客戶(hù)端續(xù)約刷新間隔與服務(wù)端續(xù)約刷新間隔不一致揖赴,則會(huì)造成自我保護(hù)機(jī)制不能正常工作

閾值更改時(shí)機(jī)

1馆匿、15分鐘自動(dòng)更改
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask


scheduleRenewalThresholdUpdateTask.png

updateRenewalThreshold.png

2、注冊(cè)


image.png

3燥滑、服務(wù)下架
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
image.png

4渐北、服務(wù)初始化

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic


image.png

為什么初始化和15分鐘自動(dòng)更改是數(shù)量的2倍,注冊(cè)和下架是+2或-2铭拧。因?yàn)槟J(rèn)情況下服務(wù)端一次刷新心跳是30s赃蛛,所以在60s內(nèi)是2次。注冊(cè)和下架是一個(gè)一個(gè)操作的

服務(wù)發(fā)現(xiàn)

查詢(xún)服務(wù)時(shí)搀菩,涉及服務(wù)端的三層緩存架構(gòu)呕臂。
1、只讀緩存層readOnlyCacheMap(ConcurrentMap類(lèi)型)
2肪跋、讀寫(xiě)緩存層readWriteCacheMap(LoadingCache類(lèi)型歧蒋,使用google的guava框架)
3、真實(shí)數(shù)據(jù)層
com.netflix.eureka.resources.ApplicationResource#getApplication

 @GET
    public Response getApplication(@PathParam("version") String version,
                                   @HeaderParam("Accept") final String acceptHeader,
                                   @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        if (!registry.shouldAllowAccess(false)) {
            return Response.status(Status.FORBIDDEN).build();
        }

        EurekaMonitors.GET_APPLICATION.increment();

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = Key.KeyType.XML;
        }

        Key cacheKey = new Key(
                Key.EntityType.Application,
                appName,
                keyType,
                CurrentRequestVersion.get(),
                EurekaAccept.fromString(eurekaAccept)
        );

        String payLoad = responseCache.get(cacheKey);

        if (payLoad != null) {
            logger.debug("Found: {}", appName);
            return Response.ok(payLoad).build();
        } else {
            logger.debug("Not Found: {}", appName);
            return Response.status(Status.NOT_FOUND).build();
        }
    }

com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)

public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    }

    @VisibleForTesting
    String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
    }

@VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key :" + key, t);
        }
        return payload;
    }

首先查看只讀緩存層是否打開(kāi)shouldUseReadOnlyResponseCache,與之對(duì)應(yīng)的是getValue方法的useReadOnlyCache變量值疏尿。
如果打開(kāi),則先從只讀緩存readOnlyCacheMap查詢(xún)易桃,如果查詢(xún)不到則從讀寫(xiě)緩存是readWriteCacheMap查詢(xún)褥琐,并放入到只讀緩存

緩存更新原理

1、只讀緩存

只讀緩存沒(méi)有提供API更新晤郑,只能通過(guò)定時(shí)任務(wù)或查詢(xún)時(shí)從讀寫(xiě)緩存寫(xiě)到只讀緩存
在創(chuàng)建ResponseCacheImpl類(lèi)時(shí)敌呈,開(kāi)啟定時(shí)任務(wù)


shouldUseReadOnlyResponseCache.png

getCacheUpdateTask.png

如果從讀寫(xiě)緩存得到的cacheValue與從只讀緩存得到的currentCacheValue不相同,則把cacheValue替換到只讀緩存中

2造寝、讀寫(xiě)緩存

2.1磕洪、構(gòu)建ResponseCacheImpl類(lèi)


readWriteCacheMap.png

如果緩存不存在,則通過(guò)generatePayload方法重新加載

/*
     * Generate pay load for the given key.
     */
    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: " + key.getEntityType() + " found in the cache key.");
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }

private String getPayLoad(Key key, Applications apps) {
        EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
        String result;
        try {
            result = encoderWrapper.encode(apps);
        } catch (Exception e) {
            logger.error("Failed to encode the payload for all apps", e);
            return "";
        }
        if(logger.isDebugEnabled()) {
            logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
        }
        return result;
    }

2.2诫龙、調(diào)用invalidateCache方法析显,使緩存失效
com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache


invalidateCache.png

com.netflix.eureka.registry.ResponseCacheImpl#invalidate


invalidate.png

com.netflix.eureka.registry.ResponseCacheImpl#invalidate(com.netflix.eureka.registry.Key...)
invalidate.png
調(diào)用invalidateCache方法時(shí)機(jī)

1、注冊(cè)后更新緩存
com.netflix.eureka.registry.AbstractInstanceRegistry#register
2签赃、服務(wù)下架
com.netflix.eureka.registry.AbstractInstanceRegistry#cancel
3谷异、statusUpdate
com.netflix.eureka.registry.AbstractInstanceRegistry#statusUpdate
4、deleteStatusOverride
com.netflix.eureka.registry.AbstractInstanceRegistry#deleteStatusOverride

總結(jié):

主要就是服務(wù)端使用的框架調(diào)用原理和Eureka Server端涉及的功能
那些功能使用客戶(hù)端調(diào)用(有注冊(cè)锦聊,續(xù)約歹嘹,下架)
服務(wù)下架與服務(wù)剔除的區(qū)別
eureka服務(wù)端涉及的服務(wù)同步,自我保護(hù)閾值
服務(wù)查詢(xún)的三級(jí)緩存使用原理

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載孔庭,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者尺上。
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市圆到,隨后出現(xiàn)的幾起案子怎抛,更是在濱河造成了極大的恐慌,老刑警劉巖芽淡,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抽诉,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡吐绵,警方通過(guò)查閱死者的電腦和手機(jī)迹淌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)己单,“玉大人唉窃,你說(shuō)我怎么就攤上這事∥屏” “怎么了纹份?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蔓涧,道長(zhǎng)件已,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任元暴,我火速辦了婚禮篷扩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘茉盏。我一直安慰自己鉴未,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布鸠姨。 她就那樣靜靜地躺著铜秆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪讶迁。 梳的紋絲不亂的頭發(fā)上连茧,一...
    開(kāi)封第一講書(shū)人閱讀 51,190評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音巍糯,去河邊找鬼梅屉。 笑死,一個(gè)胖子當(dāng)著我的面吹牛鳞贷,可吹牛的內(nèi)容都是我干的坯汤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼搀愧,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼惰聂!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起咱筛,我...
    開(kāi)封第一講書(shū)人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤搓幌,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后迅箩,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體溉愁,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年饲趋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拐揭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡奕塑,死狀恐怖堂污,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情龄砰,我是刑警寧澤盟猖,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布讨衣,位于F島的核電站,受9級(jí)特大地震影響式镐,放射性物質(zhì)發(fā)生泄漏反镇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一娘汞、第九天 我趴在偏房一處隱蔽的房頂上張望歹茶。 院中可真熱鬧,春花似錦价说、人聲如沸辆亏。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至缤弦,卻和暖如春领迈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背碍沐。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工狸捅, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人累提。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓尘喝,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親斋陪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子朽褪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容