dubbo 服務(wù)導(dǎo)出

目錄
dubbo 拓展機(jī)制 SPI
dubbo 自適應(yīng)拓展機(jī)制
dubbo 服務(wù)導(dǎo)出
dubbo 服務(wù)引用
dubbo 服務(wù)字典
dubbo 服務(wù)路由
dubbo 集群容錯策略
dubbo 負(fù)載均衡
dubbo 服務(wù)調(diào)用過程

Dubbo 服務(wù)導(dǎo)出過程始于 Spring 容器發(fā)布刷新事件咐扭,Dubbo 在接收到事件后燃观,會立即執(zhí)行服務(wù)導(dǎo)出邏輯。整個邏輯大致可分為三個部分嘱兼,第一部分是前置工作,主要用于檢查參數(shù)情屹,組裝 URL邻遏。第二部分是導(dǎo)出服務(wù),包含導(dǎo)出服務(wù)到本地 (JVM)舌狗,和導(dǎo)出服務(wù)到遠(yuǎn)程兩個過程。第三部分是向注冊中心注冊服務(wù)扔水,用于服務(wù)發(fā)現(xiàn)痛侍。

ServiceBean是整個服務(wù)導(dǎo)出的核心類,它實現(xiàn)了

  • InitializingBean:從applicationContext中獲取如protocol魔市,module等配置信息主届,并且在注冊監(jiān)聽上下文刷新的事件赵哲,失敗時立即進(jìn)行服務(wù)導(dǎo)出服務(wù)。
  • DisposableBean:bean的摧毀(2.7.3版本中已經(jīng)是空實現(xiàn)了君丁,因為前面已經(jīng)注冊了掛鉤)枫夺。
  • ApplicationContextAware:保存applicationContext對象,注冊掛鉤當(dāng) jvm 關(guān)閉時關(guān)閉所有的鏈接以及摧毀已經(jīng)注冊了的 url 地址谈截,注冊監(jiān)聽上下文刷新的事件筷屡。
  • ApplicationListener<ContextRefreshedEvent>:監(jiān)聽上下文的刷新,判斷是否需要導(dǎo)出服務(wù)簸喂。
  • BeanNameAware:設(shè)置 beanName毙死。
  • ApplicationEventPublisherAware:用于發(fā)布事件,當(dāng)導(dǎo)出服務(wù)完成時喻鳄,發(fā)布一個ServiceBeanExportedEvent事件扼倘,dubbo監(jiān)聽到這個事件后會查看這個事件包含的類是否在本地調(diào)用中,如果在就立刻執(zhí)行服務(wù)引入除呵。

等接口再菊。

大致流程:


image.png

首先在ServiceBean這個類初始化之前,將自己作為ApplicationListener添加到容器中颜曾,然后在容器啟動完成時纠拔,監(jiān)聽容器刷新完成的事件,也就是ServiceBean 的 onApplicationEvent方法泛豪。onApplicationEvent 是一個事件響應(yīng)方法稠诲,該方法會在收到 Spring 上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。

    @Override
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
    // 保存 applicationContext,并且在 jvm 關(guān)閉時,刪除所有的register service和關(guān)閉所有的連接
    SpringExtensionFactory.addApplicationContext(applicationContext);
    // 當(dāng)上下文狀態(tài)變化時震嫉,添加監(jiān)聽事件
    supportedApplicationListener = addApplicationListener(applicationContext, this);
}

@Override
/**
 * 監(jiān)聽ContextRefreshedEvent事件,當(dāng)所有的bean都初始化完成并被成功裝載或后會觸發(fā)該事件
 */
public void onApplicationEvent(ContextRefreshedEvent event) {
    // 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // 導(dǎo)出服務(wù)
        export();
    }
}

代碼比較簡單劝萤,繼續(xù)向下看export方法

public void export() {
    super.export();
    // Publish ServiceBeanExportedEvent
    // 發(fā)布事件,spring攔截事件調(diào)用 referenceBean 的 get() 進(jìn)行服務(wù)引入
    publishExportEvent();
}

這里主要分析的是super.export方法慎璧。

