在大多數(shù)項(xiàng)目應(yīng)該不可避免會(huì)用到定時(shí)任務(wù)了,如果是單體項(xiàng)目的話授瘦,要實(shí)現(xiàn)一個(gè)定時(shí)任務(wù)還是比較簡(jiǎn)單的,可以通過Executors.newScheduledThreadPool(10)
來(lái)實(shí)現(xiàn),也可以通過SpringBoot
的Scheduled
注解來(lái)實(shí)現(xiàn)孙蒙。如果是分布式項(xiàng)目或者微服務(wù)的話递雀,要實(shí)現(xiàn)一個(gè)定時(shí)任務(wù)就比較麻煩了柄延,或者自己去實(shí)現(xiàn),或者使用第三方的分布式定時(shí)任務(wù)框架缀程,比如Quartz
搜吧、Elastic-job
、xxl-job
等杨凑。
在我們的幾個(gè)項(xiàng)目中都會(huì)用到定時(shí)任務(wù)滤奈,而且用得也都比較頻繁,在微服務(wù)項(xiàng)目中使用的是xxl-job
撩满,在單體項(xiàng)目中蜒程,由于SpringBoot
自帶了定時(shí)任務(wù)的實(shí)現(xiàn),但是默認(rèn)的實(shí)現(xiàn)不是很友好伺帘,加上我們對(duì)于定時(shí)任務(wù)的管理要比較靈活昭躺,可以自由地對(duì)定時(shí)任務(wù)進(jìn)行增刪改查,所以我們就利用Executors.newScheduledThreadPool(10)
來(lái)實(shí)現(xiàn)了伪嫁。
首先领炫,我們還是來(lái)看一下SpringBoot
中的定時(shí)任務(wù)Scheduled
是如何實(shí)現(xiàn)的。
在SpringBoot
項(xiàng)目中张咳,如果想要實(shí)現(xiàn)定時(shí)任務(wù)的話帝洪,首先需要在啟動(dòng)類上添加@EnableScheduling
注解,然后在定時(shí)任務(wù)的方法上添加上@Scheduled
注解晶伦,這樣一個(gè)簡(jiǎn)單的定時(shí)任務(wù)就實(shí)現(xiàn)了碟狞。
@EnableScheduling
這個(gè)注解是SpringBoot
項(xiàng)目實(shí)現(xiàn)定時(shí)任務(wù)的關(guān)鍵,我們首先來(lái)觀察一下它的內(nèi)部實(shí)現(xiàn)婚陪,點(diǎn)進(jìn)去這個(gè)注解可以發(fā)現(xiàn)@Import(SchedulingConfiguration.class)
族沃,可以看到它會(huì)導(dǎo)入一個(gè)叫做SchedulingConfiguration
的配置類。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
再點(diǎn)進(jìn)去的話,就可以發(fā)現(xiàn)這個(gè)配置類做的事情非常簡(jiǎn)單脆淹,就是new出了一個(gè)ScheduledAnnotationBeanPostProcessor
對(duì)象常空,這個(gè)對(duì)象就是實(shí)現(xiàn)定時(shí)任務(wù)的關(guān)鍵。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
我們可以看下ScheduledAnnotationBeanPostProcessor
的實(shí)現(xiàn)定義盖溺,發(fā)現(xiàn)它還是實(shí)現(xiàn)了非常多的接口的漓糙,其中有一個(gè)接口是MergedBeanDefinitionPostProcessor
接口,而這個(gè)接口又繼承了BeanPostProcessor
接口烘嘱,BeanPostProcessor
這個(gè)接口有兩個(gè)方法需要去實(shí)現(xiàn)昆禽,分別為postProcessBeforeInitialization
和postProcessAfterInitialization
方法,分別在bean
的初始化前和初始化后調(diào)用蝇庭。
那么我們就來(lái)關(guān)注一下postProcessAfterInitialization
方法的實(shí)現(xiàn)醉鳖,這個(gè)方法其實(shí)就是去掃描被@Scheduled
注解標(biāo)記的定時(shí)任務(wù),當(dāng)掃描到之后哮内,會(huì)對(duì)每個(gè)定時(shí)任務(wù)調(diào)用processScheduled
方法盗棵,而processScheduled
方法就是對(duì)@Scheduled
注解中的參數(shù)進(jìn)行解析,比如fixedDelay
北发、cron
等等纹因,解析完成之后再把它添加到定時(shí)任務(wù)的集合中。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
除了上述的接口以外琳拨,還有一個(gè)接口是ApplicationListener<ContextRefreshedEvent>
瞭恰,它會(huì)去監(jiān)聽ContextRefreshedEvent
事件,當(dāng)所有的bean都初始化完成并且裝載完成的話从绘,就會(huì)觸發(fā)該事件寄疏,實(shí)現(xiàn)了這個(gè)接口的類就可以監(jiān)聽到這個(gè)事件,從而去實(shí)現(xiàn)自己的邏輯僵井,這個(gè)接口只有一個(gè)方法定義onApplicationEvent(E event)
陕截,所以當(dāng)監(jiān)聽到ContextRefreshedEvent
事件的時(shí)候,就會(huì)執(zhí)行onApplicationEvent
方法批什。
public class ScheduledAnnotationBeanPostProcessor
implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {}
在onApplicationEvent
方法里面做的事也非常簡(jiǎn)單农曲,就是調(diào)用內(nèi)部的一個(gè)方法finishRegistration
。finishRegistraion
方法的邏輯就比較復(fù)雜了驻债,我們一一來(lái)看下
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
finishRegistration();
}
}
private void finishRegistration() {
// scheduler可以自己去實(shí)現(xiàn)乳规,這個(gè)scheduler就是執(zhí)行定時(shí)任務(wù)的線程池,可以自己去實(shí)現(xiàn)TaskScheduler合呐,也就是使用jdk自帶的ScheduledExecutorService
// 具體可以看下setScheduler這個(gè)方法
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
// 查找SchedulingConfigurer配置類暮的,然后加載配置,這個(gè)配置類也可以自己去實(shí)現(xiàn)淌实,在這個(gè)配置類中也可以去指定定時(shí)任務(wù)的線程池
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// 這個(gè)registrar中就保存了被@Scheduled注解標(biāo)注的定時(shí)任務(wù)集合冻辩,之后會(huì)講到如何從其中獲取定時(shí)任務(wù)集合猖腕,并且進(jìn)行任務(wù)的取消
// 如果存在被@Scheduled注解標(biāo)記的定時(shí)任務(wù),但是scheduler為null的話恨闪,就會(huì)嘗試去搜索TaskScheduler倘感,沒有找到的話就拋出異常
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
ex.getMessage());
}
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
ex.getMessage());
}
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
ex2.getMessage());
}
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
ex2.getMessage());
}
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
// 最后會(huì)執(zhí)行這個(gè)方法
this.registrar.afterPropertiesSet();
}
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
protected void scheduleTasks() {
// 在這個(gè)方法里面,可以發(fā)現(xiàn)咙咽,如果taskScheduler不存在的話老玛,就會(huì)創(chuàng)建出一個(gè)執(zhí)行器,這個(gè)執(zhí)行器應(yīng)該不陌生了
// 它就是一個(gè)corePoolSize為單線程,maxPoolSize為Integer.MAX_VALUE,隊(duì)列為DelayedWorkQueue的執(zhí)行器
// 當(dāng)存在很多個(gè)定時(shí)任務(wù)同時(shí)執(zhí)行的時(shí)候,只會(huì)有一個(gè)定時(shí)任務(wù)被執(zhí)行,其他的定時(shí)任務(wù)會(huì)被扔進(jìn)DelayedWorkQueue隊(duì)列中
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// 下面的這幾個(gè)判斷就是將被@Scheduled注解標(biāo)記的定時(shí)任務(wù)添加到任務(wù)集合中
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
// 注意以下這個(gè)cron表達(dá)式的定時(shí)任務(wù)添加搞隐,后續(xù)我們?nèi)?shí)現(xiàn)動(dòng)態(tài)地對(duì)定時(shí)任務(wù)進(jìn)行管理會(huì)用到
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
// 這里的scheduleCronTask還是值得關(guān)注的
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
到這里呢,被@Scheduled
注解標(biāo)記的方法就會(huì)被作為定時(shí)任務(wù)添加到定時(shí)任務(wù)集合中了磷瘤。
從上面我們可以發(fā)現(xiàn)敏晤,對(duì)于默認(rèn)的定時(shí)任務(wù)的實(shí)現(xiàn),執(zhí)行定時(shí)任務(wù)的線程池并不是很友好威根,我們可以去自定義實(shí)現(xiàn)執(zhí)行定時(shí)任務(wù)的線程池凤巨,可以去實(shí)現(xiàn)TaskScheduler
,也可以去創(chuàng)建ScheduledExecutorService
洛搀,還可以去實(shí)現(xiàn)配置類SchedulingConfigurer
敢茁。
@Configuration
public class TestConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setRemoveOnCancelPolicy(false);
taskScheduler.initialize();
return taskScheduler;
}
}
如何獲取定義的定時(shí)任務(wù)集合
在之前的描述中,我們可以發(fā)現(xiàn)在服務(wù)啟動(dòng)的時(shí)候留美,IOC容器中會(huì)注入一個(gè)ScheduledAnnotationBeanPostProcessor
的Bean對(duì)象彰檬,這個(gè)Bean對(duì)象就是來(lái)對(duì)定時(shí)任務(wù)進(jìn)行管理的,那么我們就可以從這個(gè)類中獲取到定時(shí)任務(wù)的集合谎砾,并且將定時(shí)任務(wù)都打印出來(lái)看一下內(nèi)容都是什么逢倍,可以發(fā)現(xiàn)ScheduledTask
的toString()
方法就是定時(shí)任務(wù)的全類名加上方法名,比如com.yan.shiyue.Task.task
景图,這樣的話较雕,我們就可以將這些定時(shí)任務(wù)給保存起來(lái),作為一個(gè)Map挚币,key就是定時(shí)任務(wù)的名字亮蒋,value就是ScheduledTask
,然后我們就可以動(dòng)態(tài)地對(duì)這些任務(wù)進(jìn)行取消了妆毕,因?yàn)?code>ScheduledTask提供了一個(gè)cancel
方法來(lái)取消定時(shí)任務(wù)的執(zhí)行慎玖。
@Slf4j
@Component
public class ScheduledTaskConfig implements CommandLineRunner {
@Autowired
private ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor;
@Override
public void run(String... args) {
Set<ScheduledTask> tasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
for (ScheduledTask task : tasks) {
log.error(task.toString());
}
}
}
如何動(dòng)態(tài)地創(chuàng)建定時(shí)任務(wù)
我們可以發(fā)現(xiàn)SpringBoot
提供的定時(shí)任務(wù)并不是很靈活,我們沒法動(dòng)態(tài)地對(duì)定時(shí)任務(wù)進(jìn)行增刪改查笛粘,那么基于SpringBoot
的定時(shí)任務(wù)的實(shí)現(xiàn)趁怔,我們可以自己來(lái)實(shí)現(xiàn)定時(shí)任務(wù)的動(dòng)態(tài)操作远舅。
在接下來(lái)的操作中,就以cron表達(dá)式類型的定時(shí)任務(wù)進(jìn)行動(dòng)態(tài)地增刪改查痕钢,在實(shí)現(xiàn)之前我們回顧一下SpringBoot
中的cron表達(dá)式類型的定時(shí)任務(wù)時(shí)如何被添加到任務(wù)集合中的图柏。
protected void scheduleTasks() {
// 在這個(gè)方法里面,可以發(fā)現(xiàn)任连,如果taskScheduler不存在的話蚤吹,就會(huì)創(chuàng)建出一個(gè)執(zhí)行器,這個(gè)執(zhí)行器應(yīng)該不陌生了
// 它就是一個(gè)corePoolSize為單線程随抠,maxPoolSize為Integer.MAX_VALUE裁着,隊(duì)列為DelayedWorkQueue的執(zhí)行器
// 當(dāng)存在很多個(gè)定時(shí)任務(wù)同時(shí)執(zhí)行的時(shí)候,只會(huì)有一個(gè)定時(shí)任務(wù)被執(zhí)行拱她,其他的定時(shí)任務(wù)會(huì)被扔進(jìn)DelayedWorkQueue隊(duì)列中
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// 下面的這幾個(gè)判斷就是將被@Scheduled注解標(biāo)記的定時(shí)任務(wù)添加到任務(wù)集合中
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
// 注意以下這個(gè)cron表達(dá)式的定時(shí)任務(wù)添加二驰,后續(xù)我們?nèi)?shí)現(xiàn)動(dòng)態(tài)地對(duì)定時(shí)任務(wù)進(jìn)行管理會(huì)用到
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
// 這里的scheduleCronTask還是值得關(guān)注的
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
可以發(fā)現(xiàn),SpringBoot
對(duì)幾種定時(shí)任務(wù)都實(shí)現(xiàn)了對(duì)應(yīng)的Task
秉沼,比如cron表達(dá)式類型的CronTask
桶雀,固定頻率類型的IntervalTask
等等,那么我們?nèi)绻獎(jiǎng)討B(tài)地添加一個(gè)cron表達(dá)式類型的定時(shí)任務(wù)的話唬复,就可以實(shí)現(xiàn)CronTask
了矗积。
那么,我們自己創(chuàng)建好一個(gè)CronTask
之后該如何執(zhí)行呢敞咧,之前有提到過SpringBoot
執(zhí)行定時(shí)任務(wù)的執(zhí)行器可以自定義棘捣,那么我們?cè)谧远x好執(zhí)行器TaskScheduler
之后,就可以調(diào)用其中的schedule
方法來(lái)執(zhí)行定時(shí)任務(wù)了休建。
首先乍恐,我們需要?jiǎng)?chuàng)建好一個(gè)任務(wù),需要實(shí)現(xiàn)Runnable
接口测砂。
public class TestTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + "shiyue");
}
}
然后茵烈,我們可以去實(shí)現(xiàn)一個(gè)接口,來(lái)動(dòng)態(tài)地管理這個(gè)定時(shí)任務(wù)邑彪。
@RestController
public class TestController {
@Autowired
private TaskScheduler taskScheduler;
@Autowired
private ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor;
private final Map<Integer, ScheduledFuture> taskMap = new ConcurrentHashMap<>();
/**
* 添加一個(gè)定時(shí)任務(wù)
*
* @return
*/
@GetMapping("/task")
public String addTask() {
// 這里為了方便瞧毙,cron表達(dá)式寫死了,其實(shí)可以由外部傳入
CronTask cronTask = new CronTask(new TestTask(), "*/5 * * * * ?");
ScheduledFuture scheduledFuture = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
// 同時(shí)寄症,這里也是為了方便宙彪,使用Map來(lái)保存定時(shí)任務(wù)的信息,其實(shí)可以將定時(shí)任務(wù)持久化到MySQL中
taskMap.put(1, scheduledFuture);
return "shiyue";
}
/**
* 更新一個(gè)定時(shí)任務(wù)有巧,更新一個(gè)定時(shí)任務(wù)可以看做是將原來(lái)的定時(shí)給取消掉释漆,然后新增一個(gè)新的定時(shí)任務(wù)
*
* @return
*/
@GetMapping("/task/update/{id}")
public String updateTask(@PathVariable Integer id, @RequestParam String cron) {
ScheduledFuture scheduledFuture = taskMap.get(id);
scheduledFuture.cancel(true);
// 添加
CronTask cronTask = new CronTask(new TestTask(), cron);
ScheduledFuture scheduledFuture1 = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
taskMap.put(id, scheduledFuture1);
return "Success";
}
@GetMapping("/task/list")
public String taskList() {
Set<ScheduledTask> tasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
for (ScheduledTask task : tasks) {
System.out.println(task);
}
return "qiyue";
}
}