dubbo服務引用初始化淺析

dubbo服務引用初始化過程就是創(chuàng)建服務動態(tài)代理的過程,與服務發(fā)布一樣,同樣借助bean初始化完成動態(tài)代理的創(chuàng)建福也。具體調用過程:
com.alibaba.dubbo.config.ReferenceConfig:
get()-->init()-->createProxy()
在createProxy()方法中港准,服務引用分為三種情況:
1.JVM內部引用的代理
2.用戶指定URL 直連
3.通過注冊中心欲鹏,引用遠程的代理
直接看第三種

private T createProxy(Map<String, String> map) {
         .....................................
                List<URL> us = loadRegistries(false);//導入注冊中心url
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);//導入monitor相關信息,加入URL中
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {//只有一個注冊中心
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {//多注冊中心
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {//每個注冊中心初始化引用一遍
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一個registry url
                    }
                }
                if (registryURL != null) { // 有 注冊中心協(xié)議的URL
                    // 對有注冊中心的Cluster 只用 AvailableCluster
                    // 多個注冊中心空扎,封裝成一個Invoker
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // 不是 注冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
    .....................................
        // 創(chuàng)建服務代理
        return (T) proxyFactory.getProxy(invoker);
    }

主要服務引用過程在這個方法里refprotocol.refer(interfaceClass, url)藏鹊,調用到RegistryProtocol.refer(),根據url獲取到注冊中心转锈,然后調用doRefer(),new 一個RegistryDirectory盘寡,主要緩存遠程服務相關信息黑忱,如ip:port菇曲,然后向注冊中心consumer端注冊自己,然后在相應節(jié)點providers端訂閱提供者的信息孵户,具體過程如下圖


clipboard.png
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {//注冊自己
              registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
      //訂閱provider注冊中心節(jié)點信息,此處url需要攜帶providers參數
       directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }

下面進入directory.subscribe()方法进胯,此方法非常重要诸衔,此處完成提供者信息的獲取,通信客戶端的初始化违崇,此方法調用鏈為:FailbackRegistry.subscribe()-->ZookeeperRegistry.doSubscribe()-->RegistryDirectory.notify(),下面來看ZookeeperRegistry.doSubscribe()方法脾还,此處主要完成提供端信息的獲取棺蛛,轉換成url椅野,訂閱的節(jié)點有三類:
/dubbo/xx.xx/providers;
/dubbo/xx.xx/routes;
/dubbo/xx.xx/configurations;

protected void doSubscribe(final URL url, final NotifyListener listener) {
     List<URL> urls = new ArrayList<URL>();//緩存訂閱到信息
      for (String path : toCategoriesPath(url)) {
              ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                  ...............................................
                    zkClient.create(path, false);
                   //從zk訂閱到的信息,并添加子節(jié)點監(jiān)聽
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
            notify(url, listener, urls);
  }

然后調用到AbstractRegistry.notify()方法

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ..........................................
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);//url信息保存到本地
            listener.notify(categoryList);
        }
}

listener就是一路傳過來的RegistryDirectory妖爷,進入RegistryDirectory.notify()-->refreshInvoker()

private void refreshInvoker(List<URL> invokerUrls){
           ..........................................
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表
            // state change
            //如果計算錯誤,則不進行處理.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
                return ;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try{
                destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關閉未使用的Invoker
            }catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
}

上面的方法提供者url信息轉換成invoker炸裆,并與method對應起來洛史。下面進入toInvokers(invokerUrls):

 private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
          //檢查提供者相關信息.......
         ....................
          // 緩存key為沒有合并消費端參數的URL忆嗜,不管消費端如何合并參數捆毫,如果服務端URL發(fā)生變化闪湾,則重新refer
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // 緩存中沒有,重新refer
                try {
                    ........................................
                    if (enabled) {
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
                }
                if (invoker != null) { // 將新的引用放入緩存
                    newUrlInvokerMap.put(key, invoker);
                }
            }else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
}

下面調用protocol.refer(serviceType, url)绩卤,此protocol為適配類途样,調用過程為ProtocolFilterWrapper.refer()-->ProtocolListenerWrapper.refer()-->DubboProtocol.refer():

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

