Dubbo 服務(wù)發(fā)布流程解析

本文將詳細分析Dubbo的服務(wù)發(fā)布流程自晰,建議結(jié)合文章Dubbo SPI 機制解析一起閱讀题翻。

在開始分析之前排宰,有必須熟悉一下Dubbo源碼的目錄結(jié)構(gòu)褐鸥,以及各模塊的功能线脚。


模塊說明:
dubbo-common 公共邏輯模塊:包括 Util 類和通用模型。
dubbo-remoting 遠程通訊模塊:相當于 Dubbo 協(xié)議的實現(xiàn)叫榕,如果 RPC 用 RMI協(xié)議則不需要使用此包浑侥。
dubbo-rpc 遠程調(diào)用模塊:抽象各種協(xié)議,以及動態(tài)代理晰绎,只包含一對一的調(diào)用寓落,不關(guān)心集群的管理。
dubbo-cluster 集群模塊:將多個服務(wù)提供方偽裝為一個提供方荞下,包括:負載均衡, 容錯伶选,路由等史飞,集群的地址列表可以是靜態(tài)配置的,也可以是由注冊中心下發(fā)考蕾。
dubbo-registry 注冊中心模塊:基于注冊中心下發(fā)地址的集群方式祸憋,以及對各種注冊中心的抽象。
dubbo-monitor 監(jiān)控模塊:統(tǒng)計服務(wù)調(diào)用次數(shù)肖卧,調(diào)用時間的蚯窥,調(diào)用鏈跟蹤的服務(wù)。
dubbo-config 配置模塊:是 Dubbo 對外的 API塞帐,用戶通過 Config 使用Dubbo拦赠,隱藏 Dubbo 所有細節(jié)。
dubbo-container 容器模塊:是一個 Standlone 的容器葵姥,以簡單的 Main 加載 Spring 啟動荷鼠,因為服務(wù)通常不需要 Tomcat/JBoss 等 Web 容器的特性,沒必要用 Web 容器去加載服務(wù)榔幸。

Spring 對外留出的擴展

dubbo是基于spring 配置來實現(xiàn)服務(wù)的發(fā)布的允乐,那么一定是基于spring的擴展來寫了一套自己的標簽。在dubbo配置文件中看到的<dubbo:service> 削咆,就是屬于自定義擴展標簽牍疏。

要實現(xiàn)自定義擴展,有三個步驟(在spring中定義了兩個接口拨齐,用來實現(xiàn)擴展)
1.NamespaceHandler: 注冊一堆BeanDefinitionParser鳞陨,利用他們來進行解析
2.BeanDefinitionParser:用于解析每個element的內(nèi)容
3.Spring默認會加載jar包下的META-INF/spring.handlers文件尋找對應(yīng)的NamespaceHandler。

以下是Dubbo-config模塊下dubbo-config-spring的配置:



也就是說會通過DubboNamespaceHandler去解析dubbo自定義的標簽瞻惋。DubboBeanDefinitionParser用于把不同的配置分別轉(zhuǎn)化成spring容器中的bean對象厦滤。

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}

為了在spring啟動的時候,也相應(yīng)的啟動了發(fā)布服務(wù)和注冊服務(wù)的過程歼狼,而同時為了讓客戶端在啟動的時候自動訂閱發(fā)現(xiàn)服務(wù)掏导,加入了兩個bean ServiceBean、ReferenceBean羽峰。分別繼承了ServiceConfig和ReferenceConfig趟咆。并分別實現(xiàn)了InitializingBean、DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware限寞。

InitializingBean為bean提供了初始化方法的方式忍啸,它只包括afterPropertiesSet方法,凡是繼承該接口的類履植,在初始化bean的時候會執(zhí)行該方法计雌。
DisposableBean bean被銷毀的時候,spring容器會自動執(zhí)行destory方法玫霎,比如釋放資源凿滤。
ApplicationContextAware 實現(xiàn)了這個接口的bean妈橄,當spring容器初始化的時候,會自動的將ApplicationContext注入進來翁脆。
ApplicationListener ApplicationEvent事件監(jiān)聽眷蚓,spring容器啟動后會發(fā)一個事件通知。
BeanNameAware 獲得自身初始化時反番,本身的bean的id屬性沙热。

由此可以看出,Dubbo 的服務(wù)發(fā)布流程的實現(xiàn)思路是:
1.利用spring的解析收集xml中的配置信息罢缸,然后把這些配置信息存儲到serviceConfig中篙贸。
2.調(diào)用ServiceConfig的export方法來進行服務(wù)的發(fā)布和注冊。

Spring容器初始化調(diào)用

當Spring容器實例化bean完成枫疆,走到最后一步發(fā)布ContextRefreshEvent事件的時候爵川,ServiceBean會執(zhí)行onApplicationEvent方法,該方法調(diào)用ServiceConfig的export方法息楔。

ServiceConfig初始化的時候寝贡,會先初始化靜態(tài)變量protocol和proxyFactory,這兩個變量初始化的結(jié)果是通過dubbo的spi擴展機制得到的值依。

