在此之前我們理清了pigeon服務(wù)方的初始化力图、注冊和消息處理邏輯瑞眼,本篇我們來看看pigeon調(diào)用方的實(shí)現(xiàn)鼠渺。
第一部分我們先看看服務(wù)調(diào)用的實(shí)現(xiàn)。
服務(wù)調(diào)用示例:
@RestController
@RequestMapping("/common")
public class CommonController {
@Autowired
private CommonService commonService;
@RequestMapping(value = "/hello")
@ResponseBody
public String hello(@RequestParam("name") String name) {
System.out.println("enter hello");
return commonService.hello(name);
}
}
CommonService 就是服務(wù)方發(fā)布的服務(wù)接口戚啥,可以看到在調(diào)用方只需要引入相應(yīng)服務(wù)的api jar包枉层,就可以像調(diào)用本地方法一樣調(diào)用對(duì)應(yīng)的服務(wù)接口速缆,這也是大部分RPC框架的實(shí)現(xiàn)效果懂扼。
CommonService 通過@Autowired注解在spring容器中找到對(duì)應(yīng)的bean,我們來看看相應(yīng)的bean配置
<bean id="commonService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
<!-- 服務(wù)全局唯一的標(biāo)識(shí)url嘹锁,默認(rèn)是服務(wù)接口類名葫录,必須設(shè)置 -->
<property name="url" value="http://service.dianping.com/rpcserver/commonService_1.0.0" />
<!-- 接口名稱,必須設(shè)置 -->
<property name="interfaceName" value="com.study.rpcserver.api.CommonService" />
<!-- 超時(shí)時(shí)間领猾,毫秒米同,默認(rèn)5000,建議自己設(shè)置 -->
<property name="timeout" value="2000" />
<!-- 序列化摔竿,hessian/fst/protostuff窍霞,默認(rèn)hessian,可不設(shè)置-->
<property name="serialize" value="hessian" />
<!-- 調(diào)用方式拯坟,sync/future/callback/oneway但金,默認(rèn)sync,可不設(shè)置 -->
<property name="callType" value="sync" />
<!-- 失敗策略郁季,快速失敗failfast/失敗轉(zhuǎn)移failover/失敗忽略failsafe/并發(fā)取最快返回forking冷溃,默認(rèn)failfast,可不設(shè)置 -->
<property name="cluster" value="failfast" />
<!-- 是否超時(shí)重試梦裂,默認(rèn)false似枕,可不設(shè)置 -->
<property name="timeoutRetry" value="false" />
<!-- 重試次數(shù),默認(rèn)1年柠,可不設(shè)置 -->
<property name="retries" value="1" />
</bean>
ReferenceBean繼承了spring的FactoryBean接口凿歼,來處理復(fù)雜bean的生成,通過getObject()方法來返回對(duì)應(yīng)bean實(shí)例冗恨。接下來我們就以ReferenceBean為入口來切入pigeon調(diào)用方的實(shí)現(xiàn)思路答憔。
public void init() throws Exception {
if (StringUtils.isBlank(interfaceName)) {
throw new IllegalArgumentException("invalid interface:" + interfaceName);
}
this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());
//服務(wù)調(diào)用相關(guān)的配置信息,就是我們對(duì)每一個(gè)接口服務(wù)在xml文件中的配置
InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster,
this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
invokerConfig.setClassLoader(classLoader);
invokerConfig.setSecret(secret);
invokerConfig.setRegionPolicy(regionPolicy);
if (!CollectionUtils.isEmpty(methods)) {
Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();
invokerConfig.setMethods(methodMap);
for (InvokerMethodConfig method : methods) {
methodMap.put(method.getName(), method);
}
}
checkMock(); // 降級(jí)配置檢查
invokerConfig.setMock(mock);
checkRemoteAppkey();
invokerConfig.setRemoteAppKey(remoteAppKey);
//生成接口的代理對(duì)象
this.obj = ServiceFactory.getService(invokerConfig);
configLoadBalance(invokerConfig);
}
//FactoryBean返回的bean實(shí)例
public Object getObject() {
return this.obj;
}
ServiceFactory.getService(invokerConfig);根據(jù)配置的interfaceName生成一個(gè)java代理對(duì)象
private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();
public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
return serviceProxy.getProxy(invokerConfig);
}
跟蹤代碼掀抹,進(jìn)入AbstractServiceProxy.getProxy方法虐拓,核心代碼如下:
protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
@Override
public <T> T getProxy(InvokerConfig<T> invokerConfig) {
//InvokerConfig實(shí)現(xiàn)了自定義equals和hashCode方法
service = services.get(invokerConfig);
if (service == null) {
synchronized (interner.intern(invokerConfig)) {
service = services.get(invokerConfig);
if (service == null) {
//此處執(zhí)行調(diào)用方的一些初始化邏輯,包括InvokerProcessHandlerFactory.init();初始化調(diào)用方Filter責(zé)任鏈等
InvokerBootStrap.startup();
//生成代理對(duì)象
service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
try {
//獲取服務(wù)信息傲武,創(chuàng)建Client實(shí)例
ClientManager.getInstance().registerClients(invokerConfig);
} catch (Throwable t) {
logger.warn("error while trying to setup service client:" + invokerConfig, t);
}
services.put(invokerConfig, service);
}
}
return (T) service;
}
AbstractSerializer.proxyRequest使用我們熟悉的JDK動(dòng)態(tài)代理來生成服務(wù)接口的代理對(duì)象
@Override
public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
}
//InvokerProcessHandlerFactory.selectInvocationHandler獲取調(diào)用方請求責(zé)任鏈
public static void init() {
if (!isInitialized) {
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
}
registerBizProcessFilter(new TraceFilter());
registerBizProcessFilter(new DegradationFilter());
//關(guān)于ClusterInvokeFilter后文詳細(xì)介紹
registerBizProcessFilter(new ClusterInvokeFilter());
registerBizProcessFilter(new GatewayInvokeFilter());
registerBizProcessFilter(new ContextPrepareInvokeFilter());
registerBizProcessFilter(new SecurityFilter());
//遠(yuǎn)程調(diào)用
registerBizProcessFilter(new RemoteCallInvokeFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
isInitialized = true;
}
}
public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
return bizInvocationHandler;
}
ServiceInvocationProxy繼承了java.lang.reflect.InvocationHandler接口蓉驹,invoke實(shí)現(xiàn)邏輯如下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
//代理對(duì)象的非服務(wù)方法調(diào)用走原有邏輯
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(handler, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return handler.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return handler.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return handler.equals(args[0]);
}
//服務(wù)接口執(zhí)行責(zé)任鏈處理邏輯
return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),
method.getReturnType());
}
同服務(wù)端責(zé)任鏈的分析一樣城榛,我們首先重點(diǎn)看下RemoteCallInvokeFilter的處理邏輯,核心代碼如下:
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
Client client = invocationContext.getClient();
InvocationRequest request = invocationContext.getRequest();
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
态兴。狠持。。
//以同步調(diào)用場景分析下遠(yuǎn)程調(diào)用邏輯
CallbackFuture future = new CallbackFuture();
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
if (response == null) {
response = future.getResponse(request.getTimeout());
}
return response;
}
public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) {
InvocationResponse response = response = client.write(request);
return response;
}
client.write(request);最終調(diào)用NettyClient或HttpInvokerClient的doWrite方法發(fā)送請求消息體瞻润。
至此我們理清了服務(wù)調(diào)用的邏輯工坊,簡單來說就是通過JDK動(dòng)態(tài)代理來生成服務(wù)方接口對(duì)應(yīng)的實(shí)例對(duì)象,在方法執(zhí)行邏輯中調(diào)用遠(yuǎn)程服務(wù)敢订。
但對(duì)于每一個(gè)服務(wù)接口,調(diào)用方是如何知道遠(yuǎn)程服務(wù)的訪問地址的呢罢吃?以及新注冊或者下線的服務(wù)地址楚午,調(diào)用方如何得到即時(shí)通知?
接下來進(jìn)入本篇第二部分尿招,遠(yuǎn)程調(diào)用Client的初始化和調(diào)用方對(duì)服務(wù)信息的心跳監(jiān)聽矾柜。
以請求責(zé)任鏈的ClusterInvokeFilter為入口:
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
//失敗策略cluster可配,默認(rèn)為快速失敗failfast
Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
if (cluster == null) {
throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
}
return cluster.invoke(handler, invocationContext);
}
跟蹤代碼進(jìn)入FailfastCluster.invoke方法就谜,核心代碼如下:
private ClientManager clientManager = ClientManager.getInstance();
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
//構(gòu)造請求消息對(duì)象
InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
//是否超時(shí)重試
boolean timeoutRetry = invokerConfig.isTimeoutRetry();
//重試次數(shù)
int retry = invokerConfig.getRetries(invocationContext.getMethodName());
//關(guān)于重試和重試次數(shù)的邏輯在此不做過多說明怪蔑,只摘取主干代碼
//獲取遠(yuǎn)程客戶端
Client remoteClient = clientManager.getClient(invokerConfig, request, null);
//就是在這里設(shè)置的RemoteCallInvokeFilter中用到的客戶端Client
invocationContext.setClient(remoteClient);
try {
//向后執(zhí)行責(zé)任鏈
return handler.handle(invocationContext);
} catch (NetworkException e) {
remoteClient = clientManager.getClient(invokerConfig, request, null);
invocationContext.setClient(remoteClient);
return handler.handle(invocationContext);
}
}
ClientManager 為單例模式,我們看看內(nèi)部實(shí)現(xiàn)
//私有構(gòu)造函數(shù)
private ClientManager() {
this.providerAvailableListener = new ProviderAvailableListener();
this.clusterListener = new DefaultClusterListener(providerAvailableListener);
this.clusterListenerManager.addListener(this.clusterListener);
providerAvailableThreadPool.execute(this.providerAvailableListener);
RegistryEventListener.addListener(providerChangeListener);
RegistryEventListener.addListener(registryConnectionListener);
RegistryEventListener.addListener(groupChangeListener);
registerThreadPool.getExecutor().allowCoreThreadTimeOut(true);
}
private RouteManager routerManager = DefaultRouteManager.INSTANCE;
public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
//根據(jù)全局唯一標(biāo)識(shí)url獲取Client集合
List<Client> clientList = clusterListener.getClientList(invokerConfig);
List<Client> clientsToRoute = new ArrayList<Client>(clientList);
if (excludeClients != null) {
clientsToRoute.removeAll(excludeClients);
}
//根據(jù)負(fù)載均衡策略選取有效的Client
//此處細(xì)節(jié)比較多丧荐,感興趣的朋友可以自行細(xì)致瀏覽下源碼缆瓣,限于篇幅不一一講解了
return routerManager.route(clientsToRoute, invokerConfig, request);
}
距離目標(biāo)越來越近了,我們繼續(xù)跟蹤代碼DefaultClusterListener的實(shí)現(xiàn)
private ConcurrentHashMap<String, List<Client>> serviceClients = new ConcurrentHashMap<String, List<Client>>();
public List<Client> getClientList(InvokerConfig<?> invokerConfig) {
//根據(jù)url獲取對(duì)應(yīng)的Client集合
List<Client> clientList = this.serviceClients.get(invokerConfig.getUrl());
return clientList;
}
問題來了虹统,serviceClients是在什么時(shí)候創(chuàng)建的Client實(shí)例呢弓坞?
我們回顧下AbstractServiceProxy.getProxy中的一段邏輯:
try {
ClientManager.getInstance().registerClients(invokerConfig);
} catch (Throwable t) {
logger.warn("error while trying to setup service client:" + invokerConfig, t);
}
從異常信息我們可以清晰的看到,這里就是創(chuàng)建service client的入口车荔,最終調(diào)用到DefaultClusterListener.addConnect添加Client映射關(guān)系到serviceClients渡冻。調(diào)用鏈路比較長,在此簡單貼一下線程調(diào)用棧:
至此我們理清了Client的創(chuàng)建忧便,接下來我們看看調(diào)用方的心跳監(jiān)聽族吻。
我們直接連接注冊中心zookeeper的相關(guān)類CuratorClient,用的是curator-framework-2.7.1.jar珠增,這個(gè)ZK客戶端功能很強(qiáng)大超歌,可以非常方便的對(duì)具體的zk節(jié)點(diǎn)添加listener回調(diào)。
private boolean newCuratorClient() throws InterruptedException {
//根據(jù)zk地址創(chuàng)建zkClient
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
.sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
.retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
//監(jiān)聽連接狀態(tài)蒂教,掉線重連
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
logger.info("zookeeper state changed to " + newState);
if (newState == ConnectionState.RECONNECTED) {
RegistryEventListener.connectionReconnected();
}
monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
}
});
//監(jiān)聽change事件N沾 !悴品!
client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);
client.start();
boolean isConnected = client.getZookeeperClient().blockUntilConnectedOrTimedOut();
CuratorFramework oldClient = this.client;
this.client = client;
close(oldClient);
return isConnected;
}
CuratorEventListener繼承org.apache.curator.framework.api.CuratorListener禀综,看下事件處理邏輯
@Override
public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());
//過濾不敢興趣的EventType
if (event == null
|| (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
&& event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
return;
}
try {
//解析節(jié)點(diǎn)路徑并分類
PathInfo pathInfo = parsePath(event.getPath());
if (pathInfo.type == ADDRESS) {//服務(wù)地址
addressChanged(pathInfo);
} else if (pathInfo.type == WEIGHT) {//權(quán)重
weightChanged(pathInfo);
} else if (pathInfo.type == APP) {
appChanged(pathInfo);
} else if (pathInfo.type == VERSION) {
versionChanged(pathInfo);
} else if (pathInfo.type == PROTOCOL) {
protocolChanged(pathInfo);
} else if (pathInfo.type == HOST_CONFIG) {
registryConfigChanged(pathInfo);
}
} catch (Throwable e) {
logger.error("Error in ZookeeperWatcher.process()", e);
return;
}
}
/*
* 1. Get newest value from ZK and watch again 2. Determine if changed
* against cache 3. notify if changed 4. pay attention to group fallback
* notification
*/
private void addressChanged(PathInfo pathInfo) throws Exception {
if (shouldNotify(pathInfo)) {
String hosts = client.get(pathInfo.path);
logger.info("Service address changed, path " + pathInfo.path + " value " + hosts);
List<String[]> hostDetail = Utils.getServiceIpPortList(hosts);
serviceChangeListener.onServiceHostChange(pathInfo.serviceName, hostDetail);
}
// Watch again
client.watch(pathInfo.path);
}
addressChanged難得加了注釋简烘,判斷是否需要回調(diào),回調(diào)定枷。
本篇到此結(jié)束孤澎,內(nèi)容較多,希望能對(duì)大家有所助益欠窒。
轉(zhuǎn)載請備注原文鏈接覆旭。