領(lǐng)略Quartz源碼架構(gòu)之美——源碼實(shí)彈之運(yùn)行過(guò)程(二)

本章閱讀收獲:可了解Quartz框架中的正式開(kāi)始運(yùn)行部分源碼

繼上節(jié)內(nèi)容

在上一節(jié)內(nèi)容中邻梆,我們講到了schedule調(diào)取器的start的方法,但是對(duì)于具體job是如何運(yùn)行的,我們還沒(méi)有揭開(kāi)它神秘的面紗,下面跟著我一步步來(lái)。

回憶殺

不知大家是否還記得夫偶,領(lǐng)略Quartz源碼架構(gòu)之美——源碼實(shí)彈之Scheduler(五)中講到的

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

這行代碼是否大家還記得。
我們深入進(jìn)去給大家繼續(xù)看下

    /**
     * 初始化QuartzScheduler類(lèi)
     */
    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

劃重點(diǎn):

        //創(chuàng)建調(diào)度線程
        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

這段代碼在干嘛呢觉增?
就是創(chuàng)建了調(diào)度線程兵拢,并且執(zhí)行

public void execute(Thread thread) {
        thread.start();
    }

schedThreadExecutor.execute(this.schedThread);的調(diào)用其實(shí)就是去啟動(dòng)調(diào)度線程。那么接下來(lái)我們就要進(jìn)行對(duì)QuartzSchedulerThread的run方法進(jìn)行分析了逾礁。

QuartzSchedulerThread的run方法源碼分析

先看下整塊代碼:

    @Override
    public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    //這里的paused對(duì)應(yīng)QuartzScheduler的start方法中啟動(dòng)
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }
                //獲取可用線程數(shù) qsRsrcs是QuartzSchedulerResources對(duì)象
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers = null;

                    long now = System.currentTimeMillis();
                    //清除調(diào)度改變的信號(hào)
                    clearSignaledSchedulingChange();
                    try {
                        //到JobStore中獲取下次被觸發(fā)的觸發(fā)器
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled()) 
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        //這里為什么triggers的第一個(gè)對(duì)象就是最早需要被執(zhí)行的说铃?
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        //如果第一條下次觸發(fā)時(shí)間大于當(dāng)前時(shí)間則進(jìn)入等待
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            //等待的過(guò)程中看看有沒(méi)有收到調(diào)度信號(hào)
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // 下面是開(kāi)始執(zhí)行任務(wù)
                            JobRunShell shell = null;
                            try {
                                //構(gòu)造執(zhí)行對(duì)象,JobRunShell實(shí)現(xiàn)了Runnable
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                //這個(gè)里面會(huì)用我們自定義的Job來(lái)new一個(gè)對(duì)象敞斋,并把相關(guān)執(zhí)行Job是需要的數(shù)據(jù)傳給JobExecutionContextImpl(這是我們自定義job的execute方法參數(shù))
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            // 這里是把任務(wù)放入到線程池中
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

我們可以整體的話 我們是通過(guò)一個(gè)while循環(huán)來(lái)保證定時(shí)任務(wù)不斷去運(yùn)行的截汪。這里就引發(fā)了一個(gè)我的一個(gè)好奇?是不是定時(shí)任務(wù)的話植捎,基本都是通過(guò)while這種形式來(lái)實(shí)現(xiàn)的呢衙解?因?yàn)樵谖夷X海的各種假設(shè)中,只有無(wú)線循環(huán)能夠滿足焰枢。

下面我們就開(kāi)始進(jìn)行逐段分析:

                // check if we're supposed to pause...
                synchronized (sigLock) {
                    //這里的paused對(duì)應(yīng)QuartzScheduler的start方法中啟動(dòng)
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

這塊代碼可能乍一看很簡(jiǎn)單蚓峦,也很容易懂,就是一個(gè)while循環(huán)济锄,滿足條件就等待暑椰。但是放在于整個(gè)框架中,就有一些微妙了荐绝,這種的paused變量會(huì)在QuartzScheduler的start方法中啟動(dòng)設(shè)置為false一汽,在上一節(jié)中我其實(shí)也貼出來(lái)代碼過(guò):

    /**
     * 調(diào)度器開(kāi)始運(yùn)行
     */
    public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }


        // 通知調(diào)度器監(jiān)控器啟動(dòng)中
        notifySchedulerListenersStarting();

        if (initialStart == null) { //初始化標(biāo)識(shí)為null,進(jìn)行初始化操作
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }

        schedThread.togglePause(false);//設(shè)置 不暫停

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        //提醒調(diào)度器的監(jiān)聽(tīng)啟動(dòng)
        notifySchedulerListenersStarted();
    }

中的 schedThread.togglePause(false);,所以只有人為的去掉用schedule.start的方法之后低滩,定時(shí)任務(wù)才會(huì)正在的開(kāi)始進(jìn)行跑動(dòng)召夹。至于其中的paused和halted變量,則是在QuartzSchedulerThread構(gòu)造方法中初始化的:

    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
        super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
        this.qs = qs;
        this.qsRsrcs = qsRsrcs;
        this.setDaemon(setDaemon);
        if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
            log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
            this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }

        this.setPriority(threadPrio);

        // start the underlying thread, but put this object into the 'paused'
        // state
        // so processing doesn't start yet...
        paused = true;
        halted = new AtomicBoolean(false);
    }

可以看到默認(rèn)paused為true恕沫,halted為false监憎。

結(jié)束語(yǔ)

本節(jié)內(nèi)容揭開(kāi)了Quartz的部門(mén)神秘面紗,知道他是如何去定時(shí)跑動(dòng)任務(wù)的婶溯,之后我們會(huì)繼續(xù)詳細(xì)的跟進(jìn)鲸阔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末偷霉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子褐筛,更是在濱河造成了極大的恐慌类少,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件渔扎,死亡現(xiàn)場(chǎng)離奇詭異瞒滴,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)赞警,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)虏两,“玉大人愧旦,你說(shuō)我怎么就攤上這事《ò眨” “怎么了笤虫?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)祖凫。 經(jīng)常有香客問(wèn)我琼蚯,道長(zhǎng),這世上最難降的妖魔是什么惠况? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任遭庶,我火速辦了婚禮,結(jié)果婚禮上稠屠,老公的妹妹穿的比我還像新娘峦睡。我一直安慰自己,他們只是感情好权埠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布榨了。 她就那樣靜靜地躺著,像睡著了一般攘蔽。 火紅的嫁衣襯著肌膚如雪龙屉。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,692評(píng)論 1 305
  • 那天满俗,我揣著相機(jī)與錄音转捕,去河邊找鬼。 笑死漫雷,一個(gè)胖子當(dāng)著我的面吹牛瓜富,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播降盹,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼与柑,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼谤辜!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起价捧,我...
    開(kāi)封第一講書(shū)人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤丑念,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后结蟋,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體脯倚,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年嵌屎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了推正。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宝惰,死狀恐怖植榕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情尼夺,我是刑警寧澤尊残,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站淤堵,受9級(jí)特大地震影響寝衫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拐邪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一慰毅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧庙睡,春花似錦事富、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)绷杜。三九已至尚辑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蜀细,已是汗流浹背谤逼。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工贵扰, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人流部。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓戚绕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親枝冀。 傳聞我的和親對(duì)象是個(gè)殘疾皇子舞丛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容