接上文钞馁,在客戶端發(fā)出請(qǐng)求后,首先處理的自然是netty匿刮,在IO處理之后僧凰,就進(jìn)入業(yè)務(wù)處理NettyServerHandler。最終的處理任務(wù)就落在了RequestThreadPoolProcessor這個(gè)類(lèi)身上熟丸,主要方法是doProcessRequest
public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
final ProviderContext providerContext) {
requestContextMap.put(request, providerContext);
doMonitorData(request, providerContext);
Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {
@Override
public InvocationResponse call() throws Exception {
providerContext.getTimeline().add(new TimePoint(TimePhase.T));
try {
ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
.selectInvocationHandler(providerContext.getRequest().getMessageType());
if (invocationHandler != null) {
providerContext.setThread(Thread.currentThread());
return invocationHandler.handle(providerContext);
}
} catch (Throwable t) {
logger.error("Process request failed with invocation handler, you should never be here.", t);
} finally {
requestContextMap.remove(request);
}
return null;
}
};
final ThreadPool pool = selectThreadPool(request);
try {
checkRequest(pool, request);
providerContext.getTimeline().add(new TimePoint(TimePhase.T));
return pool.submit(requestExecutor);
} catch (RejectedExecutionException e) {
requestContextMap.remove(request);
throw new RejectedException(getProcessorStatistics(pool), e);
}
}
requestExecutor是整個(gè)請(qǐng)求的執(zhí)行器训措,內(nèi)部和請(qǐng)求一樣,通過(guò)chain-filter來(lái)處理光羞。selectThreadPool方法是來(lái)選擇一個(gè)線程池來(lái)處理請(qǐng)求绩鸣。pigeon提供了方法級(jí)別的線程池、服務(wù)級(jí)別的線程池和用戶自定義的線程池纱兑。默認(rèn)是使用共享線程池和慢速線程池呀闻。為什么會(huì)有這么多種線程池呢?比如默認(rèn)提供的兩種潜慎,是考慮如果有請(qǐng)求比較緩慢捡多,那么會(huì)將請(qǐng)求扔到慢速線程池,這樣不會(huì)因?yàn)閹讉€(gè)慢速task铐炫,堵死整個(gè)線程池垒手。同理,開(kāi)發(fā)人員也可以用自己的策略來(lái)使用不同的線程池處理task驳遵。
接下來(lái)淫奔,看看服務(wù)端的filter
registerBizProcessFilter(new TraceFilter());
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new MonitorProcessFilter());
}
registerBizProcessFilter(new WriteResponseProcessFilter());
registerBizProcessFilter(new ContextTransferProcessFilter());
registerBizProcessFilter(new ExceptionProcessFilter());
registerBizProcessFilter(new SecurityFilter());
registerBizProcessFilter(new GatewayProcessFilter());
registerBizProcessFilter(new BusinessProcessFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
TraceFilter和客戶端類(lèi)似。MonitorProcessFilter是用來(lái)處理監(jiān)控的堤结,這個(gè)要看監(jiān)控開(kāi)關(guān)是否打開(kāi)唆迁。WriteResponseProcessFilter是利用netty將結(jié)果返回給客戶端。ContextTransferProcessFilter是用來(lái)處理請(qǐng)求參數(shù)的竞穷。ExceptionProcessFilter是用來(lái)處理在調(diào)用過(guò)程中發(fā)生的異常唐责,之前的版本是沒(méi)有這個(gè)filter的,就會(huì)導(dǎo)致如果服務(wù)端的代碼沒(méi)有捕獲異常直接拋出瘾带,異常信息沒(méi)有辦法正常的傳遞給客戶端鼠哥。SecurityFilter是安全相關(guān)的filter,包括黑名單看政、白名單朴恳、請(qǐng)求秘鑰等等。GatewayProcessFilter處理了一些請(qǐng)求數(shù)攔截的事情允蚣,比如可以設(shè)置某個(gè)服務(wù)最多請(qǐng)求多少次之類(lèi)的于颖。BusinessProcessFilter核心就是通過(guò)反射直接調(diào)用請(qǐng)求的方法,完成遠(yuǎn)程方法調(diào)用嚷兔。
以上兩篇就是整個(gè)RPC的請(qǐng)求和應(yīng)答處理的過(guò)程森渐,其實(shí)比較簡(jiǎn)單,后面兩遍針對(duì)一些特定的問(wèn)題冒晰,講述下pigeon是用了哪些方法解決的同衣,主要有一些很有用的小技巧,趕腳看了源碼之后學(xué)到不少壶运。