Eureka Server tutorial on the official Spring site
1. Maven Dependencies
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
2. Auto-configuration Class
EurekaServerAutoConfiguration is the auto-configuration class of the Eureka Server registry. It imports another configuration class, EurekaServerInitializerConfiguration. Some of the more important beans it defines are:
- PeerAwareInstanceRegistry
- PeerEurekaNodes
- EurekaServerContext
3. Bean Descriptions
- PeerAwareInstanceRegistry
The instance registry. It holds every instance that Eureka Clients have registered.
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
- PeerEurekaNodes
Manages the other Eureka Server nodes in the cluster; each peer is represented by a PeerEurekaNode.
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
replicationClientAdditionalFilters);
}
- EurekaServerContext
The Eureka Server context.
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
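4. Eureka Server Startup Entry Point
DefaultEurekaServerContext is managed by the container as a bean. Its initialize method is annotated with @PostConstruct, so the container runs it once the bean has been constructed and its dependencies injected (InitDestroyAnnotationBeanPostProcessor makes the call via reflection). The method does two things:
- Start the Eureka Server
- Initialize the registry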
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
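To make the reflective call more concrete, here is a minimal sketch of how a post-processor could find and invoke an annotated init method. It is not Spring's code: `InitCallback`, `DemoServerContext` and `InitCallbackInvoker` are hypothetical names, and a custom annotation stands in for @PostConstruct so that the example needs only the JDK.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @PostConstruct, so the sketch compiles with the JDK alone.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface InitCallback {}

// Hypothetical bean that records the two steps performed by initialize().
class DemoServerContext {
    final List<String> steps = new ArrayList<>();

    @InitCallback
    public void initialize() {
        steps.add("peerEurekaNodes.start");
        steps.add("registry.init");
    }
}

class InitCallbackInvoker {
    // Invokes every no-argument method annotated with @InitCallback on the bean.
    static void invokeInitCallbacks(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(InitCallback.class) && m.getParameterCount() == 0) {
                try {
                    m.setAccessible(true);
                    m.invoke(bean);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("init callback failed: " + m.getName(), e);
                }
            }
        }
    }

    public static void main(String[] args) {
        DemoServerContext ctx = new DemoServerContext();
        invokeInitCallbacks(ctx);
        System.out.println(ctx.steps);
    }
}
```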
- Starting the Eureka Server
On startup, the Eureka Server reads the addresses of the other Eureka Server nodes from the configuration, creates a PeerEurekaNode for each address, and keeps those PeerEurekaNode objects on the current node in preparation for later replication within the cluster. Tracing the source:
- Create a scheduled thread pool
- Call updatePeerEurekaNodes(resolvePeerUrls())
- Submit the task peersUpdateTask to the pool for periodic execution (the task itself calls updatePeerEurekaNodes(resolvePeerUrls()))
- The interval comes from the EurekaServerConfigBean bean and defaults to 10 minutes
peerEurekaNodes.start();
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
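The scheduling pattern in start() can be tried out in isolation. The sketch below is a simplified illustration, not Eureka code: `PeriodicRefresher` and its methods are hypothetical names. It runs the task once up front, then repeats it on a single daemon thread, and it catches Throwable inside the task because scheduleWithFixedDelay suppresses all later runs once an execution throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that mirrors the structure of PeerEurekaNodes.start().
class PeriodicRefresher {
    private final ScheduledExecutorService executor;

    PeriodicRefresher(String threadName) {
        // One daemon thread, so the refresher never keeps the JVM alive.
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    // Runs the task once immediately, then again after every fixed delay.
    void start(Runnable task, long intervalMs) {
        task.run();
        executor.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                // An uncaught exception would cancel all future executions.
                System.err.println("refresh failed: " + e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Demo helper: true if the task ran at least `runs` times before the timeout.
    static boolean runsAtLeast(int runs, long intervalMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(runs);
        PeriodicRefresher refresher = new PeriodicRefresher("Demo-PeerNodesUpdater");
        try {
            refresher.start(latch::countDown, intervalMs);
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            refresher.shutdown();
        }
    }
}
```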
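applicationInfoManager is passed in through the constructor when PeerEurekaNodes is built (applicationInfoManager was covered in the Client article). resolvePeerUrls reads the configured addresses of the Eureka Server nodes in the cluster, removes the current node's own URL, and returns the rest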
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
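The filtering step at the end of resolvePeerUrls boils down to "drop my own URL from the configured list". A minimal sketch, assuming a plain string comparison (the real isThisMyUrl is not shown in this article and may compare differently); `PeerUrlFilter` and the sample URLs are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the equality check is an assumption, not Eureka's isThisMyUrl.
class PeerUrlFilter {
    // Returns a copy of the configured URLs without this node's own URL.
    static List<String> withoutSelf(List<String> configuredUrls, String myUrl) {
        List<String> peers = new ArrayList<>(configuredUrls);
        peers.removeIf(url -> url.equals(myUrl));
        return peers;
    }

    public static void main(String[] args) {
        List<String> urls = List.of(
                "http://peer1:8761/eureka/",
                "http://peer2:8762/eureka/",
                "http://peer3:8763/eureka/");
        System.out.println(withoutSelf(urls, "http://peer2:8762/eureka/"));
    }
}
```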
Once the addresses of the other Eureka Server nodes have been collected, createPeerEurekaNode creates an object for each of them, and the objects are stored in the peerEurekaNodes field of PeerEurekaNodes.
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
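The heart of updatePeerEurekaNodes is a two-way set difference between the URLs already known and the URLs just resolved. The sketch below isolates that step; `PeerDiff` is a hypothetical name, and shutting down or creating nodes is left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical value object holding the result of the two set differences.
class PeerDiff {
    final Set<String> toShutdown;
    final Set<String> toAdd;

    PeerDiff(Set<String> currentUrls, List<String> newUrls) {
        // Known peers that are no longer configured.
        toShutdown = new HashSet<>(currentUrls);
        toShutdown.removeAll(newUrls);
        // Configured peers that have not been created yet.
        toAdd = new HashSet<>(newUrls);
        toAdd.removeAll(currentUrls);
    }

    // True when the peer list did not change, in which case nothing needs to be done.
    boolean isNoChange() {
        return toShutdown.isEmpty() && toAdd.isEmpty();
    }
}
```

For example, with current peers {a, b} and a new list [b, c], toShutdown is {a} and toAdd is {c}; b is left untouched, so its existing PeerEurekaNode can be reused.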
The most important step in wrapping another Eureka Server node as a PeerEurekaNode is using the configured parameters to initialize the HTTP client through which the current node talks to that peer. JerseyReplicationClient.createReplicationClient builds an EurekaJerseyClient, which underneath turns out to be a wrapper around Apache HttpClient. The call chain is:
> JerseyReplicationClient.createReplicationClient
> EurekaJerseyClientBuilder.build
> new EurekaJerseyClientImpl
> ApacheHttpClient4 create
> ApacheHttpClient4 createDefaultClientHandler
> new DefaultHttpClient
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
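- Initializing the registry
As a registry, Eureka Server maintains a threshold derived from the number of instances registered with the current node. Initializing the registry sets up the response cache (Guava) and starts a Timer that periodically counts the instances in the registry and, when a condition is met, updates that threshold. The periodic task is set up in scheduleRenewalThresholdUpdateTask; the condition check and the update itself are in updateRenewalThreshold, shown below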
registry.init(peerEurekaNodes);
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
initializedResponseCache();
scheduleRenewalThresholdUpdateTask();
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
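- eurekaClient is passed in through the InstanceRegistry constructor; it is registered in the container by the Eureka Client auto-configuration class
- getApplications returns all instances of the registry cached by the current service; count is the number of those instances
- Check whether count is greater than 85% of the current expected number of clients (or whether self-preservation is disabled); 85% is the default in the EurekaServerConfigBean bean
- If the condition holds, update the threshold in updateRenewsPerMinThreshold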
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold or if self preservation is disabled.
if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
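A quick worked example helps with the formula. The sketch below reuses the arithmetic of the two methods above in a standalone class (`RenewalThreshold` is a hypothetical name). The 30-second renewal interval is an assumption made for the example; 0.85 is the default percentage mentioned above.

```java
// Hypothetical helper that isolates the arithmetic of the two methods above.
class RenewalThreshold {
    // expected clients * renewals per client per minute * percentage
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds, double percent) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percent);
    }

    // The update condition, ignoring the self-preservation switch.
    static boolean shouldUpdate(int count, int expectedClients, double percent) {
        return count > percent * expectedClients;
    }

    public static void main(String[] args) {
        // 10 clients, one renewal every 30 s (assumed), 85%: 10 * 2 * 0.85 = 17
        System.out.println(renewsPerMinThreshold(10, 30, 0.85));
    }
}
```

With 10 expected clients the threshold is 17 renewals per minute. A count of 9 instances passes the update check (9 > 8.5), a count of 8 does not.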
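5. How the Eureka Server Registry Responds to Client Requests
As a service registry, Eureka Server uses Jersey to expose resource endpoints that handle the various requests from Eureka Clients. The entry-point resource classes for the core endpoints are listed below; Jersey itself is worth a closer look if you are interested.
- ApplicationsResource: fetching services
- ApplicationResource: service registration
- InstanceResource: service renewal and cancellation
- PeerReplicationResource: used by Eureka Server nodes to synchronize cluster state
In practice, though, every client request (registration, fetch, renewal and cancellation) ends up being handled by AbstractInstanceRegistry, a superclass of InstanceRegistry. Its registry field stores all instances registered by Eureka Clients.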
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
- Service registration
When the Eureka Server registry receives a registration request from a Eureka Client, it stores the instance locally and then sends a registration request to the other Eureka Server nodes in the cluster. That second request is an exchange between Eureka Server nodes, distinct from a client-to-registry registration, so it does not lead to circular calls.
Client registration requests arrive at the addInstance method of the Jersey resource ApplicationResource. After basic checks and data validation, the request is handed to the registry: InstanceRegistry's register method calls the register method of its parent class PeerAwareInstanceRegistryImpl
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
The register method of PeerAwareInstanceRegistryImpl has two steps
- Call the register method of the parent class AbstractInstanceRegistry
- Notify the other Eureka Server nodes in the cluster so that they register the instance too
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
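The role of the isReplication flag is easiest to see in a toy model. The sketch below is not Eureka code: `DemoNode` is a hypothetical class, and it assumes (as the parameter name suggests) that a request flagged as a replication is stored locally but never forwarded again, which is what keeps two peers from calling each other forever.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one Eureka Server node; not a class from Eureka.
class DemoNode {
    final Set<String> instances = new HashSet<>();
    final List<DemoNode> peers = new ArrayList<>();
    int registerCalls = 0;

    void register(String instanceId, boolean isReplication) {
        registerCalls++;
        instances.add(instanceId);           // step 1: store the instance locally
        if (isReplication) {
            return;                          // replicated requests are not forwarded again
        }
        for (DemoNode peer : peers) {
            peer.register(instanceId, true); // step 2: forward, flagged as replication
        }
    }

    // Builds two nodes that list each other as peers.
    static DemoNode[] twoNodeCluster() {
        DemoNode a = new DemoNode();
        DemoNode b = new DemoNode();
        a.peers.add(b);
        b.peers.add(a);
        return new DemoNode[] {a, b};
    }
}
```

Registering an instance on the first node with isReplication=false results in exactly one register call on each node.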
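Finally, the register method of AbstractInstanceRegistry stores the client instance in the registry field, which is where the registry keeps the information of all client instances
- Look up in registry whether the service the instance belongs to already exists (a service may be deployed on several nodes and therefore have several instances); if not, add it (ConcurrentHashMap takes care of concurrent access)
- Check whether the service already contains the current instance. If it does, compare the last-dirty timestamps and keep the existing InstanceInfo when its timestamp is newer; if not, increase the expected number of clients and update the renewal threshold
- Wrap the instance in a Lease object and store it under the instance ID in the service's Map, which in turn is stored in registry
- Mark the service as up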
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
// some code omitted
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
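Stripped of logging, status overrides and queues, the storage part of register comes down to a nested map plus a putIfAbsent race check. The following sketch shows only that part; `MiniRegistry` is a hypothetical class, and an Integer lease duration stands in for Lease<InstanceInfo>.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified registry; the Integer value stands in for Lease<InstanceInfo>.
class MiniRegistry {
    // appName -> (instanceId -> lease duration in seconds)
    private final ConcurrentHashMap<String, Map<String, Integer>> registry = new ConcurrentHashMap<>();
    private final Object lock = new Object();
    private int expectedClients = 0;

    // Returns true if the instance was not registered before.
    boolean register(String appName, String instanceId, int leaseSeconds) {
        Map<String, Integer> instances = registry.get(appName);
        if (instances == null) {
            Map<String, Integer> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the map another thread stored first, or null if ours was stored.
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        boolean isNew = instances.put(instanceId, leaseSeconds) == null;
        if (isNew) {
            synchronized (lock) {
                // Simplification: the original only increments when the value is already > 0.
                expectedClients++;
            }
        }
        return isNew;
    }

    int instanceCount(String appName) {
        Map<String, Integer> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    int expectedClients() {
        synchronized (lock) {
            return expectedClients;
        }
    }
}
```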
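- Fetching services
Thirty seconds after startup, the client sends the registry a request to fetch all services. The Jersey resource ApplicationsResource handles the request; after passing through the response cache, all registered instances collected by the registry are returned to the client.
- The method tries to get all service instances from responseCache in compressed form
- responseCache is the registry's Guava-based response cache, mentioned earlier when the registry was initialized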
@Inject
ApplicationsResource(EurekaServerContext eurekaServer) {
this.serverConfig = eurekaServer.getServerConfig();
this.registry = eurekaServer.getRegistry();
this.responseCache = registry.getResponseCache();
}
// some code omitted
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
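Inside ResponseCacheImpl, the call chain for reading all service instances through the cache is: getGZIP() > getValue() > readWriteCacheMap.get() > the Guava cache triggers generatePayload()
- For a full fetch, key.getEntityType() is Application and key.getName() is ALL_APPS (the delta endpoint shown above uses ALL_APPS_DELTA instead)
- The registry returns all registered service instances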
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
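The pattern behind readWriteCacheMap is a read-through cache: a miss triggers the loader (generatePayload in the article), a hit returns the stored value, and an invalidation forces the next read to load again. The sketch below shows that pattern with a plain ConcurrentHashMap rather than Guava, so it has no expiry or size limit; `ReadThroughCache` is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical JDK-only read-through cache; the real code uses a Guava cache instead.
class ReadThroughCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    final AtomicInteger loads = new AtomicInteger(); // counts how often the loader ran

    ReadThroughCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Returns the cached value, calling the loader only on a miss.
    V get(K key) {
        return cache.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    // Drops the entry so that the next get() reloads it.
    void invalidate(K key) {
        cache.remove(key);
    }
}
```

This is also why register and internalCancel end with a call to invalidateCache: without it, clients would keep receiving the payload generated before the change.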
The method iterates over registry and returns all instances. The result is the same Applications object that the client receives in the Eureka Client article
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
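- Service renewal
When the registry receives a renewal request from an instance, it looks up the instance in the registry, updates its last renewal time, and then synchronizes the renewal to the other Eureka Servers in the cluster
The renewLease method of the InstanceResource resource receives the renewal request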
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
Calling InstanceRegistry's renew method leads to the renew method of PeerAwareInstanceRegistryImpl, which calls the parent class's renew method and then notifies the other nodes in the cluster to renew the lease as well
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
Finally, the renew method of AbstractInstanceRegistry fetches the service from registry, finds the instance, and updates the instance's latest renewal time
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
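- Service cancellation
Cancellation requests are handled by the InstanceResource resource, and in the end the corresponding instance is removed from its service in the registry
The cancelLease method handles the cancellation request. As before, PeerAwareInstanceRegistryImpl calls the parent class's cancel method and notifies the other nodes in the cluster to take the instance offline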
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
}
}
- When the registry takes an instance offline, it removes the instance from gMap, the service's instance map
- Finally, it invalidates the Guava response cache
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
6. How Eureka Server Evicts Expired Services
A Eureka Client stays online by sending renewal requests to the Eureka Server. When no renewal has arrived from an instance for several intervals in a row, the Eureka Server treats the instance as expired and evicts it
EurekaServerInitializerConfiguration implements Lifecycle, so the container calls its start method during startup. start spawns a new thread that calls the contextInitialized method of EurekaServerBootstrap. The call chain is: contextInitialized() > initEurekaServerContext() > the registry's openForTraffic method, from subclass up to parent class > AbstractInstanceRegistry's postInit method
- evictionTimer is initialized when the object is constructed
- The EvictionTask task is created
- postInit hands the task to evictionTimer for execution
private Timer evictionTimer = new Timer("Eureka-EvictionTimer", true);
private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference<EvictionTask>();
// some code omitted
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
- Inside EvictionTask, whether an instance in the registry has expired is ultimately decided by the isExpired method of the instance's own Lease
- Expired instances are removed from the registry with the internalCancel method shown above
@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
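Two details of evict are easier to see with concrete numbers: the cap on how many leases a single run may remove, and the partial Knuth (Fisher-Yates) shuffle that picks which ones. The sketch below reproduces only that arithmetic and selection; EvictionLimitSketch and its method names are hypothetical, and the threshold of 0.85 is just an example value passed in as a parameter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class EvictionLimitSketch {
    // Same arithmetic as in evict(): at most (size - size * threshold) leases per run.
    static int toEvict(int registrySize, double renewalPercentThreshold, int expiredCount) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        return Math.min(expiredCount, evictionLimit);
    }

    // Partial Fisher-Yates shuffle: after step i, positions 0..i hold a uniform random
    // selection, so the first `count` elements are `count` distinct random picks.
    static <T> List<T> pickRandom(List<T> items, int count, Random random) {
        List<T> copy = new ArrayList<>(items);
        for (int i = 0; i < count; i++) {
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, count));
    }

    public static void main(String[] args) {
        // 100 registered instances, threshold 0.85, 40 expired leases:
        // threshold = 85, limit = 15, so only 15 of the 40 are evicted in this run.
        int count = toEvict(100, 0.85, 40);
        System.out.println("toEvict = " + count);

        List<String> expired = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println("picked = " + pickRandom(expired, 2, new Random()));
    }
}
```

The remaining expired leases are not lost; they are simply candidates again on the next run of the task.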
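duration is the lease duration stored in Lease, 90 seconds by default. The rule is simple: if the current time is later than the instance's last renewal time + 90 s + the compensation time, the instance is considered expired. A lease that has already been cancelled (evictionTimestamp > 0) also counts as expired. Note that the Javadoc on isExpired in the Eureka source points out that renew() sets lastUpdateTimestamp to the current time plus duration, so in practice a lease expires about 2 × duration after the last renewal.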
public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
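The expiry rule can be exercised with fixed timestamps. This is a simplified sketch, not the library's Lease class: LeaseSketch is hypothetical, the current time is passed in explicitly so the result is deterministic, and renew here simply records the given time, which is an assumption made for illustration.

```java
// Hypothetical, simplified lease; time is injected instead of read from the system clock.
class LeaseSketch {
    static final long DEFAULT_DURATION_MS = 90_000L; // 90 s, the default mentioned above

    private final long durationMs;
    private long lastUpdateTimestamp;
    private long evictionTimestamp; // 0 means the lease has not been cancelled

    LeaseSketch(long nowMs, long durationMs) {
        this.lastUpdateTimestamp = nowMs;
        this.durationMs = durationMs;
    }

    // Simplification: just remember when the last heartbeat arrived.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs;
    }

    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    // Same shape as isExpired above: cancelled, or past last update + duration + compensation.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(0L, DEFAULT_DURATION_MS);
        System.out.println(lease.isExpired(90_000L, 0L));   // exactly at the boundary
        System.out.println(lease.isExpired(90_001L, 0L));   // one millisecond past it
        System.out.println(lease.isExpired(90_001L, 500L)); // compensation time extends the lease
    }
}
```

The compensation time only ever pushes the deadline later, which is why a delayed eviction run does not penalize instances for the server's own lateness.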
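7 Interaction Among Eureka Server Cluster Nodes
Interaction within the Eureka Server cluster means that the Eureka Server node that receives a client request replicates that request to all of its peer nodes. Note that this does not mean every pair of Eureka Server nodes synchronizes with each other: a request that is itself a replication is not forwarded again. The registration, renewal, and cancellation operations described above all trigger this kind of request to the other cluster nodes from PeerAwareInstanceRegistryImpl.
peerEurekaNodes.getPeerEurekaNodes() returns the PeerEurekaNode objects that the current Eureka Server node holds for the other Eureka Server nodes in the cluster (built when the Eureka Server started). replicateInstanceActionsToPeers then carries out the replication to each of those nodes.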
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
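The two guards in replicateToPeers (stop when the request is already a replication, skip your own URL) are what keep a request from bouncing around the cluster. The following is a toy in-memory model of that idea, offered as a sketch only; ReplicationSketch, Node, and the URLs are hypothetical, and direct method calls stand in for the HTTP requests.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicationSketch {
    // Hypothetical in-memory node; a method call replaces the real HTTP request.
    static class Node {
        final String url;
        final List<Node> peers = new ArrayList<>();
        final Set<String> registry = new HashSet<>();
        int replicationsReceived;

        Node(String url) {
            this.url = url;
        }

        void register(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            if (isReplication) {
                replicationsReceived++;
                return; // already a replica: forwarding again would never terminate
            }
            for (Node peer : peers) {
                if (peer.url.equals(url)) {
                    continue; // never replicate to yourself
                }
                peer.register(instanceId, true);
            }
        }
    }

    // Builds a cluster in which every node's peer list contains all nodes, itself included.
    static List<Node> cluster(String... urls) {
        List<Node> nodes = new ArrayList<>();
        for (String url : urls) {
            nodes.add(new Node(url));
        }
        for (Node node : nodes) {
            node.peers.addAll(nodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = cluster("http://a", "http://b", "http://c");
        nodes.get(0).register("app-1", false); // a client registers with node a
        for (Node node : nodes) {
            System.out.println(node.url + " registry=" + node.registry
                    + " replicationsReceived=" + node.replicationsReceived);
        }
    }
}
```

Each peer receives the registration exactly once, and the node that took the client request receives no replication at all.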
The request to each of the other cluster nodes is sent with the HTTP client that was configured in PeerEurekaNode at startup
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
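On the receiving Eureka Server, these requests are handled by the PeerReplicationResource resource. Traced down to the lowest level, however, it still ends up calling the corresponding methods of the registry, AbstractInstanceRegistry, to complete the synchronization between cluster nodes.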
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error("{} request processing failed for batch item {}/{}",
                        instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    String instanceStatus = toString(instanceInfo.getStatus());
    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
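What stands out in batchReplication is that each item is dispatched inside its own try/catch, so one bad item yields an error entry for that item only and the rest of the batch is still processed. The sketch below illustrates just that structure; BatchDispatchSketch, Item, the in-memory registry, and the status codes returned by each branch are hypothetical choices for the example, not the behaviour of the real handlers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BatchDispatchSketch {
    enum Action { Register, Heartbeat, Cancel }

    static final class Item {
        final Action action;
        final String id;

        Item(Action action, String id) {
            this.action = action;
            this.id = id;
        }
    }

    private final Set<String> registry = new HashSet<>();

    // Routes one item to its handler and returns an illustrative HTTP-like status code.
    int dispatch(Item item) {
        if (item.id == null) {
            throw new IllegalArgumentException("missing instance id");
        }
        switch (item.action) {
            case Register:
                registry.add(item.id);
                return 204;
            case Heartbeat:
                return registry.contains(item.id) ? 200 : 404;
            case Cancel:
                return registry.remove(item.id) ? 200 : 404;
            default:
                throw new IllegalStateException("unknown action " + item.action);
        }
    }

    // Per-item error isolation: a failing item becomes a 500 entry, the loop continues.
    List<Integer> batch(List<Item> items) {
        List<Integer> statuses = new ArrayList<>();
        for (Item item : items) {
            try {
                statuses.add(dispatch(item));
            } catch (RuntimeException e) {
                statuses.add(500);
            }
        }
        return statuses;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item(Action.Register, "a"),
                new Item(Action.Heartbeat, null), // malformed item
                new Item(Action.Heartbeat, "a"),
                new Item(Action.Cancel, "b"));
        System.out.println(new BatchDispatchSketch().batch(items));
    }
}
```

The response list keeps the same order and length as the request list, which lets the sender match each status to the item it sent.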
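There are many details in the interaction between Eureka Client and Eureka Server, and documenting all of them clearly would take some time. For example, the region concept that appears throughout the code is not covered in this article; interested readers can explore it on their own.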