SpringCloud(第 050 篇)Netflix Eureka 源碼深入剖析(下)

SpringCloud(第 050 篇)Netflix Eureka 源碼深入剖析(下)

一觅够、大致介紹

1、鑒于一些朋友的提問并提議講解下eureka的源碼分析穷缤,由此應(yīng)運(yùn)而產(chǎn)生的本章節(jié)的內(nèi)容惠拭;
2、所以我站在自我的理解角度試著整理了這篇Eureka源碼的分析,希望對大家有所幫助焕窝;
3蹬挺、由于篇幅太長不能在一篇里面發(fā)布出來,所以拆分了上下篇它掂;

二巴帮、基本原理

1、Eureka Server 提供服務(wù)注冊服務(wù)虐秋,各個(gè)節(jié)點(diǎn)啟動后榕茧,會在Eureka Server中進(jìn)行注冊,這樣Eureka Server中的服務(wù)注冊表中將會存儲所有可用服務(wù)節(jié)點(diǎn)的信息客给,服務(wù)節(jié)點(diǎn)的信息可以在界面中直觀的看到用押。
2、Eureka Client 是一個(gè)Java 客戶端靶剑,用于簡化與Eureka Server的交互蜻拨,客戶端同時(shí)也具備一個(gè)內(nèi)置的、使用輪詢負(fù)載算法的負(fù)載均衡器桩引。
3缎讼、在應(yīng)用啟動后,將會向Eureka Server發(fā)送心跳(默認(rèn)周期為30秒)坑匠,如果Eureka Server在多個(gè)心跳周期沒有收到某個(gè)節(jié)點(diǎn)的心跳血崭,Eureka Server 將會從服務(wù)注冊表中把這個(gè)服務(wù)節(jié)點(diǎn)移除(默認(rèn)90秒)。
4厘灼、Eureka Server之間將會通過復(fù)制的方式完成數(shù)據(jù)的同步夹纫;
5、Eureka Client具有緩存的機(jī)制设凹,即使所有的Eureka Server 都掛掉的話捷凄,客戶端依然可以利用緩存中的信息消費(fèi)其它服務(wù)的API;

三围来、EurekaServer 啟動流程分析

詳見 SpringCloud(第 049 篇)Netflix Eureka 源碼深入剖析(上)

四跺涤、EurekaServer 處理服務(wù)注冊匈睁、集群數(shù)據(jù)復(fù)制

詳見 SpringCloud(第 049 篇)Netflix Eureka 源碼深入剖析(上)

五、EurekaClient 啟動流程分析

5.1 調(diào)換運(yùn)行模式桶错,Run運(yùn)行 springms-discovery-eureka 服務(wù)航唆,Debug 運(yùn)行 springms-provider-user 服務(wù),先觀察日志先院刁;

2017-10-23 19:43:07.688  INFO 1488 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2017-10-23 19:43:07.694  INFO 1488 --- [           main] o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING
2017-10-23 19:43:07.874  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson
2017-10-23 19:43:07.874  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson
2017-10-23 19:43:07.971  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml
2017-10-23 19:43:07.971  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using XML decoding codec XStreamXml
2017-10-23 19:43:08.134  INFO 1488 --- [           main] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Disable delta property : false
2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Single vip registry refresh property : null
2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Force full registry fetch : false
2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Application is null : false
2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Registered Applications size is zero : true
2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Application version is -1: true
2017-10-23 19:43:08.345  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Getting all instance registry info from the eureka server
2017-10-23 19:43:08.630  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : The response status is 200
2017-10-23 19:43:08.631  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Starting heartbeat executor: renew interval is: 30
2017-10-23 19:43:08.634  INFO 1488 --- [           main] c.n.discovery.InstanceInfoReplicator     : InstanceInfoReplicator onDemand update allowed rate per min is 4
2017-10-23 19:43:08.637  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Discovery Client initialized at timestamp 1508758988637 with initial instances count: 0
2017-10-23 19:43:08.657  INFO 1488 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application springms-provider-user with eureka with status UP
2017-10-23 19:43:08.658  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Saw local status change event StatusChangeEvent [timestamp=1508758988658, current=UP, previous=STARTING]
2017-10-23 19:43:08.659  INFO 1488 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_SPRINGMS-PROVIDER-USER/springms-provider-user:192.168.3.101:7900: registering service...
2017-10-23 19:43:08.768  INFO 1488 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 7900 (http)
2017-10-23 19:43:08.768  INFO 1488 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 7900
2017-10-23 19:43:08.773  INFO 1488 --- [           main] c.s.cloud.MsProviderUserApplication      : Started MsProviderUserApplication in 9.694 seconds (JVM running for 10.398)
【【【【【【 用戶微服務(wù) 】】】】】】已啟動.