生成的protocol實例是:

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
        if (arg1 == null) 
            throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");

        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        //根據(jù)URL配置信息獲取Protocol協(xié)議圃泡,默認是dubbo
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            //根據(jù)協(xié)議名,獲取Protocol的實現(xiàn)
            //獲得Protocol的實現(xiàn)過程中鳞滨,會對Protocol先進行依賴注入洞焙,然后進行Wrapper包裝蟆淀,最后返回被修改過的Protocol
            //包裝經(jīng)過了ProtocolFilterWrapper拯啦,ProtocolListenerWrapper,RegistryProtocol
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

        return extension.export(arg0);
    }

    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
}

生成的proxyFactory實例:

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);

        return extension.getInvoker(arg0, arg1, arg2);
    }

    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

       if (arg0.getUrl() == null) 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();

        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);

        return extension.getProxy(arg0);
    }
}

生成的代碼中可以看到熔任,默認的Protocol實現(xiàn)是dubbo褒链,默認的proxy是javassist。

ServiceConfig的export

export的步驟簡介

  • 首先會檢查各種配置信息疑苔,填充各種屬性甫匹,總之就是保證我在開始暴露服務(wù)之前,所有的東西都準備好了惦费,并且是正確的兵迅。
  • 加載所有的注冊中心,因為我們暴露服務(wù)需要注冊到注冊中心中去薪贫。
  • 根據(jù)配置的所有協(xié)議和注冊中心url分別進行導(dǎo)出恍箭。
  • 進行導(dǎo)出的時候,又是一波屬性的獲取設(shè)置檢查等操作瞧省。
  • 如果配置的不是remote扯夭,則做本地導(dǎo)出鳍贾。
  • 如果配置的不是local,則暴露為遠程服務(wù)荧呐。
  • 不管是本地還是遠程服務(wù)暴露愁拭,首先都會獲取Invoker册养。
  • 獲取完Invoker之后,轉(zhuǎn)換成對外的Exporter咆爽,緩存起來。

export方法先判斷是否需要延遲暴露(Thread.sleep(delay))置森,然后執(zhí)行doExport方法伍掀。

 public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
 if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            doExport();
        }

doExport方法先執(zhí)行一系列的檢查方法,然后調(diào)用doExportUrls方法暇藏。檢查方法會檢測dubbo的配置是否在Spring配置文件中聲明蜜笤,沒有的話讀取properties文件初始化。

doExportUrls方法先調(diào)用loadRegistries獲取所有的注冊中心url盐碱,然后遍歷調(diào)用doExportUrlsFor1Protocol方法把兔。對于在標簽中指定了registry屬性的Bean,會在加載BeanDefinition的時候就加載了注冊中心瓮顽。

獲取注冊中心url县好,會把注冊的信息都放在一個URL對象中,一個URL內(nèi)容如下:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&organization=&owner=&pid=2939&registry=zookeeper&timestamp=1488898049284

doExportUrlsFor1Protocol根據(jù)不同的協(xié)議將服務(wù)以URL形式暴露暖混。如果scope配置為none則不暴露缕贡,如果服務(wù)未配置成remote,則本地暴露exportLocal拣播,如果未配置成local晾咪,則注冊服務(wù)registryProcotol。

這里的URL是:

dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&application.version=1.0&delay=5000&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=&owner=&pid=2939&side=provider&timestamp=1488898464953

本地暴露

這時候會先做本地暴露贮配,exportLocal(url):

private void exportLocal(URL url) {
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        //這時候轉(zhuǎn)成本地暴露的url:injvm://127.0.0.1/dubbo.common.hello.service.HelloService?anyhost=true&
        //application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&
        //interface=dubbo.common.hello.service.HelloService&methods=sayHello
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(NetUtils.LOCALHOST)
                .setPort(0);
        //首先還是先獲得Invoker
        //然后導(dǎo)出成Exporter谍倦,并緩存
        //這里的proxyFactory實際是JavassistProxyFactory
        //有關(guān)詳細的獲得Invoke以及exporter會在下面的流程解析,在本地暴露這個流程就不再說明泪勒。
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
    }
}

暴露為遠程服務(wù)

接下來是暴露為遠程服務(wù)昼蛀,跟本地暴露的流程一樣還是先獲取Invoker,然后導(dǎo)出成Exporter:

//根據(jù)服務(wù)具體實現(xiàn)圆存,實現(xiàn)接口叼旋,以及registryUrl通過ProxyFactory將HelloServiceImpl封裝成一個本地執(zhí)行的Invoker
//invoker是對具體實現(xiàn)的一種代理。
//這里proxyFactory是上面列出的生成的代碼
 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
 //使用Protocol將invoker導(dǎo)出成一個Exporter
 //暴露封裝服務(wù)invoker
 //調(diào)用Protocol生成的適配類的export方法
 //這里的protocol是上面列出的生成的代碼
 Exporter<?> exporter = protocol.export(invoker);

