xxl-job: v2.0.2 原理 目錄學(xué)習(xí)
- 0. xxl-job原理
- 1. xxl-job原理---定時(shí)任務(wù)架構(gòu)
- 2. xxl-job原理-- 調(diào)度中心
- 3. xxl-job原理-- 執(zhí)行器注冊(cè)
- 4. xxl-job原理-- 執(zhí)行器注冊(cè)問題
- 5 xxl-job原理-- 執(zhí)行器注冊(cè)問題
- 6. xxl-job 原理-- 調(diào)度中心注冊(cè)
- 7. xxl-job 原理-- 任務(wù)管理
- 8. xxl-job 原理-- 任務(wù)執(zhí)行或觸發(fā)
- 9. xxl-job原理-- jobthread的作用
- 10. xxl-job原理---回調(diào)
環(huán)境:
- idea:2018.3
- win10
- maven: 3.5.3
- jdk:1.8
- spring cloud:Finchley.RELEASE
- spring boot: 2.0.8
- quartz-2.2.3
- xxl-job:2.0.2
1. 執(zhí)行器注冊(cè)
需要有一個(gè)調(diào)度中心斧散,其他注冊(cè)器將自己注冊(cè)到調(diào)度中心鸡捐,然后就可以接任務(wù)了箍镜。
缺點(diǎn)色迂, 調(diào)度中心不能自我注冊(cè)
2. 注冊(cè)方式
- 自動(dòng)注冊(cè)
- 手動(dòng)錄入
主動(dòng)注冊(cè)中就是在配置中心,設(shè)置配置信息 xxl.job.admin.address=**, xxl.job.executor.appName=**, 注意appName不可以重復(fù)
手動(dòng)錄入图张,其中的Appname是不能重復(fù)的
自動(dòng)注冊(cè)流程
在執(zhí)行器啟動(dòng)時(shí)會(huì)讀取配置祸轮,當(dāng)存在任務(wù)調(diào)度中心地址會(huì)依次向任務(wù)調(diào)度中心注冊(cè)其地址.
- 填寫配置信息
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
##任務(wù)調(diào)度中心地址, 多個(gè)地址有逗號(hào)隔開
xxl.job.admin.addresses=http://127.0.0.1:7995/xxl-job-admin/
### xxl-job executor address
##任務(wù)調(diào)度器名稱和機(jī)器信息
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.ip=127.0.0.1
xxl.job.executor.port=9999
- 查找流程
xxl-job-executor-service
--------------------------------------
@Configuration 注解的XxlJobConfig ----> @Bean 修飾的xxlJobExecutor方法
-------------------------------------
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
// xxl-job 配置信息初始化适袜, 初始化 執(zhí)行器的基本信息痪蝇, appname, ip, port, accessToken, logpath, logretentionDays
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
-------------------------------------
注意@Bean的 initMethod 與 destroyMethod
xxl-job-core:2.0.2
---------------------
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
@Override
public void start() throws Exception {
// init JobHandler Repository
initJobHandlerRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
super.start();
}
private void initJobHandlerRepository(ApplicationContext applicationContext){
...
// init job handler action, 根據(jù)應(yīng)用上下文獲取ServiceBeanMap,
// ServiceBeanMap的value包含IJobHandler的子類
// class DemoJobHandler extends IJobHandler , 該DemoJobHandler 就是
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){
// 獲取@JobHandler(value="demoJobHandler") 定義的value值
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
// 從jobHandlerRepository中查找demoJobHandler值, 如果有给僵,拋出異常
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
}
// 沒有就將該demoJobHandler 注冊(cè)到 jobHandlerRepository中帝际, xxl-job register jobhandler success
// jobHandlerRepository 是有ConcurrentHashMap 類饶辙, key: String, value : IJobHandler以及子類
registJobHandler(name, handler);
}
}
}
}
......
}
XxlJobExecutor
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
GlueFactory
// 創(chuàng)建glueFactory實(shí)例, 用于根據(jù)name生成實(shí)例
public static void refreshInstance(int type){
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
// 主要使用SpringGlueFactory
glueFactory = new SpringGlueFactory();
}
}
XxlJobExecutor.start()
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init logpath
// 初始化log日志文件,
// 默認(rèn)的日志路徑為/data/applogs/xxl-job/jobhandler
// 如果日志不存在就設(shè)置為/data/applogs/xxl-job/jobhandler痕慢, 創(chuàng)建響應(yīng)的目錄涌矢,
// 如果/data/applogs/xxl-job/jobhandler/gluesource不存在娜庇,就創(chuàng)建
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
XxlJobExecutor.initAdminBizList
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private static Serializer serializer;
//當(dāng)存在多個(gè)任務(wù)調(diào)度中心時(shí)名秀,創(chuàng)建代理類并注冊(cè)泰偿,在NetComClientProxy
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
// 獲取Serializer(抽象類) 子類實(shí)例
serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
// 當(dāng)存在多個(gè)任務(wù)調(diào)度中心
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
// address 連接上/api
String addressUrl = address.concat(AdminBiz.MAPPING);
// 構(gòu)建 xxlRpcRefrenceBean 實(shí)例 , 注意這些屬性信息
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
// 使用 netty_http 實(shí)現(xiàn)RPC, 在v2.0.1 使用的是 Jetty_http , v2.0.2 使用的是Netty_http
NetEnum.NETTY_HTTP,
// Serializer 實(shí)例
serializer,
// CallType 調(diào)用方式攒发, SYNC 同步
CallType.SYNC,
// LoadBalance 方式惠猿, 路由策略為 輪詢
LoadBalance.ROUND,
// AdminBiz 類
AdminBiz.class,
// version 版本
null,
// timeout 超時(shí)時(shí)間
10000,
// 注冊(cè)中心地址
addressUrl,
// 權(quán)限控制
accessToken,
// invokeCallback 回調(diào)
null,
// invokerFactory 調(diào)用工廠
null
).getObject(); // 使用動(dòng)態(tài)代理生成AdminBiz 實(shí)現(xiàn)類
// 存儲(chǔ) adminBiz實(shí)例
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
getObject
XxlRpcReferenceBean
public Object getObject() {
// 使用動(dòng)態(tài)代理偶妖,生成代理類
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String className = method.getDeclaringClass().getName();
String varsion_ = XxlRpcReferenceBean.this.version;
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = args;
if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
Class<?>[] paramTypes = null;
if (args[3] != null) {
String[] paramTypes_str = (String[])((String[])args[3]);
if (paramTypes_str.length > 0) {
paramTypes = new Class[paramTypes_str.length];
for(int i = 0; i < paramTypes_str.length; ++i) {
paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
}
}
}
className = (String)args[0];
varsion_ = (String)args[1];
methodName = (String)args[2];
parameterTypes = paramTypes;
parameters = (Object[])((Object[])args[4]);
}
if (className.equals(Object.class.getName())) {
XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
throw new XxlRpcException("xxl-rpc proxy class-method not support");
} else {
String finalAddress = XxlRpcReferenceBean.this.address;
if ((finalAddress == null || finalAddress.trim().length() == 0) && XxlRpcReferenceBean.this.invokerFactory != null && XxlRpcReferenceBean.this.invokerFactory.getServiceRegistry() != null) {
String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
TreeSet<String> addressSet = XxlRpcReferenceBean.this.invokerFactory.getServiceRegistry().discovery(serviceKey);
if (addressSet != null && addressSet.size() != 0) {
if (addressSet.size() == 1) {
finalAddress = (String)addressSet.first();
} else {
finalAddress = XxlRpcReferenceBean.this.loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
}
}
}
if (finalAddress != null && finalAddress.trim().length() != 0) {
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(XxlRpcReferenceBean.this.accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(methodName);
xxlRpcRequest.setParameterTypes(parameterTypes);
xxlRpcRequest.setParameters(parameters);
XxlRpcFutureResponse futureResponse;
// 使用同步調(diào)用方式董虱,底層使用netty_http愤诱,執(zhí)行RPC框架淫半, 調(diào)用調(diào)度中心方法
if (CallType.SYNC == XxlRpcReferenceBean.this.callType) {
// 構(gòu)建xxlRpc 回調(diào)的結(jié)果對(duì)象
futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);
Object var31;
try {
XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
// 得到返回結(jié)果對(duì)象
XxlRpcResponse xxlRpcResponse = futureResponse.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
// 返回結(jié)果
var31 = xxlRpcResponse.getResult();
} catch (Exception var21) {
XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (Throwable)(var21 instanceof XxlRpcException ? var21 : new XxlRpcException(var21));
} finally {
futureResponse.removeInvokerFuture();
}
return var31;
} else if (CallType.FUTURE == XxlRpcReferenceBean.this.callType) {
futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);
try {
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception var20) {
XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
futureResponse.removeInvokerFuture();
throw (Throwable)(var20 instanceof XxlRpcException ? var20 : new XxlRpcException(var20));
}
} else if (CallType.CALLBACK == XxlRpcReferenceBean.this.callType) {
XxlRpcInvokeCallback finalInvokeCallback = XxlRpcReferenceBean.this.invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
} else {
XxlRpcFutureResponse futureResponsex = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception var19) {
XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
futureResponsex.removeInvokerFuture();
throw (Throwable)(var19 instanceof XxlRpcException ? var19 : new XxlRpcException(var19));
}
}
} else if (CallType.ONEWAY == XxlRpcReferenceBean.this.callType) {
XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType[" + XxlRpcReferenceBean.this.callType + "] invalid");
}
} else {
throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty");
}
}
}
});
}
通過測(cè)試砌溺,查看注冊(cè)的代碼走向
1
實(shí)際還是調(diào)用 AdminBizImpl .registry方法规伐。
如何實(shí)現(xiàn)自我注冊(cè)
最簡(jiǎn)單的方式:
設(shè)置RegistryParam對(duì)象屬性,
將admin所在機(jī)器的地址與配置文件中端口號(hào)鲜棠,配置文件中xxl.job.executor.appname的屬性注入其中培慌。
然后調(diào)用AdminBizImpl.registry(RegistryParam)方法即可
總結(jié)
執(zhí)行器注冊(cè)流程如下:
1. init JobHandler Repository
2. refresh GlueFactory
3. super start
init logpath
init invoker, admin-client 利用反射吵护,向注冊(cè)中心注冊(cè)自己,并將adminBiz緩存起來
init JobLogFileCleanThread 處理jobLog日志
init TriggerCallbackThread 處理 觸發(fā)周期性 回調(diào)譬圣, callback 和 retry
init executor-server initRpcProvider 啟動(dòng)serviceRegister注冊(cè)
init, provider factory (ExecutorServiceRegistry)
add services 將該執(zhí)行器以服務(wù)的形式加入
start 會(huì)利用netty 開啟一個(gè)server, port: 9999,
還會(huì)開啟有一個(gè)ExecutorRegistryThread線程雄坪,不斷地注冊(cè)自己
問題: 自我注冊(cè)成功维哈, 但是注冊(cè)之前的執(zhí)行器丟失, 而且新注冊(cè)的執(zhí)行器無法顯示
PS: 若你覺得可以飘庄、還行竭宰、過得去份招、甚至不太差的話锁摔,可以“關(guān)注”一下哼审,就此謝過!