xxl-job v2.2.1 調(diào)度中心源碼解讀

調(diào)度中心

  1. 掃描加載配置 ,XxlJobAdminConfig

  2. XxlJobAdminConfig加載完成后磷支,初始化XxlJobScheduler
    // ---------------------- XxlJobScheduler ----------------------

      private XxlJobScheduler xxlJobScheduler;
    
      @Override
      public void afterPropertiesSet() throws Exception {
        adminConfig = this;
    
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
      }
    
      @Override
      public void destroy() throws Exception {
        xxlJobScheduler.destroy();
      }
    
    
    // ---------------------- XxlJobScheduler ----------------------
    
  3. XxlJobScheduler初始化daemon線程

    public void init() throws Exception {
    // init i18n 初始化國際化
    initI18n();

         // admin registry monitor run 
         JobRegistryMonitorHelper.getInstance().start();
    
         // admin fail-monitor run
         JobFailMonitorHelper.getInstance().start();
    
         // admin lose-monitor run
         JobLosedMonitorHelper.getInstance().start();
    
         // admin trigger pool start
         JobTriggerPoolHelper.toStart();
    
         // admin log report start
         JobLogReportHelper.getInstance().start();
    
         // start-schedule
         JobScheduleHelper.getInstance().start();
    
         logger.info(">>>>>>>>> init xxl-job admin success.");
     }
    

JobRegistryMonitorHelper.getInstance().start();

初始化執(zhí)行器注冊棺棵,異步執(zhí)行楼咳,registryThread線程,每隔30s執(zhí)行一次

// step 1. 查詢自動注冊的執(zhí)行器烛恤,對應(yīng)mysql表xxl_job_group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);

// step 2. 清除90s未發(fā)送心跳節(jié)點(admin/executor)母怜,對應(yīng)mysql表xxl_job_registry
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
  XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}

// step 3. 查詢存活節(jié)點,同步xxl_job_registry表未注冊到xxl_job_group.address_list的數(shù)據(jù)
// 3.1 數(shù)據(jù)封裝
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
  for (XxlJobRegistry item: list) {
    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
      String appname = item.getRegistryKey();
      List<String> registryList = appAddressMap.get(appname);
      if (registryList == null) {
        registryList = new ArrayList<String>();
      }

      if (!registryList.contains(item.getRegistryValue())) {
        registryList.add(item.getRegistryValue());
      }
      appAddressMap.put(appname, registryList);
    }
  }
}
// 3.2 fresh group address
for (XxlJobGroup group: groupList) {
  List<String> registryList = appAddressMap.get(group.getAppname());
  String addressListStr = null;
  if (registryList!=null && !registryList.isEmpty()) {
    Collections.sort(registryList);
    addressListStr = "";
    for (String item:registryList) {
      addressListStr += item + ",";
    }
    addressListStr = addressListStr.substring(0, addressListStr.length()-1);
  }
  group.setAddressList(addressListStr);
  XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}


JobFailMonitorHelper.getInstance().start(); 

監(jiān)控報警缚柏,異步執(zhí)行苹熏,monitorThread線程,每隔10s執(zhí)行一次

// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
    continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

// 1币喧、fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
  JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
  String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
  log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
  XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}

// 2轨域、fail alarm monitor
int newAlarmStatus = 0;     // 告警狀態(tài):0-默認(rèn)、-1=鎖定狀態(tài)杀餐、1-無需告警干发、2-告警成功、3-告警失敗
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
  boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
  newAlarmStatus = alarmResult?2:3;
} else {
    newAlarmStatus = 1;
}

// 更新狀態(tài) 
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);

JobLosedMonitorHelper.getInstance().start();

超時job置為失敗史翘,異步執(zhí)行枉长,monitorThread線程,每隔60s執(zhí)行一次

// 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運行中" 狀態(tài)超過10min恶座,且對應(yīng)執(zhí)行器心跳注冊失敗不在線搀暑,則將本地調(diào)度主動標(biāo)記失斄ぱ簟跨琳;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

if (losedJobIds!=null && losedJobIds.size()>0) {
  for (Long logId: losedJobIds) {

    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setId(logId);

    jobLog.setHandleTime(new Date());
    jobLog.setHandleCode(ReturnT.FAIL_CODE);
    jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
  }

}

JobTriggerPoolHelper.toStart();

JobTriggerPoolHelper

啟動快、慢線程池

// 正常線程池
fastTriggerPool = new ThreadPoolExecutor(
  10,
  XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
  60L,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue<Runnable>(1000),
  new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
    }
  });

// 慢速線程池
slowTriggerPool = new ThreadPoolExecutor(
  10,
  XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
  60L,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue<Runnable>(2000),
  new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
    }
  });

JobLogReportHelper.getInstance().start();

執(zhí)行記錄報表桐罕,異步執(zhí)行脉让,logrThread

