Eureka是一個基于REST并應(yīng)用于AWS云服務(wù)的服務(wù)注冊中心哀蘑,并且經(jīng)歷過了Netflix公司的生產(chǎn)考驗碘勉,絕對是我們值得細(xì)心研讀的中間件罩扇。雖然我們可能并未接觸過AWS梧奢,但在閱讀Eureka之前應(yīng)該簡單地了解一下Amazon EC2中的某些概念担神,如地區(qū)和可用區(qū)域楼吃,這樣能更好地理解Eureka中某些特定的術(shù)語。
在閱讀本文之前妄讯,希望大家都已經(jīng)對Eureka有簡單的認(rèn)識:
Understanding eureka client server communication
Understanding Eureka Peer to Peer Communication
Register - 注冊
Eureka Instance注冊的REST入口在com.netflix.eureka.resources.ApplicationResource#addInstance
/**
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
* {@link InstanceInfo} information of the instance.
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes.
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// *********** 字段校驗 ************
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
// 僅當(dāng)DataCenterInfo為AmazonInfo實例的時候孩锡,其父類有可能是UniqueIdentifier
if (dataCenterInfo instanceof UniqueIdentifier) {
// ......
}
// *********** 字段校驗 END ************
registry.register(info, "true".equals(isReplication)); // (1)
return Response.status(204).build(); // 204 to be backwards compatible
}
真正的注冊操作在(1)
處,需要注意的是isReplication
變量取決于HTTP頭x-netflix-discovery-replication
的值亥贸。繼續(xù)追蹤(1)
的調(diào)用棧躬窜,發(fā)現(xiàn)執(zhí)行注冊操作的方法是是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
注意該方法的javadoc,他告訴了我們一個比較重要的訊息:將InstanceInfo
實例信息注冊到Eureka并且復(fù)制該信息到其他peer炕置。如果當(dāng)前收到的注冊信息是來自其他peer的復(fù)制事件荣挨,那么將不會將這個注冊信息繼續(xù)復(fù)制到其他peer男韧,這個標(biāo)志位就是上面所述的isReplication
。
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 默認(rèn)租約有效時長為90s
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
// 注冊信息里包含則依照注冊信息的租約時長
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// super為AbstractInstanceRegistry
super.register(info, leaseDuration, isReplication);
// 復(fù)制到其他peer
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
我們看到是先獲取到租約的有效時長默垄,然后才是真真正正地委托給super執(zhí)行注冊操作super.register(...)
并將注冊信息復(fù)制到其他peer此虑。register方法非常長,我們重點觀察一下他的注冊表的結(jié)構(gòu):
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
該注冊表是一個以app name為key(在Spring Cloud里就是spring.application.name
)口锭,嵌套Map為value的ConcurrentHashMap結(jié)構(gòu)寡壮。其嵌套Map是以Instance ID為key,Lease對象為value的鍵值結(jié)構(gòu)讹弯。這個registry注冊表在Eureka Server或SpringBoot Admin的監(jiān)控面板上以Eureka Service這個角色出現(xiàn)况既。
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// 可以看出registry是一個以info的app name為key的Map結(jié)構(gòu), 也就是以spring.application.name的大寫串為key
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// registry的value的Map結(jié)構(gòu)是以info的id為key,這里的id就是Eureka文檔上的Instance ID组民,給你個例子你就想起是什么東西了:10.8.88.233:config-server:10888
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// .......
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
// .......
} finally {
read.unlock();
}
}
上面是register(...)
中關(guān)于registry的大致操作棒仍,其中有相當(dāng)一部分的操作被略去了,如果感興趣的話可以細(xì)致地研究一下臭胜。
Renew and Cancel Lease - 續(xù)約與取消租約
續(xù)約的REST入口在com.netflix.eureka.resources.InstanceResource#renewLease
而取消租約的REST入口在com.netflix.eureka.resources.InstanceResource#cancelLease
兩者的基本思想相似莫其,經(jīng)由InstanceRegistry
-> AbstractInstanceRegistry
-> PeerAwareInstanceRegistryImpl
,其中PeerAwareInstanceRegistryImpl裝飾了添加復(fù)制信息到其他節(jié)點的功能耸三。其中register乱陡、renew、cancel仪壮、statusUpdate和deleteStatusOverride都會將其信息復(fù)制到其他節(jié)點憨颠。
Fetch Registry - 獲取注冊信息
獲取所有Eureka Instance的注冊信息,com.netflix.eureka.resources.ApplicationsResource#getContainers
积锅,其注冊信息由ResponseCacheImpl
緩存爽彤,緩存的過期時間在其構(gòu)造函數(shù)中由EurekaServerConfig.getResponseCacheUpdateIntervalMs()
所控制,默認(rèn)緩存時間為30s缚陷。而差量注冊信息在Server端會保存得更為長一些(大約3分鐘)适篙,因此獲取的差量可能會重復(fù)返回相同的實例。Eureka Client會自動處理這些重復(fù)信息箫爷。
Evcition
Eureke Server定期進行失效節(jié)點的清理嚷节,執(zhí)行該任務(wù)的定時器被定義在com.netflix.eureka.registry.AbstractInstanceRegistry#evictionTimer
,真正的任務(wù)是由他的內(nèi)部類AbstractInstanceRegistry#EvictionTask
所執(zhí)行虎锚,默認(rèn)為每60s執(zhí)行一次清理任務(wù)硫痰,其執(zhí)行間隔由EurekaServerConfig#getEvictionIntervalTimerInMs
[eureka.server.eviction-interval-timer-in-ms]所決定。
回顧一下上面剛說完的注冊流程翁都,在PeerAwareInstanceRegistryImpl#register
里面特別指出了默認(rèn)的租約時長為90s[eureka.Instance.lease-expiration-duration-in-seconds]碍论,即如果90s后都沒有收到特定的Eureka Instance的Heartbeats,則會認(rèn)為這個Instance已經(jīng)失效(Instance在正常情況下默認(rèn)每隔30s發(fā)送一個Heartbeats[eureka.Instance.lease-renewal-interval-in-seconds]柄慰,對以上兩個默認(rèn)值有疑問的可以翻閱LeaseInfo
)鳍悠,EvictionTask
則會把這個Instance納入清理的范圍税娜。我們看看EvictionTask
的清理代碼是怎么寫的。
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// (2) 下面的for循環(huán)就是把registry中所有的Lease提取到局部變量expiredLeases
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); // (3)
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
在(2)
中把本地的registry中的租約信息全部提取出來藏研,并在(3)
通過serverConfig.getRenewalPercentThreshold()
[eureka.server.renewal-percent-threshold敬矩,默認(rèn)85%]計算出一個最大可剔除的閾值evictionLimit
。
新增Peer Node時的初始化
在有多個Eureka Server的情況下蠢挡,每個Eureka Server之間是如何發(fā)現(xiàn)對方的呢弧岳?
通過調(diào)試之后,我們根據(jù)調(diào)用鏈從下往上追溯业踏,其初始入口為org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext(); // (4)
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
由下個入口(4)
最終可以定位到方法com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
禽炬,從對應(yīng)的javadoc上我們可以知道該方法從peer eureka節(jié)點往自己填充注冊表信息。 如果操作失敗則此同步操作將failover到其他節(jié)點勤家,直到遍歷完列表(service urls)為止腹尖。該方法與普通的Eureka Client注冊到Eureka Server不同的一點是,其標(biāo)志位isReplication
為true伐脖,如果不記得這是什么作用的話可以翻閱到上面的Register - 注冊
小節(jié)热幔。
Peer Node信息的定時更新
首先我們看Eureka Server的上下文實體中的方法com.netflix.eureka.DefaultEurekaServerContext#initialize
@PostConstruct
@Override
public void initialize() throws Exception {
logger.info("Initializing ...");
peerEurekaNodes.start(); // (5)
registry.init(peerEurekaNodes);
logger.info("Initialized");
}
該方法明確指出這是一個Spring Bean,在構(gòu)建Bean完成后執(zhí)行此方法讼庇,繼續(xù)追蹤(5)
绎巨。
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls()); // (6)
Runnable peersUpdateTask = new Runnable() { // (7)
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls()); // (6)
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask, // (7)
serverConfig.getPeerEurekaNodesUpdateIntervalMs(), // (8)
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
上面這段代碼很清晰地告訴我們在啟動Eureka Server的時候就會調(diào)用updatePeerEurekaNodes(...)
更新peer的狀態(tài),并封裝為一個Runnable進行周期性更新蠕啄。這個定時時間由serverConfig.getPeerEurekaNodesUpdateIntervalMs()
[eureka.server.peer-eureka-nodes-update-interval-ms]所控制场勤,默認(rèn)值為600s,即10min介汹。一直經(jīng)由EndpointUtils#getDiscoveryServiceUrls
却嗡、EndpointUtils#getServiceUrlsFromConfig
至EurekaClientConfigBean#getEurekaServerServiceUrls
獲得對應(yīng)zone的service urls,如有需要可以覆蓋上述getEurekaServerServiceUrls
方法以動態(tài)獲取service urls嘹承,而不是選擇Spring Cloud默認(rèn)從properties文件讀取。
Self Preservation - 自我保護
當(dāng)新增Eureka Server時如庭,他會先嘗試從其他Peer上獲取所有Eureka Instance的注冊信息叹卷。如果在獲取時出現(xiàn)問題,該Eureka Server會在放棄之前嘗試在其他Peer上獲取注冊信息坪它。如果這個Eureka Server成功獲取到所有Instance的注冊信息骤竹,那么他就會根據(jù)所獲取到的注冊信息設(shè)置應(yīng)該接收到的續(xù)約閾值。如果在任何時候續(xù)約的閾值低于所設(shè)定的值(在15分鐘[eureka.server.renewal-threshold-update-interval-ms]內(nèi)低于85%[eureka.server.renewal-percent-threshold])往毡,則該Eureka Server會出于保護當(dāng)前注冊列表的目的而停止將任何Instance進行過期處理蒙揣。
在Netflix中上述保護措施被成為自我保護模式,主要是用于Eureka Server與Eureka Client存在網(wǎng)絡(luò)分區(qū)情況下的場景开瞭。在這種情況下懒震,Eureka Server嘗試保護其已有的實例信息罩息,但如果出現(xiàn)大規(guī)模的網(wǎng)絡(luò)分區(qū)時,相應(yīng)的Eureka Client會獲取到大量無法響應(yīng)的服務(wù)个扰。所以瓷炮,Eureka Client必須確保對于一些不存在或者無法響應(yīng)的Eureka Instance具備更加彈性的應(yīng)對策略,例如快速超時并嘗試其他實例递宅。
在網(wǎng)絡(luò)分區(qū)出現(xiàn)時可能會發(fā)生以下幾種情況:
- Peer之間的心跳可能會失敗娘香,某Eureka Server檢測到這種情況并為了保護當(dāng)前的注冊列表而進入了自我保護模式。新的注冊可能發(fā)生在某些孤立的Eureka Server上办龄,某些Eureka Client可能會擁有新的注冊列表烘绽,而另外一些則可能沒有(不同的實例視圖)。
- 當(dāng)網(wǎng)絡(luò)恢復(fù)到穩(wěn)定狀態(tài)后俐填,Eureka Server會進行自我修復(fù)诀姚。當(dāng)Peer能正常通信之后注冊信息會被重新同步。
- 最重要的一點是玷禽,在網(wǎng)絡(luò)中斷期間Eureka Server應(yīng)該更距彈性赫段,但在這段期間Eureka Client可能會有不同的實例視圖。
作者:Chris
原博客:http://blog.chriscs.com
Github:https://github.com/prontera