Dubbo基礎(chǔ)篇 遠(yuǎn)程調(diào)用

完整流程圖

Dubbo基礎(chǔ)篇 服務(wù)調(diào)用完整流程.png

一句話總結(jié)流程

總結(jié)為一句話就是:客戶端在發(fā)起遠(yuǎn)程調(diào)用時揉燃,具體的代理類會被InvokerInvacationHandler攔截,在這里面根據(jù)一些條件和負(fù)載均衡策略蔗草,選擇出其中一個符合條件的Invoker,進(jìn)行遠(yuǎn)程調(diào)用疆柔。提供者收到請求后咒精,會從ExpoterMap中選擇對應(yīng)的Invoker(Wrapper包裝),最終調(diào)用到具體的實現(xiàn)類旷档。處理完請求后將結(jié)果返回狠轻。返回后客戶端根據(jù)之前傳過去的請求ID,找到之前的請求彬犯,然后再進(jìn)行自己的業(yè)務(wù)處理

Consumer遠(yuǎn)程調(diào)用

  • 調(diào)用對應(yīng)的代理類
  • 被InvokerInvocationHandler攔截
  • ClusterInvoker經(jīng)過路由過濾,負(fù)載均衡查吊,選擇其中一個Invoker谐区,發(fā)起遠(yuǎn)程調(diào)用(帶請求ID)
public class JavassistProxyFactory extends AbstractProxyFactory {

  @Override
  @SuppressWarnings("unchecked")
  public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            // InvokerInvocationHandler (重點關(guān)注)
            // 遠(yuǎn)程調(diào)用時,調(diào)用的方法會被 InvokerInvocationHandler 攔截
      return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
  }

  // ...
}

InvokerInvocationHandler處理

  • 構(gòu)建RpcInvocation
  • 調(diào)用對應(yīng)Invoker的invoke方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // 獲取調(diào)用的遠(yuǎn)程方法名
    String methodName = method.getName();
    // 獲取參數(shù)
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }
    // 構(gòu)建一個dubbo rpc invocation
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
    String serviceKey = invoker.getUrl().getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);

    // invoker.getUrl() returns consumer url.
    RpcContext.setRpcContext(invoker.getUrl());

    if (consumerModel != null) {
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    // 遠(yuǎn)程調(diào)用
    return invoker.invoke(rpcInvocation).recreate();
}

Invoker#invoker方法

  • 路由過濾
  • 負(fù)載均衡

最終挑選出某一個invoker

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }

    // 實際上就是directroy.list 通過方法名尋找invokers 里面回去做一些過濾 獲取過濾后的invoke列表
    List<Invoker<T>> invokers = list(invocation);
    // 根據(jù)@SPI選擇負(fù)載均衡的策略
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance); // 調(diào)用子類的方法
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        // 根據(jù)負(fù)載均衡規(guī)則找出一個invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 遠(yuǎn)程調(diào)用(每次請求都有一個唯一的ID)
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

Provider處理請求

  • 服務(wù)端的NettyServer處理請求逻卖,最終會調(diào)用到DubboProtcol#reply
  • 根據(jù)客戶端的請求宋列,從ExportedMap中選擇對應(yīng)的Invoker (ExportedMap key:serviceKey())
  • 調(diào)用Invoker具體業(yè)務(wù)類的方法
    鏈?zhǔn)秸{(diào)用,入口 ProtocolFilterWrapper 會處理調(diào)用信息GenericFilter评也,上下文ContextFilter等
    Dubbo基礎(chǔ)篇 遠(yuǎn)程調(diào)用NettyService處理流程.png

    調(diào)用到AbstractProxyInvoker 炼杖,當(dāng)前類調(diào)用通過Javaassist封裝成Wrapper類最終調(diào)用到具體的實現(xiàn)類
  • 返回處理結(jié)果

NettyServer的Handler處理請求

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
        }
        handler.connected(channel);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }

    if (logger.isInfoEnabled()) {
        logger.info("The connection between " + channel.getRemoteAddress() + " and " + channel.getLocalAddress() + " is established");
    }
}
@Override
public void connected(Channel channel) throws RemotingException {
  ExecutorService executor = getExecutorService();
  try {
        // 封裝成ChannelEventRunnable灭返,丟到線程中處理
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
  } catch (Throwable t) {
    throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
  }
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 通過請求id,構(gòu)建一個Response
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        // 獲取請求信息 方法名之類的
        Object data = req.getData();

        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        channel.send(res);
        return;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        // 最終調(diào)用 DubboProtocol reply
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

最終調(diào)用到 DubboProtocol

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    // 根據(jù)inv獲取Invoker   去exporterMap中找
    Invoker<?> invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it's a callback
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // 調(diào)用對應(yīng)的invoke方法(最終wrapper.invokeMethod坤邪,參考服務(wù)暴露文檔)
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
}