// 1桂敛、log-report refresh: refresh log report in 3 days
try {

  for (int i = 0; i < 3; i++) {

    // today
    Calendar itemDay = Calendar.getInstance();
    itemDay.add(Calendar.DAY_OF_MONTH, -i);
    itemDay.set(Calendar.HOUR_OF_DAY, 0);
    itemDay.set(Calendar.MINUTE, 0);
    itemDay.set(Calendar.SECOND, 0);
    itemDay.set(Calendar.MILLISECOND, 0);

    Date todayFrom = itemDay.getTime();

    itemDay.set(Calendar.HOUR_OF_DAY, 23);
    itemDay.set(Calendar.MINUTE, 59);
    itemDay.set(Calendar.SECOND, 59);
    itemDay.set(Calendar.MILLISECOND, 999);

    Date todayTo = itemDay.getTime();

    // refresh log-report every minute
    XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
    xxlJobLogReport.setTriggerDay(todayFrom);
    xxlJobLogReport.setRunningCount(0);
    xxlJobLogReport.setSucCount(0);
    xxlJobLogReport.setFailCount(0);

    Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
    if (triggerCountMap!=null && triggerCountMap.size()>0) {
      int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
      int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
      int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
      int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

      xxlJobLogReport.setRunningCount(triggerDayCountRunning);
      xxlJobLogReport.setSucCount(triggerDayCountSuc);
      xxlJobLogReport.setFailCount(triggerDayCountFail);
    }

    // do refresh
    int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
    if (ret < 1) {
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
    }
  }

} catch (Exception e) {
  if (!toStop) {
    logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
  }
}

// 2、log-clean: switch open & once each day
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
    && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

  // expire-time
  Calendar expiredDay = Calendar.getInstance();
  expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
  expiredDay.set(Calendar.HOUR_OF_DAY, 0);
  expiredDay.set(Calendar.MINUTE, 0);
  expiredDay.set(Calendar.SECOND, 0);
  expiredDay.set(Calendar.MILLISECOND, 0);
  Date clearBeforeTime = expiredDay.getTime();

  // clean expired log
  List<Long> logIds = null;
  do {
    logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
    if (logIds!=null && logIds.size()>0) {
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
    }
  } while (logIds!=null && logIds.size()>0);

  // update clean time
  lastCleanLogTime = System.currentTimeMillis();
}

try {
  TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
  if (!toStop) {
    logger.error(e.getMessage(), e);
  }
}

JobScheduleHelper.getInstance().start();

job執(zhí)行

// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 計算預(yù)讀數(shù)量
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

// 加鎖
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

 // tx start
// 1溅潜、pre read 查詢運行中术唬,到達(dá)執(zhí)行時間job limit preReadCount,mysql表xxl_job_info
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
  // 2、push time-ring
  for (XxlJobInfo jobInfo: scheduleList) {

    // time-ring jump
    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
      // 2.1滚澜、trigger-expire > 5s:pass && make next-trigger-time
      // 下次執(zhí)行時間超過當(dāng)前5s不執(zhí)行粗仓,更新時間,等下一次執(zhí)行
      logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

      // fresh next
      refreshNextValidTime(jobInfo, new Date());

    } else if (nowTime > jobInfo.getTriggerNextTime()) {
      // 2.2设捐、trigger-expire < 5s:direct-trigger && make next-trigger-time
            // 下次執(zhí)行在5s內(nèi)借浊,執(zhí)行
      // 1、trigger萝招,加入線程池執(zhí)行蚂斤,快慢線程池選擇策略
      JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
      logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

      // 2、fresh next 更新狀態(tài)和下次執(zhí)行時間
      refreshNextValidTime(jobInfo, new Date());

      // next-trigger-time in 5s, pre-read again
      if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

        // 1槐沼、make ring second
        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

        // 2曙蒸、push time ring
        pushTimeRing(ringSecond, jobInfo.getId());

        // 3、fresh next
        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

      }

    } else {// 執(zhí)行時間 < 當(dāng)前時間
      // 2.3岗钩、trigger-pre-read:time-ring trigger && make next-trigger-time

      // 1纽窟、make ring second
      int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

      // 2、push time ring
      pushTimeRing(ringSecond, jobInfo.getId());

      // 3凹嘲、fresh next
      refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

    }

  }

  // 3师倔、update trigger info
  for (XxlJobInfo jobInfo: scheduleList) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
  }

} else {
  preReadSuc = false;
}

// tx stop
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市周蹭,隨后出現(xiàn)的幾起案子趋艘,更是在濱河造成了極大的恐慌,老刑警劉巖凶朗,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓷胧,死亡現(xiàn)場離奇詭異,居然都是意外死亡棚愤,警方通過查閱死者的電腦和手機搓萧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來宛畦,“玉大人瘸洛,你說我怎么就攤上這事〈魏停” “怎么了反肋?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長踏施。 經(jīng)常有香客問我石蔗,道長罕邀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任养距,我火速辦了婚禮诉探,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘棍厌。我一直安慰自己肾胯,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布耘纱。 她就那樣靜靜地躺著阳液,像睡著了一般。 火紅的嫁衣襯著肌膚如雪揣炕。 梳的紋絲不亂的頭發(fā)上帘皿,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天,我揣著相機與錄音畸陡,去河邊找鬼鹰溜。 笑死,一個胖子當(dāng)著我的面吹牛丁恭,可吹牛的內(nèi)容都是我干的曹动。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼牲览,長吁一口氣:“原來是場噩夢啊……” “哼墓陈!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起第献,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤贡必,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后庸毫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仔拟,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年飒赃,在試婚紗的時候發(fā)現(xiàn)自己被綠了利花。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡载佳,死狀恐怖炒事,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蔫慧,我是刑警寧澤挠乳,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響欲侮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜肋联,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一威蕉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧橄仍,春花似錦韧涨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至宪哩,卻和暖如春娩贷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背锁孟。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工彬祖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人品抽。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓储笑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親圆恤。 傳聞我的和親對象是個殘疾皇子突倍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,440評論 2 359