這里不再對(duì)開源框架XX-JOB做介紹忽肛,單純介紹部分功能實(shí)現(xiàn)原理。本篇記錄執(zhí)行器Executor如何注冊(cè)到任務(wù)管理器Admin留晚。
本系列文章基于V2.1.0版本介紹客冈,附github上架構(gòu)圖。
然后Admin在根據(jù)不同策略獲取這些地址。
整體大概流程
工程目錄
以springboot為例
初始化配置文件XxlJobConfig.java -->創(chuàng)建XxlJobSpringExecutor.java(set 執(zhí)行器和管理器各種信息) bean -->指定init和destory方法 --> XxlJobSpringExecutor執(zhí)行start()
//從application.properties文件中讀取admin和executor信息仁热,并初始化到XxlJobSpringExecutor類中榜揖,指定init和destory方法
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//設(shè)置admin地址,eg:http://127.0.0.1:8080/xxl-job-admin
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
//設(shè)置執(zhí)行器名稱抗蠢,eg:xxl-job-executor-sample
xxlJobSpringExecutor.setAppName(appName);
//設(shè)置執(zhí)行器ip和port
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
//設(shè)置執(zhí)行器訪問口令
xxlJobSpringExecutor.setAccessToken(accessToken);
//設(shè)置日志保存路徑
xxlJobSpringExecutor.setLogPath(logPath);
//設(shè)置日志保存天數(shù)
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
接著XxlJobSpringExecutor執(zhí)行start方法
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
@Override
public void start() throws Exception {
// 初始化執(zhí)行器上面的任務(wù)
initJobHandlerRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
super.start();
}
接著父類XxlJobExecutor 執(zhí)行start方法
// ---------------------- start + stop ----------------------
public void start() throws Exception {
//設(shè)置日志路徑
// init logpath
XxlJobFileAppender.initLogPath(logPath);
//設(shè)置admin地址及執(zhí)行器訪問口令
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
//設(shè)置日志清理線程參數(shù)
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
//任務(wù)執(zhí)行結(jié)果回調(diào)線程(包含回調(diào)失敗后重試機(jī)制)
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
//設(shè)置執(zhí)行器ip和port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
//注冊(cè)執(zhí)行器
initRpcProvider(ip, port, appName, accessToken);
}
接著看上面的initAdminBizList(adminAddresses, accessToken)方法举哟,這一步是初始化Admin的值,以及初始化執(zhí)行器訪問口令迅矛,下面看具體執(zhí)行邏輯
//初始化各種rpc的各種協(xié)議
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat(AdminBiz.MAPPING);
// 這里的getObject() 返回的是一個(gè)動(dòng)態(tài)代理對(duì)象妨猩,代理對(duì)象在使用方法時(shí),并不是真實(shí)的自己調(diào)用秽褒,而是委托尤其關(guān)聯(lián)到的hander對(duì)象的invoke方法來調(diào)用
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
serializer,
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null
).getObject();//getObject方法比較重要
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
public Object getObject() {
//使用動(dòng)態(tài)代理壶硅,通過此方法發(fā)送請(qǐng)求到Admin的/api接口,api接口收到請(qǐng)求后销斟,解析出具體的方法和參數(shù)庐椒,獲取到對(duì)應(yīng)的Bean,通過反射執(zhí)行具體的方法蚂踊,最終實(shí)現(xiàn)調(diào)用AdminBizImpl.registry()
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
接著看initRpcProvider(ip, port, appName, accessToken)方法:
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
//指定執(zhí)行器注冊(cè)類為ExecutorServiceRegistry
xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
//啟動(dòng)執(zhí)行器注冊(cè)工廠
// start
xxlRpcProviderFactory.start();
}
接著是 xxlRpcProviderFactory.start()方法
public void start() throws Exception {
// start server
serviceAddress = IpUtil.getIpPort(this.ip, port);
server = netType.serverClass.newInstance();
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
//執(zhí)行器注冊(cè)類啟動(dòng)
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistry.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
serviceRegistry.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
server.start(this);
}
回到ExecutorServiceRegistry的start方法约谈,
public static class ExecutorServiceRegistry extends ServiceRegistry {
@Override
public void start(Map<String, String> param) {
//此處進(jìn)行注冊(cè)
// start registry
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
}
@Override
public void stop() {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}
@Override
public boolean registry(Set<String> keys, String value) {
return false;
}
@Override
public boolean remove(Set<String> keys, String value) {
return false;
}
@Override
public Map<String, TreeSet<String>> discovery(Set<String> keys) {
return null;
}
@Override
public TreeSet<String> discovery(String key) {
return null;
}
}
進(jìn)入ExecutorRegistryThread.start()方法,
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {//此處注冊(cè)
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
Admin中接受Executor請(qǐng)求的入口
/**
* Executor調(diào)用Admin入口,接受到請(qǐng)求后棱诱,在進(jìn)行反思操作泼橘,實(shí)現(xiàn)調(diào)用具體方法
* Created by xuxueli on 17/5/10.
*/
@Controller
public class JobApiController implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
}
//執(zhí)行器調(diào)用管理器方法入口
@RequestMapping(AdminBiz.MAPPING)
@PermissionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobScheduler.invokeAdminService(request, response);
}
}
XxlJobScheduler.invokeAdminService(request, response) ->servletServerHandler.handle(null, request, response)->xxlRpcProviderFactory.invokeService(xxlRpcRequest)
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean 獲取匹配的Bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid
if (serviceBean == null) {
xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
return xxlRpcResponse;
}
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
return xxlRpcResponse;
}
try {
// invoke
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
//反射調(diào)用具體方法
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
// catch error
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
Admin中實(shí)現(xiàn)類
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
//注冊(cè)信息入庫(kù)
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
// fresh
freshGroupRegistryInfo(registryParam);
}
return ReturnT.SUCCESS;
}
其中Executor動(dòng)態(tài)代理AdminBiz接口和Admin的/api動(dòng)態(tài)反射執(zhí)行具體方法屬于作者自研RPC框架部分,本篇只做了注冊(cè)部分和解析部分的介紹军俊,后續(xù)會(huì)單獨(dú)介紹自研RPC框架部分侥加。
到此,執(zhí)行器的地址就已經(jīng)完全注冊(cè)到管理器中粪躬。
閱讀原文關(guān)注公眾號(hào),更多文章持續(xù)更新中昔穴,原文地址:
https://mp.weixin.qq.com/s/sJarz6_zBWzKtIr-qFIbsw