Dubbo之服務(wù)暴露源碼分析

時序圖

在講解源碼前舱卡,先看下官方文檔提供的時序圖轮锥,后面的講解基本是這個路線要尔,但是會更細(xì)節(jié)化


服務(wù)暴露.png

大致邏輯

首先服務(wù)的實現(xiàn)bean在我們的spring容器中既绩,我們會創(chuàng)建一個Invoker通過代理調(diào)用ref中的方法还惠,同時Invoker會在protocol的export方法中會轉(zhuǎn)換為Exporter,并且保存在protocol對象的exporterMap中衰粹,然后進行暴露铝耻。

重要概念

Protocol

Protocol 是服務(wù)域田篇,它是 Invoker 暴露和引用的主功能入口泊柬,它負(fù)責(zé) Invoker
的生命周期管理兽赁。
接口定義如下

@SPI("dubbo")
public interface Protocol {
    
    /**
     * 獲取缺省端口刀崖,當(dāng)用戶沒有配置端口時使用亮钦。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露遠(yuǎn)程服務(wù):<br>
     * 1. 協(xié)議在接收請求時充活,應(yīng)記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必須是冪等的混卵,也就是暴露同一個URL的Invoker兩次幕随,和暴露一次沒有區(qū)別赘淮。<br>
     * 3. export()傳入的Invoker由框架實現(xiàn)并傳入梢卸,協(xié)議不需要關(guān)心低剔。<br>
     * 
     * @param <T> 服務(wù)的類型
     * @param invoker 服務(wù)的執(zhí)行體
     * @return exporter 暴露服務(wù)的引用,用于取消暴露
     * @throws RpcException 當(dāng)暴露服務(wù)出錯時拋出枕赵,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用遠(yuǎn)程服務(wù):<br>
     * 1. 當(dāng)用戶調(diào)用refer()所返回的Invoker對象的invoke()方法時拷窜,協(xié)議需相應(yīng)執(zhí)行同URL遠(yuǎn)端export()傳入的Invoker對象的invoke()方法篮昧。<br>
     * 2. refer()返回的Invoker由協(xié)議實現(xiàn)懊昨,協(xié)議通常需要在此Invoker中發(fā)送遠(yuǎn)程請求酵颁。<br>
     * 3. 當(dāng)url中有設(shè)置check=false時躏惋,連接失敗不能拋出異常簿姨,并內(nèi)部自動恢復(fù)扁位。<br>
     * 
     * @param <T> 服務(wù)的類型
     * @param type 服務(wù)的類型
     * @param url 遠(yuǎn)程服務(wù)的URL地址
     * @return invoker 服務(wù)的本地代理
     * @throws RpcException 當(dāng)連接服務(wù)提供方失敗時拋出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 釋放協(xié)議:<br>
     * 1. 取消該協(xié)議所有已經(jīng)暴露和引用的服務(wù)贤牛。<br>
     * 2. 釋放協(xié)議所占用的所有資源,比如連接和端口沽讹。<br>
     * 3. 協(xié)議在釋放后爽雄,依然能暴露和引用新的服務(wù)沐鼠。<br>
     */
    void destroy();

}

Invoker

Invoker 是實體域,它是 Dubbo 的核心模型焰檩,其它模型都向它靠擾析苫,或轉(zhuǎn)換成
它衩侥,它代表一個可執(zhí)行體茫死,可向它發(fā)起 invoke 調(diào)用璧榄,它有可能是一個本地的
實現(xiàn)骨杂,也可能是一個遠(yuǎn)程的實現(xiàn)搓蚪,也可能一個集群實現(xiàn)妒潭。
接口定義如下

public interface Invoker<T> extends Node {
    Class<T> getInterface();
    Result invoke(Invocation invocation) throws RpcException;
}

Invocation

Invocation 是會話域雳灾,它持有調(diào)用過程中的變量谎亩,比如方法名匈庭,參數(shù)等阱持。
接口定義如下

