精進(jìn)Quartz的scheduler.start()啟動源碼分析

scheduler.start() 調(diào)用 .QuartzScheduler.start();

Quartz 的啟動要調(diào)用start()方法進(jìn)行線程的啟動隙畜,并執(zhí)行需要出發(fā)的Trigger证鸥,start方法里面進(jìn)行的操作:

  • 1疙驾、啟動的初始化
  • 2摆霉、判斷是否集群,對應(yīng)不同的操作
  • 3菠隆、若是非集群譬重,首先有恢復(fù)機(jī)制,恢復(fù)任何失敗或misfire的作業(yè)簿训,并根據(jù)需要清理數(shù)據(jù)存儲咱娶。
  • 4、初始化線程管理强品,喚醒所有等待的線程膘侮!

線程中啟動線程是調(diào)用start()方法,但是真正執(zhí)行線程任務(wù)的操作在run()中的榛!

開啟 scheduler調(diào)用start()

實際上調(diào)用QuartzScheduler對象的start();
下面就是簡單的源碼分析:

    /**
     * QuartzScheduler 的start方法
     * @throws SchedulerException
     */
    public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        //初始化標(biāo)識為null琼了,進(jìn)行初始化操作
        if (initialStart == null) {
            initialStart = new Date();
            //1 主要分析的地方
            this.resources.getJobStore().schedulerStarted();
            startPlugins();
        } else {

            //2 如果已經(jīng)初始化過,則恢復(fù)jobStore
            resources.getJobStore().schedulerResumed();
        }

        //3 喚醒所有等待的線程
        schedThread.togglePause(false);

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");

        notifySchedulerListenersStarted();
    

啟動調(diào)度任務(wù)

this.resources.getJobStore().schedulerStarted() ;
主要分析的地方,實際上是調(diào)用 QuartzSchedulerResources中的JobStore進(jìn)行啟動,JobStore是接口夫晌,本例子的實現(xiàn)類是org.quartz.impl.jdbcjobstore.JobStoreSupport
看下面代碼:

    /**
     * org.quartz.impl.jdbcjobstore.JobStoreSupport 調(diào)度開啟
     * @throws SchedulerException
     */
    public void schedulerStarted() throws SchedulerException {
        //是集群
        if (isClustered()) {
            clusterManagementThread = new ClusterManager();
            if(initializersLoader != null)
                clusterManagementThread.setContextClassLoader(initializersLoader);
            clusterManagementThread.initialize();
        } else {//不是集群
            try {
                //1表伦、恢復(fù)job
                recoverJobs();
            } catch (SchedulerException se) {
                throw new SchedulerConfigException(
                        "Failure occured during job recovery.", se);
            }
        }

        misfireHandler = new MisfireHandler();
        if(initializersLoader != null)
            misfireHandler.setContextClassLoader(initializersLoader);
        //2谦去、 獲取ThreadExecutor 線程管理
        misfireHandler.initialize();
        schedulerRunning = true;

        getLog().debug("JobStore background threads started (as scheduler was started).");
    }

1 、恢復(fù)job recoverJobs();

    /**
     * 啟動的時候 有一個恢復(fù)機(jī)制
     * recoverJobs -----  將恢復(fù)任何失敗或misfire的作業(yè)蹦哼,并根據(jù)需要清理數(shù)據(jù)存儲鳄哭。
     * @throws JobPersistenceException
     */
    protected void recoverJobs() throws JobPersistenceException {
        executeInNonManagedTXLock(
                LOCK_TRIGGER_ACCESS,
                new VoidTransactionCallback() {
                    public void executeVoid(Connection conn) throws JobPersistenceException {
                        recoverJobs(conn);//恢復(fù)job
                    }
                }, null);
    }

