spring cloud eureka server

Eureka Server作為一個開箱即用的服務(wù)中心,主要有以下功能:

  • 服務(wù)注冊拉鹃;
  • 接收服務(wù)心跳(續(xù)租)膏燕;
  • 服務(wù)剔除煌寇;
  • 服務(wù)下線阀溶;
  • 集群同步;
  • 獲取注冊表中服務(wù)實例信息击纬。

需要注意的是更振,Eureka Server本身也是一個Eureka Client肯腕,在不禁止其客戶端行為時实撒,他會向其他Eureka Server執(zhí)行注冊知态、發(fā)送心跳等操作贡茅。

源碼和配置

其中重要的類為com.netflix.eureka.registry.AbstractInstanceRegistry類顶考。該類主管該Server下的Client的活動信息村怪。另外一個接口com.netflix.eureka.registry.PeerAwareInstanceRegistry,只有一個實現(xiàn)類,主管和peer之間的同步等操作搅轿。

eureka server主要類圖.png

數(shù)據(jù)結(jié)構(gòu)

注冊表是Eureka中的主要部分璧坟,在此簡單分析一下他的數(shù)據(jù)結(jié)構(gòu)。


image.png

服務(wù)注冊

服務(wù)注冊的默認租約時間leaseDuration為90s黎茎。主要實現(xiàn)位于com.netflix.eureka.registry.AbstractInstanceRegistry#register方法中傅瞻,其實現(xiàn)如下(以下所有的代碼刪除了logger):

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
        //首先獲取讀鎖嗅骄,防止其他線程修改數(shù)據(jù)掸读,但是可以讀取數(shù)據(jù)
            read.lock();
      //registry 全局變量儿惫,ConcurrentHashMap類型留搔,總注冊信息
      //通過APPName來獲取服務(wù)集群的租約集合
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
               //此處使用putIfAbsent而不是put隔显,防止多線程獲取readLock之后括眠,覆蓋別的線程添加的屬性
                 gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
           //獲取Client實例的租約信息
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
           //租約存在,且租約中的Client實例不為空
            if (existingLease != null && (existingLease.getHolder() != null)) {
              //舊租約的修改時間
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
              //新租約的修改時間
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            //因為是多線程操作当船,所以新租約的時間不一定大于舊租約的時間
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // 租約不存在
                synchronized (lock) {
                  //自我保護機制[1]
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1 for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
            //繼承上次的服務(wù)啟動時間
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            //把不是UNKUOWN的缩幸,且新的實例放到overriddenInstanceStatusMap中
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // 根據(jù)覆蓋狀態(tài)規(guī)則設(shè)置服務(wù)的狀態(tài)
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            //如果實例為UP狀態(tài)钞护,則設(shè)置服務(wù)啟動時間(僅第一次有效)
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
           //recentlyChangedQueue用于Client增量式獲取注冊表信息
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
           //設(shè)置responseCache緩存過期患亿,用于Client全量獲取注冊表信息
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

在注冊方法中有諸多同步或安全的操作惦界,下面簡析一二:

  1. 開始read.lock()沾歪,使用read的原因,因為readLock是共享鎖狂窑,所以有多線程注冊多個服務(wù)時泉哈,并行執(zhí)行丛晦。對應(yīng)的writeLock排它鎖只用在獲取增量數(shù)據(jù)時,因為獲取增量數(shù)據(jù)時要保證數(shù)據(jù)此時不可更改锌蓄,用writeLock禁止其他線程的read操作煤率。readLock都是對內(nèi)操作,writeLock是對外要保持統(tǒng)一口徑辆沦。

  2. 代碼中涉及的全局變量Map都是ConcurrentMap。保證線程安全蔚晨,且全局共享银择。

  3. 在向registry中存值的時候累舷,并沒有強制性覆蓋浩考,用putIfAbsent代替put方法,兩者都會返回key對應(yīng)的舊值被盈,但前者并不會覆蓋析孽。然后用之前的值來進行操作(因為是根據(jù)實例名獲取的,所以后面只對其租約信息進行修改)只怎。

  4. 自我保護機制處使用了synchronized+volatile:

  • 對于多個修改變量袜瞬,采用synchronized空對象身堡;
  • 因為變量要在別的方法中進行計算安全機制吞滞,所以需要volatile關(guān)鍵字,保證該字段隨時可見盾沫。

(此處的字段不用CAS裁赠,因為涉及乘法,CAS對乘法不太友好赴精。)

  1. 對recentRegisteredQueue的修改佩捞,鎖定該對象,防止所有線程在別的方法中進行操作該值蕾哟。
  2. 緩存類變量基本都是由guava的CacheBuilder來創(chuàng)建的一忱,保證時效和容量等問題。

Tip:[1] 自我保護機制:隨著Client注冊數(shù)量的增加谭确,期待每分鐘維持心跳的數(shù)量也增加帘营,eureka Server檢查到每分鐘續(xù)約的次數(shù)少于期待數(shù)量的閾值(默認0.85)時,就會認為出現(xiàn)異常情況逐哈,從而進入自我保護機制芬迄。默認2次/min/服務(wù)(基于心跳時間)。Eureka會保護這些實例昂秃,認為他們是健康的禀梳,不把他們移出注冊列表。Eureka Server會輸出紅色的文字:

EMERGENCY!EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEGING EXPIRED JUST TO BE SAFE.

接受心跳服務(wù)

Client默認每隔30s向Server發(fā)起一次續(xù)租肠骆,也叫心跳服務(wù)算途。核心代碼為com.netflix.eureka.registry.AbstractInstanceRegistry#renew

public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        //如果沒有發(fā)現(xiàn)租約蚀腿,返回false
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            //獲取租約中的實例信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
            //獲取服務(wù)的最終狀態(tài)
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                  //如果實例的狀態(tài)和最終的覆蓋狀態(tài)不一致嘴瓤,設(shè)置新的狀態(tài)
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            //最后一分鐘的續(xù)租數(shù)(該續(xù)租數(shù)每60秒調(diào)度自動更新為0)
            renewsLastMin.increment();
            //修改最后續(xù)租時間
            leaseToRenew.renew();
            return true;
        }
    }

