xxl-job-admin的源碼分析

一、設(shè)計思路

借用官網(wǎng)的話:
將調(diào)度行為抽象形成“調(diào)度中心”公共平臺赢底,而平臺自身并不承擔(dān)業(yè)務(wù)邏輯疼鸟,“調(diào)度中心”負責(zé)發(fā)起調(diào)度請求后控。
將任務(wù)抽象成分散的JobHandler,交由“執(zhí)行器”統(tǒng)一管理空镜,“執(zhí)行器”負責(zé)接收調(diào)度請求并執(zhí)行對應(yīng)的JobHandler中業(yè)務(wù)邏輯浩淘。因此捌朴,“調(diào)度”和“任務(wù)”兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴展性张抄;

借用官網(wǎng)的圖:

xxl_job架構(gòu)圖.png

簡化的架構(gòu)圖:
簡化架構(gòu)圖.png

二砂蔽、啟動原理

1.xxl-job-admin服務(wù)啟動原理

啟動XxlJobAdminApplication類,在spring容器實例化之前署惯,會執(zhí)行實現(xiàn)了 InitializingBean 接口的 afterPropertiesSet() 的方法左驾,這里是利用了springboot的拓展接口,來將xxl-job的相關(guān)bean給注冊到IOC容器當(dāng)中极谊。然后執(zhí)行最關(guān)鍵的 xxlJobScheduler.init()

XxlJobAdminConfig.png

調(diào)用 XxlJobSchedulerinit() 方法

XxlJobScheduler.png

JobRegistryHelper诡右,JobFailMonitorHelperJobCompleteHelper轻猖,JobLogReportHelper帆吻,JobScheduleHelper這五個類都是使用了餓漢式的單例模式(個人覺得還需要將構(gòu)造方法私有化),

image.png

a.調(diào)用 JobTriggerPoolHelper.toStart() 本質(zhì)就是調(diào)用JobTriggerPoolHelper的start()方法咙边,構(gòu)造出兩個線程池猜煮,如下圖所示的,一個快觸發(fā)線程池败许,一個慢觸發(fā)線程池王带。

image.png

b.調(diào)用 JobRegistryHelper.getInstance().start(),方法內(nèi)部主要做了兩件事市殷,一件事是初始化一個注冊或者移除的線程池 registryMonitorThread辫秧,然后創(chuàng)建一個 registryMonitorThread 的守護線程。設(shè)置成守護線程被丧。

JobRegistryHelper.start()方法.png

單獨把線程構(gòu)造拿出來分析盟戏。

