Dubbo網(wǎng)絡(luò)模型(居然肝了兩周才寫完榜轿,吐血推薦)

本篇為dubbo高性能剖析的下篇钩乍,側(cè)重于剖析dubbo在網(wǎng)絡(luò)層的源碼實(shí)現(xiàn)及設(shè)計(jì)。(上篇傳送門dubbo的線程模型

概述

Dubbo中Consumer端和Provider端由于角色和職責(zé)不盡相同逃贝,本篇將分開(kāi)介紹Consumer、Provider迫摔。希望借助示意圖再加上源碼的剖析沐扳,能讓大家對(duì)Dubbo中的網(wǎng)絡(luò)請(qǐng)求模型有個(gè)比較清晰的了解和認(rèn)識(shí)。

Consumer端

Consumer端由于支持的場(chǎng)景非常多句占,直接給出代碼分析大家可能會(huì)看的云里霧里沪摄。所以這里先給出示意圖,從整體上來(lái)了解請(qǐng)求模型有哪些纱烘,并通過(guò)簡(jiǎn)單的示例對(duì)各個(gè)模型有個(gè)簡(jiǎn)單的認(rèn)識(shí)卓起。然后再開(kāi)始深入剖析源碼實(shí)現(xiàn)。

我們先來(lái)了解下consumer端請(qǐng)求模型有哪些及其特點(diǎn):

  • oneway 客戶端發(fā)出請(qǐng)求之后就結(jié)束了凹炸,不需要等待服務(wù)端響應(yīng)戏阅。
  • 同步請(qǐng)求,客戶端請(qǐng)求之后當(dāng)前線程同步等待消息返回啤它,處理請(qǐng)求和響應(yīng)是同一個(gè)線程奕筐。
  • 異步請(qǐng)求,一種是通過(guò)上下文獲取Future對(duì)象后主動(dòng)調(diào)用get方法阻塞直到返回消息变骡。處理請(qǐng)求和響應(yīng)的也是同一個(gè)線程离赫,只不過(guò)在等待響應(yīng)的同時(shí)可以見(jiàn)縫插針的處理其他的任務(wù)。
  • 異步請(qǐng)求塌碌,另一種是通過(guò)Future對(duì)象注冊(cè)回調(diào)方法渊胸,處理請(qǐng)求和響應(yīng)的是兩個(gè)不同的線程。(這里拋出個(gè)問(wèn)題響應(yīng)和如何關(guān)聯(lián)到對(duì)應(yīng)的請(qǐng)求的呢)

交互示意圖如下所示:

net_model_01.png

為了進(jìn)一步區(qū)分異步請(qǐng)求阻塞get()與回調(diào)的區(qū)別台妆,我們通過(guò)代碼示例進(jìn)一步說(shuō)明

接口契約如下:

public interface DemoService {

    String sayHello(String name);

    default CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.completedFuture(sayHello(name));
    }

}

服務(wù)提供方實(shí)現(xiàn):

public class DemoServiceImpl implements DemoService {
    private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

    @Override
    public String sayHello(String name) {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
        return "Hello " + name + ";
    }

}

調(diào)用方代碼:

DemoService demoService = ReferenceConfigCache.getCache().get(reference);

CompletableFuture<String> future = demoService.sayHelloAsync("get");
System.err.println("threadName:" + Thread.currentThread().getName() + " get 開(kāi)始");
future.get();
System.err.println("threadName:" + Thread.currentThread().getName() + " get 結(jié)束");


System.err.println("threadName:" + Thread.currentThread().getName() + " callback 開(kāi)始");
demoService.sayHelloAsync("callback").whenComplete((r, e) -> {
  System.err.println("threadName:" + Thread.currentThread().getName() + " callback 結(jié)束");
});

System.err.println("threadName:" + Thread.currentThread().getName() + " 執(zhí)行完畢");

代碼中在關(guān)鍵的節(jié)點(diǎn)打印了當(dāng)前線程名翎猛,依次調(diào)用了Future.getCompletableFuture.whenComplete接剩,控制臺(tái)輸出結(jié)果為:

threadName:main get 開(kāi)始
threadName:main get 結(jié)束
threadName:main callback 開(kāi)始
threadName:main 執(zhí)行完畢
    
threadName:DubboClientHandler-10.0.107.214:20880-thread-1 callback 結(jié)束

通過(guò)執(zhí)行結(jié)果可以發(fā)現(xiàn):通過(guò)Future.get()'會(huì)阻塞當(dāng)前線程等待結(jié)果切厘,而CompletableFuture`的回調(diào)則使用單獨(dú)的線程不會(huì)阻塞當(dāng)前線程的執(zhí)行。

Consumer關(guān)鍵源碼分析

上面概述了Consumer中存在的幾種網(wǎng)絡(luò)模型懊缺,本小節(jié)我們側(cè)重于源碼分析疫稿,剖析上面幾種請(qǐng)求模型實(shí)現(xiàn)原理。

protected Result doInvoke(final Invocation invocation) throws Throwable {
    // 省略部分代碼
    try {
        // 如果是單向請(qǐng)求
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            // 是否確認(rèn)異步發(fā)送
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // 如果是同步使用ThreadlessExecutor,其他場(chǎng)景使用Dubbo創(chuàng)建的共享線程池
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            // 創(chuàng)建CompletableFuture遗座,并通過(guò)netty work線程池將消息異步發(fā)送出去
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(o -> (AppResponse) o);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            // 設(shè)置執(zhí)行異步回調(diào)任務(wù)的線程池為executor
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

可以看到整體上的代碼邏輯有兩個(gè)分支:處理單向請(qǐng)求和非單向請(qǐng)求舀凛,那么我們下面就依次展開(kāi)兩種場(chǎng)景具體分析。

單向請(qǐng)求

1. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
2. currentClient.send(inv, isSent);
3. return AsyncRpcResult.newDefaultAsyncResult(invocation);

單向請(qǐng)求代碼很簡(jiǎn)單只有三行就完成了途蒋。第1行代碼出現(xiàn)的isSent參數(shù)表示線程是否等待確認(rèn)異步發(fā)送猛遍。因?yàn)橄⒆罱K還是通過(guò)Netty NIO異步發(fā)送出去的,如果設(shè)置為true意味著要一直等待至Netty發(fā)送完成碎绎,默認(rèn)為false螃壤。第2行代碼跟蹤消息發(fā)送過(guò)程,調(diào)用鏈如下:

-ReferenceCountExchangeClient
--HeaderExchangeClient
---HeaderExchangeChannel
----NettyChannel

最終發(fā)送消息核心代碼如下:

1. public void send(Object message, boolean sent) throws RemotingException {
    // 省略非關(guān)鍵代碼
  
    // 這里通過(guò)調(diào)用Netty方法將消息發(fā)送出去
2.     ChannelFuture future = channel.writeAndFlush(message);
    // 如果方法配置sent=true則需要同步等待消息發(fā)送完畢
3.     if (sent) {
      // wait timeout ms
4.     timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
5.     success = future.await(timeout);
    }
   // 省略非關(guān)鍵代碼
}

在第2行可以看到消息通過(guò)Netty中的channel將消息異步發(fā)送出去筋帖,返回了一個(gè)Future對(duì)象奸晴,如果isSent設(shè)置為true那么在執(zhí)行到第5行時(shí)就會(huì)阻塞至消息發(fā)送成功或者超時(shí)后返回。

方法send執(zhí)行結(jié)束后程序會(huì)返回主流程的doInvoke方法繼續(xù)執(zhí)行到AsyncRpcResult.newDefaultAsyncResult(invocation);日麸,這里創(chuàng)建默認(rèn)返回值后就完成了單向請(qǐng)求全部過(guò)程寄啼。

除了單向請(qǐng)求,其他另外三種都是雙向的代箭,有去有回墩划。一個(gè)是請(qǐng)求報(bào)文的發(fā)送,一個(gè)是響應(yīng)報(bào)文的接收嗡综,兩者需要結(jié)合起來(lái)一起合作才能完成一個(gè)RPC的全部過(guò)程乙帮。那么后續(xù)針對(duì)其他三種請(qǐng)求模型代碼分析時(shí)我們也按照先請(qǐng)求后響應(yīng)的步驟展開(kāi)。

同步請(qǐng)求

即客戶端請(qǐng)求之后當(dāng)前線程會(huì)阻塞等待響應(yīng)极景,處理請(qǐng)求和響應(yīng)的同一個(gè)線程察净。接下來(lái)我們重點(diǎn)分析以下實(shí)現(xiàn):

  • 消息發(fā)送過(guò)程。
  • 消息接受過(guò)程盼樟。
  • Dubbo是如何通過(guò)響應(yīng)找到請(qǐng)求氢卡。
  • 如何實(shí)現(xiàn)阻塞當(dāng)前線程。
  • 響應(yīng)消息返回后阻塞線程是如何喚醒的晨缴。

下面代碼是非單向請(qǐng)求的處理邏輯

// 如果是同步使用ThreadlessExecutor译秦,其他場(chǎng)景使用Dubbo創(chuàng)建的共享線程池
1. ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 創(chuàng)建CompletableFuture,并通過(guò)netty work線程池將消息異步發(fā)送出去
2. CompletableFuture<AppResponse> appResponseFuture =
        currentClient.request(inv, timeout, executor).thenApply(o -> (AppResponse) o);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
// 設(shè)置執(zhí)行異步回調(diào)任務(wù)的線程池為executor
result.setExecutor(executor);
return result;

第1行代碼:當(dāng)前執(zhí)行的請(qǐng)求通過(guò)getCallbackExecutor獲得對(duì)應(yīng)的線程池(用來(lái)執(zhí)行返回請(qǐng)求時(shí)異步回調(diào)過(guò)程)击碗,代碼如下:

protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
    ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
    // 如果當(dāng)前請(qǐng)求是同步請(qǐng)求筑悴,則返回ThreadlessExecutor
    if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
        return new ThreadlessExecutor(sharedExecutor);
    } else {
        // 否則返回客戶端創(chuàng)建的共享線程池
        return sharedExecutor;
    }
}

關(guān)于ThreadlessExecutor有必要介紹下它的特點(diǎn),官方文檔說(shuō)明如下:

  • ThreadlessExecutor和其他的線程池最大的區(qū)別是它不管理任何線程延都。
  • 提交給它的任務(wù)不會(huì)由單獨(dú)的線程去調(diào)度執(zhí)行雷猪。
  • 被存放在阻塞隊(duì)列中的任務(wù)只有被調(diào)用了waitAndDrain()才會(huì)執(zhí)行,并且執(zhí)行任務(wù)的線程是當(dāng)前調(diào)用了waitAndDrain()方法的線程晰房。

補(bǔ)充說(shuō)明,對(duì)理解同步請(qǐng)求阻塞原理非常重要:在ThreadlessExecutor中任務(wù)使用阻塞隊(duì)列存儲(chǔ),如果隊(duì)列是空的情況下直接調(diào)用waitAndDrain()方法時(shí)殊者,阻塞隊(duì)列會(huì)阻塞當(dāng)前線程直至有新的任務(wù)到來(lái)与境。Dubbo充分利用阻塞隊(duì)列的特性,在請(qǐng)求完成之后調(diào)用waitAndDrain()方法阻塞住當(dāng)前線程猖吴,當(dāng)Netty的work線程收到響應(yīng)之后會(huì)將消息轉(zhuǎn)換成待處理任務(wù)加入到該線程池中摔刁。此時(shí)剛剛因?yàn)闆](méi)有任務(wù)處理而阻塞的線程會(huì)被喚醒,用來(lái)繼續(xù)處理響應(yīng)的消息海蔽。理解這段話我們來(lái)繼續(xù)分析請(qǐng)求代碼:

第2行代碼:currentClient調(diào)用request方法發(fā)送請(qǐng)求并返回CompletableFuture,進(jìn)一步跟蹤request源碼:

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // 創(chuàng)建請(qǐng)求共屈,這里隱藏了構(gòu)造方法中創(chuàng)建請(qǐng)求id的細(xì)節(jié),這也是關(guān)聯(lián)request和response的關(guān)鍵党窜。
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // 創(chuàng)建Future對(duì)象拗引,將請(qǐng)求以及線程池保存到Future中
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