public synchronized void export() {
    // 檢測和修改配置
    checkAndUpdateSubConfigs();

    // 是否顯示的使用AbstractServiceBuilder設(shè)置export床嫌,
    // 否則使用<dubbo:provider export="" />的配置
    if (!shouldExport()) {
        return;
    }

    // 是否顯示的使用AbstractServiceBuilder設(shè)置delay,
    // 否則使用<dubbo:provider delay="" />的配置
    if (shouldDelay()) {
        // 延遲發(fā)布服務(wù)
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // 發(fā)布服務(wù)
        doExport();
    }
}

繼續(xù)看doExport方法炸卑,該方法中先是做了一些狀態(tài)判斷既鞠,最主要的是其中的doExportUrls方法

private void doExportUrls() {
    // 多注冊中心組裝注冊中心的url
    List<URL> registryURLs = loadRegistries(true);
    // 多協(xié)議
    for (ProtocolConfig protocolConfig : protocols) {
        // 獲取protocol中配置的contextpath,缺省獲取provider中的contextpath
        // group/(contextpath/path或path):version
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        // 服務(wù)提供者模型
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        // 保存pathKey和providerModel的映射關(guān)系
        ApplicationModel.initProviderModel(pathKey, providerModel);
        // 組裝 URL
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

首先看看loadRegistries方法

loadRegistries 方法主要包含如下的邏輯:
1.檢測是否存在注冊中心配置類盖文,不存在則拋出異常
2.構(gòu)建參數(shù)映射集合,也就是 map
3.構(gòu)建注冊中心鏈接列表
4.遍歷鏈接列表蚯姆,并根據(jù)條件決定是否將其添加到 registryList 中

protected List<URL> loadRegistries(boolean provider) {
    // check && override if necessary
    List<URL> registryList = new ArrayList<URL>();
    if (CollectionUtils.isNotEmpty(registries)) {
        for (RegistryConfig config : registries) {
            String address = config.getAddress();
            if (StringUtils.isEmpty(address)) {
                // 若 address 為空五续,則將其設(shè)為 0.0.0.0
                address = ANYHOST_VALUE;
            }
            // address不為n/a
            if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                // 添加 ApplicationConfig 中的字段信息到 map 中
                appendParameters(map, application);
                // 添加 RegistryConfig 字段信息到 map 中
                appendParameters(map, config);
                // 添加 path洒敏,protocol 等信息到 map 中
                map.put(PATH_KEY, RegistryService.class.getName());
                appendRuntimeParameters(map);
                if (!map.containsKey(PROTOCOL_KEY)) {
                    map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
                }

                // 解析得到 URL 列表,address 可能包含多個注冊中心 ip疙驾,
                // 因此解析得到的是一個 URL 列表
                List<URL> urls = UrlUtils.parseURLs(address, map);

                for (URL url : urls) {
                    url = URLBuilder.from(url)
                            .addParameter(REGISTRY_KEY, url.getProtocol())
                            // 將 URL 協(xié)議頭設(shè)置為 registry
                            .setProtocol(REGISTRY_PROTOCOL)
                            .build();
                    // 通過判斷條件凶伙,決定是否添加 url 到 registryList 中,條件如下:
                    // (服務(wù)提供者 && register = true 或 null)
                    //    || (非服務(wù)提供者 && subscribe = true 或 null)
                    if ((provider && url.getParameter(REGISTER_KEY, true))
                            || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}

回到doExportUrls中它碎,在通過注冊中心配置了對應(yīng)的URL之后函荣,就是通過doExportUrlsFor1Protocol方法在各個協(xié)議的基礎(chǔ)上向所有的注冊中心注冊服務(wù)。

URL 是 Dubbo 配置的載體扳肛,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 默認(rèn)使用dubbo協(xié)議
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    // 添加 side傻挂、版本、時間戳以及進(jìn)程號等信息到 map 中
    map.put(SIDE_KEY, PROVIDER_SIDE);

    // 添加時間戳挖息,dubbo 版本金拒,pid等運行時參數(shù)
    appendRuntimeParameters(map);
    // 通過反射將對象的字段信息添加到 map 中
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    // remove 'default.' prefix for configs from ProviderConfig
    // appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, provider);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // methods 為 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標(biāo)簽的配置信息
    // 這段代碼用于添加 Callback 配置到 map 中
    if (CollectionUtils.isNotEmpty(methods)) {
        for (MethodConfig method : methods) {
            // 添加 MethodConfig 對象的字段信息到 map 中套腹,鍵 = 方法名.屬性名绪抛。
            // 比如存儲 <dubbo:method name="sayHello" retries="2"> 對應(yīng)的 MethodConfig,
            // 鍵 = sayHello.retries电禀,map = {"sayHello.retries": 2, "xxx": "yyy"}
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 檢測 MethodConfig retry 是否為 false幢码,若是,則設(shè)置重試次數(shù)為0
                if (Boolean.FALSE.toString().equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            // 獲取 ArgumentConfig 列表
            // 設(shè)置方法的參數(shù)和值到map中
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    // 檢測 ArgumentConfig 中的 type 屬性與方法參數(shù)列表中的參數(shù)名稱是否一致尖飞,不一致則拋出異常
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            // 添加 ArgumentConfig 字段信息到 map 中症副,
                                            // 鍵前綴 = 方法名.index,比如:
                                            // map = {"sayHello.3": true}
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        // 從參數(shù)類型列表中查找類型名稱為 argument.type 的參數(shù)
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        // 添加 ArgumentConfig 字段信息到 map 中
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }

    // 檢測 generic 是否為 "true"葫松,并根據(jù)檢測結(jié)果向 map 中添加不同的信息
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }

        // 為接口生成包裹類 Wrapper瓦糕,Wrapper 中包含了接口的詳細(xì)信息,比如接口方法名數(shù)組腋么,字段信息等
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            // 將逗號作為分隔符連接方法名咕娄,并將連接后的字符串放入 map 中
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // 添加 token 到 map 中
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            // 生成隨機(jī)token
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }
    // export service
    // 獲取 host 和 port
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // 組裝 URL
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        // 加載 ConfiguratorFactory,并生成 Configurator 實例珊擂,然后通過實例配置 url
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    // 當(dāng)scope為none時圣勒,不導(dǎo)出服務(wù)
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        // scope != remote,導(dǎo)出到本地
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        // scope != local摧扇,導(dǎo)出到遠(yuǎn)程
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    // dynamic配置圣贸,是否開啟動態(tài)注冊
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    // 加載監(jiān)視器鏈接
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        // 將監(jiān)視器鏈接作為參數(shù)添加到 url 中
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        if (url.getParameter(REGISTER_KEY, true)) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        } else {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                        }
                    }

                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }

                    // 為服務(wù)提供類(ref)生成 Invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 導(dǎo)出服務(wù),并生成 Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
            // 不存在注冊中心扛稽,僅導(dǎo)出服務(wù)
            else {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             * 2.7 以后精簡url中的配置項
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
}

代碼很長主要分成兩部分來看

  • getWrapper對類生成代理吁峻,當(dāng)服務(wù)調(diào)用方調(diào)用方法的時候,會首先調(diào)用代理類,代理類再調(diào)用對應(yīng)的方法
  • 執(zhí)行服務(wù)導(dǎo)出和服務(wù)注冊

1.getWrapper

public static Wrapper getWrapper(Class<?> c) {
    // can not wrapper on dynamic class.
    while (ClassGenerator.isDynamicClass(c))
    {
        c = c.getSuperclass();
    }

    if (c == Object.class) {
        return OBJECT_WRAPPER;
    }

    Wrapper ret = WRAPPER_MAP.get(c);
    if (ret == null) {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c, ret);
    }
    return ret;
}

通過makeWrapper方法生成代理類用含,然后保存代理類到WRAPPER_MAP緩存中

private static Wrapper makeWrapper(Class<?> c) {
    // 檢測 c 是否為基本類型矮慕,若是則拋出異常
    if (c.isPrimitive()) {
        throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
    }

    String name = c.getName();
    ClassLoader cl = ClassUtils.getClassLoader(c);

    // c1 用于存儲 setPropertyValue 方法代碼
    StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
    // c2 用于存儲 getPropertyValue 方法代碼
    StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
    // c3 用于存儲 invokeMethod 方法代碼
    StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");

    // 生成類型轉(zhuǎn)換代碼及異常捕捉代碼,比如:
    //   DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
    c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");

    // pts 用于存儲成員變量名和類型
    Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types>
    // ms 用于存儲方法描述信息(可理解為方法簽名)及 Method 實例
    Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance>
    // mns 為方法名列表
    List<String> mns = new ArrayList<>(); // method names.
    // dmns 用于存儲“定義在當(dāng)前類中的方法”的名稱
    List<String> dmns = new ArrayList<>(); // declaring method names.

    // get all public field.
    // 獲取 public 訪問級別的字段啄骇,并為所有字段生成條件判斷語句
    for (Field f : c.getFields()) {
        String fn = f.getName();
        Class<?> ft = f.getType();
        // 忽略關(guān)鍵字 static 或 transient 修飾的變量
        if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
            continue;
        }

        // 生成條件判斷及賦值語句痴鳄,比如:
        // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
        // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
        c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
        // 生成條件判斷及返回語句,比如:
        // if( $2.equals("name") ) { return ($w)w.name; }
        c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
        // 存儲 <字段名, 字段類型> 鍵值對到 pts 中
        pts.put(fn, ft);
    }

    Method[] methods = c.getMethods();
    // get all public method.
    // 檢測 c 中是否包含在當(dāng)前類中聲明的方法
    boolean hasMethod = hasMethods(methods);
    if (hasMethod) {
        c3.append(" try{");
        for (Method m : methods) {
            //ignore Object's method.
            // 忽略 Object 中定義的方法
            if (m.getDeclaringClass() == Object.class) {
                continue;
            }

            String mn = m.getName();
            // 生成方法名判斷語句缸夹,比如:
            // if ( "sayHello".equals( $2 )
            c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
            int len = m.getParameterTypes().length;
            // 生成“運行時傳入的參數(shù)數(shù)量與方法參數(shù)列表長度”判斷語句痪寻,比如:
            // && $3.length == 2
            c3.append(" && ").append(" $3.length == ").append(len);

            boolean override = false;
            for (Method m2 : methods) {
                // 檢測方法是否存在重載情況,條件為:方法對象不同 && 方法名相同
                if (m != m2 && m.getName().equals(m2.getName())) {
                    override = true;
                    break;
                }
            }
            // 對重載方法進(jìn)行處理虽惭,考慮下面的方法:
            //    1. void sayHello(Integer, String)
            //    2. void sayHello(Integer, Integer)
            // 方法名相同橡类,參數(shù)列表長度也相同,因此不能僅通過這兩項判斷兩個方法是否相等趟妥。
            // 需要進(jìn)一步判斷方法的參數(shù)類型
            if (override) {
                if (len > 0) {
                    for (int l = 0; l < len; l++) {
                        // 生成參數(shù)類型進(jìn)行檢測代碼猫态,比如:
                        // && $3[0].getName().equals("java.lang.Integer")
                        //    && $3[1].getName().equals("java.lang.String")
                        c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
                                .append(m.getParameterTypes()[l].getName()).append("\")");
                    }
                }
            }

            // 添加 ) {,完成方法判斷語句披摄,此時生成的代碼可能如下(已格式化):
            // if ("sayHello".equals($2)
            //     && $3.length == 2
            //     && $3[0].getName().equals("java.lang.Integer")
            //     && $3[1].getName().equals("java.lang.String")) {
            c3.append(" ) { ");

            // 根據(jù)返回值類型生成目標(biāo)方法調(diào)用語句
            if (m.getReturnType() == Void.TYPE) {
                // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
                c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
            } else {
                // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
                c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
            }

            // 添加 }, 生成的代碼形如(已格式化):
            // if ("sayHello".equals($2)
            //     && $3.length == 2
            //     && $3[0].getName().equals("java.lang.Integer")
            //     && $3[1].getName().equals("java.lang.String")) {
            //
            //     w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
            //     return null;
            // }
            c3.append(" }");

            // 添加方法名到 mns 集合中
            mns.add(mn);
            // 檢測當(dāng)前方法是否在 c 中被聲明的
            if (m.getDeclaringClass() == c) {
                // 若是亲雪,則將當(dāng)前方法名添加到 dmns 中
                dmns.add(mn);
            }
            ms.put(ReflectUtils.getDesc(m), m);
        }
        // 添加異常捕捉語句
        c3.append(" } catch(Throwable e) { ");
        c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
        c3.append(" }");
    }

    // 添加 NoSuchMethodException 異常拋出代碼
    c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

    // deal with get/set method.
    Matcher matcher;
    for (Map.Entry<String, Method> entry : ms.entrySet()) {
        String md = entry.getKey();
        Method method = entry.getValue();
        // 匹配以 get 開頭的方法
        if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            // 獲取屬性名
            String pn = propertyName(matcher.group(1));
            // 生成屬性判斷以及返回語句,示例如下:
            // if( $2.equals("name") ) { return ($w).w.getName(); }
            c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
            pts.put(pn, method.getReturnType());
        }
        // 匹配以 is/has/can 開頭的方法
        else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            String pn = propertyName(matcher.group(1));
            // 生成屬性判斷以及返回語句疚膊,示例如下:
            // if( $2.equals("dream") ) { return ($w).w.hasDream(); }
            c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
            pts.put(pn, method.getReturnType());
        }
        // 匹配以 set 開頭的方法
        else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            Class<?> pt = method.getParameterTypes()[0];
            String pn = propertyName(matcher.group(1));
            // 生成屬性判斷以及 setter 調(diào)用語句义辕,示例如下:
            // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
            c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
            pts.put(pn, pt);
        }
    }
    // 添加 NoSuchPropertyException 異常拋出代碼
    c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
    c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");

    // make class
    long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
    // 創(chuàng)建類生成器
    ClassGenerator cc = ClassGenerator.newInstance(cl);
    // 設(shè)置類名及超類
    cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
    cc.setSuperClass(Wrapper.class);

    // 添加默認(rèn)構(gòu)造方法
    cc.addDefaultConstructor();
    // 添加字段
    cc.addField("public static String[] pns;"); // property name array.
    cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
    cc.addField("public static String[] mns;"); // all method name array.
    cc.addField("public static String[] dmns;"); // declared method name array.
    for (int i = 0, len = ms.size(); i < len; i++) {
        cc.addField("public static Class[] mts" + i + ";");
    }

    // 添加方法代碼
    cc.addMethod("public String[] getPropertyNames(){ return pns; }");
    cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
    cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
    cc.addMethod("public String[] getMethodNames(){ return mns; }");
    cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
    cc.addMethod(c1.toString());
    cc.addMethod(c2.toString());
    cc.addMethod(c3.toString());

    try {
        // 生成類
        Class<?> wc = cc.toClass();
        // setup static field.
        // 設(shè)置字段值
        wc.getField("pts").set(null, pts);
        wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
        wc.getField("mns").set(null, mns.toArray(new String[0]));
        wc.getField("dmns").set(null, dmns.toArray(new String[0]));
        int ix = 0;
        for (Method m : ms.values()) {
            wc.getField("mts" + ix++).set(null, m.getParameterTypes());
        }
        // 創(chuàng)建 Wrapper 實例
        return (Wrapper) wc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        cc.release();
        ms.clear();
        mns.clear();
        dmns.clear();
    }
}