【分析一】:根據(jù)日志粗粒度看糯钙,大多數(shù)日志都是在 DiscoveryClient 打印出來的,由此我們先不妨將這些打印日志的地方都打上斷點(diǎn)退腥,為了后序
斷點(diǎn)查看調(diào)用堆棧信息任岸。

【分析二】:仔細(xì)查看下日志,先是 DefaultLifecycleProcessor 類處理了一些 bean,然后接下來肯定會調(diào)用一些實(shí)現(xiàn) SmartLifecycle 類的
 start 方法狡刘;

【分析三】: 接著初始化設(shè)置了EurekaClient的狀態(tài)為 STARTING享潜,初始化編碼使用的格式,哪些用JSON嗅蔬,哪些用XML剑按;

【分析四】: 緊接著打印了強(qiáng)制獲取注冊信息狀態(tài)為false,已注冊的應(yīng)用大小為0澜术,客戶端發(fā)送心跳續(xù)約艺蝴,心跳續(xù)約間隔為30秒,最后打印Client
初始化完成鸟废;

【分析五】:帶著這些通過日志查看出來的端倪猜敢,然后我們還得吸取分析EurekaServer的教訓(xùn),我們得先去 @EnableEurekaClient 注解瞧瞧盒延。

5.2 有目的性的先去 MsProviderUserApplication 看看锣枝,鏈接點(diǎn)進(jìn) EnableEurekaClient 瞧瞧。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableDiscoveryClient
public @interface EnableEurekaClient {

}

【分析一】:我們會發(fā)現(xiàn)兰英,@EnableEurekaClient 注解類竟然也使用了注解 @EnableDiscoveryClient撇叁,那么我們有必要去這個(gè)注解類看看。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

}

【分析二】:我們看到的是 @EnableDiscoveryClient 注解類有個(gè)比較特殊的注解 @Import畦贸,由此我們猜想陨闹,這里的大多數(shù)邏輯是不是都寫在這個(gè) EnableDiscoveryClientImportSelector 類呢?

5.3 進(jìn)入 EnableDiscoveryClientImportSelector 看看到底做了些啥薄坏?

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    protected boolean isEnabled() {
        return new RelaxedPropertyResolver(getEnvironment()).getProperty(
                "spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
    }

    @Override
    protected boolean hasDefaultFactory() {
        return true;
    }

}

【分析一】:EnableDiscoveryClientImportSelector 類集成了 SpringFactoryImportSelector 類趋厉,但是重寫了一個(gè) isEnabled() 方
法,默認(rèn)值返回 true胶坠,為什么會返回true君账,也得有個(gè)說法吧,于是我們進(jìn)入父類 EnableDiscoveryClientImportSelector 看看沈善。

/**
 * Select and return the names of which class(es) should be imported based on
 * the {@link AnnotationMetadata} of the importing @{@link Configuration} class.
 */
@Override
public String[] selectImports(AnnotationMetadata metadata) {
    if (!isEnabled()) { // 打上斷點(diǎn)
        return new String[0];
    }
    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
            metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

    Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
            + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

    // Find all possible auto configuration classes, filtering duplicates
    List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
            .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

    if (factories.isEmpty() && !hasDefaultFactory()) {
        throw new IllegalStateException("Annotation @" + getSimpleName()
                + " found, but there are no implementations. Did you forget to include a starter?");
    }

    if (factories.size() > 1) {
        // there should only ever be one DiscoveryClient, but there might be more than
        // one factory
        log.warn("More than one implementation " + "of @" + getSimpleName()
                + " (now relying on @Conditionals to pick one): " + factories);
    }

    return factories.toArray(new String[factories.size()]);
}

【分析二】:發(fā)現(xiàn)父類有這么一個(gè) selectImports 方法使用了 isEnabled() 方法乡数,這個(gè)方法干了些啥事情呢椭蹄?我們細(xì)看下 selectImports 方法上面的英文注釋,大致意思是:選擇并且返回需要導(dǎo)入經(jīng)過注解配置的類净赴,由此我們猜想這個(gè)導(dǎo)入的類肯定對我們此次客戶端分析有莫大的幫助绳矩,于
是我們現(xiàn)在這個(gè)方法打上斷點(diǎn)先。于是我們現(xiàn)在該干的事情也干了玖翅,沒有頭緒的時(shí)候翼馆,我們現(xiàn)在才Run運(yùn)行EurekaServer,Debug運(yùn)行springms-provider-user金度。