public interface Invocation {
    String getMethodName();
    Class<?>[] getParameterTypes();
    Object[] getArguments();
    Map<String, String> getAttachments();
    String getAttachment(String key);
    String getAttachment(String key, String defaultValue);
        Invoker<?> getInvoker();
}

Exporter

Exporter用來封裝不同協(xié)議暴露的Invoker衷咽,因為Invoker可以被多個Protocol暴露镶骗,因為每種Protocol都有各自的Exproter子類
接口定義如下

public interface Exporter<T> {
    
    Invoker<T> getInvoker();
    
    void unexport();

}

我的一些定義

本地暴露

本地暴露分為兩種巩那,通過遠(yuǎn)程協(xié)議還是本地協(xié)議暴露
本地協(xié)議的話即横,基于進程通信东囚,所以不需要進行遠(yuǎn)程暴露页藻,具體實現(xiàn)只有InjvmProtocol
而基于遠(yuǎn)程協(xié)議的暴露份帐,需要開啟服務(wù)監(jiān)聽废境,處理其他進程發(fā)來的rpc請求噩凹,同時可以選擇進行遠(yuǎn)程暴露驮宴,具體實現(xiàn)有DubboProtocol堵泽,HessianProtocol等

遠(yuǎn)程暴露

遠(yuǎn)程暴露落恼,就是將本地暴露的url發(fā)布到注冊中心,這個暴露為了讓服務(wù)引用者感知到服務(wù)的存在
遠(yuǎn)程暴露對應(yīng)RegistryProtocol

遠(yuǎn)程暴露URL和本地暴露URL

進行遠(yuǎn)程暴露的時候滋戳,要先進行本地暴露奸鸯,所以遠(yuǎn)程暴露URL里面有一個export參數(shù)會包含本地暴露URL
遠(yuǎn)程暴露URL主要是用來選擇暴露的注冊中心娄涩,注冊本地暴露URL蓄拣,以及增加事件監(jiān)聽

源碼分析

解析配置

先看下我們平時是如何配置dubbo服務(wù)暴露的

<dubbo:service interface="com.alibaba.dubbo.demo.bid.BidService" ref="bidService"  protocol="dubbo" />

上面的配置會通過自定義解析器DubboNamespaceHandler解析到ServiceBean對象

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

然后在ServiceBean初始化完成后進行服務(wù)暴露


image.png

可以看到ServiceBean實現(xiàn)了Initializing接口辜昵,可以在afterPropertiesSet看到服務(wù)暴露的邏輯

if (! isDelay()) {
            export();
        }

這邊的Delay并不是服務(wù)具體的暴露行為進行延遲堪置,而是控制這個暴露行為在什么時候觸發(fā)

private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay.intValue() == -1);
    }

上述代碼的意思是,如果支持Spring的事件監(jiān)聽坎匿,并且沒有配置延遲暴露碑诉,推遲到容器refresh完成的時候觸發(fā)服務(wù)暴露邏輯进栽,如果配置了delay,那么直接在afterPropertiesSet內(nèi)調(diào)用暴露方法
我理解為一個是容器級別的delay快毛,一個是服務(wù)級別的delay

export方法

export方法在ServiceConfig中

public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && ! export.booleanValue()) {
            return;
        }
        if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            doExport();
        }
    }

這邊會根據(jù)是否配置了delay參數(shù),進行延遲暴露襟衰,通過線程休眠來實現(xiàn)
doExport方法涉及很多參數(shù)的校驗與設(shè)置瀑晒,遇到具體功能點再做分析苔悦,具體暴露邏輯調(diào)用了方法doExportUrls

 private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrls方法首先會獲取注冊中心的URL玖详,雖說可以配置很多個注冊中心拗踢,但是我們就把它當(dāng)成一個好了
然后根據(jù)service配置的不同協(xié)議秒拔,調(diào)用doExportUrlsFor1Protocol方法分別進行暴露
在doExportUrlsFor1Protocol的前半部分又是各種參數(shù)的提取砂缩,用來生成最終暴露的URL庵芭,我們關(guān)注核心的暴露邏輯