這段代碼主要完成了三件事:

  • 請(qǐng)求對(duì)象的創(chuàng)建,在new Request()這里隱藏了構(gòu)造方法中創(chuàng)建請(qǐng)求id的細(xì)節(jié)幌衣,這也是關(guān)聯(lián)request和response的關(guān)鍵矾削,響應(yīng)消息帶上request中生成的id,consumer收到消息時(shí)就能找到對(duì)應(yīng)的request上下文豁护。
  • 創(chuàng)建DefaultFuture對(duì)象哼凯,把請(qǐng)求、線程池等信息存儲(chǔ)起來(lái)楚里,在DefaultFuture中有個(gè)非常重要的全局靜態(tài)變量Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();断部,FUTURES中的key是剛剛請(qǐng)求創(chuàng)建的唯一id,value即為DefaultFuture對(duì)象班缎。收到響應(yīng)消息之后蝴光,可以很方便的通過(guò)Response中攜帶的id信息查到對(duì)應(yīng)的信息。
  • 調(diào)用消息發(fā)送吝梅。

返回future對(duì)象之后虱疏,對(duì)象會(huì)被進(jìn)一步封裝成AsyncRpcResult,到這里貌似主流程都結(jié)束了并沒(méi)有看到調(diào)用``waitAndDrain()方法產(chǎn)生阻塞關(guān)鍵代碼苏携。Dubbo將這塊代碼封裝到了AsyncToSyncInvoker`中:

@Override
public Result invoke(Invocation invocation) throws RpcException {
 1.   Result asyncResult = invoker.invoke(invocation);

    try {
 2.       if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            /**
             * NOTICE!
             * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
             * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
             */
  3.          asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    // 省略非關(guān)鍵代碼
}

可以看到第2行判斷是否為同步請(qǐng)求做瞪,如果是同步請(qǐng)求會(huì)調(diào)用第3行的get()方法,而該方法跟進(jìn)去之后如下:

public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1.   if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
2.        threadlessExecutor.waitAndDrain();
    }
    return responseFuture.get(timeout, unit);
}

這里可以看到第一行判斷線程池是否為ThreadlessExecutor,如果是則會(huì)調(diào)用waitAndDrain()阻塞等待右冻,到這里同步請(qǐng)求的過(guò)程我們已經(jīng)分析完畢装蓬,接下來(lái)我們來(lái)分析同步請(qǐng)求的響應(yīng)區(qū)里過(guò)程。

還記上篇中出現(xiàn)的AllChannelHandler嗎纱扭,這個(gè)類將部分IO事件委派給Dubbo中的線程池處理牍帚,其中就包括了消息的接收,我們一起來(lái)看下源碼:

@Override
public void received(Channel channel, Object message) throws RemotingException {
1.    ExecutorService executor = getPreferredExecutorService(message);
    try {
2.        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
       if(message instanceof Request && t instanceof RejectedExecutionException){
            sendFeedback(channel, (Request) message, t);
            return;
       }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

第1行通過(guò)返回的消息獲取線程池乳蛾,第2行將消息進(jìn)一步封裝成任務(wù)交給剛剛線程池處理暗赶。我們進(jìn)一步跟進(jìn)獲取線程池的代碼鄙币,剖析Dubbo是如何通過(guò)響應(yīng)找到請(qǐng)求的上下文的。

public ExecutorService getPreferredExecutorService(Object msg) {
1.   if (msg instanceof Response) {
2.       Response response = (Response) msg;
3.       DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
        // a typical scenario is the response returned after timeout, the timeout response may has completed the future
4.       if (responseFuture == null) {
5.            return getSharedExecutorService();
6.        } else {
7.            ExecutorService executor = responseFuture.getExecutor();
8.            if (executor == null || executor.isShutdown()) {
9.                executor = getSharedExecutorService();
            }
            return executor;
        }
    } else {
        return getSharedExecutorService();
    }
}

在第3行蹂随,通過(guò)響應(yīng)消息攜帶回來(lái)的id就可以在DefaultFuture的靜態(tài)全局變量Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();中獲得關(guān)聯(lián)請(qǐng)求的DefaultFuture了十嘿,進(jìn)一步也就可以獲取到該DefaultFuture中存儲(chǔ)的線程池ThreadlessExecutor了。那么任務(wù)被Netty中的工作線程調(diào)用executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));被加入阻塞隊(duì)列后又會(huì)發(fā)生什么呢岳锁,我們來(lái)分析下waitAndDrain()中的源碼:

public void waitAndDrain() throws InterruptedException {
    
    if (finished) {
        return;
    }

1.    Runnable runnable = queue.take();

2.    synchronized (lock) {
3.        waiting = false;
4.        runnable.run();
    }

    runnable = queue.poll();
    while (runnable != null) {
        try {
            runnable.run();
        } catch (Throwable t) {
            logger.info(t);

        }
        runnable = queue.poll();
    }
    // mark the status of ThreadlessExecutor as finished.
    finished = true;
}

在請(qǐng)求時(shí)會(huì)調(diào)用一次waitAndDrain()绩衷,由于隊(duì)列中是空的所以線程會(huì)阻塞在第1行代碼。當(dāng)收到響應(yīng)消息時(shí)新的處理任務(wù)被添加進(jìn)來(lái)激率,代碼會(huì)繼續(xù)執(zhí)行走到第4行執(zhí)行剛剛添加進(jìn)來(lái)的任務(wù)咳燕。下面我們剖析任務(wù)執(zhí)行過(guò)程,調(diào)用鏈如下:
ChannelEventRunnable.run()
-HeaderExchangeHandler.received()
--HeaderExchangeHandler.handleResponse()
---DefaultFuture.received()

public static void received(Channel channel, Response response, boolean timeout) {
        try {
1.            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
2.                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
}

第1行通過(guò)調(diào)用id獲取future對(duì)象乒躺,第2行調(diào)用doReceived()方法招盲,方法內(nèi)容如下:

private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
1.            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }

這里關(guān)鍵代碼就一行this.complete(res.getResult());。首先通過(guò)res.getResult()獲取返回值聪蘸,調(diào)用this.complete()并將返回值傳入宪肖。如果該future注冊(cè)了類如whenComplete()回調(diào)函數(shù)就會(huì)在此時(shí)觸發(fā)。

異步請(qǐng)求

在本篇開(kāi)始的時(shí)候我們就通過(guò)簡(jiǎn)單的示例演示了Dubbo中兩種異步不同的使用方式健爬,一種是通過(guò)get()主動(dòng)阻塞獲取返回值(是否阻塞要看get時(shí)消息是否返回了)控乾,另一種則是完全異步由其他的線程執(zhí)行回調(diào)方法。

通過(guò)上面對(duì)同步的分析娜遵,假設(shè)AsyncToSyncInvokerasyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);這段代碼不執(zhí)行會(huì)是不是就可以直接返回了呢蜕衡?

答案是肯定,沒(méi)有了阻塞的過(guò)程设拟,在請(qǐng)求消息發(fā)送之后就立刻返回了慨仿,忽略了這段代碼就是異步請(qǐng)求了。同步請(qǐng)求與客戶端采用future.get()這種方式獲取結(jié)果集區(qū)別在于前者是框架內(nèi)部阻塞纳胧,后者是客戶端自己主動(dòng)阻塞镰吆。同步阻塞是借助阻塞隊(duì)列實(shí)現(xiàn)的,而future.get()是借助CompletableFuture.waitingGet實(shí)現(xiàn)的跑慕。

而采用whenComplete異步回調(diào)方式則不會(huì)阻塞當(dāng)前線程万皿,異步的回調(diào)是在響應(yīng)結(jié)果到達(dá)之后,通過(guò)共享線程池中的線程執(zhí)行this.complete(res.getResult());方法核行,來(lái)回調(diào)whenCompelete方法中的內(nèi)容牢硅。

到這里異步請(qǐng)求的剖析也就結(jié)束了,可以看到Dubbo中同步請(qǐng)求和異步請(qǐng)求代碼大部分都是一樣的芝雪,但是通過(guò)巧妙的ThreadLessExecutor設(shè)計(jì)完成了異步轉(zhuǎn)同步的操作减余,再借助于CompletableFuture中提供的異步特性實(shí)現(xiàn)了真正實(shí)現(xiàn)了請(qǐng)求異步。

Provider端

上面我們簡(jiǎn)單介紹了consumer端的幾種請(qǐng)求模型惩系,那么provider端有哪些響應(yīng)模型呢位岔?

一個(gè)請(qǐng)求過(guò)來(lái)時(shí)如筛,在provider端大致要經(jīng)過(guò)以下幾個(gè)處理過(guò)程:

  • 首先接收請(qǐng)求
  • 反射調(diào)用對(duì)應(yīng)的方法獲取返回結(jié)果
  • 將返回?cái)?shù)據(jù)發(fā)送給consumer端。

在上篇線程模型中我們介紹了Dubbo在默認(rèn)策略下赃承,請(qǐng)求的接收會(huì)由單獨(dú)創(chuàng)建的線程池處理妙黍,而非Netty的工作線程處理悴侵。請(qǐng)求接收到之后瞧剖,經(jīng)過(guò)解析處理最終會(huì)通過(guò)代理類反射調(diào)用對(duì)應(yīng)的方法并拿到結(jié)果,下面為調(diào)用方法的關(guān)鍵代碼(位于AbstractProxyInvoker)可免。

public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 反射調(diào)用方法獲取結(jié)果
            System.err.println("threadName:" + Thread.currentThread().getName() + "發(fā)起了調(diào)用");
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            // 將返回結(jié)果封裝成CompletableFuture
            CompletableFuture<Object> future = wrapWithFuture(value);
            // 將結(jié)果通過(guò)AppResponse封裝
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            // 返回支持異步結(jié)果
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

以上調(diào)用方法代碼會(huì)將結(jié)果統(tǒng)一封裝為支持異步的CompletableFuture抓于。如果被調(diào)用的方法本身不支持異步,那么在這里主線程即調(diào)用方法的都是一個(gè)線程浇借。下面示例我們以sayHelloAsync方法為例演示服務(wù)端異步的場(chǎng)景:

@Override
public CompletableFuture<String> sayHelloAsync(String name) {
    return CompletableFuture.supplyAsync(() -> {
        System.err.println("threadName:" + Thread.currentThread().getName() + "執(zhí)行了方法");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
        return "Hello";
    });
}

在執(zhí)行調(diào)用前以及方法內(nèi)部分別打印了線程名稱捉撮,輸出如下內(nèi)容:

threadName:DubboServerHandler-10.0.107.214:20880-thread-2發(fā)起了調(diào)用
threadName:ForkJoinPool.commonPool-worker-9執(zhí)行了方法

可以明顯看到兩者是由不同線程執(zhí)行的,而同步的輸入如下的內(nèi)容:

threadName:DubboServerHandler-10.0.107.214:20880-thread-2發(fā)起了調(diào)用
threadName:DubboServerHandler-10.0.107.214:20880-thread-2 執(zhí)行了方法

可以看到調(diào)用以及方法的執(zhí)行都是有同一個(gè)線程負(fù)責(zé)的妇垢。

上面我們分析了provider端請(qǐng)求接收即服務(wù)方法調(diào)用的過(guò)程巾遭,下面我們繼續(xù)分析結(jié)果發(fā)送的過(guò)程(HeaderExchangeHandler)。

繼上面反射調(diào)用之后獲得返回的CompletableFuture闯估,這里注冊(cè)了回調(diào)方法whenComplete灼舍,方法將調(diào)用結(jié)果通過(guò)channel發(fā)送出去。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 省略無(wú)關(guān)代碼
    // find handler by message class.
    Object msg = req.getData();
    try {
        // 反射調(diào)用方法
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            System.err.println("threadName:" + Thread.currentThread().getName() + "執(zhí)行了回調(diào)");
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 發(fā)送返回消息
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

進(jìn)一步追蹤發(fā)送消息代碼涨薪,調(diào)用鏈路如下:

-HeaderExchangeHandler
--HeaderExchangeChannel
---NettyChannel
----AbstractChannel
-----DefaultChannelPipeline
------AbstractChannelHandlerContext

最終關(guān)鍵代碼如下:

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

這里出現(xiàn)的executor是Netty中的worker線程池骑素,真正的消息響應(yīng)發(fā)送的工作是由該組線程池來(lái)完成的。到這里消息響應(yīng)的關(guān)鍵過(guò)程已經(jīng)分析完畢刚夺,總結(jié)示意圖如下所示:

net_model_02.png

在異步場(chǎng)景時(shí)献丑,Dubbo Server Handler Thread 只負(fù)責(zé)服務(wù)調(diào)用,方法的執(zhí)行由異步的ForkJoin Common Pool完成(業(yè)務(wù)線程)侠姑,當(dāng)然也可以手動(dòng)指定線程池创橄。當(dāng)方法執(zhí)行完畢后在回調(diào)方法中完成消息發(fā)送的方法觸發(fā),消息的發(fā)送則又由單獨(dú)的Netty Work Thread來(lái)完成了莽红。

在同步場(chǎng)景時(shí)妥畏,Dubbo Server Handler Thread 需要負(fù)責(zé)服務(wù)調(diào)用以及方法的執(zhí)行,獲取結(jié)構(gòu)后則觸發(fā)消息發(fā)送船老,具體的發(fā)送動(dòng)作由單獨(dú)的Netty Work Thread來(lái)完成咖熟,可見(jiàn)同步和異步的區(qū)別在于方法的執(zhí)行是由誰(shuí)來(lái)完成。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末柳畔,一起剝皮案震驚了整個(gè)濱河市馍管,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌薪韩,老刑警劉巖确沸,帶你破解...
    沈念sama閱讀 212,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件捌锭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡罗捎,警方通過(guò)查閱死者的電腦和手機(jī)观谦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)桨菜,“玉大人豁状,你說(shuō)我怎么就攤上這事〉沟茫” “怎么了泻红?”我有些...
    開(kāi)封第一講書人閱讀 158,369評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)霞掺。 經(jīng)常有香客問(wèn)我谊路,道長(zhǎng),這世上最難降的妖魔是什么菩彬? 我笑而不...
    開(kāi)封第一講書人閱讀 56,799評(píng)論 1 285
  • 正文 為了忘掉前任缠劝,我火速辦了婚禮,結(jié)果婚禮上骗灶,老公的妹妹穿的比我還像新娘惨恭。我一直安慰自己,他們只是感情好矿卑,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,910評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布喉恋。 她就那樣靜靜地躺著,像睡著了一般母廷。 火紅的嫁衣襯著肌膚如雪轻黑。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 50,096評(píng)論 1 291
  • 那天琴昆,我揣著相機(jī)與錄音氓鄙,去河邊找鬼。 笑死业舍,一個(gè)胖子當(dāng)著我的面吹牛抖拦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播舷暮,決...
    沈念sama閱讀 39,159評(píng)論 3 411
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼态罪,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了下面?” 一聲冷哼從身側(cè)響起复颈,我...
    開(kāi)封第一講書人閱讀 37,917評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎沥割,沒(méi)想到半個(gè)月后耗啦,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體凿菩,經(jīng)...
    沈念sama閱讀 44,360評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,673評(píng)論 2 327
  • 正文 我和宋清朗相戀三年帜讲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了衅谷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,814評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡似将,死狀恐怖获黔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情玩郊,我是刑警寧澤肢执,帶...
    沈念sama閱讀 34,509評(píng)論 4 334
  • 正文 年R本政府宣布,位于F島的核電站译红,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏兴溜。R本人自食惡果不足惜侦厚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,156評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望拙徽。 院中可真熱鬧刨沦,春花似錦、人聲如沸膘怕。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)岛心。三九已至来破,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間忘古,已是汗流浹背徘禁。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,123評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留髓堪,地道東北人送朱。 一個(gè)月前我還...
    沈念sama閱讀 46,641評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像干旁,于是被迫代替她去往敵國(guó)和親驶沼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,728評(píng)論 2 351