本章閱讀收獲:可了解Quartz框架中的正式開(kāi)始運(yùn)行部分源碼
繼上節(jié)內(nèi)容
在上一節(jié)內(nèi)容中邻梆,我們講到了schedule調(diào)取器的start的方法,但是對(duì)于具體job是如何運(yùn)行的,我們還沒(méi)有揭開(kāi)它神秘的面紗,下面跟著我一步步來(lái)。
回憶殺
不知大家是否還記得夫偶,領(lǐng)略Quartz源碼架構(gòu)之美——源碼實(shí)彈之Scheduler(五)中講到的
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
這行代碼是否大家還記得。
我們深入進(jìn)去給大家繼續(xù)看下
/**
* 初始化QuartzScheduler類(lèi)
*/
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
劃重點(diǎn):
//創(chuàng)建調(diào)度線程
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
這段代碼在干嘛呢觉增?
就是創(chuàng)建了調(diào)度線程兵拢,并且執(zhí)行
public void execute(Thread thread) {
thread.start();
}
schedThreadExecutor.execute(this.schedThread);的調(diào)用其實(shí)就是去啟動(dòng)調(diào)度線程。那么接下來(lái)我們就要進(jìn)行對(duì)QuartzSchedulerThread的run方法進(jìn)行分析了逾礁。
QuartzSchedulerThread的run方法源碼分析
先看下整塊代碼:
@Override
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
//這里的paused對(duì)應(yīng)QuartzScheduler的start方法中啟動(dòng)
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
//獲取可用線程數(shù) qsRsrcs是QuartzSchedulerResources對(duì)象
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
//清除調(diào)度改變的信號(hào)
clearSignaledSchedulingChange();
try {
//到JobStore中獲取下次被觸發(fā)的觸發(fā)器
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
//這里為什么triggers的第一個(gè)對(duì)象就是最早需要被執(zhí)行的说铃?
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
//如果第一條下次觸發(fā)時(shí)間大于當(dāng)前時(shí)間則進(jìn)入等待
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
//等待的過(guò)程中看看有沒(méi)有收到調(diào)度信號(hào)
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// 下面是開(kāi)始執(zhí)行任務(wù)
JobRunShell shell = null;
try {
//構(gòu)造執(zhí)行對(duì)象,JobRunShell實(shí)現(xiàn)了Runnable
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
//這個(gè)里面會(huì)用我們自定義的Job來(lái)new一個(gè)對(duì)象敞斋,并把相關(guān)執(zhí)行Job是需要的數(shù)據(jù)傳給JobExecutionContextImpl(這是我們自定義job的execute方法參數(shù))
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 這里是把任務(wù)放入到線程池中
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
我們可以整體的話 我們是通過(guò)一個(gè)while循環(huán)來(lái)保證定時(shí)任務(wù)不斷去運(yùn)行的截汪。這里就引發(fā)了一個(gè)我的一個(gè)好奇?是不是定時(shí)任務(wù)的話植捎,基本都是通過(guò)while這種形式來(lái)實(shí)現(xiàn)的呢衙解?因?yàn)樵谖夷X海的各種假設(shè)中,只有無(wú)線循環(huán)能夠滿足焰枢。
下面我們就開(kāi)始進(jìn)行逐段分析:
// check if we're supposed to pause...
synchronized (sigLock) {
//這里的paused對(duì)應(yīng)QuartzScheduler的start方法中啟動(dòng)
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
這塊代碼可能乍一看很簡(jiǎn)單蚓峦,也很容易懂,就是一個(gè)while循環(huán)济锄,滿足條件就等待暑椰。但是放在于整個(gè)框架中,就有一些微妙了荐绝,這種的paused變量會(huì)在QuartzScheduler的start方法中啟動(dòng)設(shè)置為false一汽,在上一節(jié)中我其實(shí)也貼出來(lái)代碼過(guò):
/**
* 調(diào)度器開(kāi)始運(yùn)行
*/
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// 通知調(diào)度器監(jiān)控器啟動(dòng)中
notifySchedulerListenersStarting();
if (initialStart == null) { //初始化標(biāo)識(shí)為null,進(jìn)行初始化操作
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
schedThread.togglePause(false);//設(shè)置 不暫停
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
//提醒調(diào)度器的監(jiān)聽(tīng)啟動(dòng)
notifySchedulerListenersStarted();
}
中的 schedThread.togglePause(false);,所以只有人為的去掉用schedule.start的方法之后低滩,定時(shí)任務(wù)才會(huì)正在的開(kāi)始進(jìn)行跑動(dòng)召夹。至于其中的paused和halted變量,則是在QuartzSchedulerThread構(gòu)造方法中初始化的:
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
this.qs = qs;
this.qsRsrcs = qsRsrcs;
this.setDaemon(setDaemon);
if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
this.setPriority(threadPrio);
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
}
可以看到默認(rèn)paused為true恕沫,halted為false监憎。
結(jié)束語(yǔ)
本節(jié)內(nèi)容揭開(kāi)了Quartz的部門(mén)神秘面紗,知道他是如何去定時(shí)跑動(dòng)任務(wù)的婶溯,之后我們會(huì)繼續(xù)詳細(xì)的跟進(jìn)鲸阔。