【SpringCloud Eureka源碼】從Eureka Client發(fā)起注冊請求到Eureka Server處理的整個服務(wù)注冊過程(下)

本文使用Spring Cloud Eureka分析

Spring Cloud版本: Dalston.SR5

spring-cloud-starter-eureka版本: 1.3.6.RELEASE

netflix eureka版本: 1.6.2

繼續(xù) 從Eureka Client發(fā)起注冊請求到Eureka Server處理的整個服務(wù)注冊過程(上) 分析


一、Spring Cloud Eureka Server自動配置及初始化

@EnableEurekaServer

創(chuàng)建Spring Cloud Eureka Server首先要使用@EnableEurekaServer注解,其實質(zhì)是:

@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
  • @EnableDiscoveryClient: 引入服務(wù)發(fā)現(xiàn)客戶端相關(guān)配置(身為Server的同時失息,在Server集群復(fù)制時也會作為Client)
  • 導(dǎo)入EurekaServerMarkerConfiguration: 激活EurekaServerAutoConfiguration

所以仪媒,@EnableEurekaServer注解和上一篇分析的Client啟動注解都是通過向Spring容器注入Maker的形式激活xxAutoConfiguration配置類蛀恩,Eureka Client是EurekaClientAutoConfiguration茄蚯,Eureka Server是EurekaServerAutoConfiguration


EurekaServerAutoConfiguration - 注冊服務(wù)自動配置類

