閱讀前的思考
使用netflix eureka做服務(wù)管理時(shí)平夜,若你只停留在對(duì)eureka的概念理解和使用層面歹袁,那么你面試時(shí)會(huì)得到面試官的靈魂拷問屹篓,例如:
1)eureka將服務(wù)注冊(cè)信息存放在哪里技肩?服務(wù)注冊(cè)信息都有哪些內(nèi)容俗孝?
2)eureka如何做到高可用酒甸?底層的通信機(jī)制是什么?
3)心跳機(jī)制到底發(fā)送些什么內(nèi)容,有了解嗎赋铝?
4)服務(wù)注冊(cè)列表是存在客戶端還是服務(wù)端插勤?如果多復(fù)本數(shù)據(jù)不一致怎么處理?
5)若網(wǎng)絡(luò)故障服務(wù)注冊(cè)失敗了,eureka是如何保證注冊(cè)成功的农尖?
6)注冊(cè)析恋,同步,下線卤橄,剔除分別是怎么實(shí)現(xiàn)的绿满?
7)為什么剛啟動(dòng)的服務(wù)沒有即時(shí)被eureka發(fā)現(xiàn)?對(duì)此你還遇到過哪些坑?
帶著這些問題或疑惑窟扑,作者決定推出eureka源碼解讀系列喇颁,從眾所周知的Eureka功能著手,對(duì)register,renew,heartbeat,fetch,剔除/關(guān)閉,數(shù)據(jù)復(fù)制等進(jìn)行源碼解讀嚎货,意在深入理解eureka功能橘霎。
Tip:建議開篇從 Eureka源碼解析(一) 開始,之后的文章是基于開篇的分析成果之上進(jìn)行撰寫的殖属。
Renew client端處理流程
DiscoveryClient.renew()方法定義的服務(wù)續(xù)約的具體流程姐叁。
boolean renew(){
EurekaHttpResponse<InstanceInfo>httpResponse;
try{
//通過sendHeartBeat的方式完成
httpResponse=eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),instanceInfo,null);
if(httpResponse.getStatusCode()==Status.NOT_FOUND.getStatusCode()){
REREGISTER_COUNTER.increment();
long timestamp=instanceInfo.setIsDirtyWithTime();
boolean success=register();
if(success){
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode()==Status.OK.getStatusCode();
}catch(Throwablee){
}
}
sendHeartBeat一共有兩個(gè)類實(shí)現(xiàn)了方法,AbstractJerseyEurekaHttpClient,JerseyReplicationClient無論是哪個(gè)發(fā)送內(nèi)容者是一樣的洗显。
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName,String id,InstanceInfo info,InstanceStatus overriddenStatus){
String urlPath="apps/"+appName+'/'+id;
……
WebResourcewebResource=jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status",info.getStatus().toString())
.queryParam("lastDirtyTimestamp",info.getLastDirtyTimestamp().toString());
if(overriddenStatus!=null){
webResource=webResource.queryParam("overriddenstatus",overriddenStatus.name());
}
……
//http put請(qǐng)求
response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
……
}
public enum InstanceStatus:
UP,//Ready to receive traffic
DOWN,//Do not send traffic-health check callback failed
STARTING,//Just about starting-initializations to be done-do not
//sendtraffic
OUT_OF_SERVICE,//Intentionally shut down for traffic
UNKNOWN;
心跳發(fā)送內(nèi)容:appName+id(instanceId)作為url,參數(shù):status即服務(wù)狀態(tài)外潜,lastDirtyTimestamp 即instance在client端最后被修改的時(shí)間戳,overriddenStatus 更新過的服務(wù)狀態(tài)挠唆。
服務(wù)續(xù)約的調(diào)用
DiscoveryClient 構(gòu)造方法中調(diào)用了initScheduledTasks方法初始化了heartbeatTask 用于執(zhí)行心跳發(fā)送任務(wù)处窥,具體任務(wù)實(shí)現(xiàn)在HeartbeatThread(Runnable)中
/**
*Theheartbeattaskthatrenewstheleaseinthegivenintervals.
*/
private class HeartbeatThread implements Runnable{
public void run(){
//調(diào)用renew
if(renew()){
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
Eureka 默認(rèn)每隔30秒一次心跳檢測(cè)具體現(xiàn)實(shí)如下:
//Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,//默認(rèn)30秒
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs,TimeUnit.SECONDS);
總結(jié):renew功能與心跳是綁定在一起發(fā)送的。定時(shí)心跳實(shí)現(xiàn)采用JDK的ScheduledExecutorService玄组,執(zhí)行任務(wù)是調(diào)用renew函數(shù)滔驾,發(fā)送的內(nèi)容為appName+instanceId組成的url,參數(shù)是服務(wù)器狀態(tài)status 和 最近更新狀態(tài)的時(shí)間戳。以Http put 請(qǐng)求發(fā)送俄讹。
Server 端流程處理
跟register服務(wù)處理流程一樣哆致,服務(wù)器啟動(dòng)后掃描并創(chuàng)建ApplicationResource,根據(jù)@Path("{id}")創(chuàng)建InstanceResource并調(diào)用@Put修飾的方法。
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) StringisReplication,
@QueryParam("overriddenstatus") StringoverriddenStatus,
@QueryParam("status") Stringstatus,
@QueryParam("lastDirtyTimestamp") StringlastDirtyTimestamp){
booleanisFromReplicaNode="true".equals(isReplication);
booleanisSuccess=registry.renew(app.getName(),id,isFromReplicaNode);
……
register.renew指向AbstractInstanceRegistry的renew方法患膛,代碼如下:
public boolean renew(String appName,String id,boolean isReplication){
RENEW.increment(isReplication);
Map<String,Lease<InstanceInfo>>gMap=registry.get(appName);
Lease<InstanceInfo>leaseToRenew=null;
if(gMap!=null){
leaseToRenew=gMap.get(id);
}
//如果gMap中找不到服務(wù)實(shí)例返回false,則續(xù)租失敗摊阀。
if(leaseToRenew==null){
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS:Registry:leasedoesn'texist,registeringresource:{}-{}",appName,id);
return false;
}else{
InstanceInfoinstanceInfo=leaseToRenew.getHolder();
if(instanceInfo!=null){
InstanceStatus overriddenInstanceStatus=this.getOverriddenInstanceStatus(instanceInfo,leaseToRenew,isReplication);
if(overriddenInstanceStatus==InstanceStatus.UNKNOWN){
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
//更新時(shí)間戳 lastUpdateTimestamp=System.currentTimeMillis()+duration;
leaseToRenew.renew();
return true;
}
}
此處renew方法在concurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry中查找(appName,(instanceId,Lease<InstanceInfo>)).如果判斷并更新時(shí)間Instance的時(shí)間戳。如果查詢不到踪蹬,則返回false.意味著被剔除的服務(wù)是無法得到續(xù)租的胞此。
eureka沒有服務(wù)超時(shí)一說,每隔30秒一次心跳延曙,服務(wù)在map中存在則更新時(shí)間戳,90秒沒有續(xù)租上則執(zhí)行服務(wù)剔除亡哄。