然后調用getClients(url)江醇,創(chuàng)建ExchangeClient用來處理和提供端的鏈接,一般一個服務接口只會創(chuàng)建一個client

private ExchangeClient[] getClients(URL url){
        //是否共享連接
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        //如果connections不配置何暇,則共享連接陶夜,否則每服務每連接
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        }
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

然后一路調用
Exchangers.connect()-->HeaderExchanger.connect()->Transporters.connect()-->NettyTransporter.connect(),最終初始化一個NettyClient赖晶,調用doopen()方法律适,完成網絡客戶端的初始化

    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

回到RegistryProtocol.doRefer()中,cluster.join(directory)遏插,此處調用過程為:MockClusterWrapper.join()-->FailfastCluster.join()捂贿,最終初始化一個MockClusterInvoker對象,此對象持有RegistryDirectory和FailfastClusterInvoker的引用胳嘲,接下來調用JavassistProxyFactory.getProxy()創(chuàng)建代理:

 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

此對象spring注入服務接口中厂僧,形成一個透明化的本地服務,當調用本地接口方法時了牛,就會啟動動態(tài)代理颜屠,執(zhí)行InvokerInvocationHandler.invoke()方法:

 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        //調用MockClusterInvoker.invoke()方法
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市鹰祸,隨后出現的幾起案子甫窟,更是在濱河造成了極大的恐慌,老刑警劉巖蛙婴,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件粗井,死亡現場離奇詭異,居然都是意外死亡街图,警方通過查閱死者的電腦和手機浇衬,發(fā)現死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來餐济,“玉大人耘擂,你說我怎么就攤上這事⌒跄罚” “怎么了醉冤?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長篙悯。 經常有香客問我冤灾,道長,這世上最難降的妖魔是什么辕近? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任韵吨,我火速辦了婚禮,結果婚禮上,老公的妹妹穿的比我還像新娘归粉。我一直安慰自己椿疗,他們只是感情好,可當我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布糠悼。 她就那樣靜靜地躺著届榄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪倔喂。 梳的紋絲不亂的頭發(fā)上铝条,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機與錄音席噩,去河邊找鬼班缰。 笑死,一個胖子當著我的面吹牛悼枢,可吹牛的內容都是我干的埠忘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼馒索,長吁一口氣:“原來是場噩夢啊……” “哼莹妒!你這毒婦竟也來了?” 一聲冷哼從身側響起绰上,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤旨怠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蜈块,有當地人在樹林里發(fā)現了一具尸體鉴腻,經...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年疯趟,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谋梭。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡信峻,死狀恐怖,靈堂內的尸體忽然破棺而出瓮床,到底是詐尸還是另有隱情盹舞,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布隘庄,位于F島的核電站踢步,受9級特大地震影響,放射性物質發(fā)生泄漏丑掺。R本人自食惡果不足惜获印,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望街州。 院中可真熱鬧兼丰,春花似錦玻孟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至艳丛,卻和暖如春匣掸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背氮双。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工碰酝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人眶蕉。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓砰粹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親造挽。 傳聞我的和親對象是個殘疾皇子碱璃,可洞房花燭夜當晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內容

  • 首先還是Spring碰到dubbo的標簽之后,會使用parseCustomElement解析dubbo標簽饭入,使用的...
    加大裝益達閱讀 6,838評論 0 11
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理嵌器,服務發(fā)現,斷路器谐丢,智...
    卡卡羅2017閱讀 134,672評論 18 139
  • 因我疏忽 令你受苦 個中滋味 灼心刻骨 破碎清空 填補如初 余下時光 傾力守護 安娜星 2017.11.29
    安娜的花園閱讀 371評論 2 2
  • 小時候爽航,第一次學車,摔了乾忱,從此我只剩下一只眼讥珍,來看人,倒也足矣窄瘟。十七歲衷佃,粑粑給我買了個單車,我便談了場戀愛蹄葱,像是一...
    簡魚閱讀 279評論 0 3
  • 任何的學習氏义,都只有一個方向--------就是走向完整而獨立-------完整的生活,獨立的思考图云。 模仿是人的天性...
    笑看云起時a閱讀 390評論 8 1