??spring cloud默認(rèn)使用eureka來做服務(wù)治理旁钧。作為client的核心功能有兩個(gè):服務(wù)發(fā)現(xiàn)和服務(wù)注冊(cè)。
??通過查看spring-cloud-netflix-eureka-client下的類發(fā)現(xiàn)功能核心類是CloudEurekaClient畅姊,他繼承了netflix的DiscoveryClient。
查看DiscoveryClient源碼唉韭,注釋中寫明了DiscoveryClient的四個(gè)功能:
下面一個(gè)個(gè)來看這些功能是如何實(shí)現(xiàn)的陶珠。
1.d)服務(wù)發(fā)現(xiàn)getApplications()
@Override
public Applications getApplications() {
return localRegionApps.get();
}
localRegionApps是一個(gè)成員變量
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
所以一定是有其他地方先set了Applications,getApplications() 方法才能正確返回蒋譬。通過搜索代碼割岛,發(fā)現(xiàn)為localRegionApps設(shè)值的地方代碼基本相似:
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
Applications serverApps = httpResponse.getEntity();
localRegionApps.set(this.filterAndShuffle(serverApps));
可以看出localRegionApps的來源,通過eurekaTransport.queryClient獲得一個(gè)EurekaHttpResponse<Applications>犯助,再進(jìn)過過濾和亂序處理存入localRegionApps癣漆。
??那現(xiàn)在就有兩個(gè)問題:1.這段代碼是由誰執(zhí)行的,2.eurekaTransport.queryClient是如何工作的剂买。
??通過call Hierarchy發(fā)現(xiàn)調(diào)用鏈的最開始:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
??查詢CacheRefreshThread被使用的地方:
private void initScheduledTasks(){...}
而initScheduledTasks()被調(diào)用的地方是DiscoveryClient的構(gòu)造函數(shù)惠爽,所以這一切在DiscoveryClient被創(chuàng)建時(shí)就開始了。
查看initScheduledTasks()代碼:
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
...
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
可以看出這是由線程池周期執(zhí)行的任務(wù)瞬哼,那現(xiàn)在就有兩個(gè)地方:1.執(zhí)行的線程池scheduler婚肆、cacheRefreshExecutor、cacheRefreshExecutor 2.被執(zhí)行的回調(diào):HeartbeatThread坐慰、CacheRefreshThread较性。CacheRefreshThread上面知道了是用來設(shè)置localRegionApps的,HeartbeatThread從注釋來看是用來b)服務(wù)續(xù)約的:
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
找到三個(gè)線程池的初始化代碼:
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
可以看出eureka默認(rèn)用了兩個(gè)線程來做任務(wù)調(diào)度讨越,兩個(gè)任務(wù)各有一個(gè)專屬線程獨(dú)自負(fù)責(zé)任務(wù)的實(shí)際執(zhí)行两残。這種依靠線程池的隔離策略,在netflix組件中用到的地方有很多把跨。
????找到了CacheRefreshThread是被誰調(diào)用的人弓,接著分析eurekaTransport.queryClient是如何工作的。eurekaTransport.queryClient的服務(wù)發(fā)現(xiàn)方法:getApplications()
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath,
String[] regions) {
String url = serviceUrl + urlPath;
if (regions != null && regions.length > 0)
urlPath = (urlPath.contains("?") ? "&" : "?") + "regions="
+ StringUtil.join(regions);
ResponseEntity<EurekaApplications> response = restTemplate.exchange(url,
HttpMethod.GET, null, EurekaApplications.class);
return anEurekaHttpResponse(response.getStatusCodeValue(),
response.getStatusCode().value() == HttpStatus.OK.value()
&& response.hasBody() ? (Applications) response.getBody() : null)
.headers(headersOf(response)).build();
}
可以看出是向serviceUrl進(jìn)行http請(qǐng)求獲得相關(guān)數(shù)據(jù)的着逐。而serviceUrl是DiscoveryClient在初始化eurekaTransport時(shí)傳入的崔赌。DiscoveryClient初始化時(shí)主要有三個(gè)參數(shù):ApplicationInfoManager、EurekaClientConfig耸别、ApplicationEventPublisher健芭,通過查看配置類EurekaClientAutoConfiguration可以看到,在SpringCloud中創(chuàng)建的其實(shí)是DiscoveryClient的子類CloudEurekaClient:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
ApplicationInfoManager秀姐、EurekaClientConfig是自動(dòng)注入的ApplicationInfoManager manager, EurekaClientConfig config而ApplicationEventPublisher是容器上下文ApplicationContext慈迈。繼續(xù)查找代碼可以發(fā)現(xiàn):
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig {...}
????ApplicationInfoManager來源于EurekaInstanceConfig ,而EurekaInstanceConfig 其實(shí)是由spring創(chuàng)建的EurekaInstanceConfigBean省有,通過@ConfigurationProperties("eureka.instance")收集了配置文件中的eureka.instance前綴的配置痒留。
????EurekaClientConfig其實(shí)是由spring實(shí)現(xiàn)并創(chuàng)建的EurekaClientConfigBean谴麦,通過@ConfigurationProperties("eureka.client")收集了配置文件中的eureka.client前綴的配置。
????到這可以看出來springcloud做的工作其實(shí)就是收集配置并用來初始化DiscoveryClient伸头。其實(shí)spring本質(zhì)上作為一個(gè)beanfactory匾效,工作就是創(chuàng)建并管理bean的生命周期。
2.ab)注冊(cè)和續(xù)約
注冊(cè)方法:register()恤磷,查看調(diào)用發(fā)現(xiàn)在DiscoveryClient的構(gòu)造函數(shù)和renew()函數(shù)中被調(diào)用了面哼。查看代碼:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
其實(shí)是通過eurekaTransport.registrationClient.register(instanceInfo)和server進(jìn)行了一次rest-http調(diào)用。instanceInfo是這個(gè)客戶端本身扫步。
續(xù)約方法:renew()魔策,上面說到了續(xù)約方法被放入new HeartbeatThread()中,被線程池周期執(zhí)行锌妻。
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
可以看出也是通過 eurekaTransport.registrationClient進(jìn)行一次http請(qǐng)求代乃,如果請(qǐng)求失敗,則調(diào)用一次register()從新注冊(cè)仿粹,如果都失敗則返回false。
3.c)注銷
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks(); //取消上面所說的兩個(gè)周期任務(wù)原茅,并關(guān)閉線程池
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
可以看出主要工作有兩部分吭历,1.回收資源 2.調(diào)用unregister()通知eureka-server注銷。查看unregister()代碼:
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
可以看出這里也是通過eurekaTransport.registrationClient進(jìn)行了一次http請(qǐng)求擂橘。
綜上所述晌区,eureka的主要靠獨(dú)立的線程池執(zhí)行周期性任務(wù)來執(zhí)行http請(qǐng)求來進(jìn)行服務(wù)發(fā)現(xiàn)的更新和服務(wù)續(xù)約。而spring所扮演的角色只是DiscoveryClient的創(chuàng)建和管理者通贞,并沒有改變eureka的內(nèi)部功能朗若。我們也可以通過自己創(chuàng)建和管理DiscoveryClient在非springcloud項(xiàng)目中獨(dú)立地使用eureka,eureka功能完備昌罩,自己集成相對(duì)簡單哭懈。總之茎用,就是從server通過http請(qǐng)求獲得服務(wù)數(shù)據(jù)而已遣总,可以自己通過瀏覽器訪問:http://localhost:8761/eureka/apps看看數(shù)據(jù)