詳解Dubbo(三):消費端構(gòu)造Invoker

前言

上一篇講Proxy的文章中看到,構(gòu)建Proxy需要傳入Invoker參數(shù)拢操。除基本方法外僚焦,其它接口方法的調(diào)用最終都是調(diào)用的invoker.invoke()方法。從rpc調(diào)用的整個流程來說匿辩,Invoker正好處在中間的位置,它的左邊是用戶的應(yīng)用榛丢,調(diào)用的都是對象和方法铲球。而它的右邊是傳輸層,操作的是Request/Response晰赞,所以Invoker就是中間的橋梁睬辐。

Invoker結(jié)構(gòu)

下面Invoker相關(guān)類的關(guān)系圖,這只是其中最重要的部分:

Invoker

從上面圖中可以看到宾肺,Invoker大體上分成兩個部分溯饵,針對集群的ClusterInvoker和針對特定協(xié)議的Invoker。下面先從針對特定協(xié)議的Invoker開始锨用。

Protocol和Invoker

從上一篇的ReferenceBean初始化中可以知道丰刊,消費端針對某個服務(wù)接口創(chuàng)建Invoker的時候,首先需要獲取到URL增拥。最簡單的例子就是在@Reference注解上配置了url地址啄巧,而且這個地址不是注冊中心的地址。

指定協(xié)議的URL

最簡單的url比如dubbo://10.0.75.1:20880/org.apache.dubbo.demo.DemoService?&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&side=provider&timestamp=1585553085050
ReferenceBean拿到這個url后就會去找它對應(yīng)的Protocol類掌栅,根據(jù)url的schema秩仆, Dubbo可以找到DubboProtocol,然后調(diào)用Protocol的refer方法獲取到Invoker猾封,這個方法在AbstractProtocol類里面澄耍。

   @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

實際上調(diào)用的是子類的protocolBindingRefer()方法,這里外層封裝的AsyncToSyncInvoker是一個裝飾類,因為新版本的dubbo把所有Invoker調(diào)用都改成了異步返回齐莲,如果Consumer仍然希望同步調(diào)用痢站,則用這個裝飾類轉(zhuǎn)換一下。下面看下DubboProtocol的protocolBindingRefer()方法實現(xiàn):

    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // 創(chuàng)建Dubbo Invoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

該方法直接創(chuàng)建了一個DubboInvoker选酗,總共傳入四個參數(shù)阵难,除了接口和url外,第三個參數(shù)是構(gòu)建傳輸層Client芒填,前面講過Invoker連接了Proxy和傳輸層呜叫,當Invoker發(fā)起調(diào)用時,就需要這個ExchangeClient來發(fā)送請求和接收Response殿衰,Exchange層的解析會包含在后續(xù)的文章中朱庆。第四個參數(shù)是Invoker的緩存集合,不是Protocol用的播玖,所以不去管它椎工。
前一篇文章講過蜀踏,當Proxy最終接收到方法調(diào)用后掰吕,會調(diào)用Invoker.invoke()來發(fā)起遠程調(diào)用,下面來看下DubboInvoker.invoke()是怎么實現(xiàn)的殖熟。
DubboInvoker
對invoke()方法的調(diào)用首先會進到DubboInvoker的父類AbstractInvoker中:

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // 判斷invoker是否已經(jīng)destroy了局待,是則打印警告,調(diào)用繼續(xù)
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        //追加RpcContext中的附加信息到Invocation中钳榨,比如鏈路追蹤的Id等
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            invocation.addObjectAttachments(contextAttachments);
        }
        //設(shè)置是同步還是異步調(diào)用
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        //如果是異步調(diào)用纽门,給這次請求加一個唯一id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        AsyncRpcResult asyncResult;
        try {
            //調(diào)用子類的doInvoke()方法
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            //異常處理
            ...
        } catch (RpcException e) {
            //異常處理
            ...
        } catch (Throwable e) {
            //異常處理
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

AbstractInvoker最終調(diào)用了DubboInvokerdoInvoke()方法薛耻。

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        //獲取Dubbo協(xié)議的exchangeClient
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            //如果Oneway調(diào)用,即Consumer端不關(guān)心調(diào)用是否成功赏陵,則發(fā)送請求后直接返回結(jié)果蝙搔。多用在日志發(fā)送這種可以容忍數(shù)據(jù)丟失的場景
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                //2.7之后所有調(diào)用都改成異步,講Future放入result中吃型,如果Consumer調(diào)用是同步的,上面的Protocol的refer()會阻塞等待異步結(jié)果返回
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

上面就是當@Reference上配置了單個url敌土,并且這個url指定了具體協(xié)議的情況返干,下面看下當url是注冊中心的情況。

注冊中心的URL

之前的文章講過矩欠,@Reference關(guān)聯(lián)的注冊中心的url格式類似于registry://localhost:2181?refer=version%3f1.0.0,所以dubbo可以基于url找到對應(yīng)的Protocol類為RegistryProtocol躺坟,現(xiàn)在看下這個類的refer()方法如何處理的:

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //1. 轉(zhuǎn)換成具體注冊中心實現(xiàn)的url
        url = getRegistryUrl(url);
        //2. 獲取注冊中心實現(xiàn)
        Registry registry = registryFactory.getRegistry(url);
        //3. 如果是獲取RegistryService的代理乳蓄,則直接獲取本地暴露的invoker
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        //4. 判斷url是否指定了分組信息
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                //指定了分組虚倒,則使用MergeableCluster
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        //5. 獲取Cluster Invoker
        return doRefer(cluster, registry, type, url);
    }

