Spring Cloud源碼分析(一)Eureka

看過之前文章的朋友們焕檬,相信已經(jīng)對Eureka的運行機制已經(jīng)有了一定的了解。為了更深入的理解它的運作和配置,下面我們結合源碼來分別看看服務端和客戶端的通信行為是如何實現(xiàn)的籽懦。另外寫這篇文章,還有一個目的漓滔,還是希望鼓勵大家能夠學會學習和研究的方法编饺,由于目前Spring Cloud的中文資料并不多,并不是大部分的問題都能找到現(xiàn)成的答案响驴,所以其實很多問題給出一個科學而慎重的解答也都是花費研究者不少精力的透且。

在看具體源碼前,我們先回顧一下之前我們所實現(xiàn)的內容豁鲤,從而找一個合適的切入口去分析秽誊。首先,服務注冊中心琳骡、服務提供者锅论、服務消費者這三個主要元素來說,后兩者(也就是Eureka客戶端)在整個運行機制中是大部分通信行為的主動發(fā)起者日熬,而注冊中心主要是處理請求的接收者棍厌。所以,我們可以從Eureka的客戶端作為入口看看它是如何完成這些主動通信行為的竖席。

我們在將一個普通的Spring Boot應用注冊到Eureka Server中耘纱,或是從Eureka Server中獲取服務列表時,主要就做了兩件事:

  • 在應用主類中配置了@EnableDiscoveryClient注解
  • application.properties中用eureka.client.serviceUrl.defaultZone參數(shù)指定了服務注冊中心的位置

順著上面的線索毕荐,我們先查看@EnableDiscoveryClient的源碼如下:

/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

}

從該注解的注釋我們可以知道:該注解用來開啟DiscoveryClient的實例束析。通過搜索DiscoveryClient,我們可以發(fā)現(xiàn)有一個類和一個接口憎亚。通過梳理可以得到如下圖的關系:

Paste_Image.png

其中员寇,左邊的org.springframework.cloud.client.discovery.DiscoveryClient是Spring Cloud的接口,它定義了用來發(fā)現(xiàn)服務的常用抽象方法第美,而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是對該接口的實現(xiàn)蝶锋,從命名來就可以判斷,它實現(xiàn)的是對Eureka發(fā)現(xiàn)服務的封裝什往。所以EurekaDiscoveryClient依賴了Eureka的com.netflix.discovery.EurekaClient接口扳缕,EurekaClient繼承了LookupService接口,他們都是Netflix開源包中的內容别威,它主要定義了針對Eureka的發(fā)現(xiàn)服務的抽象方法躯舔,而真正實現(xiàn)發(fā)現(xiàn)服務的則是Netflix包中的com.netflix.discovery.DiscoveryClient類。

那么省古,我們就看看來詳細看看DiscoveryClient類粥庄。先解讀一下該類頭部的注釋有個總體的了解,注釋的大致內容如下:

這個類用于幫助與Eureka Server互相協(xié)作豺妓。

Eureka Client負責了下面的任務:
- 向Eureka Server注冊服務實例
- 向Eureka Server為租約續(xù)期
- 當服務關閉期間惜互,向Eureka Server取消租約
- 查詢Eureka Server中的服務實例列表

Eureka Client還需要配置一個Eureka Server的URL列表布讹。

在具體研究Eureka Client具體負責的任務之前,我們先看看對Eureka Server的URL列表配置在哪里训堆。根據(jù)我們配置的屬性名:eureka.client.serviceUrl.defaultZone炒事,通過serviceUrl我們找到該屬性相關的加載屬性,但是在SR5版本中它們都被@Deprecated標注了蔫慧,并在注視中可以看到@link到了替代類com.netflix.discovery.endpoint.EndpointUtils,我們可以在該類中找到下面這個函數(shù):

public static Map<String, List<String>> getServiceUrlsMapFromConfig(
            EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
    Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
    String region = getRegion(clientConfig);
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    if (availZones == null || availZones.length == 0) {
        availZones = new String[1];
        availZones[0] = DEFAULT_ZONE;
    }
    ……
    int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

    String zone = availZones[myZoneOffset];
    List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
    if (serviceUrls != null) {
        orderedUrls.put(zone, serviceUrls);
    }
    ……
    return orderedUrls;
}

Region权薯、Zone

在上面的函數(shù)中姑躲,我們可以發(fā)現(xiàn)客戶端依次加載了兩個內容,第一個是Region盟蚣,第二個是Zone黍析,從其加載邏上我們可以判斷他們之間的關系:

  • 通過getRegion函數(shù),我們可以看到它從配置中讀取了一個Region返回屎开,所以一個微服務應用只可以屬于一個Region阐枣,如果不特別配置,就默認為default奄抽。若我們要自己設置蔼两,可以通過eureka.client.region屬性來定義。
