spring-cloud-eureka-client功能簡要分析

??spring cloud默認(rèn)使用eureka來做服務(wù)治理旁钧。作為client的核心功能有兩個(gè):服務(wù)發(fā)現(xiàn)和服務(wù)注冊(cè)。
??通過查看spring-cloud-netflix-eureka-client下的類發(fā)現(xiàn)功能核心類是CloudEurekaClient畅姊,他繼承了netflix的DiscoveryClient。
查看DiscoveryClient源碼唉韭,注釋中寫明了DiscoveryClient的四個(gè)功能:


image.png

下面一個(gè)個(gè)來看這些功能是如何實(shí)現(xiàn)的陶珠。
1.d)服務(wù)發(fā)現(xiàn)getApplications()

    @Override
    public Applications getApplications() {
        return localRegionApps.get();
    }

localRegionApps是一個(gè)成員變量

    private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

所以一定是有其他地方先set了Applications,getApplications() 方法才能正確返回蒋譬。通過搜索代碼割岛,發(fā)現(xiàn)為localRegionApps設(shè)值的地方代碼基本相似:

EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
Applications serverApps = httpResponse.getEntity();
localRegionApps.set(this.filterAndShuffle(serverApps));

可以看出localRegionApps的來源,通過eurekaTransport.queryClient獲得一個(gè)EurekaHttpResponse<Applications>犯助,再進(jìn)過過濾和亂序處理存入localRegionApps癣漆。
??那現(xiàn)在就有兩個(gè)問題:1.這段代碼是由誰執(zhí)行的,2.eurekaTransport.queryClient是如何工作的剂买。
??通過call Hierarchy發(fā)現(xiàn)調(diào)用鏈的最開始:

 class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

??查詢CacheRefreshThread被使用的地方:

private void initScheduledTasks(){...}

而initScheduledTasks()被調(diào)用的地方是DiscoveryClient的構(gòu)造函數(shù)惠爽,所以這一切在DiscoveryClient被創(chuàng)建時(shí)就開始了。
查看initScheduledTasks()代碼:

scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
...
scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

