dubbo源碼分析3- 服務(wù)發(fā)布與注冊(cè)

先思考下俐末,如果要實(shí)現(xiàn)服務(wù)發(fā)布和注冊(cè),需要做哪些事情?

  1. 配置文件解析或注解解析
  2. 啟動(dòng) netty 服務(wù)實(shí)現(xiàn)遠(yuǎn)程監(jiān)聽
  3. 服務(wù)注冊(cè)
    服務(wù)端無外乎就這3件事,接下來我們用源碼分析來論證我們的猜想

Dubbo集成Spring

Spring 的標(biāo)簽擴(kuò)展

在 spring 中定義了兩個(gè)接口
1.NamespaceHandler: 注冊(cè)一堆 BeanDefinitionParser角钩,利用他們來進(jìn)行解析配置
2.BeanDefinitionParser:用于解析每個(gè) element 的內(nèi)容

Spring 默認(rèn)會(huì)加載 jar 包下的 META-INF/spring.handlers 文件尋找對(duì)應(yīng)的 NamespaceHandler。 Dubbo-config 模塊下的 dubbo-config-spring


image.png

Dubbo 的接入實(shí)現(xiàn)

Dubbo 中 spring 擴(kuò)展就是使用 spring 的自定義類型,所以同樣也有 NamespaceHandler递礼、BeanDefinitionParser惨险。而NamespaceHandler 是 DubboNamespaceHandler

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        //BeanDefinitionParser接口全部都使用了 DubboBeanDefinitionParser實(shí)現(xiàn)
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

}

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我們想看 dubbo:service 的配置脊髓,就直接看DubboBeanDefinitionParser(ServiceBean.class,true)

init()方法就是把不同的配置分別轉(zhuǎn)化成 spring 容器中的 bean 對(duì)象
application 對(duì)應(yīng) ApplicationConfig
registry 對(duì)應(yīng) RegistryConfig
monitor 對(duì)應(yīng) MonitorConfig
provider 對(duì)應(yīng) ProviderConfig
consumer 對(duì)應(yīng) ConsumerConfig

我們仔細(xì)看辫愉,發(fā)現(xiàn)涉及到服務(wù)發(fā)布和服務(wù)調(diào)用的兩個(gè)配置的解析,使用的是 ServiceBean 和 referenceBean将硝。并不是 config 結(jié)尾的恭朗,這兩個(gè)類稍微特殊些,當(dāng)然他同時(shí)也繼承了 ServiceConfig 和 ReferenceConfig依疼。

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));

DubboBeanDefinitionParser

這里面是實(shí)現(xiàn)具體配置文件解析的入口痰腮,它重寫了 parse 方法,對(duì) spring 的配置進(jìn)行解析律罢。

public class DubboBeanDefinitionParser implements BeanDefinitionParser {

    @SuppressWarnings("unchecked")
    private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
        //省略其他代碼...  
       else if (ServiceBean.class.equals(beanClass)) {
            String className = element.getAttribute("class");
            if (className != null && className.length() > 0) {
                RootBeanDefinition classDefinition = new RootBeanDefinition();
                classDefinition.setBeanClass(ReflectUtils.forName(className));
                classDefinition.setLazyInit(false);
                parseProperties(element.getChildNodes(), classDefinition);
                beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
            }
        }
       //省略其他代碼...  
     }
}

我們關(guān)注一下 ServiceBean 的解析膀值,實(shí)際就是解析 dubbo:service 這個(gè)標(biāo)簽中對(duì)應(yīng)的屬性

ServiceBean 的實(shí)現(xiàn)

ServiceBean 這個(gè)類,分別實(shí)現(xiàn)了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware, ApplicationEventPublisherAware

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {
  • InitializingBean
    接口為 bean 提供了初始化方法的方式误辑,它只包括 afterPropertiesSet 方法沧踏,凡是繼承該接口的類,在初始化 bean 的時(shí)候會(huì)執(zhí)行該方法巾钉,被重寫的方法為 afterPropertiesSet翘狱。
  • DisposableBean
    被重寫的方法為 destroy,bean 被銷毀的時(shí)候,spring 容器會(huì)自動(dòng)執(zhí)行 destory 方法睛琳,比如釋放資源
  • ApplicationContextAware
    實(shí)現(xiàn)了這個(gè)接口的 bean盒蟆,當(dāng) spring 容器初始化的時(shí)候,會(huì)自動(dòng)的將 ApplicationContext 注入進(jìn)來
  • ApplicationListener
    ApplicationEvent 事件監(jiān)聽师骗,spring 容器啟動(dòng)后會(huì)發(fā)一個(gè)事件通知历等。被重寫的方法為:onApplicationEvent,onApplicationEvent方法傳入的對(duì)象是 ContextRefreshedEvent辟癌。這個(gè)對(duì)象是當(dāng) Spring 的上下文被刷新或者加載完畢的時(shí)候觸發(fā)的寒屯。因此服務(wù)就是在Spring 的上下文刷新后進(jìn)行導(dǎo)出操作的。
  • BeanNameAware
    獲得自身初始化時(shí)黍少,本身的 bean 的 id 屬性寡夹,被重寫的方法為 setBeanName
  • ApplicationEventPublisherAware
    這個(gè)是一個(gè)異步事件發(fā)送器。被重寫的方法為 setApplicationEventPublisher,簡(jiǎn)單來說厂置,在 spring 里面提供了類似于消息隊(duì)列的異步事件解耦功能菩掏。(典型的觀察者模式的應(yīng)用)
    spring 事件發(fā)送監(jiān)聽由 3 個(gè)部分組成:
    1.ApplicationEvent:表示事件本身,自定義事件需要繼承該類
    2.ApplicationEventPublisherAware:事件發(fā)送器昵济,需要實(shí)現(xiàn)該接口
    3.ApplicationListener:事件監(jiān)聽器接口
image.png

ServiceBean 中服務(wù)暴露過程

在 ServiceBean 中智绸,我們暫且只需要關(guān)注兩個(gè)方法野揪,分別是:
1.在初始化 bean 的時(shí)候會(huì)執(zhí)行該方法 afterPropertiesSet
2.spring 容器啟動(dòng)后會(huì)發(fā)一個(gè)事件通知 onApplicationEvent

afterPropertiesSet

我們發(fā)現(xiàn)這個(gè)方法里面,就是把 dubbo 中配置的 application瞧栗、registry斯稳、service、protocol 等配置信息迹恐,加載到對(duì)應(yīng)的 config實(shí)體中挣惰,便于后續(xù)使用。大家可以自行看代碼殴边。

onApplicationEvent

spring 容器啟動(dòng)之后憎茂,會(huì)收到一個(gè)這樣的事件通知,這里面做了兩個(gè)事情:
? 1.判斷服務(wù)是否已經(jīng)發(fā)布過
? 2.如果沒有發(fā)布找都,則調(diào)用調(diào)用 export 進(jìn)行服務(wù)發(fā)布的流程(這里就是入口

    //監(jiān)聽  spring上下文被刷新或者加載的時(shí)候觸發(fā)
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export(); //導(dǎo)出唇辨、發(fā)布
        }
    }