例如源碼demo中的DemoService生成的wrapper類如下:

public class Wrapper0
extends Wrapper
implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String string) {
        return pts.containsKey(string);
    }

    public Class getPropertyType(String string) {
        return (Class)pts.get(string);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object object, String string, Object object2) {
        try {
            DemoService demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.DemoService.").toString());
    }

    public Object getPropertyValue(Object object, String string) {
        try {
            DemoService demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.DemoService.").toString());
    }

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.DemoService.").toString());
    }
}

2.導(dǎo)出服務(wù)到本地

根據(jù)執(zhí)行順序先來看看exportLocal

private void exportLocal(URL url) {
    // 設(shè)置協(xié)議頭為 injvm
    URL local = URLBuilder.from(url)
            .setProtocol(LOCAL_PROTOCOL)
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
    // 創(chuàng)建 Invoker,并導(dǎo)出服務(wù)寓盗,這里的 protocol 會在運行時調(diào)用 InjvmProtocol 的 export 方法
    Exporter<?> exporter = protocol.export(
            PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
    exporters.add(exporter);
    logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}

調(diào)用的是InjvmProtocol的實現(xiàn)灌砖,把生成的invoker保存在AbstractExporter中,當(dāng)調(diào)用invoker的doInvoke方法時會通過wrapper類調(diào)用具體實現(xiàn)類的方法

3.導(dǎo)出服務(wù)到遠(yuǎn)程

Invoker 是由 ProxyFactory 創(chuàng)建而來傀蚌。ProxyFactory有多個實現(xiàn)類基显,那么這里使用的是哪個實現(xiàn)類呢?答案就在前文的創(chuàng)建自適應(yīng)類中善炫,查看前文生成的ProxyFactory自適應(yīng)類撩幽,發(fā)現(xiàn)是根據(jù)URL的proxy參數(shù)獲取對應(yīng)的實現(xiàn)類,沒有配置則默認(rèn)取JavassistProxyFactory箩艺。下面到 JavassistProxyFactory 代碼中窜醉,探索 Invoker 的創(chuàng)建過程。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    // 為目標(biāo)類創(chuàng)建 Wrapper
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 創(chuàng)建匿名 Invoker 類對象艺谆,并實現(xiàn) doInvoke 方法
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 調(diào)用 Wrapper 的 invokeMethod 方法榨惰,invokeMethod 最終會調(diào)用目標(biāo)方法
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

這里依然是通過getWrapper生成代理類,然后再使用AbstractProxyInvoker包裝一下静汤,AbstractProxyInvoker也就是直接調(diào)用代理類的invokeMethod方法琅催。
回到doExportUrlsFor1Protocol中居凶,接下來是導(dǎo)出服務(wù)到遠(yuǎn)程

Exporter<?> exporter = protocol.export(wrapperInvoker);

其中這里用到了前面提到的protocol的自適應(yīng)生成類,生成類export方法代碼例子如下

public Exporter export(Invoker invoker) throws RpcException {
    String string;
    if (invoker == null) {
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    }
    if (invoker.getUrl() == null) {
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    }
    URL uRL = invoker.getUrl();
    String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
    if (string == null) {
        throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (").append(uRL.toString()).append(") use keys([protocol])").toString());
    }
    Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
    return protocol.export(invoker);
}

從這里看出會根據(jù)wrapperInvoker中的url屬性的protocol元素路由到具體的實現(xiàn)類恢暖。而在前面的loadRegistries方法會把url的protocol設(shè)置為registry排监。所以接下來就到了RegistryProtocol的export方法進(jìn)行遠(yuǎn)程發(fā)布狰右,接下來就看這個export方法(PS:這里從自適應(yīng)類之后是先到ProtocolListenerWrapper然后到ProtocolFilterWrapper杰捂,因為它們兩是protocol這個類的包裝類,但是在provider端export方法什么也不會做棋蚌,所以這里直接分析RegistryProtocol的export)嫁佳。

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 獲取注冊中心 URL,以 zookeeper 注冊中心為例谷暮,得到的示例 URL 如下:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    // 獲取已注冊的服務(wù)提供者 URL蒿往,比如:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    // 獲取訂閱 URL,比如:
    // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    // 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    // 創(chuàng)建監(jiān)聽器
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    // 根據(jù)配置修改providerUrl湿弦,
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

先來看下doLocalExport方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

假設(shè)運行時協(xié)議為 dubbo瓤漏,此處的 protocol 變量會在運行時加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法颊埃。所以蔬充,接下來目光轉(zhuǎn)移到 DubboProtocol 的 export 方法上,相關(guān)分析如下

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // 獲取服務(wù)標(biāo)識班利,理解成服務(wù)坐標(biāo)也行饥漫。由服務(wù)組名,服務(wù)名罗标,服務(wù)版本號以及端口組成庸队。比如:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    // 創(chuàng)建 DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 將 <key, exporter> 鍵值對放入緩存中
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    // 本地存根相關(guān)代碼
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // 啟動服務(wù)器
    openServer(url);
    // 優(yōu)化序列化
    optimizeSerialization(url);

    return exporter;
}

