本文使用Spring Cloud Eureka分析
Spring Cloud版本: Dalston.SR5
spring-cloud-starter-eureka版本: 1.3.6.RELEASE
netflix eureka版本: 1.6.2
繼續(xù) 從Eureka Client發(fā)起注冊請求到Eureka Server處理的整個服務(wù)注冊過程(上) 分析
一、Spring Cloud Eureka Server自動配置及初始化
@EnableEurekaServer
創(chuàng)建Spring Cloud Eureka Server首先要使用@EnableEurekaServer
注解,其實質(zhì)是:
@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
-
@EnableDiscoveryClient
: 引入服務(wù)發(fā)現(xiàn)客戶端相關(guān)配置(身為Server的同時失息,在Server集群復(fù)制時也會作為Client) - 導(dǎo)入
EurekaServerMarkerConfiguration
: 激活EurekaServerAutoConfiguration
所以仪媒,@EnableEurekaServer
注解和上一篇分析的Client啟動注解都是通過向Spring容器注入Maker的形式激活xxAutoConfiguration配置類蛀恩,Eureka Client是EurekaClientAutoConfiguration
茄蚯,Eureka Server是EurekaServerAutoConfiguration
EurekaServerAutoConfiguration - 注冊服務(wù)自動配置類
以下是對自動注入的各個組件的簡單分析:
-
頭部注解
@Import(EurekaServerInitializerConfiguration.class):導(dǎo)入Eureka Server初始化的配置類瞳遍,其實現(xiàn)SmartLifecycle接口闻妓,會在Spring容器基本refresh完畢時調(diào)用
EurekaServerBootstrap#contextInitialized()
Eureka Server啟動分析重點-
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
EurekaDashboardProperties
是儀表盤相關(guān)屬性-
InstanceRegistryProperties
是實例注冊相關(guān)屬性@ConfigurationProperties(PREFIX) public class InstanceRegistryProperties { public static final String PREFIX = "eureka.instance.registry"; /* Default number of expected renews per minute, defaults to 1. * Setting expectedNumberOfRenewsPerMin to non-zero to ensure that even an isolated * server can adjust its eviction policy to the number of registrations (when it's * zero, even a successful registration won't reset the rate threshold in * InstanceRegistry.register()). * 每分鐘默認續(xù)約數(shù)量為1 * 將expectedNumberOfRenewsPerMin設(shè)置為非零 * 以確保即使是隔離的服務(wù)器也可以根據(jù)注冊數(shù)量調(diào)整其驅(qū)逐策略 * (當它為零時,即使成功注冊也不會重置InstanceRegistry.register()中的速率閾值) */ @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards compatibility // 為了向后兼容 private int expectedNumberOfRenewsPerMin = 1; /** * Value used in determining when leases are cancelled, default to 1 for standalone. * Should be set to 0 for peer replicated eurekas * 決定租約何時取消的值 * 單機默認值為1掠械,對于同行復(fù)制的eurekas由缆,應(yīng)設(shè)置為0 */ @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility private int defaultOpenForTrafficCount = 1;
@PropertySource("classpath:/eureka/server.properties") :在spring-cloud-netflix-eureka-server-xxx.jar中,只包含 spring.http.encoding.force=false
EurekaServerFeature: 訪問
/features
端點時會顯示啟用的Eureka Server自動配置類為EurekaServerAutoConfiguration-
EurekaServerConfig: 注入Eureka Server配置類猾蒂,
EurekaServerConfig
是netflix的接口均唉,里面有很多記錄eureka服務(wù)器運行所需的配置信息,netflix的默認實現(xiàn)類是DefaultEurekaServerConfig
肚菠,spring cloud的默認實現(xiàn)類是EurekaServerConfigBean
@Configuration protected static class EurekaServerConfigBeanConfiguration { @Bean @ConditionalOnMissingBean public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) { EurekaServerConfigBean server = new EurekaServerConfigBean(); //創(chuàng)建EurekaServerConfigBean // 如果當前Eureka Server本身也需要作為客戶端注冊(集群模式必須開啟舔箭??) if (clientConfig.shouldRegisterWithEureka()) { // Set a sensible default if we are supposed to replicate // 設(shè)置EurekaServer在啟動期間eureka節(jié)點嘗試從對等放獲取注冊表信息的重試次數(shù) server.setRegistrySyncRetries(5); } return server; } }
EurekaController:Eureka Server Dashborad 對應(yīng)的 Controller(默認path: /)
-
PeerAwareInstanceRegistry: 直譯是對等體可見的應(yīng)用實例注冊器蚊逢,就是在注冊實例時會考慮集群情況下其它Node相關(guān)操作的注冊器
@Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry( ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization // 強制初始化eurekaClient层扶,在之前看RefreshScope的bug時,也使用到了這種方式強制創(chuàng)建eurekaClient // 創(chuàng)建InstanceRegistry(是spring cloud的實現(xiàn)) // 繼承了PeerAwareInstanceRegistryImpl时捌,PeerAwareInstanceRegistry接口的實現(xiàn)類 return new InstanceRegistry( this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); }
-
PeerEurekaNodes: 用來管理PeerEurekaNode的幫助類
-
start(): 會創(chuàng)建一個newSingleThreadScheduledExecutor定時單例線程池怒医,定時更新PeerNode列表,線程名為“Eureka-PeerNodesUpdater”奢讨,線程執(zhí)行間隔為
EurekaServerConfigBean.peerEurekaNodesUpdateIntervalMs=10 * MINUTES
稚叹,調(diào)用時機是:DefaultEurekaServerContext在@PostConstruct調(diào)用initialize()-->peerEurekaNodes.start() -
updatePeerEurekaNodes(resolvePeerUrls()): 定時線程里更新PeerNode列表的核心邏輯
- resolvePeerUrls() : 是解析其它Eureka Server Node節(jié)點URL焰薄,會根據(jù)當前Server的zone和shouldPreferSameZoneEureka的配置獲取一個經(jīng)過排序的replicaUrls集合,再判斷replicaUrls有沒有自己扒袖,有就remove
-
updatePeerEurekaNodes(): 將內(nèi)存中的老的PeerEurekaNodes.peerEurekaNodeUrls 與 上一步獲得的新的newPeerUrls對比塞茅,刪除不可用的,新增新添加的季率。之所以不直接用newPeerUrls野瘦,是因為在刪除不可用時可以做
PeerEurekaNode#shutdown()
,在添加新的可以PeerEurekaNodes#createPeerEurekaNode()
-
start(): 會創(chuàng)建一個newSingleThreadScheduledExecutor定時單例線程池怒医,定時更新PeerNode列表,線程名為“Eureka-PeerNodesUpdater”奢讨,線程執(zhí)行間隔為
-
EurekaServerContext: Eureka Server啟動分析重點
Eureka Server上下文接口飒泻,包含initialize()鞭光、shutdown()方法,EurekaServerConfig配置泞遗,PeerEurekaNodes節(jié)點管理幫助類惰许,PeerAwareInstanceRegistry對等體可見的應(yīng)用實例注冊器,ApplicationInfoManager當前應(yīng)用實例info信息管理器(是由Client配置初始化的)
默認實現(xiàn)類 com.netflix.eureka.DefaultEurekaServerContext
-
@PostConstruct方法包含一些初始化邏輯(說明初始化方法是在DefaultEurekaServerContext構(gòu)造后由@PostConstruct觸發(fā)的史辙?)
@PostConstruct @Override public void initialize() throws Exception { logger.info("Initializing ..."); // PeerEurekaNode的幫助類start // 會啟動更新PeerNode列表的定時線程 peerEurekaNodes.start(); // PeerAwareInstanceRegistry初始化 // 啟動numberOfReplicationsLastMin定時線程汹买、initializedResponseCache()、scheduleRenewalThresholdUpdateTask()聊倔、initRemoteRegionRegistry()晦毙,還有添加JMX監(jiān)控 registry.init(peerEurekaNodes); logger.info("Initialized"); }
EurekaServerBootstrap: Eureka Server啟動引導(dǎo),會在Spring容器基本refresh()完畢時由EurekaServerInitializerConfiguration#run()方法真正調(diào)用
eurekaServerBootstrap.contextInitialized()
初始化耙蔑,其中會initEurekaEnvironment()
见妒、initEurekaServerContext()
Eureka Server啟動分析重點注冊 Jersey filter: 所有
/eureka
的請求都需要經(jīng)過Jersery Filter,其處理類是com.sun.jersey.spi.container.servlet.ServletContainer纵潦,其既是Filter徐鹤,也是Servlet,包含Jersey的處理邏輯邀层。在構(gòu)造時已經(jīng)將 com.netflix.discovery包 和 com.netflix.eureka包 下的類作為處理請求的資源導(dǎo)入,如處理單個應(yīng)用請求的com.netflix.eureka.resources.ApplicationResource
經(jīng)過上面的EurekaServerAutoConfiguration自動配置類分析后遂庄,個人感覺有幾個重點:
1寥院、DefaultEurekaServerContext(Eureka Server上下文) 初始化
因為netflix設(shè)計的EurekaServerContext接口本身包含很多成員變量,如PeerEurekaNodes管理對等節(jié)點涛目、PeerAwareInstanceRegistry考慮對等節(jié)點的實例注冊器等秸谢,在Eureka Server上下文初始化時會對這些組件初始化,還會啟動一些定時線程
2霹肝、EurekaServerBootstrap初始化
EurekaServerBootstrap是spring cloud實現(xiàn)的Eureka Server的啟動引導(dǎo)類估蹄,在netflix對應(yīng)的是
EurekaBootstrap
怖喻。而這個啟動引導(dǎo)類初始化是在EurekaServerInitializerConfiguration這個Spring的SmartLifecycle bean的生命周期方法中觸發(fā)的剖笙,在refresh()幾乎完成的時候,所以會在Eureka Server上下文初始化之后3肌似、jerseyFilter,用于處理所有到/eureka的請求
【重點1】Eureka Server上下文初始化
首先看Netflix的EurekaServerContext接口是如何定義的:
public interface EurekaServerContext {
void initialize() throws Exception;
void shutdown() throws Exception;
EurekaServerConfig getServerConfig();
PeerEurekaNodes getPeerEurekaNodes();
ServerCodecs getServerCodecs();
PeerAwareInstanceRegistry getRegistry();
ApplicationInfoManager getApplicationInfoManager();
}
除了初始化initialize()方法垮兑,shutdown()方法冷尉,還有一些組件EurekaServerConfig、PeerEurekaNodes系枪、ServerCodecs雀哨、PeerAwareInstanceRegistry、ApplicationInfoManager私爷,而在自動配置構(gòu)造DefaultEurekaServerContext時雾棺,這些組件都已設(shè)置好
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes,
ApplicationInfoManager applicationInfoManager) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.registry = registry;
this.peerEurekaNodes = peerEurekaNodes;
this.applicationInfoManager = applicationInfoManager;
}
接下來是由@PostConstruct
觸發(fā)的初始化方法
@PostConstruct
@Override
public void initialize() throws Exception {
logger.info("Initializing ...");
peerEurekaNodes.start();
registry.init(peerEurekaNodes);
logger.info("Initialized");
}
主要調(diào)用了2個組件的初始化方法:PeerEurekaNodes
和 PeerAwareInstanceRegistry
1、PeerEurekaNodes#start(): 初始化對等節(jié)點信息
public void start() {
// 后臺運行的單線程定時任務(wù)執(zhí)行器衬浑,定時線程名:Eureka-PeerNodesUpdater
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 解析Eureka Server URL捌浩,并更新PeerEurekaNodes列表
updatePeerEurekaNodes(resolvePeerUrls());
// 啟動定時執(zhí)行任務(wù)peersUpdateTask(定時默認10min,由peerEurekaNodesUpdateIntervalMs配置)
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
// 定時任務(wù)中仍然是 解析Eureka Server URL嚎卫,并更新PeerEurekaNodes列表
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
// 打印對等體節(jié)點(應(yīng)該沒有當前節(jié)點自己)
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
PeerEurekaNodes啟動主要做了2件事:
- 根據(jù)配置信息更新PeerEurekaNodes列表
- 啟動定時更新PeerEurekaNodes列表的任務(wù)peersUpdateTask嘉栓,定時線程名【Eureka-PeerNodesUpdater】
resolvePeerUrls(): 解析配置的對等體URL
protected List<String> resolvePeerUrls() {
// 當前Eureka Server自己的InstanceInfo信息
InstanceInfo myInfo = applicationInfoManager.getInfo();
// 當前Eureka Server所在的zone,默認是 defaultZone
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
// 獲取配置的service-url
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
// 遍歷service-url拓诸,排除自己
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
isThisMyUrl()
是如何判斷是自己的URL侵佃,進而排除的呢?
public boolean isThisMyUrl(String url) {
return isInstanceURL(url, applicationInfoManager.getInfo());
}
public boolean isInstanceURL(String url, InstanceInfo instance) {
// 根據(jù)配置項的url獲取host主機信息
String hostName = hostFromUrl(url);
// 根據(jù)當前Eureka Server的Instance實例信息獲取host主機信息
String myInfoComparator = instance.getHostName();
// 如果eureka.client.transport.applicationsResolverUseIp==true奠支,即按照IP解析URL
// 那么將當前Eureka Server的Instance實例信息轉(zhuǎn)換為IP
if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
myInfoComparator = instance.getIPAddr();
}
// 比較配置項的hostName 和 當前Eureka Server的Instance實例信息
return hostName != null && hostName.equals(myInfoComparator);
}
其中配置項中的hostName基本上就是 http:// 和 端口號 之間的部分馋辈,而當前Eureka Server實例的用于比較的myInfoComparator信息是
- 如果主動配置了eureka.instance.hostname=xxx,配置值就是當前Eureka Server實例的host
- 沒有主動配置的話倍谜,會從在
EurekaClientAutoConfiguration
中創(chuàng)建EurekaInstanceConfigBean
時使用的InetUtils中獲取迈螟,InetUtils是spring cloud網(wǎng)絡(luò)相關(guān)的工具類,其首先根據(jù)第一個非回環(huán)網(wǎng)卡獲取IP(注意:docker容器環(huán)境有坑)尔崔,再根據(jù)InetAddress獲取與IP對應(yīng)的hostname答毫,我已知的是從如Linux的 /etc/hosts配置文件中獲取 或者 從hostname環(huán)境變量獲取 - 如果eureka.client.transport.applicationsResolverUseIp=true,那么按照當前Eureka Server實例的IP來比較
updatePeerEurekaNodes(): 更新PeerEurekaNodes列表
// PeerEurekaNodes#updatePeerEurekaNodes()
// newPeerUrls為本次要更新的Eureka對等體URL列表
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// 計算 原peerEurekaNodeUrls - 新newPeerUrls 的差集季春,就是多余可shutdown節(jié)點
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
// 計算 新newPeerUrls - 原peerEurekaNodeUrls 的差集洗搂,就是需要新增節(jié)點
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change 沒有變更
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// shutDown多余節(jié)點
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers
// 添加新的peerEurekaNode - createPeerEurekaNode()
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
2、PeerAwareInstanceRegistry#init(peerEurekaNodes):集群實例注冊器初始化
根據(jù)上一步初始化好的peerEurekaNodes载弄,來初始化PeerAwareInstanceRegistry耘拇,考慮集群中的對等體的實例注冊器
// PeerAwareInstanceRegistryImpl#init()
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 【重要】啟動用于統(tǒng)計最后xx毫秒續(xù)約情況的定時線程
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// 【重要】初始化ResponseCache: 對客戶端查詢服務(wù)列表信息的緩存(所有服務(wù)列表、增量修改宇攻、單個應(yīng)用)
// 默認responseCacheUpdateIntervalMs=30s
initializedResponseCache();
// 【重要】定期更新續(xù)約閥值的任務(wù)惫叛,默認900s執(zhí)行一次
// 調(diào)用 PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
scheduleRenewalThresholdUpdateTask();
// 初始化 遠程區(qū)域注冊 相關(guān)信息(默認沒有遠程Region,都是使用us-east-1)
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
numberOfReplicationsLastMin: 上一分鐘來自對等節(jié)點復(fù)制的續(xù)約數(shù)統(tǒng)計
numberOfReplicationsLastMin是com.netflix.eureka.util.MeasuredRate
用于統(tǒng)計測量上一分鐘來自對等節(jié)點復(fù)制的續(xù)約數(shù)
// MeasuredRate#start()
public synchronized void start() {
if (!isActive) {
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
// 將當前的桶的統(tǒng)計數(shù)據(jù)放到lastBucket逞刷,當前桶置為0
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
/**
* Returns the count in the last sample interval.
* 返回上一分鐘的統(tǒng)計數(shù)
*/
public long getCount() {
return lastBucket.get();
}
/**
* Increments the count in the current sample interval.
* 增加當前桶的計數(shù)嘉涌,在以下2個場景有調(diào)用:
* AbstractInstanceRegistry#renew() - 續(xù)約
* PeerAwareInstanceRegistryImpl#replicateToPeers() -
*/
public void increment() {
currentBucket.incrementAndGet();
}
初始化ResponseCache
ResponseCache主要是緩存服務(wù)列表信息妻熊,根據(jù)注釋可知,緩存以壓縮和非壓縮形式維護洛心,用于三類請求: all applications固耘,增量更改和單個application
// ResponseCacheImpl構(gòu)造
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
// 根據(jù)配置eureka.server.useReadOnlyResponseCache判斷,是否使用只讀ResponseCache词身,默認true
// 由于ResponseCache維護這一個可讀可寫的readWriteCacheMap厅目,還有一個只讀的readOnlyCacheMap
// 此配置控制在get()應(yīng)用數(shù)據(jù)時,是去只讀Map讀法严,還是讀寫Map讀损敷,應(yīng)該只讀Map是定期更新的
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
// eureka.server.responseCacheUpdateIntervalMs緩存更新頻率,默認30s
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// 創(chuàng)建讀寫Map深啤,com.google.common.cache.LoadingCache
// 可以設(shè)置初始值拗馒,數(shù)據(jù)寫入過期時間,刪除監(jiān)聽器等
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// 如果啟用只讀緩存溯街,那么每隔responseCacheUpdateIntervalMs=30s诱桂,執(zhí)行g(shù)etCacheUpdateTask()
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
可見ResponseCache維護了兩個Map,一個可讀可寫的readWriteCacheMap呈昔,應(yīng)該每個操作都會寫入挥等,一個只讀的readOnlyCacheMap,默認應(yīng)該每30s更新一次堤尾,下面具體看看getCacheUpdateTask()
// ResponseCacheImpl#getCacheUpdateTask()
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
// 遍歷只讀Map
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
// 如果只讀Map中的值 和 讀寫Map中的值不同肝劲,用讀寫Map更新只讀Map
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache", th);
}
}
}
};
}
每30s會比較只讀Map和讀寫Map中的值,以讀寫Map中的為準
scheduleRenewalThresholdUpdateTask:定期更新續(xù)約閥值的任務(wù)
/**
* Schedule the task that updates <em>renewal threshold</em> periodically.
* The renewal threshold would be used to determine if the renewals drop
* dramatically because of network partition and to protect expiring too
* many instances at a time.
* 每隔 eureka.server.renewalThresholdUpdateIntervalMs=900秒 更新一次續(xù)約閥值
*/
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
更新續(xù)約閥值在updateRenewalThreshold()
方法
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
* Updates the <em>renewal threshold</em> based on the current number of
* renewals. The threshold is a percentage as specified in
* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
* received per minute {@link #getNumOfRenewsInLastMin()}.
*/
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
int count = 0;
// 統(tǒng)計所有Instance實例個數(shù)
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold of if the self preservation is disabled.
// 只有當閥值大于當前預(yù)期值時郭宝,才更新 或者 關(guān)閉了自我保護模式
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
其實大體意思是:先計算所有Instance實例個數(shù)辞槐,默認每個實例1分鐘應(yīng)該續(xù)約2次(30s一次)
- 如果開啟自我保護模式,更新 expectedNumberOfRenewsPerMin預(yù)期每分鐘續(xù)約數(shù) 和 numberOfRenewsPerMinThreshold每分鐘續(xù)約閥值
- 如果沒有開啟自我保護模式粘室,只有當本期續(xù)約數(shù)大于之前的閥值榄檬,即當前不處在自我保護模式中(自我保護模式中,不能刪除服務(wù)列表衔统,閥值自然也不能更新)丙号,才可以更新 expectedNumberOfRenewsPerMin 和 numberOfRenewsPerMinThreshold
但如上代碼是有問題的,無論是注釋還是判斷邏輯缰冤,當前版本:eureka-core-1.6.2
直到 v1.9.3版本才修復(fù)
之后又有兩個版本,修改了這里的計算邏輯和做了方法抽取
Extract calculation of renews threshold to separate method
【重點2】EurekaServerBootstrap初始化
上面的自動配置過程中已經(jīng)注冊了處理所有 **/eureka/**** 請求的Jersey Filter喳魏,這樣所有Client的注冊棉浸、續(xù)約等請求都可以處理了。而還有一些工作是通過EurekaServerBootstrap#contextInitialized()
完成的刺彩,在Spring容器基本上refresh()完畢的時候
EurekaServerBootstrap是 spring cloud的實現(xiàn)迷郑,而netflix的Eureka Server啟動引導(dǎo)的實現(xiàn)是 EurekaBootStrap
// EurekaServerBootstrap#contextInitialized()
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment(); // 初始化環(huán)境
initEurekaServerContext(); // 初始化上下文
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
這兩個里面我們主要關(guān)注上下文的初始化initEurekaServerContext()
// EurekaServerBootstrap#initEurekaServerContext()
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
// 是否為AWS環(huán)境
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
// 將serverContext由Holder保管
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// 從相鄰的eureka節(jié)點拷貝注冊列表信息
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
有兩個重要環(huán)接:
- registry.syncUp(): 從相鄰eureka節(jié)點拷貝注冊列表信息
- registry.openForTraffic(): 允許開始與客戶端的數(shù)據(jù)傳輸枝恋,即開始作為Server服務(wù)
1、registry.syncUp():從相鄰eureka節(jié)點拷貝注冊列表信息
/**
* Populates the registry information from a peer eureka node. This
* operation fails over to other nodes until the list is exhausted if the
* communication fails.
*/
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
// 循環(huán)嗡害,最多重試RegistrySyncRetries次(默認 5)
// eurekaClient中的邏輯會重試其它的eureka節(jié)點
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); //30s
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 從eurekaClient獲取服務(wù)列表
Applications apps = eurekaClient.getApplications();
// 循環(huán)服務(wù)列表焚碌,并依次注冊
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
2、registry.openForTraffic(): 允許與客戶端的數(shù)據(jù)傳輸
// InstanceRegistry#openForTraffic()
/**
* If
* {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
* is called with a zero argument, it means that leases are not automatically
* cancelled if the instance hasn't sent any renewals recently. This happens for a
* standalone server. It seems like a bad default, so we set it to the smallest
* non-zero value we can, so that any instances that subsequently register can bump up
* the threshold.
*/
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// 如果count==0霸妹,即沒有從相鄰eureka節(jié)點得到服務(wù)列表十电,如單機啟動模式,defaultOpenForTrafficCount=1
super.openForTraffic(applicationInfoManager,
count == 0 ? this.defaultOpenForTrafficCount : count);
}
// PeerAwareInstanceRegistryImpl#openForTraffic()
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// 每分鐘期待的續(xù)約數(shù)(默認30s續(xù)約叹螟,60s就是2次)
this.expectedNumberOfRenewsPerMin = count * 2;
// 每分鐘續(xù)約的閥值:85% * expectedNumberOfRenewsPerMin
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got " + count + " instances from neighboring DS node");
logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) { //可count默認值是1鹃骂,那么peerInstancesTransferEmptyOnStartup始終不會是true
//在PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)方法有用
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 開啟新的【EvictionTask】
super.postInit();
}
// AbstractInstanceRegistry#postInit()
protected void postInit() {
renewsLastMin.start(); //統(tǒng)計上一分鐘續(xù)約數(shù)的監(jiān)控Timer
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(), //默認60s
serverConfig.getEvictionIntervalTimerInMs());
}
- 如果沒有從相鄰eureka節(jié)點獲得服務(wù),count默認為1
- 初始化每分鐘期待的續(xù)約數(shù) expectedNumberOfRenewsPerMin = count * 2
- 初始化每分鐘續(xù)約閥值numberOfRenewsPerMinThreshold= 85% * expectedNumberOfRenewsPerMin
- applicationInfoManager設(shè)置狀態(tài)為UP
- 開啟新的【EvictionTask】驅(qū)逐任務(wù)
二罢绽、Eureka Server處理注冊請求
經(jīng)過上面的Eureka Server自動配置及初始化畏线,Eureka Server已經(jīng)成功啟動并可以通過Jersey處理各種請求,具體的注冊請求是由com.netflix.eureka.resources.ApplicationResource#addInstance()
處理的
ApplicationResource#addInstance() - 注冊單個應(yīng)用實例
// ApplicationResource#addInstance()
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// 驗證Instance實例的所有必填字段
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
// 處理客戶端可能正在使用缺少數(shù)據(jù)的錯誤DataCenterInfo注冊的情況
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 【 使用PeerAwareInstanceRegistry集群實例注冊器register當前實例 】
// isReplication表示此操作是否是節(jié)點間的復(fù)制良价,此處isReplication==null
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
// 注冊成功返回204狀態(tài)碼
}
重點是 registry.register(info, "true".equals(isReplication))
寝殴,即使用PeerAwareInstanceRegistry集群實例注冊器register當前實例
PeerAwareInstanceRegistryImpl#register() - 注冊服務(wù)信息并同步到其它Eureka節(jié)點
// PeerAwareInstanceRegistryImpl#register()
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
* 注冊有關(guān)InstanceInfo信息,并將此信息復(fù)制到所有對等的eureka節(jié)點
* 如果這是來自其他節(jié)點的復(fù)制事件明垢,則不會繼續(xù)復(fù)制它
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; //默認的租約持續(xù)時間是90s
// 如果當前Instance實例的租約信息中有l(wèi)easeDuration持續(xù)時間蚣常,使用實例的leaseDuration
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 【 當前Eureka Server注冊實例信息 】
super.register(info, leaseDuration, isReplication);
// 【 將注冊實例信息復(fù)制到集群中其它節(jié)點 】
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
- 默認leaseDuration租約持續(xù)時間為90s,如果當前Instance實例的租約信息中有l(wèi)easeDuration持續(xù)時間袖外,使用實例的leaseDuration
- 【重點】當前Eureka Server注冊實例信息
- 【重點】將注冊實例信息復(fù)制到集群中其它節(jié)點
AbstractInstanceRegistry#register():注冊
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock(); //讀鎖
// registry是保存所有應(yīng)用實例信息的Map:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
// 從registry中獲取當前appName的所有實例信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication); //注冊統(tǒng)計+1
// 如果當前appName實例信息為空史隆,新建Map
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 獲取實例的Lease租約信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
// 如果已經(jīng)有租約,則保留最后一個臟時間戳而不覆蓋它
// (比較當前請求實例租約 和 已有租約 的LastDirtyTimestamp曼验,選擇靠后的)
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
}
else {
// The lease does not exist and hence it is a new registration
// 如果之前不存在實例的租約泌射,說明是新實例注冊
// expectedNumberOfRenewsPerMin期待的每分鐘續(xù)約數(shù)+2(因為30s一個)
// 并更新numberOfRenewsPerMinThreshold每分鐘續(xù)約閥值(85%)
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease); //當前實例信息放到維護注冊信息的Map
// 同步維護最近注冊隊列
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// 如果當前實例已經(jīng)維護了OverriddenStatus,將其也放到此Eureka Server的overriddenInstanceStatusMap中
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
// 根據(jù)overridden status規(guī)則鬓照,設(shè)置狀態(tài)
InstanceStatus overriddenInstanceStatus
= getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 如果租約以UP狀態(tài)注冊熔酷,設(shè)置租賃服務(wù)時間戳
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED); //ActionType為 ADD
recentlyChangedQueue.add(new RecentlyChangedItem(lease)); //維護recentlyChangedQueue
registrant.setLastUpdatedTimestamp(); //更新最后更新時間
// 使當前應(yīng)用的ResponseCache失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock(); //讀鎖
}
}
維護當前Instance實例的Lease租約信息,并放到Eureka Server維護注冊信息的Map:【ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>】豺裆,對應(yīng)關(guān)系是 appName:<Instance實例Id拒秘,Lease租約信息>
如果是新注冊,expectedNumberOfRenewsPerMin期待的每分鐘續(xù)約數(shù)+2臭猜, 并更新numberOfRenewsPerMinThreshold每分鐘續(xù)約閥值
維護 recentRegisteredQueue最近注冊隊列躺酒,recentlyChangedQueue最近更改隊列
如果本次注冊實例已經(jīng)維護了OverriddenStatus,根據(jù)一定規(guī)則蔑歌,維護本Server節(jié)點當前實例的OverriddenStatus
設(shè)置Instance實例的最后更新時間戳
-
對當前應(yīng)用對應(yīng)的ResponseCache緩存失效
responseCache 用于緩存查詢的應(yīng)用實例信息
其使用guava cache維護了一個可讀可寫的LocalLoadingCache本地緩存【readWriteCacheMap】羹应,還有一個只讀的ConcurrentMap 【readOnlyCacheMap】
在 get(key, useReadOnlyCache)時首先會檢查【readOnlyCacheMap】只讀緩存次屠,如沒有园匹,再查【readWriteCacheMap】雳刺,而【readWriteCacheMap】的
get()
其含義實際是getOrLoad()
,如果獲取不到從CacheLoader加載裸违,而CacheLoader會到維護應(yīng)用實例注冊信息的Map中獲取【readWriteCacheMap】是直接與維護應(yīng)用實例注冊信息Map交互的掖桦,查詢時會Load加載,注冊新實例時會失效整個應(yīng)用的
【readOnlyCacheMap】是在【readWriteCacheMap】之上的只讀緩存供汛,由配置 eureka.server.useReadOnlyResponseCache控制枪汪,默認true,每隔 eureka.server.responseCacheUpdateIntervalMs=30s 與【readWriteCacheMap】同步一次
PeerAwareInstanceRegistryImpl#replicateToPeers() :復(fù)制到Eureka對等節(jié)點
// PeerAwareInstanceRegistryImpl#replicateToPeers()
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
// 如果是復(fù)制操作(針對當前節(jié)點紊馏,false)
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
// 如果它已經(jīng)是復(fù)制料饥,請不要再次復(fù)制,直接return
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 遍歷集群所有節(jié)點(除當前節(jié)點外)
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 復(fù)制Instance實例操作到某個node節(jié)點
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
}
finally {
tracer.stop();
}
}
下面是replicateInstanceActionsToPeers()
復(fù)制Instance實例操作到其它節(jié)點
// PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers()
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel: //取消
node.cancel(appName, id);
break;
case Heartbeat: //心跳
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register: //注冊
node.register(info);
break;
case StatusUpdate: //狀態(tài)更新
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride: //刪除OverrideStatus
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
本次只關(guān)心節(jié)點的注冊操作
// PeerEurekaNode#register()
/**
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
* the instance information {@link InstanceInfo} of any instance
* that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
// 當前時間 + 30s后 過期
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// 提交相同的操作到批量復(fù)制任務(wù)處理
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, overriddenStatus:null, replicateInstanceInfo:true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
而之后就和Eureka Client發(fā)起注冊請求的調(diào)用差不多 replicationClient.register(info)
至此朱监,Spring Cloud Eureka Server的整個自動配置及初始化岸啡,以及接收注冊請求,并復(fù)制到集群中的對等節(jié)點就分析完了
大體時序流程參考:
參考:
Dive into Eureka: 宋順