eureka(二)-客戶端之服務續(xù)約和注冊

Eureka源碼分析(2.1.4.Release)

首先源碼切忌一行一行debug友绝,需先了解eureka主要功能后,再分析其功能如何實現(xiàn)。

image.png

由于上一篇文章提及到客戶端如何從注冊中心中獲取服務列表eureka(一)-功能介紹與客戶端之服務獲取

客戶端之服務續(xù)約

服務續(xù)約任務與服務列表獲取一樣,都是通過TimedSupervisorTask實現(xiàn)動態(tài)時間間隔執(zhí)行任務梨睁。TimedSupervisorTask是如何實現(xiàn)動態(tài)間隔時間執(zhí)行任務的,可看上文提到的文章坛梁。

繼續(xù)從上文提到的initScheduledTasks出發(fā):
com.netflix.discovery.DiscoveryClient#initScheduledTasks:

……上部分代碼為客戶端獲取服務列表任務……
if (clientConfig.shouldRegisterWithEureka()) {
            //eureka.instance.leaseRenewallIntervalInSeconds=30,默認每30秒執(zhí)行心跳續(xù)約
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            //eureka.client.heartbeatExecutorExponentialBackOffBound=10而姐,如果發(fā)生超時的重試延遲的最大乘數(shù)。
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
            //定義個名為heartbeat的線程划咐,執(zhí)行心跳續(xù)約任務
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
……下部分代碼為實例更新并通知eureka注冊中心任務……

由于TimedSupervisorTask在上篇文章已有介紹,現(xiàn)在則直接查看續(xù)約線程邏輯

com.netflix.discovery.DiscoveryClient.HeartbeatThread:

image.png

com.netflix.discovery.DiscoveryClient.HeartbeatThread#run:

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }

com.netflix.discovery.DiscoveryClient#renew:

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            //發(fā)起heartbeat請求
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            //如果返回的是無法找到钧萍,則重新注冊
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                //設置該實例的dirty標志位true褐缠,并更新dirty時間
                long timestamp = instanceInfo.setIsDirtyWithTime();
                //重新注冊
                boolean success = register();
                //如果成功,則將實例ditry的標志改為false
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

實例的dirty標志isInstanceInfoDirty的作用:在執(zhí)行服務注冊任務時风瘦,如果dirty標志為true队魏,則將實例重新注冊到遠程eureka注冊中心。

客戶端之服務注冊

還是從上文提到的initScheduledTasks出發(fā):
com.netflix.discovery.DiscoveryClient#initScheduledTasks:

        if (clientConfig.shouldRegisterWithEureka()) {
            ……忽略心跳續(xù)約代碼……
            // InstanceInfo replicator
            //實例更新后,需要將更新信息發(fā)送到遠程eureka中心的線程胡桨,其實現(xiàn)了Runnable接口
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),//instanceInfoReplicationIntervalSeconds=30官帘,默認每30秒檢查是否有更新,并注冊到注冊中心
                    2); //當前時間允許的最大請求數(shù)
           //狀態(tài)監(jiān)聽器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
      
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //狀態(tài)更新后昧谊,立即更新到eureka注冊中心(這里涉及到令牌限流算法)
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            //通過ApplicationInfoManager進行的本地狀態(tài)更新將立即觸發(fā)對遠程eureka服務器的注冊/更新(速率受限)刽虹。默認為true
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

      //eureka.client.initialInstanceInfoReplicationIntervalSeconds=40s,默認延時40s呢诬,實例更新或注冊線程啟動      
  instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }

從上面的代碼可以知道涌哲,instanceInfoReplicator的run方法實現(xiàn)了具體的邏輯代碼
com.netflix.discovery.InstanceInfoReplicator#start:

    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            //第一次啟動:設置dirty標志為true,觸發(fā)第一次注冊尚镰。
            instanceInfo.setIsDirty(); 
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

com.netflix.discovery.InstanceInfoReplicator#run:

public void run() {
        try {
            //檢查實例是否需要更新阀圾,如果需要更新,設置dirty標志為true狗唉,更新dirty時間初烘。
            discoveryClient.refreshInstanceInfo();
           //如果dirty標志為true,則返回dirty時間分俯,如果標志為false則返回null
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                //dirty時間不為null肾筐,則需要更新到注冊中心,重新注冊
                discoveryClient.register();
                //設置dirty標志為false
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            //replicationIntervalSeconds秒后(即30秒)澳迫,又重新執(zhí)行該方法局齿。
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

com.netflix.discovery.DiscoveryClient#refreshInstanceInfo:

/**
     * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
     * isDirty flag on the instanceInfo is set to true
     */
    void refreshInstanceInfo() {
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }

refreshInstanceInfo方法具體實現(xiàn)流程圖如下:


image.png

服務注冊任務整體流程如下:


image.png

附加算法介紹:令牌桶限流算法(簡要)

instanceInfoReplicator.onDemandUpdate():實例狀態(tài)變更后,馬上更新到遠程eureka遠程中心橄登。
該方法用到了RateLimiter來實現(xiàn)限流抓歼。RateLimiter實現(xiàn)了令牌桶限流算法。為了防止網(wǎng)絡擁塞拢锹,需限制流出網(wǎng)絡的流量谣妻,使流量以比較均勻的速度向外發(fā)送。令牌桶算法就實現(xiàn)了這個功能卒稳,可控制發(fā)送到網(wǎng)絡上數(shù)據(jù)的數(shù)目蹋半,并允許突發(fā)數(shù)據(jù)的發(fā)送。


image.png
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末充坑,一起剝皮案震驚了整個濱河市减江,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捻爷,老刑警劉巖辈灼,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異也榄,居然都是意外死亡巡莹,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來降宅,“玉大人骂远,你說我怎么就攤上這事⊙” “怎么了激才?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長唠雕。 經(jīng)常有香客問我贸营,道長,這世上最難降的妖魔是什么岩睁? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任钞脂,我火速辦了婚禮,結果婚禮上捕儒,老公的妹妹穿的比我還像新娘冰啃。我一直安慰自己,他們只是感情好刘莹,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布阎毅。 她就那樣靜靜地躺著,像睡著了一般点弯。 火紅的嫁衣襯著肌膚如雪扇调。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天抢肛,我揣著相機與錄音狼钮,去河邊找鬼。 笑死捡絮,一個胖子當著我的面吹牛熬芜,可吹牛的內容都是我干的。 我是一名探鬼主播福稳,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼涎拉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了的圆?” 一聲冷哼從身側響起鼓拧,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎越妈,沒想到半個月后毁枯,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡叮称,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓤檐。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡赂韵,死狀恐怖,靈堂內的尸體忽然破棺而出挠蛉,到底是詐尸還是另有隱情祭示,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布谴古,位于F島的核電站质涛,受9級特大地震影響,放射性物質發(fā)生泄漏掰担。R本人自食惡果不足惜汇陆,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望带饱。 院中可真熱鬧毡代,春花似錦、人聲如沸勺疼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽执庐。三九已至酪耕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間轨淌,已是汗流浹背迂烁。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留猿诸,地道東北人婚被。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像梳虽,于是被迫代替她去往敵國和親址芯。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內容