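Quartz is a job scheduling framework. If you had to implement scheduled tasks yourself, how would you do it? The usual approach is a one-producer, many-consumers design: the producer fetches tasks and hands them to consumers, and the consumers are managed by a JDK thread pool. When reading source code, it pays not to dive in straight away. First think about how you would implement the feature yourself, then analyze the code; you get twice the result for half the effort.
The source confirms that Quartz does use one producer and multiple consumers. Strictly speaking there is one producer thread per Scheduler, but a Spring Boot application almost always has a single scheduler, so in practice there is one producer. The thread pool is not the JDK's; Quartz implements its own, on much the same principle. The producer thread is QuartzSchedulerThread and the consumer threads are WorkerThread instances; WorkerThread is an inner class of SimpleThreadPool.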
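To make the one-producer, many-consumers idea concrete, here is a minimal sketch of the "naive" design described above, using a JDK thread pool for the consumers. It is not Quartz code; the class and method names (ProducerConsumerSketch, runAll) are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ProducerConsumerSketch {

    /** One producer thread hands every task to a fixed pool of consumer threads. */
    static List<String> runAll(List<String> taskNames, int consumers) {
        List<String> completed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        // The producer: a single thread that fetches tasks and dispatches them.
        Thread producer = new Thread(() -> {
            for (String name : taskNames) {
                pool.execute(() -> completed.add(name)); // runs on a consumer thread
            }
        }, "sketch-producer");

        producer.start();
        try {
            producer.join();                              // every task has been handed off
            pool.shutdown();                              // finish queued work, accept nothing new
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> done = runAll(List.of("job-a", "job-b", "job-c"), 2);
        System.out.println(done.size() + " tasks completed");
    }
}
```

Quartz follows the same shape, except that the producer also decides when each task is due and the pool is its own implementation.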
1. The QuartzSchedulerThread
To analyze a thread, look at two things: when it is started and what its run method does, in other words Thread's start and run methods.
1.1 Startup
As described in the Quartz initialization section, creating a Scheduler creates a QuartzScheduler object, and the QuartzScheduler constructor starts the QuartzSchedulerThread. The code is as follows:
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
...
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
...
}
The ThreadExecutor can be specified in the configuration file; the default is DefaultThreadExecutor.
1.2 The run method
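The point of the ThreadExecutor indirection is that the way the scheduler thread gets started can be swapped out. The sketch below illustrates the idea with stand-in types; StartStrategy and DirectStart are hypothetical names, not Quartz classes, and the assumption that the default executor simply calls start() on the thread should be checked against your Quartz version. As far as I know the configuration key is org.quartz.threadExecutor.class, but treat that name as an assumption too.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadExecutorSketch {

    /** Hypothetical stand-in for a pluggable "how do I start this thread" strategy. */
    interface StartStrategy {
        void execute(Thread thread);
    }

    /** Assumed default behaviour: start the thread directly. */
    static class DirectStart implements StartStrategy {
        @Override
        public void execute(Thread thread) {
            thread.start();
        }
    }

    /** Starts a scheduler-like thread through the strategy and reports whether it ran. */
    static boolean startAndJoin(StartStrategy strategy) {
        AtomicBoolean ran = new AtomicBoolean(false);
        Thread schedThread = new Thread(() -> ran.set(true), "sketch-scheduler-thread");
        strategy.execute(schedThread);   // mirrors schedThreadExecutor.execute(this.schedThread)
        try {
            schedThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("thread ran: " + startAndJoin(new DirectStart()));
    }
}
```

A custom strategy could, for example, mark the thread as a daemon or hand it to a container-managed facility before starting it.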
@Override
public void run() {
int acquiresFailed = 0;
//---------------1. Check whether the scheduler has been halted---------------------------
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
try {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
} catch (Exception ignore) {
}
}
//--------------2. Check that the consumer pool has more than 0 available threads; this is always true--------------------------
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
//--------------3. Acquire the triggers that are about to fire-----------------------------
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
} catch (RuntimeException e) {
if (acquiresFailed == 0) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
}
//4. Take the first trigger and check whether its fire time has been reached
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
//---------5. Fire the triggers, wrapping each one in a TriggerFiredResult-----------------------------------
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
//------------6. Create the JobRunShell-----------------
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
//---------------7. Hand the job to a consumer thread---------------------------
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
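The run method of QuartzSchedulerThread is long and fairly complex, so it is best read in sections. I have marked the sections in the code with numbered comments. The main steps are:
- Check whether the scheduler has been halted
- Check that the consumer pool has more than 0 available threads. This is always true, because blockForAvailableThreads keeps looping and only returns once the count is greater than 0
- Acquire the triggers that are about to fire. The query covers triggers due within the next 30s (now + idleWaitTime in the code above) and, by default, triggers that were due within the past 60s. Why look into the past? Sometimes there are not enough consumer threads and a trigger misses its fire time, so it is given another chance to run.
- Take the first trigger and check whether its fire time has been reached. The acquired triggers are already sorted, so once the first one is due the others in the batch are fired as well
- Fire the triggers, wrapping each one in a TriggerFiredResult
- Wrap each result's TriggerFiredBundle in a JobRunShell; JobRunShell implements the Runnable interface
- Hand the JobRunShell to a consumer thread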
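Stripped of error handling, pausing and batching, the steps above reduce to a small loop: pick the earliest trigger, wait on a lock until it is due, then hand the work to the pool. The following is a simplified sketch of that loop, not the Quartz implementation; SketchTrigger, runUntilEmpty and the in-memory PriorityQueue standing in for the job store are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SchedulerLoopSketch {

    /** Hypothetical trigger: a name plus the time at which it should fire. */
    static final class SketchTrigger {
        final String name;
        final long fireAtMillis;

        SketchTrigger(String name, long fireAtMillis) {
            this.name = name;
            this.fireAtMillis = fireAtMillis;
        }
    }

    /** Fires every trigger at (or just after) its fire time and returns the firing order. */
    static List<String> runUntilEmpty(List<SketchTrigger> triggers, int workers) {
        // Stand-in for the job store: triggers ordered by next fire time.
        PriorityQueue<SketchTrigger> store =
                new PriorityQueue<>(Comparator.comparingLong((SketchTrigger t) -> t.fireAtMillis));
        store.addAll(triggers);

        List<String> fired = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Object sigLock = new Object();
        try {
            while (!store.isEmpty()) {
                SketchTrigger next = store.poll();                 // acquire the earliest trigger
                long timeUntilTrigger = next.fireAtMillis - System.currentTimeMillis();
                while (timeUntilTrigger > 0) {                     // wait until it is due
                    synchronized (sigLock) {
                        sigLock.wait(timeUntilTrigger);
                    }
                    // recompute: wait() may return early
                    timeUntilTrigger = next.fireAtMillis - System.currentTimeMillis();
                }
                pool.execute(() -> fired.add(next.name));          // hand off to a consumer
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new ArrayList<>(fired);
    }

    public static void main(String[] args) {
        long base = System.currentTimeMillis();
        List<String> order = runUntilEmpty(List.of(
                new SketchTrigger("late", base + 90),
                new SketchTrigger("early", base + 30)), 1);
        System.out.println(order);
    }
}
```

Waiting on a lock rather than sleeping is what allows the real scheduler thread to be woken early when the schedule changes.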
For an overall flow chart, see https://segmentfault.com/a/1190000015492260
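The acquisition window from the trigger-acquisition step can be illustrated with plain numbers. The sketch below is a deliberate simplification and not the query Quartz runs: it only filters fire times into the window [now - lookBehind, now + lookAhead], sorts them and caps the batch size. All names and parameters are hypothetical; the 30s and 60s values in main are the figures quoted above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class AcquireWindowSketch {

    /**
     * Returns the fire times inside [now - lookBehindMillis, now + lookAheadMillis],
     * earliest first, limited to maxCount entries.
     */
    static List<Long> acquire(List<Long> nextFireTimes, long now,
                              long lookAheadMillis, long lookBehindMillis, int maxCount) {
        List<Long> due = new ArrayList<>();
        for (long t : nextFireTimes) {
            if (t >= now - lookBehindMillis && t <= now + lookAheadMillis) {
                due.add(t);
            }
        }
        due.sort(Comparator.naturalOrder());
        return new ArrayList<>(due.subList(0, Math.min(maxCount, due.size())));
    }

    public static void main(String[] args) {
        long now = 100_000L;
        // 70s ago, 50s ago, 10s ahead, 40s ahead, 25s ahead
        List<Long> times = List.of(30_000L, 50_000L, 110_000L, 140_000L, 125_000L);
        System.out.println(acquire(times, now, 30_000L, 60_000L, 10));
    }
}
```

The trigger that was due 70s ago and the one due in 40s both fall outside the window, so only the remaining three are returned.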
2. The WorkerThread
As the previous section showed, the producer ultimately hands each job to the consumer thread pool's runInThread method. Start with SimpleThreadPool's runInThread:
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
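The hand-off pattern used here (block until a worker is free, move it from the available list to the busy list, give it the Runnable) can be reproduced in a few dozen lines. This is a simplified sketch under my own assumptions, not SimpleThreadPool itself: shutdown handling is omitted, the workers are daemon threads that loop forever, and HandoffPoolSketch, Worker, assign and runTasks are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HandoffPoolSketch {
    private final Object nextRunnableLock = new Object();
    private final LinkedList<Worker> availWorkers = new LinkedList<>();
    private final List<Worker> busyWorkers = new ArrayList<>();

    HandoffPoolSketch(int size) {
        synchronized (nextRunnableLock) {
            for (int i = 0; i < size; i++) {
                Worker w = new Worker("sketch-worker-" + i);
                availWorkers.add(w);
                w.start();
            }
        }
    }

    /** Blocks until a worker is free, then hands the task to it. */
    void runInThread(Runnable task) {
        synchronized (nextRunnableLock) {
            while (availWorkers.isEmpty()) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            Worker w = availWorkers.removeFirst();
            busyWorkers.add(w);
            w.assign(task);
        }
    }

    /** Called by a worker after its task: move it back and wake any waiting producer. */
    private void makeAvailable(Worker w) {
        synchronized (nextRunnableLock) {
            busyWorkers.remove(w);
            availWorkers.add(w);
            nextRunnableLock.notifyAll();
        }
    }

    private final class Worker extends Thread {
        private Runnable task; // guarded by this worker's monitor

        Worker(String name) {
            super(name);
            setDaemon(true); // sketch only: workers never shut down
        }

        synchronized void assign(Runnable r) {
            task = r;
            notifyAll();
        }

        @Override
        public void run() {
            while (true) {
                Runnable r;
                synchronized (this) {
                    while (task == null) {
                        try {
                            wait(500);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    r = task;
                    task = null;
                }
                try {
                    r.run();
                } catch (Throwable t) {
                    // a failing task must not kill the worker
                } finally {
                    makeAvailable(this);
                }
            }
        }
    }

    /** Demo helper: runs `tasks` counter increments on `workers` threads. */
    static int runTasks(int workers, int tasks) {
        HandoffPoolSketch pool = new HandoffPoolSketch(workers);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.runInThread(() -> { counter.incrementAndGet(); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }
}
```

Because runInThread blocks the caller when no worker is free, the producer is naturally throttled to the pool's capacity instead of queueing work without bound.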
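runInThread mainly manipulates the thread pool: it takes a thread from the list of available workers, adds it to the list of busy workers, and passes it the Runnable via wt.run(runnable). Next, look at WorkerThread's run method: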
@Override
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run();
}
}
} catch (InterruptedException unblock) {
// do nothing (loop will terminate if shutdown() was called
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} finally {
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
makeAvailable(this);
}
}
}
//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
}
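WorkerThread's run method looks long, but most of it is try-catch blocks and logging. Strip those away and the real work is a single statement: runnable.run(). Here runnable is the JobRunShell, so continue with JobRunShell's run method: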
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();
//1. Empty method, left for subclasses to implement
try {
begin();
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't begin execution.", se);
break;
}
//2. Notify the job and trigger listeners that the job is about to execute
// notify job & trigger listeners...
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch(VetoedException ve) {
try {
CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
// QTZ-205
// Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
if (jec.getTrigger().getNextFireTime() == null) {
qs.notifySchedulerListenersFinalized(jec.getTrigger());
}
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
break;
}
//3. Execute the job
long startTime = System.currentTimeMillis();
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
//4. Notify the job and trigger listeners that the job has finished executing
// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}
CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
// update the trigger
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
// If this happens, there's a bug in the trigger...
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}
// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}
// update job/trigger or re-execute job
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
continue;
}
//5. Empty method, left for subclasses to implement
try {
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
continue;
}
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}
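Execution flow:
- begin(): an empty method left for subclasses to implement as an extension point; marks the start of job execution
- Notify the job and trigger listeners that the job is about to execute
- Execute the job
- Notify the job and trigger listeners that the job has finished executing
- complete(): an empty method left for subclasses to implement; marks the end of job execution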
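This flow is essentially the template method pattern: a fixed skeleton with two empty hooks around the listener notifications and the job itself. The sketch below shows that skeleton in isolation. It is an illustration under my own assumptions, not JobRunShell: SketchListener, beforeJob, afterJob and demo are hypothetical names, the hook is a no-argument complete(), and re-execution is left out.

```java
import java.util.ArrayList;
import java.util.List;

class JobShellSketch implements Runnable {

    /** Hypothetical listener; returning false from beforeJob vetoes the execution. */
    interface SketchListener {
        boolean beforeJob(String jobName);
        void afterJob(String jobName, RuntimeException error);
    }

    private final String jobName;
    private final Runnable job;
    private final List<SketchListener> listeners;

    JobShellSketch(String jobName, Runnable job, List<SketchListener> listeners) {
        this.jobName = jobName;
        this.job = job;
        this.listeners = listeners;
    }

    /** Hook: empty by default, subclasses may override. */
    protected void begin() { }

    /** Hook: empty by default, subclasses may override. */
    protected void complete() { }

    @Override
    public void run() {
        begin();                                        // 1. start hook
        for (SketchListener l : listeners) {            // 2. notify listeners, allow a veto
            if (!l.beforeJob(jobName)) {
                complete();
                return;
            }
        }
        RuntimeException error = null;
        try {
            job.run();                                  // 3. execute the job
        } catch (RuntimeException e) {
            error = e;                                  // reported to listeners, not rethrown
        }
        for (SketchListener l : listeners) {            // 4. notify listeners of completion
            l.afterJob(jobName, error);
        }
        complete();                                     // 5. end hook
    }

    /** Demo helper: records the order in which each step runs. */
    static List<String> demo(boolean veto) {
        List<String> log = new ArrayList<>();
        SketchListener listener = new SketchListener() {
            @Override
            public boolean beforeJob(String name) { log.add("listener:start"); return !veto; }
            @Override
            public void afterJob(String name, RuntimeException error) { log.add("listener:done"); }
        };
        JobShellSketch shell = new JobShellSketch("demo-job", () -> log.add("execute"), List.of(listener)) {
            @Override protected void begin() { log.add("begin"); }
            @Override protected void complete() { log.add("complete"); }
        };
        shell.run();
        return log;
    }
}
```

A subclass that overrides the two hooks can wrap every job in extra behavior, such as opening and closing a transaction, without touching the skeleton.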