public static String getRegion(EurekaClientConfig clientConfig) {
    String region = clientConfig.getRegion();
    if (region == null) {
        region = DEFAULT_REGION;
    }
    region = region.trim().toLowerCase();
    return region;
}
  • 通過getAvailabilityZones函數(shù)逞度,我們可以知道當我們沒有特別為Region配置Zone的時候额划,將默認采用defaultZone,這也是我們之前配置參數(shù)eureka.client.serviceUrl.defaultZone的由來档泽。若要為應用指定Zone俊戳,我們可以通過eureka.client.availability-zones屬性來進行設置。從該函數(shù)的return內容馆匿,我們可以Zone是可以有多個的抑胎,并且通過逗號分隔來配置。由此渐北,我們可以判斷Region與Zone是一對多的關系阿逃。
public String[] getAvailabilityZones(String region) {
    String value = this.availabilityZones.get(region);
    if (value == null) {
        value = DEFAULT_ZONE;
    }
    return value.split(",");
}

ServiceUrls

在獲取了Region和Zone信息之后,才開始真正加載Eureka Server的具體地址腔稀。它根據(jù)傳入的參數(shù)按一定算法確定加載位于哪一個Zone配置的serviceUrls盆昙。

int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
String zone = availZones[myZoneOffset];
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);

具體獲取serviceUrls的實現(xiàn),我們可以詳細查看getEurekaServerServiceUrls函數(shù)的具體實現(xiàn)類EurekaClientConfigBean焊虏,該類是EurekaClientConfigEurekaConstants接口的實現(xiàn)淡喜,用來加載配置文件中的內容,這里有非常多有用的信息诵闭,這里我們先說一下此處我們關心的炼团,關于defaultZone的信息澎嚣。通過搜索defaultZone,我們可以很容易的找到下面這個函數(shù)瘟芝,它具體實現(xiàn)了易桃,如何解析該參數(shù)的過程,通過此內容锌俱,我們就可以知道晤郑,eureka.client.serviceUrl.defaultZone屬性可以配置多個,并且需要通過逗號分隔贸宏。

public List<String> getEurekaServerServiceUrls(String myZone) {
    String serviceUrls = this.serviceUrl.get(myZone);
    if (serviceUrls == null || serviceUrls.isEmpty()) {
        serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
    }
    if (!StringUtils.isEmpty(serviceUrls)) {
        final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
        List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
        for (String eurekaServiceUrl : serviceUrlsSplit) {
            if (!endsWithSlash(eurekaServiceUrl)) {
                eurekaServiceUrl += "/";
            }
            eurekaServiceUrls.add(eurekaServiceUrl);
        }
        return eurekaServiceUrls;
    }
    return new ArrayList<>();
}

當客戶端在服務列表中選擇實例進行訪問時造寝,對于Zone和Region遵循這樣的規(guī)則:優(yōu)先訪問同自己一個Zone中的實例,其次才訪問其他Zone中的實例吭练。通過Region和Zone的兩層級別定義诫龙,配合實際部署的物理結構,我們就可以有效的設計出區(qū)域性故障的容錯集群鲫咽。

服務注冊

在理解了多個服務注冊中心信息的加載后签赃,我們再回頭看看DiscoveryClient類是如何實現(xiàn)“服務注冊”行為的,通過查看它的構造類分尸,可以找到它調用了下面這個函數(shù):

private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
               instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

在上面的函數(shù)中锦聊,我們可以看到關鍵的判斷依據(jù)if (clientConfig.shouldRegisterWithEureka())。在該分支內箩绍,創(chuàng)建了一個InstanceInfoReplicator類的實例括丁,它會執(zhí)行一個定時任務,查看該類的run()函數(shù)了解該任務做了什么工作:

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

相信大家都發(fā)現(xiàn)了discoveryClient.register();這一行伶选,真正觸發(fā)調用注冊的地方就在這里史飞。繼續(xù)查看register()的實現(xiàn)內容如下:

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

通過屬性命名,大家基本也能猜出來仰税,注冊操作也是通過REST請求的方式進行的构资。同時,這里我們也能看到發(fā)起注冊請求的時候陨簇,傳入了一個com.netflix.appinfo.InstanceInfo對象吐绵,該對象就是注冊時候客戶端給服務端的服務的元數(shù)據(jù)。

服務獲取與服務續(xù)約

順著上面的思路河绽,我們繼續(xù)來看DiscoveryClientinitScheduledTasks函數(shù)己单,不難發(fā)現(xiàn)在其中還有兩個定時任務,分別是“服務獲取”和“服務續(xù)約”:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // InstanceInfo replicator
        ……
    }
}

從源碼中耙饰,我們就可以發(fā)現(xiàn)纹笼,“服務獲取”相對于“服務續(xù)約”更為獨立,“服務續(xù)約”與“服務注冊”在同一個if邏輯中苟跪,這個不難理解廷痘,服務注冊到Eureka Server后蔓涧,自然需要一個心跳去續(xù)約,防止被剔除笋额,所以他們肯定是成對出現(xiàn)的元暴。從源碼中,我們可以清楚看到了兄猩,對于服務續(xù)約相關的時間控制參數(shù):

