今天看到一個(gè)quartz 類(lèi)的定義,一下子引起了我興趣飒筑。
public class FlowSetup extends QuartzJobBean {
public final Map<StackEnum, TaskFlow> flowMapping = new HashMap<>();
@Autowired
private FlowManager flowManager;
...
}
上面這個(gè)類(lèi)沒(méi)有定義任何annotation片吊, 為什么可以@Autowire ? 如果能autowire 說(shuō)明它是spring 管理的bean协屡,但是沒(méi)有被掃描到怎么成為bean 的呢俏脊?更進(jìn)一步的問(wèn)題就是quartz 是如何跟Spring boot 集成的。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
利用starter的原理在
spring.factories里有 org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration
則啟動(dòng)的時(shí)候就會(huì)加載QuartzAutoConfiguration肤晓,下面看下QuartzAutoConfiguration
@Configuration
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class,
PlatformTransactionManager.class })
//加入QuartzProperties.class
@EnableConfigurationProperties(QuartzProperties.class)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class,
HibernateJpaAutoConfiguration.class })
public class QuartzAutoConfiguration {
private final QuartzProperties properties;
//構(gòu)造器注入QuartzProperties
public QuartzAutoConfiguration(QuartzProperties properties,
ObjectProvider<SchedulerFactoryBeanCustomizer> customizers,
ObjectProvider<JobDetail[]> jobDetails,
ObjectProvider<Map<String, Calendar>> calendars,
ObjectProvider<Trigger[]> triggers, ApplicationContext applicationContext) {
this.properties = properties;
this.customizers = customizers;
this.jobDetails = jobDetails.getIfAvailable();
this.calendars = calendars.getIfAvailable();
this.triggers = triggers.getIfAvailable();
this.applicationContext = applicationContext;
}
//定義一個(gè)SchedulerFactoryBean爷贫,前提是沒(méi)有SchedulerFactoryBean
@Bean
@ConditionalOnMissingBean
public SchedulerFactoryBean quartzScheduler() {
//new 了一個(gè)SchedulerFactoryBean
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(this.applicationContext);
schedulerFactoryBean.setJobFactory(jobFactory);
//這里可以定義很多quartz.properteis里的屬性
if (this.properties.getSchedulerName() != null) {
schedulerFactoryBean.setSchedulerName(this.properties.getSchedulerName());
}
schedulerFactoryBean.setAutoStartup(this.properties.isAutoStartup());
schedulerFactoryBean
.setStartupDelay((int) this.properties.getStartupDelay().getSeconds());
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(
this.properties.isWaitForJobsToCompleteOnShutdown());
schedulerFactoryBean
.setOverwriteExistingJobs(this.properties.isOverwriteExistingJobs());
if (!this.properties.getProperties().isEmpty()) {
schedulerFactoryBean
.setQuartzProperties(asProperties(this.properties.getProperties()));
}
if (this.jobDetails != null && this.jobDetails.length > 0) {
schedulerFactoryBean.setJobDetails(this.jobDetails);
}
if (this.calendars != null && !this.calendars.isEmpty()) {
schedulerFactoryBean.setCalendars(this.calendars);
}
if (this.triggers != null && this.triggers.length > 0) {
schedulerFactoryBean.setTriggers(this.triggers);
}
customize(schedulerFactoryBean);
return schedulerFactoryBean;
}
}
首先定義了一個(gè)SchedulerFactoryBean, 可以看到是
implements FactoryBean补憾,也就是這是Scheduler 這個(gè)定義的生成者
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
//getObject() 是FactoryBean<Scheduler>的接口方法
public Scheduler getObject() {
return this.scheduler;
}
}
FactoryBean和BeanFactory的區(qū)別漫萄,其實(shí)2者除了名字顛倒外,沒(méi)有必然的關(guān)系余蟹。BeanFactory是個(gè)工廠類(lèi)卷胯,顧名思義就是生產(chǎn)Bean的工廠。而FactoryBean是個(gè)生成BeanDefinition的類(lèi)威酒。
同時(shí)在QuartzAutoConfiguration中 通過(guò)inner Class SchedulerDependsOnBeanFactoryPostProcessor 進(jìn)行綁定
/**
* {@link AbstractDependsOnBeanFactoryPostProcessor} for Quartz {@link Scheduler} and
* {@link SchedulerFactoryBean}.
*/
private static class SchedulerDependsOnBeanFactoryPostProcessor extends AbstractDependsOnBeanFactoryPostProcessor {
SchedulerDependsOnBeanFactoryPostProcessor(Class<?>... dependencyTypes) {
super(Scheduler.class, SchedulerFactoryBean.class, dependencyTypes);
}
}
SchedulerDependsOnBeanFactoryPostProcessor在 QuartzSchedulerDependencyConfiguration中 進(jìn)行了bean 注冊(cè)窑睁。
@Configuration(proxyBeanMethods = false)
static class QuartzSchedulerDependencyConfiguration {
@Bean
static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerDataSourceInitializerDependsOnBeanFactoryPostProcessor() {
return new SchedulerDependsOnBeanFactoryPostProcessor(QuartzDataSourceInitializer.class);
}
@Bean
@ConditionalOnBean(FlywayMigrationInitializer.class)
static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerFlywayDependsOnBeanFactoryPostProcessor() {
return new SchedulerDependsOnBeanFactoryPostProcessor(FlywayMigrationInitializer.class);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(SpringLiquibase.class)
static class LiquibaseQuartzSchedulerDependencyConfiguration {
@Bean
@ConditionalOnBean(SpringLiquibase.class)
static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerLiquibaseDependsOnBeanFactoryPostProcessor() {
return new SchedulerDependsOnBeanFactoryPostProcessor(SpringLiquibase.class);
}
}
}
}
在QuartzAutoConfiguration 還配置了SchedulerFactoryBeanCustomizer, 為schedulerFactoryBean配置datasource
@Configuration(proxyBeanMethods = false)
@ConditionalOnSingleCandidate(DataSource.class)
@ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc")
protected static class JdbcStoreTypeConfiguration {
@Bean
@Order(0)
public SchedulerFactoryBeanCustomizer dataSourceCustomizer(QuartzProperties properties, DataSource dataSource,
@QuartzDataSource ObjectProvider<DataSource> quartzDataSource,
ObjectProvider<PlatformTransactionManager> transactionManager,
@QuartzTransactionManager ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
return (schedulerFactoryBean) -> {
DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
schedulerFactoryBean.setDataSource(dataSourceToUse);
PlatformTransactionManager txManager = getTransactionManager(transactionManager,
quartzTransactionManager);
if (txManager != null) {
schedulerFactoryBean.setTransactionManager(txManager);
}
};
}
private DataSource getDataSource(DataSource dataSource, ObjectProvider<DataSource> quartzDataSource) {
DataSource dataSourceIfAvailable = quartzDataSource.getIfAvailable();
return (dataSourceIfAvailable != null) ? dataSourceIfAvailable : dataSource;
}
private PlatformTransactionManager getTransactionManager(
ObjectProvider<PlatformTransactionManager> transactionManager,
ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
PlatformTransactionManager transactionManagerIfAvailable = quartzTransactionManager.getIfAvailable();
return (transactionManagerIfAvailable != null) ? transactionManagerIfAvailable
: transactionManager.getIfUnique();
}
@Bean
@ConditionalOnMissingBean
public QuartzDataSourceInitializer quartzDataSourceInitializer(DataSource dataSource,
@QuartzDataSource ObjectProvider<DataSource> quartzDataSource, ResourceLoader resourceLoader,
QuartzProperties properties) {
DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
return new QuartzDataSourceInitializer(dataSourceToUse, resourceLoader, properties);
}
}
在customizers.orderedStream().forEach((customizer) -> customizer.customize(schedulerFactoryBean));
進(jìn)行配置葵孤。 Datasource 來(lái)自于DataSourceAutoConfiguration担钮,我們需要指定一個(gè)數(shù)據(jù)源,在application.yaml里
spring:
datasource:
url: jdbc:mysql://127.0.0.1:3306/MYSLQ_local?serverTimezone=GMT-7&useLegacyDatetimeCode=false
username: MYSLQ_local
password: MYSLQ_local_local
jpa:
database-platform: org.hibernate.dialect.MySQL8Dialect
在看@ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc") 尤仍,也就是說(shuō)當(dāng)spring.quartz.job-store-type = jdbc 的時(shí)候這個(gè)配置生效箫津。 因此我們?cè)赼pplication.yaml里要配置
spring:
quartz:
properties:
org:
quartz:
jobStore:
tablePrefix: cpm_qrtz_
isClustered: true
class: org.quartz.impl.jdbcjobstore.JobStoreTX
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
useProperties: false
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadCount: 50
threadsInheritContextClassLoaderOfInitializingThread: true
scheduler:
instanceId: AUTO
job-store-type: jdbc
那么有人問(wèn)了,除了job-store-type,上面其他的配置沒(méi)看到什么時(shí)候放進(jìn)去的苏遥。怎么就可以用了呢, 首先看@EnableConfigurationProperties(QuartzProperties.class) 饼拍,它讀取了spring.quartz, 把他放入properties 對(duì)象理。
@ConfigurationProperties("spring.quartz")
public class QuartzProperties {
private final Map<String, String> properties = new HashMap<>();
public Map<String, String> getProperties() {
return this.properties;
}
}
同時(shí)這個(gè)properties 在QuartzAutoConfiguration.quartzScheduler 方法里被放入schedulerFactoryBean里
if (!properties.getProperties().isEmpty()) {
schedulerFactoryBean.setQuartzProperties(asProperties(properties.getProperties()));
}
Create Scheduler
當(dāng)代碼里有這種@Autowired注入Scheduler的時(shí)候田炭,如下
@Autowired
Scheduler scheduler
BeanFactory會(huì)根據(jù)name為Scheduler來(lái)獲取Scheduler,最終會(huì)通過(guò)SchedulerFactoryBean.getObject()
來(lái)得到Scheduler师抄。
通過(guò)分析得出FactoryBean.getObject()里得到的是this.scheduler.
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
private Scheduler scheduler;
public Scheduler getObject() {
return this.scheduler;
}
}
但這個(gè)Scheduler 在哪里被構(gòu)造處理呢?SchedulerFactoryBean implements InitializingBean,在InitializingBean里有afterPropertiesSet()
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
public void afterPropertiesSet() throws Exception {
...
// Initialize the Scheduler instance...
//這里初始化了Scheduler
this.scheduler = prepareScheduler(prepareSchedulerFactory());
try {
registerListeners();
registerJobsAndTriggers();
}
catch (Exception ex) {
...
}
}
}
bean 生命周期簡(jiǎn)單理解如下教硫,注意InitializingBean#afterPropertiesSet()所處的位置
->construcor
->initialization(各種autowired)
->BeanPostProcessor#postProcessBeforeInitialization
->@postConsruct 或 InitializingBean#afterPropertiesSet() 或 @Bean(initMethod="xxx")
->BeanPostProcessor#postProcessAfterInitialization
->@PreDestroy
看prepareSchedulerFactory()方法叨吮,最后返回了SchedulerFactory,賦值給了SchedulerFactoryBean的scheduler
public class SchedulerFactoryBean{
private Class<? extends SchedulerFactory> schedulerFactoryClass =
StdSchedulerFactory.class;
private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
private SchedulerFactory schedulerFactory;
//這里這個(gè)SchedulerFactory肯定為空瞬矩,當(dāng)然有辦法可以讓它不為空茶鉴,通過(guò)定義SchedulerFactoryBeanCustomizer來(lái)實(shí)現(xiàn)
SchedulerFactory schedulerFactory = this.schedulerFactory;
if (schedulerFactory == null) {
// Create local SchedulerFactory instance (typically a StdSchedulerFactory)
//這里也寫(xiě)了這里是實(shí)例化出StdSchedulerFactory
schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
if (schedulerFactory instanceof StdSchedulerFactory) {
//調(diào)用initSchedulerFactory來(lái)填充StdSchedulerFactory)
//看過(guò)Quartz的官方demo,就知道StdSchedulerFactory用來(lái)生產(chǎn)出sheduler
initSchedulerFactory((StdSchedulerFactory) schedulerFactory);
}
else if (this.configLocation != null || this.quartzProperties != null ||
this.taskExecutor != null || this.dataSource != null) {
throw new IllegalArgumentException(
"StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory);
}
// Otherwise, no local settings to be applied via StdSchedulerFactory.initialize(Properties)
}
// Otherwise, assume that externally provided factory has been initialized with appropriate settings
return schedulerFactory;
}
}
initSchedulerFactory主要是將配置信息配置到schedulerFactory里
private void initSchedulerFactory(StdSchedulerFactory schedulerFactory) throws SchedulerException, IOException {
Properties mergedProps = new Properties();
...
CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
...
//這里很重要景用,
//可以直接通過(guò) application.properties里的配置來(lái)配置quartz.properties里的配置spring.quartz.properties.xxx
schedulerFactory.initialize(mergedProps);
}
CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
把從application.yaml讀到的配置 覆蓋quartz 缺省的配置涵叮。從而是application.yaml 的配置生效。
prepareScheduler(prepareSchedulerFactory())
在一些列的調(diào)用后會(huì)到StdSchedulerFactory的private Scheduler instantiate()
伞插。這是個(gè)很長(zhǎng)的方法
围肥,但邏輯還算簡(jiǎn)單,各種初始化,下面只列出jobStore的配置
private Scheduler instantiate() throws SchedulerException {
//cfg 就是上個(gè)代碼片段里的mergedProps
if (cfg == null) {
initialize();
}
//jobstore,如果不配蜂怎,默認(rèn)是RAMJobStore
//在application.properties里可以配置為
//spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
RAMJobStore.class.getName());
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' could not be instantiated.", e);
throw initException;
}
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
//這里可以獲取到spring.quartz.properties.org.quartz.jobStore.xxx
//xxx是jobStore這個(gè)類(lèi)的里屬性,比如isClustered置尔,clusterCheckinInterval等
//在applicaiton.properties配置成spring.quartz.properties.org.quartz.jobStore.isClustered = true
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
setBeanProps(js, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' props could not be configured.", e);
throw initException;
}
}
因此我們要配置集群就可以在application.properties配置成如下:
spring.quartz.properties.org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
spring.quartz.properties.org.quartz.jobStore.isClustered = true
spring.quartz.properties.org.quartz.jobStore.clusterCheckinInterval = 10000
spring.quartz.properties.org.quartz.scheduler.instanceId = AUTO
再來(lái)看prepareScheduler 方法
private Scheduler prepareScheduler(SchedulerFactory schedulerFactory) throws SchedulerException {
if (this.resourceLoader != null) {
// Make given ResourceLoader available for SchedulerFactory configuration.
configTimeResourceLoaderHolder.set(this.resourceLoader);
}
if (this.taskExecutor != null) {
// Make given TaskExecutor available for SchedulerFactory configuration.
configTimeTaskExecutorHolder.set(this.taskExecutor);
}
if (this.dataSource != null) {
// Make given DataSource available for SchedulerFactory configuration.
configTimeDataSourceHolder.set(this.dataSource);
}
if (this.nonTransactionalDataSource != null) {
// Make given non-transactional DataSource available for SchedulerFactory configuration.
configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
}
// Get Scheduler instance from SchedulerFactory.
try {
Scheduler scheduler = createScheduler(schedulerFactory, this.schedulerName);
populateSchedulerContext(scheduler);
if (!this.jobFactorySet && !(scheduler instanceof RemoteScheduler)) {
// Use AdaptableJobFactory as default for a local Scheduler, unless when
// explicitly given a null value through the "jobFactory" bean property.
this.jobFactory = new AdaptableJobFactory();
}
if (this.jobFactory != null) {
if (this.applicationContext != null && this.jobFactory instanceof ApplicationContextAware) {
((ApplicationContextAware) this.jobFactory).setApplicationContext(this.applicationContext);
}
if (this.jobFactory instanceof SchedulerContextAware) {
((SchedulerContextAware) this.jobFactory).setSchedulerContext(scheduler.getContext());
}
scheduler.setJobFactory(this.jobFactory);
}
return scheduler;
}
finally {
if (this.resourceLoader != null) {
configTimeResourceLoaderHolder.remove();
}
if (this.taskExecutor != null) {
configTimeTaskExecutorHolder.remove();
}
if (this.dataSource != null) {
configTimeDataSourceHolder.remove();
}
if (this.nonTransactionalDataSource != null) {
configTimeNonTransactionalDataSourceHolder.remove();
}
}
}
這里面最重要的就是createScheduler(schedulerFactory, this.schedulerName);
方法杠步。 從上文可以看出,這個(gè)shcedulerFactory的實(shí)例是StdSchedulerFactory榜轿。
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, @Nullable String schedulerName)
throws SchedulerException {
// Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
Thread currentThread = Thread.currentThread();
ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
boolean overrideClassLoader = (this.resourceLoader != null &&
this.resourceLoader.getClassLoader() != threadContextClassLoader);
if (overrideClassLoader) {
currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
}
try {
SchedulerRepository repository = SchedulerRepository.getInstance();
synchronized (repository) {
Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
Scheduler newScheduler = schedulerFactory.getScheduler();
if (newScheduler == existingScheduler) {
throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
}
if (!this.exposeSchedulerInRepository) {
// Need to remove it in this case, since Quartz shares the Scheduler instance by default!
SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
}
return newScheduler;
}
}
finally {
if (overrideClassLoader) {
// Reset original thread context ClassLoader.
currentThread.setContextClassLoader(threadContextClassLoader);
}
}
}
去看一下StdSchedulerFactory::getScheduler 方法
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
在看instantiate 方法,設(shè)置了啟動(dòng)的各種參數(shù)幽歼,比如threadpool jstore 的class 等等,這個(gè)方法超長(zhǎng)谬盐,就不仔細(xì)描述了甸私,但是最關(guān)鍵的是
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
在 QuartzScheduler 構(gòu)造函數(shù)里, 啟動(dòng)了一個(gè)任務(wù)分配線程QuartzSchedulerThread
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
QuartzSchedulerThread ::run 方法很長(zhǎng)飞傀, 但是我還是想把他全部列出來(lái)皇型。
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
try {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
} catch (Exception ignore) {
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
} catch (RuntimeException e) {
if (acquiresFailed == 0) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
找到可以運(yùn)行的 job trigger
這個(gè)run 方法里,做幾點(diǎn)事情砸烦, 第一是找到可以運(yùn)行的 job trigger
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
為啥quartz 一個(gè)job 只可以在一個(gè)機(jī)器上運(yùn)行啊 qsRsrcs.getJobStore().acquireNextTriggers主要就是這個(gè)方法
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) {
if (fireInstanceIds.contains(tr.getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}
在這鎖的包裝下弃鸦,進(jìn)行了真正的acquireNextTrigger
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
來(lái)看真正的數(shù)據(jù)庫(kù)邏輯
// FUTURE_TODO: this really ought to return something like a FiredTriggerBundle,
// so that the fireInstanceId doesn't have to be on the trigger...
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException {
if (timeWindow < 0) {
throw new IllegalArgumentException();
}
List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
return acquiredTriggers;
long batchEnd = noLaterThan;
for(TriggerKey triggerKey: keys) {
// If our trigger is no longer available, try a new one.
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
if(nextTrigger == null) {
continue; // next trigger
}
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = nextTrigger.getJobKey();
JobDetail job;
try {
job = retrieveJob(conn, jobKey);
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
} catch (SQLException sqle) {
getLog().error("Unable to set trigger state to ERROR.", sqle);
}
continue;
}
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
continue; // next trigger
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
Date nextFireTime = nextTrigger.getNextFireTime();
// A trigger should not return NULL on nextFireTime when fetched from DB.
// But for whatever reason if we do have this (BAD trigger implementation or
// data?), we then should log a warning and continue to next trigger.
// User would need to manually fix these triggers from DB as they will not
// able to be clean up by Quartz since we are not returning it to be processed.
if (nextFireTime == null) {
log.warn("Trigger {} returned null on nextFireTime and yet still exists in DB!",
nextTrigger.getKey());
continue;
}
if (nextFireTime.getTime() > batchEnd) {
break;
}
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
if(acquiredTriggers.isEmpty()) {
batchEnd = Math.max(nextFireTime.getTime(), System.currentTimeMillis()) + timeWindow;
}
acquiredTriggers.add(nextTrigger);
}
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
// We are done with the while loop.
break;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
// Return the acquired trigger list
return acquiredTriggers;
}
主要看兩個(gè)方法
選擇一個(gè)trigger
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
標(biāo)注trigger 被占用,不用被其他的instance選擇了
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
運(yùn)行job
回到前文QuartzSchedulerThread的run 方法
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
主要是看下面這段幢痘, 去真正運(yùn)行一個(gè)job
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
咱們來(lái)看 shell.initialize(qs);
public void initialize(QuartzScheduler sched)
throws SchedulerException {
this.qs = sched;
Job job = null;
JobDetail jobDetail = firedTriggerBundle.getJobDetail();
try {
job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
} catch (SchedulerException se) {
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
} catch (Throwable ncdfe) { // such as NoClassDefFoundError
SchedulerException se = new SchedulerException(
"Problem instantiating class '"
+ jobDetail.getJobClass().getName() + "' - ", ncdfe);
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
}
this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}
此時(shí)sched.getJobFactory()來(lái)生成一個(gè)job 唬格,這個(gè)jobFactory 是哪里set的? 在回到最開(kāi)始的源頭QuartzAutoConfiguration::quartzScheduler
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
schedulerFactoryBean.setJobFactory(jobFactory);
可以看到j(luò)obFactory 是SpringBeanJobFactory::createJobInstance
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object job = (this.applicationContext != null ?
this.applicationContext.getAutowireCapableBeanFactory().createBean(
bundle.getJobDetail().getJobClass(), AutowireCapableBeanFactory.AUTOWIRE_CONSTRUCTOR, false) :
super.createJobInstance(bundle));
if (isEligibleForPropertyPopulation(job)) {
BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job);
MutablePropertyValues pvs = new MutablePropertyValues();
if (this.schedulerContext != null) {
pvs.addPropertyValues(this.schedulerContext);
}
pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap());
pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
if (this.ignoredUnknownProperties != null) {
for (String propName : this.ignoredUnknownProperties) {
if (pvs.contains(propName) && !bw.isWritableProperty(propName)) {
pvs.removePropertyValue(propName);
}
}
bw.setPropertyValues(pvs);
}
else {
bw.setPropertyValues(pvs, true);
}
}
return job;
}
可以看到它在第一行就把傳入的TriggerFiredBundle 的job class 變成了一個(gè)spring bean,到這里才恍然大悟购岗。 這也就是為什么在本文開(kāi)頭一個(gè)QuartzJobBean 沒(méi)加任何annotation汰聋,為什么還可以autowired對(duì)象。
當(dāng)qsRsrcs.getThreadPool().runInThread(shell)
時(shí)喊积, 就是真正的運(yùn)行job 了烹困。 后續(xù)還有一些邏輯, 比如release trigger 之類(lèi)的就不仔細(xì)描述了注服。 本文只是走一些quatrz 主要是如何spring boot 集成的韭邓。