5.4 EnableDiscoveryClientImportSelector.selectImports 這個(gè)方法果然進(jìn)斷點(diǎn)了应媚。

【分析一】:既然進(jìn)了斷點(diǎn),我們看看這個(gè)方法猜极,首先通過注解獲取了一些屬性中姜,然后加載了一些類名稱,于是我們進(jìn)入 loadFactoryNames 方法看看魔吐。

public static List<String> loadFactoryNames(Class<?> factoryClass, ClassLoader classLoader) {
    String factoryClassName = factoryClass.getName();
    try {
        // 注釋:public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";
        // 注釋:這個(gè) jar 包下的一個(gè)配置文件
        Enumeration<URL> urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
                ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
        List<String> result = new ArrayList<String>();
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url));
            String factoryClassNames = properties.getProperty(factoryClassName);
            result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames)));
        }
        return result;
    }
    catch (IOException ex) {
        throw new IllegalArgumentException("Unable to load [" + factoryClass.getName() +
                "] factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex);
    }
}

【分析二】:加載了一個(gè)配置文件扎筒,配置文件里面寫了啥呢莱找?打開SpringFactoryImportSelector該文件所在的jar包的spring.factories文件一看酬姆。

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
org.springframework.cloud.commons.util.UtilAutoConfiguration


# Environment Post Processors
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.client.HostInfoEnvironmentPostProcessor

【分析三】:看名稱,都是一些 Configuration 后綴的類名奥溺,所以這些都是加載的一堆堆的配置文件類辞色。于是我們繼續(xù)斷點(diǎn)往下走,發(fā)現(xiàn) 
factories 對象里面只有一個(gè)類名路徑為 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 浮定∠嗦看這個(gè)
名字就應(yīng)該知道這是我們分析EurekaClient的一個(gè)重要的配置類,先不管三七二十一桦卒,找到該類先立美。

5.5 進(jìn)入 EurekaDiscoveryClientConfiguration 看看,這個(gè)配置類有哪些重要的方法方灾?

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@CommonsLog
public class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered {

    @Override
    public void start() {
        // only set the port if the nonSecurePort is 0 and this.port != 0
        if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) {
            this.instanceConfig.setNonSecurePort(this.port.get());
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) {

            maybeInitializeClient();

            if (log.isInfoEnabled()) {
                log.info("Registering application " + this.instanceConfig.getAppname()
                        + " with eureka with status "
                        + this.instanceConfig.getInitialStatus());
            }

            this.applicationInfoManager
                    .setInstanceStatus(this.instanceConfig.getInitialStatus());

            if (this.healthCheckHandler != null) {
                this.eurekaClient.registerHealthCheck(this.healthCheckHandler);
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, this.instanceConfig));
            this.running.set(true);
        }
    }

    建蹄。。裕偿。 其它省略了
}

【分析一】:進(jìn)入這個(gè)類洞慎,首先看到該類實(shí)現(xiàn)了 SmartLifecycle 接口,那么就肯定會實(shí)現(xiàn) start 方法嘿棘,而且這個(gè) start 方法感覺應(yīng)在 “步驟5.1之分析二” 會被加載執(zhí)行的劲腿。

【分析二】:因?yàn)?start 這段代碼不多,所以我就索性將 start 方法中的每段代碼都點(diǎn)進(jìn)去看了看鸟妙,發(fā)現(xiàn) this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 這段代碼有一個(gè)觀察者模式的回調(diào)存在焦人。

// ApplicationInfoManager.setInstanceStatus 的方法
public synchronized void setInstanceStatus(InstanceStatus status) {// 打上斷點(diǎn)
    InstanceStatus prev = instanceInfo.setStatus(status);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, status));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

【分析三】:這個(gè)方法會因?yàn)闋顟B(tài)的改變而回調(diào)所有實(shí)現(xiàn) StatusChangeListener 這個(gè)類的地方挥吵,前提得先注冊到 listeners 中去才行。

【分析四】:于是乎垃瞧,我們斷定蔫劣,若想要回調(diào),那么就必須有地方先注冊這個(gè)事件个从,而且這個(gè)注冊還必須提前執(zhí)行在 start 方法前執(zhí)行脉幢,于是我們得先
在 ApplicationInfoManager 這個(gè)類中找到注冊到 listeners 的這個(gè)方法。

