在半個月之前吞获,有幸看了xxl-job源碼,原本打算寫一篇源碼分析文章卫旱。結(jié)果由于瑣碎的事情干擾了,擱淺了坐昙。本篇文章先預(yù)熱一下,講下xxl-job中關(guān)于quartz知識芋忿。(本文內(nèi)容參考自xxl-job官網(wǎng))
xxl-job設(shè)計思想和調(diào)度模塊剖析
xxl-job將調(diào)度行為抽象形成"調(diào)度中心"公共平臺炸客,而平臺自身并不承擔業(yè)務(wù)邏輯,"調(diào)度中心"負責發(fā)起調(diào)度請求盗飒。將任務(wù)抽象成分散的JobHandler嚷量,交由"執(zhí)行器"統(tǒng)一管理,"執(zhí)行器"負責接收調(diào)度請求并執(zhí)行對應(yīng)的JobHandler中業(yè)務(wù)邏輯逆趣。因此蝶溶,"調(diào)度"和"任務(wù)"兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴展性宣渗。
quartz的不足
Quartz作為開源作業(yè)調(diào)度中的佼佼者抖所,是作業(yè)調(diào)度的首選。集群環(huán)境中Quartz采用API的方式對任務(wù)進行管理痕囱,但是會存在以下問題:
- 調(diào)用API的方式操作任務(wù)田轧,不人性化。
- 需要持久化業(yè)務(wù)QuartzJobBean到底層數(shù)據(jù)表中鞍恢,系統(tǒng)侵入性相當嚴重傻粘。
- 調(diào)度邏輯和QuartzJobBean耦合在同一個項目中每窖,這將導(dǎo)致一個問題,在調(diào)度任務(wù)數(shù)量逐漸增多弦悉,同時調(diào)度任務(wù)邏輯逐漸加重的情況下窒典,此時調(diào)度系統(tǒng)的性能將大大受限于業(yè)務(wù)。
- Quartz底層以"搶占式"獲取DB鎖并由搶占成功節(jié)點負責運行任務(wù)稽莉,會導(dǎo)致節(jié)點負載懸殊非常大瀑志,而xxl-job通過執(zhí)行器實現(xiàn)"協(xié)同分配式"運行任務(wù),充分發(fā)揮集群優(yōu)勢污秆,負載各節(jié)點均衡劈猪。
RemoteHttpJobBean
常規(guī)Quartz的開發(fā),任務(wù)邏輯一般維護在QuartzJobBean中良拼,耦合很嚴重战得。xxl-job中的調(diào)度模塊和任務(wù)模塊完全解耦,調(diào)度模塊中的所有調(diào)度任務(wù)都使用的是同一個QuartzJobBean(也就是RemoteHttpJobBean)庸推。不同的調(diào)度任務(wù)將各自參數(shù)維護在各自擴展表數(shù)據(jù)中贡避,當觸發(fā)RemoteHttpJobBean執(zhí)行時,將會解析不同的任務(wù)參數(shù)發(fā)起遠程調(diào)用予弧,調(diào)用各自的遠程執(zhí)行器服務(wù)。這種調(diào)用模型類似RPC調(diào)用湖饱,RemoteHttpJobBean提供調(diào)用代理的功能掖蛤,而執(zhí)行器提供遠程服務(wù)的功能。
調(diào)度中心HA(集群)
基于Quartz的集群方案井厌,數(shù)據(jù)庫選用Mysql蚓庭;集群分布式并發(fā)環(huán)境中使用QUARTZ定時任務(wù)調(diào)度,會在各個節(jié)點會上報任務(wù)仅仆,存到數(shù)據(jù)庫中器赞,執(zhí)行時會從數(shù)據(jù)庫中取出觸發(fā)器來執(zhí)行,如果觸發(fā)器的名稱和執(zhí)行時間相同墓拜,則只有一個節(jié)點去執(zhí)行此任務(wù)港柜。
# 基于Quartz的集群方案,數(shù)據(jù)庫選用Mysql咳榜;
# 集群分布式并發(fā)環(huán)境中使用QUARTZ定時任務(wù)調(diào)度夏醉,會在各個節(jié)點會上報任務(wù),存到數(shù)據(jù)庫中涌韩。
# 執(zhí)行時會從數(shù)據(jù)庫中取出觸發(fā)器來執(zhí)行,如果觸發(fā)器的名稱和執(zhí)行時間相同靶擦,則只有一個節(jié)點去執(zhí)行此任務(wù)。
# for cluster
org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval: 5000
調(diào)度線程池
調(diào)度采用線程池方式實現(xiàn)玄捕,避免單線程因阻塞而引起任務(wù)調(diào)度延遲
# 調(diào)度采用線程池方式實現(xiàn),避免單線程因阻塞而引起任務(wù)調(diào)度延遲寂纪。
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
@DisallowConcurrentExecution
xxl-job調(diào)度模塊的"調(diào)度中心"默認不使用該注解,因為RemoteHttpJobBean為公共QuartzJobBean赌结,這樣在多線程調(diào)度的情況下,調(diào)度模塊被阻塞的幾率很低拟杉,大大提高了調(diào)度系統(tǒng)的承載量搬设。xxl-job的每個調(diào)度任務(wù)雖然在調(diào)度模塊時并行調(diào)度執(zhí)行的拿穴,但是任務(wù)調(diào)度傳遞到任務(wù)模塊的執(zhí)行器確實是串行執(zhí)行的默色,同時支持任務(wù)終止腿宰。
話外音:Quartz定時任務(wù)默認都是并發(fā)執(zhí)行的吃度,不會等待上一次任務(wù)執(zhí)行完畢椿每,只要間隔時間到就會執(zhí)行, 如果定時任執(zhí)行太長英遭,會長時間占用資源贪绘,導(dǎo)致其它任務(wù)堵塞税灌。
misfire策略
misfire用于Trigger觸發(fā)時,線程池中沒有可用的線程或者調(diào)度器被關(guān)閉了洛勉,此時這個Trigger就變成了misfire收毫。當下次調(diào)度器啟動或者有線程可用時此再,會檢查處于misfire狀態(tài)的Trigger输拇。而misfire的狀態(tài)值決定了調(diào)度器如何處理這個Trigger策吠。
quartz有個全局的參數(shù)misfireThreadshold可以設(shè)置允許的超時時間猴抹,單位為毫秒洽糟。如果超過了就不執(zhí)行,未超過就執(zhí)行嘱丢。
org.quartz.jobStore.misfireThreshold: 60000
造成misfire的可能原因:服務(wù)重啟越驻;調(diào)度線程被QuartzJobBean阻塞缀旁,線程被耗盡并巍;某個任務(wù)啟用了@DisallowConcurrentExecution刽射,上次調(diào)度持續(xù)阻塞誓禁,下次調(diào)度被錯過摹恰。
Misfire規(guī)則俗慈,假設(shè)任務(wù)是從上午9點到下午17點時間范圍執(zhí)行:
withMisfireHandlingInstructionDoNothing:不觸發(fā)立即執(zhí)行姜盈,等待下次調(diào)度
withMisfireHandlingInstructionIgnoreMisfires:以錯過的第一個頻率時間立刻開始執(zhí)行馏颂,重做錯過的所有頻率周期后救拉,當下一次觸發(fā)頻率發(fā)生時間大于當前時間后亿絮,再按照正常的Cron頻率依次執(zhí)行派昧。如果9點misfire了蒂萎,在10:15系統(tǒng)恢復(fù)之后五慈。9點泻拦,10點的misfire會馬上執(zhí)行。
withMisfireHandlingInstructionFireAndProceed:以當前時間為觸發(fā)頻率立刻觸發(fā)一次執(zhí)行陆错,然后按照Cron頻率依次執(zhí)行音瓷。假設(shè)9點绳慎,10點的任務(wù)都misfire了杏愤,系統(tǒng)在10:15恢復(fù)后珊楼,只會執(zhí)行一次misfire,下次正點執(zhí)行画舌。
xxl-job默認misfire規(guī)則為:withMisfireHandlingInstructionDoNothing
// 3曲聂、corn trigger
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); // withMisfireHandlingInstructionDoNothing 忽略掉調(diào)度終止過程中忽略的調(diào)度
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
xxl-job和quartz數(shù)據(jù)庫表講解
XXL-JOB調(diào)度模塊基于Quartz集群實現(xiàn)朋腋,其"調(diào)度數(shù)據(jù)庫"是在Quartz的11張集群mysql表基礎(chǔ)上擴展而成旭咽。
xxl-job的5張擴展表
XXL_JOB_QRTZ_TRIGGER_GROUP
:執(zhí)行器信息表,維護任務(wù)執(zhí)行器信息
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_GROUP` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT '執(zhí)行器AppName',
`title` varchar(12) NOT NULL COMMENT '執(zhí)行器名稱',
`order` tinyint(4) NOT NULL DEFAULT '0' COMMENT '排序',
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '執(zhí)行器地址類型:0=自動注冊、1=手動錄入',
`address_list` varchar(512) DEFAULT NULL COMMENT '執(zhí)行器地址列表洽议,多地址逗號分隔',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_INFO
:調(diào)度擴展信息表, 用于保存XXL-JOB調(diào)度任務(wù)的擴展信息亚兄,如任務(wù)分組匈勋、任務(wù)名洽洁、機器地址饿自、執(zhí)行器昭雌、執(zhí)行入?yún)⒑蛨缶]件等等
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_INFO` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '執(zhí)行器主鍵ID',
`job_cron` varchar(128) NOT NULL COMMENT '任務(wù)執(zhí)行CRON',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '報警郵件',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '執(zhí)行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '執(zhí)行器任務(wù)handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '執(zhí)行器任務(wù)參數(shù)',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞處理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任務(wù)執(zhí)行超時時間,單位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失敗重試次數(shù)',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE類型',
`glue_source` mediumtext COMMENT 'GLUE源代碼',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE備注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新時間',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任務(wù)ID总放,多個逗號分隔',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_LOG
:調(diào)度日志表,用于保存XXL-JOB任務(wù)調(diào)度的歷史信息间聊,如調(diào)度結(jié)果哎榴、執(zhí)行結(jié)果、調(diào)度入?yún)⑵浴⒄{(diào)度機器和執(zhí)行器等等姿鸿;
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '執(zhí)行器主鍵ID',
`job_id` int(11) NOT NULL COMMENT '任務(wù)苛预,主鍵ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT '執(zhí)行器地址,本次執(zhí)行的地址',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '執(zhí)行器任務(wù)handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '執(zhí)行器任務(wù)參數(shù)',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '執(zhí)行器任務(wù)分片參數(shù)昔馋,格式如 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失敗重試次數(shù)',
`trigger_time` datetime DEFAULT NULL COMMENT '調(diào)度-時間',
`trigger_code` int(11) NOT NULL COMMENT '調(diào)度-結(jié)果',
`trigger_msg` text COMMENT '調(diào)度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '執(zhí)行-時間',
`handle_code` int(11) NOT NULL COMMENT '執(zhí)行-狀態(tài)',
`handle_msg` text COMMENT '執(zhí)行-日志',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_LOGGLUE
:任務(wù)GLUE日志,用于保存GLUE更新歷史丘薛,用于支持GLUE的版本回溯功能榔袋;
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOGGLUE` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT '任務(wù)凰兑,主鍵ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE類型',
`glue_source` mediumtext COMMENT 'GLUE源代碼',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE備注',
`add_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_REGISTRY
:執(zhí)行器注冊表,維護在線的執(zhí)行器和調(diào)度中心機器地址信息锅知;
CREATE TABLE XXL_JOB_QRTZ_TRIGGER_REGISTRY (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(255) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
quartz的11張系統(tǒng)表(只會介紹常見的,后續(xù)用到了其他的再補充)
XXL_JOB_QRTZ_JOB_DETAILS
:存儲的是job的詳細信息昌妹,包括:[DESCRIPTION]
描述飞崖,[IS_DURABLE]
是否持久化固歪,[JOB_DATA]
持久化對象等基本信息。
CREATE TABLE XXL_JOB_QRTZ_JOB_DETAILS
(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
XXL_JOB_QRTZ_CRON_TRIGGERS
:存儲CronTrigger
相關(guān)信息蒲讯,這也是我們使用最多的觸發(fā)器伶椿。
CREATE TABLE XXL_JOB_QRTZ_CRON_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(200) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES XXL_JOB_QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
XXL_JOB_QRTZ_PAUSED_TRIGGER_GRPS
:存放暫停掉的觸發(fā)器
CREATE TABLE XXL_JOB_QRTZ_PAUSED_TRIGGER_GRPS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
XXL_JOB_QRTZ_FIRED_TRIGGERS
:存儲已經(jīng)觸發(fā)的trigger相關(guān)信息,trigger隨著時間的推移狀態(tài)發(fā)生變化,直到最后trigger執(zhí)行完成,從表中被刪除踩麦。相同的trigger和task谓谦,每觸發(fā)一次都會創(chuàng)建一個實例反粥;從剛被創(chuàng)建的ACQUIRED狀態(tài),到EXECUTING狀態(tài)郑气,最后執(zhí)行完從數(shù)據(jù)庫中刪除尾组;
CREATE TABLE XXL_JOB_QRTZ_FIRED_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
XXL_JOB_QRTZ_SIMPLE_TRIGGERS
:存儲SimpleTrigger信息,timesTriggered默認值為0爷耀,當timesTriggered > repeatCount停止trigger,當執(zhí)行完畢之后此記錄會被刪除
CREATE TABLE XXL_JOB_QRTZ_SIMPLE_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES XXL_JOB_QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
XXL_JOB_QRTZ_TRIGGERS
: 和XXL_JOB_QRTZ_FIRED_TRIGGERS
存放的不一樣咆耿,不管trigger
觸發(fā)了多少次都只有一條記錄萨螺,TRIGGER_STATE
用來標識當前trigger
的狀態(tài)椭盏;假如cronTrigger每小時執(zhí)行一次掏颊,執(zhí)行完之后一直是WAITING狀態(tài);假如cronTrigger每6秒執(zhí)行一次狀態(tài)是ACQUIRED狀態(tài)准浴;假如simpleTrigger設(shè)置的執(zhí)行次數(shù)為5兄裂,那么重復(fù)執(zhí)行5次后狀態(tài)為COMPLETE,并且會被刪除匾南。
CREATE TABLE XXL_JOB_QRTZ_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES XXL_JOB_QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
XXL_JOB_QRTZ_SCHEDULER_STATE
:存儲所有節(jié)點的scheduler
,會定期檢查scheduler
是否失效豹爹。
CREATE TABLE XXL_JOB_QRTZ_SCHEDULER_STATE
(
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
XXL_JOB_QRTZ_LOCKS
:Quartz提供的鎖表,為多個節(jié)點調(diào)度提供分布式鎖孩等,實現(xiàn)分布式調(diào)度,默認有2個鎖,STATE_ACCESS
主要用在scheduler定期檢查是否失效的時候权她,保證只有一個節(jié)點去處理已經(jīng)失效的scheduler
隅要。TRIGGER_ACCESS
主要用在TRIGGER
被調(diào)度的時候,保證只有一個節(jié)點去執(zhí)行調(diào)度尼啡。
CREATE TABLE XXL_JOB_QRTZ_LOCKS
(
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
關(guān)于單機版的quartz與Spring XML集成
1.創(chuàng)建Job類,無須繼承父類书聚,直接配置MethodInvokingJobDetailFactoryBean
即可。但需要指定一下兩個屬性:
- targetObject:指定包含任務(wù)執(zhí)行體的Bean實例驯杜。
- targetMethod:指定將指定Bean實例的該方法包裝成任務(wù)的執(zhí)行體
<!-- 配置Job類 -->
<bean id="myJob" class="com.cmazxiaoma.quartz.xml.MyJob"></bean>
<!-- 配置JobDetail -->
<bean id="myJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<!-- 執(zhí)行目標job -->
<property name="targetObject" ref="myJob"></property>
<!-- 要執(zhí)行的方法 -->
<property name="targetMethod" value="execute"></property>
</bean>
<!-- 配置tirgger觸發(fā)器 -->
<bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- jobDetail -->
<property name="jobDetail" ref="myJobDetail"></property>
<!-- cron表達式,執(zhí)行時間 每5秒執(zhí)行一次 -->
<property name="cronExpression" value="0/5 * * * * ?"></property>
</bean>
<!-- 配置調(diào)度工廠 -->
<bean id="springJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTriggerFactoryBean"></ref>
</list>
</property>
</bean>
根據(jù)jdi.getJobDataMap().put("methodInvoker", this);
來看,如果Quartz是集群的話顽频,這里會拋出Couldn't store job: Unable to serialize JobDataMap for insertion into database because the value of property 'methodInvoker' is not serializable: org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean
因為methodInvoker
不能進行序列化,沒有實現(xiàn)serializable
接口莺奸。集群環(huán)境下,還是推薦用繼承QuartzJobBean創(chuàng)建Job甚疟。MethodInvokingJobDetailFactoryBean中的targetObject和targetMethod參數(shù)無法持久化到底層數(shù)據(jù)庫表。
最終還是根據(jù)concurrent策略選擇MethodInvokingJob或者StatefulMethodInvokingJob去反射調(diào)用targetObject中的targetMethod览妖。
public static class MethodInvokingJob extends QuartzJobBean {
protected static final Log logger = LogFactory.getLog(MethodInvokingJob.class);
private MethodInvoker methodInvoker;
/**
* Set the MethodInvoker to use.
*/
public void setMethodInvoker(MethodInvoker methodInvoker) {
this.methodInvoker = methodInvoker;
}
/**
* Invoke the method via the MethodInvoker.
*/
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
try {
context.setResult(this.methodInvoker.invoke());
}
catch (InvocationTargetException ex) {
if (ex.getTargetException() instanceof JobExecutionException) {
// -> JobExecutionException, to be logged at info level by Quartz
throw (JobExecutionException) ex.getTargetException();
}
else {
// -> "unhandled exception", to be logged at error level by Quartz
throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
}
}
catch (Exception ex) {
// -> "unhandled exception", to be logged at error level by Quartz
throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
}
}
}
/**
* Extension of the MethodInvokingJob, implementing the StatefulJob interface.
* Quartz checks whether or not jobs are stateful and if so,
* won't let jobs interfere with each other.
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public static class StatefulMethodInvokingJob extends MethodInvokingJob {
// No implementation, just an addition of the tag interface StatefulJob
// in order to allow stateful method invoking jobs.
}
2.創(chuàng)建Job
類轧拄,myJob
類繼承QuartzJobBean
,實現(xiàn) executeInternal(JobExecutionContext jobexecutioncontext)
方法讽膏。然后再通過配置JobDetailFactoryBean
創(chuàng)建jobDetail
檩电。
<!-- 配置JobDetail -->
<bean id="myJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="com.cmazxiaoma.quartz.xml.MyJob"></property>
<property name="durability" value="true"></property>
</bean>
<!-- 配置tirgger觸發(fā)器 -->
<bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- jobDetail -->
<property name="jobDetail" ref="myJobDetail"></property>
<!-- cron表達式府树,執(zhí)行時間 每5秒執(zhí)行一次 -->
<property name="cronExpression" value="0/5 * * * * ?"></property>
</bean>
<!-- 配置調(diào)度工廠 如果將lazy-init='false'那么容器啟動就會執(zhí)行調(diào)度程序 -->
<bean id="springJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTriggerFactoryBean"></ref>
</list>
</property>
</bean>
requestsRecovery屬性設(shè)置為 true時俐末,當Quartz服務(wù)被中止后,再次啟動或集群中其他機器接手任務(wù)時會嘗試恢復(fù)執(zhí)行之前未完成的所有任務(wù)奄侠。
集群Quartz與SpringBoot集成
@Configuration
public class XxlJobDynamicSchedulerConfig {
@Bean
public SchedulerFactoryBean getSchedulerFactoryBean(DataSource dataSource){
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setDataSource(dataSource);
// 自動啟動
schedulerFactory.setAutoStartup(true);
// 延時啟動卓箫,應(yīng)用啟動成功后在啟動
schedulerFactory.setStartupDelay(20);
// 覆蓋DB中JOB:true、以數(shù)據(jù)庫中已經(jīng)存在的為準:false
schedulerFactory.setOverwriteExistingJobs(true);
schedulerFactory.setApplicationContextSchedulerContextKey("applicationContext");
schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
return schedulerFactory;
}
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobDynamicScheduler getXxlJobDynamicScheduler(SchedulerFactoryBean schedulerFactory){
Scheduler scheduler = schedulerFactory.getScheduler();
XxlJobDynamicScheduler xxlJobDynamicScheduler = new XxlJobDynamicScheduler();
xxlJobDynamicScheduler.setScheduler(scheduler);
return xxlJobDynamicScheduler;
}
}
quartz.properties
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.instanceId: AUTO
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
# 調(diào)度采用線程池方式實現(xiàn)垄潮,避免單線程因阻塞而引起任務(wù)調(diào)度延遲烹卒。
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
# misfile:錯過了觸發(fā)時間,來處理規(guī)則弯洗。
# 可能原因:
# 1.服務(wù)重啟 2.調(diào)度線程被QuartzJobBean阻塞 3.線程被耗盡
# 4.某個任務(wù)啟動了@DisallowConcurrentExecution,上次調(diào)度持續(xù)阻塞旅急,下次調(diào)度被錯過。
# 假設(shè)任務(wù)是從上午9點到下午17點
# Misfire規(guī)則:
# withMisfireHandlingInstructionDoNothing:不觸發(fā)立即執(zhí)行牡整,等待下次調(diào)度
#——以錯過的第一個頻率時間立刻開始執(zhí)行
#——重做錯過的所有頻率周期后
#——當下一次觸發(fā)頻率發(fā)生時間大于當前時間后坠非,再按照正常的Cron頻率依次執(zhí)行
# 如果9點misfire了,在10:15系統(tǒng)恢復(fù)之后果正。9點炎码,10點的misfire會馬上執(zhí)行
# withMisfireHandlingInstructionIgnoreMisfires:以錯過的第一個頻率時間立刻開始執(zhí)行;
#——以當前時間為觸發(fā)頻率立刻觸發(fā)一次執(zhí)行
# ——然后按照Cron頻率依次執(zhí)行
# withMisfireHandlingInstructionFireAndProceed:以當前時間為觸發(fā)頻率立刻觸發(fā)一次執(zhí)行秋泳;
# 假設(shè)9點潦闲,10點的任務(wù)都misfire了,系統(tǒng)在10:15恢復(fù)后迫皱,只會執(zhí)行一次misfire,
# 下次正點執(zhí)行歉闰。
# XXL-JOB默認misfire規(guī)則為:withMisfireHandlingInstructionDoNothing
### 單位毫秒
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.maxMisfiresToHandleAtATime: 1
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
# 基于Quartz的集群方案,數(shù)據(jù)庫選用Mysql卓起;
# 集群分布式并發(fā)環(huán)境中使用QUARTZ定時任務(wù)調(diào)度和敬,會在各個節(jié)點會上報任務(wù),存到數(shù)據(jù)庫中戏阅。
# 執(zhí)行時會從數(shù)據(jù)庫中取出觸發(fā)器來執(zhí)行昼弟,如果觸發(fā)器的名稱和執(zhí)行時間相同,則只有一個節(jié)點去執(zhí)行此任務(wù)奕筐。
# for cluster
org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
# 默認是內(nèi)存
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval: 5000
在QuartzJobBean無法注入Service
以下測試都基于MySQL的Quartz集群環(huán)境下
前面說到基于MethodInvokingJobDetailFactoryBean創(chuàng)建Job舱痘,無法將targetObject和targetMethod參數(shù)持久化到數(shù)據(jù)庫表变骡,因此我們要想辦法將這2個參數(shù)存儲到JobDataMap中。
我們利用MethodInvokingJobDetailFactoryBean動態(tài)抽象構(gòu)造Job的思想芭逝,將其改造一番塌碌。
public class MyMethodInvokingJobBean extends QuartzJobBean implements ApplicationContextAware {
private String targetObject;
private String targetMethod;
private ApplicationContext applicationContext;
public void setTargetObject(String targetObject) {
this.targetObject = targetObject;
}
public void setTargetMethod(String targetMethod) {
this.targetMethod = targetMethod;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
Object object = applicationContext.getBean(this.targetObject);
System.out.println("targetObject:" + targetObject);
System.out.println("targetMethod:" + targetMethod);
try {
Method method = object.getClass().getMethod(this.targetMethod, new Class[] {});
method.invoke(object, new Object[]{});
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
@Configuration
public class MyQuartzConfiguration {
@Autowired
private MyQuartzTask myQuartzTask;
@Bean(name = "myQuartzJobBeanJobDetail")
public JobDetailFactoryBean myQuartzJobBeanJobDetail() {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setJobClass(MyQuartzJobBean.class);
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.setRequestsRecovery(true);
return jobDetailFactoryBean;
}
@Bean(name = "myQuartzTaskJobDetail")
public JobDetailFactoryBean myQuartzTaskJobDetail() {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setRequestsRecovery(true);
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.getJobDataMap().put("targetObject", "MyQuartzTask");
jobDetailFactoryBean.getJobDataMap().put("targetMethod", "execute");
jobDetailFactoryBean.setJobClass(MyMethodInvokingJobBean.class);
return jobDetailFactoryBean;
}
}
public class MyQuartzJobBean extends QuartzJobBean {
@Autowired
private MyQuartzService myQuartzService;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println("MyQuartzJobBean executeInternal");
myQuartzService.print();
}
}
@Component(value = "MyQuartzTask")
public class MyQuartzTask {
@Autowired
private MyQuartzService myQuartzService;
public void execute() {
System.out.println("MyQuartzTask");
myQuartzService.print();
}
}
@Service
public class MyQuartzService {
public void print() {
System.out.println("MyQuartzService");
}
}
我們運行測試用例中的testMethodInvokingJobBean()
方法, 發(fā)現(xiàn)運行沒問題,已經(jīng)成功的注入了MyQuartzService
旬盯。大致的思想是:抽象出一個MyMethodInvokingJobBean
台妆,注入targetObject和targetMethod參數(shù),利用反射去執(zhí)行目標類的目標方法胖翰,達到動態(tài)執(zhí)行任務(wù)的目的接剩,大大降低代碼的耦合度。
public class QuartzTest extends AbstractSpringMvcTest {
@Autowired
private Scheduler scheduler;
@Resource(name = "myQuartzJobBeanJobDetail")
private JobDetail myQuartzJobBeanJobDetail;
@Resource(name = "myQuartzTaskJobDetail")
private JobDetail myQuartzTaskJobDetail;
@Test
public void testMethodInvokingJobBean() throws SchedulerException, InterruptedException {
TriggerKey triggerKey = TriggerKey.triggerKey("simpleTrigger", "simpleTriggerGroup");
SimpleScheduleBuilder simpleScheduleBuilder =
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(1)
.withRepeatCount(1);
SimpleTrigger simpleTrigger = (SimpleTrigger) TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startNow()
.withSchedule(simpleScheduleBuilder)
.build();
scheduler.scheduleJob(myQuartzTaskJobDetail, simpleTrigger);
TimeUnit.MINUTES.sleep(10);
}
@Test
public void testJobDetailFactoryBean() throws InterruptedException, SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey("simpleTrigger1", "simpleTriggerGroup");
SimpleScheduleBuilder simpleScheduleBuilder =
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(1)
.withRepeatCount(1);
SimpleTrigger simpleTrigger = (SimpleTrigger) TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(simpleScheduleBuilder)
.build();
scheduler.scheduleJob(myQuartzJobBeanJobDetail, simpleTrigger);
TimeUnit.MINUTES.sleep(10);
}
}
當我們運行testJobDetailFactoryBean()
時泡态,此時我們的JobClass是MyQuartzJobBean
。當我們的job類中方法要被執(zhí)行的時候迂卢,Quartz會根據(jù)JobClass重新實例化一個對象某弦,這里對象中的屬性都會為空,所以會拋出NPE異常而克。
看到這里有沒有覺得這個問題好熟悉靶壮,在上一篇文章中為什么我的HibernateDaoSupport沒有注入SessionFactory我們有提到過。當Quartz重新實例化對象后员萍,肯定是沒有調(diào)用populate()方法腾降。我們此時應(yīng)該找出當前對象中的所有屬性,然后一一注入碎绎。(這里會涉及到AutowiredAnnotationBeanPostProcessor中的postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName)
和postProcessPropertyValues( PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException
方法獲取對象中需要被注入的屬性和在Spring容器中獲取相應(yīng)的屬性值)
我們在SpringBeanJobFactory
中的createJobInstance()中可以看到螃壤,就是從這里開始實例化Job類,并且把JobDataMap中的鍵值對填充到實例化后的Job對象中筋帖。
我們要做的是重寫SpringBeanJobFactory
中的createJobInstance()
方法奸晴,為實例化后的Job對象注入依賴對象。
@Component
public class AutowireSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.autowireCapableBeanFactory = applicationContext.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object job = super.createJobInstance(bundle);
autowireCapableBeanFactory.autowireBean(job);
return job;
}
}
AutowireCapableBeanFactory中的autowireBean()方法中是調(diào)用populateBean()
完成屬性的注入日麸。
重新配置SchedulerFactoryBean
最后重新運行testJobDetailFactoryBean()
寄啼,發(fā)現(xiàn)在MyQuartzJobBean
中成功注入了MyQuartzService
。
尾言
飯要慢慢吃代箭,路要慢慢走墩划!