這部分有很多東西都是在register方法里出現(xiàn)過的,所以不再贅述。

服務(wù)剔除

核心代碼是com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)廓脆。該入口參數(shù)additionalLeaseMs 一般為0畏浆。此值是補償時間,即如果出現(xiàn)GC或者時鐘偏移等情況而補償?shù)臅r間狞贱。用來判斷是否過期刻获。

    @Override
    public void evict() {
        evict(0l);
    }


 public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
      //是否開啟租約保護[1]
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // 遍歷所有節(jié)點,找到租約過期的節(jié)點
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        //可剔除的節(jié)點數(shù) = 本地節(jié)點總數(shù) - 閾值節(jié)點數(shù)
        int evictionLimit = registrySize - registrySizeThreshold;
        //保護機制 - 保證剔除最少量的節(jié)點
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
           Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
               //隨機獲取一個節(jié)點瞎嬉,并打亂原有順序(交換對應(yīng)節(jié)點)
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                //服務(wù)下線
                internalCancel(appName, id, false);
            }
        }
    }

在打亂節(jié)點順序部分蝎毡,采用隨機,是避免同一時間同一服務(wù)集群全部過期然后被剔除的現(xiàn)象氧枣,從而引發(fā)程序崩潰沐兵。

Tips:
[1] 是否開啟租約保護:該值有兩種情況:

  • eureka.server.enable-self-preservation配置有關(guān),默認為true便监,即不開啟租約保護扎谎。為false時開啟租約保護。
  • 上述值為true時烧董,期望每分鐘的續(xù)租閾值數(shù)大于0且最后一分鐘的續(xù)租數(shù)大于該期望值毁靶,即每分鐘的續(xù)租數(shù)不能小于閾值數(shù),則為true逊移,否則開啟保護機制预吆,不準服務(wù)剔除。默認的閾值為0.85胳泉,通過eureka.server.renewal.percent.threshold來設(shè)置(必為double類型)拐叉。

服務(wù)下線

