客戶端服務(wù)接口所持有的對象為RefererInvocationHandler,jdk的代理實(shí)現(xiàn)纤虽,invoke調(diào)用,先封裝DefaultRequest迷扇,遍歷clusters鸥昏,如果存在降級的則跳過,默認(rèn)選第一個(gè)非降級的cluster
- clusters處理
for (Cluster<T> cluster : clusters) {
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
// 跳過降級的cluster
if (switcher != null && !switcher.isOn()) {
continue;
}
// request設(shè)置各種數(shù)據(jù)
// 最后通過cluster.call調(diào)用
response = cluster.call(request);
}
- 高可用策略處理
// cluster.call烟逊,通過配置的HA策略 來實(shí)現(xiàn)調(diào)用(默認(rèn)配置:FailoverHaStrategy)
public Response call(Request request) {
if (available.get()) {
try {
// loadBalance將在haStrategy中處理
return haStrategy.call(request, loadBalance);
} catch (Exception e) {
return callFalse(request, e);
}
}
return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
- 負(fù)載均衡處理
// 選擇Refer渣窜,通過loadBalance.selectToHolder(request, referers);選擇
List<Referer<T>> referers = selectReferers(request, loadBalance);
URL refUrl = referers.get(0).getUrl();
//
// 從第一個(gè)url中獲取重試次數(shù)
int tryCount = refUrl.getMethodParameter(request.getMethodName(),
request.getParamtersDesc(), URLParamType.retries.getName(),
URLParamType.retries.getIntValue());
// 如果有問題,則設(shè)置為不重試
if (tryCount < 0) {
tryCount = 0;
}
for (int i = 0; i <= tryCount; i++) {
// 選擇一個(gè)服務(wù)
Referer<T> refer = referers.get(i % referers.size());
try {
// 設(shè)置重試次數(shù)
request.setRetries(i);
// 開始遠(yuǎn)程調(diào)用
return refer.call(request);
} catch (RuntimeException e) {
// 對于業(yè)務(wù)異常宪躯,直接拋出
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
}
// selectToHolder的處理(選擇refers)
public void selectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = this.referers;
if (referers == null) {
throw new MotanServiceException(this.getClass().getSimpleName() + " No available referers for call : referers_size= 0 "
+ MotanFrameworkUtil.toString(request));
}
if (referers.size() > 1) {
// 多于一個(gè)乔宿,繼續(xù)選(這里就根據(jù)配置的負(fù)載均衡策略來處理,
// 默認(rèn)的是RoundRobinLoadBalance)访雪,但也會返回多個(gè)refer
doSelectToHolder(request, refersHolder);
} else if (referers.size() == 1 && referers.get(0).isAvailable()) {
// 只有一個(gè)详瑞,只記錄當(dāng)前的
refersHolder.add(referers.get(0));
}
if (refersHolder.isEmpty()) {
throw new MotanServiceException(this.getClass().getSimpleName() + " No available referers for call : referers_size="
+ referers.size() + " " + MotanFrameworkUtil.toString(request));
}
}
- refer.call最后在DefaultRpcProtocol的DefaultRpcReferer中通過NettyClient發(fā)出請求
protected Response doCall(Request request) {
try {
// 為了能夠?qū)崿F(xiàn)跨group請求,需要使用server端的group冬阳。
request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());
return client.request(request);
} catch (TransportException exception) {
throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
}
}
- 最終通過netty將消息發(fā)出
ChannelFuture writeFuture = this.channel.write(request);
// 注冊一個(gè)回調(diào)蛤虐,用來處理調(diào)用情況
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的調(diào)用
nettyClient.resetErrorCount();
} else {
// 失敗的調(diào)用
nettyClient.incrErrorCount();
}
}
});
服務(wù)器端通過Netty接收客戶端的請求,入口為NettyChannelHandler的messageReceived
- netty 處理
if (message instanceof Request) {
processRequest(ctx, e); // 處理客戶端發(fā)來的請求
} else if (message instanceof Response) {
processResponse(ctx, e);
}
//
// 使用線程池處理調(diào)用請求
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try{
RpcContext.init(request);
processRequest(ctx, request, processStartTime);
}finally{
RpcContext.destroy();
}
}
});
- ProviderProtectedMessageRouter處理
// key:motan-demo-rpc/com.weibo.motan.demo.service.MotanDemoService/1.0
String serviceKey = MotanFrameworkUtil.getServiceKey(request);
// 根據(jù)key獲取Provider(DefaultProvider)
Provider<?> provider = providers.get(serviceKey);
// 通過反射調(diào)用
Method method = lookup(request);
Object value = method.invoke(proxyImpl, request.getArguments());
- 然后進(jìn)入service層代碼處理肝陪,后續(xù)就是結(jié)果返回處理了驳庭。
大概的示意圖: