 * Arthas 反編譯步驟:
 * 1. 啟動(dòng) Arthas
 *    java -jar arthas-boot.jar
 * 2. 輸入編號(hào)選擇進(jìn)程
 *    Arthas 啟動(dòng)后,會(huì)打印 Java 應(yīng)用進(jìn)程列表柿汛,如下:
 *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
 *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
 *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
 *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
 *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
 * 這里輸入編號(hào) 3冗酿,讓 Arthas 關(guān)聯(lián)到啟動(dòng)類為 com.....Consumer 的 Java 進(jìn)程上
 * 3. 由于 Demo 項(xiàng)目中只有一個(gè)服務(wù)接口,因此此接口的代理類類名為 proxy0络断,此時(shí)使用 sc 命令搜索這個(gè)類名裁替。
 *    $ sc *.proxy0
 *    com.alibaba.dubbo.common.bytecode.proxy0
 * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0
 *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
 * 更多使用方法請(qǐng)參考 Arthas 官方文檔:
 *   https://alibaba.github.io/arthas/quick-start.html
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法數(shù)組
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;

    public proxy0() {

    public String sayHello(String string) {
        // 將參數(shù)存儲(chǔ)到 Object 數(shù)組中
        Object[] arrobject = new Object[]{string};
        // 調(diào)用 InvocationHandler 實(shí)現(xiàn)類的 invoke 方法得到調(diào)用結(jié)果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調(diào)用結(jié)果
        return (String)object;

    /** 回聲測(cè)試方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;


public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // 如果是Object 類中的方法(未被子類重寫),比如 wait/notify膀跌,直接調(diào)用
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        // 如果 toString遭商、hashCode 和 equals 等方法被子類重寫了,這里也直接調(diào)用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        // 將 method 和 args 封裝到 RpcInvocation 中捅伤,并執(zhí)行后續(xù)的調(diào)用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();


ublic abstract class AbstractInvoker<T> implements Invoker<T> {
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        RpcInvocation invocation = (RpcInvocation) inv;
        // 設(shè)置 Invoker
        if (attachment != null && attachment.size() > 0) {
            // 設(shè)置 attachment
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設(shè)置異步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子類實(shí)現(xiàn)
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    // 省略其他方法


public class DubboInvoker<T> extends AbstractInvoker<T> {
    private final ExchangeClient[] clients;
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 設(shè)置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 從 clients 數(shù)組中獲取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        try {
            // 獲取異步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 為 true圈澈,表示“單向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 異步無返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發(fā)送請(qǐng)求
                currentClient.send(inv, isSent);
                // 設(shè)置上下文中的 future 字段為 null
                // 返回一個(gè)空的 RpcResult
                return new RpcResult();

            // 異步有返回值
            else if (isAsync) {
                // 發(fā)送請(qǐng)求惫周,并得到一個(gè) ResponseFuture 實(shí)例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 設(shè)置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // 暫時(shí)返回一個(gè)空結(jié)果
                return new RpcResult();

            // 同步調(diào)用
            else {
                // 發(fā)送請(qǐng)求,得到一個(gè) ResponseFuture 實(shí)例康栈,并調(diào)用該實(shí)例的 get 方法進(jìn)行等待
                return (Result) currentClient.request(inv, timeout).get();
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
    // 省略其他方法


ResponseFuture 的 get 方法。異步調(diào)用模式下践叠,則由用戶調(diào)用該方法言缤。ResponseFuture 是一個(gè)接口,下面我們來看一下它的默認(rèn)實(shí)現(xiàn)類 DefaultFuture 的源碼禁灼。

public class DefaultFuture implements ResponseFuture {
    private static final Map<Long, Channel> CHANNELS = 
        new ConcurrentHashMap<Long, Channel>();

    private static final Map<Long, DefaultFuture> FUTURES = 
        new ConcurrentHashMap<Long, DefaultFuture>();
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 獲取請(qǐng)求 id管挟,這個(gè) id 很重要,后面還會(huì)見到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存儲(chǔ) <requestId, DefaultFuture> 映射關(guān)系到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    public Object get() throws RemotingException {
        return get(timeout);

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        // 檢測(cè)服務(wù)提供方是否成功返回了調(diào)用結(jié)果
        if (!isDone()) {
            long start = System.currentTimeMillis();
            try {
                // 循環(huán)檢測(cè)服務(wù)提供方是否成功返回了調(diào)用結(jié)果
                while (!isDone()) {
                    // 如果調(diào)用結(jié)果尚未返回弄捕,這里等待一段時(shí)間
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果調(diào)用結(jié)果成功返回僻孝,或等待超時(shí),此時(shí)跳出 while 循環(huán)守谓,執(zhí)行后續(xù)的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
            // 如果調(diào)用結(jié)果仍未返回皮璧,則拋出超時(shí)異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        // 返回調(diào)用結(jié)果
        return returnFromResponse();
    public boolean isDone() {
        // 通過檢測(cè) response 字段為空與否,判斷是否收到了調(diào)用結(jié)果
        return response != null;
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        // 如果調(diào)用結(jié)果的狀態(tài)為 Response.OK分飞,則表示調(diào)用過程正常悴务,服務(wù)提供方成功返回了調(diào)用結(jié)果
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        // 拋出異常
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        throw new RemotingException(channel, res.getErrorMessage());
    // 省略其他方法



8.2 服務(wù)消費(fèi)方發(fā)送請(qǐng)求

8.2.1 發(fā)送請(qǐng)求



final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        // 引用計(jì)數(shù)自增
        this.url = client.getUrl();
        // ...

    public ResponseFuture request(Object request) throws RemotingException {
        // 直接調(diào)用被裝飾對(duì)象的同簽名方法
        return client.request(request);

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接調(diào)用被裝飾對(duì)象的同簽名方法
        return client.request(request, timeout);

    /** 引用計(jì)數(shù)自增,該方法由外部調(diào)用 */
    public void incrementAndGetCount() {
        // referenceCount 自增
    public void close(int timeout) {
        // referenceCount 自減
        if (referenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
            } else {
            client = replaceWithLazyClient();
    // 省略部分方法


ReferenceCountExchangeClient 內(nèi)部定義了一個(gè)引用計(jì)數(shù)變量 referenceCount,每當(dāng)該對(duì)象被引用一次 referenceCount 都會(huì)進(jìn)行自增。每當(dāng) close 方法被調(diào)用時(shí)沐悦,referenceCount 進(jìn)行自減。ReferenceCountExchangeClient 內(nèi)部?jī)H實(shí)現(xiàn)了一個(gè)引用計(jì)數(shù)的功能五督,其他方法并無復(fù)雜邏輯藏否,均是直接調(diào)用被裝飾對(duì)象的相關(guān)方法。所以這里就不多說了充补,繼續(xù)向下分析副签,這次是 HeaderExchangeClient。

public class HeaderExchangeClient implements ExchangeClient {

    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    private ScheduledFuture<?> heartbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        this.client = client;
        // 創(chuàng)建 HeaderExchangeChannel 對(duì)象
        this.channel = new HeaderExchangeChannel(client);
        // 以下代碼均與心跳檢測(cè)邏輯有關(guān)
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        if (needHeartbeat) {
            // 開啟心跳檢測(cè)定時(shí)器

    public ResponseFuture request(Object request) throws RemotingException {
        // 直接 HeaderExchangeChannel 對(duì)象的同簽名方法
        return channel.request(request);

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接 HeaderExchangeChannel 對(duì)象的同簽名方法
        return channel.request(request, timeout);

    public void close() {
    private void doClose() {
        // 停止心跳檢測(cè)定時(shí)器

    private void startHeartbeatTimer() {
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
        heartbeatTimer = null;
    // 省略部分方法

HeaderExchangeClient很多方法都只有一行代碼基矮,即直接調(diào)用HeaderExchangeChannel 對(duì)象的方法淆储。那么HeaderExchangeClient的用處是什么呢?其實(shí)只是封裝了一些關(guān)于心跳檢測(cè)的邏輯家浇,所以我們還要進(jìn)一步分析HeaderExchangeChannel的實(shí)現(xiàn):

HeaderExchangeChannel 對(duì)象的同簽名方法本砰。那 HeaderExchangeClient 有什么用處呢?答案是封裝了一些關(guān)于心跳檢測(cè)的邏輯钢悲。心跳檢測(cè)并非本文所關(guān)注的點(diǎn)点额,因此就不多說了,繼續(xù)向下看莺琳。

final class HeaderExchangeChannel implements ExchangeChannel {
    private final Channel channel;
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        // 這里的 channel 指向的是 NettyClient
        this.channel = channel;
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...);
        // 創(chuàng)建 Request 對(duì)象
        Request req = new Request();
        // 設(shè)置雙向通信標(biāo)志為 true
        // 這里的 request 變量類型為 RpcInvocation
        // 創(chuàng)建 DefaultFuture 對(duì)象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 調(diào)用 NettyClient 的 send 方法發(fā)送請(qǐng)求
        } catch (RemotingException e) {
            throw e;
        // 返回 DefaultFuture 對(duì)象
        return future;


到這里大家終于看到了 Request 語義了珠十,上面的方法首先定義了一個(gè) Request 對(duì)象,然后再將該對(duì)象傳給 NettyClient 的 send 方法凭豹,進(jìn)行后續(xù)的調(diào)用焙蹭。需要說明的是,NettyClient 中并未實(shí)現(xiàn) send 方法嫂伞,該方法繼承自父類 AbstractPeer孔厉,下面直接分析 AbstractPeer 的代碼。

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    public void send(Object message) throws RemotingException {
        // 該方法由子類AbstractClient 類實(shí)現(xiàn)
        send(message, url.getParameter(Constants.SENT_KEY, false));
    // 省略其他方法

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
        // 獲取 Channel帖努,getChannel 是一個(gè)抽象方法撰豺,具體由子類實(shí)現(xiàn)
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        // 繼續(xù)向下調(diào)用
        channel.send(message, sent);
    protected abstract Channel getChannel();
    // 省略其他方法


public class NettyClient extends AbstractClient {
    // 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 獲取一個(gè) NettyChannel 類型對(duì)象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
        new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;
    /** 私有構(gòu)造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        this.channel = channel;

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        // 嘗試從集合中獲取 NettyChannel 實(shí)例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,則創(chuàng)建一個(gè)新的 NettyChannel 實(shí)例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 將 <Channel, NettyChannel> 鍵值對(duì)存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            if (ret == null) {
                ret = nc;
        return ret;

獲取到 NettyChannel 實(shí)例后亭姥,即可進(jìn)行后續(xù)的調(diào)用稼钩。下面看一下 NettyChannel 的 send 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發(fā)送消息(包含請(qǐng)求和響應(yīng)消息)
        ChannelFuture future = channel.write(message);
        // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值达罗,有兩種配置值:
        //   1. true: 等待消息發(fā)出坝撑,消息發(fā)送失敗將拋出異常
        //   2. false: 不等待消息發(fā)出,將消息放入 IO 隊(duì)列粮揉,即刻返回
        // 默認(rèn)情況下 sent = false绍载;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發(fā)出,若在規(guī)定時(shí)間沒能發(fā)出滔蝉,success 會(huì)被置為 false
            success = future.await(timeout);
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");

    // 若 success 為 false击儡,這里拋出異常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");

經(jīng)歷多次調(diào)用,到這里請(qǐng)求數(shù)據(jù)的發(fā)送過程就結(jié)束了蝠引,過程漫長(zhǎng)阳谍。為了便于大家閱讀代碼,這里以 DemoService 為例螃概,將 sayHello 方法的整個(gè)調(diào)用路徑貼出來矫夯。

  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個(gè) Filter 調(diào)用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

8.2.2 請(qǐng)求編碼




8.3 服務(wù)提供方接收請(qǐng)求

8.3.1 請(qǐng)求解碼


8.3.2 調(diào)用服務(wù)


NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由線程池執(zhí)行后續(xù)的調(diào)用邏輯


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);

    /** 處理連接事件 */
    public void connected(Channel channel) throws RemotingException {
        // 獲取線程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將連接事件派發(fā)到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., " error when process connected event .", t);

    /** 處理斷開事件 */
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process disconnected event .", t);

    /** 處理請(qǐng)求和響應(yīng)消息爪模,這里的 message 變量類型可能是 Request欠啤,也可能是 Response */
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請(qǐng)求和響應(yīng)消息派發(fā)到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                // 如果通信方式為雙向通信,此時(shí)將 Server side ... threadpool is exhausted 
                // 錯(cuò)誤信息封裝到 Response 中屋灌,并返回給服務(wù)消費(fèi)方洁段。
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() 
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    // 返回包含錯(cuò)誤信息的 Response 對(duì)象
            throw new ExecutionException(..., " error when process received event .", t);

    /** 處理異常信息 */
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process caught event ...");


public class ChannelEventRunnable implements Runnable {
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
    public void run() {
        // 檢測(cè)通道狀態(tài),對(duì)于請(qǐng)求或響應(yīng)消息除嘹,此時(shí) state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對(duì)象写半,進(jìn)行后續(xù)的調(diào)用
                // 注意這個(gè)handler是AllChannelHandler傳過來的,
                // AllChannelHandler本身是handlerWrapper尉咕,通過把其包裝的handler傳遞給ChannelEventRunnable叠蝇,
                // 使其能夠在線程池中繼續(xù)handler(還有多層包裝)的處理
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
        // 其他消息類型通過 switch 進(jìn)行處理
        else {
            switch (state) {
            case CONNECTED:
                try {
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
                logger.warn("unknown state: " + state + ", message is " + message);



DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 對(duì) Decodeable 接口實(shí)現(xiàn)類對(duì)象進(jìn)行解碼

        if (message instanceof Request) {
            // 對(duì) Request 的 data 字段進(jìn)行解碼
            decode(((Request) message).getData());

        if (message instanceof Response) {
            // 對(duì) Request 的 result 字段進(jìn)行解碼
            decode(((Response) message).getResult());

        // 執(zhí)行后續(xù)邏輯
        handler.received(channel, message);

    private void decode(Object message) {
        // Decodeable 接口目前有兩個(gè)實(shí)現(xiàn)類年缎,
        // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 執(zhí)行解碼邏輯
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);


public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    private final ExchangeHandler handler;

    public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        this.handler = handler;

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請(qǐng)求對(duì)象
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                // 處理普通的請(qǐng)求
                else {
                    // 雙向通信
                    if (request.isTwoWay()) {
                        // 向后調(diào)用服務(wù)单芜,并得到調(diào)用結(jié)果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將調(diào)用結(jié)果返回給服務(wù)消費(fèi)端
                    // 如果是單向通信蜕该,僅向后調(diào)用指定服務(wù)即可,無需返回調(diào)用結(jié)果
                    else {
                        handler.received(exchangeChannel, request.getData());
            // 處理響應(yīng)對(duì)象洲鸠,服務(wù)消費(fèi)方會(huì)執(zhí)行此處邏輯堂淡,后面分析
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關(guān),忽略
            } else {
                handler.received(exchangeChannel, message);
        } finally {

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測(cè)請(qǐng)求是否合法,不合法則返回狀態(tài)碼為 BAD_REQUEST 的響應(yīng)
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null)
                msg = null;
            else if
                (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
                msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            // 設(shè)置 BAD_REQUEST 狀態(tài)

            return res;
        // 獲取 data 字段值绢淀,也就是 RpcInvocation 對(duì)象
        Object msg = req.getData();
        try {
            // 繼續(xù)向下調(diào)用
            Object result = handler.reply(channel, msg);
            // 設(shè)置 OK 狀態(tài)碼
            // 設(shè)置調(diào)用結(jié)果
        } catch (Throwable e) {
            // 若調(diào)用過程出現(xiàn)異常萤悴,則設(shè)置 SERVICE_ERROR,表示服務(wù)端異常
        return res;


ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 獲取 Invoker 實(shí)例
                Invoker<?> invoker = getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    // 回調(diào)相關(guān)梗顺,忽略
                // 通過 Invoker 調(diào)用具體的服務(wù)
                return invoker.invoke(inv);
            throw new RemotingException(channel, "Unsupported request: ...");
        // 忽略其他方法
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        // 忽略回調(diào)和本地存根相關(guān)邏輯
        // ...
        int port = channel.getLocalAddress().getPort();
        // 計(jì)算 service key,格式為 groupName/serviceName:serviceVersion:port车摄。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // 從 exporterMap 查找與 serviceKey 相對(duì)應(yīng)的 DubboExporter 對(duì)象寺谤,
        // 服務(wù)導(dǎo)出過程中會(huì)將 <serviceKey, DubboExporter> 映射關(guān)系存儲(chǔ)到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service ...");

        // 獲取 Invoker 對(duì)象,并返回
        return exporter.getInvoker();
    // 忽略其他方法

以上邏輯用于獲取與指定服務(wù)對(duì)應(yīng)的 Invoker 實(shí)例吮播,并通過 Invoker 的 invoke 方法調(diào)用服務(wù)邏輯变屁。invoke 方法定義在 AbstractProxyInvoker 中,代碼如下:

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用意狠,并將調(diào)用結(jié)果封裝到 RpcResult 中粟关,并
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method ...");
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

如上,doInvoke 是一個(gè)抽象方法环戈,這個(gè)需要由具體的 Invoker 實(shí)例實(shí)現(xiàn)誊役。Invoker 實(shí)例是在運(yùn)行時(shí)通過 JavassistProxyFactory 創(chuàng)建的,創(chuàng)建邏輯如下:

public class JavassistProxyFactory extends AbstractProxyFactory {
    // 省略其他方法

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 創(chuàng)建匿名類對(duì)象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 調(diào)用 invokeMethod 方法進(jìn)行后續(xù)的調(diào)用
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

Wrapper 是一個(gè)抽象類谷市,其中 invokeMethod 是一個(gè)抽象方法蛔垢。Dubbo 會(huì)在運(yùn)行時(shí)通過 Javassist 框架為 Wrapper 生成實(shí)現(xiàn)類,并實(shí)現(xiàn) invokeMethod 方法迫悠,該方法最終會(huì)根據(jù)調(diào)用信息調(diào)用具體的服務(wù)鹏漆。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

/** Wrapper0 是在運(yùn)行時(shí)生成的艺玲,大家可使用 Arthas 進(jìn)行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉(zhuǎn)換
            demoService = (DemoService)object;
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        try {
            // 根據(jù)方法名調(diào)用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());


  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)


public class HeaderExchangeHandler implements ChannelHandlerDelegate {
   public void received(Channel channel, Object message) throws RemotingException {
       channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
       ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
       try {
           if (message instanceof Request) {
               // 處理請(qǐng)求狡赐,前面已分析過,省略
           } else if (message instanceof Response) {
               // 處理響應(yīng)
               handleResponse(channel, (Response) message);
           } else if (message instanceof String) {
               // telnet 相關(guān)钦幔,忽略
           } else {
               handler.received(exchangeChannel, message);
       } finally {

   static void handleResponse(Channel channel, Response response) throws RemotingException {
       if (response != null && !response.isHeartbeat()) {
           // 繼續(xù)向下調(diào)用
           DefaultFuture.received(channel, response);

public class DefaultFuture implements ResponseFuture {  
   private final Lock lock = new ReentrantLock();
   private final Condition done = lock.newCondition();
   private volatile Response response;
   public static void received(Channel channel, Response response) {
       try {
           // 根據(jù)調(diào)用編號(hào)從 FUTURES 集合中查找指定的 DefaultFuture 對(duì)象
           DefaultFuture future = FUTURES.remove(response.getId());
           if (future != null) {
               // 繼續(xù)向下調(diào)用
           } else {
               logger.warn("The timeout response finally returned at ...");
       } finally {

   private void doReceived(Response res) {
       try {
           // 保存響應(yīng)對(duì)象
           response = res;
           if (done != null) {
               // 喚醒用戶線程
       } finally {
       if (callback != null) {

以上邏輯是將響應(yīng)對(duì)象保存到相應(yīng)的 DefaultFuture 實(shí)例中枕屉,然后再喚醒用戶線程,隨后用戶線程即可從 DefaultFuture 實(shí)例中獲取到相應(yīng)結(jié)果节槐。

