Dubbo 服務(wù)暴露詳解

作為分布式框架瓷产,最核心的功能無(wú)非是服務(wù)的暴露和服務(wù)的引用,今天我們先說(shuō)服務(wù)的暴露宅荤。

我們先從暴露服務(wù)配置說(shuō)起

  @Bean
    public ServiceBean logServiceServiceBean(LogService logService){
        ServiceBean<LogService> serviceBean=new ServiceBean<>();
        serviceBean.setInterface(LogService.class);
        serviceBean.setRef(logService);
        return serviceBean;
    }

我們聲明了一個(gè)ServiceBean,他的接口類(lèi)型是LogService.class,他的實(shí)現(xiàn)是注入進(jìn)來(lái)的logService智哀。既然他被封裝成了ServiceBean拇颅,我們就有必要看看他是怎么實(shí)現(xiàn)的。ServiceBean實(shí)現(xiàn)了InitializingBean伸刃,我們就需要看看他是如何覆寫(xiě)其中的afterPropertiesSet方法的:

public void afterPropertiesSet() throws Exception {
        if (getProvider() == null) {
            Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
            if (providerConfigMap != null && providerConfigMap.size() > 0) {
                Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                        && providerConfigMap.size() > 1) { // 兼容舊版本
                    List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() != null && config.isDefault().booleanValue()) {
                            providerConfigs.add(config);
                        }
                    }
                    if (providerConfigs.size() > 0) {
                        setProviders(providerConfigs);
                    }
                } else {
                    ProviderConfig providerConfig = null;
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (providerConfig != null) {
                                throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
                            }
                            providerConfig = config;
                        }
                    }
                    if (providerConfig != null) {
                        setProvider(providerConfig);
                    }
                }
            }
        }
        if (getApplication() == null
                && (getProvider() == null || getProvider().getApplication() == null)) {
            Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
            if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                ApplicationConfig applicationConfig = null;
                for (ApplicationConfig config : applicationConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (applicationConfig != null) {
                            throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                        }
                        applicationConfig = config;
                    }
                }
                if (applicationConfig != null) {
                    setApplication(applicationConfig);
                }
            }
        }
        if (getModule() == null
                && (getProvider() == null || getProvider().getModule() == null)) {
            Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
            if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                ModuleConfig moduleConfig = null;
                for (ModuleConfig config : moduleConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (moduleConfig != null) {
                            throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                        }
                        moduleConfig = config;
                    }
                }
                if (moduleConfig != null) {
                    setModule(moduleConfig);
                }
            }
        }
        if ((getRegistries() == null || getRegistries().size() == 0)
                && (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)
                && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
            Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
            if (registryConfigMap != null && registryConfigMap.size() > 0) {
                List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                for (RegistryConfig config : registryConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        registryConfigs.add(config);
                    }
                }
                if (registryConfigs != null && registryConfigs.size() > 0) {
                    super.setRegistries(registryConfigs);
                }
            }
        }
        if (getMonitor() == null
                && (getProvider() == null || getProvider().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
            Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
            if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                MonitorConfig monitorConfig = null;
                for (MonitorConfig config : monitorConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (monitorConfig != null) {
                            throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                        }
                        monitorConfig = config;
                    }
                }
                if (monitorConfig != null) {
                    setMonitor(monitorConfig);
                }
            }
        }
        if ((getProtocols() == null || getProtocols().size() == 0)
                && (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {
            Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
            if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
                List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
                for (ProtocolConfig config : protocolConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        protocolConfigs.add(config);
                    }
                }
                if (protocolConfigs != null && protocolConfigs.size() > 0) {
                    super.setProtocols(protocolConfigs);
                }
            }
        }
        if (getPath() == null || getPath().length() == 0) {
            if (beanName != null && beanName.length() > 0 
                    && getInterface() != null && getInterface().length() > 0
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
        }
        if (! isDelay()) {
            export();
        }
    }

