8.1 簡(jiǎn)介
Dubbo服務(wù)調(diào)用過程較為復(fù)雜氧苍,包含眾多步驟夜矗,如發(fā)送請(qǐng)求、編解碼让虐、服務(wù)降級(jí)紊撕、過濾器鏈處理、序列化赡突、線程派發(fā)以及響應(yīng)請(qǐng)求等对扶。
8.1 源碼分析
源碼分析前,我們先通過一張圖了解Dubbo服務(wù)調(diào)用過程:
首先消費(fèi)者通過代理對(duì)象Proxy發(fā)起遠(yuǎn)程調(diào)用惭缰,接著通過網(wǎng)絡(luò)客戶端Client將編碼后的請(qǐng)求發(fā)送給服務(wù)提供方的網(wǎng)絡(luò)層浪南,即Server。Server在收到請(qǐng)求后漱受,首先對(duì)數(shù)據(jù)包進(jìn)行解碼络凿,然后將解碼后的請(qǐng)求發(fā)送給分發(fā)起Dispatcher,再由分發(fā)起將請(qǐng)求派發(fā)到指定的線程池昂羡,最后由線程池調(diào)用具體的服務(wù)絮记。
8.1.1 服務(wù)調(diào)用方式
Dubbo支持同步和異步兩種調(diào)用方式,其中異步調(diào)用還可細(xì)分為“有返回值”的異步調(diào)用和“無返回值”的異步調(diào)用虐先。所謂“無返回值”異步調(diào)用是指服務(wù)消費(fèi)方只管調(diào)用怨愤,不關(guān)心調(diào)用結(jié)果,此時(shí)Dubbo會(huì)直接返回一個(gè)空的RpcResult蛹批。若要使用異步特性撰洗,需要服務(wù)消費(fèi)方手動(dòng)進(jìn)行配置膀息。
下面我們將使用Dubbo官方提供的Demo分析整個(gè)調(diào)用過程,我們從DemoService接口的代理類開始進(jìn)行分析了赵。Dubbo默認(rèn)使用Javassist為服務(wù)接口生成動(dòng)態(tài)代理類潜支,因此我們需要現(xiàn)將代理類反編譯才能看到代碼:
/**
* Arthas 反編譯步驟:
* 1. 啟動(dòng) Arthas
* java -jar arthas-boot.jar
*
* 2. 輸入編號(hào)選擇進(jìn)程
* Arthas 啟動(dòng)后,會(huì)打印 Java 應(yīng)用進(jìn)程列表柿汛,如下:
* [1]: 11232 org.jetbrains.jps.cmdline.Launcher
* [2]: 22370 org.jetbrains.jps.cmdline.Launcher
* [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
* [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
* [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
* 這里輸入編號(hào) 3冗酿,讓 Arthas 關(guān)聯(lián)到啟動(dòng)類為 com.....Consumer 的 Java 進(jìn)程上
*
* 3. 由于 Demo 項(xiàng)目中只有一個(gè)服務(wù)接口,因此此接口的代理類類名為 proxy0络断,此時(shí)使用 sc 命令搜索這個(gè)類名裁替。
* $ sc *.proxy0
* com.alibaba.dubbo.common.bytecode.proxy0
*
* 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0
* $ jad com.alibaba.dubbo.common.bytecode.proxy0
*
* 更多使用方法請(qǐng)參考 Arthas 官方文檔:
* https://alibaba.github.io/arthas/quick-start.html
*/
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
// 方法數(shù)組
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
public proxy0() {
}
public String sayHello(String string) {
// 將參數(shù)存儲(chǔ)到 Object 數(shù)組中
Object[] arrobject = new Object[]{string};
// 調(diào)用 InvocationHandler 實(shí)現(xiàn)類的 invoke 方法得到調(diào)用結(jié)果
Object object = this.handler.invoke(this, methods[0], arrobject);
// 返回調(diào)用結(jié)果
return (String)object;
}
/** 回聲測(cè)試方法 */
public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
}
如上,代理類的邏輯比較簡(jiǎn)單貌笨,首先將運(yùn)行時(shí)參數(shù)存儲(chǔ)到數(shù)組中弱判,然后調(diào)用InvocationHandler接口實(shí)現(xiàn)類的invoke方法,得到調(diào)用結(jié)果锥惋,最后將結(jié)果強(qiáng)制類型轉(zhuǎn)換并返回昌腰。接下來我們看InvocationHandler的源碼:
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 如果是Object 類中的方法(未被子類重寫),比如 wait/notify膀跌,直接調(diào)用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 如果 toString遭商、hashCode 和 equals 等方法被子類重寫了,這里也直接調(diào)用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 將 method 和 args 封裝到 RpcInvocation 中捅伤,并執(zhí)行后續(xù)的調(diào)用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
此處要注意InvokerInvocationHandler中的成員變量invoker在集群中實(shí)際類型為MockClusterInvoker(具體代碼在服務(wù)引用那一章劫流,ReferenceConfig的createProxy方法,對(duì)于多個(gè)提供者的時(shí)候使用SPI包裝擴(kuò)展MockClusterWrapper創(chuàng)建invoker)丛忆,而MockClusterInvoker內(nèi)部就封裝了服務(wù)降級(jí)邏輯祠汇,MockClusterInvoker是對(duì)FailoverClusterInvoker的一層包裝,具體這些會(huì)在下一篇集群相關(guān)章節(jié)中分析熄诡。這里先直接分析DubboInvoker這種直連方式的實(shí)現(xiàn)可很。
ublic abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service ...");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 設(shè)置 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 設(shè)置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 添加 contextAttachments 到 RpcInvocation#attachment 變量中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
// 設(shè)置異步信息到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 抽象方法,由子類實(shí)現(xiàn)
return doInvoke(invocation);
} catch (InvocationTargetException e) {
// ...
} catch (RpcException e) {
// ...
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
// 省略其他方法
}
上面的代碼來自AbstractInvoker類粮彤,其中大部分代碼用于添加信息到RpcInvocation#attachment變量中根穷,添加完畢后姜骡,調(diào)用doInvoke執(zhí)行后續(xù)的調(diào)用导坟,這是一個(gè)抽象方法,直連由DubboInvoker實(shí)現(xiàn):
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 設(shè)置 path 和 version 到 attachment 中
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// 從 clients 數(shù)組中獲取 ExchangeClient
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 獲取異步配置
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// isOneway 為 true圈澈,表示“單向”通信
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 異步無返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 發(fā)送請(qǐng)求
currentClient.send(inv, isSent);
// 設(shè)置上下文中的 future 字段為 null
RpcContext.getContext().setFuture(null);
// 返回一個(gè)空的 RpcResult
return new RpcResult();
}
// 異步有返回值
else if (isAsync) {
// 發(fā)送請(qǐng)求惫周,并得到一個(gè) ResponseFuture 實(shí)例
ResponseFuture future = currentClient.request(inv, timeout);
// 設(shè)置 future 到上下文中
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 暫時(shí)返回一個(gè)空結(jié)果
return new RpcResult();
}
// 同步調(diào)用
else {
RpcContext.getContext().setFuture(null);
// 發(fā)送請(qǐng)求,得到一個(gè) ResponseFuture 實(shí)例康栈,并調(diào)用該實(shí)例的 get 方法進(jìn)行等待
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(..., "Invoke remote method timeout....");
} catch (RemotingException e) {
throw new RpcException(..., "Failed to invoke remote method: ...");
}
}
// 省略其他方法
}
上面的代碼包含了Dubbo對(duì)同步和異步調(diào)用的處理邏輯递递,搞懂了上面的代碼喷橙,會(huì)對(duì)Dubbo的同步和異步調(diào)用方式又更深入的了解。Dubbo實(shí)現(xiàn)同步和異步調(diào)用比較關(guān)鍵的一點(diǎn)是在于由誰來調(diào)用ResponseFuture的get方法:同步模式下登舞,由框架自身調(diào)用ResponseFuture的get方法贰逾。異步調(diào)用模式下,則由用戶調(diào)用該方法菠秒。ResponseFuture是一個(gè)接口疙剑,我們來看一下它的默認(rèn)實(shí)現(xiàn)類DefaultFuture:
ResponseFuture 的 get 方法。異步調(diào)用模式下践叠,則由用戶調(diào)用該方法言缤。ResponseFuture 是一個(gè)接口,下面我們來看一下它的默認(rèn)實(shí)現(xiàn)類 DefaultFuture 的源碼禁灼。
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, Channel> CHANNELS =
new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, DefaultFuture> FUTURES =
new ConcurrentHashMap<Long, DefaultFuture>();
private final long id;
private final Channel channel;
private final Request request;
private final int timeout;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private volatile Response response;
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
// 獲取請(qǐng)求 id管挟,這個(gè) id 很重要,后面還會(huì)見到
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 存儲(chǔ) <requestId, DefaultFuture> 映射關(guān)系到 FUTURES 中
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
// 檢測(cè)服務(wù)提供方是否成功返回了調(diào)用結(jié)果
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
// 循環(huán)檢測(cè)服務(wù)提供方是否成功返回了調(diào)用結(jié)果
while (!isDone()) {
// 如果調(diào)用結(jié)果尚未返回弄捕,這里等待一段時(shí)間
done.await(timeout, TimeUnit.MILLISECONDS);
// 如果調(diào)用結(jié)果成功返回僻孝,或等待超時(shí),此時(shí)跳出 while 循環(huán)守谓,執(zhí)行后續(xù)的邏輯
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 如果調(diào)用結(jié)果仍未返回皮璧,則拋出超時(shí)異常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
// 返回調(diào)用結(jié)果
return returnFromResponse();
}
@Override
public boolean isDone() {
// 通過檢測(cè) response 字段為空與否,判斷是否收到了調(diào)用結(jié)果
return response != null;
}
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
// 如果調(diào)用結(jié)果的狀態(tài)為 Response.OK分飞,則表示調(diào)用過程正常悴务,服務(wù)提供方成功返回了調(diào)用結(jié)果
if (res.getStatus() == Response.OK) {
return res.getResult();
}
// 拋出異常
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
// 省略其他方法
}
如上,當(dāng)消費(fèi)者還沒接受到調(diào)用結(jié)果是譬猫,用戶線程調(diào)用get方法會(huì)被阻塞住讯檐。同步調(diào)用模式下,框架獲得DefaultFuture對(duì)象后染服,會(huì)理解調(diào)用get方法進(jìn)行等待别洪。而異步模式下則是將該對(duì)象封裝到FutureAdapter實(shí)例中,并將FutureAdapter實(shí)例設(shè)置到RpcContext中柳刮,供用戶使用挖垛。FutureAdapter是一個(gè)適配器,用于將Dubbo的ResposneFuture與jdk的Future進(jìn)行適配秉颗,當(dāng)用戶線程調(diào)用Future的get方法痢毒,經(jīng)過FutureAdapter適配,最終會(huì)調(diào)用ResponseFuture實(shí)現(xiàn)類對(duì)象的get方法蚕甥,也就是DefaultFuture的get方法哪替。
目前最新的代碼不再使用ResposneFuture,而是使用jdk后來提供的CompletableFuture菇怀。
8.2 服務(wù)消費(fèi)方發(fā)送請(qǐng)求
8.2.1 發(fā)送請(qǐng)求
先看同步模式下凭舶,服務(wù)消費(fèi)方是如何發(fā)送調(diào)用請(qǐng)求的:
上圖展示了服務(wù)消費(fèi)方發(fā)送請(qǐng)求過程的部分調(diào)用棧晌块,圖中可以看出經(jīng)過多次調(diào)用后,才將請(qǐng)求數(shù)據(jù)送至NettyNioClientSocketChannel帅霜,這樣做的原因是通過Exchange層為框架引入Request和Response語義匆背。我們先來分析ReferenceCountExchangeClient的源碼:
final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url;
private final AtomicInteger referenceCount = new AtomicInteger(0);
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
// 引用計(jì)數(shù)自增
referenceCount.incrementAndGet();
this.url = client.getUrl();
// ...
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接調(diào)用被裝飾對(duì)象的同簽名方法
return client.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// 直接調(diào)用被裝飾對(duì)象的同簽名方法
return client.request(request, timeout);
}
/** 引用計(jì)數(shù)自增,該方法由外部調(diào)用 */
public void incrementAndGetCount() {
// referenceCount 自增
referenceCount.incrementAndGet();
}
@Override
public void close(int timeout) {
// referenceCount 自減
if (referenceCount.decrementAndGet() <= 0) {
if (timeout == 0) {
client.close();
} else {
client.close(timeout);
}
client = replaceWithLazyClient();
}
}
// 省略部分方法
}
ReferenceCountExchangeClient內(nèi)部定義了一個(gè)引用技術(shù)變量referenceCount身冀,每當(dāng)其持有的client對(duì)象被引用一次都會(huì)進(jìn)行自增靠汁。每當(dāng)close方法被調(diào)用時(shí),進(jìn)行自減闽铐。這個(gè)類只是實(shí)現(xiàn)了一個(gè)引用技術(shù)的功能蝶怔,其他方法均直接調(diào)用被裝飾對(duì)象的方法,所以我們繼續(xù)分析HeaderExchangeClient這個(gè)類:
ReferenceCountExchangeClient 的源碼兄墅。
final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url;
private final AtomicInteger referenceCount = new AtomicInteger(0);
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
// 引用計(jì)數(shù)自增
referenceCount.incrementAndGet();
this.url = client.getUrl();
// ...
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接調(diào)用被裝飾對(duì)象的同簽名方法
return client.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// 直接調(diào)用被裝飾對(duì)象的同簽名方法
return client.request(request, timeout);
}
/** 引用計(jì)數(shù)自增踢星,該方法由外部調(diào)用 */
public void incrementAndGetCount() {
// referenceCount 自增
referenceCount.incrementAndGet();
}
@Override
public void close(int timeout) {
// referenceCount 自減
if (referenceCount.decrementAndGet() <= 0) {
if (timeout == 0) {
client.close();
} else {
client.close(timeout);
}
client = replaceWithLazyClient();
}
}
// 省略部分方法
}
ReferenceCountExchangeClient 內(nèi)部定義了一個(gè)引用計(jì)數(shù)變量 referenceCount,每當(dāng)該對(duì)象被引用一次 referenceCount 都會(huì)進(jìn)行自增隙咸。每當(dāng) close 方法被調(diào)用時(shí)沐悦,referenceCount 進(jìn)行自減。ReferenceCountExchangeClient 內(nèi)部?jī)H實(shí)現(xiàn)了一個(gè)引用計(jì)數(shù)的功能五督,其他方法并無復(fù)雜邏輯藏否,均是直接調(diào)用被裝飾對(duì)象的相關(guān)方法。所以這里就不多說了充包,繼續(xù)向下分析副签,這次是 HeaderExchangeClient。
public class HeaderExchangeClient implements ExchangeClient {
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
private final Client client;
private final ExchangeChannel channel;
private ScheduledFuture<?> heartbeatTimer;
private int heartbeat;
private int heartbeatTimeout;
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 創(chuàng)建 HeaderExchangeChannel 對(duì)象
this.channel = new HeaderExchangeChannel(client);
// 以下代碼均與心跳檢測(cè)邏輯有關(guān)
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
// 開啟心跳檢測(cè)定時(shí)器
startHeartbeatTimer();
}
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接 HeaderExchangeChannel 對(duì)象的同簽名方法
return channel.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// 直接 HeaderExchangeChannel 對(duì)象的同簽名方法
return channel.request(request, timeout);
}
@Override
public void close() {
doClose();
channel.close();
}
private void doClose() {
// 停止心跳檢測(cè)定時(shí)器
stopHeartbeatTimer();
}
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
private void stopHeartbeatTimer() {
if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
try {
heartbeatTimer.cancel(true);
scheduled.purge();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(e.getMessage(), e);
}
}
}
heartbeatTimer = null;
}
// 省略部分方法
}
HeaderExchangeClient很多方法都只有一行代碼基矮,即直接調(diào)用HeaderExchangeChannel 對(duì)象的方法淆储。那么HeaderExchangeClient的用處是什么呢?其實(shí)只是封裝了一些關(guān)于心跳檢測(cè)的邏輯家浇,所以我們還要進(jìn)一步分析HeaderExchangeChannel的實(shí)現(xiàn):
HeaderExchangeChannel 對(duì)象的同簽名方法本砰。那 HeaderExchangeClient 有什么用處呢?答案是封裝了一些關(guān)于心跳檢測(cè)的邏輯钢悲。心跳檢測(cè)并非本文所關(guān)注的點(diǎn)点额,因此就不多說了,繼續(xù)向下看莺琳。
final class HeaderExchangeChannel implements ExchangeChannel {
private final Channel channel;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
// 這里的 channel 指向的是 NettyClient
this.channel = channel;
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(..., "Failed to send request ...);
}
// 創(chuàng)建 Request 對(duì)象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
// 設(shè)置雙向通信標(biāo)志為 true
req.setTwoWay(true);
// 這里的 request 變量類型為 RpcInvocation
req.setData(request);
// 創(chuàng)建 DefaultFuture 對(duì)象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 調(diào)用 NettyClient 的 send 方法發(fā)送請(qǐng)求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 返回 DefaultFuture 對(duì)象
return future;
}
}
到這里还棱,我們終于看到了Request語義,方法中首先創(chuàng)建一個(gè)Request對(duì)象芦昔,把RpcInvocation和一些其他信息放進(jìn)Request中诱贿,然后將該對(duì)象傳遞給NettyClient的send方法娃肿,進(jìn)行后續(xù)的調(diào)用咕缎。NettyClient的send方法實(shí)現(xiàn)直接繼承自AbstractPeer類:
w DefaultFuture(channel, req, timeout);
try {
// 調(diào)用 NettyClient 的 send 方法發(fā)送請(qǐng)求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 返回 DefaultFuture 對(duì)象
return future;
}
}
到這里大家終于看到了 Request 語義了珠十,上面的方法首先定義了一個(gè) Request 對(duì)象,然后再將該對(duì)象傳給 NettyClient 的 send 方法凭豪,進(jìn)行后續(xù)的調(diào)用焙蹭。需要說明的是,NettyClient 中并未實(shí)現(xiàn) send 方法嫂伞,該方法繼承自父類 AbstractPeer孔厉,下面直接分析 AbstractPeer 的代碼。
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void send(Object message) throws RemotingException {
// 該方法由子類AbstractClient 類實(shí)現(xiàn)
send(message, url.getParameter(Constants.SENT_KEY, false));
}
// 省略其他方法
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// 獲取 Channel帖努,getChannel 是一個(gè)抽象方法撰豺,具體由子類實(shí)現(xiàn)
Channel channel = getChannel();
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send ...");
}
// 繼續(xù)向下調(diào)用
channel.send(message, sent);
}
protected abstract Channel getChannel();
// 省略其他方法
}
注意此處繼承關(guān)系較多,NettyClient繼承了AbstractClient拼余,AbstractClient繼承了AbstractEndpoint污桦,AbstractEndpoint繼承了AbstractPeer。
默認(rèn)情況下匙监,Dubbo使用Netty作為底層的通信框架凡橱,因此下面我們到NettyClient子類看一下getChannel方法的實(shí)現(xiàn):
public class NettyClient extends AbstractClient {
// 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel
private volatile Channel channel;
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
// 獲取一個(gè) NettyChannel 類型對(duì)象
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
final class NettyChannel extends AbstractChannel {
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap =
new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
/** 私有構(gòu)造方法 */
private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
// 嘗試從集合中獲取 NettyChannel 實(shí)例
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
// 如果 ret = null,則創(chuàng)建一個(gè)新的 NettyChannel 實(shí)例
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
// 將 <Channel, NettyChannel> 鍵值對(duì)存入 channelMap 集合中
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
}
獲取到 NettyChannel 實(shí)例后亭姥,即可進(jìn)行后續(xù)的調(diào)用稼钩。下面看一下 NettyChannel 的 send 方法。
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 發(fā)送消息(包含請(qǐng)求和響應(yīng)消息)
ChannelFuture future = channel.write(message);
// sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值达罗,有兩種配置值:
// 1. true: 等待消息發(fā)出坝撑,消息發(fā)送失敗將拋出異常
// 2. false: 不等待消息發(fā)出,將消息放入 IO 隊(duì)列粮揉,即刻返回
// 默認(rèn)情況下 sent = false绍载;
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待消息發(fā)出,若在規(guī)定時(shí)間沒能發(fā)出滔蝉,success 會(huì)被置為 false
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message ...");
}
// 若 success 為 false击儡,這里拋出異常
if (!success) {
throw new RemotingException(this, "Failed to send message ...");
}
}
經(jīng)歷多次調(diào)用,到這里請(qǐng)求數(shù)據(jù)的發(fā)送過程就結(jié)束了蝠引,過程漫長(zhǎng)阳谍。為了便于大家閱讀代碼,這里以 DemoService 為例螃概,將 sayHello 方法的整個(gè)調(diào)用路徑貼出來矫夯。
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多個(gè) Filter 調(diào)用
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)
8.2.2 請(qǐng)求編碼
了解netty的同學(xué),很自然就想到吊洼,在前面invoker的調(diào)用到達(dá)NettyClient之后训貌,對(duì)與編解碼的處理肯定會(huì)通過NettyClient將負(fù)責(zé)編解碼的ChannelHandler添加到netty的pipeline中,具體邏輯在NettyClient的doOpen方法:
如上,借助NettyCodecAdapter將Codec2接口借助SPI的方式加載編解碼實(shí)現(xiàn)類構(gòu)造出相對(duì)應(yīng)的編解碼ChannelHanlder递沪。
具體編解碼的實(shí)現(xiàn)以及Dubbo提供的SPI接口ChannelHandler和Dubbo實(shí)現(xiàn)netty提供的出站入站ChannelHandler是怎么關(guān)聯(lián)的豺鼻,可以參考上一篇的講解。
8.3 服務(wù)提供方接收請(qǐng)求
8.3.1 請(qǐng)求解碼
通過netty的channelHandler找到Codec2接口的實(shí)現(xiàn)類進(jìn)行解碼款慨,具體實(shí)現(xiàn)參考上一篇儒飒。
8.3.2 調(diào)用服務(wù)
解碼器將數(shù)據(jù)包解析成Request對(duì)象后,NettyHandler的messageReceived方法接收到這個(gè)對(duì)象檩奠,會(huì)繼續(xù)向下傳遞桩了。通過SPI的包裝擴(kuò)展,一步步傳遞給NettyServer埠戳、MultiMessagehandler井誉、HeartbeatHandler以及AllChannelHandler。最終由AllChannelHandler將該對(duì)象分裝到Runnable實(shí)現(xiàn)類對(duì)象中整胃,并將Runnable放入線程池中執(zhí)行后續(xù)的調(diào)用邏輯送悔,整個(gè)調(diào)用棧如下:
NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
—> AbstractPeer#received(Channel, Object)
—> MultiMessageHandler#received(Channel, Object)
—> HeartbeatHandler#received(Channel, Object)
—> AllChannelHandler#received(Channel, Object)
—> ExecutorService#execute(Runnable) // 由線程池執(zhí)行后續(xù)的調(diào)用邏輯
我們直接來看一下最后一個(gè)處理器AllChannelHandler的邏輯,它是一個(gè)將所有消息都派發(fā)到業(yè)務(wù)線程池去執(zhí)行的策略:
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
/** 處理連接事件 */
@Override
public void connected(Channel channel) throws RemotingException {
// 獲取線程池
ExecutorService cexecutor = getExecutorService();
try {
// 將連接事件派發(fā)到線程池中處理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException(..., " error when process connected event .", t);
}
}
/** 處理斷開事件 */
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException(..., "error when process disconnected event .", t);
}
}
/** 處理請(qǐng)求和響應(yīng)消息爪模,這里的 message 變量類型可能是 Request欠啤,也可能是 Response */
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
// 將請(qǐng)求和響應(yīng)消息派發(fā)到線程池中處理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
// 如果通信方式為雙向通信,此時(shí)將 Server side ... threadpool is exhausted
// 錯(cuò)誤信息封裝到 Response 中屋灌,并返回給服務(wù)消費(fèi)方洁段。
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
// 返回包含錯(cuò)誤信息的 Response 對(duì)象
channel.send(response);
return;
}
}
throw new ExecutionException(..., " error when process received event .", t);
}
}
/** 處理異常信息 */
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException(..., "error when process caught event ...");
}
}
}
如上,請(qǐng)求對(duì)象會(huì)被封裝到ChannelEventRunnable中共郭,該類將會(huì)是服務(wù)調(diào)用過程的新起點(diǎn)祠丝,所以接下來我們以它為起點(diǎn)向下探索:
public class ChannelEventRunnable implements Runnable {
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
@Override
public void run() {
// 檢測(cè)通道狀態(tài),對(duì)于請(qǐng)求或響應(yīng)消息除嘹,此時(shí) state = RECEIVED
if (state == ChannelState.RECEIVED) {
try {
// 將 channel 和 message 傳給 ChannelHandler 對(duì)象写半,進(jìn)行后續(xù)的調(diào)用
// 注意這個(gè)handler是AllChannelHandler傳過來的,
// AllChannelHandler本身是handlerWrapper尉咕,通過把其包裝的handler傳遞給ChannelEventRunnable叠蝇,
// 使其能夠在線程池中繼續(xù)handler(還有多層包裝)的處理
handler.received(channel, message);
} catch (Exception e) {
logger.warn("... operation error, channel is ... message is ...");
}
}
// 其他消息類型通過 switch 進(jìn)行處理
else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("... operation error, channel is ...");
}
break;
case DISCONNECTED:
// ...
case SENT:
// ...
case CAUGHT:
// ...
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
}
接下來,AllChannelHandler給ChannelEventRunnable傳遞的handler是DecodeHandler:
DecodeHandler extends AbstractChannelHandlerDelegate {
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
// 對(duì) Decodeable 接口實(shí)現(xiàn)類對(duì)象進(jìn)行解碼
decode(message);
}
if (message instanceof Request) {
// 對(duì) Request 的 data 字段進(jìn)行解碼
decode(((Request) message).getData());
}
if (message instanceof Response) {
// 對(duì) Request 的 result 字段進(jìn)行解碼
decode(((Response) message).getResult());
}
// 執(zhí)行后續(xù)邏輯
handler.received(channel, message);
}
private void decode(Object message) {
// Decodeable 接口目前有兩個(gè)實(shí)現(xiàn)類年缎,
// 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
if (message != null && message instanceof Decodeable) {
try {
// 執(zhí)行解碼邏輯
((Decodeable) message).decode();
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
}
}
}
}
DecodeHandler對(duì)消息進(jìn)行解碼悔捶,然后繼續(xù)調(diào)用其包裝的下一個(gè)handler的received方法,即HeaderExchangeHandler:
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
private final ExchangeHandler handler;
public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 處理請(qǐng)求對(duì)象
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
// 處理事件
handlerEvent(channel, request);
}
// 處理普通的請(qǐng)求
else {
// 雙向通信
if (request.isTwoWay()) {
// 向后調(diào)用服務(wù)单芜,并得到調(diào)用結(jié)果
Response response = handleRequest(exchangeChannel, request);
// 將調(diào)用結(jié)果返回給服務(wù)消費(fèi)端
channel.send(response);
}
// 如果是單向通信蜕该,僅向后調(diào)用指定服務(wù)即可,無需返回調(diào)用結(jié)果
else {
handler.received(exchangeChannel, request.getData());
}
}
}
// 處理響應(yīng)對(duì)象洲鸠,服務(wù)消費(fèi)方會(huì)執(zhí)行此處邏輯堂淡,后面分析
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// telnet 相關(guān),忽略
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// 檢測(cè)請(qǐng)求是否合法,不合法則返回狀態(tài)碼為 BAD_REQUEST 的響應(yīng)
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null)
msg = null;
else if
(data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else
msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
// 設(shè)置 BAD_REQUEST 狀態(tài)
res.setStatus(Response.BAD_REQUEST);
return res;
}
// 獲取 data 字段值绢淀,也就是 RpcInvocation 對(duì)象
Object msg = req.getData();
try {
// 繼續(xù)向下調(diào)用
Object result = handler.reply(channel, msg);
// 設(shè)置 OK 狀態(tài)碼
res.setStatus(Response.OK);
// 設(shè)置調(diào)用結(jié)果
res.setResult(result);
} catch (Throwable e) {
// 若調(diào)用過程出現(xiàn)異常萤悴,則設(shè)置 SERVICE_ERROR,表示服務(wù)端異常
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
}
到這里更啄,我們看到了比較清晰的請(qǐng)求和響應(yīng)邏輯稚疹,對(duì)于雙向通信居灯,HeaderExchangeHandler首先向后調(diào)用祭务,得到調(diào)用結(jié)果,然后將調(diào)用結(jié)果封裝到Response對(duì)象怪嫌,最后返回給服務(wù)消費(fèi)方义锥。如果請(qǐng)求不合法或者調(diào)用失敗,則將錯(cuò)誤信息封裝到Response對(duì)象岩灭,并返回給服務(wù)消費(fèi)方拌倍。接下來,我們把剩余的調(diào)用過程分析完噪径,下一個(gè)handler是DubboProtocol中的匿名類ExchangeHandlerAdapter(這里有點(diǎn)糊涂柱恤,匿名類不可能通過spi包裝,難道整個(gè)包裝是通過dispatcher=all配置項(xiàng)手動(dòng)包裝的找爱?)
ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 獲取 Invoker 實(shí)例
Invoker<?> invoker = getInvoker(channel, inv);
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
// 回調(diào)相關(guān)梗顺,忽略
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 通過 Invoker 調(diào)用具體的服務(wù)
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: ...");
}
// 忽略其他方法
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
// 忽略回調(diào)和本地存根相關(guān)邏輯
// ...
int port = channel.getLocalAddress().getPort();
// 計(jì)算 service key,格式為 groupName/serviceName:serviceVersion:port车摄。比如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// 從 exporterMap 查找與 serviceKey 相對(duì)應(yīng)的 DubboExporter 對(duì)象寺谤,
// 服務(wù)導(dǎo)出過程中會(huì)將 <serviceKey, DubboExporter> 映射關(guān)系存儲(chǔ)到 exporterMap 集合中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service ...");
// 獲取 Invoker 對(duì)象,并返回
return exporter.getInvoker();
}
// 忽略其他方法
}
以上邏輯用于獲取與指定服務(wù)對(duì)應(yīng)的 Invoker 實(shí)例吮播,并通過 Invoker 的 invoke 方法調(diào)用服務(wù)邏輯变屁。invoke 方法定義在 AbstractProxyInvoker 中,代碼如下:
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用意狠,并將調(diào)用結(jié)果封裝到 RpcResult 中粟关,并
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method ...");
}
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}
如上,doInvoke 是一個(gè)抽象方法环戈,這個(gè)需要由具體的 Invoker 實(shí)例實(shí)現(xiàn)誊役。Invoker 實(shí)例是在運(yùn)行時(shí)通過 JavassistProxyFactory 創(chuàng)建的,創(chuàng)建邏輯如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
// 省略其他方法
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 創(chuàng)建匿名類對(duì)象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調(diào)用 invokeMethod 方法進(jìn)行后續(xù)的調(diào)用
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
Wrapper 是一個(gè)抽象類谷市,其中 invokeMethod 是一個(gè)抽象方法蛔垢。Dubbo 會(huì)在運(yùn)行時(shí)通過 Javassist 框架為 Wrapper 生成實(shí)現(xiàn)類,并實(shí)現(xiàn) invokeMethod 方法迫悠,該方法最終會(huì)根據(jù)調(diào)用信息調(diào)用具體的服務(wù)鹏漆。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
/** Wrapper0 是在運(yùn)行時(shí)生成的艺玲,大家可使用 Arthas 進(jìn)行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
// 省略其他方法
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
// 類型轉(zhuǎn)換
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// 根據(jù)方法名調(diào)用指定的方法
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
}
}
到這里括蝠,整個(gè)服務(wù)調(diào)用過程就分析完了趾断。最后把調(diào)用過程貼出來冷溃,如下:
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)
剩余的將調(diào)用結(jié)果封裝后南蹂,編碼發(fā)給消費(fèi)方的過程就不再累贅了呈驶。
消費(fèi)方接收到響應(yīng)后炸站,也是類似的過程贺待。響應(yīng)數(shù)據(jù)解碼完成后转质,Dubbo會(huì)將響應(yīng)對(duì)象派發(fā)到線程池涧狮,要注意的是線程池中的線程并不是用戶的調(diào)用線程(應(yīng)該是客戶端的業(yè)務(wù)線程池)酪碘,所以要想辦法將響應(yīng)對(duì)象從線程池線程傳遞到用戶線程上朋譬。還記得我們前面分析的用戶線程在發(fā)送我請(qǐng)求后,會(huì)調(diào)用DefaultFuture的get方法等待對(duì)象的到來兴垦。當(dāng)響應(yīng)對(duì)象到來后徙赢,用戶線程被喚醒,并通過調(diào)用編號(hào)獲取屬于自己的響應(yīng)對(duì)象探越,具體實(shí)現(xiàn)如下:
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// 處理請(qǐng)求狡赐,前面已分析過,省略
} else if (message instanceof Response) {
// 處理響應(yīng)
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// telnet 相關(guān)钦幔,忽略
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
// 繼續(xù)向下調(diào)用
DefaultFuture.received(channel, response);
}
}
}
public class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private volatile Response response;
public static void received(Channel channel, Response response) {
try {
// 根據(jù)調(diào)用編號(hào)從 FUTURES 集合中查找指定的 DefaultFuture 對(duì)象
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
// 繼續(xù)向下調(diào)用
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at ...");
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
// 保存響應(yīng)對(duì)象
response = res;
if (done != null) {
// 喚醒用戶線程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
以上邏輯是將響應(yīng)對(duì)象保存到相應(yīng)的 DefaultFuture 實(shí)例中枕屉,然后再喚醒用戶線程,隨后用戶線程即可從 DefaultFuture 實(shí)例中獲取到相應(yīng)結(jié)果节槐。