關(guān)于Invoker沦辙,Exporter等的解釋參見最下面的內(nèi)容夫植。

暴露遠程服務(wù)時的獲取Invoker過程

服務(wù)實現(xiàn)類轉(zhuǎn)換成Invoker,大概的步驟是:

  • 根據(jù)上面生成的proxyFactory方法調(diào)用具體的ProxyFactory實現(xiàn)類的getInvoker方法獲取Invoker怕轿。
  • getInvoker的過程是偷崩,首先對實現(xiàn)類做一個包裝辟拷,生成一個包裝后的類。
  • 然后新創(chuàng)建一個Invoker實例阐斜,這個Invoker中包含著生成的Wrapper類衫冻,Wrapper類中有具體的實現(xiàn)類。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

這行代碼中包含服務(wù)實現(xiàn)類轉(zhuǎn)換成Invoker的過程谒出,其中proxyFactory是上面列出的動態(tài)生成的代碼隅俘,其中g(shù)etInvoker的代碼為:

public Invoker getInvoker(Object arg0, Class arg1, URL arg2) throws Object {
    if (arg2 == null)  throw new IllegalArgumentException("url == null");
    //傳進來的url是dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider
 //&application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello
    URL url = arg2;
    //沒有proxy參數(shù)配置,默認使用javassist
    String extName = url.getParameter("proxy", "javassist");
    if(extName == null)  throw new IllegalStateException("Fail to get extension(ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    //這一步就使用javassist來獲取ProxyFactory的實現(xiàn)類JavassistProxyFactory
    ProxyFactory extension = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName);
    //JavassistProxyFactory的getInvoker方法
    return extension.getInvoker(arg0, arg1, arg2);
}

使用JavassistProxyFactory獲取Invoker

JavassistProxyFactory的getInvoker方法:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper類不能正確處理帶$的類名
    //第一步封裝一個Wrapper類
    //該類是手動生成的
    //如果類是以$開頭笤喳,就使用接口類型獲取为居,其他的使用實現(xiàn)類獲取
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    //返回一個Invoker實例,doInvoke方法中直接返回上面wrapper的invokeMethod
    //關(guān)于生成的wrapper杀狡,請看下面列出的生成的代碼蒙畴,其中invokeMethod方法中就有實現(xiàn)類對實際方法的調(diào)用
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, 
                                  Class<?>[] parameterTypes, 
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

生成wrapper類的過程,首先看getWrapper方法:

public static Wrapper getWrapper(Class<?> c){
    while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
        c = c.getSuperclass();
    //Object類型的
    if( c == Object.class )
        return OBJECT_WRAPPER;
    //先去Wrapper緩存中查找
    Wrapper ret = WRAPPER_MAP.get(c);
    if( ret == null ) {
        //緩存中不存在呜象,生成Wrapper類膳凝,放到緩存
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c,ret);
    }
    return ret;
}

makeWrapper方法代碼不在列出,太長了恭陡。就是生成一個繼承自Wrapper的類蹬音,最后的結(jié)果大概是:

public class Wrapper1 extends Wrapper {
    public static String[] pns;
    public static Map pts;
    public static String[] mns; // all method name array.
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String n) {
        return pts.containsKey($1);
    }

    public Class getPropertyType(String n) {
        return (Class) pts.get($1);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object o, String n, Object v) {
        dubbo.provider.hello.service.impl.HelloServiceImpl w;
        try {
            w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
    }

    public Object getPropertyValue(Object o, String n) {
        dubbo.provider.hello.service.impl.HelloServiceImpl w;
        try {
            w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
    }

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        dubbo.provider.hello.service.impl.HelloServiceImpl w;
        try {
            w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals($2) && $3.length == 0) {
                w.sayHello();
                return null;
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
    }
}

生成完Wrapper以后,返回一個AbstractProxyInvoker實例休玩。至此生成Invoker的步驟就完成了著淆。可以看到Invoker執(zhí)行方法的時候拴疤,會調(diào)用Wrapper的invoke()永部,這個方法中會有真實的實現(xiàn)類調(diào)用真實方法的代碼。

使用JdkProxyFactory獲取invoker

JdkProxyFactory的getInvoker方法

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, 
                                  Class<?>[] parameterTypes, 
                                  Object[] arguments) throws Throwable {
            Method method = proxy.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(proxy, arguments);
        }
    };
}

直接返回一個AbstractProxyInvoker實例遥赚,沒有做處理扬舒,只是使用反射調(diào)用具體的方法阐肤。

JdkProxyFactory的getProxy方法:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}

使用Java的反射機制生成一個代理類凫佛。

暴露遠程服務(wù)時導(dǎo)出Invoker為Exporter

Invoker導(dǎo)出為Exporter分為兩種情況,第一種是Registry類型的Invoker孕惜,第二種是其他協(xié)議類型的Invoker愧薛,分開解析。

代碼入口:

Exporter<?> exporter = protocol.export(invoker);

Registry類型的Invoker處理過程

大概的步驟是:

  • 經(jīng)過兩個不用做任何處理的Wrapper類衫画,然后到達RegistryProtocol中毫炉。
  • 通過具體的協(xié)議導(dǎo)出Invoker為Exporter。
  • 注冊服務(wù)到注冊中心削罩。
  • 訂閱注冊中心的服務(wù)瞄勾。
  • 生成一個新的Exporter實例费奸,將上面的Exporter進行引入,然后返回进陡。
    protocol是上面列出的動態(tài)生成的代碼愿阐,會先調(diào)用ProtocolListenerWrapper,這個Wrapper負責初始化暴露和引用服務(wù)的監(jiān)聽器趾疚。對于Registry類型的不做處理缨历,代碼如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //registry類型的Invoker,不需要做處理
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    //非Registry類型的Invoker糙麦,需要被監(jiān)聽器包裝
    return new ListenerExporterWrapper<T>(protocol.export(invoker), 
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

接著調(diào)用ProtocolFilterWrapper中的export方法辛孵,ProtocolFilterWrapper負責初始化invoker所有的Filter。這個類非常重要赡磅,dubbo機制里面日志記錄魄缚、超時等等功能都是在這一部分實現(xiàn)的。這個類有3個特點:
1.它有一個參數(shù)為Protocol protocol的構(gòu)造函數(shù)焚廊;
2.它實現(xiàn)了Protocol接口鲜滩;
3.它使用責任鏈模式,對export和refer函數(shù)進行了封裝节值。

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

這其中涉及到很多功能徙硅,包括權(quán)限驗證、異常搞疗、超時嗓蘑、計算調(diào)用時間等都在這些類實現(xiàn)。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //Registry類型的Invoker不做處理
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    //非Registry類型的Invoker需要先構(gòu)建調(diào)用鏈匿乃,然后再導(dǎo)出
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

這里我們先解析的是Registry類型的Invoker桩皿,接著就會調(diào)用RegistryProtocol的export方法,RegistryProtocol負責注冊服務(wù)到注冊中心和向注冊中心訂閱服務(wù)幢炸。代碼如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    //這里就交給了具體的協(xié)議去暴露服務(wù)(先不解析泄隔,留在后面,可以先去后面看下導(dǎo)出過程)
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    //根據(jù)invoker中的url獲取Registry實例
    //并且連接到注冊中心
    //此時提供者作為消費者引用注冊中心核心服務(wù)RegistryService
    final Registry registry = getRegistry(originInvoker);
    //注冊到注冊中心的URL
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //調(diào)用遠端注冊中心的register方法進行服務(wù)注冊
    //若有消費者訂閱此服務(wù)宛徊,則推送消息讓消費者引用此服務(wù)佛嬉。
    //注冊中心緩存了所有提供者注冊的服務(wù)以供消費者發(fā)現(xiàn)。
    registry.register(registedProviderUrl);
    // 訂閱override數(shù)據(jù)
    // FIXME 提供者訂閱時闸天,會影響同一JVM即暴露服務(wù)暖呕,又引用同一服務(wù)的的場景,因為subscribed以服務(wù)名為緩存的key苞氮,導(dǎo)致訂閱信息覆蓋湾揽。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    //提供者向注冊中心訂閱所有注冊服務(wù)的覆蓋配置
    //當注冊中心有此服務(wù)的覆蓋配置注冊進來時,推送消息給提供者,重新暴露服務(wù)库物,這由管理頁面完成霸旗。
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保證每次export都返回一個新的exporter實例
    //返回暴露后的Exporter給上層ServiceConfig進行緩存,便于后期撤銷暴露戚揭。
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

交給具體的協(xié)議去暴露服務(wù)

先不解析定硝,留在后面,可以先去后面看下導(dǎo)出過程毫目,然后再回來接著看注冊到注冊中心的過程蔬啡。具體協(xié)議暴露服務(wù)主要是打開服務(wù)器和端口,進行監(jiān)聽镀虐。

連接注冊中心并獲取Registry實例

具體的協(xié)議進行暴露并且返回了一個ExporterChangeableWrapper之后箱蟆,接下來看下一步連接注冊中心并注冊到注冊中心,代碼是在RegistryProtocol的export方法:

//先假裝此步已經(jīng)分析完
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//得到具體的注冊中心刮便,連接注冊中心空猜,此時提供者作為消費者引用注冊中心核心服務(wù)RegistryService
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//調(diào)用遠端注冊中心的register方法進行服務(wù)注冊
//若有消費者訂閱此服務(wù),則推送消息讓消費者引用此服務(wù)
registry.register(registedProviderUrl);
//提供者向注冊中心訂閱所有注冊服務(wù)的覆蓋配置
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//返回暴露后的Exporter給上層ServiceConfig進行緩存
return new Exporter<T>() {恨旱。辈毯。。}

getRegistry(originInvoker)方法:

//根據(jù)invoker的地址獲取registry實例
private Registry getRegistry(final Invoker<?> originInvoker){
    //獲取invoker中的registryUrl
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        //獲取registry的值搜贤,這里獲得是zookeeper谆沃,默認值是dubbo
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        //這里獲取到的url為:
        //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
        //application=dubbo-provider&application.version=1.0&dubbo=2.5.3&
        //environment=product&export=dubbo%3A%2F%2F192.168.1.100%3A20880%2F
        //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26
        //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26
        //interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26
        //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D9457%26side%3Dprovider%26timestamp%3D1489807681627
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    //根據(jù)SPI機制獲取具體的Registry實例,這里獲取到的是ZookeeperRegistry
    return registryFactory.getRegistry(registryUrl);
}

這里的registryFactory是動態(tài)生成的代碼仪芒,如下:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory {
    public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
    
        if (arg0 == null) throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg0;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");

        com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);

        return extension.getRegistry(arg0);
    }
}

