閱讀前的思考
使用netflix eureka做服務(wù)管理時牛曹,若你只停留在對eureka的概念理解和使用層面,那么你面試時會得到面試官的靈魂拷問醇滥,例如:
1)eureka將服務(wù)注冊信息存放在哪里黎比?服務(wù)注冊信息都有哪些內(nèi)容?
2)eureka如何做到高可用鸳玩?底層的通信機制是什么阅虫?
3)心跳機制到底發(fā)送些什么內(nèi)容,有了解嗎?
4)服務(wù)注冊列表是存在客戶端還是服務(wù)端怀喉?如果多復(fù)本數(shù)據(jù)不一致怎么處理书妻?
5)若網(wǎng)絡(luò)故障服務(wù)注冊失敗了,eureka是如何保證注冊成功的躬拢?
6)注冊躲履,同步,下線聊闯,剔除分別是怎么實現(xiàn)的工猜?
7)為什么剛啟動的服務(wù)沒有即時被eureka發(fā)現(xiàn)?對此你還遇到過哪些坑?
帶著這些問題或疑惑菱蔬,作者決定推出eureka源碼解讀系列篷帅,從眾所周知的Eureka功能著手,對register,renew,heartbeat,fetch,剔除/關(guān)閉,數(shù)據(jù)復(fù)制等進行源碼解讀拴泌,意在深入理解eureka功能魏身。
Register Client 端實現(xiàn)原理
服務(wù)注冊先由eureka client端發(fā)起請求,具體代碼定位于eureka-client-1.9.25.jar com.netflix.discovery包下蚪腐。
DiscoveryCilent類的register():boolean方法是服務(wù)注冊的實現(xiàn)箭昵。代碼如下:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
看到httpResponse了說明eureka采用http方式進行服務(wù)通信,將服務(wù)注冊的信息封裝到InstanceInfo類中回季。首先來看一下InstanceInfo類包含哪些內(nèi)容家制?
在設(shè)計上com.netfilx.appinfo.InstanceInfo定義了很多屬性,服務(wù)實例相關(guān)的有instanceId泡一,appGroupName颤殴,ipAddr,port鼻忠,securePort涵但,homePageUrl,statusPageUrl等。封裝后的實例通過eurekaTransport.registrationClient具體實現(xiàn)贤笆。
EurekaTransport為DiscoveryClient的靜態(tài)內(nèi)部類蝇棉,源碼中集成了EurekaHttpClient,EurekaHttpClientFactory芥永,TransportClientFactory篡殷,從設(shè)計上可以看出該類只是個工具類,具體實現(xiàn)由EurekaHttpClient接口來實現(xiàn)埋涧。
展開EurekaHttpClient接口的register()板辽,實現(xiàn)類分別為EurekaHttpClientDecorator,AbstractJerseyEurekaHttpClient.其中EurekaHttpClientDecorator只是進行了封裝和定義,具體實現(xiàn)在AbstractJerseyEurekaHttpClient中
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
Http請求由Jersey(RESTFUL請求服務(wù)JAVA框架)來完成棘催。如果是自己實現(xiàn)http通信劲弦,完全可以選擇apache httpClient,OKHttp或者自定義封裝http服務(wù),如何使用http通信不是本文討論的重點醇坝。需要關(guān)注的是http請求發(fā)送方式是post,采用json的格式進行發(fā)送和接收邑跪,使用常用的gzip進行編碼壓縮傳輸。
client端是如何創(chuàng)建實例并向服務(wù)端發(fā)起請求的呼猪?
理解了eureka client的register實現(xiàn)后画畅,接下來的問題是如何調(diào)用DiscoveryClient的register方法。怎么用宋距?何時發(fā)送給服務(wù)器端轴踱?
- DiscoveryManager進行客戶端初始化
eureka-client-1.9.25.jar com.netflix.discovery下有一個DiscoveryManager類,該類被定義為@Deprecated說明過時官方不推薦使用谚赎。但仍然可以看到它聚合了DiscoveryClinet,EurekaInstanceConfig,EurekaClientConfig配置項淫僻。在initComponent方法內(nèi)進行了初始化。
- EurekaBootStrap進行客戶端初始化
EurekaBootStrap位于eureka-core-1.9.25.jar com.netflix.eureka包下壶唤,它是server和client端的啟動項雳灵。實現(xiàn)了ServletContextListener接口,說明在web服務(wù)啟動時會去做初始化闸盔。
contextInitialized(ServletContextEventevent)方法中調(diào)用了initEurekaServerContext(),里面有new DiscoveryClient(applicationInfoManager,eurekaClientConfig)
查看DiscoveryClient的構(gòu)造函數(shù)细办,scheduler 構(gòu)建為定時任務(wù)執(zhí)行者,heartbeatExecutor 實例為心跳檢測的Executor蕾殴,cacheRefreshExecutor實例為刷新服務(wù)注冊表的Executor 這三個線程池都是守護式線程。
initScheduledTasks()會對如上Executor進行任務(wù)設(shè)置岛啸,方法的最后調(diào)用了
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
//這里注冊調(diào)用有40秒的延時
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
注:默認新服務(wù)注冊到eureka服務(wù)器要40秒的延時.
InstanceInfoReplicator實現(xiàn)了runnable接口钓觉,查看run方法代碼如下:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
總結(jié)一下調(diào)用流程:
EurekaBootStrap.initEurekaServerContext方法實例化DiscoveryClient -> DiscoveryClient構(gòu)造方法 -> initScheduledTasks()-> 創(chuàng)建InstanceInfoReplicator實例(Runnable) -> 啟動run方法 -> DiscoveryClient.register()
Server 端實現(xiàn)原理
Eureka服務(wù)端需先從EurekaBootStrap類切入。代碼定位eureka-core-1.9.25.jar com.netflix.eureka.EurekaBootStrap坚踩。
通常以BootStrap命名的類一般為服務(wù)啟動類荡灾,Eureka也遵循這個設(shè)計原則。它實現(xiàn)了ServletContextListener接口,用于監(jiān)聽ServletContext對象的生命周期即監(jiān)聽整個web應(yīng)用的生命周期批幌。contextInitialized(ServletContextEvent event)具體實現(xiàn)如下:
/**
* Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
*
* @see
* javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
*/
@Override
public void contextInitialized(ServletContextEvent event) {
try {
initEurekaEnvironment();
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
有兩個主要init方法础锐,跟蹤代碼發(fā)現(xiàn)處理內(nèi)容繁多,
initEurekaServerContext()方法里有一段代碼:
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
……
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
PeerAwareInstanceRegistryImpl中有一個register方法皆警,實現(xiàn)自AbstractInstanceRegistry類。其中InstanceRegistry為它的接口截粗,InstanceRegistry接口本身實現(xiàn)LeaseManager<InstanceInfo>,LookupService<String>的多繼承信姓。
AbstractInstanceRegistry.register內(nèi)容如下:
/**
*Registers a new instance with a given duration.
*
*@see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object,int,boolean)
*/
public void register(InstanceInforegistrant,int leaseDuration,boolean isReplication){
try{
//獲取讀鎖,即讀取操作不受阻塞绸罗,寫操作會阻塞意推。
read.lock();
//gMap是一個CurrentHashMap
Map<String,Lease<InstanceInfo>>gMap=registry.get(registrant.getAppName());
//EurekaMontior計數(shù)器
REGISTER.increment(isReplication);
//InstanceInfo封裝成一個Lease對象,存儲到registry中珊蟀。registry結(jié)構(gòu)為ConcurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry
//注意此處gNewMap并沒有添加元素
if(gMap==null){
final ConcurrentHashMap<String,Lease<InstanceInfo>>gNewMap=new ConcurrentHashMap<String,Lease<InstanceInfo>>();
gMap=registry.putIfAbsent(registrant.getAppName(),gNewMap);
if(gMap==null){
gMap=gNewMap;
}
}
//判斷gMap中是否存在instanceId,如果不存在就設(shè)置
Lease<InstanceInfo>existingLease=gMap.get(registrant.getId());
if(existingLease!=null&&(existingLease.getHolder()!=null)){
LongexistingLastDirtyTimestamp=existingLease.getHolder().getLastDirtyTimestamp();
LongregistrationLastDirtyTimestamp=registrant.getLastDirtyTimestamp();
logger.debug("Existingleasefound(existing={},provided={}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);
if(existingLastDirtyTimestamp>registrationLastDirtyTimestamp){
logger.warn("Thereisanexistingleaseandtheexistinglease'sdirtytimestamp{}isgreater"+
"thantheonethatisbeingregistered{}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);
logger.warn("UsingtheexistinginstanceInfoinsteadofthenewinstanceInfoastheregistrant");
registrant=existingLease.getHolder();
}
}else{
//The lease does not exist and hence it is a new registration
synchronized(lock){
if(this.expectedNumberOfClientsSendingRenews>0){
this.expectedNumberOfClientsSendingRenews=this.expectedNumberOfClientsSendingRenews+1;
updateRenewsPerMinThreshold();
}
}
logger.debug("Nopreviousleaseinformationfound;itisnewregistration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,leaseDuration);
if(existingLease != null){
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
//此處進行了添加 instanceId作為key菊值,lease對象作為value寫入gMap。
gMap.put(registrant.getId(),lease);
………………
}finally{
read.unlock();
}
}
總結(jié):Eureka服務(wù)端將InstanceInfo封裝到一個CurrentHashMap<String, Map<String, Lease<InstanceInfo>>>中存儲育灸。
接下來的問題:Eureka server是如何接收并處理client端發(fā)送過來的請求腻窒?
Eureka server是一個web服務(wù),只要是web服務(wù)都需要web容器如tomcat,jetty,jboss等描扯。在Eureka源生項目中如何使用web容器呢定页?
查閱Netfilx的Eureka項目源碼發(fā)現(xiàn)一個web.xml配置文件 https://github.com/Netflix/eureka/blob/master/eureka-server/src/main/webapp/WEB-INF/web.xml
在沒有使用springboot自動裝配的情況下,我們看到了最原始的web開發(fā)配置绽诚。其中有一個ServletContainer的Filter典徊,引入com.sun.jersey依賴包查看
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.19</version>
</dependency>
public class ServletContainer extends HttpServlet implements Filter{
……
我們發(fā)現(xiàn)ServletContainer既是一個Servlet也是一個Filter。所以容易理解原生Eureka Web服務(wù)依然是一個熟悉的傳統(tǒng)web開發(fā)項目恩够。使用Servlet進行服務(wù)對外交互卒落。
ServletContainer作為Servlet容器提供服務(wù)交互,但具體處理邏輯肯定不在此類中蜂桶。我們看到web配置中init-param
<init-param>
<param-name>com.sun.jersey.config.property.packages</param-name>
<param-value>com.sun.jersey;com.netflix</param-value>
</init-param>
Servlet容器初始化時會掃描com.sum.jersey和com.netflix兩個包儡毕,主要是為解析對應(yīng)包中的注解。我們回到eureka項目源碼com.netfilx.eureka.resources.ApplicationsResource下發(fā)現(xiàn)了
@Path("/{version}/apps")
@Produces({"application/xml","application/json"})
public class ApplicationsResource{
……
進一步跟蹤方法getApplicationResource在ApplicationResource.class中有一個addInstance(InstanceInfo info,String isReplication)方法扑媚,代碼registry.register(info,"true".equals(isReplication));調(diào)用自PeerAwareInstanceRegistryImpl.register方法腰湾。
至此eureka服務(wù)注冊端流程梳理完畢。
總結(jié)一下:
Eureka server端接收clinet端請求處理邏輯:
web服務(wù)啟動 -> 掃描resources/ApplicationsResource -> 加載ApplicationResource對象 -> 調(diào)用addInstance -> 調(diào)用PeerAwareInstanceRegistry.register方法