一、設(shè)計思路
借用官網(wǎng)的話:
將調(diào)度行為抽象形成“調(diào)度中心”公共平臺赢底,而平臺自身并不承擔(dān)業(yè)務(wù)邏輯疼鸟,“調(diào)度中心”負責(zé)發(fā)起調(diào)度請求后控。
將任務(wù)抽象成分散的JobHandler,交由“執(zhí)行器”統(tǒng)一管理空镜,“執(zhí)行器”負責(zé)接收調(diào)度請求并執(zhí)行對應(yīng)的JobHandler中業(yè)務(wù)邏輯浩淘。因此捌朴,“調(diào)度”和“任務(wù)”兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴展性张抄;
借用官網(wǎng)的圖:
簡化的架構(gòu)圖:
二砂蔽、啟動原理
1.xxl-job-admin服務(wù)啟動原理
啟動XxlJobAdminApplication類,在spring容器實例化之前署惯,會執(zhí)行實現(xiàn)了 InitializingBean 接口的 afterPropertiesSet() 的方法左驾,這里是利用了springboot的拓展接口,來將xxl-job的相關(guān)bean給注冊到IOC容器當(dāng)中极谊。然后執(zhí)行最關(guān)鍵的 xxlJobScheduler.init()
調(diào)用 XxlJobScheduler 的 init() 方法
JobRegistryHelper诡右,JobFailMonitorHelper,JobCompleteHelper轻猖,JobLogReportHelper帆吻,JobScheduleHelper這五個類都是使用了餓漢式的單例模式(個人覺得還需要將構(gòu)造方法私有化),
a.調(diào)用 JobTriggerPoolHelper.toStart() 本質(zhì)就是調(diào)用JobTriggerPoolHelper的start()方法咙边,構(gòu)造出兩個線程池猜煮,如下圖所示的,一個快觸發(fā)線程池败许,一個慢觸發(fā)線程池王带。
b.調(diào)用 JobRegistryHelper.getInstance().start(),方法內(nèi)部主要做了兩件事市殷,一件事是初始化一個注冊或者移除的線程池 registryMonitorThread辫秧,然后創(chuàng)建一個 registryMonitorThread 的守護線程。設(shè)置成守護線程被丧。
單獨把線程構(gòu)造拿出來分析盟戏。
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
// 死循環(huán)
while (!toStop) {
try {
// 從 xxl_job_group 表中查詢出 自動注冊的執(zhí)行器 (address_type:0 自動注冊,1:手動注冊)
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// 從 xxl_job_registry 表中找到更新時間 小于當(dāng)前時間+死亡間隔時間(就是找到注冊表中規(guī)定時間沒有更新的記錄)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
// 如果找到在規(guī)定時間內(nèi)沒有更新的注冊甥桂,就直接刪除這些注冊執(zhí)行器
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// 從 xxl_job_registry 表中找到更新時間 小于當(dāng)前時間+死亡間隔時間(就是找到注冊表中規(guī)定時間沒有更新的記錄)
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
// 遍歷當(dāng)前過期的注冊器柿究,將過期的注冊器的 register_value 注冊地址保存到臨時變量 appAddressMap 中
for (XxlJobRegistry item: list) {
// 如果注冊器類型(registry_group) 是 EXECUTOR
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 刷新對應(yīng)執(zhí)行器地址和最新修改時間
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
// 休眠心跳的時間
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
C.調(diào)用 JobFailMonitorHelper.getInstance().start(),方法內(nèi)部創(chuàng)建一個 monitorThread 的守護線程黄选。設(shè)置成守護線程蝇摸。
下面代碼詳細介紹 JobFailMonitorHelper.getInstance().start() 里面的 構(gòu)造的線程主要做的是什么事。
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor 死循環(huán)監(jiān)控办陷,每10秒鐘(每次執(zhí)行完休眠十秒)執(zhí)行一遍監(jiān)控的內(nèi)容
while (!toStop) {
try {
// 從 xxl_job_log 查詢出失敗的日志記錄
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// 修改 xxl_job_log 將警報狀態(tài)改成 (-1鎖定狀態(tài)) 告警狀態(tài):0-默認貌夕、1-無需告警、2-告警成功民镜、3-告警失敗
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
// 根據(jù)失敗的日志id啡专,查詢出該條日志
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
// 根據(jù)這條日志記錄的 執(zhí)行器Id查詢對應(yīng)的執(zhí)行器
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 如果 日志的重試次數(shù)大于0,就直接觸發(fā)JobTriggerPoolHelper.trigger()方法制圈,這個方法就是admin遠程調(diào)用執(zhí)行器的方法们童。
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
// 觸發(fā)完成畔况,將失敗重試的次數(shù)減一,更新
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2慧库、fail alarm monitor
int newAlarmStatus = 0; // 告警狀態(tài):0-默認跷跪、-1=鎖定狀態(tài)、1-無需告警齐板、2-告警成功吵瞻、3-告警失敗
// 如果存在失敗的日志,發(fā)送警報郵件
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
// 最后更新一下 xxl_job_log 表的 alarm_status 狀態(tài)字段
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
// 休眠 10秒
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
調(diào)用 JobCompleteHelper.getInstance().start()甘磨,方法內(nèi)部創(chuàng)建一個 monitorThread 的守護線程听皿。設(shè)置成守護線程;以及一個callbackThreadPool線程池宽档。
monitorThread 線程內(nèi)部的工作內(nèi)容
// for monitor
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// wait for JobTriggerPoolHelper-init
try {
// 上來休眠50毫秒,等待 JobTriggerPoolHelper 初始化完成
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
// monitor
while (!toStop) {
try {
// 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運行中" 狀態(tài)超過10min庵朝,且對應(yīng)執(zhí)行器心跳注冊失敗不在線吗冤,則將本地調(diào)度主動標記失敗九府;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
// 處理日志椎瘟,并更新執(zhí)行器的完成結(jié)果
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
// 休眠60秒,再執(zhí)行一次
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
}
});
E.最后看一下 JobScheduleHelper.getInstance().start(); 方法侄旬。方法里面最主要的就是起兩個線程肺蔚,分別將兩個線程設(shè)置成守護線程,ringThread是最干活的線程儡羔,scheduleThread是檢測并調(diào)度執(zhí)行器任務(wù)的線程宣羊。
看一下這ringThread和scheduleThread里面都是干啥的。這里先總結(jié)一下汰蜘,scheduleThread線程主要就是將需要執(zhí)行的定時器任務(wù)分個類仇冯,并維護每個定時器里面的下次執(zhí)行時間,以及處理 調(diào)度過期的 執(zhí)行器族操,要么立刻執(zhí)行一次苛坚,要么直接忽略,等待下次執(zhí)行色难,最后就是將需要在5秒內(nèi)執(zhí)行的定時器放進一個map里面泼舱,交給ringThread線程去執(zhí)行定時器。而ringThread線程就是直接從map中拿到需要執(zhí)行的執(zhí)行器去執(zhí)行枷莉,并且每輪執(zhí)行只處理兩個時間點(毫秒級)的所有執(zhí)行器娇昙。具體的可以看代碼里面的講解。
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 這里默認么個定時任務(wù)線程執(zhí)行耗時50毫秒笤妙,每秒1000毫秒涯贞,可以執(zhí)行20個任務(wù)枪狂,快線程池默認是200個最大線程數(shù),慢線程默認是100個最大線程數(shù)宋渔,
// 所以當(dāng)線程數(shù)拉滿的情況下州疾,每秒鐘可以處理任務(wù)數(shù)是:(100+200)*20 = 6000 ;所以這里的 preReadCount 表示預(yù)讀數(shù)(默認最大:6000)
// 如果想要提高并發(fā)性皇拣,通過修改快慢線程池的最大線程數(shù)這個參數(shù)調(diào)節(jié)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
logger.info("==========================preReadCount ={}",preReadCount);
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// 利用數(shù)據(jù)庫的行鎖
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1严蓖、預(yù)讀數(shù)據(jù)
long nowTime = System.currentTimeMillis();
// 由于分析了最多可以執(zhí)行6000個任務(wù),所以這里在去查任務(wù)表的時候氧急,最多去查出來6000條滿足條件的
// 條件:下次執(zhí)行時間 小于等于 (當(dāng)前時間 + 5秒) 并且 執(zhí)行狀態(tài)是 正在運行的狀態(tài)
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2颗胡、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
//如果服務(wù)宕機了,或者重啟等等吩坝,導(dǎo)致超過了調(diào)度周期(5秒的調(diào)度周期)毒姨,也就是本來由時間上的上一次或上很多次調(diào)度觸發(fā)的數(shù)據(jù)被本次調(diào)度查到了,
// 這就可能代表著可能中間存在多次調(diào)度未觸發(fā)钉寝,而按照周期性一次一次計算下次預(yù)期調(diào)度時間弧呐,那這次調(diào)度完了計算出來的下次調(diào)度還是在當(dāng)前時間以前,
// 例如調(diào)度周期1分鐘調(diào)度一次嵌纲,宕機5分鐘了俘枫,現(xiàn)在查到的預(yù)期調(diào)度時間為5分鐘前,如果直接調(diào)度成功會重復(fù)調(diào)度5次當(dāng)前時間以前的任務(wù)逮走,這里直接pass并計算下一次調(diào)度時間鸠蚪,
// 但是計算下一次調(diào)度時間也是傳入當(dāng)前時間,直接修正預(yù)期下次調(diào)度時間為當(dāng)前時間之后师溅,因為調(diào)度時間周期為 5秒茅信,所以會+ PRE_READ_MS 判斷,如果是一次性的調(diào)度則會補償這次調(diào)度
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1墓臭、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 拿到設(shè)置的 調(diào)度過期策略
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
// 如果是忽略就不執(zhí)行汹押,如果是立刻執(zhí)行一次,就立馬調(diào)用執(zhí)行一次該執(zhí)行器
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2起便、從當(dāng)前時間算起棚贾,算出這個定時器下次應(yīng)該執(zhí)行時間
refreshNextValidTime(jobInfo, new Date());
// 當(dāng)前的時間大于執(zhí)行器下次的執(zhí)行的時間,說明上次執(zhí)行器可能遺漏了這個執(zhí)行器
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2榆综、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1妙痹、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、從當(dāng)前時間算起鼻疮,算出這個定時器下次應(yīng)該執(zhí)行時間
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
// 如果下次發(fā)送時間在當(dāng)前時間之后5秒內(nèi)怯伊,會進行第二次觸發(fā),放到另一個線程中執(zhí)行觸發(fā)邏輯
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1判沟、make ring second
// 這里算出的結(jié)果保持在 0 - 59之間
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2耿芹、將當(dāng)前任務(wù)放進一個全局變量map中崭篡,讓ringThread線程去執(zhí)行
pushTimeRing(ringSecond, jobInfo.getId());
// 3、從當(dāng)前時間算起吧秕,算出這個定時器下次應(yīng)該執(zhí)行時間
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
// 取5秒的直接觸發(fā) 但是區(qū)別于上面是第一次觸發(fā)
} else {
// 2.3琉闪、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2砸彬、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3颠毙、從當(dāng)前時間算起,算出這個定時器下次應(yīng)該執(zhí)行時間
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3砂碉、更新執(zhí)行器蛀蜜,主要是更新 上次觸發(fā)時間,下次觸發(fā)時間增蹭,和當(dāng)前執(zhí)行器的狀態(tài)
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
// 如果上述操作耗時大于一秒直接進入下次循環(huán)滴某,如果小于一秒需要再判斷
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
// 首先 System.currentTimeMillis()%1000 這里的取余,最大值是999毫秒滋迈,可以理解成極限的一秒
// 如果讀到了數(shù)據(jù) 休眠 (1 - (System.currentTimeMillis()%1000))秒霎奢,這里最大休眠1秒,最小接近不休眠直接執(zhí)行
// 因為讀到了數(shù)據(jù)杀怠,不知道接下來還有沒有數(shù)據(jù),這里為了趕工確保滿足條件的定時任務(wù)能快速被執(zhí)行厅克。
// 如果讀不到數(shù)據(jù)赔退,休眠 (5 - (System.currentTimeMillis()%1000))秒,這里最大休眠5秒证舟,最小休眠4秒
// 因為已經(jīng)讀不到數(shù)據(jù)了硕旗,多休息一下,讓定時器等到需要被執(zhí)行的時間點
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
// 上來休眠最大1秒女责,如果scheduleThread有執(zhí)行任務(wù)漆枚,保證會向 ringData(map) 里面寫
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
// 從 ringData(map)里面拿到,
List<Integer> ringItemData = new ArrayList<>();
// 上面介紹過 這個map的key是在0-59之間抵知,直接取當(dāng)前的秒數(shù)(0-59)
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免處理耗時太長墙基,跨過刻度,向前校驗一個刻度刷喜;
for (int i = 0; i < 2; i++) {
// 循環(huán)兩次從 ringData(map)中拿到兩個時間點的執(zhí)行器ID集合 list残制,然后賦值給 ringItemData
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
// 如果 ringItemData 集合不為空,說明有需要執(zhí)行的執(zhí)行器Id掖疮,就遍歷執(zhí)行里面Id對應(yīng)的執(zhí)行器
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
// 遍歷完成后初茶,將 ringItemData 置空,然后等待線程 下次執(zhí)行
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
三浊闪、總結(jié)
1.xxl-job-admin的啟動是利用springboot的擴展接口InitializingBean來實現(xiàn)的恼布,銷毀是利用擴展DisposableBean接口來實現(xiàn)的螺戳。
2.xxl-job-admin的核心運行流程由JobRegistryHelper,JobFailMonitorHelper折汞,JobCompleteHelper倔幼,JobLogReportHelper,JobScheduleHelper 這幾個Helper完成字支。
3.單臺xxl-job最多一秒鐘可以完成6000個執(zhí)行器任務(wù)的執(zhí)行凤藏。
4.集群環(huán)境下,同一個執(zhí)行器不出現(xiàn)并發(fā)執(zhí)行問題其實是依賴了數(shù)據(jù)庫的行鎖實現(xiàn)的堕伪。
綜上所述揖庄,將xxl-job-admin中的啟動,以及如何調(diào)度核心部分就已經(jīng)說完了欠雌,其實在執(zhí)行執(zhí)行器任務(wù)的時候里面還涉及到xxl-job的集群分片處理任務(wù)的原理蹄梢,以及集群路由的原理,還有內(nèi)置server的設(shè)計富俄,以及xxl-job-admin遠程觸發(fā)任務(wù)使用的RPC調(diào)用原理細節(jié)禁炒,后面有空再整理吧。