core:Java功能增強(qiáng) —— 事件機(jī)制(事件與監(jiān)聽(tīng)器)
Eureka提供了服務(wù)注冊(cè)與發(fā)現(xiàn)的功能,需要提供一個(gè)服務(wù)注冊(cè)中心(我這里是在spring boot項(xiàng)目啟動(dòng)類上使用@EnableEurekaServer泼疑,再配置相關(guān)屬性)切威,然后在我們的應(yīng)用服務(wù)(我這里是spring boot服務(wù))啟動(dòng)類上使用@EnableDiscoveryClient啟用服務(wù)注冊(cè)發(fā)現(xiàn)功能闯狱。那么問(wèn)題來(lái)了洽沟,我們的服務(wù)是怎么注冊(cè)到注冊(cè)中心的呢攘宙?
使用@EnableDiscoveryClient注解后啟動(dòng)服務(wù)丈莺,我們可以發(fā)現(xiàn)控制臺(tái)打出了如下log:
從圖中可以看出的信息有:1划煮、是在EurekaDiscoveryClientConfiguration中將服務(wù)器注冊(cè)到eureka的。2缔俄、DiscoveryClient類里面應(yīng)該有許多服務(wù)器與eureka之間的通信操作弛秋,如心跳續(xù)約等。3俐载、InstanceInfoReplicator應(yīng)該是一個(gè)線程類蟹略。
既然EurekaDiscoveryClientConfiguration是最開(kāi)始的地方,那就從它看起遏佣。
1挖炬、EurekaDiscoveryClientConfiguration
源碼如下:
@Configuration //使用該注解來(lái)讓Spring容器發(fā)現(xiàn)并注冊(cè)里面的Bean
@EnableConfigurationProperties
@ConditionalOnClass({EurekaClientConfig.class})
@ConditionalOnProperty(
value = {"eureka.client.enabled"},
matchIfMissing = true
)
public class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered {
public void start() {
if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) {
this.instanceConfig.setNonSecurePort(this.port.get());
}
if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) {
this.maybeInitializeClient();
if (log.isInfoEnabled()) {
log.info("Registering application " + this.instanceConfig.getAppname() + " with eureka with status " + this.instanceConfig.getInitialStatus());
}
this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus());
if (this.healthCheckHandler != null) {
this.eurekaClient.registerHealthCheck(this.healthCheckHandler);
}
//發(fā)布了一個(gè)注冊(cè)事件,在哪里監(jiān)聽(tīng)的状婶?還是說(shuō)是留給我們開(kāi)發(fā)者用的意敛?
this.context.publishEvent(new InstanceRegisteredEvent(this, this.instanceConfig));
this.running.set(true);
}
}
private void maybeInitializeClient() {
this.applicationInfoManager.getInfo();
this.eurekaClient.getApplications();
}
public void stop() {
if (this.applicationInfoManager.getInfo() != null) {
if (log.isInfoEnabled()) {
log.info("Unregistering application " + this.instanceConfig.getAppname() + " with eureka with status DOWN");
}
this.applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
}
this.running.set(false);
}
//監(jiān)聽(tīng),內(nèi)置容器啟動(dòng)時(shí)開(kāi)始注冊(cè)
@EventListener({EmbeddedServletContainerInitializedEvent.class})
public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
int localPort = event.getEmbeddedServletContainer().getPort();
if (this.port.get() == 0) {
log.info("Updating port to " + localPort);
this.port.compareAndSet(0, localPort);
this.start();
}
}
//監(jiān)聽(tīng)膛虫,應(yīng)用關(guān)閉時(shí)關(guān)閉eureka客戶端
@EventListener({ContextClosedEvent.class})
public void onApplicationEvent(ContextClosedEvent event) {
this.stop();
this.eurekaClient.shutdown();
}
}
按猜想的來(lái)說(shuō)應(yīng)該有一個(gè)Rest請(qǐng)求什么的將服務(wù)器信息發(fā)送到eureka服務(wù)器才對(duì)空闲,好像上面的start()方法沒(méi)有做什么事情。
我們關(guān)閉了eureka注冊(cè)中心走敌,重新啟動(dòng)服務(wù)碴倾,發(fā)現(xiàn)報(bào)了如下錯(cuò):
嗯,意料之中,肯定連不上啊跌榔,從這里可以看出异雁,發(fā)請(qǐng)求是RedirectingEurekaHttpClient請(qǐng)求客戶端做的,接著往下:
就像一開(kāi)始猜想的僧须,DiscoveryClient里面有與eureka的通信操作纲刀,而InstanceInfoReplicator應(yīng)該是一個(gè)線程類。進(jìn)去看看:
DiscoveryClient使用@Singleton修飾担平,里面有這么兩個(gè)函數(shù)示绊,可以看出是注冊(cè)與續(xù)約:
boolean register() throws Throwable {
logger.info("DiscoveryClient_" + this.appPathIdentifier + ": registering service...");
EurekaHttpResponse httpResponse;
try {
httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
} catch (Exception var3) {
logger.warn("{} - registration failed {}", new Object[]{"DiscoveryClient_" + this.appPathIdentifier, var3.getMessage(), var3});
throw var3;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", "DiscoveryClient_" + this.appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
boolean renew() {
try {
EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);
logger.debug("{} - Heartbeat status: {}", "DiscoveryClient_" + this.appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
this.REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", "DiscoveryClient_" + this.appPathIdentifier, this.instanceInfo.getAppName());
return this.register();
} else {
return httpResponse.getStatusCode() == 200;
}
} catch (Throwable var3) {
logger.error("{} - was unable to send heartbeat!", "DiscoveryClient_" + this.appPathIdentifier, var3);
return false;
}
}
這個(gè)register()就是InstanceInfoReplicator的run方法里調(diào)用的,而InstanceInfoReplicator是在DiscoveryClient的構(gòu)造方法中實(shí)例化的暂论。
在eureka服務(wù)器端則是將服務(wù)信息存放在一個(gè)雙層Map里面褐,第一層的Key是服務(wù)名,第二層的Key是實(shí)例名:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();