@[toc]
1.@Scheduled
跟@EnableScheduling
1.1 @Scheduled
注解
?Spring在3.0版本的時候加入@Scheduled
注解來完成對定時任務(wù)的支持,其底層是spring自己實現(xiàn)的一套定時任務(wù)的邏輯庄吼,所以功能比價簡單您访,但是能夠?qū)崿F(xiàn)單機部署上的定時任務(wù)未妹。這里對這個注解的內(nèi)部的屬性進行分析。
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
//一個特殊的cron表達式茂翔,表示的是禁用Trigger
String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
//cron表達式
String cron() default "";
//時區(qū) 默認是的是取當前的服務(wù)器的時區(qū)
String zone() default "";
//在任務(wù)最后一次調(diào)用的結(jié)束和下一次調(diào)用的開始之間以毫秒為單位執(zhí)行帶注釋的方法
long fixedDelay() default -1;
//在最后一次調(diào)用的結(jié)束和下一次調(diào)用的開始之間以毫秒為單位執(zhí)行帶注釋的方法
String fixedDelayString() default "";
//在兩次調(diào)用之間以毫秒為單位執(zhí)行帶注釋的方法。
long fixedRate() default -1;
//在兩次調(diào)用之間以毫秒為單位執(zhí)行帶注釋的方法。
String fixedRateString() default "";
//在第一次執(zhí)行任務(wù)之前延遲多少秒
long initialDelay() default -1;
//在第一次執(zhí)行任務(wù)之前延遲多少秒
String initialDelayString() default "";
}
?在@Scheduled
注解中主要定了定時任務(wù)相關(guān)的屬性宋列,包含的常用的cron表達式,還添加了其他的屬性评也×墩龋可以指定是否使用觸發(fā)器(Tigger)灭返,可以使用觸發(fā)器的方式(注意使用觸發(fā)器的方式的時候)
?在spring中還有另外的一個注解跟@Scheduled
注解一樣的作用,就是@Schedules
注解坤邪,這個注解內(nèi)部包含多個@Scheduled
注解熙含,可以表示一個方法可以存在多個調(diào)度設(shè)置
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Schedules {
Scheduled[] value();
}
1.2 @EnableScheduling
注解
?@EnableScheduling
注解是spring在3.1版本的時候加上的,作用就是開啟spring對定時任務(wù)的支持艇纺。這個注解也是對@Import
注解的一個擴展使用婆芦。關(guān)于@Import
注解可以看看前面的一篇文章spring源碼解析------@Import注解解析與ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector區(qū)別
進行了解喂饥。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
2 @Scheduled
跟@EnableScheduling
注解的解析
?基于對@Import
注解的了解消约,這里直接進入到@EnableScheduling
注解上面指定的SchedulingConfiguration
類進行查看。
2.1 開啟定時任務(wù)入口的SchedulingConfiguration
?在SchedulingConfiguration
中代碼很少员帮,只是注冊了一個bean到容器中或粮。但是這個bean卻是定時任務(wù)的解析與創(chuàng)建的核心類。進入到代碼中查看
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
//注冊ScheduledAnnotationBeanPostProcessor到容器中
return new ScheduledAnnotationBeanPostProcessor();
}
}
?這里的代碼不用進行解析了捞高,只是進行了一個注解氯材,接下來的才是重要的東西。
2.2 解析@Scheduled
與@Schedules
以及創(chuàng)建定時任務(wù)入口的ScheduledAnnotationBeanPostProcessor
?在解析ScheduledAnnotationBeanPostProcessor
之前硝岗,我們需要先了解一下bean的生命周期最好了氢哮。因為這個類繼承了實現(xiàn)了很多的spring的生命周期相關(guān)的接口,關(guān)于spring 的生命周期相關(guān)的可以看看Spring源碼----Spring的Bean生命周期流程圖及代碼解釋
型檀,現(xiàn)在進行下一步的解析冗尤。
2.2.1 ScheduledAnnotationBeanPostProcessor
實現(xiàn)的生命周期的方法排序
?這里先吧ScheduledAnnotationBeanPostProcessor
實現(xiàn)與繼承的接口展示出來,然后說明主要的實現(xiàn)方法的調(diào)用順序胀溺。
public class ScheduledAnnotationBeanPostProcessor
implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
......
@Override
//實現(xiàn)Ordered接口裂七,返回當前BeanPostProcessor的順序
public int getOrder() {
return LOWEST_PRECEDENCE;
}
......
@Override
//實現(xiàn)EmbeddedValueResolverAware接口
public void setEmbeddedValueResolver(StringValueResolver resolver) {
//設(shè)置string字段解析的對象StringValueResolver
this.embeddedValueResolver = resolver;
}
@Override
//實現(xiàn)BeanNameAware接口
public void setBeanName(String beanName) {
this.beanName = beanName;
}
@Override
//實現(xiàn)BeanFactoryAware接口
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
@Override
//實現(xiàn)ApplicationContextAware接口
public void setApplicationContext(ApplicationContext applicationContext) {
//設(shè)置spring的上下文進來
this.applicationContext = applicationContext;
if (this.beanFactory == null) {
this.beanFactory = applicationContext;
}
}
@Override
//實現(xiàn)了SmartInitializingSingleton接口,在所有的單例bean實例化之后
public void afterSingletonsInstantiated() {
// Remove resolved singleton classes from cache
//移除緩存
this.nonAnnotatedClasses.clear();
if (this.applicationContext == null) {
// Not running in an ApplicationContext -> register tasks early...
// 不再上下文中運行仓坞,這里表示的是spring的上下文還沒有初始化完成時候調(diào)用背零,一般都不會進入到這里
finishRegistration();
}
}
@Override
//實現(xiàn)了ApplicationListener,這里監(jiān)聽的是上下文刷新完成事件
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
//注冊定時任務(wù)
finishRegistration();
}
}
@Override
//實現(xiàn)了MergedBeanDefinitionPostProcessor无埃,這里是空實現(xiàn)的徙瓶,
public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
}
@Override
//這里實現(xiàn)BeanPostProcessor,也是空實現(xiàn)嫉称,在bean初始化之前
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
@Override
//實現(xiàn)BeanPostProcessor侦镇,在bean初始化之后?
public Object postProcessAfterInitialization(Object bean, String beanName) {
......
}
@Override
//實現(xiàn)了DestructionAwareBeanPostProcessor,bean銷毀之前調(diào)用
public void postProcessBeforeDestruction(Object bean, String beanName) {
Set<ScheduledTask> tasks;
synchronized (this.scheduledTasks) {
tasks = this.scheduledTasks.remove(bean);
}
if (tasks != null) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
}
@Override
//實現(xiàn)了DestructionAwareBeanPostProcessor澎埠,是否注冊銷毀方法的時候調(diào)用
public boolean requiresDestruction(Object bean) {
synchronized (this.scheduledTasks) {
return this.scheduledTasks.containsKey(bean);
}
}
@Override
//實現(xiàn)DisposableBean虽缕,bean銷毀的時候調(diào)用
public void destroy() {
synchronized (this.scheduledTasks) {
Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
for (Set<ScheduledTask> tasks : allTasks) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
this.scheduledTasks.clear();
}
this.registrar.destroy();
}
}
?會發(fā)現(xiàn)ScheduledAnnotationBeanPostProcessor
足足實現(xiàn)了9個生命周期有關(guān)的接口類。現(xiàn)在需要來整理一下實現(xiàn)的這些類的方法,后面才好進行分析氮趋。
-
ApplicationContextAware
的setApplicationContext
,EmbeddedValueResolverAware
的setEmbeddedValueResolver
-
MergedBeanDefinitionPostProcessor
的postProcessMergedBeanDefinition
-
BeanNameAware
,BeanFactoryAware
-
BeanPostProcessor
的postProcessAfterInitialization
-
SmartInitializingSingleton
的afterSingletonsInstantiated
-
DestructionAwareBeanPostProcessor
的requiresDestruction
-
ApplicationListener<ContextRefreshedEvent>
的事件 -
DestructionAwareBeanPostProcessor
的requiresDestruction
-
DisposableBean
的destroy
?接下里按照這個順序伍派,只分析重點的方法。
2.2.2 在bean初始化之后的@Scheduled
以及@Schedules
注解的解析
?先看看一個bean在初始化之后步驟剩胁。
@Override
//實現(xiàn)BeanPostProcessor诉植,在bean初始化之后?
public Object postProcessAfterInitialization(Object bean, String beanName) {
//bean是spring的aop,task昵观,schedule相關(guān)的基礎(chǔ)接口類的實現(xiàn)類晾腔,則進行過濾
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
//獲取當前bean的真正的目標Class
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
//當前的目標類不再已緩存的不包含注解的類的集合中,并且當前目標類包含了Scheduled或者Schedules注解
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
//從目標類中獲取貼有Scheduled跟Schedules的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
//如果不含有Scheduled跟Schedules的方法的方法啊犬,就把這個類加入到nonAnnotatedClasses集合
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
//迭代這些需要定時執(zhí)行的方法
annotatedMethods.forEach((method, scheduledMethods) ->
//迭代對應(yīng)的方法上面的Scheduled跟Schedules注解
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
?這個方法的主要邏輯就是從已經(jīng)初始化完畢之后的bean中尋找貼有@Scheduled
以及@Schedules
注解的方法灼擂,然后迭代這些方法上的注解列表,并解析這些注解內(nèi)部的屬性并進一步的處理觉至。處理的內(nèi)部屬性的方法在ScheduledAnnotationBeanPostProcessor
的另外一個方法中剔应。這里跟進繼續(xù)分析
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
//根據(jù)bean以及需要定時調(diào)用的方法創(chuàng)建一個Runnable
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
//檢查這個方法是否配置了需要延遲執(zhí)行
long initialDelay = scheduled.initialDelay();
//獲取配置的延遲執(zhí)行的時間
String initialDelayString = scheduled.initialDelayString();
//如果延遲執(zhí)行的時間存在指
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
//獲取延遲的時間
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
//獲取cron表達式
String cron = scheduled.cron();
//檢查cron表達式是不是有值
if (StringUtils.hasText(cron)) {
//獲取指定的時區(qū)
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
//如果cron表達式不支持延遲
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
//如果指定了不實用cron的方式進行觸發(fā)
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
//解析時區(qū)
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
//沒有指定時區(qū),則使用默認的時區(qū)
timeZone = TimeZone.getDefault();
}
//將創(chuàng)建的ScheduledTask對象加入到集合中
tasks.add(
//使用ScheduledTaskRegistrar創(chuàng)建一個ScheduledTask
this.registrar.scheduleCronTask(
//根絕需要執(zhí)行的Runable對象以及觸發(fā)器創(chuàng)建一個CronTask
new CronTask(runnable,
//根據(jù)cron表達式以及時區(qū)創(chuàng)建一個觸發(fā)器CronTrigger
new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
//這里直接將延遲執(zhí)行的時間改為0语御,因為小于0表示的是不需要進行延遲
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
//獲取間隔調(diào)用的時間
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
//根據(jù)間隔調(diào)用時間創(chuàng)建任務(wù)
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
//解析字符串形式的延遲調(diào)用時間
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
//根據(jù)間隔調(diào)用時間創(chuàng)建任務(wù)
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
//獲取調(diào)用頻率
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
//根據(jù)調(diào)用頻率創(chuàng)建定時調(diào)度任務(wù)
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
//獲取string形式的調(diào)用頻率
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
//解析string形式的調(diào)用頻率
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
//根據(jù)調(diào)用頻率創(chuàng)建定時調(diào)度任務(wù)
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
//檢查是否有設(shè)置創(chuàng)建定時調(diào)度任務(wù)的參數(shù)('cron', 'fixedDelay(String)', or 'fixedRate(String)')峻贮,沒有則需要報錯
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
//最后注冊定時調(diào)度任務(wù)
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
?上面的這個方法可以分為兩個大的步驟:1. 解析@Scheduled
注解中的內(nèi)部屬性,然后創(chuàng)建定時任務(wù)应闯,然后加入到任務(wù)列表中纤控;2. 將任務(wù)保存在內(nèi)部的一個bean與其相關(guān)的定時任務(wù)方法集合scheduledTasks
中。而這里的第一步又分了4個屬性的解析碉纺。
-
initialDelay
是否延遲執(zhí)行任務(wù)的屬性 -
cron
表達式的解析船万,這里需要注意的cron表達式不能跟延遲一起使用 -
fixedDelay
在上一次調(diào)用完畢之后經(jīng)過多久再次調(diào)用的屬性 -
fixedRate
方法調(diào)用的頻率屬性
?這里需要重點關(guān)注的兩個位置在創(chuàng)建定時任務(wù)的時候的三個方法。1. createRunnable
方法 2. CronTrigger
對象的創(chuàng)建 3.CronTask
對象的創(chuàng)建 4.scheduleCronTask
方法的調(diào)用惜辑。接下來就是分析這個方法
2.2.3 定時任務(wù)的創(chuàng)建
2.2.3.1 createRunnable
創(chuàng)建包含定時執(zhí)行的ScheduledMethodRunnable
對象
?直接進入到createRunnable
方法進行查看
protected Runnable createRunnable(Object target, Method method) {
//貼有@Scheduled注解的方法需要是無參方法
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
//從目標類(bean)中尋找一個對應(yīng)的可以調(diào)用的方法
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
//創(chuàng)建一個ScheduledMethodRunnable實現(xiàn)了Runable接口
return new ScheduledMethodRunnable(target, invocableMethod);
}
?發(fā)現(xiàn)就是獲取需要定時調(diào)用的方法唬涧,然后創(chuàng)建一個ScheduledMethodRunnable
對象。而這個對象又實現(xiàn)了Runnable
接口盛撑,因此可以 定時的執(zhí)行其內(nèi)保存的調(diào)用方法。
2.2.3.2 根據(jù)表達式創(chuàng)建觸發(fā)器CronTrigger
?在創(chuàng)建CronTrigger
對象的時候會把表達式跟時區(qū)這兩個屬性傳入捧搞,在CronTrigger
會創(chuàng)建一個CronSequenceGenerator
對象抵卫,而這個對象在初始化的時候會解析對應(yīng)的表達式以及對應(yīng)的時區(qū),然后保存對象的信息胎撇,在后面計算下次定時執(zhí)行時間的時候用到
public CronTrigger(String expression, TimeZone timeZone) {
this.sequenceGenerator = new CronSequenceGenerator(expression, timeZone);
}
public CronSequenceGenerator(String expression, TimeZone timeZone) {
this.expression = expression;
this.timeZone = timeZone;
//解析時間表達式
parse(expression);
}
2.2.3.3 創(chuàng)建包含觸發(fā)器的定時任務(wù)對象CronTask
?CronTask
對象中保存了觸發(fā)器對象介粘,用來在后面計算定時執(zhí)行時間的時候用。然后將需要定時執(zhí)行的ScheduledMethodRunnable
對象保存了起來晚树,在后面執(zhí)行的時候使用姻采。
public CronTask(Runnable runnable, CronTrigger cronTrigger) {
super(runnable, cronTrigger);
this.expression = cronTrigger.getExpression();
}
2.2.3.4 執(zhí)行定時任務(wù)
?接下來執(zhí)行的邏輯也是比較冗長的。scheduleCronTask
方法在ScheduledTaskRegistrar
類中爵憎。這里直接進行分析慨亲。
public ScheduledTask scheduleCronTask(CronTask task) {
//從未解析的任務(wù)中獲取是否有這個任務(wù)婚瓜,有則移除
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
//不存在這個任務(wù),則創(chuàng)建一個新的任務(wù)
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
//如果調(diào)度器不是空
if (this.taskScheduler != null) {
//對任務(wù)進行調(diào)度刑棵,這里的taskScheduler是ConcurrentTaskScheduler
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {//此時taskScheduler還沒有初始化巴刻,這執(zhí)行不了定時任務(wù),需要先記錄起來蛉签,后面taskScheduler初始化之后進行調(diào)用
//增加到cronTasks中
addCronTask(task);
//保存到?jīng)]有執(zhí)行的
this.unresolvedTasks.put(task, scheduledTask);
}
//返回scheduledTask或者null胡陪,這個時候任務(wù)已經(jīng)在于jdk自帶的ScheduledExecutorService中到后面會進行自動調(diào)用
return (newTask ? scheduledTask : null);
}
?可以看到這里有幾個內(nèi)部的字段,其中unresolvedTasks
是用來保存沒有解析過的任務(wù)集合碍舍,taskScheduler
是指定定時任務(wù)的對象ConcurrentTaskScheduler
柠座,現(xiàn)在說一下這個對象的創(chuàng)建時機,ScheduledTaskRegistrar
類實現(xiàn)了InitializingBean
接口的afterPropertiesSet
方法片橡,而在afterPropertiesSet
方法中會有實力化ConcurrentTaskScheduler
類愚隧,而實例化的過程也是一個重要的過程,因為這個里面涉及到了定時任務(wù)調(diào)度用的線程池。
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
@SuppressWarnings("deprecation")
protected void scheduleTasks() {
//TaskScheduler還沒創(chuàng)建則進行創(chuàng)建
if (this.taskScheduler == null) {
//創(chuàng)建用于定時調(diào)用的執(zhí)行器,使用的是jdk自帶的ScheduledExecutorService
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
//創(chuàng)建ConcurrentTaskScheduler锻全,用于執(zhí)行任務(wù)的調(diào)度
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
//這里面放根據(jù)cron表達式創(chuàng)建的定時任務(wù)對象
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
//根據(jù)fixRate屬性創(chuàng)建的定時任務(wù)對象
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
//根據(jù)fixDelay屬性創(chuàng)建的定時任務(wù)對象
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
?到這里就可以知道狂塘,在Spring中用于進行定時任務(wù)調(diào)度用的執(zhí)行器是jdk自帶的ScheduledExecutorService
類。我們同時看到對于根據(jù)@Schedule
屬性形成的不同的調(diào)度對象鳄厌,會有不同的處理形式進行調(diào)度荞胡,但是在這些處理的方法中都會用到同一個對象進行調(diào)度,就是上面的ConcurrentTaskScheduler
對象了嚎,這里就不例舉代碼了泪漂,只列舉出共同的點
public ScheduledTask scheduleCronTask(CronTask task) {
.......
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
.......
}
public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
......
scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
......
}
public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
......
scheduledTask.future =this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
......
}
?因此這里有必要進入到ConcurrentTaskScheduler
的創(chuàng)建方法中進行查看,
static {
try {
managedScheduledExecutorServiceClass = ClassUtils.forName(
"javax.enterprise.concurrent.ManagedScheduledExecutorService",
ConcurrentTaskScheduler.class.getClassLoader());
}
catch (ClassNotFoundException ex) {
// JSR-236 API not available...
managedScheduledExecutorServiceClass = null;
}
}
public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {
//初始化父類
super(scheduledExecutor);
//初始化調(diào)度器
this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);
}
private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {
if (scheduledExecutor != null) {
this.scheduledExecutor = scheduledExecutor;
//初始化的時候會尋找ManagedScheduledExecutorService這個類歪泳,這個類是JSR236的規(guī)范跟ScheduledExecutorService一樣的作用對其做了一些擴展萝勤,默認為false
this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&
managedScheduledExecutorServiceClass.isInstance(scheduledExecutor));
}
else {
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
this.enterpriseConcurrentScheduler = false;
}
return this.scheduledExecutor;
}
?可以看到這里主要就是設(shè)置調(diào)度定時任務(wù)用的執(zhí)行器。現(xiàn)在看看上面在ScheduledTaskRegistrar
中調(diào)用ConcurrentTaskScheduler
中的方法呐伞,這里就列舉一個schedule
方法敌卓,其他的方法邏輯大同小異。
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
//存在ManagedScheduledExecutorService這個類的時候伶氢,這個是true
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
//創(chuàng)建錯誤處理器
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
//創(chuàng)建一個ReschedulingRunnable然后進行調(diào)度趟径,并返回一個ScheduledFuture
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}
?這里可以看到前面的enterpriseConcurrentScheduler
起到被用到了,這里進入到下面的else分之癣防,這里就是創(chuàng)建一個調(diào)度的時候出現(xiàn)異常的時候的異常處理器蜗巧,然后會創(chuàng)建一個ReschedulingRunnable
對象,然后調(diào)用這個對象的調(diào)度方法蕾盯,進行任務(wù)的調(diào)用幕屹。
public ReschedulingRunnable(
Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {
//設(shè)置調(diào)度的任務(wù)跟錯誤處理器
super(delegate, errorHandler);
this.trigger = trigger;
this.executor = executor;
}
@Nullable
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
//計算出下一次的調(diào)用時間,這里會用到前面保存過的調(diào)用時間信息,然后進行計算望拖。
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
//進行任務(wù)的調(diào)度渺尘,這里調(diào)用的不是根據(jù)需要調(diào)用的方法創(chuàng)建出來的調(diào)度對象,是這個類本身
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
public void run() {
//獲取當前的餓時間
Date actualExecutionTime = new Date();
//調(diào)用父類的run方法靠娱,父類的fun方法會調(diào)用需要調(diào)用的方法創(chuàng)建出來的調(diào)度對象沧烈,也就是我們貼有注解的方法
super.run();
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
//更新調(diào)用時間
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
//檢查這個方法是否被取消了
if (!obtainCurrentFuture().isCancelled()) {
//進行類本身的調(diào)用,這里主要是刷新下次的調(diào)用時間
schedule();
}
}
}
?到這里整個調(diào)度的分析過程完成的差不多像云,這里會計算出來下一次的調(diào)度時間锌雀,然后同時會把ReschedulingRunnable
對象本身給傳到對應(yīng)的ScheduledExecutorService
中,而ReschedulingRunnable
對象間接的實現(xiàn)了Runnable
對象迅诬,因此這個對象的run
方法對定時的被調(diào)用腋逆,而且這個run
方法中必須包含對本身的schedule
調(diào)用才能實現(xiàn)往復(fù)循環(huán)的調(diào)用。
2.3 定時任務(wù)的取消
2.3.1 第一種情況侈贷,貼有注解的bean銷毀的時候取消任務(wù)
?到這里整個的貼有@Schedule
跟@Schedules
注解的方法的調(diào)用過程就分析完了惩歉,現(xiàn)在還剩下一點就是,定時任務(wù)的取消了俏蛮。其實關(guān)于定時任務(wù)的取消從前面我們就能看出一點的端倪出來撑蚌,因為ScheduledAnnotationBeanPostProcessor
類實現(xiàn)了DestructionAwareBeanPostProcessor
類就能看出來了。現(xiàn)在看看實現(xiàn)的這個接口的邏輯
//實現(xiàn)了DestructionAwareBeanPostProcessor搏屑,bean銷毀之前調(diào)用
public void postProcessBeforeDestruction(Object bean, String beanName) {
Set<ScheduledTask> tasks;
synchronized (this.scheduledTasks) {
//從任務(wù)列表中移除對應(yīng)的bean争涌,如果存在會則會取出來
tasks = this.scheduledTasks.remove(bean);
}
if (tasks != null) {
//迭代任務(wù)
for (ScheduledTask task : tasks) {
//依次調(diào)用取消調(diào)度
task.cancel();
}
}
?可以看到任務(wù)的取消相比較,任務(wù)的創(chuàng)建還是簡單的辣恋。
2.3.2 容器銷毀的時候取消任務(wù)
?ScheduledAnnotationBeanPostProcessor
類實現(xiàn)了DisposableBean
因此亮垫,在容器銷毀的時候會調(diào)用其實現(xiàn)的destroy
方法
//實現(xiàn)DisposableBean,bean銷毀的時候調(diào)用
public void destroy() {
synchronized (this.scheduledTasks) {
//獲取所有的任務(wù)
Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
//一次取消
for (Set<ScheduledTask> tasks : allTasks) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
//情況任務(wù)列表
this.scheduledTasks.clear();
}
//銷毀ScheduledTaskRegistrar中保存的任務(wù)
this.registrar.destroy();
}
?到此整個分析就結(jié)束了伟骨,過程可能有點復(fù)雜饮潦,最好是自己debug的時候進行看,也可以多看幾次加深印象携狭。