在此梳理一下項(xiàng)目中用到的關(guān)于quartz的知識(shí):
1韭寸、Spring提供的類(lèi)
(1)SchedulerFactoryBean
(2)Job相關(guān)的類(lèi):Job執(zhí)行任務(wù)的邏輯需要自己寫(xiě),既然用了spring荆隘,自然要使用spring提供的Job相關(guān)的類(lèi)。有兩個(gè):MethodInvokingJobDetailFactoryBean和QuartzJobBean晶渠。其中MethodInvokingJobDetailFactoryBean不支持存儲(chǔ)到數(shù)據(jù)庫(kù)凰荚,會(huì)報(bào)java.io.NotSerializableException,遂放棄褒脯。
2、并發(fā)控制
官方文檔提供了一種并發(fā)控制方法:@DisallowConcurrentExecution
該限制僅針對(duì)于JobDetail番川,同一時(shí)刻僅允許執(zhí)行一個(gè)JobDetail颁督,但可以并發(fā)執(zhí)行多個(gè)Job類(lèi)的不同實(shí)例。也就是如果用Job構(gòu)建了多個(gè)JobDetail屿讽,如JobDetail1吠裆,JobDetail2,JobDetail3试疙,那么這3個(gè)JobDetail還是并發(fā)執(zhí)行的。
根據(jù)org.quartz.threadPool.threadCount配置的線程個(gè)數(shù) 和 org.quartz.threadPool.class配置的線程類(lèi)執(zhí)行自己寫(xiě)的邏輯。
3柱徙、數(shù)據(jù)持久化
quartz提供兩種持久化類(lèi)型:RAMJobStore和JDBC JobStore
RAMJobStore持久化到內(nèi)存,重啟應(yīng)用后任務(wù)丟失护侮。
JDBC JobStore可以持久化到數(shù)據(jù)庫(kù)羊初,重啟后任務(wù)依然存在。
下載官網(wǎng)提供的quartz-2.2.3-distribution.tar.gz包长赞,quartz\quartz-2.2.3\docs\dbTables提供了各種數(shù)據(jù)庫(kù)的腳本,建表得哆,quartz.properties文件中配置jobStore類(lèi)型贩据,代理類(lèi)和數(shù)據(jù)源闸餐。同時(shí)在配置文件中指定quartz.properties文件的位置舍沙。
4剔宪、動(dòng)態(tài)管理任務(wù)
(1)增加:
scheduler.scheduleJob(jobDetail, trigger);
(2)刪除:
scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
5、執(zhí)行的狀態(tài)
執(zhí)行狀態(tài)存放在qrtz_triggers表的trigger_state字段和媳,源碼中完整的狀態(tài)有:WAITING,ACQUIRED妖谴,EXECUTING,COMPLETE,BLOCKED硬梁,ERROR胞得,PAUSED,PAUSED_BLOCKED跃巡。配置文件中配置的JobStore是JobStoreTX牧愁,但是狀態(tài)變化的相關(guān)代碼都在JobStoreSupport類(lèi)中猪半,JobStoreSupport調(diào)用配置的Delegate拼接sql語(yǔ)句,完成狀態(tài)變化沽甥。
從源碼中可以看出乏奥,acquired狀態(tài)表示已經(jīng)獲得的,在job自定義邏輯之前執(zhí)行盏檐。
其他網(wǎng)友整理的狀態(tài)變化圖:
6胡野、自定義Job類(lèi)中使用spring管理的service
Job繼承spring提供的類(lèi)QuartzJobBean,竟然不能直接注入自己寫(xiě)的service龙巨。原因是Quartz初始化是自己的JobContext熊响,不同于Spring的ApplicationContext汗茄,所以無(wú)法直接注入。后來(lái)找到一種解決辦法递览,在構(gòu)建SchedulerFactoryBean的時(shí)候存放到map中瞳腌。Job中使用時(shí)再取出來(lái)嫂侍。
@Bean(name = "schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(quartzProperties());
// 把用到的job類(lèi)中用到的service,dao等傳給他纯命,用@Autowired注解無(wú)法注入
Map<String, Object> springBeanMap = new HashMap<String, Object>();
springBeanMap.put("testngService", testngService);
springBeanMap.put("quartzService", quartzService);
springBeanMap.put("triggerDao", triggerDao);
factory.setSchedulerContextAsMap(springBeanMap);
factory.setWaitForJobsToCompleteOnShutdown(true);
return factory;
}
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
try {
testngService = (TestngService)context.getScheduler().getContext().get("testngService");
} catch (SchedulerException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
}
7、監(jiān)聽(tīng)器
quartz提供了TriggerListeners瞭空、JobListeners和SchedulerListeners咆畏,使用方法在quartz\quartz-2.2.3\examples中有,很詳細(xì)溺健。
注意:經(jīng)過(guò)測(cè)試钮蛛,監(jiān)聽(tīng)器在運(yùn)行過(guò)程中動(dòng)態(tài)注冊(cè),第一次注冊(cè)可用岭辣,重啟后失效沦童。
8、總結(jié):這次學(xué)習(xí)從0開(kāi)始到應(yīng)用到項(xiàng)目中墩瞳,幫助最大的是官方提供的example代碼氏豌、源代碼和說(shuō)明文檔,在理解這些的基礎(chǔ)上瞭吃,學(xué)習(xí)一些優(yōu)秀的博客歪架,總結(jié)如下:
中文說(shuō)明文檔:https://www.w3cschool.cn/quartz_doc/quartz_doc-lwuv2d2a.html
增刪改查:http://snailxr.iteye.com/blog/2076903#comments
并發(fā):http://blog.csdn.net/will_awoke/article/details/38921273
https://www.cnblogs.com/Rozdy/p/4220186.html
http://www.blogjava.net/stevenjohn/archive/2015/07/26/426425.html
集群:http://www.importnew.com/22896.html
http://soulshard.iteye.com/blog/337886
https://tech.meituan.com/mt-crm-quartz.html
核心概念:http://blog.csdn.net/guolong1983811/article/details/51501346
http://blog.csdn.net/beliefer/article/details/51578546
https://www.cnblogs.com/pzy4447/p/5201674.html
問(wèn)題:http://blog.csdn.net/jackylovesjava/article/details/50044271
9和蚪、以下是一些代碼
9.1烹棉、完整的配置 quartz.properites
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#線程池相關(guān)配置
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#錯(cuò)過(guò)執(zhí)行時(shí)間設(shè)置
#org.quartz.jobStore.misfireThreshold: 60000
#quartz信息持久化到oracle數(shù)據(jù)庫(kù)
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.dataSource: myDS
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false
#數(shù)據(jù)庫(kù)連接參數(shù)
org.quartz.dataSource.myDS.driver: oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.myDS.URL: jdbc:oracle:thin:@10.10.52.14:1521:wxkfdb
org.quartz.dataSource.myDS.user: autotesting
org.quartz.dataSource.myDS.password: test
org.quartz.dataSource.myDS.maxConnections: 5
9.2浆洗、quartz整合spring boot的配置類(lèi)
@Configuration
public class QuartzCofig {
@Autowired
private TestngService testngService;
@Autowired
private TriggerDao triggerDao;
@Autowired
private QuartzService quartzService;
@Bean(name = "schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(quartzProperties());
// 把用到的job類(lèi)中用到的service伏社,dao等傳給他,用@Autowired注解無(wú)法注入
Map<String, Object> springBeanMap = new HashMap<String, Object>();
springBeanMap.put("testngService", testngService);
springBeanMap.put("quartzService", quartzService);
springBeanMap.put("triggerDao", triggerDao);
factory.setSchedulerContextAsMap(springBeanMap);
factory.setWaitForJobsToCompleteOnShutdown(true);
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
// 指定quart.properties文件位置
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
//在quartz.properties中的屬性被讀取并注入后再初始化對(duì)象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/*
* 通過(guò)SchedulerFactoryBean獲取Scheduler的實(shí)例
* name不能設(shè)置為scheduler,否則QuartzService里注入的不是此處定義的scheduler
*/
@Bean(name="myScheduler")
public Scheduler scheduler() throws IOException {
System.out.println("schedulerFactoryBean().getScheduler():" + schedulerFactoryBean().getScheduler());
return schedulerFactoryBean().getScheduler();
}
}
9.3聪黎、自定義的Job邏輯
@Configuration
@Component
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ScheduleJob extends QuartzJobBean {
private Logger logger = LoggerFactory.getLogger(ScheduleJob.class);
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
TestSuite testSuite = (TestSuite)dataMap.get("testSuite");
Project project = (Project) dataMap.get("project");
TestngService testngService = null;
try {
testngService = (TestngService)context.getScheduler().getContext().get("testngService");
} catch (SchedulerException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
logger.info("---" + context.getJobDetail().getKey() + "想要執(zhí)行---");
testngService.run(testSuite, project);
}
}
9.4、增加露泊,刪除service
@Service("quartzService")
public class QuartzService {
@Resource(name = "myScheduler")
private Scheduler scheduler;
@Autowired
private TriggerDao triggerDao;
@Autowired
private ProcessDao processDao;
@Autowired
private ApplicationContext applicationContext;
private Logger logger = LoggerFactory.getLogger(QuartzService.class);
/**
* 增加或修改一個(gè)job
* @param testSuite
* @param project
*/
public void addJob(TestSuite testSuite, Project project) {
String runTime = testSuite.getRuntime();
if(!StringUtils.isEmpty(testSuite.getRuntime())) {
// 生成一個(gè)triggerKey
String testSuiteName = testSuite.getName();
String projectName = project.getName();
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("testSuite", testSuite);
jobDataMap.put("project", project);
JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class)
.withIdentity(testSuiteName, projectName)
.usingJobData(jobDataMap)
.build();
// 向Job傳值
// jobDetail.getJobDataMap().put("testSuite", testSuite);
// jobDetail.getJobDataMap().put("project", project);
TestSuite testsuite = (TestSuite) jobDetail.getJobDataMap().get("testSuite");
// misfire處理:上一個(gè)job執(zhí)行結(jié)束滤淳,立即執(zhí)行這個(gè)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(runTime);
// .withMisfireHandlingInstructionFireAndProceed();
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(testSuiteName, projectName)
.withSchedule(scheduleBuilder)
.build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
Trigger triggerInsert = new Trigger(project.getName() + "." + testSuite.getName(),
TriggerStateConstant.WAITING, null, project.getProjectid());
triggerDao.insertOne(triggerInsert);
}
}
/**
* 刪除job
* @param testSuite
* @param project
*/
public void deleteJob(TestSuite testSuite, Project project) {
try {
scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
// 刪除sttrigger表的記錄
triggerDao.deleteByTriggerId(project.getName() + "." + testSuite.getName());
} catch (SchedulerException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
}
/**
* 判斷該project下是否有trigger觸發(fā)脖咐,如果有返回true
* @param projectId
* @return
*/
public boolean hasTriggerFired(String projectId) {
List<Trigger> triggerList = triggerDao.findByProjectId(projectId);
if(CollectionUtils.isEmpty(triggerList)) {
return true;
} else {
return false;
}
}
/**
* 獲取可以執(zhí)行的process
* @param projectId
* @return
*/
public Process getAvaliableProcess(String projectId) {
// 獲取該project下所有process
List<String> processIdList = processDao.findIdByProjectId(projectId);
// 獲取正在執(zhí)行中的testsuite對(duì)應(yīng)的processid
List<String> executingProcssIdList = triggerDao.findExecutingProcess(projectId);
List<String> differentList = new ArrayList<>();
if(!CollectionUtils.isEmpty(executingProcssIdList)) {
Set<String> processIdSet = new HashSet<>();
processIdSet.addAll(processIdList);
Set<String> executingProcssIdSet = new HashSet<String>();
executingProcssIdSet.addAll(executingProcssIdList);
// 取差集
Set<String> differentSet = CommonUtil.getDifferentSet(processIdSet, executingProcssIdSet);
differentList.addAll(differentSet);
} else {
differentList = processIdList;
}
// 隨機(jī)獲取一個(gè)Process
if(differentList.size() != 0) {
String randomProcessId = CommonUtil.getRandom(differentList);
return processDao.findByProcessId(randomProcessId);
} else {
return null;
}
}
}
9.5、資源調(diào)度的單例類(lèi)产弹。
(1)用單例模式的原因:要保證每個(gè)Job執(zhí)行的過(guò)程中獲得的ProcessResource類(lèi)的對(duì)象是同一個(gè)對(duì)象痰哨,map 是同一個(gè)map,否則有多個(gè)map的話早抠,使用的就不是同一份資源了撬讽。
public class ProcessResource {
private ProcessDao processDao = (ProcessDao) SpringUtil.getBean("processDao");
private Map<String, LinkedList<Process>> map = new HashMap<String, LinkedList<Process>>();
private static ProcessResource instance = null;
private Object lock = new Object();
private Logger logger = LoggerFactory.getLogger(ProcessResource.class);
private ProcessResource() {
if (instance != null) {
return;
}
}
public static ProcessResource getInstance() {
if (instance == null) {
synchronized (ProcessResource.class) {
if (instance == null) {
instance = new ProcessResource();
instance.init();
}
}
}
return instance;
}
public void init() {
List<String> projectIdList = processDao.findProjectId();
for(String projectId : projectIdList) {
LinkedList<Process> list = processDao.findByProjectId(projectId);
map.put(projectId, list);
}
instance.setMap(map);
}
public Process getProcess(String projectId) {
synchronized (lock) {
LinkedList<Process> list = instance.getMap().get(projectId);
// 判斷l(xiāng)ist中是否有元素游昼,如果有,返回, 如果沒(méi)有载庭,打印信息
if(!CollectionUtils.isEmpty(list)) {
return list.removeFirst();
} else {
logger.info("ProcessResource中沒(méi)有可用的Process了....");
return null;
}
}
}
/**
* 釋放資源
* @param process
* @param projectId
*/
public void releaseProcess(Process process, String projectId) {
LinkedList<Process> list = instance.getMap().get(projectId);
// 判斷l(xiāng)ist中是否有元素廊佩,如果有罐寨,返回, 如果沒(méi)有序矩,打印信息
list.addLast(process);
}
public void setMap(Map<String, LinkedList<Process>> map) {
this.map = map;
}
public Map<String, LinkedList<Process>> getMap() {
return map;
}
}