Eureka源碼分析(2.1.4.Release)
首先源碼切忌一行一行debug友绝,需先了解eureka主要功能后,再分析其功能如何實現(xiàn)。
由于上一篇文章提及到客戶端如何從注冊中心中獲取服務列表eureka(一)-功能介紹與客戶端之服務獲取
客戶端之服務續(xù)約
服務續(xù)約任務與服務列表獲取一樣,都是通過TimedSupervisorTask實現(xiàn)動態(tài)時間間隔執(zhí)行任務梨睁。TimedSupervisorTask是如何實現(xiàn)動態(tài)間隔時間執(zhí)行任務的,可看上文提到的文章坛梁。
繼續(xù)從上文提到的initScheduledTasks出發(fā):
com.netflix.discovery.DiscoveryClient#initScheduledTasks:
……上部分代碼為客戶端獲取服務列表任務……
if (clientConfig.shouldRegisterWithEureka()) {
//eureka.instance.leaseRenewallIntervalInSeconds=30,默認每30秒執(zhí)行心跳續(xù)約
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//eureka.client.heartbeatExecutorExponentialBackOffBound=10而姐,如果發(fā)生超時的重試延遲的最大乘數(shù)。
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
//定義個名為heartbeat的線程划咐,執(zhí)行心跳續(xù)約任務
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
……下部分代碼為實例更新并通知eureka注冊中心任務……
由于TimedSupervisorTask在上篇文章已有介紹,現(xiàn)在則直接查看續(xù)約線程邏輯
com.netflix.discovery.DiscoveryClient.HeartbeatThread:
com.netflix.discovery.DiscoveryClient.HeartbeatThread#run:
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
com.netflix.discovery.DiscoveryClient#renew:
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//發(fā)起heartbeat請求
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
//如果返回的是無法找到钧萍,則重新注冊
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
//設置該實例的dirty標志位true褐缠,并更新dirty時間
long timestamp = instanceInfo.setIsDirtyWithTime();
//重新注冊
boolean success = register();
//如果成功,則將實例ditry的標志改為false
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
實例的dirty標志isInstanceInfoDirty的作用:在執(zhí)行服務注冊任務時风瘦,如果dirty標志為true队魏,則將實例重新注冊到遠程eureka注冊中心。
客戶端之服務注冊
還是從上文提到的initScheduledTasks出發(fā):
com.netflix.discovery.DiscoveryClient#initScheduledTasks:
if (clientConfig.shouldRegisterWithEureka()) {
……忽略心跳續(xù)約代碼……
// InstanceInfo replicator
//實例更新后,需要將更新信息發(fā)送到遠程eureka中心的線程胡桨,其實現(xiàn)了Runnable接口
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),//instanceInfoReplicationIntervalSeconds=30官帘,默認每30秒檢查是否有更新,并注冊到注冊中心
2); //當前時間允許的最大請求數(shù)
//狀態(tài)監(jiān)聽器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
//狀態(tài)更新后昧谊,立即更新到eureka注冊中心(這里涉及到令牌限流算法)
instanceInfoReplicator.onDemandUpdate();
}
};
//通過ApplicationInfoManager進行的本地狀態(tài)更新將立即觸發(fā)對遠程eureka服務器的注冊/更新(速率受限)刽虹。默認為true
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//eureka.client.initialInstanceInfoReplicationIntervalSeconds=40s,默認延時40s呢诬,實例更新或注冊線程啟動
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
從上面的代碼可以知道涌哲,instanceInfoReplicator的run方法實現(xiàn)了具體的邏輯代碼
com.netflix.discovery.InstanceInfoReplicator#start:
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
//第一次啟動:設置dirty標志為true,觸發(fā)第一次注冊尚镰。
instanceInfo.setIsDirty();
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
com.netflix.discovery.InstanceInfoReplicator#run:
public void run() {
try {
//檢查實例是否需要更新阀圾,如果需要更新,設置dirty標志為true狗唉,更新dirty時間初烘。
discoveryClient.refreshInstanceInfo();
//如果dirty標志為true,則返回dirty時間分俯,如果標志為false則返回null
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//dirty時間不為null肾筐,則需要更新到注冊中心,重新注冊
discoveryClient.register();
//設置dirty標志為false
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
//replicationIntervalSeconds秒后(即30秒)澳迫,又重新執(zhí)行該方法局齿。
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
com.netflix.discovery.DiscoveryClient#refreshInstanceInfo:
/**
* Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
* isDirty flag on the instanceInfo is set to true
*/
void refreshInstanceInfo() {
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
refreshInstanceInfo方法具體實現(xiàn)流程圖如下:
服務注冊任務整體流程如下:
附加算法介紹:令牌桶限流算法(簡要)
instanceInfoReplicator.onDemandUpdate():實例狀態(tài)變更后,馬上更新到遠程eureka遠程中心橄登。
該方法用到了RateLimiter來實現(xiàn)限流抓歼。RateLimiter實現(xiàn)了令牌桶限流算法。為了防止網(wǎng)絡擁塞拢锹,需限制流出網(wǎng)絡的流量谣妻,使流量以比較均勻的速度向外發(fā)送。令牌桶算法就實現(xiàn)了這個功能卒稳,可控制發(fā)送到網(wǎng)絡上數(shù)據(jù)的數(shù)目蹋半,并允許突發(fā)數(shù)據(jù)的發(fā)送。