前言
上一篇講Proxy的文章中看到,構(gòu)建Proxy需要傳入Invoker參數(shù)拢操。除基本方法外僚焦,其它接口方法的調(diào)用最終都是調(diào)用的invoker.invoke()方法。從rpc調(diào)用的整個流程來說匿辩,Invoker正好處在中間的位置,它的左邊是用戶的應(yīng)用榛丢,調(diào)用的都是對象和方法铲球。而它的右邊是傳輸層,操作的是Request/Response晰赞,所以Invoker就是中間的橋梁睬辐。
Invoker結(jié)構(gòu)
下面Invoker相關(guān)類的關(guān)系圖,這只是其中最重要的部分:
從上面圖中可以看到宾肺,Invoker大體上分成兩個部分溯饵,針對集群的
ClusterInvoker
和針對特定協(xié)議的Invoker
。下面先從針對特定協(xié)議的Invoker開始锨用。
Protocol和Invoker
從上一篇的ReferenceBean
初始化中可以知道丰刊,消費端針對某個服務(wù)接口創(chuàng)建Invoker的時候,首先需要獲取到URL增拥。最簡單的例子就是在@Reference
注解上配置了url地址啄巧,而且這個地址不是注冊中心的地址。
指定協(xié)議的URL
最簡單的url比如dubbo://10.0.75.1:20880/org.apache.dubbo.demo.DemoService?&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&side=provider×tamp=1585553085050
當ReferenceBean
拿到這個url后就會去找它對應(yīng)的Protocol
類掌栅,根據(jù)url的schema秩仆, Dubbo可以找到DubboProtocol
,然后調(diào)用Protocol的refer方法獲取到Invoker猾封,這個方法在AbstractProtocol
類里面澄耍。
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
實際上調(diào)用的是子類的protocolBindingRefer()
方法,這里外層封裝的AsyncToSyncInvoker
是一個裝飾類,因為新版本的dubbo把所有Invoker調(diào)用都改成了異步返回齐莲,如果Consumer仍然希望同步調(diào)用痢站,則用這個裝飾類轉(zhuǎn)換一下。下面看下DubboProtocol的protocolBindingRefer()
方法實現(xiàn):
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 創(chuàng)建Dubbo Invoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
該方法直接創(chuàng)建了一個DubboInvoker
选酗,總共傳入四個參數(shù)阵难,除了接口和url外,第三個參數(shù)是構(gòu)建傳輸層Client芒填,前面講過Invoker連接了Proxy和傳輸層呜叫,當Invoker發(fā)起調(diào)用時,就需要這個ExchangeClient
來發(fā)送請求和接收Response殿衰,Exchange層的解析會包含在后續(xù)的文章中朱庆。第四個參數(shù)是Invoker的緩存集合,不是Protocol用的播玖,所以不去管它椎工。
前一篇文章講過蜀踏,當Proxy最終接收到方法調(diào)用后掰吕,會調(diào)用Invoker.invoke()來發(fā)起遠程調(diào)用,下面來看下DubboInvoker.invoke()
是怎么實現(xiàn)的殖熟。
DubboInvoker
對invoke()方法的調(diào)用首先會進到DubboInvoker
的父類AbstractInvoker
中:
@Override
public Result invoke(Invocation inv) throws RpcException {
// 判斷invoker是否已經(jīng)destroy了局待,是則打印警告,調(diào)用繼續(xù)
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
//追加RpcContext中的附加信息到Invocation中钳榨,比如鏈路追蹤的Id等
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
invocation.addObjectAttachments(contextAttachments);
}
//設(shè)置是同步還是異步調(diào)用
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
//如果是異步調(diào)用纽门,給這次請求加一個唯一id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
AsyncRpcResult asyncResult;
try {
//調(diào)用子類的doInvoke()方法
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
//異常處理
...
} catch (RpcException e) {
//異常處理
...
} catch (Throwable e) {
//異常處理
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
return asyncResult;
}
AbstractInvoker
最終調(diào)用了DubboInvoker
的doInvoke()
方法薛耻。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
//獲取Dubbo協(xié)議的exchangeClient
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
//如果Oneway調(diào)用,即Consumer端不關(guān)心調(diào)用是否成功赏陵,則發(fā)送請求后直接返回結(jié)果蝙搔。多用在日志發(fā)送這種可以容忍數(shù)據(jù)丟失的場景
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
//2.7之后所有調(diào)用都改成異步,講Future放入result中吃型,如果Consumer調(diào)用是同步的,上面的Protocol的refer()會阻塞等待異步結(jié)果返回
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
上面就是當@Reference
上配置了單個url敌土,并且這個url指定了具體協(xié)議的情況返干,下面看下當url是注冊中心的情況。
注冊中心的URL
之前的文章講過矩欠,@Reference
關(guān)聯(lián)的注冊中心的url格式類似于registry://localhost:2181?refer=version%3f1.0.0,所以dubbo可以基于url找到對應(yīng)的Protocol類為RegistryProtocol
躺坟,現(xiàn)在看下這個類的refer()方法如何處理的:
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//1. 轉(zhuǎn)換成具體注冊中心實現(xiàn)的url
url = getRegistryUrl(url);
//2. 獲取注冊中心實現(xiàn)
Registry registry = registryFactory.getRegistry(url);
//3. 如果是獲取RegistryService的代理乳蓄,則直接獲取本地暴露的invoker
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
//4. 判斷url是否指定了分組信息
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
//指定了分組虚倒,則使用MergeableCluster
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//5. 獲取Cluster Invoker
return doRefer(cluster, registry, type, url);
}
第1步,首先需要將url轉(zhuǎn)換成真實注冊中心的地址菠剩。dubbo是支持多注冊中心的,而配置中獲取的是一個通用的注冊中心url耻煤,以registry://開頭具壮,這一步轉(zhuǎn)成真正的注冊中心url,比如從registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService®istry=zookeeper 轉(zhuǎn)成 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService
第2步哈蝇,根據(jù)真實的url獲取到注冊中心的實現(xiàn)類炮赦,比如上面的url獲取到的就是使用zookeeper注冊中心,獲取的就是ZookeeperRegistry
第3步眼五,這里是對獲取注冊中心實例代理的特殊處理看幼,暫時不看
第4步,dubbo支持將多個遠程服務(wù)調(diào)用結(jié)果做合并來做為最終結(jié)果,通過配置一個merger類來實現(xiàn)
第5步汽煮,沒有指定group的話,則使用默認的Cluster構(gòu)造Invoker
上面方法的主要就是獲取Registry的實現(xiàn)心例,然后調(diào)用doRefer()方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//1. 構(gòu)建directory實例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 2. 生成consumer URL
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
//3. 將consumer信息寫入注冊中心
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
//4. 構(gòu)建RouteChain
directory.buildRouterChain(subscribeUrl);
//5. 訂閱服務(wù)變化通知
directory.subscribe(toSubscribeUrl(subscribeUrl));
//6. 生成ClusterInvoker
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
//7. 回調(diào)Listener
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
在上面的doRefer()
方法中止后,首先為服務(wù)生成RegistryDirectory
實例溜腐,該類的作用是關(guān)聯(lián)Directory
和Registry
接口,前面的白話Dubbo系列中已經(jīng)講過歉糜,不清楚的話可以回查一下望众。隨后,Consumer會將自己也注冊到注冊中心夯缺,所以可以通過注冊中心的數(shù)據(jù)看到某個Provider都被誰消費刽酱,也可以看到某個Consumer都調(diào)用了哪些服務(wù)棵里。
第5步中姐呐,訂閱注冊中心的數(shù)據(jù)變化,在provider變化時可以實時收到通知
第6步中头谜,生成最終的ClusterInvoker
鸠澈,Dubbo默認配置中,這里的Cluster是FailoverCluster
际度,join()方法返回FailoverClusterInvoker
涵妥。
ClusterInoker實現(xiàn)
ClusterInvoker
是Dubbo支持集群調(diào)用的核心實現(xiàn),包括負載均衡窒所、特殊路由、容錯處理等禽额。默認實現(xiàn)類FailoverClusterInvoker
支持用戶配置重試次數(shù)皮官,可以在一個節(jié)點失敗重試其它節(jié)點。
AbstractClusterInvoker:
@Override
public Result invoke(final Invocation invocation) throws RpcException {
//判斷Invoker是否已經(jīng)destroy盔憨,是則拋出異常
checkWhetherDestroyed();
// 將attachments加到Invocation中
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 獲取可用invoker列表
List<Invoker<T>> invokers = list(invocation);
//根據(jù)配置獲取指定的負載均衡實現(xiàn)
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
ClusterInvoker
的invoke()方法首先調(diào)用list()方法獲取所有可用invoker列表郁岩,這里的是直接調(diào)用的Directory的list方法缺狠,Directory緩存了從注冊中心獲取的provider url列表,會將每個url生成invoker如叼。
在獲取到一組invoker后需要從其中選擇一個發(fā)起調(diào)用穷劈,這時候就需要用到負載均衡歇终,最終根據(jù)獲取的invoker列表和負載均衡器調(diào)用子類的具體實現(xiàn)。
FailoverClusterInvoker:
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 獲取重試次數(shù)追葡,最低可配置在方法粒度
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 使用負載均衡最終選擇一個invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn(...);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(...);
}
上面的邏輯主要是兩點宜肉,根據(jù)配置的重試次數(shù)來決定是否重試翎碑,根據(jù)負載均衡實現(xiàn)從注冊中心返回的可用服務(wù)中選擇其中一個杈女,然后發(fā)起調(diào)用吊圾,當重試結(jié)束還未成功翰蠢,則拋出異常。
構(gòu)造帶Filter的Invoker
上面講了兩種Invoker的獲取和invoke的工作原理檀何,其實Dubbo中上面得到的Invoker不會直接返回給Proxy廷支,而是需要和Filter集成最終返回Invoker鏈。這部分的代碼前面白話部分講Filter的時候已分解垛孔,傳送門施敢。
總結(jié)
消費端的Proxy通過Invoker發(fā)起調(diào)用僵娃,Invoker對Proxy屏蔽了集群和服務(wù)治理等一系列邏輯,同時從Invoker層開始默怨,提供了對多協(xié)議的支持匙睹。從Invoker再往后走,將不存在接口和方法的概念垃僚,下一篇將分解傳輸層的實現(xiàn)谆棺。