以下是對自動注入的各個組件的簡單分析:

  • 頭部注解

    • @Import(EurekaServerInitializerConfiguration.class):導(dǎo)入Eureka Server初始化的配置類瞳遍,其實現(xiàn)SmartLifecycle接口闻妓,會在Spring容器基本refresh完畢時調(diào)用EurekaServerBootstrap#contextInitialized() Eureka Server啟動分析重點

    • @EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })

      • EurekaDashboardProperties 是儀表盤相關(guān)屬性

      • InstanceRegistryProperties 是實例注冊相關(guān)屬性

        @ConfigurationProperties(PREFIX)
        public class InstanceRegistryProperties {
        
          public static final String PREFIX = "eureka.instance.registry";
        
        
          /* Default number of expected renews per minute, defaults to 1.
           * Setting expectedNumberOfRenewsPerMin to non-zero to ensure that even an isolated
           * server can adjust its eviction policy to the number of registrations (when it's
           * zero, even a successful registration won't reset the rate threshold in
           * InstanceRegistry.register()).
           * 每分鐘默認續(xù)約數(shù)量為1
           * 將expectedNumberOfRenewsPerMin設(shè)置為非零
           * 以確保即使是隔離的服務(wù)器也可以根據(jù)注冊數(shù)量調(diào)整其驅(qū)逐策略
           * (當它為零時,即使成功注冊也不會重置InstanceRegistry.register()中的速率閾值)
           */
          @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards compatibility
                                                                      // 為了向后兼容
          private int expectedNumberOfRenewsPerMin = 1;
        
          /** 
           * Value used in determining when leases are cancelled, default to 1 for standalone.
           * Should be set to 0 for peer replicated eurekas 
           * 決定租約何時取消的值
           * 單機默認值為1掠械,對于同行復(fù)制的eurekas由缆,應(yīng)設(shè)置為0
           */
          @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility
          private int defaultOpenForTrafficCount = 1;
        
    • @PropertySource("classpath:/eureka/server.properties") :在spring-cloud-netflix-eureka-server-xxx.jar中,只包含 spring.http.encoding.force=false

  • EurekaServerFeature: 訪問/features端點時會顯示啟用的Eureka Server自動配置類為EurekaServerAutoConfiguration

  • EurekaServerConfig: 注入Eureka Server配置類猾蒂,EurekaServerConfig是netflix的接口均唉,里面有很多記錄eureka服務(wù)器運行所需的配置信息,netflix的默認實現(xiàn)類是DefaultEurekaServerConfig肚菠,spring cloud的默認實現(xiàn)類是EurekaServerConfigBean

    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
      @Bean
      @ConditionalOnMissingBean
      public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
          EurekaServerConfigBean server = new EurekaServerConfigBean();  //創(chuàng)建EurekaServerConfigBean
            // 如果當前Eureka Server本身也需要作為客戶端注冊(集群模式必須開啟舔箭??)
          if (clientConfig.shouldRegisterWithEureka()) {
              // Set a sensible default if we are supposed to replicate
                // 設(shè)置EurekaServer在啟動期間eureka節(jié)點嘗試從對等放獲取注冊表信息的重試次數(shù)
              server.setRegistrySyncRetries(5);
          }
          return server;
      }
    }
    
  • EurekaController:Eureka Server Dashborad 對應(yīng)的 Controller(默認path: /)

  • PeerAwareInstanceRegistry: 直譯是對等體可見的應(yīng)用實例注冊器蚊逢,就是在注冊實例時會考慮集群情況下其它Node相關(guān)操作的注冊器

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
          ServerCodecs serverCodecs) {
      this.eurekaClient.getApplications(); // force initialization  
                                             // 強制初始化eurekaClient层扶,在之前看RefreshScope的bug時,也使用到了這種方式強制創(chuàng)建eurekaClient
        
        // 創(chuàng)建InstanceRegistry(是spring cloud的實現(xiàn))
        // 繼承了PeerAwareInstanceRegistryImpl时捌,PeerAwareInstanceRegistry接口的實現(xiàn)類
      return new InstanceRegistry(
                this.eurekaServerConfig, this.eurekaClientConfig,
              serverCodecs, this.eurekaClient,
              this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
              this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
    
  • PeerEurekaNodes: 用來管理PeerEurekaNode的幫助類

    • start(): 會創(chuàng)建一個newSingleThreadScheduledExecutor定時單例線程池怒医,定時更新PeerNode列表,線程名為“Eureka-PeerNodesUpdater”奢讨,線程執(zhí)行間隔為 EurekaServerConfigBean.peerEurekaNodesUpdateIntervalMs=10 * MINUTES稚叹,調(diào)用時機是:DefaultEurekaServerContext在@PostConstruct調(diào)用initialize()-->peerEurekaNodes.start()
    • updatePeerEurekaNodes(resolvePeerUrls()): 定時線程里更新PeerNode列表的核心邏輯
      • resolvePeerUrls() : 是解析其它Eureka Server Node節(jié)點URL焰薄,會根據(jù)當前Server的zone和shouldPreferSameZoneEureka的配置獲取一個經(jīng)過排序的replicaUrls集合,再判斷replicaUrls有沒有自己扒袖,有就remove
      • updatePeerEurekaNodes(): 將內(nèi)存中的老的PeerEurekaNodes.peerEurekaNodeUrls 與 上一步獲得的新的newPeerUrls對比塞茅,刪除不可用的,新增新添加的季率。之所以不直接用newPeerUrls野瘦,是因為在刪除不可用時可以做PeerEurekaNode#shutdown(),在添加新的可以PeerEurekaNodes#createPeerEurekaNode()
  • EurekaServerContext: Eureka Server啟動分析重點

    • Eureka Server上下文接口飒泻,包含initialize()鞭光、shutdown()方法,EurekaServerConfig配置泞遗,PeerEurekaNodes節(jié)點管理幫助類惰许,PeerAwareInstanceRegistry對等體可見的應(yīng)用實例注冊器,ApplicationInfoManager當前應(yīng)用實例info信息管理器(是由Client配置初始化的)

    • 默認實現(xiàn)類 com.netflix.eureka.DefaultEurekaServerContext

    • @PostConstruct方法包含一些初始化邏輯(說明初始化方法是在DefaultEurekaServerContext構(gòu)造后由@PostConstruct觸發(fā)的史辙?)

      @PostConstruct
      @Override
      public void initialize() throws Exception {
          logger.info("Initializing ...");
          
          // PeerEurekaNode的幫助類start
          // 會啟動更新PeerNode列表的定時線程
          peerEurekaNodes.start();
          
          // PeerAwareInstanceRegistry初始化
          // 啟動numberOfReplicationsLastMin定時線程汹买、initializedResponseCache()、scheduleRenewalThresholdUpdateTask()聊倔、initRemoteRegionRegistry()晦毙,還有添加JMX監(jiān)控
          registry.init(peerEurekaNodes);
          
          logger.info("Initialized");
      }
      
  • EurekaServerBootstrap: Eureka Server啟動引導(dǎo),會在Spring容器基本refresh()完畢時由EurekaServerInitializerConfiguration#run()方法真正調(diào)用eurekaServerBootstrap.contextInitialized()初始化耙蔑,其中會initEurekaEnvironment()见妒、initEurekaServerContext() Eureka Server啟動分析重點

  • 注冊 Jersey filter: 所有/eureka的請求都需要經(jīng)過Jersery Filter,其處理類是com.sun.jersey.spi.container.servlet.ServletContainer纵潦,其既是Filter徐鹤,也是Servlet,包含Jersey的處理邏輯邀层。在構(gòu)造時已經(jīng)將 com.netflix.discovery包 和 com.netflix.eureka包 下的類作為處理請求的資源導(dǎo)入,如處理單個應(yīng)用請求的com.netflix.eureka.resources.ApplicationResource

經(jīng)過上面的EurekaServerAutoConfiguration自動配置類分析后遂庄,個人感覺有幾個重點:

1寥院、DefaultEurekaServerContext(Eureka Server上下文) 初始化

因為netflix設(shè)計的EurekaServerContext接口本身包含很多成員變量,如PeerEurekaNodes管理對等節(jié)點涛目、PeerAwareInstanceRegistry考慮對等節(jié)點的實例注冊器等秸谢,在Eureka Server上下文初始化時會對這些組件初始化,還會啟動一些定時線程

2霹肝、EurekaServerBootstrap初始化

EurekaServerBootstrap是spring cloud實現(xiàn)的Eureka Server的啟動引導(dǎo)類估蹄,在netflix對應(yīng)的是EurekaBootstrap怖喻。而這個啟動引導(dǎo)類初始化是在EurekaServerInitializerConfiguration這個Spring的SmartLifecycle bean的生命周期方法中觸發(fā)的剖笙,在refresh()幾乎完成的時候,所以會在Eureka Server上下文初始化之后

3肌似、jerseyFilter,用于處理所有到/eureka的請求


【重點1】Eureka Server上下文初始化

首先看Netflix的EurekaServerContext接口是如何定義的:

public interface EurekaServerContext {

    void initialize() throws Exception;

    void shutdown() throws Exception;

    EurekaServerConfig getServerConfig();

    PeerEurekaNodes getPeerEurekaNodes();

    ServerCodecs getServerCodecs();

    PeerAwareInstanceRegistry getRegistry();

    ApplicationInfoManager getApplicationInfoManager();

}

除了初始化initialize()方法垮兑,shutdown()方法冷尉,還有一些組件EurekaServerConfig、PeerEurekaNodes系枪、ServerCodecs雀哨、PeerAwareInstanceRegistry、ApplicationInfoManager私爷,而在自動配置構(gòu)造DefaultEurekaServerContext時雾棺,這些組件都已設(shè)置好

@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                           ServerCodecs serverCodecs,
                           PeerAwareInstanceRegistry registry,
                           PeerEurekaNodes peerEurekaNodes,
                           ApplicationInfoManager applicationInfoManager) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.registry = registry;
    this.peerEurekaNodes = peerEurekaNodes;
    this.applicationInfoManager = applicationInfoManager;
}