export

    @Override
    public void export() {
        super.export();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

serviceBean 中,重寫了 export 方法能耻,實(shí)現(xiàn)了 一個(gè)事件的發(fā)布赏枚。并且調(diào)用了 super.export() ,也就是會(huì)調(diào)用serviceBean 的父類ServiceConfig的 export 方法晓猛。

先整體來看一下這個(gè)父類ServiceConfig的作用饿幅,從名字來看,它應(yīng)該和其他所有 config 類一樣去實(shí)現(xiàn)對(duì)配置文件中 service 的配置信息的存儲(chǔ)戒职。
實(shí)際上這個(gè)類并不單純栗恩,所有的配置它都放在了一個(gè) AbstractServiceConfig 的抽象類,自己實(shí)現(xiàn)了很多對(duì)于服務(wù)發(fā)布之前要做的操作邏輯

ServiceConfig 配置類

我們接分析super.export();

export()

  public synchronized void export() {
        checkAndUpdateSubConfigs();  //檢查并且更新配置信息

        if (!shouldExport()) { //當(dāng)前的服務(wù)是否需要發(fā)布, 通過配置實(shí)現(xiàn):@Service(export = false)
            return;
        }

        if (shouldDelay()) {//檢查是否需要延時(shí)發(fā)布洪燥,通過配置@Service(delay = 1000)實(shí)現(xiàn)磕秤,單位毫秒
            //這里的延時(shí)是通過定時(shí)器來實(shí)現(xiàn)
            delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport(); //如果沒有配置 delay,則直接調(diào)用 doExport 進(jìn)行發(fā)布
        }
    }

doExport

這里仍然還是在實(shí)現(xiàn)發(fā)布前的各種判斷捧韵,比如判斷

 protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {//服務(wù)是否已經(jīng)發(fā)布過了
            return;
        }
        exported = true;//設(shè)置發(fā)布狀態(tài)

        if (StringUtils.isEmpty(path)) { //path 表示服務(wù)路徑市咆,默認(rèn)使用 interfaceName
            path = interfaceName;
        }
        doExportUrls();
    }

doExportUrls

  1. 記載所有配置的注冊(cè)中心地址
  2. 遍歷所有配置的協(xié)議,protocols
  3. 針對(duì)每種協(xié)議發(fā)布一個(gè)對(duì)應(yīng)協(xié)議的服務(wù)
    private void doExportUrls() {
        //加載所有配置的注冊(cè)中心的地址再来,組裝成一個(gè) URL
        //URL(來驅(qū)動(dòng)流程的執(zhí)行)->[  registry://192.168.1.102:2181/org.apache.dubbo.registry.RegsitryService/....]
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //group 跟 version 組成一個(gè) pathKey(serviceName)
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            //applicationModel 用來存儲(chǔ) ProviderModel蒙兰,發(fā)布的服務(wù)的元數(shù)據(jù),后續(xù)會(huì)用到
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrlsFor1Protocol

發(fā)布指定協(xié)議的服務(wù)芒篷,我們以 Dubbo 服務(wù)為例搜变,由于代碼太多,就不全部貼出來

  1. 前面的一大串 if else 代碼是為了把當(dāng)前服務(wù)下所配置的<dubbo:method>參數(shù)進(jìn)行解析针炉,保存到map集合中
  2. 獲得當(dāng)前服務(wù)需要暴露的 ip 和端口
  3. 把解析到的所有數(shù)據(jù)挠他,組裝成一個(gè) URL,大概應(yīng)該是:
    dubbo://192.168.1.102:20881/com.wei.ISayHelloService
 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //省略一大串 ifelse 代碼,用于解析<dubbo:method> 配置
        //省略解析<dubbo:service>中配置參數(shù)的代碼篡帕,比如 token绩社、比如 service 中的 method 名稱等存儲(chǔ)在 map 中
        //獲得當(dāng)前服務(wù)要發(fā)布的目標(biāo) ip 和 port
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        //組裝 URL
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
        //這里是通過 ConfiguratorFactory 去實(shí)現(xiàn)動(dòng)態(tài)改變配置的功能摔蓝,這里暫時(shí)不涉及后續(xù)再分析
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
        //url已經(jīng)組裝好了,接下來只需要發(fā)布愉耙。(context->url)
        //scope 選擇服務(wù)發(fā)布的范圍(local/remote)
        //同一個(gè)jvm里面調(diào)用,沒必要走遠(yuǎn)程通信  拌滋; injvm://ip:port..
        //remote :  dubbo://ip:port
        //默認(rèn)情況下朴沿,如果是配置remote(registry),默認(rèn)發(fā)布遠(yuǎn)程和本地
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // 如果是本地發(fā)布败砂,則直接調(diào)用exportLocal
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);  //TODO
            }
            // export to remote if the config is not local (export to local only when config is local)
            //發(fā)布遠(yuǎn)程服務(wù)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                下面remote代碼
        //...
    }
  • local
    服務(wù)只是 injvm 的服務(wù)赌渣,提供一種消費(fèi)者和提供者都在一個(gè) jvm 內(nèi)的調(diào)用方式。使用了 Injvm 協(xié)議昌犹,是一個(gè)偽協(xié)議坚芜,它不開啟端口,不發(fā)起遠(yuǎn)程調(diào)用斜姥,只在 JVM 內(nèi)直接關(guān)聯(lián)鸿竖,(通過集合的方式保存了發(fā)布的服務(wù)信息),但執(zhí)行 Dubbo 的 Filter 鏈铸敏。簡(jiǎn)單來說缚忧,就是你本地的 dubbo 服務(wù)調(diào)用,都依托于 dubbo 的標(biāo)準(zhǔn)來進(jìn)行杈笔。這樣可以享受到 dubbo 的一些配置服務(wù)闪水。
  • remote
   for (URL registryURL : registryURLs) {
            //省略部分代碼...
            //這個(gè)invoker具體是什么,下面會(huì)單獨(dú)開一個(gè)模塊分析
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
            Exporter<?> exporter = protocol.export(wrapperInvoker);
            exporters.add(exporter);
        }