//1
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
//2
        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置為none不暴露
        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠(yuǎn)程服務(wù))
//3
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local則暴露為遠(yuǎn)程服務(wù).(配置為local好乐,則表示只暴露遠(yuǎn)程服務(wù))
//4
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
//5
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);

注意上面代碼1處,就是使用之前提取的參數(shù)生成本地暴露url的邏輯
而代碼2處的scope也還是比較重要的反璃,它控制了服務(wù)應(yīng)該怎么暴露淮蜈,我們項目中一般對service不進行scope配置,那么取到的值為null柿扣,代碼3和代碼4的條件都會滿足未状,既會進行本地協(xié)議的本地暴露司草,也會進行遠(yuǎn)程暴露
而代碼5埋虹,我們可以配置register="false",直接進行遠(yuǎn)程協(xié)議的本地暴露搔课,不記錄到注冊中心上去,但是我們還是可以通過在消費者強制配置url來調(diào)用

    <dubbo:service interface="com.alibaba.dubbo.demo.bid.BidService" ref="bidService"  protocol="dubbo" register="false"/>

服務(wù)暴露的邏輯其實是同一套

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);

主要的區(qū)別點還是在于Url的不同袍啡,因為url帶了不同的protocol以及其他配置境输,然后具體暴露時嗅剖,使用之前講的SPI來調(diào)用不同實現(xiàn)
比如在exportLocal方法里信粮,其實會把url的protocol修改為injvm

private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
//修改protocol為Injvm
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);

            // modified by lishen
            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));

            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
        }
    }

在存在注冊中心,并且服務(wù)的Registry屬性不為false的情況下會進行遠(yuǎn)程暴露欺旧,會在注冊中心url的export參數(shù)帶上原先的本地暴露url進行遠(yuǎn)程暴露,因此暴露使用的protocol也相應(yīng)變?yōu)镽egistryProtocol

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

下面講解具體暴露的邏輯

ref轉(zhuǎn)換為Invoker

在進行暴露之前称龙,我們需要將spring容器內(nèi)的接口實現(xiàn)ref轉(zhuǎn)換為invoker鲫尊,通過proxyFactory.getInvoker(ref, (Class) interfaceClass, local)方法

proxyFactory是一個擴展點疫向,有javaassist和jdk動態(tài)代理兩種實現(xiàn)搔驼,默認(rèn)實現(xiàn)為javaassist糯耍,并且提供一個包裝類StubProxyFactoryWrapper用于提供降級服務(wù)(以后單獨講解)

public class JavassistProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

JavassistProxyFactory 中的Wapper類是動態(tài)生成的,可以針對接口的每個方法生成直接調(diào)用的代碼哗伯,避免了反射荒揣,因為做了緩存,多次調(diào)用的情況下焊刹,會加快效率系任,而jdk實現(xiàn)用的反射效率應(yīng)該差多了

//JdkProxyFactory的AbstractProxyInvoker實現(xiàn)
return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };

通過proxyFactory我們會得到一個父類為AbstractProxyInvoker的匿名Invoker類,內(nèi)部通過反射或者動態(tài)生成字節(jié)碼來調(diào)用目標(biāo)ref的方法

通過protocol暴露

在得到Invoker后虐块,我們通過protocol去進行服務(wù)暴露俩滥,暴露成功后得到Exporter引用

 <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

通過Protocol的export方法贺奠,需要將invoker轉(zhuǎn)換為exporter霜旧,為什么?
因為Invoker只負(fù)責(zé)對具體方法的調(diào)用儡率,但是方法的調(diào)用可以暴露到多個Protocol挂据,所以需要有具體的Exporter來對應(yīng),比如Dubbo暴露得到DubboExporter,injvm暴露得到InjvmExporter

下面講解每種protocol的暴露

通過InjvmProtocol暴露

