微服務(wù):剖析一下源碼,Nacos的健康檢查竟如此簡單

2021-07-13 20:04·Java架構(gòu)師聯(lián)盟

<article class="syl-article-base tt-article-content syl-page-article syl-device-pc" style="box-sizing: border-box; display: block; padding: 0px; text-align: justify; overflow-wrap: break-word; word-break: break-word; overflow: hidden; hyphens: auto; color: rgb(34, 34, 34); font-family: "PingFang SC", "Hiragino Sans GB", "Microsoft YaHei", "WenQuanYi Micro Hei", "Helvetica Neue", Arial, sans-serif; line-height: 1.667; font-size: 18px; margin-bottom: 20px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">

image.png

前面我們多次提到Nacos的健康檢查宪萄,比如《微服務(wù)之:服務(wù)掛的太干脆艺谆,Nacos還沒反應(yīng)過來,怎么辦拜英?》一文中還對健康檢查進行了自定義調(diào)優(yōu)静汤。那么,Nacos的健康檢查和心跳機制到底是如何實現(xiàn)的呢居凶?在項目實踐中是否又可以參考Nacos的健康檢查機制虫给,運用于其他地方呢?

這篇文章侠碧,就帶大家來揭開Nacos健康檢查機制的面紗抹估。

Nacos的健康檢查

Nacos中臨時實例基于心跳上報方式維持活性,基本的健康檢查流程基本如下:Nacos客戶端會維護一個定時任務(wù)弄兜,每隔5秒發(fā)送一次心跳請求药蜻,以確保自己處于活躍狀態(tài)。Nacos服務(wù)端在15秒內(nèi)如果沒收到客戶端的心跳請求替饿,會將該實例設(shè)置為不健康语泽,在30秒內(nèi)沒收到心跳,會將這個臨時實例摘除视卢。

原理很簡單踱卵,關(guān)于代碼層的實現(xiàn),下面來就逐步來進行解析据过。

客戶端的心跳

實例基于心跳上報的形式來維持活性惋砂,當然就離不開心跳功能的實現(xiàn)了。這里以客戶端心跳實現(xiàn)為基準來進行分析绳锅。

Spring Cloud提供了一個標準接口ServiceRegistry西饵,Nacos對應(yīng)的實現(xiàn)類為NacosServiceRegistry。Spring Cloud項目啟動時會實例化NacosServiceRegistry榨呆,并調(diào)用它的register方法來進行實例的注冊罗标。

</article>

@Override
public void register(Registration registration) { 
   // ...
   NamingService namingService = namingService();
   String serviceId = registration.getServiceId();
   String group = nacosDiscoveryProperties.getGroup();

   Instance instance = getNacosInstanceFromRegistration(registration);

   try {
      namingService.registerInstance(serviceId, group, instance);
      log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
            instance.getIp(), instance.getPort());
   }catch (Exception e) {
      // ...
   }
}

在該方法中有兩處需要注意,第一處是構(gòu)建Instance的
getNacosInstanceFromRegistration方法积蜻,該方法內(nèi)會設(shè)置Instance的元數(shù)據(jù)(metadata)闯割,通過源元數(shù)據(jù)可以配置服務(wù)器端健康檢查的參數(shù)。比如竿拆,在Spring Cloud中配置的如下參數(shù)宙拉,都可以通過元數(shù)據(jù)項在服務(wù)注冊時傳遞給Nacos的服務(wù)端。

spring:
  application:
    name: user-service-provider
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        heart-beat-interval: 5000
        heart-beat-timeout: 15000
       ip-delete-timeout: 30000

其中的heart-beat-interval丙笋、heart-beat-timeout谢澈、ip-delete-timeout這些健康檢查的參數(shù)煌贴,都是基于元數(shù)據(jù)上報上去的。

register方法的第二處就是調(diào)用NamingService#registerInstance來進行實例的注冊锥忿。NamingService是由Nacos的客戶端提供牛郑,也就是說Nacos客戶端的心跳本身是由Nacos生態(tài)提供的。

在registerInstance方法中最終會調(diào)用到下面的方法:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