public void registerStatusChangeListener(StatusChangeListener listener) {// 打上斷點(diǎn)
    listeners.put(listener.getId(), listener);
}

【分析五】:沒錯(cuò)嗦锐,就是這個(gè)方法嫌松,肯定有地方調(diào)用這個(gè)方法,不然的話奕污,那調(diào)用 setInstanceStatus 這個(gè)方法的意義就什么用了萎羔。于是我們逆向找
下 registerStatusChangeListener 被調(diào)用的地方。

【分析六】:很不巧的是碳默,盡然只有1個(gè)地方被調(diào)用贾陷,這個(gè)地方就是 DiscoveryClient.initScheduledTasks 方法,而且 initScheduledTasks 
方法又是在 DiscoveryClient 的構(gòu)造函數(shù)里面調(diào)用的嘱根,同時(shí)我們也對 initScheduledTasks 以及 initScheduledTasks 被調(diào)用的構(gòu)造方法地方
打上斷點(diǎn)髓废。

5.6 由于翻閱代碼時(shí)間有點(diǎn)久了,因此我們關(guān)閉 springms-provider-user 微服務(wù)该抒,重新 Debug 運(yùn)行一下慌洪。

【分析一】:果不其然,EurekaDiscoveryClientConfiguration.start 方法被調(diào)用了凑保,緊接著 this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 也進(jìn)入斷點(diǎn)冈爹,然后在往下走,又進(jìn)入的 
DiscoveryClient.initScheduledTasks 方法中的 notify 回調(diào)處欧引。

【分析二】:看著斷點(diǎn)依次經(jīng)過我們上述分析的地方频伤,然后也符合日志打印的順序,所以我們現(xiàn)在應(yīng)該是有必要好好看看 DiscoveryClient.initScheduledTasks 這個(gè)方法究竟干了什么偉大的事情芝此。然而又想了想憋肖,還不如看看 initScheduledTasks 被調(diào)用的構(gòu)造方法。

5.7 進(jìn)入 DiscoveryClient 經(jīng)過 @Inject 注解過的構(gòu)造方法癌蓖。

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
    }
    
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();

        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // 注釋:定時(shí)任務(wù)調(diào)度準(zhǔn)備
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // 注釋:實(shí)例化心跳定時(shí)任務(wù)線程池
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // 注釋:實(shí)例化緩存刷新定時(shí)任務(wù)線程池
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // 注釋:初始化調(diào)度任務(wù)
    initScheduledTasks();
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

【分析一】:從往下看瞬哼,initScheduledTasks 這個(gè)方法顧名思義就是初始化調(diào)度任務(wù),所以這里面的內(nèi)容應(yīng)該就是重頭戲租副,進(jìn)入看看坐慰。

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // 注釋:間隔多久去拉取服務(wù)注冊信息,默認(rèn)時(shí)間 30秒
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // 注釋:定時(shí)任務(wù),每間隔 30秒 去拉取一次服務(wù)注冊信息
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        // 注釋:間隔多久發(fā)送一次心跳續(xù)約结胀,默認(rèn)間隔時(shí)間 30 秒
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        // 注釋:定時(shí)任務(wù)赞咙,每間隔 30秒 去想 EurekaServer 發(fā)送一次心跳續(xù)約
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        // 注釋:實(shí)例信息復(fù)制器,定時(shí)刷新dataCenterInfo數(shù)據(jù)中心信息糟港,默認(rèn)30秒
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        // 注釋:實(shí)例化狀態(tài)變化監(jiān)聽器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }

                // 注釋:狀態(tài)有變化的話攀操,會回調(diào)這個(gè)方法
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        // 注釋:注冊狀態(tài)變化監(jiān)聽器
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

【分析二】:在這個(gè)方法從上往下一路注釋分析下來,干了EurekaClient我們最想知道的一些事情秸抚,定時(shí)任務(wù)獲取注冊信息速和,定時(shí)任務(wù)刷新緩存,定時(shí)
任務(wù)心跳續(xù)約剥汤,定時(shí)任務(wù)同步數(shù)據(jù)中心數(shù)據(jù)颠放,狀態(tài)變化監(jiān)聽回調(diào)等。但是唯獨(dú)沒看到注冊吭敢,這是怎么回事呢碰凶?

【分析三】:我們忘記了一個(gè)重要的方法,instanceInfoReplicator.onDemandUpdate() 就是在狀態(tài)改變的時(shí)候鹿驼,我們是如何處理的欲低?由此,我們覺得這里面肯定有貓膩畜晰,不然沒辦法注冊呀砾莱。