// for monitor
        registryMonitorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // 死循環(huán)
                while (!toStop) {
                    try {
                        // 從 xxl_job_group 表中查詢出 自動注冊的執(zhí)行器  (address_type:0 自動注冊,1:手動注冊)
                        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                        if (groupList!=null && !groupList.isEmpty()) {
                            // 從 xxl_job_registry 表中找到更新時間 小于當(dāng)前時間+死亡間隔時間(就是找到注冊表中規(guī)定時間沒有更新的記錄)
                            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (ids!=null && ids.size()>0) {
                                // 如果找到在規(guī)定時間內(nèi)沒有更新的注冊甥桂,就直接刪除這些注冊執(zhí)行器
                                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                            }

                            // fresh online address (admin/executor)
                            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                            // 從 xxl_job_registry 表中找到更新時間 小于當(dāng)前時間+死亡間隔時間(就是找到注冊表中規(guī)定時間沒有更新的記錄)
                            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (list != null) {
                                // 遍歷當(dāng)前過期的注冊器柿究,將過期的注冊器的 register_value 注冊地址保存到臨時變量 appAddressMap 中
                                for (XxlJobRegistry item: list) {
                                    // 如果注冊器類型(registry_group) 是 EXECUTOR
                                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                        String appname = item.getRegistryKey();
                                        List<String> registryList = appAddressMap.get(appname);
                                        if (registryList == null) {
                                            registryList = new ArrayList<String>();
                                        }

                                        if (!registryList.contains(item.getRegistryValue())) {
                                            registryList.add(item.getRegistryValue());
                                        }
                                        appAddressMap.put(appname, registryList);
                                    }
                                }
                            }

                            // 刷新對應(yīng)執(zhí)行器地址和最新修改時間
                            for (XxlJobGroup group: groupList) {
                                List<String> registryList = appAddressMap.get(group.getAppname());
                                String addressListStr = null;
                                if (registryList!=null && !registryList.isEmpty()) {
                                    Collections.sort(registryList);
                                    StringBuilder addressListSB = new StringBuilder();
                                    for (String item:registryList) {
                                        addressListSB.append(item).append(",");
                                    }
                                    addressListStr = addressListSB.toString();
                                    addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                                }
                                group.setAddressList(addressListStr);
                                group.setUpdateTime(new Date());

                                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                    try {
                        // 休眠心跳的時間
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });

C.調(diào)用 JobFailMonitorHelper.getInstance().start(),方法內(nèi)部創(chuàng)建一個 monitorThread 的守護線程黄选。設(shè)置成守護線程蝇摸。

image.png

下面代碼詳細介紹 JobFailMonitorHelper.getInstance().start() 里面的 構(gòu)造的線程主要做的是什么事。

    public void start(){
        monitorThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // monitor 死循環(huán)監(jiān)控办陷,每10秒鐘(每次執(zhí)行完休眠十秒)執(zhí)行一遍監(jiān)控的內(nèi)容
                while (!toStop) {
                    try {
                        // 從 xxl_job_log 查詢出失敗的日志記錄
                        List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                        if (failLogIds!=null && !failLogIds.isEmpty()) {
                            for (long failLogId: failLogIds) {

                                // 修改 xxl_job_log 將警報狀態(tài)改成 (-1鎖定狀態(tài)) 告警狀態(tài):0-默認貌夕、1-無需告警、2-告警成功民镜、3-告警失敗
                                int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                                if (lockRet < 1) {
                                    continue;
                                }
                                // 根據(jù)失敗的日志id啡专,查詢出該條日志
                                XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                                // 根據(jù)這條日志記錄的 執(zhí)行器Id查詢對應(yīng)的執(zhí)行器
                                XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                                // 如果 日志的重試次數(shù)大于0,就直接觸發(fā)JobTriggerPoolHelper.trigger()方法制圈,這個方法就是admin遠程調(diào)用執(zhí)行器的方法们童。
                                if (log.getExecutorFailRetryCount() > 0) {
                                    JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                    String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                                    log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                    // 觸發(fā)完成畔况,將失敗重試的次數(shù)減一,更新
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                                }

                                // 2慧库、fail alarm monitor
                                int newAlarmStatus = 0;     // 告警狀態(tài):0-默認跷跪、-1=鎖定狀態(tài)、1-無需告警齐板、2-告警成功吵瞻、3-告警失敗
                                // 如果存在失敗的日志,發(fā)送警報郵件
                                if (info != null) {
                                    boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                    newAlarmStatus = alarmResult?2:3;
                                } else {
                                    newAlarmStatus = 1;
                                }
                                // 最后更新一下 xxl_job_log 表的 alarm_status 狀態(tài)字段
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                        }
                    }

                    try {
                          // 休眠 10秒
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

            }
        });
        monitorThread.setDaemon(true);
        monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
        monitorThread.start();
    }

調(diào)用 JobCompleteHelper.getInstance().start()甘磨,方法內(nèi)部創(chuàng)建一個 monitorThread 的守護線程听皿。設(shè)置成守護線程;以及一個callbackThreadPool線程池宽档。

image.png

monitorThread 線程內(nèi)部的工作內(nèi)容

// for monitor
        monitorThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // wait for JobTriggerPoolHelper-init
                try {
                    // 上來休眠50毫秒,等待 JobTriggerPoolHelper 初始化完成
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                // monitor
                while (!toStop) {
                    try {
                        // 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運行中" 狀態(tài)超過10min庵朝,且對應(yīng)執(zhí)行器心跳注冊失敗不在線吗冤,則將本地調(diào)度主動標記失敗九府;
                        Date losedTime = DateUtil.addMinutes(new Date(), -10);
                        List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                        if (losedJobIds!=null && losedJobIds.size()>0) {
                            for (Long logId: losedJobIds) {

                                XxlJobLog jobLog = new XxlJobLog();
                                jobLog.setId(logId);

                                jobLog.setHandleTime(new Date());
                                jobLog.setHandleCode(ReturnT.FAIL_CODE);
                                jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
                                // 處理日志椎瘟,并更新執(zhí)行器的完成結(jié)果
                                XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                        }
                    }

                    try {
                            // 休眠60秒,再執(zhí)行一次
                        TimeUnit.SECONDS.sleep(60);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

            }
        });

E.最后看一下 JobScheduleHelper.getInstance().start(); 方法侄旬。方法里面最主要的就是起兩個線程肺蔚,分別將兩個線程設(shè)置成守護線程,ringThread是最干活的線程儡羔,scheduleThread是檢測并調(diào)度執(zhí)行器任務(wù)的線程宣羊。

image.png

看一下這ringThread和scheduleThread里面都是干啥的。這里先總結(jié)一下汰蜘,scheduleThread線程主要就是將需要執(zhí)行的定時器任務(wù)分個類仇冯,并維護每個定時器里面的下次執(zhí)行時間,以及處理 調(diào)度過期的 執(zhí)行器族操,要么立刻執(zhí)行一次苛坚,要么直接忽略,等待下次執(zhí)行色难,最后就是將需要在5秒內(nèi)執(zhí)行的定時器放進一個map里面泼舱,交給ringThread線程去執(zhí)行定時器。而ringThread線程就是直接從map中拿到需要執(zhí)行的執(zhí)行器去執(zhí)行枷莉,并且每輪執(zhí)行只處理兩個時間點(毫秒級)的所有執(zhí)行器娇昙。具體的可以看代碼里面的講解。

public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                // 這里默認么個定時任務(wù)線程執(zhí)行耗時50毫秒笤妙,每秒1000毫秒涯贞,可以執(zhí)行20個任務(wù)枪狂,快線程池默認是200個最大線程數(shù),慢線程默認是100個最大線程數(shù)宋渔,
                // 所以當(dāng)線程數(shù)拉滿的情況下州疾,每秒鐘可以處理任務(wù)數(shù)是:(100+200)*20 = 6000 ;所以這里的  preReadCount 表示預(yù)讀數(shù)(默認最大:6000)
                // 如果想要提高并發(fā)性皇拣,通過修改快慢線程池的最大線程數(shù)這個參數(shù)調(diào)節(jié)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
                logger.info("==========================preReadCount ={}",preReadCount);

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // 利用數(shù)據(jù)庫的行鎖
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1严蓖、預(yù)讀數(shù)據(jù)
                        long nowTime = System.currentTimeMillis();
                        // 由于分析了最多可以執(zhí)行6000個任務(wù),所以這里在去查任務(wù)表的時候氧急,最多去查出來6000條滿足條件的
                        // 條件:下次執(zhí)行時間 小于等于 (當(dāng)前時間 + 5秒) 并且  執(zhí)行狀態(tài)是  正在運行的狀態(tài)
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2颗胡、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // time-ring jump
                                //如果服務(wù)宕機了,或者重啟等等吩坝,導(dǎo)致超過了調(diào)度周期(5秒的調(diào)度周期)毒姨,也就是本來由時間上的上一次或上很多次調(diào)度觸發(fā)的數(shù)據(jù)被本次調(diào)度查到了,
                                // 這就可能代表著可能中間存在多次調(diào)度未觸發(fā)钉寝,而按照周期性一次一次計算下次預(yù)期調(diào)度時間弧呐,那這次調(diào)度完了計算出來的下次調(diào)度還是在當(dāng)前時間以前,
                                // 例如調(diào)度周期1分鐘調(diào)度一次嵌纲,宕機5分鐘了俘枫,現(xiàn)在查到的預(yù)期調(diào)度時間為5分鐘前,如果直接調(diào)度成功會重復(fù)調(diào)度5次當(dāng)前時間以前的任務(wù)逮走,這里直接pass并計算下一次調(diào)度時間鸠蚪,
                                // 但是計算下一次調(diào)度時間也是傳入當(dāng)前時間,直接修正預(yù)期下次調(diào)度時間為當(dāng)前時間之后师溅,因為調(diào)度時間周期為 5秒茅信,所以會+ PRE_READ_MS 判斷,如果是一次性的調(diào)度則會補償這次調(diào)度
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1墓臭、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 拿到設(shè)置的 調(diào)度過期策略
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    // 如果是忽略就不執(zhí)行汹押,如果是立刻執(zhí)行一次,就立馬調(diào)用執(zhí)行一次該執(zhí)行器
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2起便、從當(dāng)前時間算起棚贾,算出這個定時器下次應(yīng)該執(zhí)行時間
                                    refreshNextValidTime(jobInfo, new Date());

                                    // 當(dāng)前的時間大于執(zhí)行器下次的執(zhí)行的時間,說明上次執(zhí)行器可能遺漏了這個執(zhí)行器
                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2榆综、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1妙痹、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2、從當(dāng)前時間算起鼻疮,算出這個定時器下次應(yīng)該執(zhí)行時間
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    // 如果下次發(fā)送時間在當(dāng)前時間之后5秒內(nèi)怯伊,會進行第二次觸發(fā),放到另一個線程中執(zhí)行觸發(fā)邏輯
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1判沟、make ring second
                                        // 這里算出的結(jié)果保持在 0 - 59之間
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2耿芹、將當(dāng)前任務(wù)放進一個全局變量map中崭篡,讓ringThread線程去執(zhí)行
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、從當(dāng)前時間算起吧秕,算出這個定時器下次應(yīng)該執(zhí)行時間
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                    // 取5秒的直接觸發(fā) 但是區(qū)別于上面是第一次觸發(fā)
                                } else {
                                    // 2.3琉闪、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2砸彬、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3颠毙、從當(dāng)前時間算起,算出這個定時器下次應(yīng)該執(zhí)行時間
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3砂碉、更新執(zhí)行器蛀蜜,主要是更新 上次觸發(fā)時間,下次觸發(fā)時間增蹭,和當(dāng)前執(zhí)行器的狀態(tài)
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    // 如果上述操作耗時大于一秒直接進入下次循環(huán)滴某,如果小于一秒需要再判斷
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            // 首先 System.currentTimeMillis()%1000 這里的取余,最大值是999毫秒滋迈,可以理解成極限的一秒
                            // 如果讀到了數(shù)據(jù) 休眠 (1 - (System.currentTimeMillis()%1000))秒霎奢,這里最大休眠1秒,最小接近不休眠直接執(zhí)行
                            // 因為讀到了數(shù)據(jù)杀怠,不知道接下來還有沒有數(shù)據(jù),這里為了趕工確保滿足條件的定時任務(wù)能快速被執(zhí)行厅克。
                            // 如果讀不到數(shù)據(jù)赔退,休眠 (5 - (System.currentTimeMillis()%1000))秒,這里最大休眠5秒证舟,最小休眠4秒
                            // 因為已經(jīng)讀不到數(shù)據(jù)了硕旗,多休息一下,讓定時器等到需要被執(zhí)行的時間點
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        // 上來休眠最大1秒女责,如果scheduleThread有執(zhí)行任務(wù)漆枚,保證會向 ringData(map) 里面寫
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        // 從 ringData(map)里面拿到,
                        List<Integer> ringItemData = new ArrayList<>();
                        // 上面介紹過 這個map的key是在0-59之間抵知,直接取當(dāng)前的秒數(shù)(0-59)
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免處理耗時太長墙基,跨過刻度,向前校驗一個刻度刷喜;
                        for (int i = 0; i < 2; i++) {
                            // 循環(huán)兩次從 ringData(map)中拿到兩個時間點的執(zhí)行器ID集合 list残制,然后賦值給 ringItemData
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        // 如果 ringItemData 集合不為空,說明有需要執(zhí)行的執(zhí)行器Id掖疮,就遍歷執(zhí)行里面Id對應(yīng)的執(zhí)行器
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            // 遍歷完成后初茶,將 ringItemData 置空,然后等待線程 下次執(zhí)行
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

三浊闪、總結(jié)

1.xxl-job-admin的啟動是利用springboot的擴展接口InitializingBean來實現(xiàn)的恼布,銷毀是利用擴展DisposableBean接口來實現(xiàn)的螺戳。
2.xxl-job-admin的核心運行流程由JobRegistryHelperJobFailMonitorHelper折汞,JobCompleteHelper倔幼,JobLogReportHelperJobScheduleHelper 這幾個Helper完成字支。
3.單臺xxl-job最多一秒鐘可以完成6000個執(zhí)行器任務(wù)的執(zhí)行凤藏。
4.集群環(huán)境下,同一個執(zhí)行器不出現(xiàn)并發(fā)執(zhí)行問題其實是依賴了數(shù)據(jù)庫的行鎖實現(xiàn)的堕伪。

綜上所述揖庄,將xxl-job-admin中的啟動,以及如何調(diào)度核心部分就已經(jīng)說完了欠雌,其實在執(zhí)行執(zhí)行器任務(wù)的時候里面還涉及到xxl-job的集群分片處理任務(wù)的原理蹄梢,以及集群路由的原理,還有內(nèi)置server的設(shè)計富俄,以及xxl-job-admin遠程觸發(fā)任務(wù)使用的RPC調(diào)用原理細節(jié)禁炒,后面有空再整理吧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末霍比,一起剝皮案震驚了整個濱河市幕袱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌悠瞬,老刑警劉巖们豌,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異浅妆,居然都是意外死亡望迎,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進店門凌外,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辩尊,“玉大人,你說我怎么就攤上這事康辑∩阌” “怎么了?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵疮薇,是天一觀的道長蒿涎。 經(jīng)常有香客問我,道長惦辛,這世上最難降的妖魔是什么劳秋? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上玻淑,老公的妹妹穿的比我還像新娘嗽冒。我一直安慰自己,他們只是感情好补履,可當(dāng)我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布添坊。 她就那樣靜靜地躺著,像睡著了一般箫锤。 火紅的嫁衣襯著肌膚如雪贬蛙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天谚攒,我揣著相機與錄音阳准,去河邊找鬼。 笑死馏臭,一個胖子當(dāng)著我的面吹牛野蝇,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播括儒,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼绕沈,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了帮寻?” 一聲冷哼從身側(cè)響起乍狐,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎固逗,沒想到半個月后浅蚪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡抒蚜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年掘鄙,在試婚紗的時候發(fā)現(xiàn)自己被綠了耘戚。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嗡髓。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖收津,靈堂內(nèi)的尸體忽然破棺而出饿这,到底是詐尸還是另有隱情,我是刑警寧澤撞秋,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布长捧,位于F島的核電站,受9級特大地震影響吻贿,放射性物質(zhì)發(fā)生泄漏串结。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望肌割。 院中可真熱鬧卧蜓,春花似錦、人聲如沸把敞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奋早。三九已至盛霎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間耽装,已是汗流浹背愤炸。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留剂邮,地道東北人摇幻。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像挥萌,于是被迫代替她去往敵國和親绰姻。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容