表示根據(jù)根據(jù)配置的注冊(cè)中心進(jìn)行遠(yuǎn)程發(fā)布蒙具。 遍歷多個(gè)注冊(cè)中心球榆,進(jìn)行協(xié)議的發(fā)布

  1. Invoker 是一個(gè)代理類,它是 Dubbo 的核心模型禁筏,其它模型都向它靠擾持钉,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體融师,可向它發(fā)起invoke 調(diào)用右钾,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn)旱爆,也可能一個(gè)集群實(shí)現(xiàn)舀射。(很重要,后續(xù)單獨(dú)分析
  2. DelegateProviderMetaDataInvoker怀伦,因?yàn)?2.7 引入了元數(shù)據(jù)脆烟,所以這里對(duì) invoker 做了委托,把 invoker 交給DelegateProviderMetaDataInvoker 來處理
  3. 調(diào)用 protocol.export(invoker)來發(fā)布這個(gè)代理
  4. 添加到 exporters 集合

protocol.export

protocol.export房待,這個(gè) protocol 是什么呢邢羔?找到定義處發(fā)現(xiàn)它是一個(gè)自適應(yīng)擴(kuò)展點(diǎn)驼抹,打開 Protocol 這個(gè)擴(kuò)展點(diǎn),又可以看到它是一個(gè)在方法層面上的自適應(yīng)擴(kuò)展拜鹤,意味著它實(shí)現(xiàn)了對(duì)于 export 這個(gè)方法的適配框冀。也就意味著這個(gè) Protocol 是一個(gè)動(dòng)態(tài)代理類,Protocol$Adaptive

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

這個(gè)動(dòng)態(tài)代理類敏簿,會(huì)根據(jù) url 中配置的 protocol name 來實(shí)現(xiàn)對(duì)應(yīng)協(xié)議的適配

Protocol$Adaptive

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
        public org.apache.dubbo.rpc.Exporter export (org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
            org.apache.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
        public org.apache.dubbo.rpc.Invoker refer (java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            org.apache.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    }
}

那么在當(dāng)前的場(chǎng)景中明也,protocol 會(huì)是調(diào)用誰呢?目前發(fā)布的 invoker(URL)惯裕,實(shí)際上是一個(gè) registry://協(xié)議温数,所以Protocol$Adaptive,會(huì)通過 getExtension(extName)得到一個(gè) RegistryProtocol蜻势。

RegistryProtocol.export

很明顯撑刺,這個(gè) RegistryProtocol 是用來實(shí)現(xiàn)服務(wù)注冊(cè)的,這里面會(huì)有很多處理邏輯:
? 實(shí)現(xiàn)對(duì)應(yīng)協(xié)議的服務(wù)發(fā)布
? 實(shí)現(xiàn)服務(wù)注冊(cè)
? 訂閱服務(wù)重寫

 //實(shí)現(xiàn)服務(wù)的注冊(cè)和發(fā)布
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //這里獲得的是 zookeeper 注冊(cè)中心的 url: zookeeper://ip:port
        URL registryUrl = getRegistryUrl(originInvoker);
        //這里是獲得服務(wù)提供者的 url, dubbo://ip:port...
        URL providerUrl = getProviderUrl(originInvoker);
        //訂閱 override 數(shù)據(jù)握玛。在 admin 控制臺(tái)可以針對(duì)服務(wù)進(jìn)行治理够傍,比如修改權(quán)重,修改路由機(jī)制等败许,當(dāng)注冊(cè)中心有此服務(wù)的覆蓋配置注冊(cè)進(jìn)來時(shí)王带,推送消息給提供者,重新暴露服務(wù)
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

        /***********************************/
        //(很重要)1.doLocalExport這里就是真正的服務(wù)發(fā)布邏輯
        //這里就交給了具體的協(xié)議去暴露服務(wù)( 本質(zhì)就是去啟動(dòng)一個(gè)netty服務(wù))
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        
        //很重要)2.getRegistry這里就是真正的服務(wù)注冊(cè)邏輯
        // 根據(jù) invoker 中的 url 獲取 Registry 實(shí)例: zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        //獲取要注冊(cè)到注冊(cè)中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {{//是否配置了注冊(cè)中心市殷,如果是愕撰, 則需要注冊(cè)
            //注冊(cè)到注冊(cè)中心的 URL
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        //設(shè)置注冊(cè)中心的訂閱
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //保證每次 export 都返回一個(gè)新的 exporter 實(shí)例
        return new DestroyableExporter<>(exporter);
    }