我們繼續(xù)往下看


   /**
     * 恢復(fù)任務(wù)狀態(tài)
     * @param conn
     * @throws JobPersistenceException
     */
    protected void recoverJobs(Connection conn) throws JobPersistenceException {
        try {
        //1.更新不一致的作業(yè)狀態(tài)      先修改狀態(tài),將 ACQUIRED 和 BLOCKED ---> WAITING
            int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
                    STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);

            rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
                    STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);

            //----更新sql---
            //"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"

            getLog().info(
                    "Freed " + rows
                            + " triggers from 'acquired' / 'blocked' state.");

            // clean up misfired jobs
            //1.1 清理misfire的jobs
            recoverMisfiredJobs(conn, true);

            // recover jobs marked for recovery that were not fully executed
            //1.2 恢復(fù)未完全執(zhí)行的標(biāo)記為恢復(fù)的作業(yè) --查詢 qrtz_fire_trigger
            List<OperableTrigger> recoveringJobTriggers = getDelegate()
                    .selectTriggersForRecoveringJobs(conn);
            getLog().info("Recovering " + recoveringJobTriggers.size() + " jobs that were in-progress at the time of the last shut-down.");

            for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {
                if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
                    recoveringJobTrigger.computeFirstFireTime(null);
                    storeTrigger(conn, recoveringJobTrigger, null, false,
                            STATE_WAITING, false, true);
                }
            }
            getLog().info("Recovery complete.");

            // remove lingering 'complete' triggers...
            //1.3 移除state == complete
            List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
            for(TriggerKey ct: cts) {
                removeTrigger(conn, ct);
            }
            getLog().info(
                    "Removed " + cts.size() + " 'complete' triggers.");

            // clean up any fired trigger entries
            //1.4 清理任何已觸發(fā)的觸發(fā)器條目
            int n = getDelegate().deleteFiredTriggers(conn);
            getLog().info("Removed " + n + " stale fired job entries.");
        } catch (JobPersistenceException e) {
            throw e;
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't recover jobs: " + e.getMessage(), e);
        }
    }

1.1 清理misfire的jobs recoverMisfiredJobs(conn, true);

    /**
     * 清理misfire的jobs
     * @param conn
     * @param recovering
     * @return
     * @throws JobPersistenceException
     * @throws SQLException
     */
    protected RecoverMisfiredJobsResult recoverMisfiredJobs(Connection conn, boolean recovering) throws JobPersistenceException, SQLException {
        int maxMisfiresToHandleAtATime = recovering ? -1 : this.getMaxMisfiresToHandleAtATime();
        List<TriggerKey> misfiredTriggers = new LinkedList();
        long earliestNewTime = Long.MAX_VALUE;
        /**
         * 是否有misfire的Trigger,我們必須仍然尋找MISFIRED狀態(tài)纲熏,以防觸發(fā)器被遺忘
         * getMisfireTime() 當(dāng)前時間 -(減去) 一分鐘 ,maxMisfiresToHandleAtATime == -1   ,misfiredTriggers== null
         *
         * "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"
         *
         *     上面sql查詢出來結(jié)果是個list
         *              (aa1)若resultList.size() == count 返回 TRUEW鼻稹! 否則 返回false局劲!
         *              (aa2)不等于 count 勺拣,封裝數(shù)據(jù),到resultList中鱼填,triggername  TriggerGroup
         */
        boolean hasMoreMisfiredTriggers = this.getDelegate().hasMisfiredTriggersInState(conn, "WAITING", this.getMisfireTime(), maxMisfiresToHandleAtATime, misfiredTriggers);
        if (hasMoreMisfiredTriggers) {
            this.getLog().info("Handling the first " + misfiredTriggers.size() + " triggers that missed their scheduled fire-time.  " + "More misfired triggers remain to be processed.");
        } else {
            if (misfiredTriggers.size() <= 0) {
                this.getLog().debug("Found 0 triggers that missed their scheduled fire-time.");
                return RecoverMisfiredJobsResult.NO_OP;
            }

            this.getLog().info("Handling " + misfiredTriggers.size() + " trigger(s) that missed their scheduled fire-time.");
        }

        Iterator i$ = misfiredTriggers.iterator();

        //循環(huán) misfiredTriggers List集合
        while(i$.hasNext()) {
            TriggerKey triggerKey = (TriggerKey)i$.next();
            //retrieveTrigger 药有,檢索Trigger,檢索到進(jìn)行數(shù)據(jù)封裝
            /**
             *   //retrieveTrigger 執(zhí)行的操作
             *                  (1)"SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
             *                  (2)關(guān)聯(lián)Trigger對應(yīng)的類型苹丸,如simpleTrigger等
             */
            OperableTrigger trig = this.retrieveTrigger(conn, triggerKey);
            if (trig != null) {
                //do 更新misfire的觸發(fā)器
                this.doUpdateOfMisfiredTrigger(conn, trig, false, "WAITING", recovering);
                if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime) {
                    earliestNewTime = trig.getNextFireTime().getTime();
                }
            }
        }

        return new RecoverMisfiredJobsResult(hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
    }