核心代碼是com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            //獲取讀鎖
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
          //recentCanceledQueue和之前的recentRegisteredQueue都是統(tǒng)計隊列
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            //如果租約為空扇商,直接返回false
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                return false;
            } else {
                //設(shè)置下線時間(也叫驅(qū)逐時間)
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    //添加到最近租約變更隊列
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    //虛擬網(wǎng)絡(luò)協(xié)議主機地址
                    vip = instanceInfo.getVIPAddress();
                    //安全的虛擬網(wǎng)絡(luò)協(xié)議主機地址
                    svip = instanceInfo.getSecureVipAddress();
                }
                //清理緩存 - 使responseCache中指定屬性移除
                invalidateCache(appName, vip, svip);
                return true;
            }
        } finally {
            read.unlock();
        }
    }

獲取注冊表

該功能涉及多個Eureka Server凤瘦。
從Client部分緩存注冊表也了解到注冊表有全量和增量兩種方式。

全量獲取注冊表

該部分的核心代碼是com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationsFromMultipleRegions案铺。

 public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
        //查看是否有遠程區(qū)域
        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
        } else {
            GET_ALL_CACHE_MISS.increment();
        }
        Applications apps = new Applications();
        apps.setVersion(1L);
        //從本地registry上獲取實例信息
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;

            if (entry.getValue() != null) {
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    //decorateInstanceInfo方法就是講lease中holder轉(zhuǎn)換為Instance
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                apps.addApplication(app);
            }
        }
        //從遠程注冊表上獲取實例信息
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteApps = remoteRegistry.getApplications();
                    for (Application application : remoteApps.getRegisteredApplications()) {
                        if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {}  fetched from the remote region {}",
                                    application.getName(), remoteRegion);

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }
                            for (InstanceInfo instanceInfo : application.getInstances()) {
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getName(), remoteRegion);
                        }
                    }
                } else {
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }

入?yún)emoteRegions為指定區(qū)域的數(shù)組集合蔬芥。來源于配置eureka.server.remote-region-urls-with-name,默認為空红且。

從多地區(qū)增量式獲取注冊表

