RPC調(diào)用處理請求結(jié)果可以分為兩部分:
- 獲取 response
- 關(guān)聯(lián) request 和 response
這么分看起來似乎很奇怪至朗,不是直接等待處理完獲取結(jié)果就行了嗎拱撵?
我們說 RPC 調(diào)用都是在模擬這個動作: Result result = service.call(args);
但是遠程調(diào)用畢竟不是本地調(diào)用(其實稍后可以看到還是有相似之處的)篷店,將請求寫到網(wǎng)絡(luò)之后昔园,就無法命令遠端做任何事了扁瓢,這次請求就已經(jīng)告一段落了。
pigeon client 只知道:
- 向網(wǎng)絡(luò)寫數(shù)據(jù)堕战;
- 就是寫 request
- 處理網(wǎng)絡(luò)寫入的數(shù)據(jù)坤溃;
- 處理成 response
于是不難理解為何有此一問:網(wǎng)絡(luò)另一端寫過來的數(shù)據(jù),我怎么知道是哪個請求的返回值呢嘱丢?
獲取 response
其實從網(wǎng)絡(luò)讀取數(shù)據(jù)薪介,轉(zhuǎn)化成 Object。 pigeon 基于 netty屿讽,獲取 response 就是處理網(wǎng)絡(luò)寫入。
具體實現(xiàn)在:
// com.dianping.pigeon.remoting.netty.invoker.NettyClientHandler#messageReceived
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
CodecEvent codecEvent = (CodecEvent) e.getMessage();
if (codecEvent.isValid() && codecEvent.getInvocation() != null) {
client.processResponse((InvocationResponse) codecEvent.getInvocation());
}
}
從 netty 封裝 MessageEvent 轉(zhuǎn)換成 CodecEvent吠裆,再剝開一層就是 InvocationResponse
伐谈,當(dāng)然這還是一個比較泛化的返回值,com.dianping.pigeon.remoting.invoker.process.ResponseProcessor
將會進一步處理试疙;
繼續(xù)往下看:
// com.dianping.pigeon.remoting.invoker.process.threadpool.ResponseThreadPoolProcessor#doProcessResponse
public void doProcessResponse(final InvocationResponse response, final Client client) {
Runnable task = new Runnable() {
public void run() {
ServiceInvocationRepository.getInstance().receiveResponse(response);
}
};
try {
responseProcessThreadPool.execute(task);
} catch (RejectedExecutionException e) {
String error = String.format("process response failed:%s, processor stats:%s", response,
getProcessorStatistics());
throw new RejectedException(error, e);
}
}
這里封裝成了一個 task诵棵,交給線程池處理。
再下一層:
// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#receiveResponse
public class ServiceInvocationRepository {
// 略
private static Map<Long, RemoteInvocationBean> invocations = new ConcurrentHashMap<Long, RemoteInvocationBean>();
public void receiveResponse(InvocationResponse response) {
RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
if (invocationBean != null) {
if (logger.isDebugEnabled()) {
logger.debug("received response:" + response);
}
InvocationRequest request = invocationBean.request;
try {
Callback callback = invocationBean.callback;
if (callback != null) {
Client client = callback.getClient();
if (client != null) {
ServiceStatisticsHolder.flowOut(request, client.getAddress());
}
callback.callback(response);
callback.run();
}
} finally {
invocations.remove(response.getSequence());
}
}
}
// 略
}
這里可以看到祝旷, RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
invocations 維護一個 HashMap履澳,key 是一個 long 型的 sequenceId,通過這種方式定位到 invocationBean怀跛,而 invocationBean 看實現(xiàn)可知持有 request 引用距贷,以及一個處理返回值的 callback。
Callback 在不同調(diào)用模式(sync / future/ oneway/ callback) 下有不同實現(xiàn)類吻谋,比如 sync 模式下:
// public class CallbackFuture implements Callback, CallFuture {
@Override
public void callback(InvocationResponse response) {
this.response = response;
}
sync 和 future 調(diào)用忠蝗,都是將 response 對象實例設(shè)置給相應(yīng)的引用
何時得到真正的 returnValue?
看動態(tài)代理的邏輯漓拾,com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#invoke
在 com.dianping.pigeon.remoting.invoker.process.filter.InvocationInvokeFilter
挨個執(zhí)行完之后,提取返回值
// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult
public Object extractResult(InvocationResponse response, Class<?> returnType) throws Throwable {
Object responseReturn = response.getReturn();
// ...
}
拿到的是頂層父類實例 Object,具體的類型匹配需要客戶端和服務(wù)端自行匹配
關(guān)聯(lián) request 和 response
從上面的分析可以看出绿渣,關(guān)鍵點就在于 sequence拳喻,每次調(diào)用應(yīng)該有個唯一的 id 進行匹配
這個 sequence 是唯一的嗎?
如果不唯一低千,就可能導(dǎo)致拿到錯誤的處理結(jié)果配阵。
sequence 的生成位置
//com.dianping.pigeon.remoting.invoker.process.filter.ContextPrepareInvokeFilter#initRequest
private static AtomicLong requestSequenceMaker = new AtomicLong();
request.setSequence(requestSequenceMaker.incrementAndGet() * -1);
可以看到 這個 sequence 是全局唯一的,準確說是同一個 JVM 中是唯一的,而且是 long 類型闸餐,足夠大饱亮;
Q:sequence 發(fā)生回繞怎么辦?
A:long 類型舍沙,即使發(fā)生回繞近上,也需要足夠長的時間,一般來說不會堆積有那么多的請求拂铡,導(dǎo)致兩個相同的 sequenceId 實際對應(yīng)不同請求壹无;
分布式環(huán)境下,sequenceId 在多個機器上可能重復(fù)感帅,會出錯嗎斗锭?
A:sequence 存儲的 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#invocations
也是同一個 JVM 唯一的,所以只需要擔(dān)心會不會有這樣的場景:
client A 調(diào)用 server A失球,client B 也調(diào)用 server A岖是,但是 server A 把 client B的請求返回值處理之后發(fā)送到了 client A?
看看服務(wù)端的處理实苞,寫返回值:
//com.dianping.pigeon.remoting.provider.process.filter.WriteResponseProcessFilter#invoke
public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)
throws Throwable {
try {
ProviderChannel channel = invocationContext.getChannel();
InvocationRequest request = invocationContext.getRequest();
InvocationResponse response = handler.handle(invocationContext);
if (request.getCallType() == Constants.CALLTYPE_REPLY) {
invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
channel.write(invocationContext, response);
invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
}
// ...
channel 就是對socket的封裝豺撑,可以看成是 client / server 對對方的抽象。
那么只需要保證拿到正確的 channel 就對了:
//com.dianping.pigeon.remoting.netty.provider.NettyServerHandler#messageReceived
public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
CodecEvent codecEvent = (CodecEvent) (message.getMessage());
if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
return;
}
InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();
ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
//...
}
也就是說黔牵,再 client 端 writeRequest() 之后聪轿,server 端讀取網(wǎng)絡(luò)數(shù)據(jù)的時候就從 context 中獲取到 client 所在的 channel 了,簡單來說猾浦,”從哪里來陆错,到哪里去“。
總的來說金赦,不會發(fā)生以上所述的 sequenceId 錯亂的問題音瓷。