重點看看openServer方法,別的邏輯不理解也不影響理解服務(wù)導(dǎo)出的流程

private void openServer(URL url) {
    // find server.
    // 獲取 host:port闯割,并將其作為服務(wù)器實例的 key彻消,用于標(biāo)識當(dāng)前的服務(wù)器實例
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        // 訪問緩存
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    // 創(chuàng)建服務(wù)器實例
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            // 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器
            server.reset(url);
        }
    }
}

在同一臺機(jī)器上(單網(wǎng)卡)宙拉,同一個端口上僅允許啟動一個服務(wù)器實例宾尚。若某個端口上已有服務(wù)器實例,此時則調(diào)用 reset 方法重置服務(wù)器的一些配置鼓黔。接下來看看createServer央勒。

private ExchangeServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            // 添加心跳檢測配置到 url 中
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            // 添加編碼解碼器參數(shù)
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    // 獲取 server 參數(shù),默認(rèn)為 netty
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    // 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展澳化,不存在則拋出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        // 創(chuàng)建 ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // 獲取 client 參數(shù)崔步,可指定 netty,mina
    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 獲取所有的 Transporter 實現(xiàn)類名稱集合缎谷,比如 supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 檢測當(dāng)前 Dubbo 所支持的 Transporter 實現(xiàn)類名稱列表中井濒,
        // 是否包含 client 所表示的 Transporter灶似,若不包含,則拋出異常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return server;
}

