深入理解Eureka-Client 發(fā)送心跳(三)

DiscoverClient

com.netflix.discovery.DiscoveryClient ,使用的@Inject //google guice 注入遵循 JSR-330規(guī)范

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
     
    // 省略N多代碼
    // 初始化定時(shí)器信息
    initScheduledTasks();
    
}
private void initScheduledTasks() {
    // 省略N多代碼论矾。。丙躏。
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
 
        // 在這里后频,初始化一個(gè)定時(shí)器任務(wù)
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
 
        // 省略N多代碼。莱革。献丑。
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

由上可以看出悟民,在DiscoverClient這個(gè)類初始化的時(shí)候厕氨,會(huì)初始化定期任務(wù)进每,每30秒執(zhí)行一次,用來發(fā)送心跳

HeartbeatThread

這個(gè)是用來續(xù)約的線程命斧,主要看其run方法田晚,

private class HeartbeatThread implements Runnable {
 
    public void run() {
        if (renew()) {
            // 更新最后一次心跳的時(shí)間
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
// 續(xù)約的主方法
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

上面的代碼很簡單,主要就是啟動(dòng)一個(gè)線程国葬,然后線程執(zhí)行renew()方法贤徒, 最終發(fā)送心跳給Eureka-Server

接口地址: apps/ + appName + /' + id ,

如果接口返回值為404胃惜,就是說不存在泞莉,從來沒有注冊過,那么重新走注冊流程

lastDirtyTimestamp

即該instance在client端最后被修改的時(shí)間戳

Eureka-Server接收心跳

InstanceResource

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // 續(xù)約
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    // 續(xù)約失敗
    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response = null;
    // 比較lastDirtyTimestamp 
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        // 比較lastDirtyTimestamp的大小船殉,這個(gè)還是比較重要的
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
    return response;
}
 
 
 
 
 
 
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                        boolean isReplication) {
    // 獲取本機(jī)的instance實(shí)例信息
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        //如果lastDirtyTimestamp不為空,并且lastDirtyTimestamp和本地的不相等
        if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
            Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
            // lastDirtyTimestamp>本地的時(shí)間斯嚎,則認(rèn)為當(dāng)前實(shí)例是無效的利虫,返回404錯(cuò)誤,客戶端重新發(fā)起注冊
            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                logger.debug(
                        "Time to sync, since the last dirty timestamp differs -"
                                + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                        args);
                return Response.status(Status.NOT_FOUND).build();
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // 如果是集群同步請求堡僻,本地的時(shí)間糠惫,大于客戶端傳過來的時(shí)間,則返回 “沖突” 這個(gè)狀態(tài)回去钉疫,以本地的時(shí)間大的為準(zhǔn)
                if (isReplication) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.CONFLICT).entity(appInfo).build();
                } else {
                    return Response.ok().build();
                }
            }
        }
 
    }
    return Response.ok().build();
}

代碼說明:

1.lastDirtyTimestamp 是客戶端向服務(wù)端發(fā)請求的版本號(hào) 硼讽, 一切請求都以版本號(hào)大的為準(zhǔn)。牲阁, 如: 注冊

2.在調(diào)用續(xù)約的方法之后固阁,Eureka Server 會(huì)對請求過來的lastDirtyTimestamp和本地的做對比壤躲, 如果

請求lastDirtyTimestamp>本地的時(shí)間,則認(rèn)為當(dāng)前實(shí)例是無效的备燃,返回404錯(cuò)誤碉克,客戶端重新發(fā)起注冊。

3.如果是集群同步請求并齐,本地的時(shí)間漏麦,大于其他Eureka Server傳過來的時(shí)間,則返回 “沖突” 這個(gè)狀態(tài)回去况褪,

以本地的時(shí)間大的為準(zhǔn)撕贞,注意是集群同步請求,如果是客戶端傳過的测垛,是不會(huì)有這個(gè)規(guī)則的捏膨。

應(yīng)用續(xù)約

//PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
    // 執(zhí)行續(xù)約操作
    if (super.renew(appName, id, isReplication)) {
        // 同步Eureka-Server集群
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}


//AbstractInstanceRegistry.java
public boolean renew(String appName, String id, boolean isReplication) {
    // 增加續(xù)約次數(shù)到統(tǒng)計(jì)枚舉
    RENEW.increment(isReplication);
    // 從Eureka-Server端本地的CurrentHashMap中,通過appName獲取Lease信息
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    // lease為空赐纱,lease在第一次注冊的時(shí)候會(huì)創(chuàng)建脊奋,為空,則表示從來沒有注冊過疙描,租約不存在
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        // 獲取lease里面的instance信息
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // 一系列狀態(tài)判斷诚隙,目前還不是很清楚,但是不影響主流程
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                Object[] args = {
                        instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId()
                };
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", args);
                instanceInfo.setStatus(overriddenInstanceStatus);
            }
        }
        // 設(shè)置每分鐘的續(xù)約次數(shù)
        renewsLastMin.increment();
        // 續(xù)約
        leaseToRenew.renew();
        return true;
    }
}

從上面可以看到整個(gè)續(xù)約過程起胰,主要就是從本地的CurrentHashMap中獲取租約信息久又, 獲取到了之后,設(shè)置每分鐘的續(xù)約次數(shù)以及續(xù)約時(shí)間效五。
renewsLastMin.increment()地消, 這個(gè)里面。主要是更新一個(gè)currentBucket的變量畏妖,類型為AtomicLong 脉执, 同時(shí)有個(gè)定時(shí)器一分鐘去更新一次。一分鐘之后戒劫,這個(gè)值會(huì)重新設(shè)置為0 半夷。

leaseToRenew.renew() , 更新lastUpdateTimestamp, duration默認(rèn)為90秒

//Lease.java
public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
 
}

總結(jié):

在一下三種情況,續(xù)約是返回404 迅细, 需要客戶端重新發(fā)起注冊的巫橄。

1.當(dāng)客戶端的lastDirtyTimestamp> 大于服務(wù)端的instance的lastDirtyTimestamp時(shí)候,會(huì)認(rèn)為服務(wù)端

的信息是無效的茵典,因此無法續(xù)約湘换,需要重新發(fā)起注冊請求。

2.服務(wù)端的注冊信息不存在

3.服務(wù)端的instance的status = UNKONW, 為什么會(huì)出現(xiàn)UNKONW這個(gè)狀態(tài)呢彩倚,因?yàn)樵赿eleteStatusOverride

的時(shí)候存在傳入U(xiǎn)NKONW的可能性筹我。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市署恍,隨后出現(xiàn)的幾起案子崎溃,更是在濱河造成了極大的恐慌,老刑警劉巖盯质,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件袁串,死亡現(xiàn)場離奇詭異,居然都是意外死亡呼巷,警方通過查閱死者的電腦和手機(jī)囱修,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來王悍,“玉大人破镰,你說我怎么就攤上這事⊙勾ⅲ” “怎么了鲜漩?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長集惋。 經(jīng)常有香客問我孕似,道長,這世上最難降的妖魔是什么刮刑? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任喉祭,我火速辦了婚禮,結(jié)果婚禮上雷绢,老公的妹妹穿的比我還像新娘泛烙。我一直安慰自己,他們只是感情好翘紊,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布蔽氨。 她就那樣靜靜地躺著,像睡著了一般帆疟。 火紅的嫁衣襯著肌膚如雪孵滞。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天鸯匹,我揣著相機(jī)與錄音,去河邊找鬼泄伪。 笑死殴蓬,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播染厅,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼痘绎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了肖粮?” 一聲冷哼從身側(cè)響起孤页,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎涩馆,沒想到半個(gè)月后行施,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡魂那,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年蛾号,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涯雅。...
    茶點(diǎn)故事閱讀 40,852評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鲜结,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出活逆,到底是詐尸還是另有隱情精刷,我是刑警寧澤,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布蔗候,位于F島的核電站怒允,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏琴庵。R本人自食惡果不足惜误算,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望迷殿。 院中可真熱鬧儿礼,春花似錦、人聲如沸庆寺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽懦尝。三九已至知纷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間陵霉,已是汗流浹背琅轧。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留踊挠,地道東北人乍桂。 一個(gè)月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親睹酌。 傳聞我的和親對象是個(gè)殘疾皇子权谁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容