xxl-job之調(diào)度中心啟動源碼分析(一)

首先從spring的配置看起翔烁, 從以下配置可以看出蹬屹,xxl內(nèi)部使用的是quartz

spring配置

<bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
   <property name="dataSource" ref="dataSource" />
   <property name="autoStartup" value="true" />         <!--自動啟動 -->
   <property name="startupDelay" value="20" />             <!--延時啟動虾攻,應用啟動成功后在啟動 -->
   <property name="overwriteExistingJobs" value="true" /> <!--覆蓋DB中JOB:true、以數(shù)據(jù)庫中已經(jīng)存在的為準:false -->
   <property name="applicationContextSchedulerContextKey"  value="applicationContextKey" />
   <property name="configLocation" value="classpath:quartz.properties"/>
</bean>
<!-- 這個調(diào)度中心樊拓,在啟動的時候蛤织,會做很多初始化的工作 摊鸡,比如:執(zhí)行器信息囤热,注冊機器列表等信息 -->
<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
    <!-- 配置調(diào)度中心的名稱 -->
   <property name="scheduler" ref="quartzScheduler"/>
    <!-- 用于調(diào)度中心和執(zhí)行器之間通信的時候做數(shù)據(jù)加密 -->
   <property name="accessToken" value="${xxl.job.accessToken}" />
</bean>

XxlJobDynamicScheduler

com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler 在啟動的時候會做如下工作:

public void init() throws Exception {
    // 啟動自動注冊線程, 獲取類型為自動注冊的執(zhí)行器信息犀暑,完成機器的自動注冊與發(fā)現(xiàn)
    JobRegistryMonitorHelper.getInstance().start();
     
    // 啟動失敗日志監(jiān)控線程
    JobFailMonitorHelper.getInstance().start();
     
    // admin-server(spring-mvc)
    NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
    NetComServerFactory.setAccessToken(accessToken);
     
    // valid
    Assert.notNull(scheduler, "quartz scheduler is null");
    logger.info(">>>>>>>>> init xxl-job admin success.");
}

JobRegistryMonitorHelper

JobRegistryMonitorHelper.getInstance().start() 詳細代碼如下:


public void start(){
   //創(chuàng)建一個線程
   registryThread = new Thread(new Runnable() {
      @Override
      public void run() {
         // 當toStop 為false時進入該循環(huán)。
         while (!toStop) {
            try {
               // 獲取類型為自動注冊的執(zhí)行器地址列表
               List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
               if (CollectionUtils.isNotEmpty(groupList)) {
     
                  // 刪除 90秒之內(nèi)沒有更新信息的注冊機器, 90秒沒有心跳信息返回,代表機器已經(jīng)出現(xiàn)問題,故移除
                  XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
     
                  // fresh online address (admin/executor)
                  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                  // 查詢在90秒之內(nèi)有過更新的機器列表
                  List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
                  if (list != null) {
                     //循環(huán)注冊機器列表,  根據(jù)執(zhí)行器不同,將這些機器列表區(qū)分拿出來
                     for (XxlJobRegistry item: list) {
                        // 判斷該機器注冊信息RegistryGroup ,RegistType 是否是EXECUTOR , EXECUTOR 代表該機器是注冊到執(zhí)行器上面的
                        // RegistType  分為兩種帆焕, ADMIN 和EXECUTOR
                        if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                           // 獲取注冊的執(zhí)行器 KEY  (也就是執(zhí)行器)
                           String appName = item.getRegistryKey();
                           List<String> registryList = appAddressMap.get(appName);
                           if (registryList == null) {
                              registryList = new ArrayList<String>();
                           }
     
                           if (!registryList.contains(item.getRegistryValue())) {
                              registryList.add(item.getRegistryValue());
                           }
                           // 收集 機器信息,根據(jù)執(zhí)行器做區(qū)分
                           appAddressMap.put(appName, registryList);
                        }
                     }
                  }
     
                  //  遍歷執(zhí)行器列表
                  for (XxlJobGroup group: groupList) {
                     // 通過執(zhí)行器的APP_NAME  拿出他下面的集群機器地址
                     List<String> registryList = appAddressMap.get(group.getAppName());
                     String addressListStr = null;
                     if (CollectionUtils.isNotEmpty(registryList)) {
                        Collections.sort(registryList);
                        // 轉為為String锅论, 通過逗號分隔
                        addressListStr = StringUtils.join(registryList, ",");
                     }
                     group.setAddressList(addressListStr);
                     // 將 這個執(zhí)行器的 集群機器地址列表讼溺,寫入到數(shù)據(jù)庫
                     XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
                  }
               }
            } catch (Exception e) {
               logger.error("job registry instance error:{}", e);
            }
            try {
               TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
               logger.error("job registry instance error:{}", e);
            }
         }
      }
   });
   registryThread.setDaemon(true);
   //啟動線程
   registryThread.start();
}