所以這里registryFactory.getRegistry(registryUrl)用的是ZookeeperRegistryFactory唁影。

先看下getRegistry方法,會發(fā)現(xiàn)該方法會在AbstractRegistryFactory中實現(xiàn):

public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    //這里key為:
    //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
    String key = url.toServiceString();
    // 鎖定注冊中心獲取過程掂名,保證注冊中心單一實例
    LOCK.lock();
    try {
        //先從緩存中獲取Registry實例
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //創(chuàng)建registry据沈,會直接new一個ZookeeperRegistry返回
        //具體創(chuàng)建實例是子類來實現(xiàn)的
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        //放到緩存中
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 釋放鎖
        LOCK.unlock();
    }
}

createRegistry(url);是在子類中實現(xiàn)的,這里是ZookeeperRegistry饺蔑,首先需要經(jīng)過AbstractRegistry的構(gòu)造:

public AbstractRegistry(URL url) {
    //url保存起來
    setUrl(url);
    // 啟動文件保存定時器
    //
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    //保存的文件為:
    ///home/xxx/.dubbo/dubbo-registry-127.0.0.1.cache
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
            if(! file.getParentFile().mkdirs()){
                throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    //加載文件中的屬性
    loadProperties();
    //通知訂閱
    notify(url.getBackupUrls());
}

獲取Registry時的訂閱

notify()方法:

protected void notify(List<URL> urls) {
    if(urls == null || urls.isEmpty()) return;
    //getSubscribed()方法獲取訂閱者列表
    //訂閱者Entry里每個URL都對應(yīng)著n個NotifyListener
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();

        if(! UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    //通知每個監(jiān)聽器
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {}
            }
        }
    }
}

notify(url, listener, filterEmpty(url, urls));代碼:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            //分類
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //保存到主目錄下的.dubbo目錄下
        saveProperties(url);
        //上面獲取到的監(jiān)聽器進行通知
        listener.notify(categoryList);
    }
}

AbstractRegistry構(gòu)造器初始化完锌介,接著調(diào)用FailbackRegistry構(gòu)造器初始化:

public FailbackRegistry(URL url) {
    super(url);
    //重試時間,默認5000ms
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    //啟動失敗重試定時器
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            // 檢測并連接注冊中心
            try {
                //重試方法由每個具體子類實現(xiàn)
                //獲取到注冊失敗的猾警,然后嘗試注冊
                retry();
            } catch (Throwable t) { // 防御性容錯}
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

最后回到ZookeeperRegistry的構(gòu)造初始化:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    //獲得到注冊中心中的分組孔祸,默認dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (! group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    //注冊到注冊中心的節(jié)點
    this.root = group;
    //使用zookeeperTansporter去連接
    //ZookeeperTransport這里是生成的自適應(yīng)實現(xiàn),默認使用ZkClientZookeeperTransporter
    //ZkClientZookeeperTransporter的connect去實例化一個ZkClient實例
    //并且訂閱狀態(tài)變化的監(jiān)聽器subscribeStateChanges
    //然后返回一個ZkClientZookeeperClient實例
    zkClient = zookeeperTransporter.connect(url);
    //ZkClientZookeeperClient添加狀態(tài)改變監(jiān)聽器
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

獲取注冊到注冊中心的url

獲取到了Registry肿嘲,Registry實例中保存著連接到了zookeeper的zkClient實例之后融击,下一步獲取要注冊到注冊中心的url(在RegistryProtocol中)。

final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//得到的URL是:
//dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?
//anyhost=true&application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&
//interface=dubbo.common.hello.service.HelloService&methods=sayHello

注冊到注冊中心

然后調(diào)用registry.register(registedProviderUrl)注冊到注冊中心(在RegistryProtocol中)雳窟。register方法的實現(xiàn)在FailbackRegistry中:

public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服務(wù)器端發(fā)送注冊請求
        //調(diào)用子類具體實現(xiàn),發(fā)送注冊請求
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 如果開啟了啟動時檢測,則直接拋出異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if(skipFailback) {
                t = t.getCause();
            }
            throw  封救。拇涤。。
        } else { }

        // 將失敗的注冊請求記錄到失敗列表誉结,定時重試
        failedRegistered.add(url);
    }
}