鏈?zhǔn)秸{(diào)用點 ProtocolFilterWrapper

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        asyncResult = filter.invoke(next, invocation);
    } catch (Exception e) {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenableFilter = ((ListenableFilter) filter);
            try {
                Filter.Listener listener = listenableFilter.listener(invocation);
                if (listener != null) {
                    listener.onError(e, invoker, invocation);
                }
            } finally {
                listenableFilter.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener listener = (Filter.Listener) filter;
            listener.onError(e, invoker, invocation);
        }
        throw e;
    } finally {

    }
    return asyncResult.whenCompleteWithContext((r, t) -> {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenableFilter = ((ListenableFilter) filter);
            Filter.Listener listener = listenableFilter.listener(invocation);
            try {
                if (listener != null) {
                    if (t == null) {
                        listener.onResponse(r, invoker, invocation);
                    } else {
                        listener.onError(t, invoker, invocation);
                    }
                }
            } finally {
                listenableFilter.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener listener = (Filter.Listener) filter;
            if (t == null) {
                listener.onResponse(r, invoker, invocation);
            } else {
                listener.onError(t, invoker, invocation);
            }
        }
    });
}

最終調(diào)用到Wrapper#AbstractProxyInvoker

@Override
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
    CompletableFuture<Object> future = wrapWithFuture(value);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

Javassist動態(tài)代理

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    // 通過Javaassist封裝成Wrapper類(Dubbo服務(wù)啟動時生成熙含,所以在運行時不會產(chǎn)生開銷),減少反射的調(diào)用
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        // Wrapper最終調(diào)用最終調(diào)用服務(wù)提供者的接口實現(xiàn)類的方法
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末艇纺,一起剝皮案震驚了整個濱河市怎静,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌黔衡,老刑警劉巖蚓聘,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異盟劫,居然都是意外死亡夜牡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進(jìn)店門侣签,熙熙樓的掌柜王于貴愁眉苦臉地迎上來塘装,“玉大人,你說我怎么就攤上這事硝岗∏庀” “怎么了?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵型檀,是天一觀的道長冗尤。 經(jīng)常有香客問我,道長胀溺,這世上最難降的妖魔是什么裂七? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮仓坞,結(jié)果婚禮上背零,老公的妹妹穿的比我還像新娘。我一直安慰自己无埃,他們只是感情好徙瓶,可當(dāng)我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嫉称,像睡著了一般侦镇。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上织阅,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天壳繁,我揣著相機與錄音,去河邊找鬼。 笑死闹炉,一個胖子當(dāng)著我的面吹牛蒿赢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播渣触,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼羡棵,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了昵观?” 一聲冷哼從身側(cè)響起晾腔,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎啊犬,沒想到半個月后灼擂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡觉至,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年剔应,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片语御。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡峻贮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出应闯,到底是詐尸還是另有隱情纤控,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布碉纺,位于F島的核電站船万,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏骨田。R本人自食惡果不足惜耿导,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望态贤。 院中可真熱鬧舱呻,春花似錦、人聲如沸悠汽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柿冲。三九已至殖氏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間姻采,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留慨亲,地道東北人婚瓜。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像刑棵,于是被迫代替她去往敵國和親巴刻。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容