eureka.instance.lease-renewal-interval-in-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=90

而“服務獲取”的邏輯在獨立的一個if判斷中茉盏,其判斷依據(jù)就是我們之前所提到的eureka.client.fetch-registry=true參數(shù),它默認是為true的枢冤,大部分情況下我們不需要關心援岩。為了定期的更新客戶端的服務清單,以保證服務訪問的正確性掏导,“服務獲取”的請求不會只限于服務啟動,而是一個定時執(zhí)行的任務羽峰,從源碼中我們可以看到任務運行中的registryFetchIntervalSeconds參數(shù)對應eureka.client.registry-fetch-interval-seconds=30配置參數(shù)趟咆,它默認為30秒。

繼續(xù)循序漸進的向下深入梅屉,我們就能分別發(fā)現(xiàn)實現(xiàn)“服務獲取”和“服務續(xù)約”的具體方法值纱,其中“服務續(xù)約”的實現(xiàn)較為簡單,直接以REST請求的方式進行續(xù)約:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

而“服務獲取”則相對復雜一些坯汤,會根據(jù)是否第一次獲取發(fā)起不同的REST請求和相應的處理虐唠,具體的實現(xiàn)邏輯還是跟之前類似,有興趣的讀者可以繼續(xù)查看服務客戶端的其他具體內容惰聂,了解更多細節(jié)疆偿。

服務注冊中心處理

通過上面的源碼分析,可以看到所有的交互都是通過REST的請求來發(fā)起的搓幌。下面我們來看看服務注冊中心對這些請求的處理杆故。Eureka Server對于各類REST請求的定義都位于:com.netflix.eureka.resources包下桥嗤。

以“服務注冊”請求為例:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                  @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    ...
    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(
                    serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass()
                                        + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(
                            AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id",
                        dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

在對注冊信息進行了一大堆校驗之后撬码,會調用org.springframework.cloud.netflix.eureka.server.InstanceRegistry對象中的register(InstanceInfo info, int leaseDuration, boolean isReplication)函數(shù)來進行服務注冊:

    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
        if (log.isDebugEnabled()) {
            log.debug("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                    + ", leaseDuration " + leaseDuration + ", isReplication "
                    + isReplication);
        }
        this.ctxt.publishEvent(new EurekaInstanceRegisteredEvent(this, info,
                leaseDuration, isReplication));

        super.register(info, leaseDuration, isReplication);
    }

在注冊函數(shù)中,先調用publishEvent函數(shù)幽邓,將該新服務注冊的事件傳播出去拐揭,然后調用com.netflix.eureka.registry.AbstractInstanceRegistry父類中的注冊實現(xiàn)撤蟆,將InstanceInfo中的元數(shù)據(jù)信息存儲在一個ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>對象中,它是一個兩層Map結構堂污,第一層的key存儲服務名:InstanceInfo中的appName屬性家肯,第二層的key存儲實例名:InstanceInfo中的instanceId屬性。

服務端的請求接收都非常類似盟猖,對于其他的服務端處理息楔,這里就不再展開寝贡,讀者可以根據(jù)上面的脈絡來自己查看其內容(這里包含很多細節(jié)內容)來幫助和加深理解。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末值依,一起剝皮案震驚了整個濱河市圃泡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌愿险,老刑警劉巖颇蜡,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異辆亏,居然都是意外死亡风秤,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門扮叨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缤弦,“玉大人,你說我怎么就攤上這事彻磁“澹” “怎么了?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵衷蜓,是天一觀的道長累提。 經(jīng)常有香客問我,道長磁浇,這世上最難降的妖魔是什么斋陪? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮置吓,結果婚禮上无虚,老公的妹妹穿的比我還像新娘。我一直安慰自己衍锚,他們只是感情好骑科,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著构拳,像睡著了一般咆爽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上置森,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天斗埂,我揣著相機與錄音,去河邊找鬼凫海。 笑死呛凶,一個胖子當著我的面吹牛,可吹牛的內容都是我干的行贪。 我是一名探鬼主播漾稀,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼模闲,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了崭捍?” 一聲冷哼從身側響起尸折,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎殷蛇,沒想到半個月后实夹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡粒梦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年亮航,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片匀们。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡缴淋,死狀恐怖,靈堂內的尸體忽然破棺而出泄朴,到底是詐尸還是另有隱情重抖,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布叼旋,位于F島的核電站,受9級特大地震影響沦辙,放射性物質發(fā)生泄漏夫植。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一油讯、第九天 我趴在偏房一處隱蔽的房頂上張望详民。 院中可真熱鬧,春花似錦陌兑、人聲如沸沈跨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽饿凛。三九已至,卻和暖如春软驰,著一層夾襖步出監(jiān)牢的瞬間涧窒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工锭亏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纠吴,地道東北人。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓慧瘤,卻偏偏與公主長得像戴已,于是被迫代替她去往敵國和親固该。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內容