從spring容器中獲取bean独榴,填充dubbo正常使用的屬性,例如
Provider,Application,Moudle等奕枝。填充完后棺榔,有一個(gè)重要的方法就是export()。dubbo允許通過(guò)配置來(lái)進(jìn)行延遲暴露隘道,當(dāng)然一般情況下是同步的症歇,進(jìn)入了doExport(),里面又是一通檢查,檢查完后谭梗,進(jìn)入了我們今天要著重開(kāi)始說(shuō)明的doExportUrls()

   private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

獲取到注冊(cè)中心的地址忘晤,是個(gè)列表,也就是說(shuō)可以有多個(gè)注冊(cè)中心,那么我們的注冊(cè)中心是在我們的配置文件中這個(gè)Bean指定的

   @Bean
    public RegistryConfig registry() {
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("127.0.0.1:2181");
        registryConfig.setProtocol("zookeeper");
        return registryConfig;
    }

上面我們已經(jīng)找到了注冊(cè)中心的地址激捏,下面设塔,我們就是要把我們需要被暴露的服務(wù)的信息放到注冊(cè)中心上。

 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }

        String host = protocolConfig.getHost();
        if (provider != null && (host == null || host.length() == 0)) {
            host = provider.getHost();
        }
        boolean anyhost = false;
        if (NetUtils.isInvalidLocalHost(host)) {
            anyhost = true;
            try {
                host = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                logger.warn(e.getMessage(), e);
            }
            if (NetUtils.isInvalidLocalHost(host)) {
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                        try {
                            Socket socket = new Socket();
                            try {
                                SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                                socket.connect(addr, 1000);
                                host = socket.getLocalAddress().getHostAddress();
                                break;
                            } finally {
                                try {
                                    socket.close();
                                } catch (Throwable e) {}
                            }
                        } catch (Exception e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
                if (NetUtils.isInvalidLocalHost(host)) {
                    host = NetUtils.getLocalHost();
                }
            }
        }

        Integer port = protocolConfig.getPort();
        if (provider != null && (port == null || port == 0)) {
            port = provider.getPort();
        }
        final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
        if (port == null || port == 0) {
            port = defaultPort;
        }
        if (port == null || port <= 0) {
            port = getRandomPort(name);
            if (port == null || port < 0) {
                port = NetUtils.getAvailablePort(defaultPort);
                putRandomPort(name, port);
            }
            logger.warn("Use random available port(" + port + ") for protocol " + name);
        }

        Map<String, String> map = new HashMap<String, String>();
        if (anyhost) {
            map.put(Constants.ANYHOST_KEY, "true");
        }
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if (methods != null && methods.size() > 0) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (arguments != null && arguments.size() > 0) {
                    for (ArgumentConfig argument : arguments) {
                        //類(lèi)型自動(dòng)轉(zhuǎn)換.
                        if(argument.getType() != null && argument.getType().length() >0){
                            Method[] methods = interfaceClass.getMethods();
                            //遍歷所有方法
                            if(methods != null && methods.length > 0){
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    //匹配方法名稱(chēng)远舅,獲取方法簽名.
                                    if(methodName.equals(method.getName())){
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        //一個(gè)方法中單個(gè)callback
                                        if (argument.getIndex() != -1 ){
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            }else {
                                                throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            //一個(gè)方法中多個(gè)callback
                                            for (int j = 0 ;j<argtypes.length ;j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())){
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j){
                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }else if(argument.getIndex() != -1){
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        }else {
                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

        if (generic) {
            map.put("generic", String.valueOf(true));
            map.put("methods", Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if(methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            }
            else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        if (! ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put("token", UUID.randomUUID().toString());
            } else {
                map.put("token", token);
            }
        }
        if ("injvm".equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // 導(dǎo)出服務(wù)
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置為none不暴露
        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情況下做本地暴露 (配置為remote闰蛔,則表示只暴露遠(yuǎn)程服務(wù))
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local則暴露為遠(yuǎn)程服務(wù).(配置為local,則表示只暴露遠(yuǎn)程服務(wù))
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

這個(gè)方法有點(diǎn)長(zhǎng)图柏,但是沒(méi)關(guān)系序六,我們一點(diǎn)點(diǎn)來(lái),首先思考一個(gè)問(wèn)題:如果給你了注冊(cè)中心的地址蚤吹,和要暴露的服務(wù)的信息例诀,你會(huì)怎么做這個(gè)暴露?如果是我裁着,我會(huì)這么做:暴露服務(wù)繁涂,首先要讓人知道的是你這個(gè)服務(wù)的ip和端口,這樣引用者才知道怎么連接二驰,連上服務(wù)器后扔罪,我們?cè)诳紤]調(diào)用哪個(gè)服務(wù)模塊的有哪些方法可以讓引用者調(diào)用,如果確定要調(diào)用哪個(gè)服務(wù)的哪個(gè)方法诸蚕,只有將這些信息都知道步势,引用才能知道怎么發(fā)起這個(gè)調(diào)用氧猬。所以我們?cè)俦┞斗?wù)的時(shí)候,IP坏瘩,端口盅抚,服務(wù)名稱(chēng),方法名稱(chēng)倔矾,是要放入到注冊(cè)中心的妄均。因?yàn)樽罱K服務(wù)的引用者是和注冊(cè)中心進(jìn)行交互的,獲取信息的哪自。那么確實(shí)dubbo也是這么做的丰包,它將所有的數(shù)據(jù)都放到自定義的URL的實(shí)例中,然后壤巷,將注冊(cè)中心的信息和URL的信息整合得到一起邑彪,到注冊(cè)中心進(jìn)行暴露服務(wù)。也就是下面這個(gè)關(guān)鍵的步驟

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

我們看下registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())
拼成的字符串經(jīng)過(guò)decode后是什么樣的
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=log&dubbo=2.5.3&export=dubbo://192.168.1.102:20880/com.linyang.test.service.LogService?anyhost=true&application=log&default.proxy=javassist&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=4917&side=provider&threads=100&timestamp=1525576255082&pid=4917&registry=zookeeper&timestamp=1525575179509剩下的就看會(huì)怎么利用這個(gè)字符串了胧华,利用ref這個(gè)接口的實(shí)現(xiàn)類(lèi)和接口類(lèi)型和這個(gè)URL創(chuàng)建了一個(gè)代理的invoker.根據(jù)上下文判斷是javasist類(lèi)型的ProxyFactory,他內(nèi)部創(chuàng)建了一個(gè)AbstractProxyInvoker類(lèi)型的類(lèi)的實(shí)例寄症,里面存儲(chǔ)了傳進(jìn)來(lái)的三個(gè)值。我們得到了這個(gè)invoker后矩动,就可以暴露他了有巧,暴露的對(duì)象是invoker,這是一個(gè)關(guān)鍵點(diǎn)

Exporter<?> exporter = protocol.export(invoker);

我們看到是協(xié)議protocal進(jìn)行暴露的悲没,這個(gè)協(xié)議最終調(diào)用的是registryProtocol篮迎。說(shuō)到這里大家有可能會(huì)有疑問(wèn),不是應(yīng)該是DubboProtocol嗎,其實(shí)不是這樣示姿,我們可以看下Protocol類(lèi)

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 * 
 * @author william.liangf
 */
@SPI("dubbo")
public interface Protocol {
    
    /**
     * 獲取缺省端口甜橱,當(dāng)用戶(hù)沒(méi)有配置端口時(shí)使用。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露遠(yuǎn)程服務(wù):<br>
     * 1. 協(xié)議在接收請(qǐng)求時(shí)峻凫,應(yīng)記錄請(qǐng)求來(lái)源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必須是冪等的渗鬼,也就是暴露同一個(gè)URL的Invoker兩次,和暴露一次沒(méi)有區(qū)別荧琼。<br>
     * 3. export()傳入的Invoker由框架實(shí)現(xiàn)并傳入,協(xié)議不需要關(guān)心差牛。<br>
     * 
     * @param <T> 服務(wù)的類(lèi)型
     * @param invoker 服務(wù)的執(zhí)行體
     * @return exporter 暴露服務(wù)的引用命锄,用于取消暴露
     * @throws RpcException 當(dāng)暴露服務(wù)出錯(cuò)時(shí)拋出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用遠(yuǎn)程服務(wù):<br>
     * 1. 當(dāng)用戶(hù)調(diào)用refer()所返回的Invoker對(duì)象的invoke()方法時(shí)偏化,協(xié)議需相應(yīng)執(zhí)行同URL遠(yuǎn)端export()傳入的Invoker對(duì)象的invoke()方法脐恩。<br>
     * 2. refer()返回的Invoker由協(xié)議實(shí)現(xiàn),協(xié)議通常需要在此Invoker中發(fā)送遠(yuǎn)程請(qǐng)求侦讨。<br>
     * 3. 當(dāng)url中有設(shè)置check=false時(shí)驶冒,連接失敗不能拋出異常苟翻,并內(nèi)部自動(dòng)恢復(fù)。<br>
     * 
     * @param <T> 服務(wù)的類(lèi)型
     * @param type 服務(wù)的類(lèi)型
     * @param url 遠(yuǎn)程服務(wù)的URL地址
     * @return invoker 服務(wù)的本地代理
     * @throws RpcException 當(dāng)連接服務(wù)提供方失敗時(shí)拋出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 釋放協(xié)議:<br>
     * 1. 取消該協(xié)議所有已經(jīng)暴露和引用的服務(wù)骗污。<br>
     * 2. 釋放協(xié)議所占用的所有資源崇猫,比如連接和端口。<br>
     * 3. 協(xié)議在釋放后需忿,依然能暴露和引用新的服務(wù)诅炉。<br>
     */
    void destroy();

}

在export方法上有@Adaptive注解,這個(gè)注解寫(xiě)在方法上屋厘,有個(gè)作用涕烧,可以根據(jù)傳入的URL來(lái)指定需要的協(xié)議,上面的URL中指名了是registry汗洒,所以他使用的應(yīng)該是registryProtocol協(xié)議议纯,那么我們想的那個(gè)DubboProtocol是在什么地方呢?不用急溢谤,是在RegistryProtocal里面做的痹扇。我們繼續(xù)說(shuō)。但是是不是只調(diào)了registryProtocol?當(dāng)時(shí)不是溯香,還記得在@adaptive那個(gè)注解的時(shí)候說(shuō)掃描鲫构,有提到warpper類(lèi),就是將協(xié)議進(jìn)行包裹玫坛,所以這個(gè)protocal是融合了ProtocolFilterWrapper和ProtocolListenerWrapper和DubboProtocal三者结笨,先執(zhí)行
ProtocolFilterWrapper和ProtocolListenerWrapper,他們針對(duì)registry協(xié)議沒(méi)做啥湿镀,直接進(jìn)行下一步炕吸,進(jìn)入了RegistryProtocol的export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //獲得注冊(cè)中心
        final Registry registry = getRegistry(originInvoker);
        //獲得要注冊(cè)的鏈接,也就是真正的要暴露的服務(wù)
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        //想注冊(cè)中心注冊(cè)
        registry.register(registedProviderUrl);
        // 訂閱override數(shù)據(jù)
        // FIXME 提供者訂閱時(shí)勉痴,會(huì)影響同一JVM即暴露服務(wù)赫模,又引用同一服務(wù)的的場(chǎng)景,因?yàn)閟ubscribed以服務(wù)名為緩存的key蒸矛,導(dǎo)致訂閱信息覆蓋瀑罗。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保證每次export都返回一個(gè)新的exporter實(shí)例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

注冊(cè)協(xié)議做的事情很簡(jiǎn)單先進(jìn)行本地暴露,獲取響應(yīng)的注冊(cè)中心雏掠,想注冊(cè)中心注冊(cè)要暴露的服務(wù)斩祭,設(shè)置訂閱,注冊(cè)中心信息發(fā)生改變后會(huì)通知服務(wù)乡话,進(jìn)行數(shù)據(jù)的更新摧玫。說(shuō)起來(lái)很容易,但是其實(shí)里面封裝了大量的內(nèi)容绑青。首先我們來(lái)看下本地暴露的方法doLocalExport

 private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

延續(xù)了以往的Dubbo風(fēng)格诬像,使用了大量的本地緩存屋群。根據(jù)傳入的原始的invoker,獲取到cacheKey,從本地緩存bounds中獲取,獲取不到坏挠,就開(kāi)始了創(chuàng)建的過(guò)程芍躏。首先創(chuàng)建了一個(gè)包賺對(duì)象invokerDelegete,包含了原始的invoker和providerUrl,我們可以簡(jiǎn)單看下這個(gè)providerUrl.getProviderUrl(originInvoker)結(jié)果是

dubbo://192.168.1.102:20880/com.linyang.test.service.LogService?anyhost=true&application=log&default.proxy=javassist&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=19026&side=provider&threads=100&timestamp=1525594853055

看一下這個(gè)provider的鏈接提供的dubbo協(xié)議癞揉,所以纸肉,當(dāng)我們本地暴露的時(shí)候(Exporter<T>)protocol.export(invokerDelegete)會(huì)發(fā)生什么?會(huì)真正調(diào)用的是DubboProtocol喊熟,當(dāng)然我們不能忘了ProtocolFilterWrapper和ProtocolListenerWrapper柏肪。OK,那我們?cè)俅慰纯此麄兊降鬃隽耸裁唇媾疲葋?lái)看看ProtocolFilterWrapper

  public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

這個(gè)filter處理的時(shí)候建立了一條調(diào)用鏈InvokerChain

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

從SPI中獲取激活的Filter類(lèi)的實(shí)例烦味,在@activite的那一節(jié)也講過(guò),他的主要用法是在Filter上壁拉,其實(shí)就是說(shuō)的這里谬俄,然后將他們變成鏈?zhǔn)浇Y(jié)構(gòu),保證他們?cè)僬{(diào)用的時(shí)候弃理,一個(gè)接著一個(gè)溃论,當(dāng)然這是常用的filter的使用模式《徊看完了ProtocolFilterWrapper钥勋,我們?cè)倏聪滤南乱粋€(gè)ProtocolListenerWrapper。

   public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

他做的就是講Exporter包裝成ListenerExporterWrapper的實(shí)例辆苔,他是原來(lái)的exporter和從spi擴(kuò)展點(diǎn)中獲取的ExporterListener的實(shí)例組成算灸。下面到了我們的有一個(gè)重頭戲:DubboProtocal的export的地方

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        
        return exporter;
    }

每次創(chuàng)建的時(shí)候都創(chuàng)建一個(gè)新的DubboExporter,并將其返回上層使用驻啤。到目前為止菲驴,我們還沒(méi)有提到本地起服務(wù),因?yàn)槟阆胱屍渌秸{(diào)用到自己骑冗,肯定是要開(kāi)一個(gè)socket赊瞬,但是目前我們還沒(méi)有看到,但是不用急沐旨,他的剩下的內(nèi)容都包含在openServer(url)根據(jù)providerUrl進(jìn)行開(kāi)啟一個(gè)server

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也可以暴露一個(gè)只有server可以調(diào)用的服務(wù)森逮。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }

直擊重點(diǎn)createServer(url)

private ExchangeServer createServer(URL url) {
        //默認(rèn)開(kāi)啟server關(guān)閉時(shí)發(fā)送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默認(rèn)開(kāi)啟heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

我們看到了Exchangers.bind(url, requestHandler)Dubbo的Exchanger層,他具體的處理邏輯

   public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

Exchanger的具體實(shí)現(xiàn)類(lèi)是HeaderExchanger磁携,所以調(diào)用他的bind方法

  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

出現(xiàn)了Dubbo的Transporter層,他的bind方法和exchange差不多良风,找具體的Transporter進(jìn)行bind

   public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

最后發(fā)現(xiàn)使用的是NettyTransporter谊迄,Netty大家滅有用過(guò)也應(yīng)該聽(tīng)說(shuō)過(guò)闷供,他是現(xiàn)在最為流行的Nio網(wǎng)絡(luò)框架。我們繼續(xù)

   public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

剩下的就是Netty的處理了统诺,起一個(gè)Server歪脏,開(kāi)啟端口等著調(diào)用者連進(jìn)來(lái)。OK這是DubboProtocol的export的過(guò)程粮呢。我們回到上面RegisterProtocol的部分

exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);