可以看出這是由線程池周期執(zhí)行的任務(wù)瞬哼,那現(xiàn)在就有兩個(gè)地方:1.執(zhí)行的線程池scheduler婚肆、cacheRefreshExecutor、cacheRefreshExecutor 2.被執(zhí)行的回調(diào):HeartbeatThread坐慰、CacheRefreshThread较性。CacheRefreshThread上面知道了是用來設(shè)置localRegionApps的,HeartbeatThread從注釋來看是用來b)服務(wù)續(xù)約的:

  /**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

找到三個(gè)線程池的初始化代碼:

 scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );

可以看出eureka默認(rèn)用了兩個(gè)線程來做任務(wù)調(diào)度讨越,兩個(gè)任務(wù)各有一個(gè)專屬線程獨(dú)自負(fù)責(zé)任務(wù)的實(shí)際執(zhí)行两残。這種依靠線程池的隔離策略,在netflix組件中用到的地方有很多把跨。
????找到了CacheRefreshThread是被誰調(diào)用的人弓,接著分析eurekaTransport.queryClient是如何工作的。eurekaTransport.queryClient的服務(wù)發(fā)現(xiàn)方法:getApplications()

@Override
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
        return getApplicationsInternal("apps/", regions);
    }

    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath,
            String[] regions) {
        String url = serviceUrl + urlPath;

        if (regions != null && regions.length > 0)
            urlPath = (urlPath.contains("?") ? "&" : "?") + "regions="
                    + StringUtil.join(regions);

        ResponseEntity<EurekaApplications> response = restTemplate.exchange(url,
                HttpMethod.GET, null, EurekaApplications.class);

        return anEurekaHttpResponse(response.getStatusCodeValue(),
                response.getStatusCode().value() == HttpStatus.OK.value()
                        && response.hasBody() ? (Applications) response.getBody() : null)
                                .headers(headersOf(response)).build();
    }

可以看出是向serviceUrl進(jìn)行http請(qǐng)求獲得相關(guān)數(shù)據(jù)的着逐。而serviceUrl是DiscoveryClient在初始化eurekaTransport時(shí)傳入的崔赌。DiscoveryClient初始化時(shí)主要有三個(gè)參數(shù):ApplicationInfoManager、EurekaClientConfig耸别、ApplicationEventPublisher健芭,通過查看配置類EurekaClientAutoConfiguration可以看到,在SpringCloud中創(chuàng)建的其實(shí)是DiscoveryClient的子類CloudEurekaClient:

        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

ApplicationInfoManager秀姐、EurekaClientConfig是自動(dòng)注入的ApplicationInfoManager manager, EurekaClientConfig config而ApplicationEventPublisher是容器上下文ApplicationContext慈迈。繼續(xù)查找代碼可以發(fā)現(xiàn):

        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(
                EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }

        @ConfigurationProperties(EurekaClientConfigBean.PREFIX)
        public class EurekaClientConfigBean implements EurekaClientConfig {...}

????ApplicationInfoManager來源于EurekaInstanceConfig ,而EurekaInstanceConfig 其實(shí)是由spring創(chuàng)建的EurekaInstanceConfigBean省有,通過@ConfigurationProperties("eureka.instance")收集了配置文件中的eureka.instance前綴的配置痒留。
????EurekaClientConfig其實(shí)是由spring實(shí)現(xiàn)并創(chuàng)建的EurekaClientConfigBean谴麦,通過@ConfigurationProperties("eureka.client")收集了配置文件中的eureka.client前綴的配置。
????到這可以看出來springcloud做的工作其實(shí)就是收集配置并用來初始化DiscoveryClient伸头。其實(shí)spring本質(zhì)上作為一個(gè)beanfactory匾效,工作就是創(chuàng)建并管理bean的生命周期。

2.ab)注冊(cè)和續(xù)約
注冊(cè)方法:register()恤磷,查看調(diào)用發(fā)現(xiàn)在DiscoveryClient的構(gòu)造函數(shù)和renew()函數(shù)中被調(diào)用了面哼。查看代碼:

    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

其實(shí)是通過eurekaTransport.registrationClient.register(instanceInfo)和server進(jìn)行了一次rest-http調(diào)用。instanceInfo是這個(gè)客戶端本身扫步。
續(xù)約方法:renew()魔策,上面說到了續(xù)約方法被放入new HeartbeatThread()中,被線程池周期執(zhí)行锌妻。

    /**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

可以看出也是通過 eurekaTransport.registrationClient進(jìn)行一次http請(qǐng)求代乃,如果請(qǐng)求失敗,則調(diào)用一次register()從新注冊(cè)仿粹,如果都失敗則返回false。
3.c)注銷

@PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks(); //取消上面所說的兩個(gè)周期任務(wù)原茅,并關(guān)閉線程池

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

可以看出主要工作有兩部分吭历,1.回收資源 2.調(diào)用unregister()通知eureka-server注銷。查看unregister()代碼:

void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }

可以看出這里也是通過eurekaTransport.registrationClient進(jìn)行了一次http請(qǐng)求擂橘。

綜上所述晌区,eureka的主要靠獨(dú)立的線程池執(zhí)行周期性任務(wù)來執(zhí)行http請(qǐng)求來進(jìn)行服務(wù)發(fā)現(xiàn)的更新和服務(wù)續(xù)約。而spring所扮演的角色只是DiscoveryClient的創(chuàng)建和管理者通贞,并沒有改變eureka的內(nèi)部功能朗若。我們也可以通過自己創(chuàng)建和管理DiscoveryClient在非springcloud項(xiàng)目中獨(dú)立地使用eureka,eureka功能完備昌罩,自己集成相對(duì)簡單哭懈。總之茎用,就是從server通過http請(qǐng)求獲得服務(wù)數(shù)據(jù)而已遣总,可以自己通過瀏覽器訪問:http://localhost:8761/eureka/apps看看數(shù)據(jù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市轨功,隨后出現(xiàn)的幾起案子旭斥,更是在濱河造成了極大的恐慌,老刑警劉巖古涧,帶你破解...
    沈念sama閱讀 211,123評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件垂券,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡羡滑,警方通過查閱死者的電腦和手機(jī)菇爪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門算芯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人娄帖,你說我怎么就攤上這事也祠。” “怎么了近速?”我有些...
    開封第一講書人閱讀 156,723評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵诈嘿,是天一觀的道長。 經(jīng)常有香客問我削葱,道長奖亚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評(píng)論 1 283
  • 正文 為了忘掉前任析砸,我火速辦了婚禮昔字,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘首繁。我一直安慰自己作郭,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評(píng)論 5 384
  • 文/花漫 我一把揭開白布弦疮。 她就那樣靜靜地躺著夹攒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪胁塞。 梳的紋絲不亂的頭發(fā)上咏尝,一...
    開封第一講書人閱讀 49,760評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音啸罢,去河邊找鬼编检。 笑死,一個(gè)胖子當(dāng)著我的面吹牛扰才,可吹牛的內(nèi)容都是我干的允懂。 我是一名探鬼主播,決...
    沈念sama閱讀 38,904評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼训桶,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼累驮!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起舵揭,我...
    開封第一講書人閱讀 37,672評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤谤专,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后午绳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體置侍,經(jīng)...
    沈念sama閱讀 44,118評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蜡坊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片杠输。...
    茶點(diǎn)故事閱讀 38,599評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖秕衙,靈堂內(nèi)的尸體忽然破棺而出蠢甲,到底是詐尸還是另有隱情,我是刑警寧澤据忘,帶...
    沈念sama閱讀 34,264評(píng)論 4 328
  • 正文 年R本政府宣布鹦牛,位于F島的核電站,受9級(jí)特大地震影響勇吊,放射性物質(zhì)發(fā)生泄漏曼追。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評(píng)論 3 312
  • 文/蒙蒙 一汉规、第九天 我趴在偏房一處隱蔽的房頂上張望礼殊。 院中可真熱鬧,春花似錦针史、人聲如沸晶伦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坝辫。三九已至,卻和暖如春射亏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背竭业。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評(píng)論 1 264
  • 我被黑心中介騙來泰國打工智润, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人未辆。 一個(gè)月前我還...
    沈念sama閱讀 46,286評(píng)論 2 360
  • 正文 我出身青樓窟绷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親咐柜。 傳聞我的和親對(duì)象是個(gè)殘疾皇子兼蜈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)拙友,斷路器为狸,智...
    卡卡羅2017閱讀 134,628評(píng)論 18 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,773評(píng)論 6 342
  • 轉(zhuǎn)載請(qǐng)標(biāo)明出處:http://blog.csdn.net/forezp/article/details/73017...
    方志朋閱讀 4,716評(píng)論 2 12
  • 我們?cè)趯W(xué)習(xí)的過程當(dāng)中都在思考一問題,初級(jí)程序員與高級(jí)程序員到底有什么樣的差別遗契。我每次很辛苦都完成一個(gè)案例辐棒,為了一個(gè)...
    小子520閱讀 452評(píng)論 0 0
  • 聽說現(xiàn)在你在安睡,突然很安心,睡吧漾根,好好休息才可以正確思考泰涂。感賞今天外公外婆也可以安靜,感賞今天沒什么事發(fā)生辐怕,投射...
    xueyanL閱讀 101評(píng)論 0 0