其中BeatInfo#addBeatInfo便是進行心跳處理的入口敬鬓。當然淹朋,前提條件是當前的實例需要是臨時(瞬時)實例。

對應(yīng)的方法實現(xiàn)如下:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

在倒數(shù)第二行可以看到钉答,客戶端是通過定時任務(wù)來處理心跳的础芍,具體的心跳請求有BeatTask完成。定時任務(wù)的執(zhí)行頻次数尿,封裝在BeatInfo仑性,回退往上看,會發(fā)現(xiàn)BeatInfo的Period來源于Instance#
getInstanceHeartBeatInterval()右蹦。該方法具體實現(xiàn)如下:

public long getInstanceHeartBeatInterval() {
    return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL);
}

可以看出定時任務(wù)的執(zhí)行間隔就是配置的metadata中的數(shù)據(jù)
preserved.heart.beat.interval诊杆,與上面提到配置heart-beat-interval本質(zhì)是一回事,默認是5秒嫩实。

BeatTask類具體實現(xiàn)如下:

class BeatTask implements Runnable {
    
    BeatInfo beatInfo;
    
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    
    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            
        }
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

在run方法中通過NamingProxy#sendBeat完成了心跳請求的發(fā)送刽辙,而在run方法的最后窥岩,再次開啟了一個定時任務(wù)甲献,這樣周期性的進行心跳請求。

NamingProxy#sendBeat方法實現(xiàn)如下:

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

實際上颂翼,就是調(diào)用了Nacos服務(wù)端提供的"
/nacos/v1/ns/instance/beat"服務(wù)晃洒。

在客戶端的常量類Constants中定義了心跳相關(guān)的默認參數(shù):

static {
    DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
    DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
    DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L);
}

這樣就呼應(yīng)了最開始說的Nacos健康檢查機制的幾個時間維度。

服務(wù)端接收心跳
分析客戶端的過程中已經(jīng)可以看出請求的是
/nacos/v1/ns/instance/beat這個服務(wù)朦乏。Nacos服務(wù)端是在Naming項目中的InstanceController中實現(xiàn)的球及。

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    // ...
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    if (instance == null) {
        // ...
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);
    // ...
    service.processClientBeat(clientBeat);
    // ...
    return result;
}

服務(wù)端在接收到請求時,主要做了兩件事:第一呻疹,如果發(fā)送心跳的實例不存在吃引,則將其進行注冊;第二刽锤,調(diào)用其Service的processClientBeat方法進行心跳處理镊尺。

processClientBeat方法實現(xiàn)如下:

public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor同樣是一個實現(xiàn)了Runnable的Task,通過HealthCheckReactor定義的scheduleNow方法進行立即執(zhí)行并思。

scheduleNow方法實現(xiàn):

public static ScheduledFuture<?> scheduleNow(Runnable task) {
    return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}

再來看看ClientBeatProcessor中對具體任務(wù)的實現(xiàn):

@Override
public void run() {
    Service service = this.service;
    // logging    
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    List<Instance> instances = cluster.allIPs(true);
    
    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            // logging
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    // logging
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}

在run方法中先檢查了發(fā)送心跳的實例和IP是否一致庐氮,如果一致則更新最后一次心跳時間弄砍。同時仙畦,如果該實例之前未被標記且處于不健康狀態(tài)音婶,則將其改為健康狀態(tài),并將變動通過PushService提供事件機制進行發(fā)布衣式。事件是由Spring的ApplicationContext進行發(fā)布,事件為ServiceChangeEvent瞳收。

通過上述心跳操作,Nacos服務(wù)端的實例的健康狀態(tài)和最后心跳時間已經(jīng)被刷新螟深。那么,如果沒有收到心跳時界弧,服務(wù)器端又是如何判斷呢?

服務(wù)端心跳檢查
客戶端發(fā)起心跳垢箕,服務(wù)器端來檢查客戶端的心跳是否正常,或者說對應(yīng)的實例中的心跳更新時間是否正常条获。

服務(wù)器端心跳的觸發(fā)是在服務(wù)實例注冊時觸發(fā)的,同樣在InstanceController中帅掘,register注冊實現(xiàn)如下:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // ...
    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

ServiceManager#registerInstance實現(xiàn)代碼如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // ...
}