經(jīng)過(guò)DubboProtocol暴露后得到一個(gè)具體的Exporter,將這個(gè)exporter和原始invoker封裝到了ExporterChangeableWrapper里面婿失,進(jìn)行返回。好了這就是export的全過(guò)程啄寡。

測(cè)試源碼

測(cè)試代碼源碼點(diǎn)擊獲取

預(yù)告豪硅,看這里

下一篇: Dubbo 服務(wù)引用詳解

END

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市挺物,隨后出現(xiàn)的幾起案子懒浮,更是在濱河造成了極大的恐慌,老刑警劉巖识藤,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件砚著,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡痴昧,警方通過(guò)查閱死者的電腦和手機(jī)稽穆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)赶撰,“玉大人舌镶,你說(shuō)我怎么就攤上這事】勰遥” “怎么了乎折?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)侵歇。 經(jīng)常有香客問(wèn)我骂澄,道長(zhǎng),這世上最難降的妖魔是什么惕虑? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任坟冲,我火速辦了婚禮,結(jié)果婚禮上溃蔫,老公的妹妹穿的比我還像新娘健提。我一直安慰自己,他們只是感情好伟叛,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布私痹。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪紊遵。 梳的紋絲不亂的頭發(fā)上账千,一...
    開(kāi)封第一講書(shū)人閱讀 51,708評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音暗膜,去河邊找鬼匀奏。 笑死,一個(gè)胖子當(dāng)著我的面吹牛学搜,可吹牛的內(nèi)容都是我干的娃善。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼瑞佩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼聚磺!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起钉凌,我...
    開(kāi)封第一講書(shū)人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤咧最,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后御雕,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體矢沿,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年酸纲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了捣鲸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡闽坡,死狀恐怖栽惶,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情疾嗅,我是刑警寧澤外厂,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站代承,受9級(jí)特大地震影響汁蝶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜论悴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一掖棉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧膀估,春花似錦幔亥、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)针肥。三九已至,卻和暖如春笤昨,著一層夾襖步出監(jiān)牢的瞬間祖驱,已是汗流浹背握恳。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工瞒窒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人乡洼。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓崇裁,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親束昵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拔稳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 這一條街道不長(zhǎng),空曠 時(shí)常只擺布著幾個(gè)人锹雏,緩緩移動(dòng) 那時(shí)候就只有風(fēng)在這寬廣的街道涌動(dòng)了 有時(shí)天氣好巴比,風(fēng)就是金色,有...
    歌行者李江華閱讀 202評(píng)論 1 1
  • 現(xiàn)在的你還沒(méi)睡吧 復(fù)雜的思緒涌上心頭了吧 失眠的咖啡還在續(xù)杯 是你害怕閉上眼睛在自欺欺人吧 她的笑她的溫度都消散了...
    吃干脆面不吃烤腸閱讀 190評(píng)論 0 2
  • 最近買(mǎi)了個(gè)小米手環(huán) 純粹是抱著買(mǎi)來(lái)玩玩的心情但是小米的原裝只賣(mài)黑色 要其他顏色必須加買(mǎi)腕帶讓我震精出了一股「算了礁遵,...
    冰河時(shí)期k閱讀 286評(píng)論 0 1