現(xiàn)在研究下Eureka服務(wù)下線的源碼虾啦。由服務(wù)續(xù)約的源碼我們知道,如果客戶端在90秒內(nèi)沒(méi)有繼續(xù)跟服務(wù)端進(jìn)行心跳的話滤淳,服務(wù)端會(huì)進(jìn)行下線客戶端并且更改狀態(tài)將其剔除勺鸦,并且也會(huì)在集群中告知(同步)其它節(jié)點(diǎn)亲配。
〓Eureka Client
/**
* 注銷服務(wù),調(diào)用client的cancel服務(wù)善榛,往里面看也就是調(diào)用了服務(wù)端的http delete 請(qǐng)求進(jìn)行服務(wù)下線
*/ void unregister() {
// It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + appPathIdentifier + " - deregister status: " + httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);
}
}
}
@Override public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
接下來(lái)再看是哪里調(diào)用注銷服務(wù):
@PreDestroy @Override public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
// 注銷服務(wù)狀態(tài)的監(jiān)聽(tīng)器 if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 取消定時(shí)器任務(wù)砾肺,關(guān)閉線程池等 cancelScheduledTasks();
// 服務(wù)實(shí)例以及被注冊(cè),那么設(shè)置實(shí)例狀態(tài)為DOWN翩蘸,并且進(jìn)行注銷操作 if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
// 關(guān)閉eurekaTransport if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
// 關(guān)閉監(jiān)控 heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
// DiscoveryManager調(diào)用了DiscveryClient的shutdown方法所意,這里就是服務(wù)下線的入口 public void shutdownComponent() {
if (discoveryClient != null) {
try {
discoveryClient.shutdown();
discoveryClient = null;
} catch (Throwable th) {
logger.error("Error in shutting down client", th);
}
}
}
〓Eureka Server
// 服務(wù)端提供的對(duì)外服務(wù)下線接口 @DELETE public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
logger.debug("Found (Cancel): " + app.getName() + " - " + id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): " + app.getName() + " - " + id);
return Response.status(Status.NOT_FOUND).build();
}
}
// 服務(wù)端下線入口 @Override public boolean cancel(final String appName, final String id,
final boolean isReplication) {
// 調(diào)用父類cancel方法,將實(shí)例信息添加到最近下線隊(duì)列催首、最近變更隊(duì)列扶踊,并且使本地緩存失效操作 if (super.cancel(appName, id, isReplication)) {
// 服務(wù)端集群之間進(jìn)行Cancel同步操作 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
// 最終調(diào)用這個(gè)方法進(jìn)行實(shí)例信息移除 protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
// 讀鎖 read.lock();
// 取消(下線)計(jì)數(shù) CANCEL.increment(isReplication);
// 獲取續(xù)約實(shí)例 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
// 添加到近期取消隊(duì)列 recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
// 移除實(shí)例狀態(tài) InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
// 續(xù)約實(shí)例不存在,返回false CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
// 續(xù)約實(shí)例執(zhí)行取消操作:設(shè)置剔除時(shí)間 leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
// 設(shè)置實(shí)例行為為delete instanceInfo.setActionType(ActionType.DELETED);
// 添加到近期變化隊(duì)列 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// 使響應(yīng)緩存失效 invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
// 釋放讀鎖 read.unlock();
}
}
總結(jié)
Eureka client執(zhí)行DiscoveryManager調(diào)用了DiscveryClient的shutdown方法進(jìn)行服務(wù)的下線操作郎任,然后服務(wù)端接收到http delete請(qǐng)求之后進(jìn)行服務(wù)的相關(guān)下線操作姻檀,并且同步到集群中的其它節(jié)點(diǎn)。Eureka server則會(huì)將實(shí)例信息進(jìn)行剔除處理涝滴,并且添加到近期變化隊(duì)列和近期取消隊(duì)列里绣版。