完整流程圖
一句話總結流程
總結為一句話就是:客戶端在發起遠程調用時,具體的代理類會被InvokerInvocationHandler攔截,在這裡面根據一些條件和負載均衡策略,選擇出其中一個符合條件的Invoker,進行遠程調用。提供者收到請求後,會從exporterMap中選擇對應的Invoker(Wrapper包裝),最終調用到具體的實現類。處理完請求後將結果返回。返回後客戶端根據之前傳過去的請求ID,找到之前的請求,然後再進行自己的業務處理。
Consumer遠程調用
- 調用對應的代理類
- 被InvokerInvocationHandler攔截
- ClusterInvoker經過路由過濾,負載均衡,選擇其中一個Invoker,發起遠程調用(帶請求ID)
public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Creates the consumer-side proxy for the given interfaces.
     *
     * <p>The proxy is backed by an {@code InvokerInvocationHandler} wrapping {@code invoker};
     * methods called on the proxy during a remote call are intercepted by that handler.
     *
     * @param invoker    invoker the handler delegates to
     * @param interfaces interfaces the generated proxy implements
     * @return the generated proxy, cast to the caller's expected type
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // Key point: this handler is where calls on the proxy get intercepted.
        InvokerInvocationHandler handler = new InvokerInvocationHandler(invoker);
        return (T) Proxy.getProxy(interfaces).newInstance(handler);
    }
    // ...
}
InvokerInvocationHandler處理
- 構建RpcInvocation
- 調用對應Invoker的invoke方法
/**
 * Intercepts a call made on the proxy and forwards it to the wrapped invoker.
 *
 * <p>Methods declared on {@code Object} are executed reflectively against the invoker.
 * {@code toString}, {@code hashCode}, {@code $destroy} (no arguments) and {@code equals}
 * (one argument) are answered locally by the invoker. Everything else is packed into an
 * {@code RpcInvocation} and sent through {@code invoker.invoke}.
 *
 * @param proxy  the proxy instance the method was called on (not used here)
 * @param method the method that was called
 * @param args   the call arguments
 * @return the locally computed value, or the recreated result of the remote invocation
 * @throws Throwable whatever the reflective call, the invoker or {@code recreate()} throws
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (Object.class == method.getDeclaringClass()) {
        return method.invoke(invoker, args);
    }

    // Name and arity of the called method decide whether it is handled locally.
    final String name = method.getName();
    final int arity = method.getParameterTypes().length;
    if (arity == 0) {
        switch (name) {
            case "toString":
                return invoker.toString();
            case "$destroy":
                invoker.destroy();
                return null;
            case "hashCode":
                return invoker.hashCode();
            default:
                break;
        }
    } else if (arity == 1 && "equals".equals(name)) {
        return invoker.equals(args[0]);
    }

    // Build the Dubbo RPC invocation describing this call.
    RpcInvocation remoteCall =
            new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
    remoteCall.setTargetServiceUniqueName(invoker.getUrl().getServiceKey());

    // invoker.getUrl() returns consumer url.
    RpcContext.setRpcContext(invoker.getUrl());

    if (consumerModel != null) {
        remoteCall.put(Constants.CONSUMER_MODEL, consumerModel);
        remoteCall.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    // Remote call.
    return invoker.invoke(remoteCall).recreate();
}
Invoker#invoke方法
- 路由過濾
- 負載均衡
最終挑選出某一個invoker
/**
 * Entry point of a cluster invocation: prepares the invocation, resolves the candidate
 * invokers and the load balancer, then delegates to the subclass's {@code doInvoke}.
 *
 * @param invocation the call to perform
 * @return the result produced by {@code doInvoke}
 * @throws RpcException propagated from the checks or from {@code doInvoke}
 */
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, Object> attachments = RpcContext.getContext().getObjectAttachments();
    if (attachments != null && !attachments.isEmpty()) {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        rpcInvocation.addObjectAttachments(attachments);
    }

    // Candidate invokers for this invocation. Per the original note this is effectively
    // directory.list, which looks invokers up by method name and filters them.
    List<Invoker<T>> candidates = list(invocation);

    // Load-balance strategy; per the original note it is chosen through @SPI.
    LoadBalance balancer = initLoadBalance(candidates, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    // Hand over to the subclass implementation.
    return doInvoke(invocation, candidates, balancer);
}
/**
 * Invokes one of the candidate invokers, retrying on non-business failures.
 *
 * <p>The number of attempts is the method-level RETRIES_KEY parameter from the URL
 * (default DEFAULT_RETRIES) plus one, and never less than one. Before every retry the
 * candidate list is re-read and re-checked. A business RpcException is rethrown at once;
 * any other failure is remembered as the last exception and the loop moves on. When all
 * attempts fail, an RpcException carrying the last error's code and message is thrown.
 *
 * @param invocation  the call to perform
 * @param invokers    candidate invokers for the first attempt
 * @param loadbalance load balancer handed to select(...) when picking an invoker
 * @return the result of the first successful attempt
 * @throws RpcException for a business exception, or after all attempts have failed
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// Total attempts = configured retries + the initial call.
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
// Addresses of every provider attempted so far (filled in the finally block below).
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// Pick one invoker according to the load-balance rule; the already-invoked list is passed along.
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// Remote call (original note: every request carries a unique ID; not visible in this block).
Result result = invoker.invoke(invocation);
// A non-null `le` here means an earlier attempt failed and this retry succeeded.
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
// Runs on success and failure alike, so the address is recorded for every attempt.
providers.add(invoker.getUrl().getAddress());
}
}
// The loop runs at least once and only falls through after a failure, so `le` is set here.
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
Provider處理請求
- 服務端的NettyServer處理請求,最終會調用到DubboProtocol#reply
- 根據客戶端的請求,從exporterMap中選擇對應的Invoker (exporterMap key:serviceKey())
- 調用Invoker具體業務類的方法
鏈式調用,入口 ProtocolFilterWrapper 會處理調用信息GenericFilter,上下文ContextFilter等
調用到AbstractProxyInvoker,當前類調用通過Javassist封裝成Wrapper類最終調用到具體的實現類 - 返回處理結果
NettyServer的Handler處理請求
/**
 * Handles a newly connected Netty channel.
 *
 * <p>Looks up (or creates) the {@code NettyChannel} wrapper for the underlying channel,
 * registers it in {@code channels} under the remote address, and notifies {@code handler}.
 * Whatever happens, the wrapper is removed again if the underlying channel has already
 * disconnected.
 *
 * @param ctx Netty handler context whose channel has just connected
 * @param e   the channel state event (not used here)
 * @throws Exception propagated from {@code handler.connected}
 */
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
        }
        handler.connected(channel);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }

    // `channel` is treated as nullable above, so the log must not dereference it blindly;
    // otherwise an enabled info level would turn a null wrapper into a NullPointerException.
    if (channel != null && logger.isInfoEnabled()) {
        logger.info("The connection between " + channel.getRemoteAddress() + " and " + channel.getLocalAddress() + " is established");
    }
}
/**
 * Dispatches the "connected" event to the executor instead of handling it inline.
 *
 * @param channel the channel that has connected
 * @throws RemotingException declared by the handler contract; a failure while submitting
 *                           the task is rethrown as an {@code ExecutionException}
 */