第1步,首先需要將url轉(zhuǎn)換成真實注冊中心的地址菠剩。dubbo是支持多注冊中心的,而配置中獲取的是一個通用的注冊中心url耻煤,以registry://開頭具壮,這一步轉(zhuǎn)成真正的注冊中心url,比如從registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService&registry=zookeeper 轉(zhuǎn)成 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService
第2步哈蝇,根據(jù)真實的url獲取到注冊中心的實現(xiàn)類炮赦,比如上面的url獲取到的就是使用zookeeper注冊中心,獲取的就是ZookeeperRegistry
第3步眼五,這里是對獲取注冊中心實例代理的特殊處理看幼,暫時不看
第4步,dubbo支持將多個遠程服務(wù)調(diào)用結(jié)果做合并來做為最終結(jié)果,通過配置一個merger類來實現(xiàn)
第5步汽煮,沒有指定group的話,則使用默認的Cluster構(gòu)造Invoker
上面方法的主要就是獲取Registry的實現(xiàn)心例,然后調(diào)用doRefer()方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //1. 構(gòu)建directory實例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 2. 生成consumer URL
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        //3. 將consumer信息寫入注冊中心
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        //4. 構(gòu)建RouteChain
        directory.buildRouterChain(subscribeUrl);
        //5. 訂閱服務(wù)變化通知
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        //6. 生成ClusterInvoker
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }
        //7. 回調(diào)Listener
        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

在上面的doRefer()方法中止后,首先為服務(wù)生成RegistryDirectory實例溜腐,該類的作用是關(guān)聯(lián)DirectoryRegistry接口,前面的白話Dubbo系列中已經(jīng)講過歉糜,不清楚的話可以回查一下望众。隨后,Consumer會將自己也注冊到注冊中心夯缺,所以可以通過注冊中心的數(shù)據(jù)看到某個Provider都被誰消費刽酱,也可以看到某個Consumer都調(diào)用了哪些服務(wù)棵里。
第5步中姐呐,訂閱注冊中心的數(shù)據(jù)變化,在provider變化時可以實時收到通知
第6步中头谜,生成最終的ClusterInvoker鸠澈,Dubbo默認配置中,這里的Cluster是FailoverCluster际度,join()方法返回FailoverClusterInvoker涵妥。

ClusterInoker實現(xiàn)

