pigeon源碼分析-同步調(diào)用和異步調(diào)用
Pigeon是美團點評內(nèi)部廣泛使用的一個分布式服務(wù)通信框架(RPC)述雾,本文所有分析基于 pigeon 開源版本:RPC framework of DIANPING路幸,具體版本為:2.9.12-SNAPSHOT
概要:先從調(diào)用方視角 RPC 框架基本原理,然后介紹動態(tài)代理的生成,從這里切入之后再詳細介紹同步、異步調(diào)用;
RPC就是要在分布式場景下完成這樣一個動作: Result result = service.call(args)
肃拜,在這個=
等號背后 RPC 框架做了很多事。
RPC 調(diào)用在客戶端視角下的分解
簡單來說可以分成這幾步:
- Client client = findClient(); // 這一步主要是服務(wù)治理相關(guān)雌团,本文不關(guān)注燃领;
- client.write(request);
- Object resp = waitResponse();
- Result result = convert2Result(resp);
pigeon 在客戶端是如何生成代理的?
遠程調(diào)用離不開動態(tài)代理锦援,其實 pigeon service 在客戶端視角就是生成了一個被調(diào)用接口的實現(xiàn)類猛蔽,通過網(wǎng)絡(luò)請求來返回數(shù)據(jù),這樣就好像實現(xiàn)類在本地一樣雨涛;
生成代理的邏輯:
在 bean 初始化的時候枢舶,獲取 Proxy 實例,代碼如下:
// com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean#init
public void init() throws Exception {
// 進行初始化配置
this.obj = ServiceFactory.getService(invokerConfig);
// 略
}
使用 JDK 動態(tài)代理替久,真正執(zhí)行的邏輯都在 java.lang.reflect.InvocationHandler
中凉泄,
可以看到 invoke() 時執(zhí)行的是 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult
,執(zhí)行的邏輯是一個 FilterChain蚯根,成員見 com.dianping.pigeon.remoting.invoker.process.InvokerProcessHandlerFactory#init
后众,每個 InvocationInvokeFilter 的作用基本見名知意,完成一些統(tǒng)計颅拦、上下文準(zhǔn)備蒂誉、監(jiān)控等功能;
終于到了 RemoteCallInvokeFilter距帅,這是執(zhí)行遠程調(diào)用最關(guān)鍵的一步右锨,主要代碼如下:
//com.dianping.pigeon.remoting.invoker.process.filter.RemoteCallInvokeFilter#invoke
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
// 略
try {
switch (callMethod) {
case SYNC:
// do call
break;
case CALLBACK:
// do call
break;
case FUTURE:
//do call
break;
case ONEWAY:
//do call
break;
default:
throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!");
}
((DefaultInvokerContext)invocationContext).setResponse(response);
afterInvoke(invocationContext);
} catch (Throwable t) {
afterThrowing(invocationContext, t);
throw t;
}
return response;
}
可以看到,pigeon 支持四種調(diào)用類型:sync, callback, future, oneway
這里主要分析 sync 和 future碌秸,分別對應(yīng)一般意義上的同步绍移、異步調(diào)用模式,其他兩種是在這兩種上做了特殊處理讥电;
SYNC-同步調(diào)用
同步調(diào)用蹂窖,主要邏輯:
case SYNC:
CallbackFuture future = new CallbackFuture();
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
if (response == null) {
response = future.getResponse(request.getTimeout());
}
break;
先初始化了一個 callbackFuture,之后 sendRequest()恩敌,最后獲取 response
進一步查看 sendRequest() 實現(xiàn):
com.dianping.pigeon.remoting.invoker.util.InvokerUtils#sendRequest(com.dianping.pigeon.remoting.invoker.Client, com.dianping.pigeon.remoting.common.domain.InvocationRequest, com.dianping.pigeon.remoting.invoker.concurrent.Callback)
--> com.dianping.pigeon.remoting.invoker.AbstractClient#write
實際的 doWrite() 有兩種實現(xiàn)瞬测,HTTP / netty,這里看 NettyClient
--> com.dianping.pigeon.remoting.netty.invoker.NettyClient#doWrite
// 調(diào)用 netty client 寫請求
@Override
public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
NettyChannel channel = null;
try {
channel = channelPool.selectChannel();
ChannelFuture future = channel.write0(request);
afterWrite(request, channel);
if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
|| request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
future.addListener(new MessageWriteListener(request, channel));
}
} catch (Exception e) {
throw new NetworkException("[doRequest] remote call failed:" + request, e);
}
return null;
}
仔細看這里永遠返回 null,然后再看 sync 調(diào)用執(zhí)行的下一步月趟,就是 response = future.getResponse(request.getTimeout());
, 判斷 sendRequest() 返回結(jié)果為 null灯蝴,就從 future 獲取結(jié)果,這兩步就聯(lián)系起來了孝宗;
而 getResponse(timeout) 所做的事情只有一件:while 循環(huán)等待到 isDone == true || timeout绽乔,這里不詳細展示,看代碼即可碳褒;
拓展一下,這里只做等待看疗,那么返回值從哪里來呢沙峻?答案是等待 socket 的另一端也就是 server 回寫;
回過頭來看两芳,remoteCallInvocationFilter 所做的就是寫請求摔寨,然后等待接受返回值;
FUTURE-異步調(diào)用
Future 調(diào)用是 pigeon 提供的標(biāo)準(zhǔn)異步調(diào)用模式怖辆。使用方式如下:
//調(diào)用ServiceA的method1
serviceA.method1("aaa");
//獲取ServiceA的method1調(diào)用future狀態(tài)
Future future1OfServiceA = InvokerHelper.getFuture();
//調(diào)用ServiceA的method2
serviceA.method2("bbb");
//獲取ServiceA的method2調(diào)用future狀態(tài)
Future future2OfServiceA = InvokerHelper.getFuture();
//獲取ServiceA的method2調(diào)用結(jié)果
Object result2OfServiceA = future2OfServiceA.get();
//獲取ServiceA的method1調(diào)用結(jié)果
Object result1OfServiceA = future1OfServiceA.get();
簡單來說就是:調(diào)用 —> 獲取 Future —> 獲取結(jié)果是复。
因為每次調(diào)用不用阻塞等待結(jié)果,而是拿到 Future 實例之后由開發(fā)者在合適的時機獲取返回值竖螃,所以合理使用能提高并發(fā)性淑廊;
來看下實現(xiàn):
case FUTURE:
ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout());
InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl);
FutureFactory.setFuture(futureImpl);
response = InvokerUtils.createFutureResponse(futureImpl);
invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
break;
可以看到只有 sendRequest(),沒有等待返回特咆,開發(fā)文檔中有介紹季惩,F(xiàn)uture 調(diào)用之后需要開發(fā)者自行通過 future.get(timeout);
獲取返回值;
FutureFactory.setFuture(futureImpl);
這一步做了什么腻格?
public class FutureFactory {
private static ThreadLocal<Future<?>> threadFuture = new ThreadLocal<Future<?>>();
public static void setFuture(Future<?> future) {
threadFuture.set(future);
}
其實就是將 Future 實例設(shè)置到 ThreadLocal 中画拾,所以每次調(diào)用結(jié)束之后立即通過 Future future1OfServiceA = InvokerHelper.getFuture();
獲取Future實例,否則如果緊接著進行了下一次future調(diào)用菜职,就會因為ThreadLocal 中的 future 被覆蓋而無法正確獲取返回值青抛;
思考 :返回值的 Future 是什么作用?
答案:其實 pigeon 中的 Future 實現(xiàn)了 JDK 的標(biāo)準(zhǔn)語義酬核,即:A Future represents the result of an asynchronous computation .
代表了一次異步計算的結(jié)果蜜另。簡單來說,就是一個暫時代表結(jié)果的占位符愁茁。
需要結(jié)果的時候就去看看有沒有 response 了蚕钦,有就返回,沒有就阻塞到超時時間到鹅很;
pigeon 同步調(diào)用與異步調(diào)用處理有哪些不同
同步嘶居、異步,都是先發(fā)送完請求,然后等待結(jié)果邮屁。
SYNC 模式下整袁,立即等待直到拿到結(jié)果或者超時,然后將結(jié)果(超時情況下則是異常)返回給調(diào)用方佑吝;
FUTURE 模式下坐昙,立即返回個占位符 Future,這里的實現(xiàn)類是:com.dianping.pigeon.remoting.invoker.concurrent.ServiceFutureImpl#ServiceFutureImpl
芋忿,將真正獲取返回值的時機交給調(diào)用方控制炸客;
這里也因為 Future 模式實現(xiàn)時使用了 ThreadLocal 暫時存儲占位符,所以多次 Future 調(diào)用戈钢,一定要順序獲取結(jié)果痹仙;