接下來是由@PostConstruct觸發(fā)的初始化方法

@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}

主要調(diào)用了2個組件的初始化方法:PeerEurekaNodesPeerAwareInstanceRegistry


1、PeerEurekaNodes#start(): 初始化對等節(jié)點信息

public void start() {
    // 后臺運行的單線程定時任務(wù)執(zhí)行器衬浑,定時線程名:Eureka-PeerNodesUpdater
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    
    try {
        // 解析Eureka Server URL捌浩,并更新PeerEurekaNodes列表
        updatePeerEurekaNodes(resolvePeerUrls());
        
        // 啟動定時執(zhí)行任務(wù)peersUpdateTask(定時默認10min,由peerEurekaNodesUpdateIntervalMs配置)
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    // 定時任務(wù)中仍然是 解析Eureka Server URL嚎卫,并更新PeerEurekaNodes列表
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    
    // 打印對等體節(jié)點(應(yīng)該沒有當前節(jié)點自己)
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

PeerEurekaNodes啟動主要做了2件事:

  • 根據(jù)配置信息更新PeerEurekaNodes列表
  • 啟動定時更新PeerEurekaNodes列表的任務(wù)peersUpdateTask嘉栓,定時線程名【Eureka-PeerNodesUpdater】


resolvePeerUrls(): 解析配置的對等體URL

protected List<String> resolvePeerUrls() {
    // 當前Eureka Server自己的InstanceInfo信息
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // 當前Eureka Server所在的zone,默認是 defaultZone
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    // 獲取配置的service-url
    List<String> replicaUrls = EndpointUtils
            .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

    // 遍歷service-url拓诸,排除自己
    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}

isThisMyUrl() 是如何判斷是自己的URL侵佃,進而排除的呢?

public boolean isThisMyUrl(String url) {
    return isInstanceURL(url, applicationInfoManager.getInfo());
}


public boolean isInstanceURL(String url, InstanceInfo instance) {
    // 根據(jù)配置項的url獲取host主機信息
    String hostName = hostFromUrl(url); 
    
    // 根據(jù)當前Eureka Server的Instance實例信息獲取host主機信息
    String myInfoComparator = instance.getHostName();
    
    // 如果eureka.client.transport.applicationsResolverUseIp==true奠支,即按照IP解析URL
    // 那么將當前Eureka Server的Instance實例信息轉(zhuǎn)換為IP
    if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
        myInfoComparator = instance.getIPAddr();
    }
    
    // 比較配置項的hostName 和 當前Eureka Server的Instance實例信息
    return hostName != null && hostName.equals(myInfoComparator);
}

其中配置項中的hostName基本上就是 http:// 和 端口號 之間的部分馋辈,而當前Eureka Server實例的用于比較的myInfoComparator信息是

  • 如果主動配置了eureka.instance.hostname=xxx,配置值就是當前Eureka Server實例的host
  • 沒有主動配置的話倍谜,會從在EurekaClientAutoConfiguration中創(chuàng)建EurekaInstanceConfigBean時使用的InetUtils中獲取迈螟,InetUtils是spring cloud網(wǎng)絡(luò)相關(guān)的工具類,其首先根據(jù)第一個非回環(huán)網(wǎng)卡獲取IP(注意:docker容器環(huán)境有坑)尔崔,再根據(jù)InetAddress獲取與IP對應(yīng)的hostname答毫,我已知的是從如Linux的 /etc/hosts配置文件中獲取 或者 從hostname環(huán)境變量獲取
  • 如果eureka.client.transport.applicationsResolverUseIp=true,那么按照當前Eureka Server實例的IP來比較


updatePeerEurekaNodes(): 更新PeerEurekaNodes列表

// PeerEurekaNodes#updatePeerEurekaNodes()
// newPeerUrls為本次要更新的Eureka對等體URL列表
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // 計算 原peerEurekaNodeUrls - 新newPeerUrls 的差集季春,就是多余可shutdown節(jié)點
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    
    // 計算 新newPeerUrls - 原peerEurekaNodeUrls 的差集洗搂,就是需要新增節(jié)點
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change 沒有變更
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    // shutDown多余節(jié)點
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    // 添加新的peerEurekaNode - createPeerEurekaNode()
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}