1.2 更新misfire的觸發(fā)器 doUpdateOfMisfiredTrigger(conn, trig, false, "WAITING", recovering);

    /**
     * 更新misfire的觸發(fā)器
     * @param conn
     * @param trig
     * @param forceState
     * @param newStateIfNotComplete
     * @param recovering
     * @throws JobPersistenceException
     */
    private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
        Calendar cal = null;
        if (trig.getCalendarName() != null) {
            /**
             * 操作這個表qrtz_calendar
             */
            cal = this.retrieveCalendar(conn, trig.getCalendarName());
        }

        this.schedSignaler.notifyTriggerListenersMisfired(trig);
        //simpleTrigger默認(rèn)的misfire 機(jī)制,設(shè)置下次執(zhí)行的時間(next_fire_time)為當(dāng)前時間愤惰!這里比較重要!W咐怼宦言!
        trig.updateAfterMisfire(cal);
        if (trig.getNextFireTime() == null) {
            this.storeTrigger(conn, trig, (JobDetail)null, true, "COMPLETE", forceState, recovering);
            this.schedSignaler.notifySchedulerListenersFinalized(trig);
        } else {
            this.storeTrigger(conn, trig, (JobDetail)null, true, newStateIfNotComplete, forceState, recovering);
        }

    }

1.3、 恢復(fù)未完全執(zhí)行的標(biāo)記為恢復(fù)的作業(yè)selectTriggersForRecoveringJobs,代碼片段如下


List<OperableTrigger> recoveringJobTriggers = getDelegate()
    .selectTriggersForRecoveringJobs(conn);                 
// INSTANCE_NAME == dufy_test    REQUESTS_RECOVERY == true  實際封裝到數(shù)據(jù)庫查詢是 REQUESTS_RECOVERY== 1
"SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?"
    //具體怎么是 true是怎么轉(zhuǎn)換成為 1的見附1圖片商模!

  Recovery complete.恢復(fù)完成5焱!  

1.4 移除state == complete

List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
-----------------------------------------------------------------------------
    "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?"
    -----------------------------------------------------------------------------
    for(TriggerKey ct: cts) {
        removeTrigger(conn, ct);
        ---------------------------------------------------------------------
            (a)刪除前施流,先查詢jobDetail
            JobDetail job = getDelegate().selectJobForTrigger(conn,getClassLoadHelper(), key, false);
        "SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP"

            (b)刪除觸發(fā)器响疚,其偵聽器及其Simple / Cron / BLOB子表條目。
            boolean removedTrigger = deleteTriggerAndChildren(conn, key);
        deleteTrigger(Connection conn, TriggerKey triggerKey)
            (b1)deleteTriggerExtension
            "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
            "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

            (b2)"DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

            (c)是否刪除jobdetail 瞪醋,判斷 isDurable 默認(rèn) 為false稽寒。
            if (null != job && !job.isDurable()) {
                int numTriggers = getDelegate().selectNumTriggersForJob(conn,
                                                                        job.getKey());
                ---------------------------------------------------------
                    "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                    ---------------------------------------------------------
                    if (numTriggers == 0) {
                        // Don't call removeJob() because we don't want to check for
                        // triggers again.
                        //不要調(diào)用removeJob(),因為我們不想再次檢查觸發(fā)器趟章。
                        deleteJobAndChildren(conn, job.getKey()); //刪除作業(yè)及其偵聽器。
                        -----------------------------------------------------
                            //deleteJobDetail(Connection conn, JobKey jobKey) 刪除給定作業(yè)的作業(yè)明細(xì)記錄慎王。
                            "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                            -----------------------------------------------------
                    }
            }
    }

1.5 清理任何已觸發(fā)的觸發(fā)器條目

 int n = getDelegate().deleteFiredTriggers(conn);
 ----------------------------------------------------------------------------
 "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"
 ----------------------------------------------------------------------------

2蚓土、獲取ThreadExecutor 線程管理 misfireHandler.initialize();

public void initialize() {
        ThreadExecutor executor = getThreadExecutor();
        //getThreadExecutor ==  private ThreadExecutor threadExecutor = new DefaultThreadExecutor();
        executor.execute(MisfireHandler.this); //啟動線程執(zhí)行 對應(yīng)job的 execute方法
        //MisfireHandler  ==  class MisfireHandler extends Thread  繼承了Thread
}

如果已經(jīng)初始化過,則恢復(fù)

jobStoreresources.getJobStore().schedulerResumed();.如果已經(jīng)初始化過赖淤,則恢復(fù)調(diào)度器運行 !

private volatile boolean schedulerRunning = false;//默認(rèn)schedulerRunning = false
public void schedulerResumed() {
    schedulerRunning = true;
}

喚醒所有等待的線程

schedThread.togglePause(false);

schedThread.togglePause(false);
//指示主處理循環(huán)在下一個可能的點暫停蜀漆。
void togglePause(boolean pause) {
    synchronized (sigLock) {
        paused = pause;

        if (paused) {
            signalSchedulingChange(0);
            ------------------------------------------
                //發(fā)信號通知主要處理循環(huán),已經(jīng)進(jìn)行了調(diào)度的改變 - 以便中斷在等待misfire時間到達(dá)時可能發(fā)生的任何睡眠咱旱。
                public void signalSchedulingChange(long candidateNewNextFireTime) {
                synchronized(sigLock) {
                    signaled = true;
                    signaledNextFireTime = candidateNewNextFireTime;
                    sigLock.notifyAll();  // private final Object sigLock = new Object();
                }
            }
            ------------------------------------------
        } else {
            sigLock.notifyAll();//喚醒所有等待的線程
        }
    }
}   

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末确丢,一起剝皮案震驚了整個濱河市绷耍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鲜侥,老刑警劉巖褂始,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異描函,居然都是意外死亡崎苗,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進(jìn)店門舀寓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來胆数,“玉大人,你說我怎么就攤上這事互墓”啬幔” “怎么了?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵篡撵,是天一觀的道長判莉。 經(jīng)常有香客問我,道長酸休,這世上最難降的妖魔是什么骂租? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮斑司,結(jié)果婚禮上渗饮,老公的妹妹穿的比我還像新娘。我一直安慰自己宿刮,他們只是感情好互站,可當(dāng)我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著僵缺,像睡著了一般胡桃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上磕潮,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天翠胰,我揣著相機(jī)與錄音,去河邊找鬼自脯。 笑死之景,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的膏潮。 我是一名探鬼主播锻狗,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了轻纪?” 一聲冷哼從身側(cè)響起油额,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎刻帚,沒想到半個月后潦嘶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡我擂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年衬以,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片校摩。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡看峻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出衙吩,到底是詐尸還是另有隱情互妓,我是刑警寧澤,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布坤塞,位于F島的核電站冯勉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏摹芙。R本人自食惡果不足惜灼狰,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浮禾。 院中可真熱鬧交胚,春花似錦、人聲如沸盈电。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽匆帚。三九已至熬词,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吸重,已是汗流浹背互拾。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留嚎幸,地道東北人颜矿。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像鞭铆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 導(dǎo)語:作為java領(lǐng)域最受歡迎的任務(wù)調(diào)度庫之一车遂,quartz為開發(fā)者提供了豐富的任務(wù)調(diào)度功能封断,比如讓某段程序在每天...
    star24閱讀 23,792評論 8 60
  • 任務(wù)調(diào)度簡介 1、什么時候需要任務(wù)調(diào)度 業(yè)務(wù)場景:1)賬單日或者還款日上午 9 點舶担,給每個信用卡客戶發(fā)送賬單通知坡疼,...
    vincent浩哥閱讀 2,032評論 0 0
  • 一、背景 到目前為止quartz集群模式必須開啟存儲模式才可以使用衣陶,我這邊存儲使用的是mysql數(shù)據(jù)庫柄瑰;因為我本人...
    深谷9002閱讀 960評論 1 2
  • 首先介紹一下Quartz啟動過程和幾個核心類的主要職責(zé),在其他的文章里剪况,分別詳細(xì)的介紹這幾個核心類教沾。 1、通常译断,調(diào)...
    mxwgong閱讀 1,253評論 0 1
  • 前面的話 這里只對quartz的源碼做一個整體的梳理授翻,關(guān)于quartz的整體結(jié)構(gòu),百度Google之孙咪,一堆一堆的堪唐。...
    海蟾子_null閱讀 2,951評論 0 1