4. eureka-client 源碼分析, 服務(wù)注冊咱揍,心跳檢測

https://blog.csdn.net/wy0123/article/details/79852339?utm_source=blogxgwz9

1. 環(huán)境

eureka-core-1.9.2
eureka-client-1.9.2

2. eureka-client是客戶端的源碼,主要的功能有如下幾個

  • 服務(wù)注冊
  • 服務(wù)續(xù)約
  • 服務(wù)發(fā)現(xiàn)
  • 服務(wù)下線

而這些功能主要由netflix eureka實現(xiàn)键袱,其客戶端實現(xiàn)類為DiscoveryClient.java,它實現(xiàn)了EurekaClient接口,而EurekaClient又繼承了LookupService接口族奢。

1

3. 數(shù)據(jù)結(jié)構(gòu)

我們首先來看看這個最上層的接口都聲明了些什么?

LookupService:

Application getApplication(String appName);
Applications getApplications();
List<InstanceInfo> getInstancesById(String id);
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
2

在這里從返回類型可以看出此接口的主要任務(wù)為獲取Application和InstanceInfo

  • Application:其屬性包含一個name和一個InstanceInfo類型的Set丹鸿,其name為服務(wù)名越走,為配置文件中的spring.application.name,其Set為一個InstanceInfo列表靠欢,其中記述了注冊了此服務(wù)名的Eureka節(jié)點列表廊敌。

  • InstanceInfo:主要記述了Eureka節(jié)點的信息,包括ip门怪、port等等骡澈。

而這四個接口分別是獲取application、application列表薪缆,還有獲取InstanceInfo列表秧廉,而從DiscoveryClient.java的實現(xiàn)來看伞广,getInstancesById()也是從applications對象里取出的。

    @Override
    public List<InstanceInfo> getInstancesById(String id) {
        List<InstanceInfo> instancesList = new ArrayList<InstanceInfo>();
        for (Application app : this.getApplications()
                .getRegisteredApplications()) {
            InstanceInfo instanceInfo = app.getByInstanceId(id);
            if (instanceInfo != null) {
                instancesList.add(instanceInfo);
            }
        }
        return instancesList;
    }

在現(xiàn)在的環(huán)境下查看


3

實現(xiàn)類代碼還是如上

說明在客戶端里疼电,是以服務(wù)名為單位嚼锄,保存著注冊此服務(wù)的服務(wù)提供者列表。這個服務(wù)提供者保存在InstanceInfo里蔽豺。

4. 類結(jié)構(gòu)

作為主要工作的類DiscoveryClient.java区丑,它有幾個內(nèi)部類:

  • EurekaTransport.java:用于服務(wù)注冊、服務(wù)續(xù)約等與服務(wù)器通信
  • HeartbeatThread.java:用于服務(wù)續(xù)約
  • CacheRefreshThread.java:刷新服務(wù)列表用

5. 注冊服務(wù)

在客戶端啟動后初始化了DiscoveryClient修陡,在這里啟動了3個定時任務(wù):

  • CacheRefresh線程:定時刷新服務(wù)列表
  • HeartBeat線程:定時發(fā)送心跳沧侥,進行服務(wù)續(xù)約,如果服務(wù)被注冊中心剔除魄鸦,則重新注冊
  • InstanceInfoReplicator線程:用來向注冊中心注冊自己(最初的注冊不是由這個定時任務(wù)完成宴杀,是由springcloud里的邏輯直接調(diào)用這個線程的run方法完成)

由此可見,netflix eureka 的客戶端組件DiscoveryClient啟動后拾因,服務(wù)的注冊是依靠這個心跳線程來實現(xiàn)的旺罢。
HeartbeatThread是其一個內(nèi)部線程類:

/**
 *   The heartbeat task that renews the lease in the given intervals.
 */
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }

其run方法,調(diào)用了renew()方法:

    /**
     *    Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

registrationClient.sendHeartBeat向注冊中心續(xù)約绢记,如果返回404扁达,則說明已失效,需要重新調(diào)用register()注冊蠢熄。

而此線程是在一個定時任務(wù)里調(diào)用:

private void initScheduledTasks() {
        ........
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
        ..........
}

renewalIntervalInSecs為心跳續(xù)約的時間間隔跪解,默認為30秒,則意味著服務(wù)啟動30秒后签孔,才能注冊到注冊中心叉讥。

問題:

可是我們在實際情況中,服務(wù)啟動后饥追,服務(wù)立刻就注冊到注冊中心了节吮,這是為什么?

原因是Spring Cloud Eureka在封裝的時候判耕,自己調(diào)用了register()方法,啟動同時注冊了服務(wù)翘骂。
EurekaServiceRegistry.java

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {

    private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);

    @Override
    public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getInstanceConfig().getAppname()
                    + " with eureka with status "
                    + reg.getInstanceConfig().getInitialStatus());
        }

        reg.getApplicationInfoManager()
                .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        if (reg.getHealthCheckHandler() != null) {
            reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
        }
    }
    ........

通過調(diào)用setInstanceStatus()壁熄,立刻觸發(fā)DiscoveryClient中的statusChangeListener,

public synchronized void setInstanceStatus(InstanceStatus status) {
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }

        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }

listener.notify()最終調(diào)用回DiscoveryClient.java中的register()碳竟,此段代碼在com.netflix.discovery.InstanceInfoReplicator中:

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }

由此可見草丧,Spring Cloud EurekaClient在啟動時會向服務(wù)端注冊自己的服務(wù)信息,并告知注冊中心自己的過期時間(lease-expiration-duration-in-seconds)莹桅,并且根據(jù)配置的續(xù)約間隔時間(lease-renewal-interval-in-seconds 默認30秒)定時向服務(wù)端發(fā)送心跳包進行服務(wù)續(xù)約動作昌执。

這個注冊的注冊中心的地址是通過eureka.client.serviceUrl來指定的烛亦,那么在集群中,這個地址是多個懂拾,我們是向其中一個注冊煤禽,還是每個都注冊一遍呢?

4.png
5
6

在RetryableEurekaHttpClient中岖赋,有如下代碼段:

     @Override
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        List<EurekaEndpoint> candidateHosts = null;
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            EurekaHttpClient currentHttpClient = delegate.get();
            EurekaEndpoint currentEndpoint = null;
            if (currentHttpClient == null) {
                if (candidateHosts == null) {
                    candidateHosts = getHostCandidates();
                    if (candidateHosts.isEmpty()) {
                        throw new TransportException("There is no known eureka server; cluster server list is empty");
                    }
                }
                if (endpointIdx >= candidateHosts.size()) {
                    throw new TransportException("Cannot execute request on any known server");
                }

                currentEndpoint = candidateHosts.get(endpointIdx++);
                currentHttpClient = clientFactory.newClient(currentEndpoint);
            }

            try {
                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                    delegate.set(currentHttpClient);
                    if (retry > 0) {
                        logger.info("Request execution succeeded on retry #{}", retry);
                    }
                    return response;
                }
                logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
            } catch (Exception e) {
                logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
            }

            // Connection error or 5xx from the server that must be retried on another server
            delegate.compareAndSet(currentHttpClient, null);
            if (currentEndpoint != null) {
                quarantineSet.add(currentEndpoint);
            }
        }
        throw new TransportException("Retry limit reached; giving up on completing the request");
    }

這里的getHostCandidates()方法檬果,是從配置文件中取得配置得serviceUrl列表,之后唐断,只取了第一個:

currentEndpoint = candidateHosts.get(endpointIdx++);

如果注冊失敗选脊,則循環(huán)重試,用serviceUrl列表的下一個脸甘。

至于注冊信息怎么在注冊中心傳播恳啥,在Eureka Server再進行學習。

6. 發(fā)現(xiàn)服務(wù)

前面已經(jīng)注冊了服務(wù)丹诀,服務(wù)是用來消費的钝的,所以Eureka還提供了從注冊中心發(fā)現(xiàn)服務(wù)的功能。

這個發(fā)現(xiàn)服務(wù)的功能就是由上面所說的CacheRefresh線程來實現(xiàn)的忿墅。

    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,

        .....................

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

        ..........................

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        ..........................

        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

registryFetchIntervalSeconds為刷新服務(wù)列表的間隔時間扁藕。這樣如果注冊中心服務(wù)列表有變化,就會被同步到客戶端疚脐。
在掉用initScheduledTasks()方法之前亿柑,和刷新服務(wù)列表信息定時任務(wù)之間,代碼還掉用了fetchRegistry(boolean forceFullRegistryFetch)方法棍弄,立刻拉取了服務(wù)端的信息望薄。這就是為什么定時任務(wù)為30秒,但是客戶端啟動后呼畸,立刻就能獲取到服務(wù)列表的原因痕支。

若初始拉取注冊信息失敗,從備份注冊中心獲取蛮原。備份注冊中心為構(gòu)造函數(shù)的最后一個參數(shù)卧须。

7.服務(wù)注冊節(jié)點與服務(wù)發(fā)現(xiàn)節(jié)點的關(guān)系

由前面的介紹可以發(fā)現(xiàn)每個客戶端節(jié)點在啟動時,都需要連接注冊中心儒陨,通過instanceInfoReplicator(間隔30秒)花嘶,這個過程也是注冊服務(wù)的過程。
所以每個Eureka節(jié)點既可以是服務(wù)生產(chǎn)者蹦漠,也可以是服務(wù)消費者椭员。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市笛园,隨后出現(xiàn)的幾起案子隘击,更是在濱河造成了極大的恐慌侍芝,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件埋同,死亡現(xiàn)場離奇詭異州叠,居然都是意外死亡,警方通過查閱死者的電腦和手機莺禁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門留量,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人哟冬,你說我怎么就攤上這事楼熄。” “怎么了浩峡?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵可岂,是天一觀的道長。 經(jīng)常有香客問我翰灾,道長缕粹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任纸淮,我火速辦了婚禮平斩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘咽块。我一直安慰自己绘面,他們只是感情好,可當我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布侈沪。 她就那樣靜靜地躺著揭璃,像睡著了一般。 火紅的嫁衣襯著肌膚如雪亭罪。 梳的紋絲不亂的頭發(fā)上瘦馍,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機與錄音应役,去河邊找鬼情组。 笑死,一個胖子當著我的面吹牛箩祥,可吹牛的內(nèi)容都是我干的呻惕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼滥比,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了做院?” 一聲冷哼從身側(cè)響起盲泛,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤濒持,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后寺滚,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柑营,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年村视,在試婚紗的時候發(fā)現(xiàn)自己被綠了官套。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡蚁孔,死狀恐怖奶赔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情杠氢,我是刑警寧澤站刑,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站鼻百,受9級特大地震影響绞旅,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜温艇,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一因悲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧勺爱,春花似錦晃琳、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至绣否,卻和暖如春誊涯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蒜撮。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工暴构, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人段磨。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓取逾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親苹支。 傳聞我的和親對象是個殘疾皇子砾隅,可洞房花燭夜當晚...
    茶點故事閱讀 44,871評論 2 354