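In the previous post we briefly walked through Dubbo's flow from service export to service reference.
In this post we use the dubbo protocol to take a closer look at the Protocol layer.
The export() process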
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // Store the exporter in the map
    exporterMap.put(key, exporter);
    ...
    openServer(url);
    return exporter;
}
/**
 * Open the server.
 *
 * @param url
 */
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    // A client can also export a service that only the server can call.
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // Not in the map yet, so create the server
            serverMap.put(key, createServer(url));
        } else {
            // The server supports reset, which is used together with the override feature
            server.reset(url);
        }
    }
}
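The get-or-create logic in openServer() can be illustrated in isolation. The following is a minimal sketch, not Dubbo's code: `SketchServer` and `ServerRegistrySketch` are hypothetical names, the address is a plain string rather than a URL, and `reset()` only counts calls instead of reconfiguring anything.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for ExchangeServer; it only tracks how often it was reset. */
final class SketchServer {
    final String address;
    int resetCount = 0;

    SketchServer(String address) {
        this.address = address;
    }

    void reset() {
        resetCount++;
    }
}

/** Hypothetical registry mirroring the get-or-create logic of openServer(). */
final class ServerRegistrySketch {
    // address ("host:port") -> server, playing the role of serverMap
    private final Map<String, SketchServer> serverMap = new ConcurrentHashMap<>();

    /** Creates the server on first use of an address; resets the existing one afterwards. */
    SketchServer openServer(String address) {
        SketchServer server = serverMap.get(address);
        if (server == null) {
            server = new SketchServer(address);
            serverMap.put(address, server);
        } else {
            server.reset();
        }
        return server;
    }

    int serverCount() {
        return serverMap.size();
    }
}
```

Exporting several services on the same address therefore shares a single server, while each new address gets its own.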
/**
 * Create the server.
 *
 * @param url
 * @return
 */
private ExchangeServer createServer(URL url) {
    ...
    ExchangeServer server;
    try {
        // Start the server listener, passing in requestHandler.
        // When a client call arrives, requestHandler.received() is invoked.
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    ...
    return server;
}
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            // Look up the matching invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            ...
            // Call the real service implementation based on the Invocation
            return invoker.invoke(inv);
        }
        ...
    }
    ...
};
/**
 * Find the server-side invoker that matches the request parameters.
 *
 * @param channel
 * @param inv
 * @return
 * @throws RemotingException
 */
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);
    ...
    // Build the serviceKey
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv
            .getAttachments().get(Constants.GROUP_KEY));
    // Find the exporter in the map
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    ...
    return exporter.getInvoker();
}
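The lookup only works if the server-side export and the incoming request produce the same key from the same four inputs. Here is a minimal sketch of such a key builder. `ServiceKeySketch` is a hypothetical class, and the "group/path:version:port" layout is an assumption chosen for illustration; Dubbo's own serviceKey() may format the key differently.

```java
/** Hypothetical key builder; the "group/path:version:port" layout is an assumption. */
final class ServiceKeySketch {
    private ServiceKeySketch() {
    }

    /** Combines port, path, version, and group into one lookup key, skipping empty parts. */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }
}
```

Because group and version are part of the key, two versions of the same interface can be exported on one port without colliding in the map.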
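To summarize:
export() builds an Exporter from the service URL, stores it in a map, then opens the server and registers a callback (requestHandler).
When a client request arrives, the callback fires and looks up the Exporter in the map using a service key built from the request's port, path, version, and group.
Finally, the Invoker held by that Exporter calls the real service.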
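The whole cycle summarized above can be reduced to a map of callables. This is a minimal sketch under heavy simplification: `MiniProtocolSketch` is a hypothetical class, a `Function` stands in for the Invoker, the key is passed in as a ready-made string, and there is no networking at all.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical miniature of the export/dispatch cycle; not Dubbo's real classes. */
final class MiniProtocolSketch {
    // service key -> invoker, playing the role of exporterMap
    private final Map<String, Function<Object[], Object>> exporterMap = new ConcurrentHashMap<>();

    /** "export": register the invoker under its service key. */
    void export(String serviceKey, Function<Object[], Object> invoker) {
        exporterMap.put(serviceKey, invoker);
    }

    /** "reply": find the invoker for an incoming request and run it. */
    Object reply(String serviceKey, Object... arguments) {
        Function<Object[], Object> invoker = exporterMap.get(serviceKey);
        if (invoker == null) {
            throw new IllegalStateException("No exported service for key: " + serviceKey);
        }
        return invoker.apply(arguments);
    }
}
```

For example, after `protocol.export("demo.EchoService:20880", params -> "echo:" + params[0])`, a call to `protocol.reply("demo.EchoService:20880", "hi")` runs the registered function with the argument "hi".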
Note: both the server side and the client side have Invoker objects, but the two differ.
A client-side Invoker talks to the server to perform the remote call, for example DubboInvoker.
A server-side Invoker calls the real service implementation and usually extends AbstractProxyInvoker, as in the following snippet:
com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory#getInvoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            Method method = proxy.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(proxy, arguments);
        }
    };
}
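The doInvoke() body above is plain Java reflection: resolve a public method by name and parameter types, then invoke it on the target object. A minimal standalone sketch of the same two calls follows; `GreetingServiceSketch` and `ReflectiveInvokerSketch` are hypothetical names introduced only for this example.

```java
import java.lang.reflect.Method;

/** Hypothetical service used only for this demonstration. */
final class GreetingServiceSketch {
    public String greet(String name) {
        return "Hello, " + name;
    }
}

/** Hypothetical helper showing the reflection calls used by the server-side invoker. */
final class ReflectiveInvokerSketch {
    private ReflectiveInvokerSketch() {
    }

    /** Resolves a public method by name and parameter types, then calls it on the target. */
    static Object invoke(Object target, String methodName,
                         Class<?>[] parameterTypes, Object[] arguments) throws Exception {
        Method method = target.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(target, arguments);
    }
}
```

This is why the request has to carry the method name, the parameter types, and the arguments: together they are exactly what `getMethod` and `invoke` need on the server side.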
The refer() process
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
refer() simply creates an Invoker from the URL.
The invocation process
Now look at the DubboInvoker.doInvoke() method:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    ...
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // One-way means the caller does not expect a return value
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            // One-way: send the request and do not wait for a response
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // Asynchronous: expose the future through RpcContext and return immediately
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // Synchronous: block until the response arrives
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (Exception e) {
        ...
    }
}
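The three branches of doInvoke() differ only in what the caller does with the pending response. The sketch below illustrates that difference with `CompletableFuture` from the JDK. `CallModesSketch` is a hypothetical class, and `request()` fakes the network round trip on a background thread; it is not how Dubbo's client is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical client; a real one would write the request to a network channel. */
final class CallModesSketch {
    private CallModesSketch() {
    }

    /** Pretends to send a request and completes the future with the "response". */
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "reply:" + payload);
    }

    /** One-way: fire the request and return at once, ignoring any response. */
    static void oneway(String payload) {
        request(payload);
    }

    /** Asynchronous: hand the future to the caller, who decides when to wait. */
    static CompletableFuture<String> async(String payload) {
        return request(payload);
    }

    /** Synchronous: block until the response arrives or the timeout expires. */
    static String sync(String payload, long timeoutMillis) throws Exception {
        return request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

In the original code the asynchronous branch does not return the future directly. It stores it in RpcContext and returns an empty RpcResult, so the caller fetches the future from the context afterwards.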
/**
 * Invocation. (API, Prototype, NonThreadSafe)
 * Encapsulates the information of a remote call (method name, arguments).
 */
public interface Invocation {
    /**
     * Method name
     */
    String getMethodName();

    /**
     * Method parameter types
     */
    Class<?>[] getParameterTypes();

    /**
     * Method arguments
     */
    Object[] getArguments();

    /**
     * Attachments (extra key-value parameters)
     */
    Map<String, String> getAttachments();

    String getAttachment(String key);

    String getAttachment(String key, String defaultValue);

    Invoker<?> getInvoker();
}
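To make the interface more concrete, here is a minimal sketch of a value object that carries the same call data. `SimpleInvocationSketch` is a hypothetical class, it omits getInvoker(), and the rule that an empty attachment falls back to the default value is an assumption of this sketch rather than a statement about Dubbo's RpcInvocation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified carrier of call data; not Dubbo's RpcInvocation. */
final class SimpleInvocationSketch {
    private final String methodName;
    private final Class<?>[] parameterTypes;
    private final Object[] arguments;
    private final Map<String, String> attachments;

    SimpleInvocationSketch(String methodName, Class<?>[] parameterTypes,
                           Object[] arguments, Map<String, String> attachments) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes.clone();
        this.arguments = arguments.clone();
        // Defensive copy so later changes by the caller do not leak in
        this.attachments = Collections.unmodifiableMap(new HashMap<>(attachments));
    }

    String getMethodName() {
        return methodName;
    }

    Class<?>[] getParameterTypes() {
        return parameterTypes.clone();
    }

    Object[] getArguments() {
        return arguments.clone();
    }

    Map<String, String> getAttachments() {
        return attachments;
    }

    String getAttachment(String key) {
        return attachments.get(key);
    }

    /** Falls back to defaultValue when the key is absent or empty (an assumption of this sketch). */
    String getAttachment(String key, String defaultValue) {
        String value = attachments.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }
}
```

The attachments are what getInvoker() reads on the server side: path, version, and group travel with the call as string key-value pairs next to the method name and arguments.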
As you can see, doInvoke() performs the remote call through the client object.
By now you should have a basic understanding of the three core objects of the Protocol layer.