createServer 包含三個核心的邏輯瑞你。第一是檢測是否存在 server 參數(shù)所代表的 Transporter 拓展酪惭,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實例者甲。第三是檢測是否支持 client 參數(shù)所表示的 Transporter 拓展春感,不存在也是拋出異常。
接下來繼續(xù)看bind方法

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    // 獲取 Exchanger虏缸,默認(rèn)為 HeaderExchanger鲫懒。
    // 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實例
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
}

這里又是通過dubbo SPI加載Exchanger的實現(xiàn)類,默認(rèn)實現(xiàn)是HeaderExchanger刽辙。接下來繼續(xù)看HeaderExchanger的bind方法

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

這里只有一行代碼窥岩,但是包含了三個邏輯:

  • new HeaderExchangeHandler(handler),
  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    僅需關(guān)心 Transporters 的 bind 方法邏輯宰缤。
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handlers 元素數(shù)量大于1颂翼,則創(chuàng)建 ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 獲取自適應(yīng) Transporter 實例,并調(diào)用實例方法
    return getTransporter().bind(url, handler);
}

自適應(yīng)拓展加載 Transporter慨灭,默認(rèn)為 NettyTransporter

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyServer
    return new NettyServer(url, listener);
}

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

接下來看看ChannelHandlers.wrap朦乏,這個方法很有意思,是dubbo的線程派發(fā)模型的實現(xiàn)缘挑。

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // 一層又一層的裝飾器集歇,只關(guān)注最里面的dispatch方法
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

