https://blog.csdn.net/wy0123/article/details/79852339?utm_source=blogxgwz9
1. 環(huán)境
eureka-core-1.9.2
eureka-client-1.9.2
2. eureka-client是客戶端的源碼,主要的功能有如下幾個
- 服務(wù)注冊
- 服務(wù)續(xù)約
- 服務(wù)發(fā)現(xiàn)
- 服務(wù)下線
而這些功能主要由netflix eureka實現(xiàn)键袱,其客戶端實現(xiàn)類為DiscoveryClient.java,它實現(xiàn)了EurekaClient接口,而EurekaClient又繼承了LookupService接口族奢。
3. 數(shù)據(jù)結(jié)構(gòu)
我們首先來看看這個最上層的接口都聲明了些什么?
LookupService:
Application getApplication(String appName);
Applications getApplications();
List<InstanceInfo> getInstancesById(String id);
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
在這里從返回類型可以看出此接口的主要任務(wù)為獲取Application和InstanceInfo
Application:其屬性包含一個name和一個InstanceInfo類型的Set丹鸿,其name為服務(wù)名越走,為配置文件中的spring.application.name,其Set為一個InstanceInfo列表靠欢,其中記述了注冊了此服務(wù)名的Eureka節(jié)點列表廊敌。
InstanceInfo:主要記述了Eureka節(jié)點的信息,包括ip门怪、port等等骡澈。
而這四個接口分別是獲取application、application列表薪缆,還有獲取InstanceInfo列表秧廉,而從DiscoveryClient.java的實現(xiàn)來看伞广,getInstancesById()也是從applications對象里取出的。
@Override
public List<InstanceInfo> getInstancesById(String id) {
List<InstanceInfo> instancesList = new ArrayList<InstanceInfo>();
for (Application app : this.getApplications()
.getRegisteredApplications()) {
InstanceInfo instanceInfo = app.getByInstanceId(id);
if (instanceInfo != null) {
instancesList.add(instanceInfo);
}
}
return instancesList;
}
在現(xiàn)在的環(huán)境下查看
實現(xiàn)類代碼還是如上
說明在客戶端里疼电,是以服務(wù)名為單位嚼锄,保存著注冊此服務(wù)的服務(wù)提供者列表。這個服務(wù)提供者保存在InstanceInfo里蔽豺。
4. 類結(jié)構(gòu)
作為主要工作的類DiscoveryClient.java区丑,它有幾個內(nèi)部類:
- EurekaTransport.java:用于服務(wù)注冊、服務(wù)續(xù)約等與服務(wù)器通信
- HeartbeatThread.java:用于服務(wù)續(xù)約
- CacheRefreshThread.java:刷新服務(wù)列表用
5. 注冊服務(wù)
在客戶端啟動后初始化了DiscoveryClient修陡,在這里啟動了3個定時任務(wù):
- CacheRefresh線程:定時刷新服務(wù)列表
- HeartBeat線程:定時發(fā)送心跳沧侥,進行服務(wù)續(xù)約,如果服務(wù)被注冊中心剔除魄鸦,則重新注冊
- InstanceInfoReplicator線程:用來向注冊中心注冊自己(最初的注冊不是由這個定時任務(wù)完成宴杀,是由springcloud里的邏輯直接調(diào)用這個線程的run方法完成)
由此可見,netflix eureka 的客戶端組件DiscoveryClient啟動后拾因,服務(wù)的注冊是依靠這個心跳線程來實現(xiàn)的旺罢。
HeartbeatThread是其一個內(nèi)部線程類:
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
其run方法,調(diào)用了renew()方法:
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
registrationClient.sendHeartBeat向注冊中心續(xù)約绢记,如果返回404扁达,則說明已失效,需要重新調(diào)用register()注冊蠢熄。
而此線程是在一個定時任務(wù)里調(diào)用:
private void initScheduledTasks() {
........
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
..........
}
renewalIntervalInSecs為心跳續(xù)約的時間間隔跪解,默認為30秒,則意味著服務(wù)啟動30秒后签孔,才能注冊到注冊中心叉讥。
問題:
可是我們在實際情況中,服務(wù)啟動后饥追,服務(wù)立刻就注冊到注冊中心了节吮,這是為什么?
原因是Spring Cloud Eureka在封裝的時候判耕,自己調(diào)用了register()方法,啟動同時注冊了服務(wù)翘骂。
EurekaServiceRegistry.java
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getInstanceConfig().getAppname()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
if (reg.getHealthCheckHandler() != null) {
reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
}
}
........
通過調(diào)用setInstanceStatus()壁熄,立刻觸發(fā)DiscoveryClient中的statusChangeListener,
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
listener.notify()最終調(diào)用回DiscoveryClient.java中的register()碳竟,此段代碼在com.netflix.discovery.InstanceInfoReplicator中:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
由此可見草丧,Spring Cloud EurekaClient在啟動時會向服務(wù)端注冊自己的服務(wù)信息,并告知注冊中心自己的過期時間(lease-expiration-duration-in-seconds)莹桅,并且根據(jù)配置的續(xù)約間隔時間(lease-renewal-interval-in-seconds 默認30秒)定時向服務(wù)端發(fā)送心跳包進行服務(wù)續(xù)約動作昌执。
這個注冊的注冊中心的地址是通過eureka.client.serviceUrl來指定的烛亦,那么在集群中,這個地址是多個懂拾,我們是向其中一個注冊煤禽,還是每個都注冊一遍呢?
在RetryableEurekaHttpClient中岖赋,有如下代碼段:
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
這里的getHostCandidates()方法檬果,是從配置文件中取得配置得serviceUrl列表,之后唐断,只取了第一個:
currentEndpoint = candidateHosts.get(endpointIdx++);
如果注冊失敗选脊,則循環(huán)重試,用serviceUrl列表的下一個脸甘。
至于注冊信息怎么在注冊中心傳播恳啥,在Eureka Server再進行學習。
6. 發(fā)現(xiàn)服務(wù)
前面已經(jīng)注冊了服務(wù)丹诀,服務(wù)是用來消費的钝的,所以Eureka還提供了從注冊中心發(fā)現(xiàn)服務(wù)的功能。
這個發(fā)現(xiàn)服務(wù)的功能就是由上面所說的CacheRefresh線程來實現(xiàn)的忿墅。
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
.....................
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
..........................
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
..........................
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
registryFetchIntervalSeconds為刷新服務(wù)列表的間隔時間扁藕。這樣如果注冊中心服務(wù)列表有變化,就會被同步到客戶端疚脐。
在掉用initScheduledTasks()方法之前亿柑,和刷新服務(wù)列表信息定時任務(wù)之間,代碼還掉用了fetchRegistry(boolean forceFullRegistryFetch)方法棍弄,立刻拉取了服務(wù)端的信息望薄。這就是為什么定時任務(wù)為30秒,但是客戶端啟動后呼畸,立刻就能獲取到服務(wù)列表的原因痕支。
若初始拉取注冊信息失敗,從備份注冊中心獲取蛮原。備份注冊中心為構(gòu)造函數(shù)的最后一個參數(shù)卧须。
7.服務(wù)注冊節(jié)點與服務(wù)發(fā)現(xiàn)節(jié)點的關(guān)系
由前面的介紹可以發(fā)現(xiàn)每個客戶端節(jié)點在啟動時,都需要連接注冊中心儒陨,通過instanceInfoReplicator(間隔30秒)花嘶,這個過程也是注冊服務(wù)的過程。
所以每個Eureka節(jié)點既可以是服務(wù)生產(chǎn)者蹦漠,也可以是服務(wù)消費者椭员。