InjvmProtocol是本地暴露中唯一使用本地協(xié)議的儿普,意思就是說這個服務(wù)的url不能發(fā)布到注冊中心崎逃,只能本地消費,在dubbo引用服務(wù)的邏輯中眉孩,如果發(fā)現(xiàn)本地InjvmProtocol中有所需要的Exproter个绍,會優(yōu)先選擇引用本地
看下暴露的代碼

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

exporterMap為InjvmProtocol繼承AbstractProtocol的一個參數(shù)勒葱,用來保存Exproter引用,同時exporterMap也會在InjvmExporter內(nèi)被引用巴柿,主要用于卸載功能

class InjvmExporter<T> extends AbstractExporter<T> {

    private final String key;
    
    private final Map<String, Exporter<?>> exporterMap;

    InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
    }

    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }

}

關(guān)于這個exporterMap凛虽,由于每個Protocol實現(xiàn)都繼承了AbstractProtocol,所以都會有exporterMap屬性广恢,并且每種Protocol在容器內(nèi)只存在一個凯旋,我們可以在每個Protocol對象的exporterMap中拿到這個Protocol暴露的所有Exproter

通過RegistryProtocol暴露

這是遠(yuǎn)程暴露,在進行本地暴露的同時將本地暴露的url注冊到注冊中心同時也注冊事件監(jiān)聽

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
//1
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 訂閱override數(shù)據(jù)
        // FIXME 提供者訂閱時袁波,會影響同一JVM即暴露服務(wù)瓦阐,又引用同一服務(wù)的的場景,因為subscribed以服務(wù)名為緩存的key篷牌,導(dǎo)致訂閱信息覆蓋。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//注冊監(jiān)聽事件踏幻,用于url被修改時回調(diào)枷颊,進行exporter重新暴露
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保證每次export都返回一個新的exporter實例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

注意代碼1處,有一個本地暴露该面,看下代碼

 private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

getProviderUrl用于從遠(yuǎn)程暴露url中的export參數(shù)中獲取本地暴露的url

private URL getProviderUrl(final Invoker<?> origininvoker){
        String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
        if (export == null || export.length() == 0) {
            throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
        }
        
        URL providerUrl = URL.valueOf(export);
        return providerUrl;
    }

export參數(shù)對應(yīng)的url才是需要實際本地暴露的夭苗,而作為export方法的遠(yuǎn)程暴露url只是為了注冊提供者url到注冊中心以及增加事件監(jiān)聽

同時注意一下bounds參數(shù)的校驗是為了防止同一個invoker重復(fù)暴露,而ExporterChangeableWrapper封裝是為了zookeeper中url發(fā)生改變時能修改Exporter

在完成本地暴露之后隔缀,會通過遠(yuǎn)程暴露url獲取注冊中心對象题造,然后把本地暴露url注冊上去,同時也會給zookeeper中本地暴露url對應(yīng)路徑注冊監(jiān)聽器猾瘸,用于監(jiān)聽zookeeper上面的暴露url發(fā)生變化的時候界赔,重新export(比如我們的控制臺可以對參數(shù)進行調(diào)整)

最后把export返回

通過DubboProtocol暴露

DubboProtocol是使用遠(yuǎn)程協(xié)議的本地暴露,所以可以將暴露url注冊到注冊中心
看下它的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        // modified by lishen
        optimizeSerialization(url);

        return exporter;
    }

首先會把invoker轉(zhuǎn)換為DubboExporter牵触,放到exporterMap中
然后有一些stub的邏輯淮悼,這個以后單獨再講
接下來就是打開netty服務(wù),用于監(jiān)聽服務(wù)引用者的請求揽思,打開服務(wù)器邏輯在openServer中

 private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也可以暴露一個只有server可以調(diào)用的服務(wù)袜腥。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }

從String key = url.getAddress();以及ExchangeServer server = serverMap.get(key);可以看出來在一個應(yīng)用中,netty服務(wù)器針對每種協(xié)議只會起一個钉汗,因為每種協(xié)議只能配置一個端口
而reset方法羹令,會使用之后暴露url的參數(shù),覆蓋已經(jīng)開啟netty服務(wù)內(nèi)的參數(shù)
那么我們的服務(wù)器是怎么處理接收的rpc請求并調(diào)用對應(yīng)exporter調(diào)用呢损痰,進入createServer方法我們可以看到會netty服務(wù)的開啟時會綁定一個requestHandler

