Spring Cloud Netflix - Eureka Server源碼閱讀

Eureka是一個基于REST并應(yīng)用于AWS云服務(wù)的服務(wù)注冊中心哀蘑,并且經(jīng)歷過了Netflix公司的生產(chǎn)考驗碘勉,絕對是我們值得細(xì)心研讀的中間件罩扇。雖然我們可能并未接觸過AWS梧奢,但在閱讀Eureka之前應(yīng)該簡單地了解一下Amazon EC2中的某些概念担神,如地區(qū)和可用區(qū)域楼吃,這樣能更好地理解Eureka中某些特定的術(shù)語。

在閱讀本文之前妄讯,希望大家都已經(jīng)對Eureka有簡單的認(rèn)識:

Eureka REST operations

Understanding eureka client server communication

Understanding Eureka Peer to Peer Communication

Register - 注冊

Eureka Instance注冊的REST入口在com.netflix.eureka.resources.ApplicationResource#addInstance

    /**
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     *
     * @param info
     *            {@link InstanceInfo} information of the instance.
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // *********** 字段校驗 ************
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }
      
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        // 僅當(dāng)DataCenterInfo為AmazonInfo實例的時候孩锡,其父類有可能是UniqueIdentifier
        if (dataCenterInfo instanceof UniqueIdentifier) {
            // ......
        }
        // *********** 字段校驗 END ************
        registry.register(info, "true".equals(isReplication));  // (1)
        return Response.status(204).build();  // 204 to be backwards compatible
    }

真正的注冊操作在(1)處,需要注意的是isReplication變量取決于HTTP頭x-netflix-discovery-replication的值亥贸。繼續(xù)追蹤(1)的調(diào)用棧躬窜,發(fā)現(xiàn)執(zhí)行注冊操作的方法是是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

注意該方法的javadoc,他告訴了我們一個比較重要的訊息:將InstanceInfo實例信息注冊到Eureka并且復(fù)制該信息到其他peer炕置。如果當(dāng)前收到的注冊信息是來自其他peer的復(fù)制事件荣挨,那么將不會將這個注冊信息繼續(xù)復(fù)制到其他peer男韧,這個標(biāo)志位就是上面所述的isReplication

    /**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     *
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // 默認(rèn)租約有效時長為90s
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        // 注冊信息里包含則依照注冊信息的租約時長
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // super為AbstractInstanceRegistry
        super.register(info, leaseDuration, isReplication);
        // 復(fù)制到其他peer
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

我們看到是先獲取到租約的有效時長默垄,然后才是真真正正地委托給super執(zhí)行注冊操作super.register(...)并將注冊信息復(fù)制到其他peer此虑。register方法非常長,我們重點觀察一下他的注冊表的結(jié)構(gòu):

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry

該注冊表是一個以app name為key(在Spring Cloud里就是spring.application.name)口锭,嵌套Map為value的ConcurrentHashMap結(jié)構(gòu)寡壮。其嵌套Map是以Instance ID為key,Lease對象為value的鍵值結(jié)構(gòu)讹弯。這個registry注冊表在Eureka Server或SpringBoot Admin的監(jiān)控面板上以Eureka Service這個角色出現(xiàn)况既。

    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            // 可以看出registry是一個以info的app name為key的Map結(jié)構(gòu), 也就是以spring.application.name的大寫串為key
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // registry的value的Map結(jié)構(gòu)是以info的id為key,這里的id就是Eureka文檔上的Instance ID组民,給你個例子你就想起是什么東西了:10.8.88.233:config-server:10888
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // .......
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            // .......
        } finally {
            read.unlock();
        }
    }

上面是register(...)中關(guān)于registry的大致操作棒仍,其中有相當(dāng)一部分的操作被略去了,如果感興趣的話可以細(xì)致地研究一下臭胜。

Renew and Cancel Lease - 續(xù)約與取消租約

續(xù)約的REST入口在com.netflix.eureka.resources.InstanceResource#renewLease

而取消租約的REST入口在com.netflix.eureka.resources.InstanceResource#cancelLease

兩者的基本思想相似莫其,經(jīng)由InstanceRegistry -> AbstractInstanceRegistry -> PeerAwareInstanceRegistryImpl,其中PeerAwareInstanceRegistryImpl裝飾了添加復(fù)制信息到其他節(jié)點的功能耸三。其中register乱陡、renew、cancel仪壮、statusUpdate和deleteStatusOverride都會將其信息復(fù)制到其他節(jié)點憨颠。

Fetch Registry - 獲取注冊信息

獲取所有Eureka Instance的注冊信息,com.netflix.eureka.resources.ApplicationsResource#getContainers积锅,其注冊信息由ResponseCacheImpl緩存爽彤,緩存的過期時間在其構(gòu)造函數(shù)中由EurekaServerConfig.getResponseCacheUpdateIntervalMs()所控制,默認(rèn)緩存時間為30s缚陷。而差量注冊信息在Server端會保存得更為長一些(大約3分鐘)适篙,因此獲取的差量可能會重復(fù)返回相同的實例。Eureka Client會自動處理這些重復(fù)信息箫爷。

Evcition

Eureke Server定期進行失效節(jié)點的清理嚷节,執(zhí)行該任務(wù)的定時器被定義在com.netflix.eureka.registry.AbstractInstanceRegistry#evictionTimer,真正的任務(wù)是由他的內(nèi)部類AbstractInstanceRegistry#EvictionTask所執(zhí)行虎锚,默認(rèn)為每60s執(zhí)行一次清理任務(wù)硫痰,其執(zhí)行間隔由EurekaServerConfig#getEvictionIntervalTimerInMs[eureka.server.eviction-interval-timer-in-ms]所決定。

回顧一下上面剛說完的注冊流程翁都,在PeerAwareInstanceRegistryImpl#register里面特別指出了默認(rèn)的租約時長為90s[eureka.Instance.lease-expiration-duration-in-seconds]碍论,即如果90s后都沒有收到特定的Eureka Instance的Heartbeats,則會認(rèn)為這個Instance已經(jīng)失效(Instance在正常情況下默認(rèn)每隔30s發(fā)送一個Heartbeats[eureka.Instance.lease-renewal-interval-in-seconds]柄慰,對以上兩個默認(rèn)值有疑問的可以翻閱LeaseInfo)鳍悠,EvictionTask則會把這個Instance納入清理的范圍税娜。我們看看EvictionTask的清理代碼是怎么寫的。

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        // (2) 下面的for循環(huán)就是把registry中所有的Lease提取到局部變量expiredLeases
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());   // (3)
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }

(2)中把本地的registry中的租約信息全部提取出來藏研,并在(3)通過serverConfig.getRenewalPercentThreshold()[eureka.server.renewal-percent-threshold敬矩,默認(rèn)85%]計算出一個最大可剔除的閾值evictionLimit

新增Peer Node時的初始化

在有多個Eureka Server的情況下蠢挡,每個Eureka Server之間是如何發(fā)現(xiàn)對方的呢弧岳?

通過調(diào)試之后,我們根據(jù)調(diào)用鏈從下往上追溯业踏,其初始入口為org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized

    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();  // (4)

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

由下個入口(4)最終可以定位到方法com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp禽炬,從對應(yīng)的javadoc上我們可以知道該方法從peer eureka節(jié)點往自己填充注冊表信息。 如果操作失敗則此同步操作將failover到其他節(jié)點勤家,直到遍歷完列表(service urls)為止腹尖。該方法與普通的Eureka Client注冊到Eureka Server不同的一點是,其標(biāo)志位isReplication為true伐脖,如果不記得這是什么作用的話可以翻閱到上面的Register - 注冊小節(jié)热幔。

Peer Node信息的定時更新

首先我們看Eureka Server的上下文實體中的方法com.netflix.eureka.DefaultEurekaServerContext#initialize

    @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        peerEurekaNodes.start();    // (5)
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }

該方法明確指出這是一個Spring Bean,在構(gòu)建Bean完成后執(zhí)行此方法讼庇,繼續(xù)追蹤(5)绎巨。

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            updatePeerEurekaNodes(resolvePeerUrls());   // (6)
            Runnable peersUpdateTask = new Runnable() { // (7)
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());   // (6)
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,    // (7)
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),  // (8)
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }

上面這段代碼很清晰地告訴我們在啟動Eureka Server的時候就會調(diào)用updatePeerEurekaNodes(...)更新peer的狀態(tài),并封裝為一個Runnable進行周期性更新蠕啄。這個定時時間由serverConfig.getPeerEurekaNodesUpdateIntervalMs()[eureka.server.peer-eureka-nodes-update-interval-ms]所控制场勤,默認(rèn)值為600s,即10min介汹。一直經(jīng)由EndpointUtils#getDiscoveryServiceUrls却嗡、EndpointUtils#getServiceUrlsFromConfigEurekaClientConfigBean#getEurekaServerServiceUrls獲得對應(yīng)zone的service urls,如有需要可以覆蓋上述getEurekaServerServiceUrls方法以動態(tài)獲取service urls嘹承,而不是選擇Spring Cloud默認(rèn)從properties文件讀取。

Self Preservation - 自我保護

當(dāng)新增Eureka Server時如庭,他會先嘗試從其他Peer上獲取所有Eureka Instance的注冊信息叹卷。如果在獲取時出現(xiàn)問題,該Eureka Server會在放棄之前嘗試在其他Peer上獲取注冊信息坪它。如果這個Eureka Server成功獲取到所有Instance的注冊信息骤竹,那么他就會根據(jù)所獲取到的注冊信息設(shè)置應(yīng)該接收到的續(xù)約閾值。如果在任何時候續(xù)約的閾值低于所設(shè)定的值(在15分鐘[eureka.server.renewal-threshold-update-interval-ms]內(nèi)低于85%[eureka.server.renewal-percent-threshold])往毡,則該Eureka Server會出于保護當(dāng)前注冊列表的目的而停止將任何Instance進行過期處理蒙揣。

在Netflix中上述保護措施被成為自我保護模式,主要是用于Eureka Server與Eureka Client存在網(wǎng)絡(luò)分區(qū)情況下的場景开瞭。在這種情況下懒震,Eureka Server嘗試保護其已有的實例信息罩息,但如果出現(xiàn)大規(guī)模的網(wǎng)絡(luò)分區(qū)時,相應(yīng)的Eureka Client會獲取到大量無法響應(yīng)的服務(wù)个扰。所以瓷炮,Eureka Client必須確保對于一些不存在或者無法響應(yīng)的Eureka Instance具備更加彈性的應(yīng)對策略,例如快速超時并嘗試其他實例递宅。

在網(wǎng)絡(luò)分區(qū)出現(xiàn)時可能會發(fā)生以下幾種情況:

  • Peer之間的心跳可能會失敗娘香,某Eureka Server檢測到這種情況并為了保護當(dāng)前的注冊列表而進入了自我保護模式。新的注冊可能發(fā)生在某些孤立的Eureka Server上办龄,某些Eureka Client可能會擁有新的注冊列表烘绽,而另外一些則可能沒有(不同的實例視圖)。
  • 當(dāng)網(wǎng)絡(luò)恢復(fù)到穩(wěn)定狀態(tài)后俐填,Eureka Server會進行自我修復(fù)诀姚。當(dāng)Peer能正常通信之后注冊信息會被重新同步。
  • 最重要的一點是玷禽,在網(wǎng)絡(luò)中斷期間Eureka Server應(yīng)該更距彈性赫段,但在這段期間Eureka Client可能會有不同的實例視圖。

作者:Chris
原博客:http://blog.chriscs.com
Github:https://github.com/prontera

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末矢赁,一起剝皮案震驚了整個濱河市糯笙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌撩银,老刑警劉巖给涕,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異额获,居然都是意外死亡够庙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門抄邀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來耘眨,“玉大人,你說我怎么就攤上這事境肾√弈眩” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵奥喻,是天一觀的道長偶宫。 經(jīng)常有香客問我,道長环鲤,這世上最難降的妖魔是什么纯趋? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上吵冒,老公的妹妹穿的比我還像新娘纯命。我一直安慰自己,他們只是感情好桦锄,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布扎附。 她就那樣靜靜地躺著,像睡著了一般结耀。 火紅的嫁衣襯著肌膚如雪留夜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天图甜,我揣著相機與錄音碍粥,去河邊找鬼。 笑死黑毅,一個胖子當(dāng)著我的面吹牛嚼摩,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播矿瘦,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼枕面,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了缚去?” 一聲冷哼從身側(cè)響起潮秘,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎易结,沒想到半個月后枕荞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡搞动,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年躏精,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鹦肿。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡矗烛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出狮惜,到底是詐尸還是另有隱情高诺,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布碾篡,位于F島的核電站,受9級特大地震影響筏餐,放射性物質(zhì)發(fā)生泄漏开泽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一魁瞪、第九天 我趴在偏房一處隱蔽的房頂上張望穆律。 院中可真熱鬧惠呼,春花似錦、人聲如沸峦耘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辅髓。三九已至泣崩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間洛口,已是汗流浹背矫付。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留第焰,地道東北人买优。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像挺举,于是被迫代替她去往敵國和親杀赢。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容