增量獲取的要義還是從最近修改的隊列中坝茎,獲取到最近修改的instanceInfo涤姊,從全局map中獲取到這些實例的完整信息暇番,然后將這些instanceInfo打包過去。
該部分的核心代碼com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions思喊。

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
        if (null == remoteRegions) {
            remoteRegions = allKnownRemoteRegions; // null means all remote regions.
        }

        boolean includeRemoteRegion = remoteRegions.length != 0;

        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
        } else {
            GET_ALL_CACHE_MISS_DELTA.increment();
        }

        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            //獲取寫鎖壁酬,排它鎖
            write.lock();
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                Application app = applicationInstancesMap.get(instanceInfo.getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            if (includeRemoteRegion) {
                for (String remoteRegion : remoteRegions) {
                    RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                    if (null != remoteRegistry) {
                        Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                        if (null != remoteAppsDelta) {
                            for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                                if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                    Application appInstanceTillNow =
                                            apps.getRegisteredApplications(application.getName());
                                    if (appInstanceTillNow == null) {
                                        appInstanceTillNow = new Application(application.getName());
                                        apps.addApplication(appInstanceTillNow);
                                    }
                                    for (InstanceInfo instanceInfo : application.getInstances()) {
                                        appInstanceTillNow.addInstance(instanceInfo);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }

集群同步

該功能涉及多個Eureka Server。

Eureka Server初始化本地注冊表

這部分主要通過代碼com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp來完成。主要執(zhí)行:

  • 它會先從peer(同伴)節(jié)點上拉取注冊信息舆乔。
  • 并將其中的服務(wù)實例注冊到本地注冊表中岳服。
@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    //注冊表重試等待時間
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            //此處走com.netflix.eureka.registry.AbstractInstanceRegistry#register方法
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

在初始化本地注冊表時,EurekaServer不會接受來自Client的通信請求(如注冊希俩、續(xù)租吊宋、獲取注冊表等)。在同步結(jié)束后颜武,通過com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic方法來允許該Server接受流量璃搜。關(guān)于這兩部分的調(diào)用邏輯,在com.netflix.eureka.EurekaBootStrap#initEurekaServerContext代碼中鳞上。而該方法調(diào)用時是EurekaBootStrap實現(xiàn)javax.servlet.ServletContextListener#contextInitialized的內(nèi)部这吻。

@Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // 初始化自我保護機制統(tǒng)計參數(shù)
        this.expectedNumberOfRenewsPerMin = count * 2;
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        //判斷是否是AWS環(huán)境,一般我們的環(huán)境是MyOwn環(huán)境
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        //修改服務(wù)實例狀態(tài)為上線狀態(tài)
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        //驅(qū)逐任務(wù)以及測試每分鐘心率(租約率)的任務(wù)
        super.postInit();
    }

Eureka Server 之間注冊信息表信息的同步復(fù)制

這部分代碼核心在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中篙议,其包括下線唾糯,注冊和續(xù)約等等。

下線
 @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        //調(diào)用 AbstractInstanceRegistry 的cancel
        if (super.cancel(appName, id, isReplication)) {
            //復(fù)制(同步)下線狀態(tài)到同伴
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            //鎖定空對象
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            return true;
        }
        return false;
    }
注冊方法
   @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        //默認租約時間90s
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        //如果租約為空鬼贱,則用默認時間移怯;若租約不為空,注冊租約該有的剩余時間
        //調(diào)用 AbstractInstanceRegistry 的renew
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
續(xù)租方法
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        //調(diào)用 AbstractInstanceRegistry 的renew
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
同步到同伴節(jié)點

下面是復(fù)制到同伴的入口方法这难,代碼是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers芋酌。

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        //Stopwatch是測試執(zhí)行某些代碼的時間
        Stopwatch tracer = action.getTimer().start();
        try {
            //是否是復(fù)制的
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // 如果peer集群為空,或者本來就是復(fù)制操作雁佳,則不再執(zhí)行脐帝,防止造成污染復(fù)制
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                //同步Instance到同伴節(jié)點
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

peerEurekaNodes是同伴Eureka Server節(jié)點。每個Eureka Server會向同伴同步數(shù)據(jù)糖权。

同步到同伴的代碼為com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers搁拙。根據(jù)同步的Action操作,來分類同步亡电。

 private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

同步操作只有以下幾種:

public enum Action {
        Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
......
}

上面的請求在com.netflix.eureka.transport.JerseyReplicationClient中髓迎。如HeartBeat的請求在com.netflix.eureka.transport.JerseyReplicationClient#sendHeartBeat中〗耍可以查看路徑等腿堤。這部分路徑就和client中處理對應(yīng)功能是一樣的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末如暖,一起剝皮案震驚了整個濱河市笆檀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌盒至,老刑警劉巖酗洒,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件士修,死亡現(xiàn)場離奇詭異,居然都是意外死亡樱衷,警方通過查閱死者的電腦和手機棋嘲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來矩桂,“玉大人沸移,你說我怎么就攤上這事≈读瘢” “怎么了阔籽?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長牲蜀。 經(jīng)常有香客問我笆制,道長,這世上最難降的妖魔是什么涣达? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任在辆,我火速辦了婚禮,結(jié)果婚禮上度苔,老公的妹妹穿的比我還像新娘匆篓。我一直安慰自己,他們只是感情好寇窑,可當(dāng)我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布鸦概。 她就那樣靜靜地躺著,像睡著了一般甩骏。 火紅的嫁衣襯著肌膚如雪窗市。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天饮笛,我揣著相機與錄音咨察,去河邊找鬼。 笑死福青,一個胖子當(dāng)著我的面吹牛摄狱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播无午,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼媒役,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了宪迟?” 一聲冷哼從身側(cè)響起酣衷,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎踩验,沒想到半個月后鸥诽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體商玫,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡箕憾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年牡借,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袭异。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡钠龙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出御铃,到底是詐尸還是另有隱情碴里,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布上真,位于F島的核電站咬腋,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏睡互。R本人自食惡果不足惜根竿,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望就珠。 院中可真熱鬧寇壳,春花似錦、人聲如沸妻怎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逼侦。三九已至匿辩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間榛丢,已是汗流浹背撒汉。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留涕滋,地道東北人睬辐。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像宾肺,于是被迫代替她去往敵國和親溯饵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容