2、PeerAwareInstanceRegistry#init(peerEurekaNodes):集群實例注冊器初始化

根據(jù)上一步初始化好的peerEurekaNodes载弄,來初始化PeerAwareInstanceRegistry耘拇,考慮集群中的對等體的實例注冊器

// PeerAwareInstanceRegistryImpl#init()
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    // 【重要】啟動用于統(tǒng)計最后xx毫秒續(xù)約情況的定時線程
    this.numberOfReplicationsLastMin.start();
    
    this.peerEurekaNodes = peerEurekaNodes;
    
    // 【重要】初始化ResponseCache: 對客戶端查詢服務(wù)列表信息的緩存(所有服務(wù)列表、增量修改宇攻、單個應(yīng)用)
    // 默認responseCacheUpdateIntervalMs=30s
    initializedResponseCache();
    
    // 【重要】定期更新續(xù)約閥值的任務(wù)惫叛,默認900s執(zhí)行一次
    //  調(diào)用 PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
    scheduleRenewalThresholdUpdateTask();
    
    // 初始化 遠程區(qū)域注冊 相關(guān)信息(默認沒有遠程Region,都是使用us-east-1)
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}


numberOfReplicationsLastMin: 上一分鐘來自對等節(jié)點復(fù)制的續(xù)約數(shù)統(tǒng)計

numberOfReplicationsLastMin是com.netflix.eureka.util.MeasuredRate用于統(tǒng)計測量上一分鐘來自對等節(jié)點復(fù)制的續(xù)約數(shù)

