一、消費(fèi)者發(fā)起請求
1.1 調(diào)用入口
在@Reference注入的bean的invoke方法,即Invoker.invoke。
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
然后依次調(diào)用的路徑:
MockClusterInvoker.invoke -> //用來支持mock
AbstractClusterInvoker.invoke -> //用來加載指定的負(fù)載均衡策略
FailoverClusterInvoker.doInvoke -> //用來負(fù)載均衡選擇具體哪個提供者的invoker
AbstractInvoker.invoke -> 用來初始化一些數(shù)據(jù)
DubboInvoker.doInvoke -> 用來執(zhí)行invoke調(diào)用
HeaderExchangeClient.request -> 用來執(zhí)行網(wǎng)絡(luò)調(diào)用請求
HeaderExchangeChannel.request -> 用來執(zhí)行封裝Request請求
AbstractPeer.send ->
NettyChannel.send ->
暫時到這里。想一下為啥會上上面的調(diào)用路徑呢售葡?因?yàn)檎嬲腄ubboInvoke會被包裝為很多層。比如為了滿足服務(wù)治理:使用FailoverClusterInvoker忠藤,為了滿足mock使用MockClusterInvoker挟伙,為了滿足過濾器,使用ProtocolFilterWrapper這里沒有顯示出來模孩,等等尖阔。
這里從DubboInvoker的doInvoke開始看,源碼如下:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
可以看到榨咐,正常調(diào)用都是雙向的介却,且是同步的。所以會走到else里面块茁。如下:
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
這里有個關(guān)鍵的地方:用戶線程在發(fā)送完request請求后齿坷,使用get()方法阻塞本次調(diào)用的用戶線程,等待ResponseFuture的返回数焊。而該ResponseFuture的實(shí)現(xiàn)類DefaultFuture永淌。下面看下DefaultFuture的get方法:
public Object get() throws RemotingException {
return get(timeout);
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
明顯可以看到!isDone的話,線程await超時時間佩耳,阻塞這里等待回來的數(shù)據(jù)仰禀。用戶線程到這里就結(jié)束了。等待request返回蚕愤,并喚醒該線程答恶。
下面我看下request方法:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
到這里看到future=new DefaultFuture()了。然后調(diào)用了channel.send(req)萍诱,后面就是netty的框架了悬嗓,下節(jié)會介紹。
1.2 消費(fèi)者調(diào)用過程---NIO發(fā)送請求
這里主要是介紹:NettyChannel.send方法:
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.write(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
追蹤下去裕坊,到了:
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(
new DownstreamMessageEvent(channel, future, message, remoteAddress));
return future;
}
目前到這里包竹,我們的調(diào)用路徑如下:
Channels.write方法源碼如上,調(diào)用了sendDownstream方法籍凝。在追蹤下去周瞎,sendDownstream源碼如下:
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
} catch (Throwable t) {
// Unlike an upstream event, a downstream event usually has an
// incomplete future which is supposed to be updated by ChannelSink.
// However, if an exception is raised before the event reaches at
// ChannelSink, the future is not going to be updated, so we update
// here.
e.getFuture().setFailure(t);
notifyHandlerException(e, t);
}
}
可以看到handleDownstream方法,由指定handler執(zhí)行饵蒂。這里使用OneToOneEncoder執(zhí)行:
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
if (originalMessage == encodedMessage) {
ctx.sendDownstream(evt);
} else if (encodedMessage != null) {
write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
}
}
encode方法就是模板方法了声诸,實(shí)現(xiàn)由子類來實(shí)現(xiàn)。然后退盯,追蹤到在NettyCodecAdapter里面彼乌,有個內(nèi)部類InternalEncoder實(shí)現(xiàn)了encode方法。
@Sharable
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
最后渊迁,這里的codec.encode(channel, buffer, msg)委托給了DubboCountCodec.encode
總結(jié):
這節(jié)內(nèi)容把消費(fèi)者的send請求和編碼慰照,序列化等底層操作結(jié)合起來了。不能追蹤到netty層就不向下了琉朽,其實(shí)dubbo拓展了很多netty的類毒租。導(dǎo)致雖然調(diào)用已經(jīng)走到netty框架,但是很多業(yè)務(wù)處理箱叁,netty還需要回調(diào)netty拓展的功能墅垮。這種細(xì)節(jié)還是不能馬虎,需要搞懂蝌蹂。舉個例子:org.jboss.netty.handler.codec.oneone.OneToOneEncoder是netty的抽象類噩斟,必須由子類實(shí)現(xiàn),然后netty在調(diào)用的時候孤个,會調(diào)用dubbo實(shí)現(xiàn)的子類剃允。這里是InternalEncoder類。環(huán)環(huán)相扣=_=
緊接著上面的encode結(jié)束后齐鲤,調(diào)用了:write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress())方法斥废,繼續(xù)追蹤:
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
if (prev == null) {
try {
getSink().eventSunk(DefaultChannelPipeline.this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
} else {
DefaultChannelPipeline.this.sendDownstream(prev, e);
}
}
該方法屬于DefaultChannelPipeline.java類,prev必然為null给郊,然后調(diào)用 getSink().eventSunk(DefaultChannelPipeline.this, e)方法牡肉,繼續(xù)追蹤到NioClientSocketPipelineSink類,如下:
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioClientSocketChannel channel =
(NioClientSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
到了這里淆九,終于看到我們想要的事件了统锤。這里根據(jù)事件類型選擇了MessageEvent毛俏。看到調(diào)用了writeFromUserCode方法饲窿,在后面就只有netty的代碼了煌寇,調(diào)用了write0方法。這里就不在向下追蹤了逾雄,有興趣的可以自己看netty阀溶。
調(diào)用鏈路如下,緊接著上面的圖:
總結(jié):到了writeFromUserCode這里鸦泳,總算把dubbo請求發(fā)送給了網(wǎng)絡(luò)银锻。剩下就是網(wǎng)絡(luò)包通過TCP/IP協(xié)議,傳到提供者ip那里去了做鹰。在消費(fèi)者發(fā)送請求的過程中击纬,一直使用了是一個線程,在線程執(zhí)行完request的send操作后誊垢,同步得到一個Future掉弛,然后一直阻塞在Future.get()方法上,等待返回值喂走。其實(shí)這個調(diào)用過程抽象出來就是:
1.服務(wù)治理選出一個最佳的提供者ip
2.執(zhí)行invoker的各種filter殃饿,類似AOP功能
3.構(gòu)造請求request,然后encode請求參數(shù)芋肠,便于網(wǎng)絡(luò)傳輸
4.把序列化好后的二進(jìn)制流傳遞給netty
5.netty把數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)上
二乎芳、提供者接收請求
參考文檔:http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html
提供者會一直在dubbo指定的端口上,while(true)監(jiān)聽channel中有沒有數(shù)據(jù)達(dá)到帖池。通過netty的reactor模式奈惑,netty的IO線程監(jiān)聽有數(shù)據(jù)達(dá)到,然后看是什么事件睡汹,select喚起對應(yīng)的事件處理器肴甸。事件處理,由單獨(dú)的線程池去完成囚巴。先看看原在,在提供者export服務(wù)的時候,bind的代碼:
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler();
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
Channel channel = getFactory().newChannel(bossPipeline);
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
interrupted = true;
}
} while (future == null);
if (interrupted) {
Thread.currentThread().interrupt();
}
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}
return channel;
}
其中有段代碼:
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
interrupted = true;
}
} while (future == null);
futureQueue會阻塞獲取channel上的可讀數(shù)據(jù)彤叉,如果有數(shù)據(jù)達(dá)到庶柿,那么就喚醒監(jiān)聽線程,調(diào)用decode對數(shù)據(jù)進(jìn)行解碼秽浇。
bind前面還有段代碼:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
pipeline會設(shè)置channel的處理器鏈浮庐。在接收請求后,decoder結(jié)束后柬焕,下個處理器是handler审残。然后下面就分析梭域,nettyHandler。
2.1 服務(wù)提供者暴露服務(wù)的Server初始化過程
下面以dubbo為例:
DubboProtocol.export方法的源碼如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
創(chuàng)建server的核心方法在openServer维苔,如下:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
核心方法在createServer碰辅,如下:
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
核心方法在Exchangers.bind方法,其中requestHandler是dubboProtocol類內(nèi)部重寫的內(nèi)部類介时。它是提供者bean接收方法要處理的最底層處理邏輯。下面看看它的核心方法凌彬,reply方法沸柔,如下:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
把message轉(zhuǎn)換為dubbo 語義下的Invocation,該Invocation包含了調(diào)用方法铲敛,調(diào)用參數(shù)等等全面的信息褐澎,足夠運(yùn)行執(zhí)行了。下面我們在看下這個real handler上面包裝了哪些其他handler伐蒋,并成為了一個怎樣的過濾器鏈工三?繼續(xù)看bind方法。一路追蹤下去先鱼,看下HeaderExchanger的bind方法俭正,如下:
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
看到第一個構(gòu)造過濾器鏈的handler了,就是HeaderExchangeHandler焙畔。繼續(xù)跟進(jìn)Transporters.bind方法掸读,如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
繼續(xù)跟蹤下去,到new NettyServer宏多,handler還是上面的HeaderExchangeHandler儿惫。它的構(gòu)造方法如下:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
在創(chuàng)建nettyServer的時候,會調(diào)用ChannelHandlers.wrap方法伸但,構(gòu)造一個handler的過濾器模式肾请。代碼如下:
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
可以看到wrap方法調(diào)用了wrapInternal方法。該方法更胖,先new MultiMessageHandler铛铁,然后在HeartBeatHandler,再Dispatcher的Handler函喉。而它是通過SPI去指定的避归。所以,這個過濾器鏈就構(gòu)成了在執(zhí)行的時候管呵,按照從前向后的順序梳毙,執(zhí)行鏈。
總結(jié):整個過濾器鏈算是構(gòu)造好了捐下,MultiMessageHandler -> HeartBeatHandler -> AllDispatcher -> HeaderExchanger -> dubbo real handler
MultiMessageHandler這些Handler都是使用了裝飾器模式账锹,對傳入的handler進(jìn)行了裝飾行為萌业。
這一節(jié)分析,其實(shí)和第四節(jié)分析的很像奸柬。因?yàn)樗麄兲幚砟P投际菍Φ鹊纳辍L峁┱遰eceived request和消費(fèi)者received reponse,兩者過程其實(shí)都很相似廓奕。都是把realHandler一層一層封裝抱婉,最終給到netty的pepiline管道。
三桌粉、提供者發(fā)送結(jié)果
上面已經(jīng)分析了提供者接收到nettp的tcp數(shù)據(jù)message后蒸绩。給到MultiMessageHandler.received方法,然后在給到HeartBeatHandler.received铃肯,再給到AllChannelHandler.received患亿。我們就來看下AllChannelHandler的received方法。如下:
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executorService = getExecutorService();
try {
executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO 臨時解決線程池滿后異常信息無法發(fā)送到對端的問題押逼。待重構(gòu)
//fix 線程池滿了拒絕調(diào)用不返回步藕,導(dǎo)致消費(fèi)者一直等待超時
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
"detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
}
}
看到這行:
executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
直接把netty線程傳遞過來的message傳遞給線程池的ChannelEventRunnable,然后就返回了挑格。剩下的由dubbo線程池處理咙冗,這里如果dubbo線程池滿了,是無法處理任何請求的恕齐。咱們看下ChannelEventRunnable的run方法乞娄,看看它到底干了啥:
@Override
public void run() {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case RECEIVED:
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
很明顯,這次事件是接收請求显歧,那么就是RECEIVED了仪或。看到執(zhí)行了handler.received方法士骤。這個handler我們上面分析了范删,是DecodeHandler(初始化的時候指定的)。進(jìn)去看下:
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
肯定會走到decode(request)這里拷肌。然后解碼完畢后到旦,執(zhí)行handler。這里的handler是HeaderExchangeHandler巨缘,也是初始化就指定的了添忘。進(jìn)去看下它的received方法:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
對于request,最終會走到:handler.received(exchangeChannel, request.getData())這里若锁。
繼續(xù)handler搁骑,這個handler就是dubboProtocol的內(nèi)部匿名類requestHandler:
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
繼續(xù)看reply方法:
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要處理高版本調(diào)用低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
getInvoke得到實(shí)現(xiàn)類的代理類,然后調(diào)用實(shí)現(xiàn)類的代理類,執(zhí)行真正的業(yè)務(wù)邏輯得到結(jié)果仲器。那么這個結(jié)果煤率,怎么發(fā)送給消費(fèi)者呢?猜測肯定是用send方法發(fā)送乏冀。但是在哪里呢蝶糯?
答案就在我們上面看到的HeaderExchangeHandler的received方法里面。再看下:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
看到如下代碼:
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
如果是雙向通行辆沦,表示消費(fèi)者正在等著結(jié)果呢昼捍。所以,這里handleRequest結(jié)束后众辨,就要把response結(jié)果send出去端三。通過channel.send出去。下面我們就要分析這個channel是怎么初始化的鹃彻,以及怎么傳遞進(jìn)來的。
這個channel是在NettyHandler收到message的時候生成的妻献。
NettyHandler.messageReceived方法的源碼如下:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
這個channel肯定就是netty用來和消費(fèi)者通信的socket蛛株。channel是流,里面有數(shù)據(jù)在流淌著育拨,用戶可以從這channel里面拿到數(shù)據(jù)谨履。這個channel也提供發(fā)送接收數(shù)據(jù)的能力。 channel.send(response)方法熬丧,我們就知道了這個channel就是nettyChannel笋粟。源碼如下:
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.write(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
+ ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
只要通過channel.write(message),把response的數(shù)據(jù)write到channel就完事了析蝴。我們也知道channel是可讀可寫的
到此結(jié)束害捕,提供方接收請求 -> 處理請求 -> 發(fā)送請求,都全部分析結(jié)束闷畸。
這里多一嘴尝盼,其實(shí)消費(fèi)者調(diào)用請求的send也是走這里的。不信可以驗(yàn)證佑菩,為了簡單起盾沫,我們直接定位到DubboInvoker的doInvoke方法:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
看到最后一段代碼:
else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
追蹤到HeaderExchangeChannel.request方法:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
channel.send最后又到了nettyChannel.send方法。和剛剛發(fā)送結(jié)果的send是一模一樣殿漠。都是往channel上write數(shù)據(jù)赴精。這個數(shù)據(jù)可以是request也可以是response,對channel來說無所謂绞幌。它僅僅是雙方用來通信的管道蕾哟,里面是啥數(shù)據(jù),兩邊是啥角色,channel都不關(guān)心渐苏。
《完》
四掀潮、消費(fèi)者接收結(jié)果
先看下消費(fèi)者請求接收結(jié)果的堆棧。我們把斷點(diǎn)設(shè)置到AllChannelHandler.received方法的第一行琼富。如下:
重下往上看仪吧,一直到SimpleChannelHandler.handleUpstream都是netty的代碼,在往上一層就是dubbo的代碼了鞠眉∈硎螅可能有人會問,你為啥就知道斷點(diǎn)到這里呢械蹋?我們看下消費(fèi)者初始化過程就清楚了出皇。這里是網(wǎng)絡(luò)相關(guān),我們猜想建立網(wǎng)絡(luò)連接一定在消費(fèi)者初始化client的時候哗戈。所以我們一下子就可以定位到DubboProtocol的refer方法中的getClients(url)郊艘。看下getClients方法唯咬,最終到initClient方法纱注。看到:
client = Exchangers.connect(url, requestHandler);
好了胆胰,我們現(xiàn)在就要研究這個方法了狞贱。requestHandler肯定就是出站數(shù)據(jù)和入站數(shù)據(jù),我們要如何處理的具體hanlder了蜀涨。你可能有疑問瞎嬉?我們看下這個handler實(shí)現(xiàn)的接口好了,它實(shí)現(xiàn)了ChannelHandler接口厚柳,其中該接口有sent和received方法氧枣,就代表了出站和入站。還有不懂草娜,自己去研究下就會了挑胸。
進(jìn)入connect源碼看下:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
看到connect方法,繼續(xù)向下:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
這里我們看到realHandler先被HeaderExchangeHandler包裝宰闰,再被DecodeHandler包裝茬贵。有人這里會奇怪,這些handler在上面的堆棧也沒出現(xiàn)啊移袍。對解藻,說明你已經(jīng)明白了。既然上面堆棧沒出現(xiàn)這里包裝的類葡盗,那是不是推測Transporters.connect里面肯定又做了很多包裝螟左。下面我們來驗(yàn)證下啡浊,繼續(xù)向下追蹤。
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
看到handler原封不動的傳給了getTransporter().connect方法胶背。我們繼續(xù)推測巷嚣,包裝發(fā)生在Transporter.connect里面。繼續(xù)追蹤:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
追蹤到NettyTransporter钳吟,很簡單廷粒。繼續(xù)往下看:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
是不是有點(diǎn)感覺了?wrapChannelHandler方法是不是做了包裝红且。繼續(xù)往下看:
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
繼續(xù)看ChannelHandlers.wrap(handler, url):
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
warp方法調(diào)用了個單例坝茎,然后調(diào)用了wrapInternal∠痉看看里面的handler嗤放,是不是很熟悉。最終傳遞給netty就是這個MultiMessageHandler壁酬,然后是HeartbeatHandler次酌,里面那個動態(tài)SPI就是AllChannelHandler∮咔牵可能有人會問為啥和措?我們就來看下dispather的Adaptive代理類長啥樣子。如下:
package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Dispatcher$Adpative implements Dispatcher {
public Dispatcher$Adpative() {
}
public ChannelHandler dispatch(ChannelHandler var1, URL var2) {
if (var2 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var4 = var2.getParameter("dispatcher", var2.getParameter("dispather", var2.getParameter("channel.handler", "all")));
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + var2.toString() + ") use keys([dispatcher, dispather, channel.handler])");
} else {
Dispatcher var5 = (Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getExtension(var4);
return var5.dispatch(var1, var2);
}
}
}
}
看到all了沒蜕煌,它是默認(rèn)配置,用javassist代理生成的诬留。
到這里只分析道MultiMessageHandler以后的handler斜纪,那么它前面的handler是怎么來的?
繼續(xù)new NettyClient:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
這里super(url, MultiMessageHandler)文兑,進(jìn)入super看下:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
//默認(rèn)重連間隔2s盒刚,1800表示1小時warning一次.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
.....//省略
}
看到doOpen方法,進(jìn)去看下:
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
看到有句代碼:
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
this就是當(dāng)前NettyClient绿贞,它也是ChannelHandler的實(shí)現(xiàn)因块。它被NettyHandler包裝起來,NettyHandler繼承了然后就把它給了SimpleChannelHandler籍铁,最終傳遞給了netty的pipeline管道∥猩希現(xiàn)在終于把dubbo的hanlder和netty的handler銜接起來了。
總結(jié)下handler的包裝過程:
DubboProtocol.requestHandler < HeaderExchangeHandler < DecodeHandler < AllChannelHandler < HeartbeatHandler < MultiMessageHandler < NettyHandler拒名。我們猜測netty調(diào)用dubbo的第一個handler吩愧,一定是NettyHandler。在對比下上面的調(diào)用堆棧驗(yàn)證下增显,netty -> dubbo的第一個調(diào)用handler確實(shí)是nettyHandler雁佳。
到這里就分析結(jié)束了。但是這里我要特別說明下,我當(dāng)時看到這些包裝鏈路的時候糖权,想當(dāng)然的就把斷點(diǎn)設(shè)置到最里面那個handler堵腹,就是我們真正處理邏輯的handler。就是那個requestHandler星澳,在DubboProtocol的內(nèi)部類疚顷。如下:
// 請求處理器
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要處理高版本調(diào)用低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isInfoEnabled()) {
logger.info("disconnected from " + channel.getRemoteAddress() + ", url: " + channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
它實(shí)現(xiàn)了ExchangeHandlerAdapter接口,里面當(dāng)然也有received方法募判。我當(dāng)時就把斷點(diǎn)設(shè)置到這里荡含,因?yàn)槲矣X得返回結(jié)果肯定最終會到達(dá)這里得到處理。結(jié)果就是我想錯了届垫,從它這里的實(shí)現(xiàn)也看得出释液,它也沒法處理response戒幔。我們這里來分析下:
上面調(diào)用堆椀挽看到現(xiàn)在請求結(jié)果已經(jīng)到達(dá)了AllChannelHandler雕欺,看下上面我們分析的包裝過程毕骡。下一個handler給到DecodeHandler章贞《贶裕看下DecodeHandler:
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
看到解碼后慧域,繼續(xù)執(zhí)行handler的received方法侄非。繼續(xù)向下看登淘,到HeaderExchangeHandler箫老。
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
看到message instanceof Response后,里面的handleResponse黔州。進(jìn)去看下:
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
重點(diǎn)來了耍鬓,它壓根就沒把reponse委托給下面的requestHandler,自己給處理了流妻。牲蜀。。我想當(dāng)然的以為一定會傳遞給最后一個handler處理呢绅这,沒想到到倒數(shù)第二個handler后自己給處理完了涣达,不向下傳遞了。所以解決了我在dubboProtocol那里斷點(diǎn)证薇,死活斷點(diǎn)不上(異步線程的斷點(diǎn)不在這里討論)度苔。