doLocalExport

  private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        //bounds -chm  ->computeIfAbsent  if(map.get(key)==null){map.put()}
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            //對(duì)原有的 invoker,委托給了 InvokerDelegate,這個(gè)invoker具體是什么,下面會(huì)單獨(dú)分析
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            //將 invoker 轉(zhuǎn)換為 exporter 并啟動(dòng) netty 服務(wù)
            //protocol.export -> DubboProtocol.export(本質(zhì)上就是 暴露一個(gè) 20880的端口)
            //protocol- >Protocol$Apaptive ->QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol(invoker))))
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

先通過 doLocalExport 來暴露一個(gè)服務(wù)醋寝,本質(zhì)上應(yīng)該是啟動(dòng)一個(gè)通信服務(wù),主要的步驟是將本地 ip 和 20880 端口打開黑毅,進(jìn)行監(jiān)聽
originInvoker: 應(yīng)該是 registry://ip:port/com.alibaba.dubbo.registry.RegistryService
key: 從 originInvoker 中獲得發(fā)布協(xié)議的 url: dubbo://ip:port/...
bounds: 一個(gè) prviderUrl 服務(wù) export 之后充甚,緩存到 bounds 中命锄,所以一個(gè) providerUrl 只會(huì)對(duì)應(yīng)一個(gè) exporter
computeIfAbsent 就相當(dāng)于, java8 的語法

if(bounds.get(key)==null){
    bounds.put(key,s->{})
}

InvokerDelegete: 是 RegistryProtocol 的一個(gè)靜態(tài)內(nèi)部類十办,該類是一個(gè) originInvoker 的委托類,該類存儲(chǔ)了 originInvoker嗅绰,其父類 InvokerWrapper 還會(huì)存儲(chǔ) providerUrl舍肠,InvokerWrapper 會(huì)調(diào)用 originInvoker 的 invoke 方法,也會(huì)銷毀 invoker窘面〈溆铮可以管理 invoker 的生命周期。

protocol.export

public class RegistryProtocol implements Protocol {
     private Protocol protocol; 

     //set方法設(shè)置一個(gè)依賴擴(kuò)展點(diǎn)(DubboProtocol) 包裝
    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }
}

基于動(dòng)態(tài)代理的適配财边,很自然的就過渡到了 DubboProtocol 這個(gè)協(xié)議類中肌括,但是實(shí)際上是 DubboProtocol 嗎?
這里并不是獲得一個(gè)單純的 DubboProtocol 擴(kuò)展點(diǎn)酣难,而是會(huì)通過 Wrapper 對(duì) Protocol 進(jìn)行裝飾谍夭,裝飾器分別為:
QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol
為什么是這樣黑滴?我們?cè)賮砜纯?spi 的代碼

DubboProtocol.export

Wrapper 包裝

在 ExtensionLoader.loadClass 這個(gè)方法中,有一段這樣的判斷紧索,如果當(dāng)前這個(gè)類是一個(gè) wrapper 包裝類袁辈,也就是這個(gè) wrapper中有構(gòu)造方法,參數(shù)是當(dāng)前被加載的擴(kuò)展點(diǎn)的類型齐板,則把這個(gè) wrapper 類加入到 cacheWrapperClass 緩存中吵瞻。

   else if (isWrapperClass(clazz)) {
        cacheWrapperClass(clazz);
    }

    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

我們可以在 dubbo 的配置文件(dubbo-qos/dubbo-rpc-api)中找到三個(gè) Wrapper

qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper 
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
  • QosprotocolWrapper, 如果當(dāng)前配置了注冊(cè)中心甘磨,則會(huì)啟動(dòng)一個(gè) Qos server.qos 是 dubbo 的在線運(yùn)維命令,dubbo2.5.8 新版本重構(gòu)了 telnet 模塊眯停,提供了新的 telnet 命令支持济舆,新版本的 telnet 端口與 dubbo 協(xié)議的端口是不同的端口,默認(rèn)為 22222
  • ProtocolFilterWrapper莺债,對(duì) invoker 進(jìn)行 filter 的包裝滋觉,實(shí)現(xiàn)請(qǐng)求的過濾
  • ProtocolListenerWrapper, 用于服務(wù) export 時(shí)候插入監(jiān)聽機(jī)制齐邦,暫未實(shí)現(xiàn)

接著椎侠,在 getExtension->createExtension 方法中,會(huì)對(duì) cacheWrapperClass 集合進(jìn)行判斷措拇,如果集合不為空我纪,則進(jìn)行包裝

        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }

ProtocolFilterWrapper

這個(gè)是一個(gè)過濾器的包裝,使用責(zé)任鏈模式丐吓,對(duì) invoker 進(jìn)行了包裝

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

    //構(gòu)建責(zé)任鏈浅悉,基于激活擴(kuò)展點(diǎn)
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.g
                etUrl(), key, group);

    }

我們看如下文件:
/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter


image.png

默認(rèn)提供了非常多的過濾器。 然后基于條件激活擴(kuò)展點(diǎn)券犁,來對(duì) invoker 進(jìn)行包裝术健,從而在實(shí)現(xiàn)遠(yuǎn)程調(diào)用的時(shí)候,會(huì)經(jīng)過這些filter 進(jìn)行過濾粘衬。

export

  @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        //獲取服務(wù)標(biāo)識(shí)荞估,理解成服務(wù)坐標(biāo)也行。由服務(wù)組名稚新,服務(wù)名勘伺,服務(wù)版本號(hào)以及端口組成。比如
        //${group}/copm.wei.ISayHelloService:${version}:20880
        String key = serviceKey(url);
        //創(chuàng)建 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 將 <key, exporter> 鍵值對(duì)放入緩存中,可以猜測(cè)到客戶端調(diào)用時(shí)枷莉,會(huì)用到這個(gè)map娇昙,獲取對(duì)應(yīng)的invoker進(jìn)行服務(wù)調(diào)用,這個(gè)invoker具體是什么,下面會(huì)單獨(dú)分析
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url); //openServer(url) 開啟一個(gè)服務(wù) 笤妙,暴露20880端口
        optimizeSerialization(url);  //優(yōu)化序列化

        return exporter;
    }

openServer

去開啟一個(gè)服務(wù)冒掌,并且放入到緩存中->在同一臺(tái)機(jī)器上(單網(wǎng)卡)噪裕,同一個(gè)端口上僅允許啟動(dòng)一個(gè)服務(wù)器實(shí)例

    private void openServer(URL url) {
        // 獲取 host:port,并將其作為服務(wù)器實(shí)例的 key股毫,用于標(biāo)識(shí)當(dāng)前的服務(wù)器實(shí)例
        String key = url.getAddress();
        ////client 也可以暴露一個(gè)只有 server 可以調(diào)用的服務(wù)
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            //是否在 serverMap 中緩存了
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // 創(chuàng)建服務(wù)器實(shí)例
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // 服務(wù)器已創(chuàng)建膳音,則根據(jù) url 中的配置重置服務(wù)器
                server.reset(url);
            }
        }
    }

createServer

創(chuàng)建服務(wù),開啟心跳檢測(cè),默認(rèn)使用 netty铃诬。組裝 url

    private ExchangeServer createServer(URL url) {
        //組裝 url祭陷,在 url 中添加心跳時(shí)間、編解碼參數(shù)
        url = URLBuilder.from(url)
                // 當(dāng)服務(wù)關(guān)閉以后趣席,發(fā)送一個(gè)只讀的事件兵志,默認(rèn)是開啟狀態(tài)
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // 啟動(dòng)心跳配置
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        //獲得當(dāng)前應(yīng)該采用什么樣的方式來發(fā)布服務(wù), netty3, netty4, mina , grizzy,
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        //通過 SPI 檢測(cè)是否存在 server 參數(shù)所代表的 Transporter 拓展宣肚,不存在則拋出異常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
        //創(chuàng)建 ExchangeServer.
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

Exchangers.bind

  public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        //獲取 Exchanger想罕,默認(rèn)為 HeaderExchanger。
        //調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");

        //HeaderExchanger.
        return getExchanger(url).bind(url, handler);
    }
  public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

靜態(tài)SPI的方式可知霉涨,是調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例

headerExchanger.bind

 @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        /**
         * new DecodeHandler(new HeaderExchangeHandler())
         * Transporters.bind
         * new HeaderExchangeServer
         */
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

這里面包含多個(gè)邏輯
? new DecodeHandler(new HeaderExchangeHandler(handler))
? Transporters.bind
? new HeaderExchangeServer
目前我們只需要關(guān)心 transporters.bind 方法即可

Transporters.bind

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            // 如果 handlers 元素?cái)?shù)量大于 1按价,則創(chuàng)建 ChannelHandler 分發(fā)器
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 獲取自適應(yīng) Transporter 實(shí)例,并調(diào)用實(shí)例方法
        //getTransporter() -> ExtensionLoader.getExtension(Transport.class).getExtension("netty");
        return getTransporter().bind(url, handler);
    }

getTransporter

public static Transporter getTransporter() { 
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

getTransporter 是一個(gè)自適應(yīng)擴(kuò)展點(diǎn)笙瑟,它針對(duì) bind 方法添加了自適應(yīng)注解楼镐,意味著,bing 方法的具體實(shí)現(xiàn)往枷,會(huì)基于Transporter$Adaptive 方法進(jìn)行適配框产,那么在這里面默認(rèn)的通信協(xié)議是 netty,所以它會(huì)采用 netty4 的實(shí)現(xiàn)师溅,也就是

org.apache.dubbo.remoting.transport.netty4.NettyTransporter

NettyTransporter.bind

創(chuàng)建一個(gè) nettyserver

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
     return new NettyServer(url, listener);
}

NettyServer

  public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

進(jìn)入super方法

public abstract class AbstractServer extends AbstractEndpoint implements Server {

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        // 獲取 ip 和端口
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();// 調(diào)用模板方法 doOpen 啟動(dòng)服務(wù)器
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

初始化一個(gè) nettyserver茅信,并且從 url 中獲得相應(yīng)的 ip/ port。然后調(diào)用 doOpen();

doOpen

開啟 netty 服務(wù)墓臭,都是netty相關(guān)的代碼蘸鲸,大家應(yīng)該都很熟悉了

 @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

要注意的是它這里用到了一個(gè) handler 來處理客戶端傳遞過來的請(qǐng)求:
nettyServerHandler
NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
這個(gè) handler 是一個(gè)鏈路,它的正確組成應(yīng)該是
MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(dubboProtocol
后續(xù)接收到的請(qǐng)求窿锉,會(huì)一層一層的處理酌摇,比較繁瑣。

Invoker 是什么

之前埋的伏筆嗡载,這里單獨(dú)分析下invoker

從前面的分析來看窑多,服務(wù)的發(fā)布分三個(gè)階段:
第一個(gè)階段會(huì)創(chuàng)造一個(gè) invoker
第二個(gè)階段會(huì)把經(jīng)歷過一系列處理的 invoker(各種包裝),在 DubboProtocol 中保存到 exporterMap 中
第三個(gè)階段把 dubbo 協(xié)議的 url 地址注冊(cè)到注冊(cè)中心上

前面沒有分析 Invoker洼滚,我們來簡(jiǎn)單看看 Invoker 到底是一個(gè)啥東西埂息。

Invoker 是 Dubbo 領(lǐng)域模型中非常重要的一個(gè)概念, 和 ExtensionLoader 的重要性是一樣的,如果 Invoker 沒有搞懂,那么不算是看懂了 Dubbo 的源碼千康。
我們繼續(xù)回到 ServiceConfig 中 export 的代碼享幽,這段代碼是還沒有分析過的。

   //這個(gè)invoker具體是什么拾弃,下面會(huì)單獨(dú)開一個(gè)模塊分析
 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

以這個(gè)作為入口來分析我們前面 export 出去的 invoker 到底是啥東西

ProxyFacotory.getInvoker

這個(gè)是一個(gè)代理工廠值桩,用來生成 invoker,從它的定義來看豪椿,它是一個(gè)自適應(yīng)擴(kuò)展點(diǎn)奔坟,看到這樣的擴(kuò)展點(diǎn),我們幾乎可以不假思索的想到它會(huì)存在一個(gè)動(dòng)態(tài)適配器類

ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory

@SPI("javassist")
public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

這個(gè)方法的簡(jiǎn)單解讀為: 它是一個(gè) spi 擴(kuò)展點(diǎn)搭盾,并且默認(rèn)的擴(kuò)展實(shí)現(xiàn)是 javassit, 這個(gè)接口中有三個(gè)方法咳秉,并且都是加了@Adaptive 的自適應(yīng)擴(kuò)展點(diǎn)。所以如果調(diào)用 getInvoker 方法鸯隅,應(yīng)該會(huì)返回一個(gè) ProxyFactory$Adaptive

ProxyFactory$Adaptive

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys ([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys ([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }

    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys ([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

這個(gè)自適應(yīng)擴(kuò)展點(diǎn)滴某,做了三件事情
? 通過 ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)獲取了一個(gè)指定名稱的擴(kuò)展點(diǎn),
? 在 dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory 中,定義了 javassis=JavassisProxyFactory
? 調(diào)用 JavassisProxyFactory 的 getInvoker 方法

JavassistProxyFactory.getInvoker

先回顧我們自己定義的接口類

package com.wei;

public interface ISayHelloService {

    String sayHello(String content);
}

和實(shí)現(xiàn)類

package com.wei;

import com.wei.ISayHelloService;
import org.apache.dubbo.config.annotation.Service;

@Service 
public class SayHelloServiceImpl implements ISayHelloService {

    @Override
    public String sayHello(String content) {
        return "Hello :"+content;
    }
}

javassist 是一個(gè)動(dòng)態(tài)類庫滋迈,用來實(shí)現(xiàn)動(dòng)態(tài)代理的。
proxy:接口的實(shí)現(xiàn): com.wei.SayHelloServiceImpl
type:接口全稱 com.wei.ISayHelloService
url:協(xié)議地址:registry://...

public class JavassistProxyFactory extends AbstractProxyFactory {

   ...

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        //javassist 生成的動(dòng)態(tài)代理代碼户誓,構(gòu)建一個(gè)代理類
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //構(gòu)建好了代理類之后饼灿,返回一個(gè) AbstractproxyInvoker,并且它實(shí)現(xiàn)了 doInvoke 方法
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

javassist 生成的動(dòng)態(tài)代理代碼

先進(jìn)入Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);方法,這里相當(dāng)于
Wrapper.getWrapper(com.wei.ISayHelloService.class)
type= com.wei.ISayHelloService

  public static Wrapper getWrapper(Class<?> c) {
        while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
        {
            c = c.getSuperclass();
        }

        if (c == Object.class) {
            return OBJECT_WRAPPER;
        }

        Wrapper ret = WRAPPER_MAP.get(c);
        if (ret == null) {
            //c=com.wei.ISayHelloService.class
            ret = makeWrapper(c);
            WRAPPER_MAP.put(c, ret);
        }
        return ret;
    }

再進(jìn)入makeWrapper方法

    private static Wrapper makeWrapper(Class<?> c) {
        if (c.isPrimitive()) {
            throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
        }
        //這個(gè)name = com.wei.ISayHelloService
        String name = c.getName();
        ClassLoader cl = ClassUtils.getClassLoader(c);

        StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
        StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
        StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");

        c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
        c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
        c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
        //省略下面的拼接字符串代碼

通過斷點(diǎn)的方式帝美,在 Wrapper.getWrapper 中的 makeWrapper碍彭,會(huì)創(chuàng)建一個(gè)動(dòng)態(tài)代理類,核心的方法invokeMethod 拼接好的字符串的代碼如下:

   public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.wei.ISayHelloService w;
        try {
            w = ((com.wei.ISayHelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals($2) && $3.length == 1) {
                //從這個(gè)生成的代碼可知悼潭,dubbo的服務(wù)調(diào)用并不是通過反射庇忌,還是直接生成好方法給客戶端調(diào)用
                return ($w) w.sayHello((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wei.ISayHelloService.");
    }

構(gòu)建好了代理類之后,返回一個(gè) AbstractproxyInvoker,并且它實(shí)現(xiàn)了 doInvoke 方法舰褪,這個(gè)地方似乎看到了 dubbo 消費(fèi)者調(diào)用過來的時(shí)候觸發(fā)的影子皆疹,因?yàn)?wrapper.invokeMethod 本質(zhì)上就是觸發(fā)上面動(dòng)態(tài)代理類的方法 invokeMethod。

   @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        //javassist 生成的動(dòng)態(tài)代理代碼占拍,構(gòu)建一個(gè)代理類
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //構(gòu)建好了代理類之后略就,返回一個(gè) AbstractproxyInvoker,并且它實(shí)現(xiàn)了 doInvoke 方法,dubbo消費(fèi)端調(diào)用方法晃酒,就會(huì)觸發(fā)doInvoke方法
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            //所以可以猜想得出表牢,假設(shè)客戶端調(diào)用ISayHelloService.sayHello方法,最終會(huì)調(diào)用此doInvoke方法
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                //而oInvoke方法贝次,最終就是調(diào)用上面動(dòng)態(tài)代理類生成好的方法 invokeMethod
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

從這段代碼也解密得出崔兴, 假設(shè)客戶端調(diào)用ISayHelloService.sayHello方法,最終會(huì)調(diào)用此doInvoke方法,而oInvoke方法敲茄,最終就是調(diào)用上面動(dòng)態(tài)代理類生成好的方法 invokeMethod位谋,而invokeMethod方法就是生成好的寫死的調(diào)用實(shí)現(xiàn)類SayHelloServiceImpl.sayHello方法(客戶端源碼分析在下篇文章)

所以,簡(jiǎn)單總結(jié)一下 Invoke 本質(zhì)上應(yīng)該是一個(gè)代理折汞,經(jīng)過層層包裝最終進(jìn)行了發(fā)布倔幼。當(dāng)消費(fèi)者發(fā)起請(qǐng)求的時(shí)候,會(huì)獲得這個(gè)invoker 進(jìn)行調(diào)用爽待。
最終發(fā)布出去的 invoker, 也不是單純的一個(gè)代理损同,也是經(jīng)過多層包裝
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))

到這里,dubbo服務(wù)端的發(fā)布就正式完成了鸟款,我們也可用點(diǎn)對(duì)點(diǎn)直連的方式進(jìn)行調(diào)用服務(wù)端了膏燃,但是一般我們都是使用zk做注冊(cè)中心,所以還需要分析服務(wù)的注冊(cè)邏輯何什。

服務(wù)注冊(cè)流程

我們?cè)倩氐?RegistryProtocol 這個(gè)類的export方法中

 //實(shí)現(xiàn)服務(wù)的注冊(cè)和發(fā)布
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //這里獲得的是 zookeeper 注冊(cè)中心的 url: zookeeper://ip:port
        URL registryUrl = getRegistryUrl(originInvoker);
        //這里是獲得服務(wù)提供者的 url, dubbo://ip:port...
        URL providerUrl = getProviderUrl(originInvoker);
        //訂閱 override 數(shù)據(jù)组哩。在 admin 控制臺(tái)可以針對(duì)服務(wù)進(jìn)行治理,比如修改權(quán)重处渣,修改路由機(jī)制等伶贰,當(dāng)注冊(cè)中心有此服務(wù)的覆蓋配置注冊(cè)進(jìn)來時(shí),推送消息給提供者罐栈,重新暴露服務(wù)
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

        /***********************************/
        //(很重要)1.doLocalExport這里就是真正的服務(wù)發(fā)布邏輯
        //這里就交給了具體的協(xié)議去暴露服務(wù)( 本質(zhì)就是去啟動(dòng)一個(gè)netty服務(wù))
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        
        //很重要)2.getRegistry這里就是真正的服務(wù)注冊(cè)邏輯
        // 根據(jù) invoker 中的 url 獲取 Registry 實(shí)例: zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        //獲取要注冊(cè)到注冊(cè)中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {{//是否配置了注冊(cè)中心黍衙,如果是, 則需要注冊(cè)
            //注冊(cè)到注冊(cè)中心的 URL
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        //設(shè)置注冊(cè)中心的訂閱
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //保證每次 export 都返回一個(gè)新的 exporter 實(shí)例
        return new DestroyableExporter<>(exporter);
    }

1.doLocalExport這里就是真正的服務(wù)發(fā)布邏輯
我們現(xiàn)在從
2.getRegistry這里就是真正的服務(wù)注冊(cè)邏輯
這里開始繼續(xù)分析服務(wù)注冊(cè)邏輯

getRegistry

   private Registry getRegistry(final Invoker<?> originInvoker) {
        //把 url 轉(zhuǎn)化為配置的具體協(xié)議荠诬,比如 zookeeper://ip:port. 這樣后續(xù)獲得的注冊(cè)中心就會(huì)是基于 zk 的實(shí)現(xiàn)
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }
  1. 把 url 轉(zhuǎn)化為對(duì)應(yīng)配置的注冊(cè)中心的具體協(xié)議
  2. 根據(jù)具體協(xié)議琅翻,從 registryFactory 中獲得指定的注冊(cè)中心實(shí)現(xiàn),那么這個(gè) registryFactory具體是怎么賦值的呢柑贞?
    在 RegistryProtocol 中存在一段這樣的代碼方椎,很明顯這是通過依賴注入來實(shí)現(xiàn)的擴(kuò)展點(diǎn)。
private RegistryFactory registryFactory; 
public void setRegistryFactory(RegistryFactory registryFactory) {
  this.registryFactory = registryFactory;
}

按照擴(kuò)展點(diǎn)的加載規(guī)則钧嘶,我們可以先看看各個(gè)/META-INF/dubbo/internal 路徑下找到 RegistryFactory 的配置文件.這個(gè) factory 有多個(gè)擴(kuò)展點(diǎn)的實(shí)現(xiàn)棠众。

dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory

接著,找到 RegistryFactory 的實(shí)現(xiàn), 發(fā)現(xiàn)它里面有一個(gè)自適應(yīng)的方法康辑,根據(jù) url 中 protocol 傳入的值進(jìn)行適配

@SPI("dubbo")
public interface RegistryFactory {

    /**
     * Connect to the registry
     * <p>
     * Connecting the registry needs to support the contract: <br>
     * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
     * 2. Support username:password authority authentication on URL.<br>
     * 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
     * 4. Support file=registry.cache local disk file cache.<br>
     * 5. Support the timeout=1000 request timeout setting.<br>
     * 6. Support session=60000 session timeout or expiration settings.<br>
     *
     * @param url Registry address, is not allowed to be empty
     * @return Registry reference, never return empty value
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

RegistryFactory$Adaptive

public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {
    public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg0;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys ([protocol])");
        org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
        return extension.getRegistry(arg0);
    }
}

由于在前面的代碼中摄欲,url 中的 protocol 已經(jīng)改成了 zookeeper,那么這個(gè)時(shí)候根據(jù) zookeeper 獲得的 spi 擴(kuò)展點(diǎn)應(yīng)該是ZookeeperRegistryFactory疮薇,然后直接去找ZookeeperRegistryFactory.getRegistry

ZookeeperRegistryFactory

這個(gè)方法中并沒有 getRegistry 方法胸墙,而是在父類 AbstractRegistryFactory中找到getRegistry方法

public abstract class AbstractRegistryFactory implements RegistryFactory {

    @Override
    public Registry getRegistry(URL url) {
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            //創(chuàng)建注冊(cè)中心
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    protected abstract Registry createRegistry(URL url);
}

邏輯很簡(jiǎn)單,
1.從緩存 REGISTRIES 中按咒,根據(jù) key 獲得對(duì)應(yīng)的 Registry
2.如果不存在迟隅,則創(chuàng)建 Registry

createRegistry

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    /**
     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

創(chuàng)建一個(gè) zookeeperRegistry但骨,把 url 和 zookeepertransporter 作為參數(shù)傳入。

zookeeperTransporter 這個(gè)屬性也是基于依賴注入來賦值的智袭,具體的流程就不再分析了奔缠,這個(gè)的值應(yīng)該是CuratorZookeeperTransporter 表示具體使用什么框架來和 zk 產(chǎn)生連接

ZookeeperRegistry

這個(gè)方法中使用了 CuratorZookeeperTransport 來實(shí)現(xiàn) zk 的連接

   public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //獲得 group 名稱
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        //產(chǎn)生一個(gè) zookeeper 連接
        zkClient = zookeeperTransporter.connect(url);
        //添加 zookeeper 狀態(tài)變化事件
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

再回到RegistryProtocol.export方法,接著往下分析

 // 根據(jù) invoker 中的 url 獲取 Registry 實(shí)例: zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        //獲取要注冊(cè)到注冊(cè)中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {{//是否配置了注冊(cè)中心吼野,如果是校哎, 則需要注冊(cè)
            //注冊(cè)到注冊(cè)中心的 URL
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

繼續(xù)分析 register(registryUrl, registeredProviderUrl); 方法

    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

registry.register(registedProviderUrl);

繼續(xù)往下分析,會(huì)調(diào)用 registry.register 去講 dubbo://的協(xié)議地址注冊(cè)到 zookeeper 上
這個(gè)方法會(huì)調(diào)用 FailbackRegistry 類中的 register. 為什么呢瞳步?因?yàn)?ZookeeperRegistry 這個(gè)類中并沒有 register 這個(gè)方法闷哆,但是他的父類 FailbackRegistry 中存在 register 方法,而這個(gè)類又重寫了 AbstractRegistry 類中的 register 方法单起。所以我們可以直接定位到 FailbackRegistry 這個(gè)類中的 register 方法中

FailbackRegistry.register

  @Override
    public void register(URL url) {
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            // 調(diào)用子類實(shí)現(xiàn)真正的服務(wù)注冊(cè)抱怔,把 url 注冊(cè)到 zk 上
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 如果開啟了啟動(dòng)時(shí)檢測(cè),則直接拋出異常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            // 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表嘀倒,定時(shí)重試
            addFailedRegistered(url);
        }
    }

1.FailbackRegistry屈留,從名字上來看,是一個(gè)失敗重試機(jī)制
2.調(diào)用父類的 register 方法测蘑,講當(dāng)前 url 添加到緩存集合中

調(diào)用 doRegister 方法灌危,這個(gè)方法很明顯,是一個(gè)抽象方法碳胳,會(huì)由 ZookeeperRegistry 子類實(shí)現(xiàn)乍狐。

ZookeeperRegistry.doRegister

最終調(diào)用 curator 的客戶端把服務(wù)地址注冊(cè)到 zk

   @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

至此服務(wù)注冊(cè)也分析完成。

——學(xué)自咕泡學(xué)院

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末固逗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子藕帜,更是在濱河造成了極大的恐慌烫罩,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件洽故,死亡現(xiàn)場(chǎng)離奇詭異贝攒,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)时甚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門隘弊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人荒适,你說我怎么就攤上這事梨熙。” “怎么了刀诬?”我有些...
    開封第一講書人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵咽扇,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我,道長(zhǎng)质欲,這世上最難降的妖魔是什么树埠? 我笑而不...
    開封第一講書人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮嘶伟,結(jié)果婚禮上怎憋,老公的妹妹穿的比我還像新娘。我一直安慰自己九昧,他們只是感情好绊袋,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著耽装,像睡著了一般愤炸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上掉奄,一...
    開封第一講書人閱讀 49,764評(píng)論 1 290
  • 那天规个,我揣著相機(jī)與錄音,去河邊找鬼姓建。 笑死诞仓,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的速兔。 我是一名探鬼主播墅拭,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼涣狗!你這毒婦竟也來了谍婉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤镀钓,失蹤者是張志新(化名)和其女友劉穎穗熬,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體丁溅,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡唤蔗,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了窟赏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妓柜。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖涯穷,靈堂內(nèi)的尸體忽然破棺而出棍掐,到底是詐尸還是另有隱情,我是刑警寧澤拷况,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布塌衰,位于F島的核電站诉稍,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏最疆。R本人自食惡果不足惜杯巨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望努酸。 院中可真熱鬧服爷,春花似錦、人聲如沸获诈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舔涎。三九已至笼踩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間亡嫌,已是汗流浹背嚎于。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留挟冠,地道東北人于购。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像知染,于是被迫代替她去往敵國(guó)和親肋僧。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容