調(diào)度中心
掃描加載配置 ,XxlJobAdminConfig
-
XxlJobAdminConfig加載完成后磷支,初始化XxlJobScheduler
// ---------------------- XxlJobScheduler ----------------------private XxlJobScheduler xxlJobScheduler; @Override public void afterPropertiesSet() throws Exception { adminConfig = this; xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); } @Override public void destroy() throws Exception { xxlJobScheduler.destroy(); } // ---------------------- XxlJobScheduler ----------------------
-
XxlJobScheduler初始化daemon線程
public void init() throws Exception {
// init i18n 初始化國際化
initI18n();// admin registry monitor run JobRegistryMonitorHelper.getInstance().start(); // admin fail-monitor run JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run JobLosedMonitorHelper.getInstance().start(); // admin trigger pool start JobTriggerPoolHelper.toStart(); // admin log report start JobLogReportHelper.getInstance().start(); // start-schedule JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); }
JobRegistryMonitorHelper.getInstance().start();
初始化執(zhí)行器注冊棺棵,異步執(zhí)行楼咳,registryThread線程,每隔30s執(zhí)行一次
// step 1. 查詢自動注冊的執(zhí)行器烛恤,對應(yīng)mysql表xxl_job_group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
// step 2. 清除90s未發(fā)送心跳節(jié)點(admin/executor)母怜,對應(yīng)mysql表xxl_job_registry
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// step 3. 查詢存活節(jié)點,同步xxl_job_registry表未注冊到xxl_job_group.address_list的數(shù)據(jù)
// 3.1 數(shù)據(jù)封裝
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 3.2 fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
addressListStr = "";
for (String item:registryList) {
addressListStr += item + ",";
}
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
JobFailMonitorHelper.getInstance().start();
監(jiān)控報警缚柏,異步執(zhí)行苹熏,monitorThread線程,每隔10s執(zhí)行一次
// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1币喧、fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2轨域、fail alarm monitor
int newAlarmStatus = 0; // 告警狀態(tài):0-默認(rèn)、-1=鎖定狀態(tài)杀餐、1-無需告警干发、2-告警成功、3-告警失敗
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
// 更新狀態(tài)
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
JobLosedMonitorHelper.getInstance().start();
超時job置為失敗史翘,異步執(zhí)行枉长,monitorThread線程,每隔60s執(zhí)行一次
// 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運行中" 狀態(tài)超過10min恶座,且對應(yīng)執(zhí)行器心跳注冊失敗不在線搀暑,則將本地調(diào)度主動標(biāo)記失斄ぱ簟跨琳;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
}
}
JobTriggerPoolHelper.toStart();
JobTriggerPoolHelper
啟動快、慢線程池
// 正常線程池
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
// 慢速線程池
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
JobLogReportHelper.getInstance().start();
執(zhí)行記錄報表桐罕,異步執(zhí)行脉让,logrThread
// 1桂敛、log-report refresh: refresh log report in 3 days
try {
for (int i = 0; i < 3; i++) {
// today
Calendar itemDay = Calendar.getInstance();
itemDay.add(Calendar.DAY_OF_MONTH, -i);
itemDay.set(Calendar.HOUR_OF_DAY, 0);
itemDay.set(Calendar.MINUTE, 0);
itemDay.set(Calendar.SECOND, 0);
itemDay.set(Calendar.MILLISECOND, 0);
Date todayFrom = itemDay.getTime();
itemDay.set(Calendar.HOUR_OF_DAY, 23);
itemDay.set(Calendar.MINUTE, 59);
itemDay.set(Calendar.SECOND, 59);
itemDay.set(Calendar.MILLISECOND, 999);
Date todayTo = itemDay.getTime();
// refresh log-report every minute
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}
// do refresh
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
}
}
// 2、log-clean: switch open & once each day
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
// expire-time
Calendar expiredDay = Calendar.getInstance();
expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
expiredDay.set(Calendar.HOUR_OF_DAY, 0);
expiredDay.set(Calendar.MINUTE, 0);
expiredDay.set(Calendar.SECOND, 0);
expiredDay.set(Calendar.MILLISECOND, 0);
Date clearBeforeTime = expiredDay.getTime();
// clean expired log
List<Long> logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
if (logIds!=null && logIds.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds!=null && logIds.size()>0);
// update clean time
lastCleanLogTime = System.currentTimeMillis();
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
JobScheduleHelper.getInstance().start();
job執(zhí)行
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 計算預(yù)讀數(shù)量
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
// 加鎖
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1溅潜、pre read 查詢運行中术唬,到達(dá)執(zhí)行時間job limit preReadCount,mysql表xxl_job_info
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1滚澜、trigger-expire > 5s:pass && make next-trigger-time
// 下次執(zhí)行時間超過當(dāng)前5s不執(zhí)行粗仓,更新時間,等下一次執(zhí)行
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2设捐、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 下次執(zhí)行在5s內(nèi)借浊,執(zhí)行
// 1、trigger萝招,加入線程池執(zhí)行蚂斤,快慢線程池選擇策略
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next 更新狀態(tài)和下次執(zhí)行時間
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1槐沼、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2曙蒸、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {// 執(zhí)行時間 < 當(dāng)前時間
// 2.3岗钩、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1纽窟、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3凹嘲、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3师倔、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop