一 服務(wù)端主體流程
服務(wù)端流程
- 任務(wù)觸發(fā)時(shí)绒怨,需要進(jìn)行執(zhí)行器路由處理,并組裝任務(wù)相關(guān)配置信息谦疾,如阻塞策略南蹂,分片參數(shù),超時(shí)時(shí)間等念恍。
二 表
2.1 XxlJobRegistry
- 執(zhí)行器注冊(cè)信息
類型六剥,應(yīng)用,執(zhí)行器地址峰伙,心跳時(shí)間 - 任務(wù)信息
public class XxlJobInfo {
private int id; // 主鍵ID (JobKey.name)
private int jobGroup; // 執(zhí)行器主鍵ID (JobKey.group)
private String jobCron; // 任務(wù)執(zhí)行CRON表達(dá)式 【base on quartz】
private String jobDesc;
private Date addTime;
private Date updateTime;
private String author; // 負(fù)責(zé)人
private String alarmEmail; // 報(bào)警郵件
private String executorRouteStrategy; // 執(zhí)行器路由策略
private String executorHandler; // 執(zhí)行器疗疟,任務(wù)Handler名稱
private String executorParam; // 執(zhí)行器,任務(wù)參數(shù)
private String executorBlockStrategy; // 阻塞處理策略
private int executorTimeout; // 任務(wù)執(zhí)行超時(shí)時(shí)間瞳氓,單位秒
private int executorFailRetryCount; // 失敗重試次數(shù)
private String glueType; // GLUE類型 #com.xxl.job.core.glue.GlueTypeEnum
private String glueSource; // GLUE源代碼
private String glueRemark; // GLUE備注
private Date glueUpdatetime; // GLUE更新時(shí)間
private String childJobId; // 子任務(wù)ID策彤,多個(gè)逗號(hào)分隔
// copy from quartz
private String jobStatus; // 任務(wù)狀態(tài) 【base on quartz】
}
- 任務(wù)執(zhí)行記錄
public class XxlJobLog {
private int id;
// job info
private int jobGroup;//執(zhí)行器主鍵id
private int jobId;
// execute info
private String executorAddress;//執(zhí)行器地址
private String executorHandler;//執(zhí)行器任務(wù)執(zhí)行函數(shù)
private String executorParam;//參數(shù)
private String executorShardingParam;//分片參數(shù)
private int executorFailRetryCount;//失敗重試次數(shù)
// trigger info
private Date triggerTime;//觸發(fā)時(shí)間
private int triggerCode;//觸發(fā)結(jié)果
private String triggerMsg;
// handle info
private Date handleTime;//處理完成時(shí)間
private int handleCode;//處理結(jié)果
private String handleMsg;
}
- 應(yīng)用執(zhí)行器信息
public class XxlJobGroup {
private int id;
private String appName;
private String title;
private int order;
private int addressType; // 執(zhí)行器地址類型:0=自動(dòng)注冊(cè)、1=手動(dòng)錄入
private String addressList; // 執(zhí)行器地址列表匣摘,多地址逗號(hào)分隔(手動(dòng)錄入)
}
三 任務(wù)觸發(fā)
- quartz調(diào)度觸發(fā)執(zhí)行RemoteHttpBean.executeInternal
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException {
// load jobId
JobKey jobKey = context.getTrigger().getJobKey();
Integer jobId = Integer.valueOf(jobKey.getName());
// trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
- JobTriggerPoolHelper使用線程池店诗,每個(gè)任務(wù)觸發(fā)一個(gè)線程執(zhí)行
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
triggerPool.execute(new Runnable() {
@Override
public void run() {
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}
});
}
- 初始化任務(wù)調(diào)度的信息
TriggerParam triggerParam = new TriggerParam();
//任務(wù)id
triggerParam.setJobId(jobInfo.getId());
//任務(wù)處理函數(shù),參數(shù)
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
//阻塞策略
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
//任務(wù)執(zhí)行超時(shí)時(shí)間配置
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
//任務(wù)觸發(fā)記錄id
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
//任務(wù)執(zhí)行函數(shù)源碼信息
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
//分片信息
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
簡(jiǎn)單任務(wù)按照?qǐng)?zhí)行器路由策略選擇執(zhí)行器
executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
xxlrpc發(fā)送任務(wù)觸發(fā)消息
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
- 通知開(kāi)始監(jiān)控任務(wù)觸發(fā)記錄
JobFailMonitorHelper.monitor(jobLog.getId());
四 定時(shí)任務(wù)
- JobRegistryMonitorHelper執(zhí)行器心跳掃描音榜,定時(shí)掃描執(zhí)行器心跳時(shí)間必搞,刪除過(guò)期的執(zhí)行器
- JobFailMonitorHelper任務(wù)狀態(tài)監(jiān)控告警及失敗重試
blockqueu存儲(chǔ)所有本調(diào)度器待監(jiān)控的任務(wù),定時(shí)進(jìn)行檢查任務(wù)囊咏。
按照告警策略恕洲,進(jìn)行失敗重試或者發(fā)送告警。
任務(wù)執(zhí)行中梅割,則繼續(xù)監(jiān)控