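scheduler.start() calls QuartzScheduler.start();
To start Quartz you call start(), which starts the scheduler threads and fires the triggers that are due. The start method performs these operations:
- 1. Startup initialization
- 2. Checks whether the scheduler is clustered and acts accordingly
- 3. If it is not clustered, a recovery mechanism runs first: it recovers any failed or misfired jobs and cleans up the data store as needed.
- 4. Initializes thread management and wakes up all waiting threads.
A thread is started by calling start(), but the work of the thread is actually carried out in run().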
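The start()/run() distinction can be shown with a plain java.lang.Thread. This is only a minimal sketch: the class StartVsRunDemo and its methods are made up for illustration and are not part of Quartz. Calling run() directly executes the body on the caller's thread, while start() creates a new thread that then invokes run().

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Quartz.
class StartVsRunDemo {

    // Calls run() directly and reports which thread executed the body.
    static String nameWhenCallingRun() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(Thread.currentThread().getName()), "worker");
        t.run(); // ordinary method call: the body runs on the calling thread
        return seen.get();
    }

    // Calls start() and reports which thread executed the body.
    static String nameWhenCallingStart() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(Thread.currentThread().getName()), "worker");
        t.start(); // a new thread is created and the JVM invokes run() on it
        try {
            t.join(); // wait until the worker has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("run():   " + nameWhenCallingRun());
        System.out.println("start(): " + nameWhenCallingStart());
    }
}
```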
Starting the scheduler: calling start()
This actually calls start() on the QuartzScheduler object;
A brief walk through the source follows:
/**
* The start method of QuartzScheduler
* @throws SchedulerException
*/
public void start() throws SchedulerException {
if (shuttingDown || closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
// initialStart is null, so this is the first start: perform initialization
if (initialStart == null) {
initialStart = new Date();
// 1. The main point of this analysis
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
// 2. Already initialized: resume the JobStore
resources.getJobStore().schedulerResumed();
}
// 3. Wake up all waiting threads
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
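The branching in start() can be modelled without Quartz on the classpath. The sketch below is an assumption-laden stand-in: StartFlowSketch is a hypothetical class that only records which step would be called, and it uses IllegalStateException where the real method throws SchedulerException. It shows that the first start() goes through schedulerStarted(), a later start() goes through schedulerResumed(), and start() after shutdown is rejected.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Hypothetical stand-in for QuartzScheduler; it records calls instead of touching a real JobStore.
class StartFlowSketch {
    private Date initialStart;                    // stays null until the first start()
    private boolean closed;                       // set by shutdown()
    final List<String> calls = new ArrayList<>(); // the steps start() would have performed

    void start() {
        if (closed) {
            // The real method throws SchedulerException here.
            throw new IllegalStateException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        if (initialStart == null) {
            initialStart = new Date();
            calls.add("schedulerStarted");        // first start: full JobStore start-up
            calls.add("startPlugins");
        } else {
            calls.add("schedulerResumed");        // later starts: just resume the JobStore
        }
        calls.add("togglePause(false)");          // wake up the scheduler thread in both cases
    }

    void shutdown() {
        closed = true;
    }
}
```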
Starting the scheduling tasks
this.resources.getJobStore().schedulerStarted();
This is the main point of the analysis. The call starts the JobStore held by QuartzSchedulerResources. JobStore is an interface; the implementation class in this example is org.quartz.impl.jdbcjobstore.JobStoreSupport.
See the code below:
/**
* org.quartz.impl.jdbcjobstore.JobStoreSupport: called when the scheduler starts
* @throws SchedulerException
*/
public void schedulerStarted() throws SchedulerException {
// clustered
if (isClustered()) {
clusterManagementThread = new ClusterManager();
if(initializersLoader != null)
clusterManagementThread.setContextClassLoader(initializersLoader);
clusterManagementThread.initialize();
} else { // not clustered
try {
// 1. Recover jobs
recoverJobs();
} catch (SchedulerException se) {
throw new SchedulerConfigException(
"Failure occured during job recovery.", se);
}
}
misfireHandler = new MisfireHandler();
if(initializersLoader != null)
misfireHandler.setContextClassLoader(initializersLoader);
// 2. Obtain the ThreadExecutor (thread management) and start the misfire handler
misfireHandler.initialize();
schedulerRunning = true;
getLog().debug("JobStore background threads started (as scheduler was started).");
}
1. Recovering jobs: recoverJobs();
/**
* A recovery mechanism runs at startup.
* recoverJobs recovers any failed or misfired jobs and cleans up the data store as needed.
* @throws JobPersistenceException
*/
protected void recoverJobs() throws JobPersistenceException {
executeInNonManagedTXLock(
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
recoverJobs(conn); // recover jobs
}
}, null);
}
Let's keep reading:
/**
* Recover job state
* @param conn
* @throws JobPersistenceException
*/
protected void recoverJobs(Connection conn) throws JobPersistenceException {
try {
// 1. Fix inconsistent trigger states first: ACQUIRED and BLOCKED ---> WAITING
int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);
rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
// ---- update SQL ----
//"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"
getLog().info(
"Freed " + rows
+ " triggers from 'acquired' / 'blocked' state.");
// clean up misfired jobs
// 1.1 Clean up misfired jobs
recoverMisfiredJobs(conn, true);
// recover jobs marked for recovery that were not fully executed
// 1.2 Recover jobs that are marked for recovery and did not finish executing (queries qrtz_fired_triggers)
List<OperableTrigger> recoveringJobTriggers = getDelegate()
.selectTriggersForRecoveringJobs(conn);
getLog().info("Recovering " + recoveringJobTriggers.size() + " jobs that were in-progress at the time of the last shut-down.");
for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {
if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
recoveringJobTrigger.computeFirstFireTime(null);
storeTrigger(conn, recoveringJobTrigger, null, false,
STATE_WAITING, false, true);
}
}
getLog().info("Recovery complete.");
// remove lingering 'complete' triggers...
// 1.3 Remove triggers whose state == COMPLETE
List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
for(TriggerKey ct: cts) {
removeTrigger(conn, ct);
}
getLog().info(
"Removed " + cts.size() + " 'complete' triggers.");
// clean up any fired trigger entries
// 1.4 Clean up any fired-trigger entries
int n = getDelegate().deleteFiredTriggers(conn);
getLog().info("Removed " + n + " stale fired job entries.");
} catch (JobPersistenceException e) {
throw e;
} catch (Exception e) {
throw new JobPersistenceException("Couldn't recover jobs: " + e.getMessage(), e);
}
}
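The state clean-up performed by recoverJobs(conn) can be pictured with an in-memory map instead of the QRTZ tables. This is a rough sketch under that assumption: TriggerStateRecoverySketch, its State enum and its method names are hypothetical, and only the state transitions (ACQUIRED/BLOCKED to WAITING, PAUSED_BLOCKED to PAUSED, removal of COMPLETE) are taken from the code above.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical in-memory model of the trigger-state recovery steps; not the Quartz implementation.
class TriggerStateRecoverySketch {
    enum State { WAITING, ACQUIRED, BLOCKED, PAUSED, PAUSED_BLOCKED, COMPLETE }

    // Step 1: reset states left behind by an unclean shutdown.
    // Returns the number of triggers changed, like the 'rows' counter in recoverJobs.
    static int resetInconsistentStates(Map<String, State> triggers) {
        int rows = 0;
        for (Map.Entry<String, State> e : triggers.entrySet()) {
            State s = e.getValue();
            if (s == State.ACQUIRED || s == State.BLOCKED) {
                e.setValue(State.WAITING);   // ACQUIRED / BLOCKED ---> WAITING
                rows++;
            } else if (s == State.PAUSED_BLOCKED) {
                e.setValue(State.PAUSED);    // PAUSED_BLOCKED ---> PAUSED
                rows++;
            }
        }
        return rows;
    }

    // Step 1.3: drop triggers that are already COMPLETE. Returns how many were removed.
    static int removeCompleted(Map<String, State> triggers) {
        int removed = 0;
        Iterator<Map.Entry<String, State>> it = triggers.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() == State.COMPLETE) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```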
1.1 Cleaning up misfired jobs: recoverMisfiredJobs(conn, true);
/**
* Clean up misfired jobs
* @param conn
* @param recovering
* @return
* @throws JobPersistenceException
* @throws SQLException
*/
protected RecoverMisfiredJobsResult recoverMisfiredJobs(Connection conn, boolean recovering) throws JobPersistenceException, SQLException {
int maxMisfiresToHandleAtATime = recovering ? -1 : this.getMaxMisfiresToHandleAtATime();
List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
long earliestNewTime = Long.MAX_VALUE;
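/**
* Check whether any triggers have misfired. We must still look for the MISFIRED state, in case triggers were left in it.
* getMisfireTime() is the current time minus the misfire threshold (one minute by default); on this path maxMisfiresToHandleAtATime == -1 and misfiredTriggers is an empty list.
*
* "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"
*
* The query above returns a list of rows, which is scanned one row at a time:
* (aa1) if resultList.size() == count, the limit is reached and the method returns true; otherwise it returns false.
* (aa2) while the size is not equal to count, the row's trigger name and trigger group are wrapped and added to resultList.
*/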
boolean hasMoreMisfiredTriggers = this.getDelegate().hasMisfiredTriggersInState(conn, "WAITING", this.getMisfireTime(), maxMisfiresToHandleAtATime, misfiredTriggers);
if (hasMoreMisfiredTriggers) {
this.getLog().info("Handling the first " + misfiredTriggers.size() + " triggers that missed their scheduled fire-time. " + "More misfired triggers remain to be processed.");
} else {
if (misfiredTriggers.size() <= 0) {
this.getLog().debug("Found 0 triggers that missed their scheduled fire-time.");
return RecoverMisfiredJobsResult.NO_OP;
}
this.getLog().info("Handling " + misfiredTriggers.size() + " trigger(s) that missed their scheduled fire-time.");
}
// Iterate over the misfiredTriggers list
for (TriggerKey triggerKey : misfiredTriggers) {
/**
* What retrieveTrigger does: it looks the trigger up and, if found, populates it.
* (1) "SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
* (2) Loads the data for the trigger's concrete type, such as SimpleTrigger
*/
OperableTrigger trig = this.retrieveTrigger(conn, triggerKey);
if (trig != null) {
// update the misfired trigger
this.doUpdateOfMisfiredTrigger(conn, trig, false, "WAITING", recovering);
if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime) {
earliestNewTime = trig.getNextFireTime().getTime();
}
}
}
return new RecoverMisfiredJobsResult(hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
}
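The selection logic behind hasMisfiredTriggersInState can be sketched in memory. The code below is an illustration under stated assumptions, not the Quartz delegate: MisfireScanSketch and Trig are hypothetical names, the list stands in for triggers already in the WAITING state, and the MISFIRE_INSTR = -1 exclusion from the SQL is left out. It keeps the two behaviours described above: ordering by next fire time ascending then priority descending, and returning true only when the limit is hit while rows remain (a count of -1 never matches, so nothing is cut off).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory version of the misfire scan; not the Quartz JDBC delegate.
class MisfireScanSketch {
    static final class Trig {
        final String name;
        final long nextFireTime; // epoch millis
        final int priority;
        Trig(String name, long nextFireTime, int priority) {
            this.name = name;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
        }
    }

    // Adds misfired trigger names to 'out' and returns true if the limit 'count'
    // was reached while more misfired triggers remained.
    static boolean collectMisfired(List<Trig> waiting, long now, long thresholdMillis,
                                   int count, List<String> out) {
        long misfireTime = now - thresholdMillis; // same idea as getMisfireTime()
        List<Trig> candidates = new ArrayList<>();
        for (Trig t : waiting) {
            if (t.nextFireTime < misfireTime) {   // NEXT_FIRE_TIME < ?
                candidates.add(t);
            }
        }
        // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
        candidates.sort(Comparator.comparingLong((Trig t) -> t.nextFireTime)
                .thenComparing(Comparator.comparingInt((Trig t) -> t.priority).reversed()));
        for (Trig t : candidates) {
            if (out.size() == count) {
                return true;                      // limit reached, more rows remain
            }
            out.add(t.name);
        }
        return false;                             // every misfired trigger was collected
    }
}
```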
1.2 Updating the misfired trigger: doUpdateOfMisfiredTrigger(conn, trig, false, "WAITING", recovering);
/**
* Update a misfired trigger
* @param conn
* @param trig
* @param forceState
* @param newStateIfNotComplete
* @param recovering
* @throws JobPersistenceException
*/
private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
Calendar cal = null;
if (trig.getCalendarName() != null) {
/**
* This reads from the qrtz_calendars table
*/
cal = this.retrieveCalendar(conn, trig.getCalendarName());
}
this.schedSignaler.notifyTriggerListenersMisfired(trig);
// SimpleTrigger's default misfire policy sets the next fire time (next_fire_time) to the current time (for a one-shot trigger the smart policy resolves to fire-now). This is the important part!
trig.updateAfterMisfire(cal);
if (trig.getNextFireTime() == null) {
this.storeTrigger(conn, trig, (JobDetail)null, true, "COMPLETE", forceState, recovering);
this.schedSignaler.notifySchedulerListenersFinalized(trig);
} else {
this.storeTrigger(conn, trig, (JobDetail)null, true, newStateIfNotComplete, forceState, recovering);
}
}
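The decision at the end of doUpdateOfMisfiredTrigger comes down to one question: does the trigger still have a next fire time after the misfire policy has been applied? The sketch below is deliberately simplified and hypothetical. MisfireUpdateSketch is not a Quartz class, and the end-time check in updateAfterMisfireFireNow is an assumption added so that the COMPLETE branch can be exercised; it is not a copy of what SimpleTrigger does.

```java
// Hypothetical simplification of the "update after misfire" decision; not Quartz code.
class MisfireUpdateSketch {

    // Assumed "fire now" style policy: the next fire time becomes 'now',
    // unless the trigger's end time (if any) has already passed.
    static Long updateAfterMisfireFireNow(long now, Long endTime) {
        if (endTime != null && now > endTime) {
            return null;  // the trigger will never fire again
        }
        return now;
    }

    // Same choice as in doUpdateOfMisfiredTrigger: no next fire time means COMPLETE,
    // otherwise the caller-supplied state (for example "WAITING") is stored.
    static String stateToStore(Long nextFireTime, String newStateIfNotComplete) {
        return nextFireTime == null ? "COMPLETE" : newStateIfNotComplete;
    }
}
```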
1.3 Recovering jobs that are marked for recovery and did not finish executing: selectTriggersForRecoveringJobs. The code fragment is as follows:
List<OperableTrigger> recoveringJobTriggers = getDelegate()
.selectTriggersForRecoveringJobs(conn);
// INSTANCE_NAME == dufy_test, REQUESTS_RECOVERY == true; in the query actually sent to the database this is bound as REQUESTS_RECOVERY == 1
"SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?"
// For how true is converted to 1, see the image in Appendix 1.
Recovery complete.
1.4 Removing triggers whose state == COMPLETE
List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
-----------------------------------------------------------------------------
"SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?"
-----------------------------------------------------------------------------
for(TriggerKey ct: cts) {
removeTrigger(conn, ct);
---------------------------------------------------------------------
(a) Before deleting, look up the JobDetail first
JobDetail job = getDelegate().selectJobForTrigger(conn,getClassLoadHelper(), key, false);
"SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP"
(b) Delete the trigger, its listeners, and its Simple / Cron / BLOB child-table entries.
boolean removedTrigger = deleteTriggerAndChildren(conn, key);
deleteTrigger(Connection conn, TriggerKey triggerKey)
(b1)deleteTriggerExtension
"DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
"DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
(b2)"DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
(c) Decide whether to delete the JobDetail by checking isDurable, which defaults to false.
if (null != job && !job.isDurable()) {
int numTriggers = getDelegate().selectNumTriggersForJob(conn,
job.getKey());
---------------------------------------------------------
"SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
---------------------------------------------------------
if (numTriggers == 0) {
// Don't call removeJob() because we don't want to check for
// triggers again.
deleteJobAndChildren(conn, job.getKey()); // delete the job and its listeners
-----------------------------------------------------
// deleteJobDetail(Connection conn, JobKey jobKey) deletes the job detail record for the given job.
"DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
-----------------------------------------------------
}
}
}
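Steps (a) to (c) can be sketched with two maps in place of the TRIGGERS and JOB_DETAILS tables. This is an illustration only: RemoveTriggerSketch and its fields are hypothetical, and listeners and child tables are ignored. The point it shows is that a non-durable job is deleted once its last trigger is gone, while a durable job stays.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of removeTrigger; not the JobStoreSupport implementation.
class RemoveTriggerSketch {
    final Map<String, String> triggerToJob = new HashMap<>(); // trigger key -> job key
    final Map<String, Boolean> jobDurable = new HashMap<>();  // job key -> isDurable

    boolean removeTrigger(String triggerKey) {
        // (a) look up the job before the trigger row disappears
        String job = triggerToJob.get(triggerKey);
        // (b) delete the trigger itself
        boolean removedTrigger = triggerToJob.remove(triggerKey) != null;
        // (c) a non-durable job with no remaining triggers is deleted as well
        if (job != null && !jobDurable.getOrDefault(job, false)) {
            boolean hasOtherTriggers = triggerToJob.containsValue(job);
            if (!hasOtherTriggers) {
                jobDurable.remove(job);
            }
        }
        return removedTrigger;
    }
}
```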
1.5 Cleaning up any fired-trigger entries
int n = getDelegate().deleteFiredTriggers(conn);
----------------------------------------------------------------------------
"DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"
----------------------------------------------------------------------------
2. Obtaining the ThreadExecutor (thread management): misfireHandler.initialize();
public void initialize() {
ThreadExecutor executor = getThreadExecutor();
// getThreadExecutor() returns the field: private ThreadExecutor threadExecutor = new DefaultThreadExecutor();
executor.execute(MisfireHandler.this); // starts the thread; DefaultThreadExecutor.execute calls thread.start(), so MisfireHandler.run() executes
// MisfireHandler is declared as: class MisfireHandler extends Thread
}
If already initialized, resume the JobStore
resources.getJobStore().schedulerResumed();
If the scheduler has already been initialized, this resumes scheduler operation.
private volatile boolean schedulerRunning = false; // schedulerRunning defaults to false
public void schedulerResumed() {
schedulerRunning = true;
}
Waking up all waiting threads
schedThread.togglePause(false);
// Signals the main processing loop to pause at the next possible point.
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
------------------------------------------
// Signals the main processing loop that a change in scheduling has been made, in order to interrupt any sleeping that may be occurring while waiting for the fire time to arrive.
public void signalSchedulingChange(long candidateNewNextFireTime) {
synchronized(sigLock) {
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll(); // private final Object sigLock = new Object();
}
}
------------------------------------------
} else {
sigLock.notifyAll(); // wake up all waiting threads
}
}
}
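The pause flag plus sigLock.notifyAll() is the standard wait/notify pattern. The sketch below is a self-contained imitation rather than QuartzSchedulerThread itself: PauseLoopSketch, halt(), iterations() and demo() are hypothetical names, and the loop body only counts iterations. It assumes the worker starts out paused and re-checks the flag after each timed wait, so togglePause(false) lets it proceed.

```java
// Hypothetical imitation of the pause/resume handshake; not QuartzSchedulerThread.
class PauseLoopSketch {
    private final Object sigLock = new Object();
    private boolean paused = true;   // the loop starts paused until togglePause(false)
    private boolean halted = false;
    private int iterations = 0;

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();     // wake the loop so it re-checks the flag
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted = true;
            sigLock.notifyAll();
        }
    }

    int iterations() {
        synchronized (sigLock) {
            return iterations;
        }
    }

    // Body of the worker thread: wait while paused, otherwise do one unit of "work".
    void runLoop() {
        try {
            while (true) {
                synchronized (sigLock) {
                    while (paused && !halted) {
                        sigLock.wait(1000L); // timed wait, re-check the condition on wake-up
                    }
                    if (halted) {
                        return;
                    }
                    iterations++;
                }
                Thread.sleep(1);             // stand-in for real work, done outside the lock
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {iterations seen while paused, iterations seen after resuming}.
    static int[] demo() {
        PauseLoopSketch s = new PauseLoopSketch();
        Thread worker = new Thread(s::runLoop, "sched-thread");
        worker.start();
        try {
            Thread.sleep(50);
            int whilePaused = s.iterations();
            s.togglePause(false);
            long deadline = System.currentTimeMillis() + 5000;
            while (s.iterations() == 0 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            int afterResume = s.iterations();
            s.halt();
            worker.join();
            return new int[] { whilePaused, afterResume };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Checking the condition inside a while loop around wait() matters here: a timed wait can return without the flag having changed, so the loop must look at paused again before going on.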