SpringCloud (Part 049): A Deep Dive into the Netflix Eureka Source Code (Part 1)
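I. Overview
1. Several readers asked for, and suggested, a walkthrough of the Eureka source code; this chapter is the result.
2. I have tried to put this analysis of the Eureka source together from my own understanding, and I hope it helps.
3. The material is too long to publish as a single post, so it is split into two parts.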
II. Basic principles
1. Eureka Server provides the service registry. Each node registers with Eureka Server after it starts, so the server's registry holds information about every available service node, and that information can be viewed directly in the dashboard.
2. Eureka Client is a Java client that simplifies interaction with Eureka Server. It also has a built-in load balancer that uses a round-robin algorithm.
3. After an application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods, it removes that node from the registry (after 90 seconds by default).
4. Eureka Servers synchronize data with each other by replication.
5. Eureka Client caches registry data, so even if every Eureka Server goes down, clients can still use the cached information to call other services' APIs.
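As a rough illustration of point 3, the heartbeat and eviction rule can be modelled as a lease that expires when it is not renewed in time. This is a minimal sketch in plain Java, not Eureka's actual Lease class; the HeartbeatLease name and its methods are invented for this example, and the 90-second duration is simply the default quoted above.

```java
import java.time.Duration;

/** Minimal model of a heartbeat lease; a hypothetical class, not Eureka's own Lease. */
final class HeartbeatLease {
    private final long durationMs;
    private long lastRenewalMs;

    HeartbeatLease(Duration duration, long nowMs) {
        this.durationMs = duration.toMillis();
        this.lastRenewalMs = nowMs;
    }

    /** Called whenever a heartbeat arrives from the client. */
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** True once no heartbeat has arrived within the lease duration. */
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }
}
```

A server-side eviction task would periodically call isExpired for every lease and drop the expired ones; passing the clock in as a parameter keeps the rule easy to test.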
III. EurekaServer startup flow
3.1 Run the springms-discovery-eureka code and you will see log lines related to EurekaServer startup:
2017-10-22 18:14:17.635 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'environmentManager': registering with JMX server as MBean [org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager]
2017-10-22 18:14:17.650 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'restartEndpoint': registering with JMX server as MBean [org.springframework.cloud.context.restart:name=restartEndpoint,type=RestartEndpoint]
2017-10-22 18:14:17.661 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshScope': registering with JMX server as MBean [org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope]
2017-10-22 18:14:17.674 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'configurationPropertiesRebinder': registering with JMX server as MBean [org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=335b5620,type=ConfigurationPropertiesRebinder]
2017-10-22 18:14:17.683 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshEndpoint': registering with JMX server as MBean [org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint]
2017-10-22 18:14:17.926 INFO 5288 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2017-10-22 18:14:17.927 INFO 5288 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application unknown with eureka with status UP
2017-10-22 18:14:17.927 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : Setting the eureka configuration..
2017-10-22 18:14:17.948 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : isAws returned false
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : Initialized server context
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Got 1 instances from neighboring DS node
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Renew threshold is: 1
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Changing status to UP
2017-10-22 18:14:17.958 INFO 5288 --- [ Thread-10] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
2017-10-22 18:14:18.019 INFO 5288 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
2017-10-22 18:14:18.020 INFO 5288 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761
2017-10-22 18:14:18.023 INFO 5288 --- [ main] c.s.cloud.EurekaServerApplication : Started EurekaServerApplication in 8.299 seconds (JVM running for 8.886)
【【【【【【 Eureka microservice 】】】】】】 started.
[Analysis]: One log line reads "Setting the eureka configuration..", meaning Eureka is starting to configure itself. Could this be where the Eureka Server startup flow begins? With that suspicion in mind, let's open the EurekaServerBootstrap class that printed it.
3.2 Open the EurekaServerBootstrap class. Judging by the name, this should be the class that bootstraps EurekaServer.
protected void initEurekaEnvironment() throws Exception {
    log.info("Setting the eureka configuration..");
    // ...
}
[Analysis 1]: The log line is printed in initEurekaEnvironment, so I followed that method to find where it is called:
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
[Analysis 2]: contextInitialized calls initEurekaEnvironment, so we go one level further up to find its caller.
[Analysis 3]: EurekaServerInitializerConfiguration has a start method that creates a thread to run the EurekaServer initialization in the background.
3.3 Open the EurekaServerInitializerConfiguration class and see what this EurekaServer initialization configuration actually does.
@Override
public void start() { // set a breakpoint here
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
[Analysis 1]: The line log.info("Started Eureka Server"); makes things clear: this is where EurekaServer is "started". What actually happens is that eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext) initializes things we have not looked at yet.
[Analysis 2]: After logging that Eureka Server has started, the code calls publish twice. publish ultimately calls this.applicationContext.publishEvent(event), which uses Spring's ApplicationContext event mechanism: the publisher (applicationContext) publishes the event. What is missing is a listener. Search the code and there appears to be nothing listening for EurekaServerStartedEvent or EurekaRegistryAvailableEvent. Why is that?
[Analysis 3]: The package that contains EurekaServerStartedEvent holds these event classes: EurekaInstanceCanceledEvent, EurekaInstanceRegisteredEvent, EurekaInstanceRenewedEvent, EurekaRegistryAvailableEvent, and EurekaServerStartedEvent. They correspond to instance cancelled, instance registered, instance renewed, registry available, and Eureka Server started. None of them has a listener, so can we add our own? For example: @EventListener public void listen(EurekaInstanceCanceledEvent event) { ... handle the cancellation ... }. With the @EventListener annotation, our own code receives the callback for the event. It is a clever design: Spring Cloud provides the callback hooks and leaves the business logic to us.
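To make the publish/listen relationship concrete without pulling in Spring, here is a minimal sketch of the same pattern in plain Java. TinyEventBus and InstanceCanceled are hypothetical names invented for this illustration; in a real Spring Cloud application you would use @EventListener as described above rather than a hand-written bus.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical, dependency-free stand-in for an event publisher. */
final class TinyEventBus {
    private final Map<Class<?>, List<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

    /** Registers a listener for events whose class is exactly the given type. */
    <E> void listen(Class<E> type, Consumer<? super E> listener) {
        listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(event -> listener.accept(type.cast(event)));
    }

    /** Delivers the event to every listener registered for its class; no listeners means a no-op. */
    void publish(Object event) {
        for (Consumer<Object> listener : listeners.getOrDefault(event.getClass(), List.of())) {
            listener.accept(event);
        }
    }
}

/** Hypothetical event type standing in for an instance-cancelled event. */
final class InstanceCanceled {
    final String appName;

    InstanceCanceled(String appName) {
        this.appName = appName;
    }
}
```

The point of the sketch is that publishing an event with no registered listener is harmless, which matches the observation that the Eureka events are published even though nothing listens for them by default.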
[Analysis 4]: Now think about it the other way round: why does start get called at all? Tracing the callers of start upward leads to the doStart method of DefaultLifecycleProcessor, which calls bean.start();
3.4 Open DefaultLifecycleProcessor to see how EurekaServerInitializerConfiguration.start is triggered.
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
// set a breakpoint here
Lifecycle bean = lifecycleBeans.remove(beanName);
if (bean != null && !this.equals(bean)) {
String[] dependenciesForBean = this.beanFactory.getDependenciesForBean(beanName);
for (String dependency : dependenciesForBean) {
doStart(lifecycleBeans, dependency, autoStartupOnly);
}
if (!bean.isRunning() &&
(!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
if (logger.isDebugEnabled()) {
logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
}
try {
bean.start();
}
catch (Throwable ex) {
throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
}
if (logger.isDebugEnabled()) {
logger.debug("Successfully started bean '" + beanName + "'");
}
}
}
}
[Analysis 1]: bean.start() is called only after checks such as bean.isRunning() pass. Next, look for the caller of doStart:
public void start() {
// set a breakpoint here
if (this.members.isEmpty()) {
return;
}
if (logger.isInfoEnabled()) {
logger.info("Starting beans in phase " + this.phase);
}
Collections.sort(this.members);
for (LifecycleGroupMember member : this.members) {
if (this.lifecycleBeans.containsKey(member.name)) {
doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
}
}
}
[Analysis 2]: This start method belongs to LifecycleGroup, an inner class of DefaultLifecycleProcessor. Keep looking for its caller:
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
Lifecycle bean = entry.getValue();
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
LifecycleGroup group = phases.get(phase);
if (group == null) {
group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
phases.put(phase, group);
}
group.add(entry.getKey(), bean);
}
}
if (phases.size() > 0) {
List<Integer> keys = new ArrayList<Integer>(phases.keySet());
Collections.sort(keys);
for (Integer key : keys) {
phases.get(key).start();
}
}
}
[Analysis 3]: startBeans is a private method of DefaultLifecycleProcessor, and its first line calls getLifecycleBeans() to fetch the lifecycle beans. This suggests that Eureka Server gets started because some class implements a particular interface or overrides a particular method that the container invokes during initialization. So let's go back to EurekaServerInitializerConfiguration.
[Analysis 4]: Sure enough, EurekaServerInitializerConfiguration implements SmartLifecycle, which in turn extends the Lifecycle interface. So the truth comes out: the class implements Lifecycle and provides a start method, and that is how the container ends up starting Eureka Server.
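The grouping and ordering that startBeans performs can be sketched in a few lines of plain Java. This is a simplified model, not Spring's implementation: PhasedBean and PhaseStarter are hypothetical names, only the autoStartupOnly = true case is modelled, and dependencies between beans are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical bean descriptor standing in for a SmartLifecycle implementation. */
final class PhasedBean {
    final String name;
    final int phase;
    final boolean autoStartup;
    boolean running;

    PhasedBean(String name, int phase, boolean autoStartup) {
        this.name = name;
        this.phase = phase;
        this.autoStartup = autoStartup;
    }
}

final class PhaseStarter {
    /** Starts auto-startup beans in ascending phase order and returns their names in start order. */
    static List<String> startAll(List<PhasedBean> beans) {
        // A TreeMap keeps the phases sorted, which replaces the explicit sort of the phase keys.
        Map<Integer, List<PhasedBean>> groups = new TreeMap<>();
        for (PhasedBean bean : beans) {
            if (bean.autoStartup) {
                groups.computeIfAbsent(bean.phase, p -> new ArrayList<>()).add(bean);
            }
        }
        List<String> started = new ArrayList<>();
        for (List<PhasedBean> group : groups.values()) {
            for (PhasedBean bean : group) {
                if (!bean.running) {
                    bean.running = true; // a real lifecycle processor would call bean.start() here
                    started.add(bean.name);
                }
            }
        }
        return started;
    }
}
```

Beans that do not opt in to automatic startup are skipped, and lower phases start first, which is why the log shows "Starting beans in phase 0" before the Eureka Server thread begins its work.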
3.5 Is that really all? Does Eureka Server startup do so little? Surely not.
[Analysis 1]: So far we have only worked backwards from the logs. But we have forgotten the annotation that marks an application as a Eureka Server in the first place: @EnableEurekaServer. Let's go back and look at that annotation.
3.6 Open EnableEurekaServer and see what it does.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {
}
[Analysis 1]: EnableEurekaServer carries an @Import annotation that pulls in another class, so we follow it.
3.7 Open EurekaServerConfiguration. Judging by the name, it is the Eureka Server configuration class.
[Analysis 1]: As expected, this class is annotated with @Configuration and has many @Bean methods. Could the chain we inferred in 3.1 to 3.4 be the result of one of these beans being instantiated and eventually leading to the start call?
[Analysis 2]: Searching for "Bootstrap" does turn up such a method:
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
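[Analysis 3]: The existence of this bean suggests that the reverse analysis from the logs was on the right track: without it, would the path through getLifecycleBeans in DefaultLifecycleProcessor.startBeans have worked out so smoothly? This is only my guess. I have not downloaded the source and tried commenting out the @Bean annotation on eurekaServerBootstrap, but the reasoning is probably close. I leave that experiment to the reader.
[Analysis 4]: Having found one @Bean method, look for others by searching for generic keywords such as Context, Bean, Init, and Server: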
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
@Bean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new PeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
[Analysis 5]: DefaultEurekaServerContext.initialize initializes something. We do not know what yet, so set a breakpoint there for now.
[Analysis 6]: PeerEurekaNodes.start is yet another start method, but this class does not implement any interface, so leave it for now with a breakpoint.
[Analysis 7]: InstanceRegistry.register, along with several similar methods, is probably used for client registration. Set breakpoints there too, or on every method of the class. Doing so shows that there are methods for registration, renewal, and cancellation.
[Analysis 8]: With the breakpoints in place and no further leads, just run under the debugger and see what turns up.
3.8 Stop the service and run springms-discovery-eureka in Debug mode.
[Analysis 1]: DefaultEurekaServerContext.initialize is called, which confirms the earlier idea: EurekaServerConfiguration is there for a reason.
@PostConstruct
@Override
public void initialize() throws Exception {
logger.info("Initializing ...");
peerEurekaNodes.start();
registry.init(peerEurekaNodes);
logger.info("Initialized");
}
[Analysis 2]: Step into peerEurekaNodes.start() inside initialize:
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// Note: the interval is 600000 ms, so the peer URLs are re-resolved and the peer node list refreshed every 10 minutes
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
[Analysis 3]: start sets up a scheduled task that runs updatePeerEurekaNodes(resolvePeerUrls()) every 600000 ms (10 minutes) to re-resolve the peer URLs and refresh the list of cluster peer nodes used for synchronization.
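The scheduling pattern used in start (run once immediately, then repeat with a fixed delay on a single daemon thread) can be reproduced with the JDK alone. The sketch below is an illustration of that pattern, not the Eureka code; PeriodicRefresher and the thread name are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a task once, then repeatedly with a fixed delay between runs. */
final class PeriodicRefresher {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "peer-updater-sketch");
                t.setDaemon(true); // a daemon thread does not keep the JVM alive on shutdown
                return t;
            });

    void start(Runnable task, long intervalMs) {
        task.run(); // first run happens synchronously, as in the method above
        executor.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                // Catch everything: an exception escaping here would cancel all future runs.
                System.err.println("refresh failed: " + e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Catching Throwable inside the scheduled Runnable mirrors the try/catch in peersUpdateTask; without it, ScheduledExecutorService silently stops rescheduling a task after its first failure.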
[Analysis 4]: Continue stepping and enter registry.init(peerEurekaNodes) inside initialize:
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// Note: initialize the Eureka Server response cache; the default cache time is 30s
initializedResponseCache();
// Note: scheduled task that resets the heartbeat (renewal) threshold at an interval of 900000 ms, i.e. every 15 minutes
scheduleRenewalThresholdUpdateTask();
// Note: initialize the remote region registries
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
[Analysis 5]: The cache is set up and the scheduled tasks are set up, so that seems to be it. Release the breakpoint and see where execution goes next.
3.9 The breakpoint in EurekaServerInitializerConfiguration.start is hit.
[Analysis 1]: The breakpoint in DefaultLifecycleProcessor.doStart is hit first, then the one in EurekaServerInitializerConfiguration.start.
[Analysis 2]: This again shows that the reverse analysis was only missing the starting point at EnableEurekaServer; the approach itself was sound. Since this part was analyzed earlier, skip it and keep going.
3.10 The breakpoint in InstanceRegistry.openForTraffic is hit.
[Analysis 1]: This is one of the breakpoints set in "step 3.7, Analysis 7". The stack trace shows the call comes from this.registry.openForTraffic(this.applicationInfoManager, registryCount); inside initEurekaServerContext, the method seen in "step 3.2, Analysis 1". However the code twists and turns, the breakpoints pay off: we are back where the log-based reverse analysis began.
[Analysis 2]: Step into super.openForTraffic:
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// Note: one renewal every 30 seconds means 2 renewals per minute, hence count * 2
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got " + count + " instances from neighboring DS node");
logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// Note: set the Eureka Server status to UP, i.e. the server is now active and essentially ready for use
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// Note: scheduled task with a 60000 ms (1 minute) interval; Eureka Server periodically evicts expired instances
super.postInit();
}
[Analysis 3]: This method mainly sets the service status and starts the scheduled task that evicts expired instances, scanning once a minute.
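The threshold arithmetic at the top of openForTraffic is easy to check by hand. The sketch below reproduces it in isolation; RenewThreshold is a hypothetical helper, and the 0.85 used in the usage note is an assumption about the value returned by serverConfig.getRenewalPercentThreshold(), which is configurable.

```java
/** Hypothetical helper that reproduces the renewal-threshold arithmetic. */
final class RenewThreshold {
    /** Heartbeats expected per minute when every instance renews once every 30 seconds. */
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    /** Minimum heartbeats per minute, truncated to an int exactly as the cast in the method above does. */
    static int threshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }
}
```

With one instance and a factor of 0.85, threshold(1, 0.85) is (int) (2 * 0.85) = (int) 1.7 = 1, which agrees with the startup log lines "Got 1 instances from neighboring DS node" and "Renew threshold is: 1".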
3.11 Release the breakpoint again and execution arrives at the onApplicationEvent method of EurekaDiscoveryClientConfiguration, which prints the log line "main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761".
@EventListener(EmbeddedServletContainerInitializedEvent.class)
public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
// TODO: take SSL into account when Spring Boot 1.2 is available
int localPort = event.getEmbeddedServletContainer().getPort();
if (this.port.get() == 0) {
log.info("Updating port to " + localPort);
this.port.compareAndSet(0, localPort);
start();
}
}
[Analysis 1]: This sets the port. Once the log line "Updating port to 8761" appears, Eureka Server startup is essentially complete. Looking back, we have gone through quite a few methods and flows.
3.12 Summary: what does EurekaServer do at startup?
1. Initialize the Eureka environment and the Eureka server context;
2. Initialize the EurekaServer response cache;
3. Start several scheduled tasks, such as the task that resets the heartbeat threshold and the task that evicts expired instances;
4. Set the EurekaServer status to UP and update the EurekaServer port.
These points are my summary of the flow traced above. There are surely things I have missed, and if you spot any, I am happy to discuss them.
IV. How EurekaServer handles service registration and cluster data replication
4.1 How does EurekaClient register with EurekaServer?
[Analysis 1]: We set a breakpoint on every method of org.springframework.cloud.netflix.eureka.server.InstanceRegistry, and EurekaServer is already running under the debugger, so start any microservice annotated with @EnableEurekaClient. Let's use springms-provider-user and simply Run it.
[Analysis 2]: The guess: if our analysis is right, register must be called once springms-provider-user starts. Let's see.
4.2 The breakpoint in InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) is hit.
[Analysis 1]: InstanceRegistry.register is where we just set a breakpoint. Following the stack trace upward shows that ApplicationResource.addInstance was called, so look at addInstance and set a breakpoint there. Then kill the springms-provider-user service and start it again.
4.2 The breakpoint now stops in ApplicationResource, the class that handles incoming HTTP service requests.
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
[Analysis 1]: This looks a little different from the RESTful Controller style we are used to. On closer inspection it is Jersey, a production-grade framework for RESTful services and clients. Like Struts, it can be integrated with Hibernate and Spring.
[Analysis 2]: Next comes registry.register(info, "true".equals(isReplication));. So after an EurekaClient starts, it sends an HTTP(S) request that lands in ApplicationResource.addInstance; anything to do with registration goes through this method.
[Analysis 3]: Now step into registry.register(info, "true".equals(isReplication)):
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
[Analysis 4]: Look at handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication):
private void handleRegistration(InstanceInfo info, int leaseDuration,
boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
+ ", leaseDuration " + leaseDuration + ", isReplication "
+ isReplication);
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
isReplication));
}
[Analysis 5]: This method only writes a log line and then publishes an EurekaInstanceRegisteredEvent (the instance-registered event) through the ApplicationContext. As mentioned in "step 3.3, Analysis 3", you can add a listener for EurekaInstanceRegisteredEvent and run your own business logic at this point.
Next, look at super.register(info, isReplication), which is a method of PeerAwareInstanceRegistryImpl, the parent class of InstanceRegistry.
4.3 Enter the register(final InstanceInfo info, final boolean isReplication) method of PeerAwareInstanceRegistryImpl.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// Note: lease duration; the default is the constant 90 seconds
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
// Note: the lease duration can also come from the registering client's lease info, so it can be customized through configuration
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// Note: write the registrant's information into the EurekaServer registry; the parent class is AbstractInstanceRegistry
super.register(info, leaseDuration, isReplication);
// Note: synchronize data between EurekaServer nodes by replicating to the other peers
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
[Analysis 1]: Step into super.register(info, leaseDuration, isReplication) to see how the EurekaServer registry is written, i.e. AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication).
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// Note: the registry variable is the so-called registry, and it is kept in memory
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
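[Analysis 2]: The method is long. Skimming it, besides storing the lease and updating timestamps in the registry, it also invalidates the cache and updates other state. Read it in detail if you are interested.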
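The core data structure in that method, a concurrent map from application name to a map of instance ID to lease, can be shown in isolation. The following is a simplified sketch under stated assumptions: RegistrySketch is a hypothetical class, the lease is reduced to a plain duration in seconds, and the status, timestamp, and cache handling of the real method are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, stripped-down model of the in-memory registry. */
final class RegistrySketch {
    // appName -> (instanceId -> lease duration in seconds)
    private final ConcurrentHashMap<String, Map<String, Integer>> registry = new ConcurrentHashMap<>();

    /** Registers or overwrites an instance; returns true if it was not registered before. */
    boolean register(String appName, String instanceId, int leaseDurationSecs) {
        Map<String, Integer> instances = registry.get(appName);
        if (instances == null) {
            Map<String, Integer> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing map if another thread won the race, else null.
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        return instances.put(instanceId, leaseDurationSecs) == null;
    }

    int instanceCount(String appName) {
        Map<String, Integer> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }
}
```

The get, putIfAbsent, then null-check sequence is the same idiom the method uses to create the per-application map safely when several clients of a new application register at once.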
4.4 Step back out and look at the replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) call from above.
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
// Note: if this request is itself a replication, do not replicate it again
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// Iterate over all nodes in the Eureka Server cluster and replicate to each
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// Not a replication: apply the action (cancel, register, heartbeat, status update, etc.) to this peer node
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
[Analysis 1]: At this point it is clear that every registration request first updates the local EurekaServer registry and then synchronizes the information to the other EurekaServer nodes.
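The two guard conditions in replicateToPeers (never forward a request that is already a replication, and never replicate to yourself) can be captured in a small pure function. This is a sketch of the rule only; ReplicationPlanner and the URLs in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that decides which peers an action should be forwarded to. */
final class ReplicationPlanner {
    static List<String> targets(List<String> peerUrls, String selfUrl, boolean isReplication) {
        List<String> out = new ArrayList<>();
        if (isReplication) {
            // The request already came from a peer; forwarding it again would bounce between nodes.
            return out;
        }
        for (String url : peerUrls) {
            if (!url.equals(selfUrl)) { // skip this node's own URL
                out.add(url);
            }
        }
        return out;
    }
}
```

For example, with peers http://peer1:8761/eureka/ and http://peer2:8762/eureka/ and peer1 as the local node, a client registration is forwarded only to peer2, while the same registration arriving as a replication is forwarded nowhere.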
[Analysis 2]: Next, see how replication to a node is done by entering replicateInstanceActionsToPeers.
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
[Analysis 3]: All the replication actions between nodes are laid out here. Take the Register case, node.register(info), and step into it to see how the node synchronizes the information.
4.5 Enter PeerEurekaNode.register(final InstanceInfo info) to see how the data is synchronized.
public void register(final InstanceInfo info) throws Exception {
// Note: task expiry time handed to the task dispatcher; by default 30 seconds after the current time
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
[Analysis 1]: This is where Eureka's task batching comes in. Peer synchronization normally takes many calls, and with many EurekaServers that means a lot of HTTP requests, which is why batching exists. The cost is some delay in propagating registrations and cancellations, but that delay stays within a tolerable range.
[Analysis 2]: Within the expiryTime window, batching merges tasks into one List and sends that List as a single request. A single batch may therefore contain a mix of cancel, register, heartbeat, and status tasks.
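The batching idea itself (queue individual tasks, then drain several of them into one list for a single request) can be sketched with a JDK queue. This is a deliberately reduced model: TaskBatcher is a hypothetical class, and it ignores the expiry times, per-ID de-duplication, and retry handling that the real dispatcher deals with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical minimal batcher: collects tasks and hands them out in bounded batches. */
final class TaskBatcher<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;

    TaskBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Producer side: enqueue one replication task. */
    void submit(T task) {
        queue.add(task);
    }

    /** Consumer side: drain up to maxBatchSize queued tasks, in submission order. */
    List<T> nextBatch() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }
}
```

A worker thread would call nextBatch in a loop and send each non-empty list to the peer in one HTTP request, trading a little latency for far fewer requests.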
[Analysis 3]: Continuing with the source: batchingDispatcher.process is what gets called, so look directly at TaskDispatchers.createBatchingTaskDispatcher.
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        @Override
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
【Analysis 4】: The process method here adds the task to a queue. Where tasks are enqueued they must also be dequeued; I won't walk through how tasks are taken off the queue and will only cover how they are finally triggered. Step into the TaskExecutors.batchExecutors method called by the line final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor).
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                   int workerCount,
                                                   final TaskProcessor<T> processor,
                                                   final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {
        @Override
        public WorkerRunnable<ID, T> create(int idx) {
            return new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor);
        }
    }, workerCount, isShutdown);
}
【Analysis 5】: The static batchExecutors method in TaskExecutors returns a BatchWorkerRunnable as the worker implementation, so let's step into the BatchWorkerRunnable class. Since it is a Runnable, it must have a run method.
@Override
public void run() {
    try {
        while (!isShutdown.get()) {
            // Note: releases the semaphore via batchWorkRequests.release() to request work, then returns the list of tasks
            List<TaskHolder<ID, T>> holders = getWork();
            metrics.registerExpiryTimes(holders);
            List<T> tasks = getTasksOf(holders);
            // Note: sends the batched tasks to the peer node in one request
            ProcessingResult result = processor.process(tasks);
            switch (result) {
                case Success:
                    break;
                case Congestion:
                case TransientError:
                    taskDispatcher.reprocess(holders, result);
                    break;
                case PermanentError:
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}
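The getWork() call in the loop above depends on a handshake between the worker and the acceptor: the worker releases a semaphore permit to say it is ready, and only then does a batch get handed over. The following is a rough, single-threaded sketch of that pattern. The class WorkRequestSketch and the method offerBatch are hypothetical, and the acceptor-side logic is an assumption; only the batchWorkRequests.release() call is taken from the comment in the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class WorkRequestSketch {
    // Starts with zero permits: no worker has asked for work yet.
    private final Semaphore batchWorkRequests = new Semaphore(0);
    private final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>();

    /** Worker side: announce readiness, then return the queue to poll. */
    BlockingQueue<List<String>> requestWorkItems() {
        batchWorkRequests.release();
        return batchWorkQueue;
    }

    /** Acceptor side (assumed): hand over a batch only if a worker has asked for one. */
    boolean offerBatch(List<String> batch) {
        if (!batchWorkRequests.tryAcquire()) {
            return false; // nobody is waiting, so the batch stays pending
        }
        return batchWorkQueue.offer(batch);
    }

    public static void main(String[] args) throws InterruptedException {
        WorkRequestSketch sketch = new WorkRequestSketch();

        // No worker has requested work yet, so the batch is not handed over.
        System.out.println(sketch.offerBatch(Arrays.asList("register:i-1")));

        // The worker signals readiness; now the acceptor can deliver a batch.
        BlockingQueue<List<String>> queue = sketch.requestWorkItems();
        System.out.println(sketch.offerBatch(Arrays.asList("register:i-1", "heartbeat:i-2")));
        System.out.println(queue.poll(1, TimeUnit.SECONDS));
    }
}
```

A design like this keeps tasks in the pending buffer, where they can still be merged or expire, until a worker is actually free to send them.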
【Analysis 6】: This is the run method of BatchWorkerRunnable. It first has to release the semaphore to request work before it can obtain a task list. Once it has one, it calls processor.process(tasks) to send the data to the peer node. Next, let's look at the ReplicationTaskProcessor.process method.
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // Note: the list is sent directly through the JerseyReplicationClient object
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}
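Read together with the run loop of BatchWorkerRunnable, the method above amounts to a small decision table: the outcome of the HTTP call is mapped to a ProcessingResult, and only Congestion and TransientError lead to a retry. The sketch below restates that table in isolation. The class ResultMappingSketch and its method names are hypothetical, treating 2xx as success is an assumption about isSuccess, and scanning the cause chain for an IOException is an assumed stand-in for isNetworkConnectException.

```java
import java.io.IOException;
import java.net.ConnectException;

class ResultMappingSketch {
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    /** Maps an HTTP status code to a result (2xx as success is an assumption). */
    static ProcessingResult classifyStatus(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return ProcessingResult.Success;
        }
        if (statusCode == 503) {
            return ProcessingResult.Congestion; // peer is busy, try again later
        }
        return ProcessingResult.PermanentError;
    }

    /** Treats any IOException in the cause chain as a network problem (assumed rule). */
    static ProcessingResult classifyFailure(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return ProcessingResult.TransientError;
            }
        }
        return ProcessingResult.PermanentError;
    }

    /** Mirrors the switch in the worker loop: only these two results are reprocessed. */
    static boolean shouldRetry(ProcessingResult result) {
        return result == ProcessingResult.Congestion || result == ProcessingResult.TransientError;
    }

    public static void main(String[] args) {
        System.out.println(classifyStatus(200) + " retry=" + shouldRetry(classifyStatus(200)));
        System.out.println(classifyStatus(503) + " retry=" + shouldRetry(classifyStatus(503)));
        System.out.println(classifyStatus(500) + " retry=" + shouldRetry(classifyStatus(500)));
        Throwable wrapped = new RuntimeException(new ConnectException("connection refused"));
        System.out.println(classifyFailure(wrapped) + " retry=" + shouldRetry(classifyFailure(wrapped)));
    }
}
```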
【Analysis 7】: We are getting close to the truth now, so let's step straight into the JerseyReplicationClient.submitBatchUpdates(ReplicationList replicationList) method.
@Override
public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
    ClientResponse response = null;
    try {
        response = jerseyApacheClient.resource(serviceUrl)
                // Note: this is the key part, the relative target path is peerreplication/batch/
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .post(ClientResponse.class, replicationList);
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
【Analysis 8】: Now that we have the relative path, search for the string "batch" to find a matching receiver method or @Path annotation. In eureka-core-1.4.12.jar the search does turn up @Path("batch"), on the batchReplication method of the PeerReplicationResource class. Let's step into that method.
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // Note: the received task list is processed item by item in this loop; the core logic is in dispatch
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}
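One detail of batchReplication is worth isolating: a failure in one item is caught inside the loop and recorded as a 500 for that item only, so the remaining items in the batch are still processed and the batch as a whole still returns OK. A minimal sketch of that per-item isolation follows. The class BatchIsolationSketch, the processBatch helper, and the string-based items are hypothetical simplifications, not Eureka types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class BatchIsolationSketch {
    /**
     * Applies handler to every item and collects one status code per item.
     * An item that throws is recorded as 500 without aborting the batch.
     */
    static <T> List<Integer> processBatch(List<T> items, Function<T, Integer> handler) {
        List<Integer> statuses = new ArrayList<>();
        for (T item : items) {
            try {
                statuses.add(handler.apply(item));
            } catch (RuntimeException e) {
                statuses.add(500);
            }
        }
        return statuses;
    }

    public static void main(String[] args) {
        // Hypothetical handler: succeeds for known actions, throws for anything else.
        Function<String, Integer> handler = action -> {
            if (action.equals("Register") || action.equals("Cancel")) {
                return 200;
            }
            throw new IllegalArgumentException("unknown action " + action);
        };
        List<String> batch = Arrays.asList("Register", "Bogus", "Cancel");
        System.out.println(processBatch(batch, handler)); // prints [200, 500, 200]
    }
}
```

Because each item gets its own status in the response list, the sending side can decide per task what to do next instead of treating the whole batch as failed.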
【Analysis 9】: The loop handles the received tasks one at a time, and the finish line is in sight. Step into the PeerReplicationResource.dispatch method.
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    String instanceStatus = toString(instanceInfo.getStatus());
    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
【Analysis 10】: Any action type will do, so we take Register again and step into PeerReplicationResource.handleRegister.
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    // Note: REPLICATION is a constant, private static final String REPLICATION = "true"; the call goes back into ApplicationResource.addInstance
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}
【Analysis 11】: The peer synchronization journey ends here, back at ApplicationResource.addInstance. This is the same method an EurekaClient invokes to register after startup; peer synchronization calls it too, and the only difference is whether the isReplication argument is true or false. The rest of the ApplicationResource.addInstance flow was covered earlier, so by now it should be clear how a registration flows through the system and how batched tasks carry the synchronization between EurekaServer nodes.
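To make the role of the isReplication flag concrete, here is a toy model of two servers that are peers of each other. It assumes, as the flag suggests, that a registration which arrived through replication is stored but not forwarded again; without such a rule the two nodes would bounce the same registration back and forth indefinitely. The class ReplicationFlagSketch, the Node type, and its fields are hypothetical and unrelated to Eureka's real classes.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicationFlagSketch {
    /** A toy server node holding a registry and a list of peers. */
    static class Node {
        final String name;
        final List<String> registry = new ArrayList<>();
        final List<Node> peers = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        /** Single entry point for both client registrations and peer replication. */
        void addInstance(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            if (isReplication) {
                return; // came from a peer: store it, but do not forward it again
            }
            for (Node peer : peers) {
                peer.addInstance(instanceId, true);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("server-a");
        Node b = new Node("server-b");
        a.peers.add(b);
        b.peers.add(a);

        // A client registers with server-a only; server-b learns about it via replication.
        a.addInstance("order-service:8080", false);
        System.out.println(a.name + " " + a.registry);
        System.out.println(b.name + " " + b.registry);
    }
}
```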
5. EurekaClient Startup Flow Analysis
See SpringCloud (Part 050): Netflix Eureka Source Code Deep Dive (Part 2)
6. Download
https://gitee.com/ylimhhmily/SpringCloudTutorial.git
SpringCloudTutorial QQ discussion group: 235322432