// MeasuredRate#start()
public synchronized void start() {
    if (!isActive) {
        timer.schedule(new TimerTask() {

            @Override
            public void run() {
                try {
                    // Zero out the current bucket.
                    // 將當前的桶的統(tǒng)計數(shù)據(jù)放到lastBucket逞刷,當前桶置為0
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }, sampleInterval, sampleInterval);

        isActive = true;
    }
}

/**
 * Returns the count in the last sample interval.
 * 返回上一分鐘的統(tǒng)計數(shù)
 */
public long getCount() {
    return lastBucket.get();
}

/**
 * Increments the count in the current sample interval.
 * 增加當前桶的計數(shù)嘉涌,在以下2個場景有調(diào)用:
 * AbstractInstanceRegistry#renew() - 續(xù)約
 * PeerAwareInstanceRegistryImpl#replicateToPeers() - 
 */
public void increment() {
    currentBucket.incrementAndGet();
}


初始化ResponseCache

ResponseCache主要是緩存服務(wù)列表信息妻熊,根據(jù)注釋可知,緩存以壓縮和非壓縮形式維護洛心,用于三類請求: all applications固耘,增量更改和單個application

// ResponseCacheImpl構(gòu)造
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value>  readWriteCacheMap;

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    // 根據(jù)配置eureka.server.useReadOnlyResponseCache判斷,是否使用只讀ResponseCache词身,默認true
    // 由于ResponseCache維護這一個可讀可寫的readWriteCacheMap厅目,還有一個只讀的readOnlyCacheMap
    // 此配置控制在get()應(yīng)用數(shù)據(jù)時,是去只讀Map讀法严,還是讀寫Map讀损敷,應(yīng)該只讀Map是定期更新的
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // eureka.server.responseCacheUpdateIntervalMs緩存更新頻率,默認30s
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    
    // 創(chuàng)建讀寫Map深啤,com.google.common.cache.LoadingCache
    // 可以設(shè)置初始值拗馒,數(shù)據(jù)寫入過期時間,刪除監(jiān)聽器等
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // 如果啟用只讀緩存溯街,那么每隔responseCacheUpdateIntervalMs=30s诱桂,執(zhí)行g(shù)etCacheUpdateTask()
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}

可見ResponseCache維護了兩個Map,一個可讀可寫的readWriteCacheMap呈昔,應(yīng)該每個操作都會寫入挥等,一個只讀的readOnlyCacheMap,默認應(yīng)該每30s更新一次堤尾,下面具體看看getCacheUpdateTask()

// ResponseCacheImpl#getCacheUpdateTask()
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            
            // 遍歷只讀Map
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // 如果只讀Map中的值 和 讀寫Map中的值不同肝劲,用讀寫Map更新只讀Map
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}

每30s會比較只讀Map和讀寫Map中的值,以讀寫Map中的為準


scheduleRenewalThresholdUpdateTask:定期更新續(xù)約閥值的任務(wù)

/**
 * Schedule the task that updates <em>renewal threshold</em> periodically.
 * The renewal threshold would be used to determine if the renewals drop
 * dramatically because of network partition and to protect expiring too
 * many instances at a time.
 * 每隔 eureka.server.renewalThresholdUpdateIntervalMs=900秒 更新一次續(xù)約閥值
 */
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}

更新續(xù)約閥值在updateRenewalThreshold()方法

// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
 * Updates the <em>renewal threshold</em> based on the current number of
 * renewals. The threshold is a percentage as specified in
 * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
 * received per minute {@link #getNumOfRenewsInLastMin()}.
 */
private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        // 統(tǒng)計所有Instance實例個數(shù)
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold of if the self preservation is disabled.
            // 只有當閥值大于當前預(yù)期值時郭宝,才更新  或者  關(guān)閉了自我保護模式
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfRenewsPerMin = count * 2;
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

其實大體意思是:先計算所有Instance實例個數(shù)辞槐,默認每個實例1分鐘應(yīng)該續(xù)約2次(30s一次)

  • 如果開啟自我保護模式,更新 expectedNumberOfRenewsPerMin預(yù)期每分鐘續(xù)約數(shù)numberOfRenewsPerMinThreshold每分鐘續(xù)約閥值
  • 如果沒有開啟自我保護模式粘室,只有當本期續(xù)約數(shù)大于之前的閥值榄檬,即當前不處在自我保護模式中(自我保護模式中,不能刪除服務(wù)列表衔统,閥值自然也不能更新)丙号,才可以更新 expectedNumberOfRenewsPerMin 和 numberOfRenewsPerMinThreshold

但如上代碼是有問題的,無論是注釋還是判斷邏輯缰冤,當前版本:eureka-core-1.6.2

直到 v1.9.3版本才修復(fù)

Snipaste_2019-01-21_19-09-19.jpg

https://github.com/Netflix/eureka/commit/a4dd6b22ad447c706234e63fe83cb58413f7618b#diff-4aec7ea96457f5084840fc40f501c320

之后又有兩個版本,修改了這里的計算邏輯和做了方法抽取

Add possibility to configure expected interval between clients' renews and not break self-preservation

Extract calculation of renews threshold to separate method


【重點2】EurekaServerBootstrap初始化

上面的自動配置過程中已經(jīng)注冊了處理所有 **/eureka/**** 請求的Jersey Filter喳魏,這樣所有Client的注冊棉浸、續(xù)約等請求都可以處理了。而還有一些工作是通過EurekaServerBootstrap#contextInitialized()完成的刺彩,在Spring容器基本上refresh()完畢的時候

EurekaServerBootstrap是 spring cloud的實現(xiàn)迷郑,而netflix的Eureka Server啟動引導(dǎo)的實現(xiàn)是 EurekaBootStrap

// EurekaServerBootstrap#contextInitialized()
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();    // 初始化環(huán)境
        initEurekaServerContext();  // 初始化上下文

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

這兩個里面我們主要關(guān)注上下文的初始化initEurekaServerContext()

// EurekaServerBootstrap#initEurekaServerContext()
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);

    // 是否為AWS環(huán)境
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    // 將serverContext由Holder保管
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // Copy registry from neighboring eureka node
    // 從相鄰的eureka節(jié)點拷貝注冊列表信息
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}

有兩個重要環(huán)接:

  • registry.syncUp(): 從相鄰eureka節(jié)點拷貝注冊列表信息
  • registry.openForTraffic(): 允許開始與客戶端的數(shù)據(jù)傳輸枝恋,即開始作為Server服務(wù)


1、registry.syncUp():從相鄰eureka節(jié)點拷貝注冊列表信息

/**
 * Populates the registry information from a peer eureka node. This
 * operation fails over to other nodes until the list is exhausted if the
 * communication fails.
 */
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    // 循環(huán)嗡害,最多重試RegistrySyncRetries次(默認 5)
    // eurekaClient中的邏輯會重試其它的eureka節(jié)點
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); //30s
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        
        // 從eurekaClient獲取服務(wù)列表
        Applications apps = eurekaClient.getApplications();
        // 循環(huán)服務(wù)列表焚碌,并依次注冊
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}