這里默認(rèn)的自適應(yīng)類是AllDispatcher,在后面的服務(wù)調(diào)用過程语淘,將會詳細(xì)講解一下诲宇。回到NettyServer惶翻,這里調(diào)用了父類AbstractServer的構(gòu)造方法姑蓝。

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    // 獲取 ip 和端口
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        // 設(shè)置 ip 為 0.0.0.0
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 獲取最大可接受連接數(shù)
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        // 調(diào)用模板方法 doOpen 啟動服務(wù)器
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

doOpen方法由子類實現(xiàn),下面回到nettyservice中

protected void doOpen() throws Throwable {
    // 創(chuàng)建 ServerBootstrap
    bootstrap = new ServerBootstrap();

    // 創(chuàng)建 boss 和 worker 線程池
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    // 設(shè)置 PipelineFactory
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    // 綁定到指定的 ip 和端口上
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

到此就已經(jīng)完成了在指定的端口開啟netty服務(wù)的過程吕粗,以及配置了一系列層層包裝的ChannelHandler纺荧。
回到RegistryProtocol的export,繼續(xù)看服務(wù)注冊和數(shù)據(jù)訂閱的邏輯

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
     .......
    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        // 注冊服務(wù)
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}
public void register(URL registryUrl, URL registeredProviderUrl) {
    // 獲取 Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注冊服務(wù)
    registry.register(registeredProviderUrl);
}

