Cluster startup sync
protected void initEurekaServerContext() throws Exception {
    // .... lots of code omitted
    // Sync registry information
    int registryCount = this.registry.syncUp();
    // .... lots of code omitted
}
Many articles online say that syncUp is called to copy registry information from the other Eureka Server nodes. That is not quite accurate: at this point, syncUp() does not copy anything from other Eureka Server nodes; it reads the registry information from local memory. The source code makes this clear:
public int syncUp() {
    // Copy entire entry from neighboring DS node
    // Number of registered instances obtained
    int count = 0;
    // While count == 0, make up to registrySyncRetries attempts (5 by default when
    // register-with-eureka = true; otherwise 0, so the loop body never runs)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // From the second attempt on, sleep before each try (30 seconds by default)
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Read the registered instances from local memory
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // Check whether this instance may be registered
                    if (isRegisterable(instance)) {
                        // Register it into this Eureka Server's own registry
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
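Parameters:
registrySyncRetries: how many times a Eureka server, at startup, tries to obtain registry information from the other servers in the cluster. The default is 5, but only when eureka.client.register-with-eureka = true; if it is false, the value is 0.
registrySyncRetryWaitMs: if a Eureka server fails to obtain registry information from the other servers at startup, it tries again; this is how long it waits before each new attempt. The default is 30 * 1000 milliseconds.
count: the number of registered instances obtained. While it is 0, the loop keeps trying until the retry count is used up, sleeping 30 seconds before each new attempt.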
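To make the retry arithmetic concrete, here is a minimal sketch of the syncUp() loop under stated assumptions. SyncUpSketch, effectiveRetries and the IntSupplier that stands in for "read eurekaClient.getApplications() and register every registerable instance" are hypothetical names, not Eureka API, and the waits are recorded instead of actually sleeping. effectiveRetries only models the article's claim that the retry count is 5 with register-with-eureka = true and 0 otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Hypothetical model of the syncUp() retry loop; not Eureka's API.
class SyncUpSketch {
    // Models the article's claim: 5 retries only when register-with-eureka = true, else 0.
    static int effectiveRetries(boolean registerWithEureka) {
        return registerWithEureka ? 5 : 0;
    }

    final List<Long> waits = new ArrayList<>(); // waits that the real code would sleep
    int attempts = 0;

    // copyFromLocalCache returns how many instances one pass copied from the local cache.
    int syncUp(int retries, long waitMs, IntSupplier copyFromLocalCache) {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                waits.add(waitMs); // the real code calls Thread.sleep(waitMs) here
            }
            attempts++;
            count += copyFromLocalCache.getAsInt();
        }
        return count;
    }
}
```

Because the sleep only happens when i > 0, "5 retries" means at most 5 attempts and 4 waits: a node whose local cache stays empty spends roughly 4 × 30 s = 2 minutes in this loop before giving up with count = 0.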
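PS: As explained in the earlier article "7. Understanding Eureka in Depth: Fetching Registry Information (Part 7)", a Eureka Client automatically fetches registry information from Eureka Server at startup by default. For a Eureka Server to sync the registry information of the other cluster nodes when it starts, the following client settings must be enabled:
eureka.client.register-with-eureka = true ## whether to register with a Eureka Server as a Eureka Client
eureka.client.fetch-registry = true ## whether to fetch registry information from a Eureka Server into local memory
Only with both settings enabled will a cluster node, at startup, initialize its Eureka Client side and pull registry information from the other Eureka Servers into local memory; then, while initializing the Eureka Server, it reads that registry information from local memory and registers it into its own registry.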
Cluster sync types
public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
    private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}
Heartbeat: heartbeat (lease renewal)
Register: registration
Cancel: deregistration (instance going offline)
StatusUpdate: add an overridden status
DeleteStatusOverride: delete an overridden status
Initiating the sync
Registration is used as the example here:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register the instance locally
    super.register(info, leaseDuration, isReplication);
    // Once the local registration is done, replicate it to the peers with action Register
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // If this is a replication request from a peer, count it in the last-minute replication metric
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // No peer nodes, or this request itself came from another Eureka Server: return immediately
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Loop over the neighboring Eureka Server nodes and replicate to each one
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip this node's own URL
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Send the replication request
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
Steps:
1. If there are no peer nodes, return.
2. isReplication says whether this is a replication request; isReplication = true means the request was sent by another Eureka Server. Such a request must not be replicated any further, otherwise the nodes would keep replicating to each other in an endless loop.
3. Loop over the peer nodes, skipping this node itself.
4. Send the replication request by calling replicateInstanceActionsToPeers.
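The four steps can be condensed into a small, self-contained sketch that only decides which peers a change is forwarded to. PeerTargets and targets(...) are hypothetical names; plain string equality stands in for peerEurekaNodes.isThisMyUrl(...), which the real code uses and which may compare URLs differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the checks in replicateToPeers(); names are illustrative.
class PeerTargets {
    // Returns the peer URLs a change should be forwarded to.
    static List<String> targets(List<String> peerUrls, String myUrl, boolean isReplication) {
        List<String> result = new ArrayList<>();
        // Steps 1 and 2: no peers, or the change already came from a peer, so stop here.
        if (peerUrls.isEmpty() || isReplication) {
            return result;
        }
        for (String url : peerUrls) {
            // Step 3: skip this node's own URL (simplified stand-in for isThisMyUrl).
            if (url.equals(myUrl)) {
                continue;
            }
            // Step 4: the real code calls replicateInstanceActionsToPeers(...) for this node.
            result.add(url);
        }
        return result;
    }
}
```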
PS: PeerEurekaNode comes up here. For how PeerEurekaNodes updates and reads the cluster node list, see "1. Understanding Eureka Server Startup in Depth (Part 1)": when the server starts, a thread is started that refreshes the PeerEurekaNodes cluster node information once every 15 minutes.
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: // deregistration
                node.cancel(appName, id);
                break;
            case Heartbeat: // heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // Fetch the latest instance info from the local registry
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: // registration
                node.register(info);
                break;
            case StatusUpdate: // set an overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride: // delete an overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
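One detail of the switch above is easy to miss: Heartbeat, StatusUpdate and DeleteStatusOverride re-read the latest local copy via getInstanceByAppAndId(...), Register forwards the InstanceInfo it was given, and Cancel sends only appName and id. The hypothetical enum below (ReplicatedAction is not a Eureka type) just records that distinction so it can be checked at a glance.

```java
// Hypothetical summary of what replicateInstanceActionsToPeers() sends per action.
enum ReplicatedAction {
    HEARTBEAT, REGISTER, CANCEL, STATUS_UPDATE, DELETE_STATUS_OVERRIDE;

    // True when the switch re-reads the latest local copy via getInstanceByAppAndId(...)
    // instead of forwarding the InstanceInfo passed in by the caller (or nothing, for Cancel).
    boolean readsLocalRegistry() {
        switch (this) {
            case HEARTBEAT:
            case STATUS_UPDATE:
            case DELETE_STATUS_OVERRIDE:
                return true;
            default:
                return false;
        }
    }
}
```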
We'll look at registration directly; the other actions work the same way in principle.
PeerEurekaNode's register method looks like this:
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Batching is used by default
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
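By default a batching task dispatcher is used: each task is put into a task queue, and worker threads, created in the style of a ThreadPoolExecutor, take tasks from the queue and execute them together as one batch. The receiving Eureka Server likewise accepts them as one batch, which makes synchronization more efficient.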
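The queue-then-batch idea can be sketched with a plain LinkedBlockingQueue. This is a simplified stand-in, not Eureka's dispatcher: BatchingQueue, process(...) and nextBatch(...) are hypothetical names, and features of the real dispatcher such as task expiry and rescheduling after congestion are left out. A worker blocks for the first task, then uses BlockingQueue.drainTo to grab whatever else is already queued, up to a maximum batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a batching dispatcher; not Eureka's TaskDispatcher API.
class BatchingQueue<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;

    BatchingQueue(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Producers (register/heartbeat replication) only enqueue and return immediately.
    void process(T task) {
        queue.offer(task);
    }

    // A worker waits for the first task, then drains what is already queued
    // (up to maxBatchSize in total) so it can be sent in a single batch request.
    List<T> nextBatch(long timeoutMs) {
        List<T> batch = new ArrayList<>();
        T first;
        try {
            first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return batch;
        }
        if (first == null) {
            return batch; // nothing arrived within the timeout
        }
        batch.add(first);
        queue.drainTo(batch, maxBatchSize - 1);
        return batch;
    }
}
```

In a running server a worker thread would loop on nextBatch(...) and hand each non-empty list to something like ReplicationTaskProcessor.process(List), so five queued registrations become two HTTP calls with a batch size of 3 instead of five calls.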
The processor that executes the batched tasks is com.netflix.eureka.cluster.ReplicationTaskProcessor:
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // Wrap each task as a ReplicationInstance and collect them in a ReplicationList
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // Send the whole batch in one request
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            // Handle the per-task results: handleSuccess on success, handleFailure on failure
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}
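The outcome mapping in process() above boils down to a small decision table. The sketch below restates it with a hypothetical Outcome enum (not Eureka's ProcessingResult) and assumes isSuccess(...) means a 2xx status code. Per the log messages, a 503 leads to the tasks being rescheduled after a delay, any other HTTP error discards the batch, and among thrown exceptions only network connect failures are treated as retryable.

```java
// Hypothetical outcome type; the constants mirror ProcessingResult's names in upper case.
enum Outcome { SUCCESS, CONGESTION, TRANSIENT_ERROR, PERMANENT_ERROR }

class BatchOutcome {
    // HTTP branch of process(); assumes isSuccess(...) accepts any 2xx code.
    static Outcome fromStatus(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return Outcome.SUCCESS;
        }
        // 503 means the peer is busy: back off and retry; anything else is dropped.
        return statusCode == 503 ? Outcome.CONGESTION : Outcome.PERMANENT_ERROR;
    }

    // catch branch of process(): only network connect failures are worth retrying.
    static Outcome fromException(boolean isNetworkConnectException) {
        return isNetworkConnectException ? Outcome.TRANSIENT_ERROR : Outcome.PERMANENT_ERROR;
    }
}
```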
The batch endpoint is peerreplication/batch/.
handleBatchResponse(tasks, response.getEntity().getResponseList()) iterates over the per-task results, calling handleSuccess for each success and handleFailure for each failure. For example, when a heartbeat returns 404, the instance is registered again:
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
    @Override
    public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
    }
    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        super.handleFailure(statusCode, responseEntity);
        if (statusCode == 404) {
            // The peer does not know this instance: register it again
            register(info);
        } else if (config.shouldSyncWhenTimestampDiffers()) {
            // Otherwise, if the peer returned its own copy, reconcile the two by timestamp
            InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
            if (peerInstanceInfo != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
            }
        }
    }
};
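The branching in handleFailure() above can be isolated as a pure function, which makes the order of checks explicit: a 404 always wins, and the timestamp sync only happens when shouldSyncWhenTimestampDiffers() is on and the peer actually returned an instance. HeartbeatFailure and HeartbeatFollowUp are hypothetical names used only for this sketch.

```java
// Hypothetical sketch of the heartbeat task's handleFailure() decision.
enum HeartbeatFollowUp { REGISTER_AGAIN, SYNC_TIMESTAMPS, NOTHING }

class HeartbeatFailure {
    static HeartbeatFollowUp onFailure(int statusCode, boolean syncWhenTimestampDiffers,
                                       boolean peerReturnedInstance) {
        if (statusCode == 404) {
            // The peer has no entry for this instance, so send a full registration instead.
            return HeartbeatFollowUp.REGISTER_AGAIN;
        }
        if (syncWhenTimestampDiffers && peerReturnedInstance) {
            // The real code calls syncInstancesIfTimestampDiffers(...) with the peer's copy.
            return HeartbeatFollowUp.SYNC_TIMESTAMPS;
        }
        return HeartbeatFollowUp.NOTHING;
    }
}
```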
How a Eureka Server receives the sync
Entry point: com.netflix.eureka.resources.PeerReplicationResource
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // Loop over every replicated item in the request
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                // Dispatch the item and collect its result; all results are returned together
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}
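A property worth noticing in batchReplication() is per-item error isolation: each item gets its own status code, a failing item is recorded as a 500 for that item only, and the HTTP response for the batch as a whole is still 200. The sketch below shows just that pattern; BatchReceiver is a hypothetical name, items are plain strings, and the dispatch function is supplied by the caller (it catches RuntimeException because a Java lambda cannot throw checked exceptions here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical sketch of batchReplication()'s loop: one status code per item,
// and one failing item does not fail the whole batch.
class BatchReceiver {
    static List<Integer> handle(List<String> items, ToIntFunction<String> dispatch) {
        List<Integer> responses = new ArrayList<>();
        for (String item : items) {
            try {
                responses.add(dispatch.applyAsInt(item));
            } catch (RuntimeException e) {
                // Same idea as adding ReplicationInstanceResponse(500, null) for this item only.
                responses.add(500);
            }
        }
        return responses;
    }
}
```

On the sending side these per-item codes are what handleBatchResponse(...) walks through, which is how a single heartbeat's 404 can trigger a re-registration without affecting the other tasks in the same batch.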
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    // Create the ApplicationResource for the target application
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    // Create the InstanceResource for the target instance
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    // The instance's lastDirtyTimestamp, which works somewhat like a version number
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    // The overridden status
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    // The instance status
    String instanceStatus = toString(instanceInfo.getStatus());
    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register: // registration
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat: // heartbeat (lease renewal)
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel: // deregistration
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate: // update the overridden status
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride: // delete the overridden status
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
There are five cases above; rather than go through each one, let's just look at registration:
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    // Call the ApplicationResource (controller layer) endpoint to add the instance
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // ... 1000 lines of code omitted ...
    return Response.status(204).build();  // 204 to be backwards compatible
}
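REPLICATION = "true": for this request the value is "true", marking it as a server-side replication request.
So cluster sync follows the same downstream path as a client registration; the only difference is isReplication = true, which marks the request as a cluster sync request.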
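To see why the isReplication flag is enough to stop replication loops, here is a toy in-memory cluster of three nodes. ToyNode is entirely hypothetical (no HTTP, no leases, no batching); it only keeps the shape of register(info, isReplication): store locally, then forward to every other peer unless the request itself came from a peer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of a Eureka cluster; ToyNode is not a Eureka class.
class ToyNode {
    final String url;
    final Set<String> registry = new HashSet<>();
    final List<ToyNode> peers = new ArrayList<>(); // may include this node itself
    int sent = 0; // replication messages this node has sent

    ToyNode(String url) {
        this.url = url;
    }

    // Mirrors register(info, isReplication): store locally first, then replicate.
    void register(String instanceId, boolean isReplication) {
        registry.add(instanceId);
        if (isReplication) {
            return; // a change that came from a peer is never forwarded again
        }
        for (ToyNode peer : peers) {
            if (peer.url.equals(url)) {
                continue; // skip ourselves, like isThisMyUrl(...)
            }
            sent++;
            peer.register(instanceId, true); // the peer sees isReplication = true
        }
    }
}
```

A client registration on node a produces exactly two replication messages (to b and c), and both peers end up with the instance. If the `if (isReplication) return;` guard were removed, b and c would forward the same registration back to a and to each other indefinitely, which is the "poison replication" the source comment in replicateToPeers warns about.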