2、registry.openForTraffic(): 允許與客戶端的數(shù)據(jù)傳輸

// InstanceRegistry#openForTraffic()
/**
 * If
 * {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
 * is called with a zero argument, it means that leases are not automatically
 * cancelled if the instance hasn't sent any renewals recently. This happens for a
 * standalone server. It seems like a bad default, so we set it to the smallest
 * non-zero value we can, so that any instances that subsequently register can bump up
 * the threshold.
 */
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // 如果count==0霸妹,即沒有從相鄰eureka節(jié)點得到服務(wù)列表十电,如單機啟動模式,defaultOpenForTrafficCount=1
    super.openForTraffic(applicationInfoManager,
            count == 0 ? this.defaultOpenForTrafficCount : count);
}


// PeerAwareInstanceRegistryImpl#openForTraffic()
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 每分鐘期待的續(xù)約數(shù)(默認30s續(xù)約叹螟,60s就是2次)
    this.expectedNumberOfRenewsPerMin = count * 2; 
    
    // 每分鐘續(xù)約的閥值:85% * expectedNumberOfRenewsPerMin
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    
    this.startupTime = System.currentTimeMillis();
    if (count > 0) { //可count默認值是1鹃骂,那么peerInstancesTransferEmptyOnStartup始終不會是true
                     //在PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)方法有用
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    
    // 開啟新的【EvictionTask】
    super.postInit();
}

// AbstractInstanceRegistry#postInit()
protected void postInit() {
    renewsLastMin.start(); //統(tǒng)計上一分鐘續(xù)約數(shù)的監(jiān)控Timer
    
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),  //默認60s
            serverConfig.getEvictionIntervalTimerInMs());
}
  • 如果沒有從相鄰eureka節(jié)點獲得服務(wù),count默認為1
  • 初始化每分鐘期待的續(xù)約數(shù) expectedNumberOfRenewsPerMin = count * 2
  • 初始化每分鐘續(xù)約閥值numberOfRenewsPerMinThreshold= 85% * expectedNumberOfRenewsPerMin
  • applicationInfoManager設(shè)置狀態(tài)為UP
  • 開啟新的【EvictionTask】驅(qū)逐任務(wù)


二罢绽、Eureka Server處理注冊請求

經(jīng)過上面的Eureka Server自動配置及初始化畏线,Eureka Server已經(jīng)成功啟動并可以通過Jersey處理各種請求,具體的注冊請求是由com.netflix.eureka.resources.ApplicationResource#addInstance()處理的

ApplicationResource#addInstance() - 注冊單個應(yīng)用實例

// ApplicationResource#addInstance()
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    
    // validate that the instanceinfo contains all the necessary required fields
    // 驗證Instance實例的所有必填字段
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    // 處理客戶端可能正在使用缺少數(shù)據(jù)的錯誤DataCenterInfo注冊的情況
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    // 【 使用PeerAwareInstanceRegistry集群實例注冊器register當前實例 】
    // isReplication表示此操作是否是節(jié)點間的復(fù)制良价,此處isReplication==null
    registry.register(info, "true".equals(isReplication));
    
    return Response.status(204).build();  // 204 to be backwards compatible
                                          // 注冊成功返回204狀態(tài)碼
}

