Eureka服務(wù)注冊register源碼分析

閱讀前的思考

使用netflix eureka做服務(wù)管理時牛曹,若你只停留在對eureka的概念理解和使用層面,那么你面試時會得到面試官的靈魂拷問醇滥,例如:
1)eureka將服務(wù)注冊信息存放在哪里黎比?服務(wù)注冊信息都有哪些內(nèi)容?
2)eureka如何做到高可用鸳玩?底層的通信機制是什么阅虫?
3)心跳機制到底發(fā)送些什么內(nèi)容,有了解嗎?
4)服務(wù)注冊列表是存在客戶端還是服務(wù)端怀喉?如果多復(fù)本數(shù)據(jù)不一致怎么處理书妻?
5)若網(wǎng)絡(luò)故障服務(wù)注冊失敗了,eureka是如何保證注冊成功的躬拢?
6)注冊躲履,同步,下線聊闯,剔除分別是怎么實現(xiàn)的工猜?
7)為什么剛啟動的服務(wù)沒有即時被eureka發(fā)現(xiàn)?對此你還遇到過哪些坑?

帶著這些問題或疑惑菱蔬,作者決定推出eureka源碼解讀系列篷帅,從眾所周知的Eureka功能著手,對register,renew,heartbeat,fetch,剔除/關(guān)閉,數(shù)據(jù)復(fù)制等進行源碼解讀拴泌,意在深入理解eureka功能魏身。

Register Client 端實現(xiàn)原理

服務(wù)注冊先由eureka client端發(fā)起請求,具體代碼定位于eureka-client-1.9.25.jar com.netflix.discovery包下蚪腐。
DiscoveryCilent類的register():boolean方法是服務(wù)注冊的實現(xiàn)箭昵。代碼如下:

    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

看到httpResponse了說明eureka采用http方式進行服務(wù)通信,將服務(wù)注冊的信息封裝到InstanceInfo類中回季。首先來看一下InstanceInfo類包含哪些內(nèi)容家制?
在設(shè)計上com.netfilx.appinfo.InstanceInfo定義了很多屬性,服務(wù)實例相關(guān)的有instanceId泡一,appGroupName颤殴,ipAddrport鼻忠,securePort涵但,homePageUrlstatusPageUrl等。封裝后的實例通過eurekaTransport.registrationClient具體實現(xiàn)贤笆。

EurekaTransport為DiscoveryClient的靜態(tài)內(nèi)部類蝇棉,源碼中集成了EurekaHttpClient,EurekaHttpClientFactory芥永,TransportClientFactory篡殷,從設(shè)計上可以看出該類只是個工具類,具體實現(xiàn)由EurekaHttpClient接口來實現(xiàn)埋涧。

展開EurekaHttpClient接口的register()板辽,實現(xiàn)類分別為EurekaHttpClientDecorator,AbstractJerseyEurekaHttpClient.其中EurekaHttpClientDecorator只是進行了封裝和定義,具體實現(xiàn)在AbstractJerseyEurekaHttpClient

public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

Http請求由Jersey(RESTFUL請求服務(wù)JAVA框架)來完成棘催。如果是自己實現(xiàn)http通信劲弦,完全可以選擇apache httpClient,OKHttp或者自定義封裝http服務(wù),如何使用http通信不是本文討論的重點醇坝。需要關(guān)注的是http請求發(fā)送方式是post,采用json的格式進行發(fā)送和接收邑跪,使用常用的gzip進行編碼壓縮傳輸。

client端是如何創(chuàng)建實例并向服務(wù)端發(fā)起請求的呼猪?

理解了eureka client的register實現(xiàn)后画畅,接下來的問題是如何調(diào)用DiscoveryClient的register方法。怎么用宋距?何時發(fā)送給服務(wù)器端轴踱?

  • DiscoveryManager進行客戶端初始化

eureka-client-1.9.25.jar com.netflix.discovery下有一個DiscoveryManager類,該類被定義為@Deprecated說明過時官方不推薦使用谚赎。但仍然可以看到它聚合了DiscoveryClinet,EurekaInstanceConfig,EurekaClientConfig配置項淫僻。在initComponent方法內(nèi)進行了初始化。

  • EurekaBootStrap進行客戶端初始化

EurekaBootStrap位于eureka-core-1.9.25.jar com.netflix.eureka包下壶唤,它是server和client端的啟動項雳灵。實現(xiàn)了ServletContextListener接口,說明在web服務(wù)啟動時會去做初始化闸盔。

contextInitialized(ServletContextEventevent)方法中調(diào)用了initEurekaServerContext(),里面有new DiscoveryClient(applicationInfoManager,eurekaClientConfig)

