Spring官網(wǎng)的Eureka Client使用教程
一 Maven依賴
對(duì)應(yīng)的版本是2.1.3.RELEASE
默認(rèn)已成功搭建Eureka Client客戶端和Eureka Server注冊中心
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
二 自動(dòng)配置類
EurekaClientAutoConfiguration
是Eureka Client客戶端中的自動(dòng)配置類,列舉出其中部分比較重要的Bean
- EurekaClientConfigBean
- EurekaInstanceConfigBean
- ApplicationInfoManager
- EurekaClient
三 Bean釋義
- EurekaClientConfigBean
當(dāng)前服務(wù)作為Eureka Client客戶端的配置信息
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
EurekaClientConfigBean client = new EurekaClientConfigBean();
if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
// We don't register during bootstrap by default, but there will be another
// chance later.
client.setRegisterWithEureka(false);
}
return client;
}
- EurekaInstanceConfigBean
當(dāng)前服務(wù)作為Eureka Client客戶端實(shí)例的原生配置信息。可以看到嘗試從yaml里取各種配置信息景埃,例如服務(wù)的context-path矩乐、業(yè)務(wù)ip和端口摔认、管理ip和端口原押,以及設(shè)置其他信息哈恰。
@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
ManagementMetadataProvider managementMetadataProvider) {
String hostname = getProperty("eureka.instance.hostname");
boolean preferIpAddress = Boolean
.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
String ipAddress = getProperty("eureka.instance.ip-address");
boolean isSecurePortEnabled = Boolean
.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
String serverContextPath = env.getProperty("server.servlet.context-path", "/");
int serverPort = Integer
.valueOf(env.getProperty("server.port", env.getProperty("port", "8080")));
Integer managementPort = env.getProperty("management.server.port", Integer.class); // nullable.
// should
// be
// wrapped
// into
// optional
String managementContextPath = env
.getProperty("management.server.servlet.context-path"); // nullable.
// should
// be wrapped into
// optional
Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",
Integer.class); // nullable
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setNonSecurePort(serverPort);
instance.setInstanceId(getDefaultInstanceId(env));
instance.setPreferIpAddress(preferIpAddress);
instance.setSecurePortEnabled(isSecurePortEnabled);
if (StringUtils.hasText(ipAddress)) {
instance.setIpAddress(ipAddress);
}
if (isSecurePortEnabled) {
instance.setSecurePort(serverPort);
}
if (StringUtils.hasText(hostname)) {
instance.setHostname(hostname);
}
String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
if (StringUtils.hasText(statusPageUrlPath)) {
instance.setStatusPageUrlPath(statusPageUrlPath);
}
if (StringUtils.hasText(healthCheckUrlPath)) {
instance.setHealthCheckUrlPath(healthCheckUrlPath);
}
ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
serverContextPath, managementContextPath, managementPort);
if (metadata != null) {
instance.setStatusPageUrl(metadata.getStatusPageUrl());
instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
if (instance.isSecurePortEnabled()) {
instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
}
Map<String, String> metadataMap = instance.getMetadataMap();
metadataMap.computeIfAbsent("management.port",
k -> String.valueOf(metadata.getManagementPort()));
}
else {
// without the metadata the status and health check URLs will not be set
// and the status page and health check url paths will not include the
// context path so set them here
if (StringUtils.hasText(managementContextPath)) {
instance.setHealthCheckUrlPath(
managementContextPath + instance.getHealthCheckUrlPath());
instance.setStatusPageUrlPath(
managementContextPath + instance.getStatusPageUrlPath());
}
}
setupJmxPort(instance, jmxPort);
return instance;
}
- ApplicationInfoManager
EurekaClientAutoConfiguration
的內(nèi)部配置類RefreshableEurekaClientConfiguration
內(nèi)配置的Bean派哲。InstanceInfo作為服務(wù)實(shí)例信息臼氨,由原生配置屬性轉(zhuǎn)化而來(最終會(huì)用以注冊到Eureka Server注冊中心)“沤欤客戶端配置信息與服務(wù)實(shí)例信息再封裝為ApplicationInfoManager储矩。
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
- EurekaClient
EurekaClient接口只定義了Eureka Client客戶端的簡單行為,其實(shí)現(xiàn)類DiscoveryClient作為核心類來負(fù)責(zé)與Eureka Server注冊中心的交互褂乍,CloudEurekaClient繼承自DiscoveryClient持隧。
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a
// problem
// when shutdown is called on the CloudEurekaClient where the
// ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the
// object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
四 Eureka Client客戶端啟動(dòng)與注冊中心的交互
DiscoveryClient作為EurekaClient的實(shí)現(xiàn)類,是開啟Eureka Client客戶端與Eureka Server注冊中心交互的入口逃片。上面托管給容器時(shí)子類CloudEurekaClient在其構(gòu)造器中調(diào)用了父類DiscoveryClient的構(gòu)造方法屡拨,調(diào)用到的核心構(gòu)造方法如下,限于篇幅只展示構(gòu)造器中的部分核心代碼塊。
- 在try代碼塊中分別創(chuàng)建了一個(gè)Schedule線程池和兩個(gè)普通線程池呀狼。
- 根據(jù)后面兩個(gè)線程池名稱可以看出其用途分別為心跳檢測和緩存裂允。
- 最后執(zhí)行initScheduledTasks方法
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 有省略部分代碼
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
// 有省略部分代碼
}
在方法initScheduledTasks中
- clientConfig是自動(dòng)配置類中托管給容器的EurekaClientConfigBean
- -- 默認(rèn)開啟從注冊中心獲取服務(wù)開關(guān)
-- 默認(rèn)獲取服務(wù)間隔registryFetchIntervalSeconds為30S
-- 默認(rèn)獲取服務(wù)間隔超時(shí)次數(shù)expBackOffBound為10次
-- 默認(rèn)開啟向注冊中心注冊服務(wù)開關(guān)
-- 默認(rèn)續(xù)約間隔renewalIntervalInSecs為30S
-- 默認(rèn)續(xù)約間隔超時(shí)次數(shù)expBackOffBound為10次 - 向一開始創(chuàng)建的Schedule線程池提交兩個(gè)TimedSupervisorTask任務(wù)(父類TimerTask有實(shí)現(xiàn)Runnable)。
- Schedule線程池最終調(diào)度到的任務(wù)分別是
CacheRefreshThread
和HeartbeatThread
(兩個(gè)類均有實(shí)現(xiàn)Runnable) - 兩個(gè)任務(wù)的延遲執(zhí)行時(shí)間哥艇、執(zhí)行周期間隔時(shí)間和超時(shí)重試次數(shù)為上述配置信息绝编。
- 創(chuàng)建當(dāng)前客戶端實(shí)例的副本
InstanceInfoReplicator
,在配置了事件發(fā)布器之后貌踏,執(zhí)行了其start方法十饥。
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
五 Eureka Client客戶端與注冊中心的交互
-
服務(wù)獲取
任務(wù)CacheRefreshThread
完成了從Eureka Server注冊中心獲取所有服務(wù)并緩存到本地的功能。
從run()方法開始哩俭,依次的調(diào)用鏈為:refreshRegistry() > fetchRegistry() > getAndUpdateDelta()
- getAndUpdateDelta的入?yún)楸镜鼐彺娴乃蟹?wù),第一次執(zhí)行該任務(wù)時(shí)拳恋,只有當(dāng)前服務(wù)自身凡资。
- 使用EurekaHttpClient向注冊中心發(fā)起獲取所有服務(wù)的請(qǐng)求,最終由AbstractJerseyEurekaHttpClient發(fā)起(具體套娃式的Http請(qǐng)求過程不再分析谬运,用到了Jersey隙赁,有興趣可以了解)
- 注冊中心響應(yīng)請(qǐng)求并返回結(jié)果,從響應(yīng)httpResponse中拿到注冊中心返回的所有服務(wù)信息梆暖,在updateDelta中完成對(duì)本地緩存的服務(wù)更新
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
// 有省略部分代碼
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
- 入?yún)閺淖灾行哪玫降乃蟹?wù)
- delta中保存了多個(gè)服務(wù)信息伞访,單個(gè)服務(wù)Application通常會(huì)包含多個(gè)節(jié)點(diǎn)(保證高可用性),InstanceInfo即為單個(gè)節(jié)點(diǎn)的信息轰驳。
- 節(jié)點(diǎn)信息不同的ActionType會(huì)有不同的處理方式
- 最終所有服務(wù)保存在
Applications
的屬性Map<String, Application> appNameApplicationMap
上厚掷,每個(gè)服務(wù)Application
內(nèi)又使用Map<String, InstanceInfo> instancesMap
保存了服務(wù)的所有實(shí)例信息。
private void updateDelta(Applications delta) {
int deltaCount = 0;
for (Application app : delta.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {
remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
existingApp.removeInstance(instance);
/*
* We find all instance list from application(The status of instance status is not only the status is UP but also other status)
* if instance list is empty, we remove the application.
*/
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
applications.removeApplication(existingApp);
}
}
}
}
}
logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
getApplications().setVersion(delta.getVersion());
getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
for (Applications applications : remoteRegionVsApps.values()) {
applications.setVersion(delta.getVersion());
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
-
服務(wù)續(xù)約
任務(wù)HeartbeatThread
完成了向注冊中心續(xù)約服務(wù)的功能级解。
- 使用當(dāng)前客戶端實(shí)例信息冒黑,向注冊中心發(fā)起服務(wù)續(xù)約請(qǐng)求
- 正常情況續(xù)約成功
- 續(xù)約接口404的情況下,使用注冊服務(wù)接口代替勤哗。
- 續(xù)約成功的前提下抡爹,客戶端更新自身維護(hù)的最后一次成功的心跳時(shí)間
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
-
服務(wù)注冊
任務(wù)InstanceInfoReplicator
中保存了客戶端服務(wù)的實(shí)例信息
- 構(gòu)造器中初始化了Schedule線程池,隨著start方法被調(diào)用芒划,任務(wù)會(huì)被Schedule線程池調(diào)度冬竟。(這里的initialDelayMs為EurekaClientConfigBean中的默認(rèn)配置initialInstanceInfoReplicationIntervalSeconds = 40,即延遲40S后再向注冊中心發(fā)起服務(wù)注冊請(qǐng)求)
- run方法中實(shí)際執(zhí)行的是
DiscoveryClient
的register方法民逼,以完成客戶端服務(wù)向注冊中心的注冊
InstanceInfoReplicator
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
this.instanceInfo = instanceInfo;
this.scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
.setDaemon(true)
.build());
this.scheduledPeriodicRef = new AtomicReference<Future>();
this.started = new AtomicBoolean(false);
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
DiscoveryClient
類的register方法內(nèi)泵殴,向注冊中心發(fā)起了注冊請(qǐng)求。
DiscoveryClient
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
-
服務(wù)卸載
在DiscoveryClient
類的shutdown方法中還提供了通知注冊中心卸載服務(wù)的能力拼苍。
- 對(duì)
@PreDestroy
的處理在InitDestroyAnnotationBeanPostProcessor
的postProcessBeforeDestruction中袋狞,屬于Spring的能力。 - 關(guān)閉服務(wù)時(shí)shutdown方法會(huì)被調(diào)用,cancelScheduledTasks中關(guān)閉了上述服務(wù)注冊苟鸯、服務(wù)續(xù)約和獲取所有服務(wù)中創(chuàng)建的線程池
- unregister中向注冊中心發(fā)起了卸載實(shí)例的請(qǐng)求
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
至此同蜻,服務(wù)注冊、續(xù)約早处、緩存注冊中心所有服務(wù)和服務(wù)卸載都以完成湾蔓,但作為注冊中心,如何處理這些客戶端請(qǐng)求砌梆,以及注冊中心集群間如何交互默责,還需要再進(jìn)一步了解Eureka Server注冊中心。