重點是 registry.register(info, "true".equals(isReplication))寝殴,即使用PeerAwareInstanceRegistry集群實例注冊器register當前實例


PeerAwareInstanceRegistryImpl#register() - 注冊服務(wù)信息并同步到其它Eureka節(jié)點

// PeerAwareInstanceRegistryImpl#register()
/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 * 注冊有關(guān)InstanceInfo信息,并將此信息復(fù)制到所有對等的eureka節(jié)點
 * 如果這是來自其他節(jié)點的復(fù)制事件明垢,則不會繼續(xù)復(fù)制它
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; //默認的租約持續(xù)時間是90s
    
    // 如果當前Instance實例的租約信息中有l(wèi)easeDuration持續(xù)時間蚣常,使用實例的leaseDuration
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    
    // 【 當前Eureka Server注冊實例信息 】
    super.register(info, leaseDuration, isReplication);
    
    // 【 將注冊實例信息復(fù)制到集群中其它節(jié)點 】
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • 默認leaseDuration租約持續(xù)時間為90s,如果當前Instance實例的租約信息中有l(wèi)easeDuration持續(xù)時間袖外,使用實例的leaseDuration
  • 【重點】當前Eureka Server注冊實例信息
  • 【重點】將注冊實例信息復(fù)制到集群中其它節(jié)點


AbstractInstanceRegistry#register():注冊

/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock(); //讀鎖
        
        // registry是保存所有應(yīng)用實例信息的Map:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        // 從registry中獲取當前appName的所有實例信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        
        REGISTER.increment(isReplication); //注冊統(tǒng)計+1
        
        // 如果當前appName實例信息為空史隆,新建Map
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        
        // 獲取實例的Lease租約信息
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // 如果已經(jīng)有租約,則保留最后一個臟時間戳而不覆蓋它
        // (比較當前請求實例租約 和 已有租約 的LastDirtyTimestamp曼验,選擇靠后的)
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } 
        else {
            // The lease does not exist and hence it is a new registration
            // 如果之前不存在實例的租約泌射,說明是新實例注冊
            // expectedNumberOfRenewsPerMin期待的每分鐘續(xù)約數(shù)+2(因為30s一個)
            // 并更新numberOfRenewsPerMinThreshold每分鐘續(xù)約閥值(85%)
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease); //當前實例信息放到維護注冊信息的Map
        
        // 同步維護最近注冊隊列
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        
        // This is where the initial state transfer of overridden status happens
        // 如果當前實例已經(jīng)維護了OverriddenStatus,將其也放到此Eureka Server的overriddenInstanceStatusMap中
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        // 根據(jù)overridden status規(guī)則鬓照,設(shè)置狀態(tài)
        InstanceStatus overriddenInstanceStatus 
            = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        // 如果租約以UP狀態(tài)注冊熔酷,設(shè)置租賃服務(wù)時間戳
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        
        registrant.setActionType(ActionType.ADDED); //ActionType為 ADD
        recentlyChangedQueue.add(new RecentlyChangedItem(lease)); //維護recentlyChangedQueue
        registrant.setLastUpdatedTimestamp(); //更新最后更新時間
        
        // 使當前應(yīng)用的ResponseCache失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock(); //讀鎖
    }
}
  • 維護當前Instance實例的Lease租約信息,并放到Eureka Server維護注冊信息的Map:【ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>】豺裆,對應(yīng)關(guān)系是 appName:<Instance實例Id拒秘,Lease租約信息>

  • 如果是新注冊,expectedNumberOfRenewsPerMin期待的每分鐘續(xù)約數(shù)+2臭猜, 并更新numberOfRenewsPerMinThreshold每分鐘續(xù)約閥值

  • 維護 recentRegisteredQueue最近注冊隊列躺酒,recentlyChangedQueue最近更改隊列

  • 如果本次注冊實例已經(jīng)維護了OverriddenStatus,根據(jù)一定規(guī)則蔑歌,維護本Server節(jié)點當前實例的OverriddenStatus

  • 設(shè)置Instance實例的最后更新時間戳

  • 對當前應(yīng)用對應(yīng)的ResponseCache緩存失效

    responseCache 用于緩存查詢的應(yīng)用實例信息

    其使用guava cache維護了一個可讀可寫的LocalLoadingCache本地緩存【readWriteCacheMap】羹应,還有一個只讀的ConcurrentMap 【readOnlyCacheMap】

    在 get(key, useReadOnlyCache)時首先會檢查【readOnlyCacheMap】只讀緩存次屠,如沒有园匹,再查【readWriteCacheMap】雳刺,而【readWriteCacheMap】的 get()其含義實際是 getOrLoad(),如果獲取不到從CacheLoader加載裸违,而CacheLoader會到維護應(yīng)用實例注冊信息的Map中獲取

    【readWriteCacheMap】是直接與維護應(yīng)用實例注冊信息Map交互的掖桦,查詢時會Load加載,注冊新實例時會失效整個應(yīng)用的

    【readOnlyCacheMap】是在【readWriteCacheMap】之上的只讀緩存供汛,由配置 eureka.server.useReadOnlyResponseCache控制枪汪,默認true,每隔 eureka.server.responseCacheUpdateIntervalMs=30s 與【readWriteCacheMap】同步一次


