深入理解Eureka Server集群同步(十)

集群?jiǎn)?dòng)同步

protected void initEurekaServerContext() throws Exception {
   
    // ....省略N多代碼
    // 同步信息
   int registryCount = this.registry.syncUp();
   // ....省略N多代碼
}

網(wǎng)上很多文章說(shuō)是調(diào)用syncUp這個(gè)方法去其他Eureka Server節(jié)點(diǎn)復(fù)制注冊(cè)信息邀跃,這個(gè)說(shuō)法不是很準(zhǔn)確, 在這個(gè)地方锅论,SyncUp()這個(gè)方法并不會(huì)去其他Eureka Server節(jié)點(diǎn)復(fù)制信息,而是從本地內(nèi)存里面獲取注冊(cè)信息, 看源碼就知道了拙寡。

public int syncUp() {
    // Copy entire entry from neighboring DS node
    // 獲取到的注冊(cè)節(jié)點(diǎn)數(shù)量
    int count = 0;
    // 如果count==0 , 那么默認(rèn)重試5次(前提是開啟了register-with-eureka = true,否則為0)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // 從第二次開始琳水,每次默認(rèn)沉睡30秒
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // 從本地內(nèi)存里面獲取注冊(cè)實(shí)例信息
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // 判斷是否可以注冊(cè)
                    if (isRegisterable(instance)) {
                        // 注冊(cè)到當(dāng)前Eureka Server里面
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

參數(shù)說(shuō)明:

regirstrySyncRetries : 當(dāng)eureka服務(wù)器啟動(dòng)時(shí)嘗試去獲取集群里其他服務(wù)器上的注冊(cè)信息的次數(shù)肆糕,默認(rèn)為5,

只有當(dāng) eureka.client.register-with-eureka = true 的時(shí)候才會(huì)是5在孝,如果是false 诚啃,則為0

registrySyncRetryWaitMs : 當(dāng)eureka服務(wù)器啟動(dòng)時(shí)獲取其他服務(wù)器的注冊(cè)信息失敗時(shí),會(huì)再次嘗試獲取私沮,期間需要等待的時(shí)間始赎,默認(rèn)為30 * 1000毫秒

count : 獲取到的注冊(cè)實(shí)例數(shù)量,如果為0 則根據(jù)重試次數(shù)進(jìn)行重試仔燕,每次重試前沉默 30秒

PS: 在之前的文章中 7. 深入理解Eureka 獲取注冊(cè)信息(七) 造垛,講過Eureka Client啟動(dòng)的時(shí)候默認(rèn)會(huì)自動(dòng)從Eureka Server獲取注冊(cè)信息, 要想Eureka Server在啟動(dòng)的時(shí)候可以同步其他集群節(jié)點(diǎn)的注冊(cè)信息晰搀,那么必須開啟客戶端配置

eureka.client.register-with-eureka = true    ## 是否作為一個(gè)Eureka Client 注冊(cè)到Eureka Server上去
eureka.client.fetch-registry = true              ## 是否需要從Eureka Server上拉取注冊(cè)信息到本地五辽。

只有開啟了上面兩個(gè)配置,那么集群節(jié)點(diǎn)在啟動(dòng)的時(shí)候外恕,會(huì)初始化Eureka Client端的配置 杆逗,會(huì)從其他Eureka Server拉取注冊(cè)信息到本地俄周,同時(shí)

在初始化Eureka Server的時(shí)候,會(huì)從本地內(nèi)存里面讀取 注冊(cè)信息髓迎,自動(dòng)注冊(cè)到本身的服務(wù)上

集群同步類型

public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}


Heartbeat : 心跳續(xù)約

Register : 注冊(cè)

Cancel : 下線

StatusUpdate : 添加覆蓋狀態(tài)

DeleteStatusOverride : 刪除覆蓋狀態(tài)

發(fā)起同步

這里以注冊(cè)的代碼為例

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 發(fā)起注冊(cè)
    super.register(info, leaseDuration, isReplication);
    // 注冊(cè)完成后峦朗,在這里發(fā)起同步,同步類型為Register
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // 判斷是否是集群同步請(qǐng)求排龄,如果是波势,則記錄最后一分鐘的同步次數(shù)
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // 集群節(jié)點(diǎn)為空,或者這是一個(gè)Eureka Server 同步請(qǐng)求橄维,直接return
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 循環(huán)相鄰的Eureka Server Node尺铣, 分別發(fā)起請(qǐng)求同步
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // 判斷是否是自身的URL,過濾掉
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 發(fā)起同步請(qǐng)求
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

步驟說(shuō)明:

1.判斷集群節(jié)點(diǎn)是否為空争舞,為空則返回

2.isReplication 代表是否是一個(gè)復(fù)制請(qǐng)求凛忿, isReplication = true 表示是其他Eureka Server發(fā)過來(lái)的同步請(qǐng)求

這個(gè)時(shí)候是不需要繼續(xù)往下同步的。否則會(huì)陷入同步死循環(huán)

3.循環(huán)集群節(jié)點(diǎn)竞川,過濾掉自身的節(jié)點(diǎn)

4.發(fā)起同步請(qǐng)求 店溢,調(diào)用replicateInstanceActionsToPeers

PS: 這里提到了PeerEurekaNode , 對(duì)于PeerEurekaNodes的集群節(jié)點(diǎn)更新及數(shù)據(jù)讀取委乌,可以看這個(gè)1. 深入理解Eureka Server啟動(dòng)(一)在服務(wù)啟動(dòng)的時(shí)候床牧,對(duì)PeerEurekaNodes集群開啟了線程更新集群節(jié)點(diǎn)信息。每15分鐘一次

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: // 下線
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // 心跳
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // 獲取本地最新的實(shí)例信息
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: // 注冊(cè)
                node.register(info);
                break;
            case StatusUpdate:  // 設(shè)置覆蓋狀態(tài)
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride: //刪除覆蓋狀態(tài)
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

這里直接看注冊(cè)遭贸,其他的原理上是一致的戈咳。

PeerEurekaNode的register方法如下。

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // 默認(rèn)采用的是批處理
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

默認(rèn)采用的是批量任務(wù)處理器壕吹,就是將task放入任務(wù)隊(duì)列中著蛙,然后通過線程獲取任務(wù)隊(duì)列里面的任務(wù),模仿ThreadExecutorPool的方式耳贬,生成線程踏堡,

從隊(duì)列里面抓取任務(wù)處理,統(tǒng)一批量執(zhí)行,Eureka Server 那邊也是統(tǒng)一接收效拭,這樣提高了同步效率

批量處理的任務(wù)執(zhí)行器是com.netflix.eureka.cluster.ReplicationTaskProcessor

@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // 構(gòu)建ReplicationInstance放入ReplicationList 
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // 發(fā)起批量處理請(qǐng)求
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            // 處理執(zhí)行結(jié)果 暂吉,成功則調(diào)用handleSuccess 胖秒,失敗則調(diào)用handleFailure缎患。
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

請(qǐng)求批量處理的接口地址 : peerreplication/batch/

handleBatchResponse(tasks, response.getEntity().getResponseList()) , 循環(huán)調(diào)用處理結(jié)果阎肝,

成功則調(diào)用handleSuccess. , 失敗則調(diào)用handleFailure 挤渔, 比如hearbeat的時(shí)候,調(diào)用返回碼為

404的時(shí)候风题,會(huì)重新發(fā)起注冊(cè)判导。

ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
    @Override
    public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
    }

    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        super.handleFailure(statusCode, responseEntity);
        if (statusCode == 404) {
                // 重新發(fā)起注冊(cè)嫉父。
                register(info);
            }
        } else if (config.shouldSyncWhenTimestampDiffers()) {
            InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
            if (peerInstanceInfo != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
            }
        }
    }
};

