一、Eureka架構(gòu)
從下面的架構(gòu)圖中可以看出,不管是服務(wù)的調(diào)用者還是服務(wù)的提供者都可以認(rèn)為是一個(gè)EurekaClient,在啟動(dòng)的過程中會(huì)將自身注冊(cè)到EurekaServer中(也就是Eureka注冊(cè)中心),并通過心跳保持服務(wù)的續(xù)約饮戳。
CAP
一致性(Consistency)、可用性(Availability)洞拨、分區(qū)容錯(cuò)性(Partition tolerance)
Eureka保證AP(ZK保證CP)
注冊(cè)過程
1扯罐、Register(注冊(cè)): 服務(wù)提供者向server注冊(cè)或者更新自己的信息
2、Replicate(同步): server集群之間同步注冊(cè)信息
3烦衣、Get(獲却鹾印):服務(wù)消費(fèi)者從server獲取其他服務(wù)注冊(cè)信息,并緩存到本地
4花吟、renew(續(xù)約):client向server發(fā)送心跳維持元數(shù)據(jù)有效性秸歧,一定時(shí)間未收到心跳則從自身注冊(cè)表中刪除
5、服務(wù)下線:client下線時(shí)向server發(fā)送信息衅澈,server會(huì)注銷該服務(wù)元數(shù)據(jù)键菱,并通知服務(wù)消費(fèi)者。
二今布、Eureka的啟動(dòng)注冊(cè)
注冊(cè)是InstanceId的生成可以查看另一篇SpringCloud學(xué)習(xí)筆記(四)-InstanceId的生成
通常我們通過@EnableDiscoveryClient來聲明一個(gè)Eureka客戶端经备。源碼注釋這樣描述EnableDiscoveryClient
Annotation to enable a DiscoveryClient implementation
即通過該注釋會(huì)開啟一個(gè)DiscoveryClient實(shí)例,DiscoveryClient類源碼可以看到部默,DiscoveryClient的構(gòu)造方法中有如下代碼段弄喘。
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
而通過DiscoveryClient的源碼可以看到,在服務(wù)啟動(dòng)時(shí)甩牺,會(huì)啟動(dòng)兩個(gè)線程池,一個(gè)用于心跳的保持累奈,一個(gè)用于緩存的刷新贬派。進(jìn)一步跟蹤源碼急但,在initScheduledTasks中,開啟心跳定時(shí)器搞乏,同時(shí)會(huì)啟動(dòng)一個(gè)線程InstanceInfoReplicator波桩,這個(gè)線程會(huì)將本地實(shí)例信息更新到遠(yuǎn)程服務(wù)器,也就是將當(dāng)前服務(wù)注冊(cè)到注冊(cè)中心请敦。源碼如下:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
這里有個(gè)小細(xì)節(jié)镐躲,instanceInfoReplicator在啟動(dòng)的時(shí)候,會(huì)有一定時(shí)間的延遲侍筛,默認(rèn)是40秒萤皂,也就是client在啟動(dòng)時(shí),不是馬上向注冊(cè)中心注冊(cè)匣椰,而是會(huì)延遲40秒再注冊(cè)裆熙。
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
public int getInitialInstanceInfoReplicationIntervalSeconds() {
return configInstance.getIntProperty(
namespace + INITIAL_REGISTRATION_REPLICATION_DELAY_KEY, 40).get();
}
接著看register方法, 該方法會(huì)將實(shí)例元數(shù)據(jù)信息(instanceInfo)通過http方式注冊(cè)到Eureka服務(wù)端。
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
三禽笑、EurekaClient的心跳保持
上一小節(jié)已經(jīng)提到了在啟動(dòng)過程中會(huì)初始化心跳定時(shí)器
//服務(wù)刷新默認(rèn)30秒入录,可通過eureka.instance.lease-renewal-interval-in-seconds修改
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
心跳線程,定時(shí)向注冊(cè)中心發(fā)送http請(qǐng)求佳镜,如果返回404僚稿,會(huì)重新注冊(cè)。
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
四蟀伸、總結(jié)
在spring-cloud-netflix-eureka-client包下spring.factories可以查看配置入口(EurekaClientAutoConfiguration)
1:項(xiàng)目啟動(dòng)
2:初始化配置EurekaClientAutoConfiguration->eurekaInstanceConfigBean
3:構(gòu)造EurekaClient對(duì)象(內(nèi)部類EurekaClientAutoConfiguration::RefreshableEurekaClientConfiguration)
3.1:構(gòu)造心跳任務(wù)線程池
3.2:構(gòu)造緩存刷新任務(wù)線程池
4:?jiǎn)?dòng)定時(shí)任務(wù)(心跳+緩存刷新)
4.1:?jiǎn)?dòng)緩存刷新定時(shí)任務(wù)
4.2:?jiǎn)?dòng)心跳定時(shí)任務(wù)
4.3:?jiǎn)?dòng)instanceInfoReplicator線程蚀同,執(zhí)行注冊(cè)任務(wù)
5:服務(wù)啟動(dòng)時(shí),會(huì)延遲40秒向注冊(cè)中心注冊(cè)
6:心跳時(shí)間默認(rèn)是30秒望蜡,可通過eureka.instance.lease-renewal-interval-in-seconds修改