Quartz的整體概括
什么是quartz
何為quartz,請看官網(wǎng)的說法:
Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.
簡單來說,quartz
是一個開源任務調度庫,可以用來創(chuàng)建簡單或復雜的調度,低至十個多至數(shù)百萬個奈辰。它是一個標準的java
組件踊餐,支持JTA锅劝,集群等多種企業(yè)級功能。
市面上有很多定時任務框架在quartz的基礎上做了二次開發(fā)疫粥,xxl-job
(基于quartz)茬斧,elastic-job
(基于quartz和zk),所以quartz
到底是怎么玩的梗逮,它有哪些特性项秉,下面來聊一聊。
quartz的基本概念
- 任務(Job):實際要觸發(fā)的事件
- 觸發(fā)器(Trigger):用于設定時間規(guī)則
- 調度器(Scheduler):組合任務與觸發(fā)器
quartz
就這三樣東西慷彤,我們新建作業(yè)娄蔼,通過trigger
設置規(guī)則觸發(fā),由scheduler
進行整合底哗,非常簡單岁诉。
Springboot整合quartz的基礎搭建
一般企業(yè)級項目開發(fā)都用的Springboot
,下面就來講一講quartz
整合Springboot
的一些要點跋选。
依賴
quartz版本2.3.0涕癣,springboot版本1.5.18.RELEASE
<properties>
<java.version>1.8</java.version>
<druid.version>1.1.5</druid.version>
<quartz.version>2.3.0</quartz.version>
<fastjson.version>1.2.40</fastjson.version>
<mybatis.version>1.3.0</mybatis.version>
<log4j.version>1.2.16</log4j.version>
<slf4j-api.version>1.7.7</slf4j-api.version>
<slf4j-log4j12.version>1.7.7</slf4j-log4j12.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<!--quartz相關依賴-->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>${quartz.version}</version>
</dependency>
<!--定時任務需要依賴context模塊-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<!-- log4j日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Configuration
通過AutowireCapableBeanFactory
,使用spring
注入的方式實現(xiàn)在job
里注入spring
的bean
前标。
/**
* 繼承org.springframework.scheduling.quartz.SpringBeanJobFactory
* 實現(xiàn)任務實例化方式
*/
public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
/**
* 將job實例交給spring ioc托管
* 我們在job實例實現(xiàn)類內可以直接使用spring注入的調用被spring ioc管理的實例
*
* @param bundle
* @return
* @throws Exception
*/
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
/**
* 將job實例交付給spring ioc
*/
beanFactory.autowireBean(job);
return job;
}
}
/**
* 配置任務工廠實例
*
* @param applicationContext spring上下文實例
* @return
*/
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
/**
* 采用自定義任務工廠 整合spring實例來完成構建任務
* see {@link AutowiringSpringBeanJobFactory}
*/
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
/**
* 配置任務調度器
* 使用項目數(shù)據(jù)源作為quartz數(shù)據(jù)源
*
* @param jobFactory 自定義配置任務工廠
* @param dataSource 數(shù)據(jù)源實例
* @return
* @throws Exception
*/
@Bean(destroyMethod = "destroy", autowire = Autowire.NO)
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws Exception {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
//將spring管理job自定義工廠交由調度器維護
schedulerFactoryBean.setJobFactory(jobFactory);
//設置覆蓋已存在的任務
schedulerFactoryBean.setOverwriteExistingJobs(true);
//項目啟動完成后坠韩,等待2秒后開始執(zhí)行調度器初始化
schedulerFactoryBean.setStartupDelay(2);
//設置調度器自動運行
schedulerFactoryBean.setAutoStartup(true);
//設置數(shù)據(jù)源,使用與項目統(tǒng)一數(shù)據(jù)源
schedulerFactoryBean.setDataSource(dataSource);
//設置上下文spring bean name
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
//設置配置文件位置
schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
return schedulerFactoryBean;
}
這里需要提到一點候生,由于job
的初始化時是通過new出來的,不受spring
的管理绽昼,無法接受業(yè)務相關的bean
唯鸭,故這里使用AutowireCapableBeanFactory
實現(xiàn)了new
出來的對象通過注解可注入受spring
管理的bean
了。
AbstractAutowireCapableBeanFactory#autowireBean
@Override
public void autowireBean(Object existingBean) {
// Use non-singleton bean definition, to avoid registering bean as dependent bean.
RootBeanDefinition bd = new RootBeanDefinition(ClassUtils.getUserClass(existingBean));
bd.setScope(BeanDefinition.SCOPE_PROTOTYPE);
bd.allowCaching = ClassUtils.isCacheSafe(bd.getBeanClass(), getBeanClassLoader());
BeanWrapper bw = new BeanWrapperImpl(existingBean);
initBeanWrapper(bw);
populateBean(bd.getBeanClass().getName(), bd, bw);
}
由源碼可知硅确,此類調用了populateBean的方法用來裝配bean目溉。具體spring的bean的加載注冊過程可參考spring.io。
通過schedulerFactoryBean
的ConfigLocation
來讀取quartz
的基本配置信息菱农,注意quartz.properties
配置文件一定要放在classpath下缭付。
#調度器實例名稱
org.quartz.scheduler.instanceName = quartzScheduler
#調度器實例編號自動生成
org.quartz.scheduler.instanceId = AUTO
#持久化方式配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#持久化方式配置數(shù)據(jù)驅動,MySQL數(shù)據(jù)庫
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#quartz相關數(shù)據(jù)表前綴名
org.quartz.jobStore.tablePrefix = QRTZ_
#開啟分布式部署
org.quartz.jobStore.isClustered = true
#配置是否使用
org.quartz.jobStore.useProperties = false
#分布式節(jié)點有效性檢查時間間隔循未,單位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 10000
#線程池實現(xiàn)類
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#執(zhí)行最大并發(fā)線程數(shù)量
org.quartz.threadPool.threadCount = 10
#線程優(yōu)先級
org.quartz.threadPool.threadPriority = 5
#配置為守護線程陷猫,設置后任務將不會執(zhí)行
#org.quartz.threadPool.makeThreadsDaemons=true
#配置是否啟動自動加載數(shù)據(jù)庫內的定時任務,默認true
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
我們看到org.quartz.jobStore.class
進行持久化配置設置成了JobStoreTX
屬性,需要建立數(shù)據(jù)庫表進行任務信息的持久化绣檬。其實官方還有一種RAMJobStore
用于存儲內存中的調度信息足陨,當進程終止時,所有調度信息都將丟失娇未。本文使用JobStoreTX
(需要建立quartz的大概10張表墨缘,建表語句傳在了github)。
Job&JobDetail
JobDetail
作為Job
的實例零抬,一般由靜態(tài)方法JobBuilder
創(chuàng)建镊讼,通過fluent
風格鏈式構建了Job的各項屬性,
其中newJob
需要一個泛型上限為Job
的入?yún)ⅰ?/p>
// 構建job信息
JobDetail job = JobBuilder.newJob(DynamicQuartzJob.class)
.withIdentity(jobKey) //jobName+jobGroup
.withDescription(quartzJobDetails.getDescription())
.usingJobData("jobData", quartzJobDetails.getJobData())
.build();
而Job接口只有一個簡單的方法:
public interface Job {
void execute(JobExecutionContext context)
throws JobExecutionException;
}
當定時任務跑起來的時候,execute
里的代碼將會被執(zhí)行平夜。
比如我們創(chuàng)建一個簡單的定時任務:
public class QuartzTest extends QuartzJobBean
static Logger logger = LoggerFactory.getLogger(QuartzTest.class);
{
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("我是測試任務蝶棋,我跑起來了,時間:{}",new Date());
}
注:QuartzJobBean是Job接口的實現(xiàn)類褥芒。
Trigger
JobDetail
由JobBuilder
類的靜態(tài)方法構建,同樣,Trigger
觸發(fā)器由TriggerBuilder
的靜態(tài)方法構建嚼松。
// 構建job的觸發(fā)規(guī)則 cronExpression
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startNow()
.withSchedule(CronScheduleBuilder
.cronSchedule(quartzJobDetails.getCronExpression()))
.build();
Trigger
觸發(fā)器用于觸發(fā)任務作業(yè),當trigger
觸發(fā)器觸發(fā)執(zhí)行時锰扶,scheduler
調度程序中的其中一個線程將調用execute()
的一個線程献酗。quartz
最常用的觸發(fā)器分為SimpleTrigger
和CronTrigger
觸發(fā)器兩種。
SimpleTrigger
用于在給定時間執(zhí)行一次作業(yè)坷牛,或給定時間每隔一段時間執(zhí)行一次作業(yè)罕偎。這個功能Springboot
的@scheduled
注解也能實現(xiàn)。
如果是希望以日歷時間表觸發(fā)京闰,則CronTrigger
就比較合適例如每周六下午3點執(zhí)行颜及,我們完全可以用cron表達式實現(xiàn)日歷觸發(fā)的時間規(guī)則,cron表達式可由quartzJobDetails
對象的CronExpression
屬性傳入蹂楣。
最后俏站,別忘了用scheduler
將job
和trigger
整合起來,因為他們是統(tǒng)一協(xié)作的:
// 注冊job和trigger信息
scheduler.scheduleJob(job, trigger);
JobDataMap
一般業(yè)務方法會要求動態(tài)傳參處理痊土,這時候就需要jobDataMap
來進行參數(shù)傳遞了肄扎。我們在構建JobDetail
的時候,通過
usingJobData("jobData", quartzJobDetails.getJobData())
動態(tài)傳入調度任務所需的參數(shù)赁酝,以達到業(yè)務需求犯祠。
JobListener&TriggerListener
用于在任務調度期間,各階段的狀態(tài)解讀酌呆。這里我就以JobListener
為例衡载,TriggerListener
也是相似的。
首先隙袁,構建jobListener
jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));
這里我是在executeInternal
方法里面構建的痰娱,因為listner不會持久化弃榨,服務重啟將會丟失監(jiān)聽。當然在構建job的時候也可以注冊listener,如果沒持久化監(jiān)聽的需求的話猜揪。
看一下MyJobListener:
public class MyJobListener implements JobListener {
public static final String LISTENER_NAME = "MyJobListener";
// @Autowired
// private JobScheduleLogMapper logMapper;
@Override
public String getName() {
return LISTENER_NAME; //must return a name
}
//任務被調度前
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
// System.out.println("jobToBeExecuted");
System.out.println("Job調度前 : " + jobName + " is going to start...");
}
//任務調度被拒了
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
System.out.println("Job調度被拒:jobExecutionVetoed");
//todo:原因捕獲
}
//任務被調度后
@Override
public void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException) {
// System.out.println("Job調度器:jobWasExecuted");
String jobName = context.getJobDetail().getKey().toString();
System.out.println("Job調度后 : " + jobName + " is finished...");
if (jobException!=null&&!jobException.getMessage().equals("")) {
System.out.println("Exception thrown by: " + jobName
+ " Exception: " + jobException.getMessage());
}
JobScheduleLog log = new JobScheduleLog();
log.setJobRuntime(String.valueOf(context.getJobRunTime()));
log.setId(Optional.ofNullable(context.get("id")).map(p->Integer.parseInt(String.valueOf(context.get("id")))).orElse(null));
JobScheduleLogMapper logMapper = SpringContextHolder.getBean(JobScheduleLogMapper.class);
logMapper.updateByPrimaryKeySelective(log);
}
}
任務調度前惭墓,調度后已經(jīng)任務被拒,我們都可以使用鉤子而姐。
動態(tài)構建任務調度
下一個問題腊凶,我們知道新建一個調度job只要繼承QuartzJobBean
類并實現(xiàn)executeInternal
就行,那么如果我有成百上千個任務拴念,難道我要新建幾千個類么钧萍?如果我想把已有的方法加入定時任務調度,難道我還要去改造原有的方法么政鼠?
必然不是的风瘦,這時候我們可以新建一個動態(tài)類繼承QuartzJobBean
,并新建自己的業(yè)務表(例如建一個jobCaller
表),傳入項目方法的全類路徑公般,這樣我們就可以executeInternal方法里通過讀表拉取需要調度的任務方法万搔,通過jobDataMap
拿到參數(shù),通過反射直接invoke
目標方法了官帘,這樣就省去了大量的構建調度任務的工作了瞬雹,并且可以在不動原有業(yè)務代碼的基礎上,定向指定任何一個方法加入任務調度了刽虹。
ok,talk is cheap, show me the code:
public class DynamicQuartzJob extends QuartzJobBean {
@Autowired
private JobScheduleLogManager jobManager;
@Override
protected void executeInternal(JobExecutionContext jobContext) {
try {
int i = jobManager.trans2JobLogBefore(jobContext);
if (i <= 0) return;
JobDetailImpl jobDetail = (JobDetailImpl) jobContext.getJobDetail();
String name = jobDetail.getName();
if (StringUtils.isEmpty(name)) {
throw new JobExecutionException("can not find service info, because desription is empty");
}
//注冊job和trigger的監(jiān)聽器
JobKey jobKey = jobContext.getJobDetail().getKey();
TriggerKey triggerKey = jobContext.getTrigger().getKey();
jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));
jobContext.getScheduler().getListenerManager().addTriggerListener(new MyTriggerListener(), KeyMatcher.keyEquals(triggerKey));
String[] serviceInfo = StringUtils.delimitedListToStringArray(name, ".");
// serviceInfo[0] is JOB_NAME_PREFIX
String beanName = serviceInfo[1];
String methodName = serviceInfo[2];
Object serviceImpl = getApplicationContext(jobContext).getBean(beanName);
Method method;
Class<?>[] parameterTypes = new Class[]{String.class};
Object[] arguments = null;
method = serviceImpl.getClass().getMethod(methodName, parameterTypes);
method.invoke(serviceImpl, jobContext.getJobDetail().getJobDataMap().getString("jobData"));
jobManager.trans2JobLogAfter(jobContext, i);
} catch (Exception ex) {
ErrorLog.errorConvertJson(ApplicationContextWare.getAppName(), LogTreadLocal.getTrackingNo(), this.getClass(), "quartz定時任務execute異常", ex);
}
}
這里方法簽名參數(shù)我設定了一個String類型的形參酗捌,其實可以在添加任務到jobCaller
表的時候帶上參數(shù),executeInternal
的時候讀表拉取方法簽名涌哲。當然也可以傳一個大json胖缤,目標方法自己解析。
最佳實踐
這里我新建了一張job_caller
表阀圾,用于記錄我的jobName
(類名.方法名)哪廓,jobGroup
(沒有就默認),jobData
(jobDatamap)初烘,以及cron
表達式涡真。
可以看到我們傳入的時間規(guī)則是每隔10秒執(zhí)行一次,調度的是HelloService
的sayHello()
方法账月,傳入的參數(shù)是xgj111111.
看一下HelloService的sayHello()方法做了什么:
@Component
public class HelloService {
public void sayHello(String a) {
System.out.println(a+"======hello world, i am quartz");
}
public void callHello(String b) {
System.out.println(b+"======call");
}
}
ok,只是簡單的打印综膀,來看看效果:
每隔10秒(時間打印忘加了~)澳迫,sayHello
都將會被執(zhí)行局齿,并且監(jiān)聽器能捕獲到各個階段。
單節(jié)點服務重啟調度恢復
由于任務是持久化在表里的橄登,在服務重啟后抓歼,quartz
仍然可以去恢復調度任務讥此,并且能夠預先執(zhí)行misfire
的任務,這里就不演示了谣妻,很簡單的萄喳。
多節(jié)點分布式調度漂移
這個就比較有意思了,在多個節(jié)點調度確定的任務時蹋半,分布式環(huán)境下他巨,某個節(jié)點宕機,這個節(jié)點調度的作業(yè)能否自動漂移到其他節(jié)點减江?
在quartz.properties
里染突,org.quartz.jobStore.isClustered
開啟了分布式的配置,此屬性設置為true辈灼,quartz
將使用ClusterManager
來初始化節(jié)點份企。
基于上一個調度HelloService#sayHello
,我們再新增一個調度用于調用HelloService#callHello
,同時新增一個quartz
節(jié)點巡莹。(為何第二個節(jié)點能調度callHello
司志?==>基于quartz
的負載均衡),如圖:
啟動兩個服務降宅,分別監(jiān)聽在8123
和8124
端口:
8123
調度的是callHello
8124
調度的是sayHello
這時候骂远,我們把8123
服務停掉,看看8124
的調度情況:
停止8123
服務:
這時候可以發(fā)現(xiàn)钉鸯,8124
將8123
的任務接管過來了:
于是可以得出結論:在分布式場景下吧史,當quartz集群的某一臺服務宕機,其所調度的任務將被其他服務接管唠雕,所以quartz
是支持任務漂移的贸营。
那么如果這時候,我再講8123
起來會是什么情況呢岩睁?聰明的我和你應該都想到了钞脂,它由繼續(xù)接管callHello
的任務調度了。
quartz的缺陷
- 強依賴于各節(jié)點的系統(tǒng)時間捕儒,多節(jié)點系統(tǒng)時間不一致將會出現(xiàn)調度紊亂的情況
- 容易造成數(shù)據(jù)庫死鎖(一個任務只能由一個線程來調度冰啃,這是由
quartz_lock
表的行鎖來實現(xiàn)的,可以通過設置數(shù)據(jù)庫事務級別來解決,不過也有說設置了也出現(xiàn)deadlock的)
以上是我的一些基本見解和嘗試刘莹。
代碼已上傳至GitHub:https://github.com/xugejunllt/quartz-framework