DiscoverClient
com.netflix.discovery.DiscoveryClient ,使用的@Inject //google guice 注入遵循 JSR-330規(guī)范
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// 省略N多代碼
// 初始化定時(shí)器信息
initScheduledTasks();
}
private void initScheduledTasks() {
// 省略N多代碼论矾。。丙躏。
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// 在這里后频,初始化一個(gè)定時(shí)器任務(wù)
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 省略N多代碼。莱革。献丑。
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
由上可以看出悟民,在DiscoverClient這個(gè)類初始化的時(shí)候厕氨,會(huì)初始化定期任務(wù)进每,每30秒執(zhí)行一次,用來發(fā)送心跳
HeartbeatThread
這個(gè)是用來續(xù)約的線程命斧,主要看其run方法田晚,
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
// 更新最后一次心跳的時(shí)間
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
// 續(xù)約的主方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
上面的代碼很簡單,主要就是啟動(dòng)一個(gè)線程国葬,然后線程執(zhí)行renew()方法贤徒, 最終發(fā)送心跳給Eureka-Server
接口地址: apps/ + appName + /' + id ,
如果接口返回值為404胃惜,就是說不存在泞莉,從來沒有注冊過,那么重新走注冊流程
lastDirtyTimestamp
即該instance在client端最后被修改的時(shí)間戳
Eureka-Server接收心跳
InstanceResource
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 續(xù)約
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// 續(xù)約失敗
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response = null;
// 比較lastDirtyTimestamp
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
// 比較lastDirtyTimestamp的大小船殉,這個(gè)還是比較重要的
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
return response;
}
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
boolean isReplication) {
// 獲取本機(jī)的instance實(shí)例信息
InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
if (appInfo != null) {
//如果lastDirtyTimestamp不為空,并且lastDirtyTimestamp和本地的不相等
if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
// lastDirtyTimestamp>本地的時(shí)間斯嚎,則認(rèn)為當(dāng)前實(shí)例是無效的利虫,返回404錯(cuò)誤,客戶端重新發(fā)起注冊
if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.NOT_FOUND).build();
} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
// 如果是集群同步請求堡僻,本地的時(shí)間糠惫,大于客戶端傳過來的時(shí)間,則返回 “沖突” 這個(gè)狀態(tài)回去钉疫,以本地的時(shí)間大的為準(zhǔn)
if (isReplication) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.CONFLICT).entity(appInfo).build();
} else {
return Response.ok().build();
}
}
}
}
return Response.ok().build();
}
代碼說明:
1.lastDirtyTimestamp 是客戶端向服務(wù)端發(fā)請求的版本號(hào) 硼讽, 一切請求都以版本號(hào)大的為準(zhǔn)。牲阁, 如: 注冊
2.在調(diào)用續(xù)約的方法之后固阁,Eureka Server 會(huì)對請求過來的lastDirtyTimestamp和本地的做對比壤躲, 如果
請求lastDirtyTimestamp>本地的時(shí)間,則認(rèn)為當(dāng)前實(shí)例是無效的备燃,返回404錯(cuò)誤碉克,客戶端重新發(fā)起注冊。
3.如果是集群同步請求并齐,本地的時(shí)間漏麦,大于其他Eureka Server傳過來的時(shí)間,則返回 “沖突” 這個(gè)狀態(tài)回去况褪,
以本地的時(shí)間大的為準(zhǔn)撕贞,注意是集群同步請求,如果是客戶端傳過的测垛,是不會(huì)有這個(gè)規(guī)則的捏膨。
應(yīng)用續(xù)約
//PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
// 執(zhí)行續(xù)約操作
if (super.renew(appName, id, isReplication)) {
// 同步Eureka-Server集群
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
//AbstractInstanceRegistry.java
public boolean renew(String appName, String id, boolean isReplication) {
// 增加續(xù)約次數(shù)到統(tǒng)計(jì)枚舉
RENEW.increment(isReplication);
// 從Eureka-Server端本地的CurrentHashMap中,通過appName獲取Lease信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// lease為空赐纱,lease在第一次注冊的時(shí)候會(huì)創(chuàng)建脊奋,為空,則表示從來沒有注冊過疙描,租約不存在
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
// 獲取lease里面的instance信息
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// 一系列狀態(tài)判斷诚隙,目前還不是很清楚,但是不影響主流程
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatus(overriddenInstanceStatus);
}
}
// 設(shè)置每分鐘的續(xù)約次數(shù)
renewsLastMin.increment();
// 續(xù)約
leaseToRenew.renew();
return true;
}
}
從上面可以看到整個(gè)續(xù)約過程起胰,主要就是從本地的CurrentHashMap中獲取租約信息久又, 獲取到了之后,設(shè)置每分鐘的續(xù)約次數(shù)以及續(xù)約時(shí)間效五。
renewsLastMin.increment()地消, 這個(gè)里面。主要是更新一個(gè)currentBucket的變量畏妖,類型為AtomicLong 脉执, 同時(shí)有個(gè)定時(shí)器一分鐘去更新一次。一分鐘之后戒劫,這個(gè)值會(huì)重新設(shè)置為0 半夷。
leaseToRenew.renew() , 更新lastUpdateTimestamp, duration默認(rèn)為90秒
//Lease.java
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
總結(jié):
在一下三種情況,續(xù)約是返回404 迅细, 需要客戶端重新發(fā)起注冊的巫橄。
1.當(dāng)客戶端的lastDirtyTimestamp> 大于服務(wù)端的instance的lastDirtyTimestamp時(shí)候,會(huì)認(rèn)為服務(wù)端
的信息是無效的茵典,因此無法續(xù)約湘换,需要重新發(fā)起注冊請求。
2.服務(wù)端的注冊信息不存在
3.服務(wù)端的instance的status = UNKONW, 為什么會(huì)出現(xiàn)UNKONW這個(gè)狀態(tài)呢彩倚,因?yàn)樵赿eleteStatusOverride
的時(shí)候存在傳入U(xiǎn)NKONW的可能性筹我。