eureka(一)-功能介紹與客戶端之服務(wù)獲取

Eureka源碼分析(2.1.4.Release)

首先源碼切忌一行一行debug驳糯,需先了解eureka主要功能后届搁,再分析其功能如何實(shí)現(xiàn)爆阶。


image.png

主要功能

  • 服務(wù)注冊(cè):客戶端向eureka服務(wù)端注冊(cè)自己的服務(wù),服務(wù)端將信息存于多級(jí)緩存中缭黔。
  • 服務(wù)續(xù)約:服務(wù)注冊(cè)后蒂破,客戶端會(huì)定時(shí)發(fā)送心跳包來保持可用性,避免被剔除寞蚌。每30秒(eureka.instance.leaseRenewallIntervalInSeconds)發(fā)送一次心跳來進(jìn)行服務(wù)續(xù)約钠糊。
  • 服務(wù)同步:eureka服務(wù)端之間互相進(jìn)行注冊(cè),構(gòu)建server集群壹哺,不同Server之間會(huì)進(jìn)行服務(wù)同步,用來保證服務(wù)信息的一致性管宵。
  • 服務(wù)列表獲取:客戶端請(qǐng)求eureka server獲取注冊(cè)的服務(wù)清單箩朴,并緩存到客戶端本地岗喉。默認(rèn)間隔30秒。(eureka.client.registryFetchIntervalSeconds)

fetchRegistry:true 當(dāng)值為true時(shí)炸庞,會(huì)定期從eureka服務(wù)端獲取列表,集群中的eureka服務(wù)端也會(huì)從其他的eureka節(jié)點(diǎn)中獲取列表)

  • 遠(yuǎn)程調(diào)用:服務(wù)調(diào)用方在獲取到服務(wù)清單后查牌,就可以根據(jù)自身的一個(gè)負(fù)載均衡策略找到一個(gè)服務(wù)url滥壕,并對(duì)他發(fā)起請(qǐng)求纸颜。
  • 服務(wù)下線:客戶端關(guān)閉或重啟時(shí)绎橘,需告訴server端該服務(wù)已下線。服務(wù)端在收到請(qǐng)求后涮较,就會(huì)把該服務(wù)狀態(tài)置為下線(DOWN)胡岔,并把該下線事件傳播出去
  • 服務(wù)剔除:如果客戶端沒有正常下線,需要有剔除機(jī)制靶瘸,將不可用的服務(wù)刪除掉。EurekaServer 在啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)定時(shí)任務(wù)怨咪,每隔默認(rèn)60秒,從當(dāng)前服務(wù)清單中把超時(shí)沒有續(xù)約(默認(rèn)90秒唉匾,eureka.instance.leaseExpirationDurationInSeconds)的服務(wù)剔除
  • 自我保護(hù):當(dāng)短時(shí)間內(nèi),統(tǒng)計(jì)續(xù)約失敗的比例巍膘,如果達(dá)到一定閾值,就會(huì)觸發(fā)自我保護(hù)機(jī)制璃饱。在該機(jī)制下肪康,Eureka Server不會(huì)剔除任何微服務(wù)荚恶。等到一分鐘內(nèi)最近續(xù)約個(gè)數(shù)大于一分鐘內(nèi)最小續(xù)約,再退出自我保護(hù)機(jī)制磷支。自我保護(hù)開關(guān)(eureka.server.enable-self-preservation:false)

Eureka保證AP

什么是AP谒撼?
著名的CAP理論指出,一個(gè)分布式系統(tǒng)不可能同時(shí)滿足C(一致性)雾狈、A(可用性)和P(分區(qū)容錯(cuò)性)廓潜。
由于分區(qū)容錯(cuò)性在是分布式系統(tǒng)中必須要保證的,因此我們只能在A和C之間進(jìn)行權(quán)衡箍邮。Eureka選擇了保證AP锭弊。

  1. 如一個(gè)客戶端因網(wǎng)絡(luò)原因在一個(gè)eureka服務(wù)端無法獲取服務(wù)列表時(shí)擂错,可以從其他eureka服務(wù)端獲取數(shù)據(jù),即使與之前的eureka服務(wù)列表不一致钮呀;
  2. eureka服務(wù)端有同步機(jī)制爽醋,保證了服務(wù)列表的最終一致性(非強(qiáng)一致性);

客戶端初始化

