背景
開發(fā)Java應(yīng)用經(jīng)常會需要用到單機定時任務(wù)暑认,這個時候一般我們會采用分布式任務(wù)調(diào)度中間件來解決問題。典型的分布式任務(wù)調(diào)度中間件大审,比如淘寶的tbschedule蘸际,當(dāng)當(dāng)?shù)膃lastic-job,唯品會的saturn徒扶。京東沒做開源粮彤,大概率是用了tbschedule。
但是姜骡,分布式任務(wù)調(diào)度中間件往往是基于一個zookeeper集群來做任務(wù)調(diào)度的导坟。?如果zookeeper集群出了問題,任務(wù)調(diào)度就掛街了圈澈。
這個時候其實可以直接基于quartz來做調(diào)度惫周,這也是本文要說的事情。
QuartzTask
首先定義一個任務(wù)接口康栈,我們希望一個任務(wù)實現(xiàn)下面這些方法:
public interface QuartzTask {
/**
* 任務(wù)執(zhí)行的代碼
*/
void executeTask();
/**
* 指定執(zhí)行任務(wù)的機器ip
*
* @return String
*/
String getTargetIP();
/**
* crond表達式
*
* @return String
*/
String getCrond();
/**
* 任務(wù)唯一名稱
*
* @return String
*/
String getJobName();
}
EmptyTask
為了解釋清楚如何使用quartz來做單機任務(wù)調(diào)度递递,我們新建一個空白的任務(wù)EmptyTask并實現(xiàn)QuartzTask定義的方法:
@Service("emptyTask")
public class EmptyTask implements QuartzTask {
private static final Logger logger = LoggerFactory.getLogger(EmptyTask.class);
public void executeTask() {
logger.warn("emptyTask running");
}
@Override
public String getTargetIP() {
return "127.0.0.1";
}
@Override
public String getCrond() {
return "0 0/1 * * * ?";
}
@Override
public String getJobName() {
return "emptyTask";
}
}
targetIP即可以寫死成一個字符串,也可以從配置服務(wù)谅将、第三方接口漾狼,數(shù)據(jù)庫等數(shù)據(jù)源動態(tài)的獲取。
crond表達式可以在 http://cron.qqe2.com/ 動態(tài)生成饥臂。
BeanPostProcessor
BeanPostProcessor是Spring中一個很實用的接口逊躁。BeanPostProcessor提供了兩個方法:
public interface BeanPostProcessor {
Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;
Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}
postProcessBeforeInitialization:Bean 實例化之前進行的處理
postProcessAfterInitialization: Bean 實例化之后進行的處理
使用BeanPostProcessor也很簡單,只要實現(xiàn)這個接口隅熙,并重載這兩個方法就可以了稽煤。
SchedulerManager
利用BeanPostProcessor提供的特性,我們可以在所有bean初始化完成之后做一些事情囚戚。于是就有了SchedulerManager類:
public class SchedulerManager extends SchedulerFactoryBean implements BeanPostProcessor, InitializingBean {
private Scheduler scheduler;
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
this.scheduler = this.getScheduler();
}
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
try {
if (QuartzTask.class.isInstance(bean)) {
QuartzTask quartzTask = QuartzTask.class.cast(bean);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzTask.getCrond());
Trigger newTrigger = TriggerBuilder.newTrigger()
.withIdentity(quartzTask.getJobName(), "DEFAULT").withSchedule(scheduleBuilder).build();
//需要一個中間job類來處理酵熙,中間類會去實際執(zhí)行任務(wù)
JobDetail jobDetail = JobBuilder.newJob(ScheduleTaskExecutor.class)
.withIdentity(quartzTask.getJobName(), "DEFAULT").build();
jobDetail.getJobDataMap().put("taskContext", quartzTask);
scheduler.scheduleJob(jobDetail, newTrigger);
}
} catch (Exception e) {
logger.error("scheduleManager init error.", e);
}
return bean;
}
}
ScheduleTaskExecutor
由于JobDetail自己管理了任務(wù)執(zhí)行類的生命周期,所以只能使用一個中間任務(wù)類ScheduleTaskExecutor驰坊,中轉(zhuǎn)到要實際執(zhí)行的任務(wù)匾二。
DisallowConcurrentExecution
注解能使得ScheduleTaskExecutor的同一個實例,不會并發(fā)執(zhí)行execute方法。也就是說同一個JobDetail不會并發(fā)執(zhí)行察藐。但是如果是不同的JobDetail皮璧,是可以并發(fā)執(zhí)行的。
@DisallowConcurrentExecution
public class ScheduleTaskExecutor implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
QuartzTask quartzTask = (QuartzTask)context.getMergedJobDataMap().get("taskContext");
if (!HttpUtil.getServerIp().equals(quartzTask.getTargetIP())) {
return;
}
quartzTask.executeTask();
}
}
存在的問題
上面的實現(xiàn)方式存在一個待優(yōu)化的問題:線上應(yīng)用往往是多機器分布式的分飞,雖然指定了某個IP執(zhí)行任務(wù)悴务,但是其實每臺機器都開啟了一個定時任務(wù)去執(zhí)行中間任務(wù)類的的方法,只不過在IP判斷的時候提前結(jié)束了任務(wù)譬猫。
更好的實現(xiàn)方式讯檐,是監(jiān)控targetIP的變化,然后暫停or開啟任務(wù)染服。