ClusterInvoker是Dubbo支持集群調(diào)用的核心實現(xiàn),包括負載均衡窒所、特殊路由、容錯處理等禽额。默認實現(xiàn)類FailoverClusterInvoker支持用戶配置重試次數(shù)皮官,可以在一個節(jié)點失敗重試其它節(jié)點。
AbstractClusterInvoker:

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        //判斷Invoker是否已經(jīng)destroy盔憨,是則拋出異常
        checkWhetherDestroyed();
        // 將attachments加到Invocation中
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }
       // 獲取可用invoker列表
        List<Invoker<T>> invokers = list(invocation);
       //根據(jù)配置獲取指定的負載均衡實現(xiàn)
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

ClusterInvoker的invoke()方法首先調(diào)用list()方法獲取所有可用invoker列表郁岩,這里的是直接調(diào)用的Directory的list方法缺狠,Directory緩存了從注冊中心獲取的provider url列表,會將每個url生成invoker如叼。
在獲取到一組invoker后需要從其中選擇一個發(fā)起調(diào)用穷劈,這時候就需要用到負載均衡歇终,最終根據(jù)獲取的invoker列表和負載均衡器調(diào)用子類的具體實現(xiàn)。
FailoverClusterInvoker:

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 獲取重試次數(shù)追葡,最低可配置在方法粒度
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            // 使用負載均衡最終選擇一個invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn(...);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(...);
    }

上面的邏輯主要是兩點宜肉,根據(jù)配置的重試次數(shù)來決定是否重試翎碑,根據(jù)負載均衡實現(xiàn)從注冊中心返回的可用服務(wù)中選擇其中一個杈女,然后發(fā)起調(diào)用吊圾,當重試結(jié)束還未成功翰蠢,則拋出異常。

構(gòu)造帶Filter的Invoker

上面講了兩種Invoker的獲取和invoke的工作原理檀何,其實Dubbo中上面得到的Invoker不會直接返回給Proxy廷支,而是需要和Filter集成最終返回Invoker鏈。這部分的代碼前面白話部分講Filter的時候已分解垛孔,傳送門施敢。

總結(jié)

消費端的Proxy通過Invoker發(fā)起調(diào)用僵娃,Invoker對Proxy屏蔽了集群和服務(wù)治理等一系列邏輯,同時從Invoker層開始默怨,提供了對多協(xié)議的支持匙睹。從Invoker再往后走,將不存在接口和方法的概念垃僚,下一篇將分解傳輸層的實現(xiàn)谆棺。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末罕袋,一起剝皮案震驚了整個濱河市浴讯,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌榆纽,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸵赫,死亡現(xiàn)場離奇詭異躏升,居然都是意外死亡,警方通過查閱死者的電腦和手機一睁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進店門者吁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來饲帅,“玉大人,你說我怎么就攤上這事染坯∏鹨荩” “怎么了?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵仲锄,是天一觀的道長儒喊。 經(jīng)常有香客問我币呵,道長,這世上最難降的妖魔是什么芯义? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任妻柒,我火速辦了婚禮举塔,結(jié)果婚禮上求泰,老公的妹妹穿的比我還像新娘计盒。我一直安慰自己,他們只是感情好枉氮,可當我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布聊替。 她就那樣靜靜地躺著培廓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪泣港。 梳的紋絲不亂的頭發(fā)上价匠,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天踩窖,我揣著相機與錄音,去河邊找鬼箫柳。 笑死啥供,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的涮毫。 我是一名探鬼主播鳞骤,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼豫尽,長吁一口氣:“原來是場噩夢啊……” “哼顷帖!你這毒婦竟也來了渤滞?” 一聲冷哼從身側(cè)響起榴嗅,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤嗽测,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后疏魏,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體晤愧,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡官份,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年舅巷,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片钠右。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡爬舰,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出坪仇,到底是詐尸還是另有隱情垃你,我是刑警寧澤,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布皆刺,位于F島的核電站羡蛾,受9級特大地震影響锨亏,放射性物質(zhì)發(fā)生泄漏忙干。R本人自食惡果不足惜浪藻,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一爱葵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧赞哗,春花似錦浓瞪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至罚勾,卻和暖如春吭狡,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背送丰。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工器躏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蟹略,地道東北人。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓揽浙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親太抓。 傳聞我的和親對象是個殘疾皇子令杈,可洞房花燭夜當晚...
    茶點故事閱讀 45,585評論 2 359

推薦閱讀更多精彩內(nèi)容