Eureka Server接收同步

程序入口 : com.netflix.eureka.resources.PeerReplicationResource

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // 循環(huán)請(qǐng)求的任務(wù)
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                // 分發(fā)任務(wù),同時(shí)將處理結(jié)果收集起來(lái)眼刃,等會(huì)統(tǒng)一返回
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}


private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    //  創(chuàng)建實(shí)例
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    // 創(chuàng)建實(shí)例
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    //獲取客戶端instance的lastDirtyTimestamp  绕辖,有點(diǎn)類似于版本號(hào)的概念。
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    // 獲取覆蓋狀態(tài)
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    // 獲取instance的狀態(tài)
    String instanceStatus = toString(instanceInfo.getStatus());

    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register: // 注冊(cè)
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat: // 心跳續(xù)約
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:   // 下線
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:  // 修改覆蓋狀態(tài)
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride: // 刪除覆蓋狀態(tài)
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}

以上五個(gè)場(chǎng)景擂红,這里就不一一說(shuō)了仪际,就說(shuō)一下注冊(cè)吧,

private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    // 調(diào)用Application控制層的接口昵骤,添加實(shí)例
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}


@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // 省略代碼1000行
    return Response.status(204).build();  // 204 to be backwards compatible
}

REPLICATION = “true” ,此次請(qǐng)求為true树碱,表示是一個(gè)服務(wù)端的復(fù)制請(qǐng)求。