JobFailMonitorHelper

JobFailMonitorHelper.getInstance().start(); 詳細代碼如下:

//JobFailMonitorHelper.java
public void start(){
   // 啟動線程
 monitorThread = new Thread(new Runnable() {

 @Override
 public void run() {
         // monitor
 while (!toStop) {
            try {
               List<Integer> jobLogIdList = new ArrayList<Integer>();
               // 從隊列中拿出所有可用的 jobLogIds
                int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
               if (CollectionUtils.isNotEmpty(jobLogIdList)) {
                  for (Integer jobLogId : jobLogIdList) {
                     if (jobLogId==null || jobLogId==0) {
                        continue;
                     }
                     //從數(shù)據(jù)庫跟以前有日志信息
                    XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                     if (log == null) {
                        continue;
                     }
                     //任務觸發(fā)成功, 但是JobHandle 還沒有返回結果
                    if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
                        //將 JobLogId 放入隊列 最易, 繼續(xù)監(jiān)控
                         JobFailMonitorHelper.monitor(jobLogId);
                        logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
                     } else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
                        // job success, pass
                        logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
                     } else if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
                           || IJobHandler.FAIL.getCode() == log.getHandleCode()
                           || IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {
                        // 任務執(zhí)行失敗怒坯, 執(zhí)行發(fā)送郵件等預警措施
                        failAlarm(log);
                        logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
                     } else {
                        JobFailMonitorHelper.monitor(jobLogId);
                        logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
                     }
                  }
               }
               // 停頓一下
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
               logger.error("job monitor error:{}", e);
            }
         }

   });
   monitorThread.setDaemon(true);
   monitorThread.start();
}

以上 是xxl-job 在啟動的時候做的操作, 主要是啟動兩個線程藻懒,

用來監(jiān)控自動注冊上來的機器剔猿,達到自動注冊的目的
監(jiān)控任務的執(zhí)行狀態(tài), 如若失敗嬉荆,則發(fā)送郵件預警

xxl-job 是基于quartz 進行的二次開發(fā)归敬,在系統(tǒng)啟動的時候,quartz框架會自動去數(shù)據(jù)庫讀取相關的配置信息,載入相關定時器信息

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鄙早,一起剝皮案震驚了整個濱河市弄慰,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蝶锋,老刑警劉巖陆爽,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異扳缕,居然都是意外死亡慌闭,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門躯舔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來驴剔,“玉大人,你說我怎么就攤上這事粥庄∩ナВ” “怎么了?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵惜互,是天一觀的道長布讹。 經(jīng)常有香客問我,道長训堆,這世上最難降的妖魔是什么描验? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮坑鱼,結果婚禮上膘流,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好呼股,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布耕魄。 她就那樣靜靜地躺著,像睡著了一般彭谁。 火紅的嫁衣襯著肌膚如雪屎开。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天马靠,我揣著相機與錄音,去河邊找鬼蔼两。 笑死甩鳄,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的额划。 我是一名探鬼主播妙啃,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼俊戳!你這毒婦竟也來了揖赴?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤抑胎,失蹤者是張志新(化名)和其女友劉穎燥滑,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體阿逃,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡铭拧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了恃锉。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片搀菩。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖破托,靈堂內(nèi)的尸體忽然破棺而出肪跋,到底是詐尸還是另有隱情,我是刑警寧澤土砂,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布州既,位于F島的核電站,受9級特大地震影響萝映,放射性物質發(fā)生泄漏易桃。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一锌俱、第九天 我趴在偏房一處隱蔽的房頂上張望晤郑。 院中可真熱鬧,春花似錦、人聲如沸造寝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诫龙。三九已至析显,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間签赃,已是汗流浹背谷异。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留锦聊,地道東北人歹嘹。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像孔庭,于是被迫代替她去往敵國和親尺上。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容