心跳相關(guān)實現(xiàn)在第一次創(chuàng)建空的Service中實現(xiàn),最終會調(diào)到如下方法:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

在putServiceAndInit方法中對Service進行初始化:

private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

service.init()方法實現(xiàn):

public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

HealthCheckReactor#scheduleCheck方法實現(xiàn):

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

延遲5秒執(zhí)行碧绞,每5秒檢查一次吱窝。

在init方法的第一行便可以看到執(zhí)行健康檢查的Task,具體Task是由ClientBeatCheckTask來實現(xiàn)院峡,對應(yīng)的run方法核心代碼如下:

@Override
public void run() {
    // ...        
    List<Instance> instances = service.allIPs(true);
    
    // first set health status of instances:
    for (Instance instance : instances) {
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
            if (!instance.isMarked()) {
                if (instance.isHealthy()) {
                    instance.setHealthy(false);
                    // logging...
                    getPushService().serviceChanged(service);
                    ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                }
            }
        }
    }
    
    if (!getGlobalConfig().isExpireInstance()) {
        return;
    }
    
    // then remove obsolete instances:
    for (Instance instance : instances) {
        
        if (instance.isMarked()) {
            continue;
        }
        
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
            // delete instance
            deleteIp(instance);
        }
    }
}

在第一個for循環(huán)中,先判斷當前時間與上次心跳時間的間隔是否大于超時時間撕予。如果實例已經(jīng)超時鲫惶,且為被標記,且健康狀態(tài)為健康实抡,則將健康狀態(tài)設(shè)置為不健康欠母,同時發(fā)布狀態(tài)變化的事件欢策。

在第二個for循環(huán)中,如果實例已經(jīng)被標記則跳出循環(huán)赏淌。如果未標記壁熄,同時當前時間與上次心跳時間的間隔大于刪除IP時間挺勿,則將對應(yīng)的實例刪除晨继。

小結(jié)
通過本文的源碼分析凰盔,我們從Spring Cloud開始,追蹤到Nacos Client中的心跳時間掷贾,再追蹤到Nacos服務(wù)端接收心跳的實現(xiàn)和檢查實例是否健康的實現(xiàn)睛榄。想必通過整個源碼的梳理,你已經(jīng)對整個Nacos心跳的實現(xiàn)有所了解想帅。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末场靴,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子港准,更是在濱河造成了極大的恐慌旨剥,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件浅缸,死亡現(xiàn)場離奇詭異轨帜,居然都是意外死亡,警方通過查閱死者的電腦和手機衩椒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門蚌父,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人烟具,你說我怎么就攤上這事梢什。” “怎么了朝聋?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長囤躁。 經(jīng)常有香客問我冀痕,道長,這世上最難降的妖魔是什么狸演? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任言蛇,我火速辦了婚禮宵距,結(jié)果婚禮上满哪,老公的妹妹穿的比我還像新娘劝篷。我一直安慰自己娇妓,他們只是感情好哈恰,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布着绷。 她就那樣靜靜地躺著蓬戚,像睡著了一般宾抓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上幢泼,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天缕棵,我揣著相機與錄音招驴,去河邊找鬼别厘。 笑死拥诡,一個胖子當著我的面吹牛渴肉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播披蕉,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼眯娱,長吁一口氣:“原來是場噩夢啊……” “哼食零!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起娜搂,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤百宇,失蹤者是張志新(化名)和其女友劉穎携御,沒想到半個月后啄刹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凄贩,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡疲扎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年椒丧,在試婚紗的時候發(fā)現(xiàn)自己被綠了壶熏。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡俄占,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出祝拯,到底是詐尸還是另有隱情,我是刑警寧澤鹰贵,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布碉输,位于F島的核電站敷钾,受9級特大地震影響肄梨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜侨赡,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一羊壹、第九天 我趴在偏房一處隱蔽的房頂上張望油猫。 院中可真熱鬧尔店,春花似錦、人聲如沸鲫售。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涎嚼。三九已至,卻和暖如春苔货,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背夜惭。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工产喉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留敢会,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓晦譬,卻偏偏與公主長得像互广,于是被迫代替她去往敵國和親惫皱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容