PeerAwareInstanceRegistryImpl#replicateToPeers() :復(fù)制到Eureka對等節(jié)點

// PeerAwareInstanceRegistryImpl#replicateToPeers()
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // 如果是復(fù)制操作(針對當前節(jié)點紊馏,false)
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        
        // If it is a replication already, do not replicate again as this will create a poison replication
        // 如果它已經(jīng)是復(fù)制料饥,請不要再次復(fù)制,直接return
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // 遍歷集群所有節(jié)點(除當前節(jié)點外)
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            
            // 復(fù)制Instance實例操作到某個node節(jié)點
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } 
    finally {
        tracer.stop();
    }
}

下面是replicateInstanceActionsToPeers()復(fù)制Instance實例操作到其它節(jié)點

// PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers()
/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 *
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:  //取消
                node.cancel(appName, id);
                break;
            case Heartbeat:  //心跳
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:  //注冊
                node.register(info);
                break;
            case StatusUpdate:  //狀態(tài)更新
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:  //刪除OverrideStatus
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

本次只關(guān)心節(jié)點的注冊操作

// PeerEurekaNode#register()
/**
 * Sends the registration information of {@link InstanceInfo} receiving by
 * this node to the peer node represented by this class.
 *
 * @param info
 *            the instance information {@link InstanceInfo} of any instance
 *            that is send to this instance.
 * @throws Exception
 */
public void register(final InstanceInfo info) throws Exception {
    // 當前時間 + 30s后 過期
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    
    // 提交相同的操作到批量復(fù)制任務(wù)處理
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, overriddenStatus:null, replicateInstanceInfo:true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

而之后就和Eureka Client發(fā)起注冊請求的調(diào)用差不多 replicationClient.register(info)

至此朱监,Spring Cloud Eureka Server的整個自動配置及初始化岸啡,以及接收注冊請求,并復(fù)制到集群中的對等節(jié)點就分析完了

大體時序流程參考:

eureka-server-register.png


參考:

Dive into Eureka: 宋順

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末赫编,一起剝皮案震驚了整個濱河市巡蘸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌擂送,老刑警劉巖悦荒,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異嘹吨,居然都是意外死亡搬味,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門蟀拷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來碰纬,“玉大人,你說我怎么就攤上這事问芬≡梦觯” “怎么了?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵此衅,是天一觀的道長强戴。 經(jīng)常有香客問我,道長挡鞍,這世上最難降的妖魔是什么骑歹? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮墨微,結(jié)果婚禮上陵刹,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好衰琐,可當我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著炼蹦,像睡著了一般羡宙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上掐隐,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天狗热,我揣著相機與錄音,去河邊找鬼虑省。 笑死匿刮,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的探颈。 我是一名探鬼主播熟丸,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼伪节!你這毒婦竟也來了光羞?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤怀大,失蹤者是張志新(化名)和其女友劉穎纱兑,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體化借,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡潜慎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蓖康。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铐炫。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钓瞭,靈堂內(nèi)的尸體忽然破棺而出驳遵,到底是詐尸還是另有隱情,我是刑警寧澤山涡,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布堤结,位于F島的核電站,受9級特大地震影響鸭丛,放射性物質(zhì)發(fā)生泄漏竞穷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一鳞溉、第九天 我趴在偏房一處隱蔽的房頂上張望瘾带。 院中可真熱鬧,春花似錦熟菲、人聲如沸看政。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽允蚣。三九已至于颖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間嚷兔,已是汗流浹背森渐。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留冒晰,地道東北人同衣。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像壶运,于是被迫代替她去往敵國和親耐齐。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容