@Override
public void connected(Channel channel) throws RemotingException {
    final ExecutorService pool = getExecutorService();
    try {
        // Wrap the event in a ChannelEventRunnable and hand it to the thread pool.
        ChannelEventRunnable task = new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED);
        pool.execute(task);
    } catch (Throwable cause) {
        String detail = getClass() + " error when process connected event .";
        throw new ExecutionException("connect event", channel, detail, cause);
    }
}
/**
 * Handles a request and sends back the matching response.
 *
 * <p>The response is created from the request's id and version. A request flagged as
 * broken is answered immediately with {@code BAD_REQUEST}. Otherwise the request data is
 * passed to {@code handler.reply} and the response is sent once the returned stage
 * completes: {@code OK} with the result, or {@code SERVICE_ERROR} with the failure text.
 *
 * @param channel channel the response is written to
 * @param req     the incoming request
 * @throws RemotingException if sending the bad-request or the synchronous error response fails
 */
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // The response is tied to the request through the request id.
    final Response response = new Response(req.getId(), req.getVersion());

    if (req.isBroken()) {
        // For a broken request the data is reported back as the decode-failure reason.
        Object brokenData = req.getData();
        String reason = null;
        if (brokenData instanceof Throwable) {
            reason = StringUtils.toString((Throwable) brokenData);
        } else if (brokenData != null) {
            reason = brokenData.toString();
        }
        response.setErrorMessage("Fail to decode request due to: " + reason);
        response.setStatus(Response.BAD_REQUEST);
        channel.send(response);
        return;
    }

    // find handler by message class.
    Object message = req.getData();
    try {
        // Per the original note this ends up in DubboProtocol's reply.
        CompletionStage<Object> pending = handler.reply(channel, message);
        pending.whenComplete((appResult, failure) -> {
            try {
                if (failure != null) {
                    response.setStatus(Response.SERVICE_ERROR);
                    response.setErrorMessage(StringUtils.toString(failure));
                } else {
                    response.setStatus(Response.OK);
                    response.setResult(appResult);
                }
                channel.send(response);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        // Failures raised while obtaining or chaining the stage are reported synchronously.
        response.setStatus(Response.SERVICE_ERROR);
        response.setErrorMessage(StringUtils.toString(e));
        channel.send(response);
    }
}
最終調用到 DubboProtocol
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
/**
 * Executes an incoming invocation on the provider side.
 *
 * <p>Rejects any message that is not an {@code Invocation}, resolves the target invoker
 * via {@code getInvoker}, and, for callback invocations, verifies that the requested
 * method is listed in the invoker URL's {@code methods} parameter before invoking.
 *
 * @param channel channel the request arrived on
 * @param message the decoded request payload
 * @return a future of the invocation result, or {@code null} when a callback method is
 *         not found and the call is ignored
 * @throws RemotingException if {@code message} is not an {@code Invocation}
 */
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    // Resolve the invoker for this invocation (original note: looked up in exporterMap).
    Invoker<?> invoker = getInvoker(channel, inv);

    // need to consider backward-compatibility if it's a callback
    Object callbackFlag = inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE);
    if (Boolean.TRUE.toString().equals(callbackFlag)) {
        String declared = invoker.getUrl().getParameters().get("methods");
        boolean found;
        if (declared != null && declared.contains(",")) {
            // Comma-separated list: look for an exact match among the entries.
            found = false;
            for (String candidate : declared.split(",")) {
                if (inv.getMethodName().equals(candidate)) {
                    found = true;
                    break;
                }
            }
        } else {
            // Missing or single-entry value: compare directly (a null value never matches).
            found = inv.getMethodName().equals(declared);
        }
        if (!found) {
            String warning = new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv;
            logger.warn(warning);
            // NOTE(review): returns null rather than a completed future; callers that chain
            // on the return value need to tolerate that - confirm this is intended.
            return null;
        }
    }

    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // Invoke the resolved invoker (original note: ultimately wrapper.invokeMethod,
    // see the service-export write-up).
    Result outcome = invoker.invoke(inv);
    return outcome.thenApply(Function.identity());
}
鏈式調用點 ProtocolFilterWrapper
/**
 * Runs one link of the filter chain: calls {@code filter.invoke(next, invocation)} and
 * notifies the filter's listener about the outcome.
 *
 * <p>If the filter throws synchronously, the listener's {@code onError} is called and the
 * exception is rethrown. Otherwise a completion callback is attached that calls
 * {@code onResponse} or {@code onError} when the result completes. For a
 * {@code ListenableFilter} the per-invocation listener is removed afterwards.
 *
 * @param invocation the call travelling through the chain
 * @return the filter's result with the listener callback attached
 * @throws RpcException propagated from the filter
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        asyncResult = filter.invoke(next, invocation);
    } catch (Exception e) {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenable = (ListenableFilter) filter;
            try {
                Filter.Listener callback = listenable.listener(invocation);
                if (callback != null) {
                    callback.onError(e, invoker, invocation);
                }
            } finally {
                listenable.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            ((Filter.Listener) filter).onError(e, invoker, invocation);
        }
        throw e;
    }

    return asyncResult.whenCompleteWithContext((response, failure) -> {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenable = (ListenableFilter) filter;
            // The lookup stays outside the try: removeListener runs only once it has succeeded.
            Filter.Listener callback = listenable.listener(invocation);
            try {
                if (callback != null) {
                    if (failure != null) {
                        callback.onError(failure, invoker, invocation);
                    } else {
                        callback.onResponse(response, invoker, invocation);
                    }
                }
            } finally {
                listenable.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener callback = (Filter.Listener) filter;
            if (failure != null) {
                callback.onError(failure, invoker, invocation);
            } else {
                callback.onResponse(response, invoker, invocation);
            }
        }
    });
}
最終調用到Wrapper#AbstractProxyInvoker
/**
 * Invokes the target method through {@code doInvoke} and wraps the outcome in an
 * {@code AsyncRpcResult}.
 *
 * <p>The returned value is turned into a future via {@code wrapWithFuture}; when it
 * completes, the value or the failure is stored in an {@code AppResponse} (a
 * {@code CompletionException} is unwrapped to its cause). An
 * {@code InvocationTargetException} is converted into a result carrying the target
 * exception; any other throwable becomes an {@code RpcException}.
 *
 * @param invocation the call to execute
 * @return the asynchronous result of the call
 * @throws RpcException if the invocation fails with anything other than an
 *                      {@code InvocationTargetException}
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object returned = doInvoke(proxy, invocation.getMethodName(),
                invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> pending = wrapWithFuture(returned);
        CompletableFuture<AppResponse> appResponseFuture = pending.handle((value, failure) -> {
            AppResponse response = new AppResponse();
            if (failure == null) {
                response.setValue(value);
            } else {
                // Report the underlying cause instead of the CompletionException wrapper.
                response.setException(failure instanceof CompletionException ? failure.getCause() : failure);
            }
            return response;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        boolean asyncStillOpen = RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync();
        if (asyncStillOpen) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
/**
 * Performs the actual method call on the target object; supplied by subclasses.
 *
 * @param proxy          the target object to call
 * @param methodName     name of the method to call
 * @param parameterTypes parameter types of the method to call
 * @param arguments      arguments to pass to the method
 * @return the value returned by the called method
 * @throws Throwable whatever the call throws
 */
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
Javassist動態代理
/**
 * Creates the provider-side invoker that dispatches calls to {@code proxy} through a
 * {@code Wrapper}.
 *
 * <p>The wrapper is built for the object's runtime class, unless that class name contains
 * {@code '$'}, in which case the declared {@code type} is used instead.
 *
 * @param proxy the service implementation instance
 * @param type  the service interface type
 * @param url   the service URL
 * @return an invoker whose {@code doInvoke} delegates to {@code wrapper.invokeMethod}
 */
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Class<?> runtimeClass = proxy.getClass();
    final boolean nameHasDollar = runtimeClass.getName().indexOf('$') >= 0;
    // Original note: the Wrapper class is generated with Javassist when the service starts,
    // so calls at runtime go through it instead of reflection.
    final Wrapper wrapper = Wrapper.getWrapper(nameHasDollar ? type : runtimeClass);

    return new AbstractProxyInvoker<T>(proxy, type, url) {
        // The wrapper ends up calling the method on the provider's implementation object.
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}