doRegister(url);在這里是ZookeeperRegistry中具體實現(xiàn)的鹅士,這里將會注冊到注冊中心:

protected void doRegister(URL url) {
    try {
        //這里zkClient就是我們上面調(diào)用構(gòu)造的時候生成的
        //ZkClientZookeeperClient
        //保存著連接到Zookeeper的zkClient實例
        //開始注冊,也就是在Zookeeper中創(chuàng)建節(jié)點
        //這里toUrlPath獲取到的path為:
        ///dubbo/dubbo.common.hello.service.HelloService/providers/dubbo%3A%2F%2F192.168.1.100%3A20880%2F
        //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26
        //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26interface%3D
        //dubbo.common.hello.service.HelloService%26methods%3DsayHello
        //默認創(chuàng)建的節(jié)點是臨時節(jié)點
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) { }
}

經(jīng)過這一步之后惩坑,Zookeeper中就有節(jié)點存在了掉盅,具體節(jié)點為

/dubbo
    dubbo.common.hello.service.HelloService
        providers
            /dubbo/dubbo.common.hello.service.HelloService/providers/
            dubbo%3A%2F%2F192.168.1.100%3A20880%2Fdubbo.common.hello.service.HelloService%3F
            anyhost%3Dtrue%26application%3Ddubbo-provider%26
            application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26
            interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello

訂閱注冊中心的服務(wù)

在注冊到注冊中心之后,registry會去訂閱覆蓋配置的服務(wù)以舒,這一步之后就會在/dubbo/dubbo.common.hello.service/HelloService節(jié)點下多一個configurators節(jié)點趾痘。

返回新Exporter實例

最后返回Exporter新實例,返回到ServiceConfig中蔓钟。服務(wù)的發(fā)布就算完成了永票。

交給具體的協(xié)議進行服務(wù)暴露

這里也就是非Registry類型的Invoker的導(dǎo)出過程。主要的步驟是將本地ip和20880端口打開滥沫,進行監(jiān)聽侣集。最后包裝成exporter返回。
doLocalExport(invoker):

private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
    //原始的invoker中的url:
    //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
    //application=dubbo-provider&application.version=1.0&dubbo=2.5.3
    //&environment=product&export=dubbo%3A%2F%2F10.42.0.1%3A20880%2F
    //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26
    //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26
    //interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26
    //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D7876%26side%3Dprovider%26timestamp%3D1489057305001&
    //organization=china&owner=cheng.xi&pid=7876&registry=zookeeper&timestamp=1489057304900
    
    //從原始的invoker中得到的key:
    //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&
 //application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&//methods=sayHello
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                //得到一個Invoker代理兰绣,里面包含原來的Invoker
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                //此處protocol還是最上面生成的代碼世分,調(diào)用代碼中的export方法,會根據(jù)協(xié)議名選擇調(diào)用具體的實現(xiàn)類
                //這里我們需要調(diào)用DubboProtocol的export方法
                //這里的使用具體協(xié)議進行導(dǎo)出的invoker是個代理invoker
                //導(dǎo)出完之后缀辩,返回一個新的ExporterChangeableWrapper實例
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return (ExporterChangeableWrapper<T>) exporter;
}

使用dubbo協(xié)議導(dǎo)出