查看DiscoveryClient的構(gòu)造函數(shù)细办,scheduler 構(gòu)建為定時任務(wù)執(zhí)行者,heartbeatExecutor 實例為心跳檢測的Executor蕾殴,cacheRefreshExecutor實例為刷新服務(wù)注冊表的Executor 這三個線程池都是守護式線程。

initScheduledTasks()會對如上Executor進行任務(wù)設(shè)置岛啸,方法的最后調(diào)用了

instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            //這里注冊調(diào)用有40秒的延時
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

注:默認新服務(wù)注冊到eureka服務(wù)器要40秒的延時.

InstanceInfoReplicator實現(xiàn)了runnable接口钓觉,查看run方法代碼如下:

public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

總結(jié)一下調(diào)用流程:
EurekaBootStrap.initEurekaServerContext方法實例化DiscoveryClient -> DiscoveryClient構(gòu)造方法 -> initScheduledTasks()-> 創(chuàng)建InstanceInfoReplicator實例(Runnable) -> 啟動run方法 -> DiscoveryClient.register()

Server 端實現(xiàn)原理

Eureka服務(wù)端需先從EurekaBootStrap類切入。代碼定位eureka-core-1.9.25.jar com.netflix.eureka.EurekaBootStrap坚踩。
通常以BootStrap命名的類一般為服務(wù)啟動類荡灾,Eureka也遵循這個設(shè)計原則。它實現(xiàn)了ServletContextListener接口,用于監(jiān)聽ServletContext對象的生命周期即監(jiān)聽整個web應(yīng)用的生命周期批幌。contextInitialized(ServletContextEvent event)具體實現(xiàn)如下:

/**
     * Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
     *
     * @see
     * javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
     */
    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

有兩個主要init方法础锐,跟蹤代碼發(fā)現(xiàn)處理內(nèi)容繁多,\color{red}{我們的目標是找出server端如何接收client請求并將服務(wù)注冊的信息存放在哪里荧缘?}
initEurekaServerContext()方法里有一段代碼:

PeerAwareInstanceRegistry registry;
        if (isAws(applicationInfoManager.getInfo())) {
           ……
        } else {
            registry = new PeerAwareInstanceRegistryImpl(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
        }

PeerAwareInstanceRegistryImpl中有一個register方法皆警,實現(xiàn)自AbstractInstanceRegistry類。其中InstanceRegistry為它的接口截粗,InstanceRegistry接口本身實現(xiàn)LeaseManager<InstanceInfo>,LookupService<String>的多繼承信姓。
AbstractInstanceRegistry.register內(nèi)容如下:

  /**
    *Registers  a  new  instance   with   a  given  duration.
    *
    *@see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object,int,boolean)
    */
    public void register(InstanceInforegistrant,int leaseDuration,boolean isReplication){
        try{
            //獲取讀鎖,即讀取操作不受阻塞绸罗,寫操作會阻塞意推。
            read.lock();
            //gMap是一個CurrentHashMap
            Map<String,Lease<InstanceInfo>>gMap=registry.get(registrant.getAppName());
            //EurekaMontior計數(shù)器
            REGISTER.increment(isReplication);

            //InstanceInfo封裝成一個Lease對象,存儲到registry中珊蟀。registry結(jié)構(gòu)為ConcurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry
            //注意此處gNewMap并沒有添加元素
            if(gMap==null){
                final ConcurrentHashMap<String,Lease<InstanceInfo>>gNewMap=new ConcurrentHashMap<String,Lease<InstanceInfo>>();
                gMap=registry.putIfAbsent(registrant.getAppName(),gNewMap);
                if(gMap==null){
                    gMap=gNewMap;
                }
            }
            
            //判斷gMap中是否存在instanceId,如果不存在就設(shè)置
            Lease<InstanceInfo>existingLease=gMap.get(registrant.getId());
            
            if(existingLease!=null&&(existingLease.getHolder()!=null)){
                LongexistingLastDirtyTimestamp=existingLease.getHolder().getLastDirtyTimestamp();
                LongregistrationLastDirtyTimestamp=registrant.getLastDirtyTimestamp();
                logger.debug("Existingleasefound(existing={},provided={}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);
                
                if(existingLastDirtyTimestamp>registrationLastDirtyTimestamp){
                    logger.warn("Thereisanexistingleaseandtheexistinglease'sdirtytimestamp{}isgreater"+
                    "thantheonethatisbeingregistered{}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);
                    logger.warn("UsingtheexistinginstanceInfoinsteadofthenewinstanceInfoastheregistrant");
                    registrant=existingLease.getHolder();
                }
            }else{
                //The lease does not exist and hence  it  is  a  new  registration
                synchronized(lock){
                    if(this.expectedNumberOfClientsSendingRenews>0){
                        this.expectedNumberOfClientsSendingRenews=this.expectedNumberOfClientsSendingRenews+1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("Nopreviousleaseinformationfound;itisnewregistration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,leaseDuration);
            if(existingLease != null){
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            //此處進行了添加 instanceId作為key菊值,lease對象作為value寫入gMap。
            gMap.put(registrant.getId(),lease);
            ………………
            }finally{
                read.unlock();
            }
    }

總結(jié):Eureka服務(wù)端將InstanceInfo封裝到一個CurrentHashMap<String, Map<String, Lease<InstanceInfo>>>中存儲育灸。

接下來的問題:Eureka server是如何接收并處理client端發(fā)送過來的請求腻窒?

Eureka server是一個web服務(wù),只要是web服務(wù)都需要web容器如tomcat,jetty,jboss等描扯。在Eureka源生項目中如何使用web容器呢定页?

查閱Netfilx的Eureka項目源碼發(fā)現(xiàn)一個web.xml配置文件 https://github.com/Netflix/eureka/blob/master/eureka-server/src/main/webapp/WEB-INF/web.xml

image.png

在沒有使用springboot自動裝配的情況下,我們看到了最原始的web開發(fā)配置绽诚。其中有一個ServletContainer的Filter典徊,引入com.sun.jersey依賴包查看

<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.19</version>
</dependency>

public class ServletContainer extends  HttpServlet  implements   Filter{
……

我們發(fā)現(xiàn)ServletContainer既是一個Servlet也是一個Filter。所以容易理解原生Eureka Web服務(wù)依然是一個熟悉的傳統(tǒng)web開發(fā)項目恩够。使用Servlet進行服務(wù)對外交互卒落。
ServletContainer作為Servlet容器提供服務(wù)交互,但具體處理邏輯肯定不在此類中蜂桶。我們看到web配置中init-param

<init-param>
    <param-name>com.sun.jersey.config.property.packages</param-name>
    <param-value>com.sun.jersey;com.netflix</param-value>
</init-param>

Servlet容器初始化時會掃描com.sum.jersey和com.netflix兩個包儡毕,主要是為解析對應(yīng)包中的注解。我們回到eureka項目源碼com.netfilx.eureka.resources.ApplicationsResource下發(fā)現(xiàn)了

@Path("/{version}/apps")
@Produces({"application/xml","application/json"})
public class ApplicationsResource{
……

進一步跟蹤方法getApplicationResource在ApplicationResource.class中有一個addInstance(InstanceInfo info,String isReplication)方法扑媚,代碼registry.register(info,"true".equals(isReplication));調(diào)用自PeerAwareInstanceRegistryImpl.register方法腰湾。

至此eureka服務(wù)注冊端流程梳理完畢。

總結(jié)一下:
Eureka server端接收clinet端請求處理邏輯:
web服務(wù)啟動 -> 掃描resources/ApplicationsResource -> 加載ApplicationResource對象 -> 調(diào)用addInstance -> 調(diào)用PeerAwareInstanceRegistry.register方法

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末疆股,一起剝皮案震驚了整個濱河市费坊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌旬痹,老刑警劉巖附井,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件讨越,死亡現(xiàn)場離奇詭異,居然都是意外死亡永毅,警方通過查閱死者的電腦和手機把跨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來沼死,“玉大人着逐,你說我怎么就攤上這事÷瘢” “怎么了滨嘱?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長浸间。 經(jīng)常有香客問我太雨,道長,這世上最難降的妖魔是什么魁蒜? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任囊扳,我火速辦了婚禮,結(jié)果婚禮上兜看,老公的妹妹穿的比我還像新娘锥咸。我一直安慰自己,他們只是感情好细移,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布搏予。 她就那樣靜靜地躺著,像睡著了一般弧轧。 火紅的嫁衣襯著肌膚如雪雪侥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天精绎,我揣著相機與錄音速缨,去河邊找鬼。 笑死代乃,一個胖子當著我的面吹牛旬牲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播搁吓,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼原茅,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了堕仔?” 一聲冷哼從身側(cè)響起擂橘,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎贮预,沒想到半個月后贝室,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡仿吞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年滑频,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唤冈。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡峡迷,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出你虹,到底是詐尸還是另有隱情绘搞,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布傅物,位于F島的核電站夯辖,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏董饰。R本人自食惡果不足惜蒿褂,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望卒暂。 院中可真熱鬧啄栓,春花似錦、人聲如沸也祠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诈嘿。三九已至堪旧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間永淌,已是汗流浹背崎场。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留遂蛀,地道東北人谭跨。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像李滴,于是被迫代替她去往敵國和親螃宙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359