前言
XxlJob 2.0.1
版本源碼的止,其實(shí)在去年這個(gè)時(shí)候我已經(jīng)看完了檩坚,并且做了很詳細(xì)的注釋。但是由于自己太懶了诅福,沒(méi)有寫成博客分享匾委。俗話說(shuō)好記性不如爛筆頭,于是乎我挑出幾個(gè)源碼實(shí)現(xiàn)中我認(rèn)為不錯(cuò)的知識(shí)點(diǎn)并且結(jié)合自己的見解氓润,來(lái)分享一波赂乐。
因?yàn)槠^(guò)于長(zhǎng),簡(jiǎn)書不讓發(fā)咖气,要分成上下節(jié)挨措!
源碼思考
- 1.
executor
端是開啟一個(gè)jettyServer
,其中配置了JettyServerHandler
。將請(qǐng)求交給XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
詳細(xì)可以看JettyServerHandler
中的handle
函數(shù)
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
if ("/services".equals(target)) { // services mapping
StringBuffer stringBuffer = new StringBuffer("<ui>");
for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) {
stringBuffer.append("<li>").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
}
stringBuffer.append("</ui>");
writeResponse(baseRequest, response, stringBuffer.toString().getBytes());
return;
} else { // default remoting mapping
// request parse
XxlRpcRequest xxlRpcRequest = null;
try {
xxlRpcRequest = parseRequest(request);
} catch (Exception e) {
writeResponse(baseRequest, response, ThrowableUtil.toString(e).getBytes());
return;
}
// invoke
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
// response-serialize + response-write
byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
writeResponse(baseRequest, response, responseBytes);
}
}
executor
端通過(guò)上文中xxlRpcProviderFactory.invokeService(xxlRpcRequest);
函數(shù)執(zhí)行本地暴露的服務(wù)方法
/**
* invoke service
*
* @param xxlRpcRequest
* @return
*/
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid
if (serviceBean == null) {
xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
return xxlRpcResponse;
}
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
return xxlRpcResponse;
}
// invoke
try {
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
executor
端服務(wù)提供者維護(hù)在serviceData
中, serviceData
是map
結(jié)構(gòu)崩溪。key
是服務(wù)提供者className
拼接版本號(hào), value
則是服務(wù)提供者本身實(shí)例浅役。
// ---------------------- server invoke ----------------------
/**
* init local rpc service map
*/
private Map<String, Object> serviceData = new HashMap<String, Object>();
public Map<String, Object> getServiceData() {
return serviceData;
}
admin
端其實(shí)是沒(méi)有開啟jettyServer
服務(wù)器,可以通過(guò)XxlJobDynamicScheduler
的
initRpcProvider()
一看究竟伶唯。
// ---------------------- init + destroy ----------------------
public void start() throws Exception {
// valid
Assert.notNull(scheduler, "quartz scheduler is null");
// init i18n
initI18n();
// admin registry monitor run
/*
啟動(dòng)自動(dòng)注冊(cè)線程觉既,獲取類型為自動(dòng)注冊(cè)的執(zhí)行器信息,完成機(jī)器的自動(dòng)注冊(cè)與發(fā)現(xiàn)
*/
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
/**
* 啟動(dòng)失敗日志監(jiān)控線程
*/
JobFailMonitorHelper.getInstance().start();
// admin-server
/**
* 暴露AdminBiz服務(wù),并設(shè)置jettyServerHandler
*/
initRpcProvider();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
// ---------------------- admin rpc provider (no server version) ----------------------
private static JettyServerHandler jettyServerHandler;
private void initRpcProvider(){
// init
XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), null, 0, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null);
// add services
xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
// jetty handler
jettyServerHandler = new JettyServerHandler(xxlRpcProviderFactory);
}
private void stopRpcProvider() throws Exception {
new XxlRpcInvokerFactory().stop();
}
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
jettyServerHandler.handle(null, new Request(null, null), request, response);
}
admin
端通過(guò)API
服務(wù)暴露出自己的服務(wù)抵怎。比如執(zhí)行器注冊(cè)服務(wù)奋救,任務(wù)結(jié)果回調(diào)服務(wù)。admin
端服務(wù)暴露都是通過(guò)JobApiController
實(shí)現(xiàn)的反惕,來(lái)達(dá)到和executor端類似的效果,請(qǐng)求交給JettyServerHandler
處理, 然后通過(guò)xxlRpcProviderFacotry
調(diào)用本地方法尝艘。
@Controller
public class JobApiController implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
}
@RequestMapping(AdminBiz.MAPPING)
@PermessionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobDynamicScheduler.invokeAdminService(request, response);
}
}
XxlRpcReferenceBean
這個(gè)factoryBean
中的getObject
方法用于創(chuàng)建代理對(duì)象,
代理邏輯中過(guò)濾掉了非業(yè)務(wù)方法,也就是Object類中的方法姿染。只是將目標(biāo)類的方法名背亥,參數(shù),類名等信息包裝成XxlRpcRequest
,通過(guò)JettyClient
發(fā)送給調(diào)度中心悬赏。調(diào)度中心的接口地址為"admin端的ip/api"狡汉。調(diào)度中心的API
接口拿到請(qǐng)求之后通過(guò)參數(shù)里面的類名,方法,參數(shù),版本號(hào)等信息反射出來(lái)一個(gè)服務(wù)實(shí)例,調(diào)用invoke
執(zhí)行方法,也就是上文中提到的JobApiController
闽颇。
每個(gè)XxlRpcReferenceBean
對(duì)象中都會(huì)初始化一個(gè)JettyClient
對(duì)象盾戴。感覺(jué)這樣做有點(diǎn)耗性能,需要優(yōu)化啊兵多。畢竟創(chuàng)建一個(gè)JettyClient
對(duì)象開銷并不小尖啡,也許你使用操作不恰當(dāng)會(huì)造成OOM
橄仆。之前有一位朋友就是對(duì)每一個(gè)請(qǐng)求都創(chuàng)建了一個(gè)HttpClient
,這樣由于創(chuàng)建每一個(gè)HttpClient
實(shí)例的時(shí)候都會(huì)調(diào)用evictExpireConnections
衅斩,造成有多少請(qǐng)求就會(huì)創(chuàng)建多少個(gè)定時(shí)線程盆顾,最后造成系統(tǒng)OOM
。所以建議這里最好采用單例或者將JettyClient
緩存起來(lái)畏梆。
// ---------------------- initClient ----------------------
Client client = null;
private void initClient() {
try {
client = netType.clientClass.newInstance();
client.init(this);
} catch (InstantiationException | IllegalAccessException e) {
throw new XxlRpcException(e);
}
}
創(chuàng)建發(fā)起RPC請(qǐng)求的代理對(duì)象
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String className = method.getDeclaringClass().getName();
// filter method like "Object.toString()"
if (Object.class.getName().equals(className)) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String address = routeAddress();
if (address==null || address.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(method.getName());
xxlRpcRequest.setParameterTypes(method.getParameterTypes());
xxlRpcRequest.setParameters(args);
// send
if (CallType.SYNC == callType) {
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
// do invoke
client.asyncSend(address, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// remove-InvokerFuture
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
}
} else if (CallType.FUTURE == callType) {
// thread future set
XxlRpcInvokeFuture invokeFuture = null;
try {
// future set
invokeFuture = new XxlRpcInvokeFuture(new XxlRpcFutureResponse(xxlRpcRequest, null));
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
client.asyncSend(address, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
// remove-InvokerFuture
invokeFuture.stop();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, finalInvokeCallback);
client.asyncSend(address, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
// future remove
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
return null;
} else if (CallType.ONEWAY == callType) {
client.asyncSend(address, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
}
}
});
}
這里只講下SYNC
模式的請(qǐng)求您宪,XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
創(chuàng)建XxlRpcFutureResponse
對(duì)象(Future任務(wù)執(zhí)行的結(jié)果), 這里的invokeCallback
回調(diào)是null
。
public XxlRpcFutureResponse(XxlRpcRequest request, XxlRpcInvokeCallback invokeCallback) {
this.request = request;
this.invokeCallback = invokeCallback;
// set-InvokerFuture
XxlRpcFutureResponseFactory.setInvokerFuture(request.getRequestId(), this);
}
當(dāng)我們獲取Future
執(zhí)行結(jié)果時(shí), XxlRpcFutureResponse
中的done
變量如果是false
, 一直阻塞線程(還有超時(shí)機(jī)制), 除非有調(diào)用setResponse(XxlRpcResponse response)
方法
使done
變量為true
, 并獲取鎖,調(diào)用lock.notifyAll()
, 喚醒線程,并返回執(zhí)行結(jié)果奠涌。
在JettyClient
中發(fā)送請(qǐng)求宪巨,調(diào)用asyncSend
方法
@Override
public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
// do invoke
postRequestAsync(address, xxlRpcRequest);
}
在postRequestAsync
方法拿到RPC
請(qǐng)求執(zhí)行結(jié)果,我們應(yīng)該通知future
铣猩,喚醒等待揖铜。
// deserialize response
XxlRpcResponse xxlRpcResponse = (XxlRpcResponse) xxlRpcReferenceBean.getSerializer().deserialize(responseBytes, XxlRpcResponse.class);
// notify response
XxlRpcFutureResponseFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse);
在初始化XxlRpcFutureResponse
中, 我們有調(diào)用setInvokerFuture
方法將消息和XxlRpcFutureResponse
結(jié)果維護(hù)起來(lái)。
當(dāng)在JettyClient
執(zhí)行完請(qǐng)求獲取結(jié)果時(shí), 調(diào)用notifyInvokerFuture
方法設(shè)置XxlRpcFutureRespons
e中的xxlRpcResponse
屬性
也就是真實(shí)的執(zhí)行結(jié)果达皿。
JettyClient
方法請(qǐng)求雖然是異步的, 但是這里還是同步阻塞獲取執(zhí)行結(jié)果。
public class XxlRpcFutureResponseFactory {
private static ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
public static void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
// TODO贿肩,running future method-isolation and limit
futureResponsePool.put(requestId, futureResponse);
}
public static void removeInvokerFuture(String requestId){
futureResponsePool.remove(requestId);
}
public static void notifyInvokerFuture(String requestId, XxlRpcResponse xxlRpcResponse){
XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse != null) {
futureResponse.setResponse(xxlRpcResponse);
futureResponsePool.remove(requestId);
}
}
}
- 2.
XxlJob
的核心包括XxlRpc
,上文大致提到過(guò)其簡(jiǎn)單的流程峦椰。如果我們想要實(shí)現(xiàn)一個(gè)RPC
,應(yīng)該要做些什么呢汰规?比如序列化汤功,壓縮算法,協(xié)議溜哮,動(dòng)態(tài)代理滔金,服務(wù)注冊(cè),加密茂嗓,網(wǎng)絡(luò)編碼餐茵,連接管理,健康檢測(cè)述吸,負(fù)載均衡忿族,優(yōu)雅啟停機(jī),異常重試蝌矛,業(yè)務(wù)分組以及熔斷限流等等道批。由于之前寫過(guò)一個(gè)基于Netty
簡(jiǎn)單的RPC
框架,因此可以通過(guò)和XxlRpc
對(duì)比來(lái)查漏補(bǔ)缺入撒。(這里不會(huì)講Netty
隆豹,只會(huì)粗略講一下Rpc
代理)
netty-rpc-client
端掃描需要代理服務(wù)的接口并且修改BeanDefinition
初始化的方式。在Spring
容器中實(shí)例化一個(gè)對(duì)象的方式有:Supplier
茅逮,FactoryBean
璃赡,指定FactoryMethodName
和FactoryBeanName
,Constructor
等等簿煌。
@Slf4j
public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner {
private RpcFactoryBean<?> rpcFactoryBean = new RpcFactoryBean<Object>();
private Class<? extends Annotation> annotationClass;
public void setAnnotationClass(Class<? extends Annotation> annotationClass) {
this.annotationClass = annotationClass;
}
public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
super(registry);
}
@Override
protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
if (CollectionUtils.isEmpty(beanDefinitions)) {
logger.warn("No RPC mapper was found in '"
+ Arrays.toString(basePackages)
+ "' package. Please check your configuration");
} else {
processBeanDefinitions(beanDefinitions);
}
return beanDefinitions;
}
public void registerFilter() {
boolean acceptAllInterfaces = true;
if (this.annotationClass != null) {
addIncludeFilter(new AnnotationTypeFilter(this.annotationClass));
acceptAllInterfaces = false;
}
if (acceptAllInterfaces) {
addIncludeFilter(new TypeFilter() {
@Override
public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
return true;
}
});
}
// exclude package-info.java
addExcludeFilter(new TypeFilter() {
@Override
public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
String className = metadataReader.getClassMetadata().getClassName();
return className.endsWith("package-info");
}
});
}
private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitionHolders) {
GenericBeanDefinition genericBeanDefinition = null;
for (BeanDefinitionHolder holder : beanDefinitionHolders) {
genericBeanDefinition = (GenericBeanDefinition) holder.getBeanDefinition();
// genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
// genericBeanDefinition.setBeanClass(this.rpcFactoryBean.getClass());
genericBeanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
/**
* Bean的初始化就可以通過(guò)Supplier.get() 和 設(shè)置factoryBean.getObject 有異曲同工之妙
*/
// try {
// genericBeanDefinition.setInstanceSupplier(new RpcSupplier<>(Class.forName(genericBeanDefinition.getBeanClassName())));
// } catch (Exception ex) {
// throw new RuntimeException(ex);
// }
/**
*
* 指定factoryMethodName,FactoryBeanName
* 當(dāng)我們能找到無(wú)參 就先執(zhí)行無(wú)參方法,最后執(zhí)行有參的方法。方法的參數(shù)來(lái)源于ConstructorArgumentValue
*
* 這里設(shè)置獨(dú)一份的鉴吹。 我們?nèi)绻O(shè)置了FactoryMethodName, 要注入的類型要和Method ReturnType要匹配起來(lái)
* 不然會(huì)報(bào)錯(cuò)姨伟。這個(gè)customFactoryBean不能設(shè)置成泛型
*/
/**
* DefaultListableBeanFactory:findAutowireCandidates
* DefaultListableBeanFactory:doGetBeanNamesForType
* AbstractAutowiredCapableBeanFactory:determineTargetType
* AbstractAutowiredCapableBeanFactory:getTypeForFactoryMethod
*/
genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
genericBeanDefinition.setFactoryMethodName("getObject");
genericBeanDefinition.setFactoryBeanName("customFactoryBean");
// genericBeanDefinition.setFactoryBeanName();
// genericBeanDefinition.setFactoryMethodName();
log.info("ClassPathRpcScanner設(shè)置GenericBeanDefinition:{}", genericBeanDefinition);
log.info("ClassPathRpcScanner設(shè)置BeanDefinitionHolder:{}", holder);
}
}
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
}
}
指定FactoryBeanName和FactoryMethodName創(chuàng)建代理對(duì)象
public class CustomFactoryBean<T> {
@Autowired
private RpcFactory<T> rpcFactory;
public <T> T getObject(Class<T> rpcInterface) {
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {rpcInterface}, this.rpcFactory);
}
public static void main(String[] args) {
Class factoryClass = ClassUtils.getUserClass(CustomFactoryBean.class);
Method[] candidates = ReflectionUtils.getUniqueDeclaredMethods(factoryClass);
Method getObject = null;
for (Method method : candidates) {
if ("getObject".equalsIgnoreCase(method.getName())) {
getObject = method;
}
}
System.out.println(getObject);
/**
* 測(cè)試泛型
*/
System.out.println(getObject.getTypeParameters().length);
System.out.println(getObject.getTypeParameters());
System.out.println(Arrays.asList(getObject.getParameterTypes()));
}
}
FactoryBean
創(chuàng)建代理對(duì)象
public class RpcFactoryBean<T> implements FactoryBean<T> {
private Class<T> rpcInterface;
@Autowired
private RpcFactory<T> rpcFactory;
public RpcFactoryBean() {
}
public RpcFactoryBean(Class<T> rpcInterface) {
this.rpcInterface = rpcInterface;
}
@Nullable
@Override
public T getObject() throws Exception {
return getRpc();
}
@Nullable
@Override
public Class<?> getObjectType() {
return this.rpcInterface;
}
@Override
public boolean isSingleton() {
return (3&1) == 1;
}
public <T> T getRpc() {
/**
* 這一步不要寫成了rpcInterface.getInterfaces()
*/
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
}
public static void main(String[] args) {
System.out.println(InfoUserService.class.getInterfaces());
}
}
Supplier
方式創(chuàng)建代理對(duì)象
public class RpcSupplier<T> implements Supplier<T> {
private Class<T> rpcInterface;
public RpcSupplier(Class<T> rpcInterface) {
this.rpcInterface = rpcInterface;
}
/**
* 這里不用注入的方式, 因?yàn)镽pcSupplier沒(méi)有被Spring容器托管
*/
private RpcFactory<T> rpcFactory = null;
@Override
public T get() {
ApplicationContext context = SpringApplicationContextUtil.getApplicationContext();
if (context != null) {
rpcFactory = context.getBean(RpcFactory.class);
}
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
}
}
InvocationHandler
@Component
@Slf4j
public class RpcFactory<T> implements InvocationHandler {
@Autowired(required = false)
private NettyClient nettyClient = new NettyClient();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterType(method.getParameterTypes());
request.setId(IdUtil.getId());
Object result = nettyClient.send(request);
Class<?> returnType = method.getReturnType();
Response response = JSON.parseObject(result.toString(), Response.class);
if (response.getCode() == 1) {
throw new RuntimeException(response.getErrorMsg());
}
if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
return response.getData();
} else if (Collection.class.isAssignableFrom(returnType)) {
return JSONArray.parseArray(response.getData().toString(), Object.class);
} else if (Map.class.isAssignableFrom(returnType)) {
return JSON.parseObject(response.getData().toString(), Map.class);
} else {
Object data = response.getData();
if (data != null) {
return JSON.parseObject(data.toString(), returnType);
} else {
return null;
}
}
}
}
Rpc
掃描配置,指定要掃描的路徑和服務(wù)接口豆励。
@Component
public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {
private static final String BASE_PACKAGE = "com.cmazxiaoma.springcloud.netty.service";
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
ClassPathRpcScanner classPathRpcScanner = new ClassPathRpcScanner(registry);
classPathRpcScanner.setAnnotationClass(RpcService.class);
classPathRpcScanner.registerFilter();
classPathRpcScanner.scan(StringUtils.tokenizeToStringArray(BASE_PACKAGE, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
public static void main(String[] args) {
System.out.println(Arrays.asList(StringUtils.tokenizeToStringArray("com.cmazxiaoma.springcloud.netty.service", ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS)));
}
}
在我們netty-rpc-client
, 我們掃描所有被@RpcService
注解的接口, 為這些接口
創(chuàng)建BeanDefinition
, 指定其BeanClass
是我們的factory
類, 指定其接口的類型夺荒。
我們?cè)谶@個(gè)factory
類,給服務(wù)接口做動(dòng)態(tài)代理,其InvocationHandler
中的invoke
函數(shù)就會(huì)發(fā)起rpc調(diào)用良蒸。
那我們?cè)趺匆梅?wù)者提供者接口呢?
在Controller
類中通過(guò)@Autowired
注入服務(wù)提供者接口即可技扼。
再看XxlJob
的實(shí)現(xiàn)
比如我們?cè)?code>Controller中 把服務(wù)接口打上@XxlRpcReference
注解, 可以設(shè)置timeout
,version
,序列化算法等等等屬性
然后Spring
容器在這個(gè)服務(wù)接口依賴的bean
實(shí)例化(postProcessAfterInstantiation)的時(shí)候, 會(huì)為被@XxlRpcReference注解的服務(wù)接口字段,創(chuàng)建XxlRpcReferenceBean
, 同時(shí)把引用賦值給這些服務(wù)接口嫩痰。
XxlRpcReferenceBean
也是一個(gè)工廠類, 內(nèi)部也做了動(dòng)態(tài)代理剿吻。
我覺(jué)得這些XxlRpcReferenceBean
最好緩存起來(lái),不然10次引用服務(wù)接口,就要?jiǎng)?chuàng)建10
次對(duì)象(內(nèi)部還要?jiǎng)?chuàng)建JettyClient
對(duì)象)。
public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
private Logger logger = LoggerFactory.getLogger(XxlRpcSpringInvokerFactory.class);
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
// ---------------------- util ----------------------
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
@Override
public void afterPropertiesSet() throws Exception {
// start invoker factory
xxlRpcInvokerFactory = new XxlRpcInvokerFactory(serviceRegistryClass, serviceRegistryParam);
xxlRpcInvokerFactory.start();
}
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
if (field.isAnnotationPresent(XxlRpcReference.class)) {
// valid
Class iface = field.getType();
if (!iface.isInterface()) {
throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
}
XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
// init reference bean
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
rpcReference.netType(),
rpcReference.serializer().getSerializer(),
rpcReference.callType(),
iface,
rpcReference.version(),
rpcReference.timeout(),
rpcReference.address(),
rpcReference.accessToken(),
null
);
Object serviceProxy = referenceBean.getObject();
// set bean
field.setAccessible(true);
field.set(bean, serviceProxy);
logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
}
}
});
return super.postProcessAfterInstantiation(bean, beanName);
}
@Override
public void destroy() throws Exception {
// stop invoker factory
xxlRpcInvokerFactory.stop();
}
private BeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}
在我們netty-rpc-server端, 掃描@RpcService
維護(hù)到一個(gè)map
里面, key
是接口類型,value
是服務(wù)提供者實(shí)現(xiàn)類串纺。當(dāng)消費(fèi)者發(fā)起rpc
調(diào)用時(shí), 需要調(diào)用服務(wù)提供者實(shí)現(xiàn)類的method
時(shí), 就減少了反射服務(wù)提供者實(shí)現(xiàn)類的性能開銷丽旅。這里是不是可以把服務(wù)實(shí)現(xiàn)類的所有method
都可以緩存起來(lái)? 我覺(jué)得有利有弊,可以通過(guò)CPU纺棺,內(nèi)存榄笙,維護(hù)緩存的成本這3
個(gè)維度決策。綜合考慮祷蝌,還是不要緩存起來(lái)較好茅撞。一個(gè)method
反射次數(shù)超過(guò)15
次,會(huì)交給MethodAccessorImpl
處理巨朦,會(huì)在內(nèi)存中生成對(duì)應(yīng)的字節(jié)碼米丘,并調(diào)用ClassDefiner.defineClass
創(chuàng)建對(duì)應(yīng)的class
對(duì)象,性能會(huì)得到一定的提升糊啡。
Java
反射效率低的原因:
Method#invoke 方法會(huì)對(duì)參數(shù)做封裝和解封操作:我們可以看到拄查,invoke 方法的參數(shù)是 Object[] 類型,也就是說(shuō)悔橄,如果方法參數(shù)是簡(jiǎn)單類型的話靶累,需要在此轉(zhuǎn)化成 Object 類型,例如 long ,在 javac compile 的時(shí)候 用了Long.valueOf() 轉(zhuǎn)型癣疟,也就大量了生成了Long 的 Object, 同時(shí) 傳入的參數(shù)是Object[]數(shù)值,那還需要額外封裝object數(shù)組挣柬。而在上面 MethodAccessorGenerator#emitInvoke 方法里我們看到,生成的字節(jié)碼時(shí)睛挚,會(huì)把參數(shù)數(shù)組拆解開來(lái)邪蛔,把參數(shù)恢復(fù)到?jīng)]有被 Object[] 包裝前的樣子,同時(shí)還要對(duì)參數(shù)做校驗(yàn)扎狱,這里就涉及到了解封操作侧到。因此勃教,在反射調(diào)用的時(shí)候,因?yàn)榉庋b和解封匠抗,產(chǎn)生了額外的不必要的內(nèi)存浪費(fèi)故源,當(dāng)調(diào)用次數(shù)達(dá)到一定量的時(shí)候,還會(huì)導(dǎo)致 GC汞贸。
需要檢查方法可見性绳军。我們會(huì)發(fā)現(xiàn),反射時(shí)每次調(diào)用都必須檢查方法的可見性(在 Method.invoke 里)
需要校驗(yàn)參數(shù):反射時(shí)也必須檢查每個(gè)實(shí)際參數(shù)與形式參數(shù)的類型匹配性(在NativeMethodAccessorImpl.invoke0 里或者生成的 Java 版 MethodAccessor.invoke 里)
反射方法難以內(nèi)聯(lián):Method invoke 就像是個(gè)獨(dú)木橋一樣矢腻,各處的反射調(diào)用都要擠過(guò)去门驾,在調(diào)用點(diǎn)上收集到的類型信息就會(huì)很亂,影響內(nèi)聯(lián)程序的判斷多柑,使得 Method.invoke() 自身難以被內(nèi)聯(lián)到調(diào)用方奶是。(方法內(nèi)聯(lián)指的是在即時(shí)編譯過(guò)程中遇到方法調(diào)用時(shí),直接編譯目標(biāo)方法的方法體,并替換原方法調(diào)用。這樣就不再需要像調(diào)用方法那樣的壓棧竣灌,出棧聂沙,傳參了)
JIT 無(wú)法優(yōu)化:因?yàn)榉瓷渖婕暗絼?dòng)態(tài)加載的類型,所以無(wú)法進(jìn)行優(yōu)化帐偎。
回到正軌逐纬,我們這一套實(shí)現(xiàn)和Xxl-Rpc
的實(shí)現(xiàn)方式大同小異。
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
// ---------------------- config ----------------------
private String netType = NetEnum.JETTY.name();
private String serialize = Serializer.SerializeEnum.HESSIAN.name();
private String ip = IpUtil.getIp(); // for registry
private int port = 7080; // default port
private String accessToken;
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
// set
public void setNetType(String netType) {
this.netType = netType;
}
public void setSerialize(String serialize) {
this.serialize = serialize;
}
public void setIp(String ip) {
this.ip = ip;
}
public void setPort(int port) {
this.port = port;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
// util
private void prepareConfig(){
// prepare config
NetEnum netTypeEnum = NetEnum.autoMatch(netType, null);
Serializer.SerializeEnum serializeEnum = Serializer.SerializeEnum.match(serialize, null);
Serializer serializer = serializeEnum!=null?serializeEnum.getSerializer():null;
if (port <= 0) {
throw new XxlRpcException("xxl-rpc provider port["+ port +"] is unvalid.");
}
if (NetUtil.isPortUsed(port)) {
throw new XxlRpcException("xxl-rpc provider port["+ port +"] is used.");
}
// init config
super.initConfig(netTypeEnum, serializer, ip, port, accessToken, serviceRegistryClass, serviceRegistryParam);
}
// ---------------------- util ----------------------
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO削樊,addServices by api + prop
}
@Override
public void afterPropertiesSet() throws Exception {
this.prepareConfig();
super.start();
}
@Override
public void destroy() throws Exception {
super.stop();
}
}
- 3.
XxlJob
對(duì)Groovy
的支持,同時(shí)還支持注入Spring
中的bean
.(畫外音:Zuul
使用Grovvy
定義動(dòng)態(tài)過(guò)濾器的時(shí)候兔毒,刪除Grovvy
文件并不能從當(dāng)前運(yùn)行的api網(wǎng)關(guān)中移除這個(gè)過(guò)濾器漫贞,只能將shouldFilter
返回false
。目前過(guò)濾器無(wú)法注入Spring
中的bean
)育叁。
public class GlueFactory {
private static GlueFactory glueFactory = new GlueFactory();
public static GlueFactory getInstance(){
return glueFactory;
}
public static void refreshInstance(int type){
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}
/**
* groovy class loader
*/
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
/**
* load new instance, prototype
*
* @param codeSource
* @return
* @throws Exception
*/
public IJobHandler loadNewInstance(String codeSource) throws Exception{
if (codeSource!=null && codeSource.trim().length()>0) {
Class<?> clazz = groovyClassLoader.parseClass(codeSource);
if (clazz != null) {
Object instance = clazz.newInstance();
if (instance!=null) {
if (instance instanceof IJobHandler) {
this.injectService(instance);
return (IJobHandler) instance;
} else {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
}
}
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
/**
* inject service of bean field
*
* @param instance
*/
public void injectService(Object instance) {
// do something
}
}
public class SpringGlueFactory extends GlueFactory {
private static Logger logger = LoggerFactory.getLogger(SpringGlueFactory.class);
/**
* inject action of spring
* @param instance
*/
@Override
public void injectService(Object instance){
if (instance==null) {
return;
}
if (XxlJobSpringExecutor.getApplicationContext() == null) {
return;
}
Field[] fields = instance.getClass().getDeclaredFields();
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
Object fieldBean = null;
// with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired
if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
try {
Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
if (resource.name()!=null && resource.name().length()>0){
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name());
} else {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName());
}
} catch (Exception e) {
}
if (fieldBean==null ) {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
}
} else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value());
} else {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
}
}
if (fieldBean!=null) {
field.setAccessible(true);
try {
field.set(instance, fieldBean);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
}
}
}
- 4.
Shiro
通過(guò)實(shí)現(xiàn)DestructionAwareBeanPostProcessor
完成對(duì)Bean生命周期的掌握迅脐,異曲同工之妙的還有ApplicationListenerDetector
等等等。
我覺(jué)得xxl-job
可以用這個(gè)更加優(yōu)雅的方式來(lái)完成對(duì)bean
的生命周期的托管豪嗽。
public class LifecycleBeanPostProcessor implements DestructionAwareBeanPostProcessor, PriorityOrdered {
/**
* Private internal class log instance.
*/
private static final Logger log = LoggerFactory.getLogger(LifecycleBeanPostProcessor.class);
/**
* Order value of this BeanPostProcessor.
*/
private int order;
/**
* Default Constructor.
*/
public LifecycleBeanPostProcessor() {
this(LOWEST_PRECEDENCE);
}
/**
* Constructor with definable {@link #getOrder() order value}.
*
* @param order order value of this BeanPostProcessor.
*/
public LifecycleBeanPostProcessor(int order) {
this.order = order;
}
/**
* Calls the <tt>init()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Initializable}
*
* @param object the object being initialized.
* @param name the name of the bean being initialized.
* @return the initialized bean.
* @throws BeansException if any exception is thrown during initialization.
*/
public Object postProcessBeforeInitialization(Object object, String name) throws BeansException {
if (object instanceof Initializable) {
try {
if (log.isDebugEnabled()) {
log.debug("Initializing bean [" + name + "]...");
}
((Initializable) object).init();
} catch (Exception e) {
throw new FatalBeanException("Error initializing bean [" + name + "]", e);
}
}
return object;
}
/**
* Does nothing - merely returns the object argument immediately.
*/
public Object postProcessAfterInitialization(Object object, String name) throws BeansException {
// Does nothing after initialization
return object;
}
/**
* Calls the <tt>destroy()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Destroyable}
*
* @param object the object being initialized.
* @param name the name of the bean being initialized.
* @throws BeansException if any exception is thrown during initialization.
*/
public void postProcessBeforeDestruction(Object object, String name) throws BeansException {
if (object instanceof Destroyable) {
try {
if (log.isDebugEnabled()) {
log.debug("Destroying bean [" + name + "]...");
}
((Destroyable) object).destroy();
} catch (Exception e) {
throw new FatalBeanException("Error destroying bean [" + name + "]", e);
}
}
}
/**
* Order value of this BeanPostProcessor.
*
* @return order value.
*/
public int getOrder() {
// LifecycleBeanPostProcessor needs Order. See https://issues.apache.org/jira/browse/SHIRO-222
return order;
}
/**
* Return true only if <code>bean</code> implements Destroyable.
* @param bean bean to check if requires destruction.
* @return true only if <code>bean</code> implements Destroyable.
* @since 1.4
*/
@SuppressWarnings("unused")
public boolean requiresDestruction(Object bean) {
return (bean instanceof Destroyable);
}
}
- 在
Dubbo
中谴蔑,Invoke
是一個(gè)關(guān)鍵組件。在服務(wù)者和消費(fèi)者之間都充當(dāng)服務(wù)調(diào)用的一個(gè)職責(zé)龟梦,有點(diǎn)像xxl-job
中的XxlRpcProviderFactory
隐锭。
- 在
Dubbo
中的ProxyFactory
,getInvoker
方法用于在服務(wù)提供者端计贰,將服務(wù)的具體實(shí)現(xiàn)類轉(zhuǎn)換成Invoker
钦睡。getProxy
方法用于在消費(fèi)者端,將invoker
轉(zhuǎn)換成客戶端需要的接口躁倒。在服務(wù)提供者ServiceConfig
和消費(fèi)者ReferenceConfig
中荞怒,都會(huì)對(duì)proxyFactory
通過(guò)ExtensionLoader
擴(kuò)展機(jī)制生成適配類ProxyFactory$Adaptive
洒琢。這個(gè)適配類會(huì)根據(jù)URL
的ProxyFactory
參數(shù)選擇對(duì)應(yīng)的實(shí)現(xiàn)類進(jìn)行操作。
/**
* ProxyFactory. (API/SPI, Singleton, ThreadSafe)
*/
@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
ReferenceBean
創(chuàng)建的代理對(duì)象褐桌,就要經(jīng)過(guò)InvokerInovationHandler
處理衰抑。首先會(huì)經(jīng)過(guò)AbstractInvoker
中的public Result invoke(Invocation inv) throws RpcException
,然后再交給其子類doInvoke(invocation)
實(shí)現(xiàn)荧嵌。
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
可以看看DubboInvoker
中的doInvoke
實(shí)現(xiàn)呛踊,發(fā)起遠(yuǎn)程調(diào)用。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
服務(wù)提供者收到請(qǐng)求時(shí)完丽,會(huì)回調(diào)此方法恋技。最終會(huì)通過(guò)AbstractProxyInvoker
調(diào)用到上文提到過(guò) ProxyFactory
生成的Invoker
對(duì)象。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
AbstractProxyInvoker
最終會(huì)調(diào)用服務(wù)提供者類中的方法逻族。
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
我分析的比較簡(jiǎn)陋蜻底,畢竟這篇文章可不能讓Dubbo
喧賓奪主。對(duì)于xxl-Rpc
在消費(fèi)者掃描服務(wù)接口做動(dòng)態(tài)代理聘鳞,在提供者掃描服務(wù)實(shí)例并維護(hù)的實(shí)現(xiàn)薄辅,我們對(duì)此可以看下Dubbo
在這一塊是怎么實(shí)現(xiàn)的。(在Dubbo
中抠璃,服務(wù)實(shí)例維護(hù)主要依托Exporter
站楚。它是服務(wù)暴露數(shù)據(jù)結(jié)構(gòu),規(guī)定了通常暴露某個(gè)協(xié)議的服務(wù)時(shí)搏嗡,exporter
具有獲取該服務(wù)可執(zhí)行對(duì)象Invoker
的能力窿春,以及取消暴露的能力。Invoker
轉(zhuǎn)化為Exporter
時(shí)服務(wù)暴露的關(guān)鍵采盒, 將該協(xié)議下的Exporter
維護(hù)到exporterMap
中將完成整個(gè)服務(wù)的暴露旧乞,往往每一種協(xié)議都會(huì)生成與之對(duì)應(yīng)的Exporter
。Exporter
本文不會(huì)講到磅氨,有興趣的可以看下DubboProtocol
,RegistryProtocol
)
欲知后事如何,且聽下回分解
萬(wàn)萬(wàn)沒(méi)想到一個(gè)xxl-job源碼分析尺栖,竟然能引發(fā)這么多血案!(下)
尾言
萬(wàn)萬(wàn)沒(méi)想到烦租,一個(gè)知識(shí)點(diǎn)竟然能引發(fā)這么多血案延赌!溜了溜了,俯臥撐搞起叉橱。