這里protocol.export(invokerDelegete)就要去具體的DubboProtocol中執(zhí)行了罚攀,DubboProtocol的外面包裹著ProtocolFilterWrapper,再外面還包裹著ProtocolListenerWrapper雌澄。會先經(jīng)過ProtocolListenerWrapper:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //Registry類型的Invoker
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    //其他具體協(xié)議類型的Invoker
    //先進行導(dǎo)出protocol.export(invoker)
    //然后獲取自適應(yīng)的監(jiān)聽器
    //最后返回的是包裝了監(jiān)聽器的Exporter
    //這里監(jiān)聽器的獲取是getActivateExtension斋泄,如果指定了listener就加載實現(xiàn),沒有指定就不加載
    return new ListenerExporterWrapper<T>(protocol.export(invoker), 
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

再經(jīng)過ProtocolFilterWrapper:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //Registry類型的Invoker
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    //其他具體協(xié)議類型的Invoker
    //先構(gòu)建Filter鏈镐牺,然后再導(dǎo)出
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

查看下構(gòu)建Invoker鏈的方法:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    //我們要處理的那個Invoker作為處理鏈的最后一個
    Invoker<T> last = invoker;
    //根據(jù)key和group獲取自動激活的Filter
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        //把所有的過濾器都挨個連接起來炫掐,最后一個是我們真正的Invoker
        for (int i = filters.size() - 1; i >= 0; i --) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                public URL getUrl() {
                    return invoker.getUrl();
                }

                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

接著就到了DubboProtocol的export方法,這里進行暴露服務(wù):

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService?
    //anyhost=true&application=dubbo-provider&
    //application.version=1.0&dubbo=2.5.3&environment=product&
    //interface=dubbo.common.hello.service.HelloService&
    //methods=sayHello
    URL url = invoker.getUrl();

    // export service.
    //key由serviceName睬涧,port募胃,version,group組成
    //當nio客戶端發(fā)起遠程調(diào)用時畦浓,nio服務(wù)端通過此key來決定調(diào)用哪個Exporter痹束,也就是執(zhí)行的Invoker。
    //dubbo.common.hello.service.HelloService:20880
    String key = serviceKey(url);
    //將Invoker轉(zhuǎn)換成Exporter
    //直接new一個新實例
    //沒做啥處理讶请,就是做一些賦值操作
    //這里的exporter就包含了invoker
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    //緩存要暴露的服務(wù)祷嘶,key是上面生成的
    exporterMap.put(key, exporter);

    //export an stub service for dispaching event
    //是否支持本地存根
    //遠程服務(wù)后,客戶端通常只剩下接口,而實現(xiàn)全在服務(wù)器端论巍,
    //但提供方有些時候想在客戶端也執(zhí)行部分邏輯烛谊,比如:做ThreadLocal緩存,
    //提前驗證參數(shù)嘉汰,調(diào)用失敗后偽造容錯數(shù)據(jù)等等丹禀,此時就需要在API中帶上Stub,
    //客戶端生成Proxy實鞋怀,會把Proxy通過構(gòu)造函數(shù)傳給Stub双泪,
    //然后把Stub暴露組給用戶,Stub可以決定要不要去調(diào)Proxy密似。
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    //根據(jù)URL綁定IP與端口焙矛,建立NIO框架的Server
    openServer(url);

    return exporter;
}

上面得到的Exporter會被放到緩存中去,key就是上面生成的辛友,客戶端就可以發(fā)請求根據(jù)key找到Exporter薄扁,然后找到invoker進行調(diào)用了。接下來是創(chuàng)建服務(wù)器并監(jiān)聽端口废累。

接著調(diào)用openServer方法創(chuàng)建NIO Server進行監(jiān)聽:

private void openServer(URL url) {
    // find server.
    //key是IP:PORT
    //192.168.110.197:20880
    String key = url.getAddress();
    //client 也可以暴露一個只有server可以調(diào)用的服務(wù)邓梅。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
        
        ExchangeServer server = serverMap.get(key);
        //同一JVM中,同協(xié)議的服務(wù)邑滨,共享同一個Server日缨,
        //第一個暴露服務(wù)的時候創(chuàng)建server,
        //以后相同協(xié)議的服務(wù)都使用同一個server
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //同協(xié)議的服務(wù)后來暴露服務(wù)的則使用第一次創(chuàng)建的同一Server
            //server支持reset,配合override功能使用
            //accept掖看、idleTimeout匣距、threads、heartbeat參數(shù)的變化會引起Server的屬性發(fā)生變化
            //這時需要重新設(shè)置Server
            server.reset(url);
        }
    }
}

繼續(xù)看createServer方法:

//url為:
//dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?
//anyhost=true&application=dubbo-provider&
//application.version=1.0&dubbo=2.5.3&environment=product&
//interface=dubbo.common.hello.service.HelloService&
//methods=sayHello
private ExchangeServer createServer(URL url) {
    //默認開啟server關(guān)閉時發(fā)送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默認開啟heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    //默認使用netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        //Exchangers是門面類哎壳,里面封裝的是Exchanger的邏輯毅待。
        //Exchanger默認只有一個實現(xiàn)HeaderExchanger.
        //Exchanger負責數(shù)據(jù)交換和網(wǎng)絡(luò)通信。
        //從Protocol進入Exchanger归榕,標志著程序進入了remote層尸红。
        //這里requestHandler是ExchangeHandlerAdapter
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) { }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

Exchangers.bind方法:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    //getExchanger方法根據(jù)url獲取到一個默認的實現(xiàn)HeaderExchanger
    //調(diào)用HeaderExchanger的bind方法
    return getExchanger(url).bind(url, handler);
}

HeaderExchanger的bind方法:

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    //直接返回一個HeaderExchangeServer
    //先創(chuàng)建一個HeaderExchangeHandler
    //再創(chuàng)建一個DecodeHandler
    //最后調(diào)用Transporters.bind
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

這里會先創(chuàng)建一個HeaderExchangerHandler,包含著ExchangeHandlerAdapter刹泄,接著創(chuàng)建一個DecodeHandler外里,會包含前面的handler,接下來調(diào)用Transporters的bind方法特石,返回一個Server盅蝗,接著用HeaderExchangeServer包裝一下,就返回給Protocol層了姆蘸。
在HeaderExchangerServer包裝的時候會啟動心跳定時器startHeatbeatTimer();

Transports的bind方法:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        //如果有多個handler的話墩莫,需要使用分發(fā)器包裝下
        handler = new ChannelHandlerDispatcher(handlers);
    }
    //getTransporter()獲取一個Adaptive的Transporter
    //然后調(diào)用bind方法(默認是NettyTransporter的bind方法)
    return getTransporter().bind(url, handler);
}

getTransporter()生成的Transporter的代碼如下:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        //Server默認使用netty
        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
        //獲取到一個NettyTransporter
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        //調(diào)用NettyTransporter的bind方法
        return extension.bind(arg0, arg1);
    }
    
public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    
    String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
    
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
    
    com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
    
    return extension.connect(arg0, arg1);
}
}

NettyTransporter的bind方法:

 public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    //創(chuàng)建一個Server
    return new NettyServer(url, listener);
}
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
    //handler先經(jīng)過ChannelHandlers的包裝方法
    //然后再初始化
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

ChannelHandlers.wrap方法中會根據(jù)SPI擴展機制動態(tài)生成Dispatcher的自適應(yīng)類芙委,生成的代碼不在列出,默認使用AllDispatcher處理贼穆,會返回一個AllChannelHandler题山,會把線程池和DataStore都初始化了兰粉。然后經(jīng)過HeartbeatHandler封裝故痊,再經(jīng)過MultiMessageHandler封裝后返回。

NettyServer構(gòu)造玖姑,會依次經(jīng)過AbstractPeer愕秫,AbstractEndpoint,AbstractServer焰络,NettyServer的初始化戴甩。重點看下AbstractServer的構(gòu)造方法:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                    || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                    ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        //初始化的時候會打開Server
        //具體實現(xiàn)這里是NettyServer中
        doOpen();
    } catch (Throwable t) { }
    if (handler instanceof WrappedChannelHandler ){
        executor = ((WrappedChannelHandler)handler).getExecutor();
    }
}

然后調(diào)用doOpen方法:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    //boss線程池
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    //worker線程池
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    //ChannelFactory,沒有指定工作者線程數(shù)量闪彼,就使用cpu+1
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind之后返回一個Channel
    channel = bootstrap.bind(getBindAddress());
}

doOpen方法創(chuàng)建Netty的Server端并打開甜孤,具體的事情就交給Netty去處理了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末畏腕,一起剝皮案震驚了整個濱河市缴川,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌描馅,老刑警劉巖把夸,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異铭污,居然都是意外死亡恋日,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門嘹狞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谈截,你說我怎么就攤上這事≈校” “怎么了?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵娘赴,是天一觀的道長诽表。 經(jīng)常有香客問我,道長竿奏,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任绿语,我火速辦了婚禮,結(jié)果婚禮上候址,老公的妹妹穿的比我還像新娘吕粹。我一直安慰自己,他們只是感情好岗仑,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布匹耕。 她就那樣靜靜地躺著,像睡著了一般荠雕。 火紅的嫁衣襯著肌膚如雪稳其。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天炸卑,我揣著相機與錄音既鞠,去河邊找鬼。 笑死矾兜,一個胖子當著我的面吹牛损趋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播椅寺,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼浑槽,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了返帕?” 一聲冷哼從身側(cè)響起桐玻,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎荆萤,沒想到半個月后镊靴,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡链韭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年偏竟,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片殖蚕。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡害驹,死狀恐怖宛官,靈堂內(nèi)的尸體忽然破棺而出摘刑,到底是詐尸還是另有隱情,我是刑警寧澤谭胚,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站旁趟,受9級特大地震影響锡搜,放射性物質(zhì)發(fā)生泄漏耕餐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一明未、第九天 我趴在偏房一處隱蔽的房頂上張望趟妥。 院中可真熱鬧煮纵,春花似錦行疏、人聲如沸酿联。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽摸航。三九已至,卻和暖如春擂涛,著一層夾襖步出監(jiān)牢的瞬間撒妈,已是汗流浹背胀茵。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工琼娘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人熄浓。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓俯在,卻偏偏與公主長得像跷乐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子皿哨,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容

  • dubbo暴露服務(wù)有兩種情況如输,一種是設(shè)置了延遲暴露(比如delay="5000")椎例,另外一種是沒有設(shè)置延遲暴露或者...
    加大裝益達閱讀 21,257評論 5 36
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)刷晋,斷路器眼虱,智...
    卡卡羅2017閱讀 134,628評論 18 139
  • 先看官網(wǎng)兩張圖【引用來自官網(wǎng)】:image.png 官網(wǎng)說明: 1.首先 ServiceConfig 類拿到對外提...
    致慮閱讀 1,427評論 1 4
  • 先看官網(wǎng)兩張圖【引用來自官網(wǎng)】:image.png 官網(wǎng)說明: 1.首先 ReferenceConfig 類的 i...
    致慮閱讀 1,021評論 0 2
  • 好文來了 ※20180801群推好文润梯,一起收獲與成長※ 授權(quán)申明 本文在“BH好文好報群”授權(quán)下發(fā)表纺铭, 文中所摘編...
    王胤升閱讀 272評論 0 0