public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("Executing on-demand update of local InstanceInfo");

                Future latestPeriodic = scheduledPeriodicRef.get();
                if (latestPeriodic != null && !latestPeriodic.isDone()) {
                    logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                    latestPeriodic.cancel(false);
                }

                // 注釋:這里進(jìn)行了實(shí)例信息刷新和注冊
                InstanceInfoReplicator.this.run();
            }
        });
        return true;
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

【分析四】:onDemandUpdate 這個(gè)方法,看來看去舷蟀,唯獨(dú) InstanceInfoReplicator.this.run() 這個(gè)方法還有點(diǎn)用恤磷,而且還是 run 方法呢面哼,感情 InstanceInfoReplicator 這個(gè)類還是實(shí)現(xiàn)了 Runnable 接口野宜?經(jīng)過查看這個(gè)類,還真是實(shí)現(xiàn)了 Runnable 接口魔策。

【分析五】:于是乎匈子,我們有理由相信,這個(gè)方法應(yīng)該我們要找的注冊所在的地方闯袒,翻開代碼看看究竟虎敦。

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

【分析六】:映入眼簾的就是 discoveryClient.register() 這個(gè)刺眼的 register 方法,終于有點(diǎn)苗頭了政敢,原來注冊方法找的這么千辛萬苦其徙。雖然找到了這里,但是我還是想看看這個(gè)讓我們找的千辛萬苦的注冊方法到底是怎么注冊的呢喷户?

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

【分析七】:原來調(diào)用了 EurekaHttpClient 封裝的客戶端請求對象來進(jìn)行注冊的唾那,再繼續(xù)深探 registrationClient.register 方法,于是我們來到了 AbstractJerseyEurekaHttpClient.register 方法褪尝。

@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                // 注釋:打包帶上當(dāng)前應(yīng)用的所有信息 info
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

【分析八】:原來調(diào)用的是 Jersey RESTful 框架來進(jìn)行請求的闹获,然后在 EurekaServer 那邊就會在 ApplicationResource.addInstance 方法接收客戶端的注冊請求期犬,因此我們的 EurekaClient 是如何注冊的就到此為止了。

【分析九】:至于那些續(xù)約避诽、心跳的流程分析和這個(gè)注冊的流程大體差不多龟虎,相信大家按照我剛剛這么分析斷點(diǎn)下去,一定能分析的很到位的沙庐。

六鲤妥、下載地址

https://gitee.com/ylimhhmily/SpringCloudTutorial.git

SpringCloudTutorial交流QQ群: 235322432

SpringCloudTutorial交流微信群: 微信溝通群二維碼圖片鏈接

歡迎關(guān)注,您的肯定是對我最大的支持!!!

<上一篇????????首頁????????下一篇>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拱雏,一起剝皮案震驚了整個(gè)濱河市旭斥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌古涧,老刑警劉巖垂券,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異羡滑,居然都是意外死亡菇爪,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門柒昏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來凳宙,“玉大人,你說我怎么就攤上這事职祷∈仙” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵有梆,是天一觀的道長是尖。 經(jīng)常有香客問我,道長泥耀,這世上最難降的妖魔是什么饺汹? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮痰催,結(jié)果婚禮上兜辞,老公的妹妹穿的比我還像新娘。我一直安慰自己夸溶,他們只是感情好逸吵,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缝裁,像睡著了一般扫皱。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天啸罢,我揣著相機(jī)與錄音编检,去河邊找鬼。 笑死扰才,一個(gè)胖子當(dāng)著我的面吹牛允懂,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播衩匣,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼蕾总,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了琅捏?” 一聲冷哼從身側(cè)響起生百,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎柄延,沒想到半個(gè)月后蚀浆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡搜吧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年市俊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片滤奈。...
    茶點(diǎn)故事閱讀 37,997評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡摆昧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蜒程,到底是詐尸還是另有隱情绅你,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布昭躺,位于F島的核電站忌锯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏窍仰。R本人自食惡果不足惜汉规,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一礼殊、第九天 我趴在偏房一處隱蔽的房頂上張望驹吮。 院中可真熱鬧,春花似錦晶伦、人聲如沸碟狞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽族沃。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間脆淹,已是汗流浹背常空。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留盖溺,地道東北人漓糙。 一個(gè)月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像烘嘱,于是被迫代替她去往敵國和親昆禽。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容