這里簡(jiǎn)要分析一下
springboot會(huì)讀取spring-cloud-netflix-eureka-client/2.1.1.RELEASE/spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar!/META-INF/spring.factories文件.(詳情請(qǐng)看上一篇springboot自動(dòng)裝載

自動(dòng)初始化EnableAutoConfiguration下的所有配置類蚂四,內(nèi)容如下:


image.png

重點(diǎn)看EurekaClientAutoConfiguration類

//初始化eureka客戶端
@Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager,
                EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }
image.png

CloudEurekaClient繼承了DiscoveryClient類遂赠,該類為原生eureka類跷睦,所以springcloud的eureka客戶端實(shí)際是封裝了原生eureka客戶端,分析發(fā)現(xiàn)抑诸,eureka主要功能都在原生類上,所以直接看原生eureka類的初始化代碼就行了奸绷。
簡(jiǎn)要初始化流程如下:


image.png

具體初始化代碼:com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)
代碼片段:

……忽略上部分代碼……
//實(shí)例化一個(gè)線程池(核心線程:2個(gè)健盒,分別對(duì)應(yīng)心跳續(xù)約任務(wù)和獲取服務(wù)列表任務(wù))
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
//聲明心跳續(xù)約執(zhí)行器(續(xù)約)
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  
//聲明更新服務(wù)列表緩存執(zhí)行器(獲取任務(wù)列表)
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            ); 
……忽略部分代碼……
            //重點(diǎn)看這里:初始化定時(shí)任務(wù)
            initScheduledTasks();

com.netflix.discovery.DiscoveryClient#initScheduledTasks:

 private void initScheduledTasks() {
        //默認(rèn)為true扣癣,只有eureka服務(wù)端非集群模式時(shí),該設(shè)置可以為false
        if (clientConfig.shouldFetchRegistry()) {
            //默認(rèn)eureka.client.registryFetchIntervalSeconds=30父虑,延時(shí)30秒執(zhí)行
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            //默認(rèn)eureka.client.cacheRefreshExecutorExponentialBackOffBound=10士嚎,如果發(fā)生超時(shí)的重試延遲的最大乘數(shù)。
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
          //其中scheduler和cacheRefreshExecutor為之前提到的聲明對(duì)象莱衩,其實(shí)就是初始化了一個(gè)cacheRefresh名字的線程執(zhí)行任務(wù)
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
……忽略下半部分代碼(與服務(wù)注冊(cè)任務(wù)相關(guān))……

在這里可以看到scheduler.schedule方法就是在定義一個(gè)延時(shí)任務(wù)笨蚁,30秒后執(zhí)行。但是按理說應(yīng)該定義一個(gè)定時(shí)任務(wù)定時(shí)獲取服務(wù)列表才對(duì)括细。為什么要這樣定義呢奋单?下面重點(diǎn)看看TimedSupervisorTask

初始化實(shí)際賦值:

        //上文提到的線程池
        this.scheduler = scheduler;
        //上文提到的緩存更新執(zhí)行器
        this.executor = cacheRefreshExecutor;
        //超時(shí)時(shí)間:上文提到的默認(rèn)30秒
        this.timeoutMillis = timeUnit.toMillis(registryFetchIntervalSeconds);
        //執(zhí)行的真正任務(wù)
        this.task = new CacheRefreshThread();
        //延遲時(shí)間:上文提到的默認(rèn)30秒
        this.delay = new AtomicLong(timeoutMillis);
        //最大延遲時(shí)間:30秒*10=300秒,有什么用呢览濒?看下面解釋
        this.maxDelay = timeoutMillis * expBackOffBound

這里說一下TimedSupervisorTask的執(zhí)行邏輯匾七,非固定的執(zhí)行時(shí)間,而是采用動(dòng)態(tài)間隔時(shí)間昨忆,去執(zhí)行獲取服務(wù)列表任務(wù)。(在平常工作中席里,也可以效仿此方法實(shí)現(xiàn)動(dòng)態(tài)設(shè)置任務(wù))

具體實(shí)現(xiàn)流程圖如下:


image.png

具體實(shí)現(xiàn)代碼:
com.netflix.discovery.TimedSupervisorTask#run:

@Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            //執(zhí)行任務(wù)在默認(rèn)超時(shí)時(shí)間內(nèi)奖磁。
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  
            //如果任務(wù)沒超時(shí),則把延時(shí)時(shí)間設(shè)置為默認(rèn)的超時(shí)時(shí)間30秒
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            //如果超時(shí)
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            long currentDelay = delay.get();
            //設(shè)置最新延時(shí)時(shí)間:將當(dāng)前超時(shí)時(shí)間*2秕狰,與最大延時(shí)比較躁染,即超時(shí)時(shí)間不能超過300秒
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            //CAS來控制多線程同步
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            //又調(diào)用一個(gè)延時(shí)任務(wù),延時(shí)時(shí)間為新設(shè)置的延時(shí)時(shí)間
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

服務(wù)列表獲取

下面再來看看CacheRefreshThread的實(shí)現(xiàn)邏輯
com.netflix.discovery.DiscoveryClient#refreshRegistry->
com.netflix.discovery.DiscoveryClient#fetchRegistry

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            //獲取本地緩存:localRegionApps.get()
            Applications applications = getApplications();
            //判斷是全量更新還是增量更新
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                //全量獲取
                getAndStoreFullRegistry();
            } else {
                //增量獲取
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            ……忽略下部分代碼……
    }

全量獲取:
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry->
eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())->
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications->
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal


image.png

增量獲取:
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta:


image.png

eueka不是每次都是采用全量獲取服務(wù)列表挠羔,而是默認(rèn)采用增量方式埋嵌,獲取變化的實(shí)例。
當(dāng)然為保證獲取到的服務(wù)列表是最新的拌喉,獲取增量的同時(shí)俐银,eureka服務(wù)端也會(huì)把最新的服務(wù)列表hashcode返回端仰,客戶端將此hashcode與更新后的本地緩存hashcode作對(duì)比,如果不相同吱七,則重新進(jìn)行全量獲取鹤竭。

可以學(xué)習(xí)的地方:

  1. TimeSupervisorTask是一個(gè)動(dòng)態(tài)時(shí)間間隔任務(wù),如果遇到執(zhí)行任務(wù)超時(shí)臀稚,則擴(kuò)大為原來間隔時(shí)間的兩倍,一直到延時(shí)時(shí)間的閾值窜管。一旦新任務(wù)不再超時(shí),間隔時(shí)間又恢復(fù)為初始值幕帆。如服務(wù)列表獲取任務(wù)默認(rèn)為每隔30秒執(zhí)行,隨著執(zhí)行時(shí)間超時(shí)擴(kuò)大為原來的2倍再執(zhí)行常熙,直到到達(dá)閾值碱茁。如果執(zhí)行未超時(shí),則設(shè)置為原來的間隔時(shí)間執(zhí)行彼城。
  2. 采用CAS來控制多線程同步退个。
  3. 增量獲取數(shù)據(jù)列表。盡量避免全量獲扔镉(當(dāng)數(shù)據(jù)量很大的時(shí)候)
  4. 服務(wù)列表保存在本地緩存,每次遠(yuǎn)程調(diào)用代嗤,不需要經(jīng)過注冊(cè)中心缠借,提高執(zhí)行速度。當(dāng)eureka服務(wù)端掛了的時(shí)候硝逢,eureka客戶端還能通過本地緩存的服務(wù)列表正常調(diào)用遠(yuǎn)程服務(wù)绅喉。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市徽缚,隨后出現(xiàn)的幾起案子革屠,更是在濱河造成了極大的恐慌排宰,老刑警劉巖红省,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異虾啦,居然都是意外死亡痕寓,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門硬毕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來礼仗,“玉大人,你說我怎么就攤上這事元践。” “怎么了沪羔?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵象浑,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我篓吁,道長(zhǎng)蚪拦,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮翼雀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘箱熬。我一直安慰自己,他們只是感情好城须,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布糕伐。 她就那樣靜靜地躺著,像睡著了一般良瞧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挚冤,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天赞庶,我揣著相機(jī)與錄音,去河邊找鬼澜薄。 笑死誊锭,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丧靡。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼饭庞,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼熬荆!你這毒婦竟也來了卤恳?” 一聲冷哼從身側(cè)響起累盗,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤若债,失蹤者是張志新(化名)和其女友劉穎拆融,沒想到半個(gè)月后啊终,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體傲须,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年例衍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了菇绵。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡翎嫡,死狀恐怖永乌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情圈驼,我是刑警寧澤望几,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站橄抹,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏玉锌。R本人自食惡果不足惜疟羹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望参淫。 院中可真熱鬧愧杯,春花似錦、人聲如沸民效。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舒萎。三九已至,卻和暖如春章鲤,著一層夾襖步出監(jiān)牢的瞬間咆贬,已是汗流浹背败徊。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工皱蹦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留眷蜈,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓辜妓,卻偏偏與公主長(zhǎng)得像忌怎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子呆躲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容