以 Zookeeper 注冊中心為例進(jìn)行分析颅筋。下面先來看一下 getRegistry 方法的源碼宙暇,這個方法由 ZookeeperRegistryFactory 的父類 AbstractRegistryFactory 實現(xiàn)。

public Registry getRegistry(URL url) {
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // 訪問緩存
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 緩存未命中议泵,創(chuàng)建 Registry 實例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // 寫入緩存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

接下來去ZookeeperRegistryFactory中繼續(xù)看createRegistry方法

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 獲取組名占贫,默認(rèn)為 dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            // group = "/" + group
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        // 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter
        zkClient = zookeeperTransporter.connect(url);
        // 添加狀態(tài)監(jiān)聽器
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

    // zookeeperTransporter 由 SPI 在運行時注入先口,類型為 ZookeeperTransporter$Adaptive
    private ZookeeperTransporter zookeeperTransporter;

    /**
     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        // 創(chuàng)建 ZookeeperRegistry
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

這里提一下recover()方法型奥,在重連zookeeper后會做兩個動作瞳收。

  • 1.添加所有的注冊連接到失敗連接集合中,并且創(chuàng)建定時timer默認(rèn)5秒中后重新建立連接厢汹,建立成功則從失敗連接集合中刪除螟深。
  • 2.通知directory,刷新配置烫葬,通知成功則從失敗集合刪除界弧。

接下來看看zookeeperTransporter.connect方法怎么獲取zkclient的

public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    // 根據(jù)連接地址從緩存獲取zkclient連接
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        
        // 緩存中獲取不到時新建
        zookeeperClient = createZookeeperClient(toClientURL(url));
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        // 寫進(jìn)緩存
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}

其中createZookeeperClient由子類實現(xiàn)厘灼,繼續(xù)看CuratorZookeeperClient的createZookeeperClient方法

public ZookeeperClient createZookeeperClient(URL url) {
    return new CuratorZookeeperClient(url);
}

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        int timeout = url.getParameter(TIMEOUT_KEY, 5000);
        // 創(chuàng)建 CuratorFramework 構(gòu)造器
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        // 構(gòu)建 CuratorFramework 實例
        client = builder.build();
        // 添加監(jiān)聽器
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
                if (state == ConnectionState.LOST) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                } else if (state == ConnectionState.CONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                } else if (state == ConnectionState.RECONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                }
            }
        });
        // 啟動客戶端
        client.start();
        boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
        if (!connected) {
            throw new IllegalStateException("zookeeper not connected");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

獲取到Registry對象后就是調(diào)用registry.register正式的注冊服務(wù)夹纫,繼續(xù)看FailbackRegistry(ZookeeperRegistry的父類)的register方法

public void register(URL url) {
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // 模板方法,由子類實現(xiàn)
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // 獲取 check 參數(shù)设凹,若 check = true 將會直接拋出異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        // 記錄注冊失敗的鏈接
        addFailedRegistered(url);
    }
}

接著看doRegister,回到 FailbackRegistry 子類 ZookeeperRegistry 中

protected void doRegister(URL url) {
    try {
        // 通過 Zookeeper 客戶端創(chuàng)建節(jié)點茅姜,節(jié)點路徑由 toUrlPath 方法生成闪朱,路徑格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register...");
    }
}

到此,dubbo服務(wù)就把自己的信息通過zkClient注冊到了特定的路徑下钻洒,并且注冊的是zk的臨時節(jié)點奋姿。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市素标,隨后出現(xiàn)的幾起案子称诗,更是在濱河造成了極大的恐慌,老刑警劉巖头遭,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件寓免,死亡現(xiàn)場離奇詭異,居然都是意外死亡计维,警方通過查閱死者的電腦和手機(jī)袜香,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鲫惶,“玉大人蜈首,你說我怎么就攤上這事∏纺福” “怎么了欢策?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長赏淌。 經(jīng)常有香客問我踩寇,道長,這世上最難降的妖魔是什么猜敢? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任姑荷,我火速辦了婚禮盒延,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鼠冕。我一直安慰自己添寺,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布懈费。 她就那樣靜靜地躺著计露,像睡著了一般。 火紅的嫁衣襯著肌膚如雪憎乙。 梳的紋絲不亂的頭發(fā)上票罐,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天,我揣著相機(jī)與錄音泞边,去河邊找鬼该押。 笑死,一個胖子當(dāng)著我的面吹牛阵谚,可吹牛的內(nèi)容都是我干的蚕礼。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼梢什,長吁一口氣:“原來是場噩夢啊……” “哼奠蹬!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起嗡午,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤囤躁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后荔睹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狸演,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年应媚,在試婚紗的時候發(fā)現(xiàn)自己被綠了严沥。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡中姜,死狀恐怖消玄,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情丢胚,我是刑警寧澤翩瓜,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站携龟,受9級特大地震影響兔跌,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜峡蟋,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一坟桅、第九天 我趴在偏房一處隱蔽的房頂上張望华望。 院中可真熱鬧,春花似錦仅乓、人聲如沸赖舟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宾抓。三九已至,卻和暖如春豫喧,著一層夾襖步出監(jiān)牢的瞬間石洗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工紧显, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留讲衫,地道東北人。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓鸟妙,卻偏偏與公主長得像焦人,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子重父,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容