private ExchangeServer createServer(URL url) {
        //默認(rèn)開啟server關(guān)閉時發(fā)送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默認(rèn)開啟heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
//1 綁定requestHandler
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

這個requestHandler就是用來處理接收到的rpc調(diào)用請求的福侈,看下它內(nèi)部的邏輯

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                //通過inv獲取對應(yīng)invoker
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要處理高版本調(diào)用低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//用invoker執(zhí)行調(diào)用,返回結(jié)果
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if(logger.isInfoEnabled()){
                logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }
        
        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }
        
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)){
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

這邊主要講下replay方法徐钠,這個方法用來處理用戶的rpc請求癌刽,請求的序列化dubbo封裝的netty服務(wù)已經(jīng)處理,所在在這個方法傳入的message直接就是Invocation對象,在getInvoker中显拜,通過invocation對象可以我們可以生成exporterMap的key衡奥,用來拿到對應(yīng)的Exporter

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(Constants.PATH_KEY);
        //如果是客戶端的回調(diào)服務(wù).
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
        if (isStubServiceInvoke){
            port = channel.getRemoteAddress().getPort();
        }
        //callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if(isCallBackServiceInvoke){
            path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

//2 通過invocation生成的key獲取exporter
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        
        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
//轉(zhuǎn)換為invoker
        return exporter.getInvoker();
    }

拿到Exporter之后,轉(zhuǎn)換為Invoker,直接調(diào)用invoke方法返回Result远荠,之后返回給調(diào)用者的序列化等邏輯dubbo封裝的netty服務(wù)也幫我們處理了矮固,我們不用關(guān)注
這章主要講解的是服務(wù)暴露,關(guān)于netty服務(wù)的實現(xiàn)不多分析(我也還沒怎么看過)譬淳,理解這個requestHandler處理器即可档址,知道它會怎么處理rpc請求對應(yīng)的Invocation即可,什么序列化邻梆,加密解密全都當(dāng)作黑盒守伸。

服務(wù)引用,下章見

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末浦妄,一起剝皮案震驚了整個濱河市尼摹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌剂娄,老刑警劉巖蠢涝,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異阅懦,居然都是意外死亡和二,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門耳胎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惯吕,“玉大人,你說我怎么就攤上這事场晶』觳海” “怎么了?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵诗轻,是天一觀的道長钳宪。 經(jīng)常有香客問我,道長扳炬,這世上最難降的妖魔是什么吏颖? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮恨樟,結(jié)果婚禮上半醉,老公的妹妹穿的比我還像新娘。我一直安慰自己劝术,他們只是感情好缩多,可當(dāng)我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布呆奕。 她就那樣靜靜地躺著,像睡著了一般衬吆。 火紅的嫁衣襯著肌膚如雪梁钾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天逊抡,我揣著相機與錄音姆泻,去河邊找鬼。 笑死冒嫡,一個胖子當(dāng)著我的面吹牛拇勃,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播孝凌,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼方咆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了胎许?” 一聲冷哼從身側(cè)響起峻呛,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎辜窑,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寨躁,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡穆碎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了职恳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片所禀。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖放钦,靈堂內(nèi)的尸體忽然破棺而出色徘,到底是詐尸還是另有隱情,我是刑警寧澤操禀,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布褂策,位于F島的核電站,受9級特大地震影響颓屑,放射性物質(zhì)發(fā)生泄漏斤寂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一揪惦、第九天 我趴在偏房一處隱蔽的房頂上張望遍搞。 院中可真熱鬧,春花似錦器腋、人聲如沸溪猿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诊县。三九已至讲弄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間翎冲,已是汗流浹背垂睬。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留抗悍,地道東北人驹饺。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像缴渊,于是被迫代替她去往敵國和親赏壹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容