由上面可以知道变秦,集群同步走的和客戶端注冊(cè)的后續(xù)流程是一樣的成榜,只不過isReplication=true , 表明這是一個(gè)集群同步的請(qǐng)求

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蹦玫,隨后出現(xiàn)的幾起案子赎婚,更是在濱河造成了極大的恐慌,老刑警劉巖樱溉,帶你破解...
    沈念sama閱讀 217,542評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惑淳,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡饺窿,警方通過查閱死者的電腦和手機(jī)歧焦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)肚医,“玉大人绢馍,你說(shuō)我怎么就攤上這事〕μ祝” “怎么了舰涌?”我有些...
    開封第一講書人閱讀 163,912評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)你稚。 經(jīng)常有香客問我瓷耙,道長(zhǎng),這世上最難降的妖魔是什么刁赖? 我笑而不...
    開封第一講書人閱讀 58,449評(píng)論 1 293
  • 正文 為了忘掉前任搁痛,我火速辦了婚禮,結(jié)果婚禮上宇弛,老公的妹妹穿的比我還像新娘鸡典。我一直安慰自己,他們只是感情好枪芒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,500評(píng)論 6 392
  • 文/花漫 我一把揭開白布彻况。 她就那樣靜靜地躺著谁尸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪纽甘。 梳的紋絲不亂的頭發(fā)上良蛮,一...
    開封第一講書人閱讀 51,370評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音悍赢,去河邊找鬼背镇。 笑死,一個(gè)胖子當(dāng)著我的面吹牛泽裳,可吹牛的內(nèi)容都是我干的瞒斩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼涮总,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼胸囱!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起瀑梗,我...
    開封第一講書人閱讀 39,074評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤烹笔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后抛丽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谤职,經(jīng)...
    沈念sama閱讀 45,505評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,722評(píng)論 3 335
  • 正文 我和宋清朗相戀三年亿鲜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了允蜈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,841評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蒿柳,死狀恐怖饶套,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情垒探,我是刑警寧澤妓蛮,帶...
    沈念sama閱讀 35,569評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站圾叼,受9級(jí)特大地震影響蛤克,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜夷蚊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,168評(píng)論 3 328
  • 文/蒙蒙 一构挤、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧撬码,春花似錦儿倒、人聲如沸版保。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至叫胁,卻和暖如春凰慈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背驼鹅。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工微谓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人输钩。 一個(gè)月前我還...
    沈念sama閱讀 47,962評(píng)論 2 370
  • 正文 我出身青樓豺型,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親买乃。 傳聞我的和親對(duì)象是個(gè)殘疾皇子姻氨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,781評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容