本文首發(fā)于 vivo互聯(lián)網(wǎng)技術 微信公眾號
https://mp.weixin.qq.com/s/l4vuYpNRjKxQRkRTDhyg2Q
作者:陳王榮
分布式任務調(diào)度框架幾乎是每個大型應用必備的工具,本文介紹了任務調(diào)度框架使用的需求背景和痛點坛吁,對業(yè)界普遍使用的開源分布式任務調(diào)度框架的使用進行了探究實踐,并分析了這幾種框架的優(yōu)劣勢和對自身業(yè)務的思考宣增。
一爹脾、業(yè)務背景
1.1 為什么需要使用定時任務調(diào)度
(1)時間驅(qū)動處理場景:整點發(fā)送優(yōu)惠券,每天更新收益泌霍,每天刷新標簽數(shù)據(jù)和人群數(shù)據(jù)朱转。
(2) 批量處理數(shù)據(jù): 按月批量統(tǒng)計報表數(shù)據(jù)肋拔,批量更新短信狀態(tài)凉蜂,實時性要求不高。
(3) 異步執(zhí)行解耦:活動狀態(tài)刷新纫雁,異步執(zhí)行離線查詢轧邪,與內(nèi)部邏輯解耦忌愚。
1.2 使用需求和痛點
(1)任務執(zhí)行監(jiān)控告警能力。
(2)任務可靈活動態(tài)配置,無需重啟檬某。
(3)業(yè)務透明,低耦合厅瞎,配置精簡和簸,開發(fā)方便锁保。
(4)易測試爽柒。
(5)高可用浩村,無單點故障心墅。
(6)任務不可重復執(zhí)行怎燥,防止邏輯異常铐姚。
(7)大任務的分發(fā)并行處理能力。
二氢橙、開源框架實踐與探索
2.1 Java 原生 Timer 和ScheduledExecutorService
2.1.1 Timer使用
Timer缺陷:
(1)Timer底層是使用單線程來處理多個Timer任務,這意味著所有任務實際上都是串行執(zhí)行,前一個任務的延遲會影響到之后的任務的執(zhí)行滞欠。
(2)由于單線程的緣故筛璧,一旦某個定時任務在運行時,產(chǎn)生未處理的異常巫糙,那么不僅當前這個線程會停止醉锄,所有的定時任務都會停止。
(3)Timer任務執(zhí)行是依賴于系統(tǒng)絕對時間烟勋,系統(tǒng)時間變化會導致執(zhí)行計劃的變更神妹。
由于上述缺陷,盡量不要使用Timer蛹找, idea中也會明確提示哨坪,使用ScheduledThreadPoolExecutor替代Timer 庸疾。
2.1.2 ScheduledExecutorService使用
ScheduledExecutorService對于Timer的缺陷進行了修補,首先ScheduledExecutorService內(nèi)部實現(xiàn)是ScheduledThreadPool線程池当编,可以支持多個任務并發(fā)執(zhí)行届慈。
對于某一個線程執(zhí)行的任務出現(xiàn)異常,也會處理,不會影響其他線程任務的執(zhí)行金顿,另外ScheduledExecutorService是基于時間間隔的延遲臊泌,執(zhí)行不會由于系統(tǒng)時間的改變發(fā)生變化。
當然揍拆,ScheduledExecutorService也有自己的局限性:只能根據(jù)任務的延遲來進行調(diào)度筒狠,無法滿足基于絕對時間和日歷調(diào)度的需求运挫。
2.2 Spring Task
2.2.1 Spring Task 使用
spring task 是spring自主開發(fā)的輕量級定時任務框架碾牌,不需要依賴其他額外的包誓琼,配置較為簡單。
此處使用注解配置
2.2.2 Spring Task缺陷
Spring Task 本身不支持持久化帖鸦,也沒有推出官方的分布式集群模式,只能靠開發(fā)者在業(yè)務應用中自己手動擴展實現(xiàn),無法滿足可視化楷拳,易配置的需求她混。
2.3 永遠經(jīng)典的 Quartz
2.3.1 基本介紹
Quartz框架是Java領域最著名的開源任務調(diào)度工具酗钞,也是目前事實上的定時任務標準,幾乎全部的開源定時任務框架都是基于Quartz核心調(diào)度構建而成。
2.3.2 原理解析
核心組件和架構
關鍵概念
(1)Scheduler</strong>:任務調(diào)度器韭山,是執(zhí)行任務調(diào)度的控制器凿歼。本質(zhì)上是一個計劃調(diào)度容器洁桌,注冊了全部Trigger和對應的JobDetail吠谢, 使用線程池作為任務運行的基礎組件,提高任務執(zhí)行效率昭齐。
(2)Trigger:觸發(fā)器,用于定義任務調(diào)度的時間規(guī)則里覆,告訴任務調(diào)度器什么時候觸發(fā)任務虹统,其中CronTrigger是基于cron表達式構建的功能強大的觸發(fā)器呼奢。</p>
(3)Calendar:日歷特定時間點的集合孤澎。一個trigger可以包含多個Calendar,可用于排除或包含某些時間點缚俏。
(4)JobDetail:是一個可執(zhí)行的工作惊搏,用來描述Job實現(xiàn)類及其它相關的靜態(tài)信息贮乳,如Job的名稱、監(jiān)聽器等相關信息恬惯。
(5)Job:任務執(zhí)行接口向拆,只有一個execute方法,用于執(zhí)行真正的業(yè)務邏輯酪耳。
(6)JobStore:任務存儲方式浓恳,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存儲在JVM的內(nèi)存中碗暗,有丟失和數(shù)量受限的風險颈将,JDBCJobStore是將任務信息持久化到數(shù)據(jù)庫中,支持集群言疗。
2.3.3 實踐說明
(1)關于Quartz的基本使用
可參考Quartz官方文檔和網(wǎng)上博客實踐教程晴圾。
(2)業(yè)務使用要滿足動態(tài)修改和重啟不丟失, 一般需要使用數(shù)據(jù)庫進行保存噪奄。
Quartz本身支持JDBCJobStore死姚,但是其配置的數(shù)據(jù)表比較多,官方推薦配置可參照官方文檔勤篮,超過10張表都毒,業(yè)務使用比較重。
在使用的時候只需要存在基本trigger配置和對應任務以及相關執(zhí)行日志的表即可滿足絕大部分需求碰缔。</p>
(3)組件化
將quartz動態(tài)任務配置信息持久化到數(shù)據(jù)庫账劲,將數(shù)據(jù)操作包裝成基本jar包,供項目之間使用金抡,引用項目只需要引入jar包依賴和配置對應的數(shù)據(jù)表瀑焦,使用時就可以對Quartz配置透明。
(4)擴展
集群模式: 通過故障轉(zhuǎn)移和負載均衡實現(xiàn)了任務的高可用性竟终,通過數(shù)據(jù)庫的鎖機制來確保任務執(zhí)行的唯一性蝠猬,但是集群特性僅僅只是用來HA,節(jié)點數(shù)量的增加并不會提升單個任務的執(zhí)行效率统捶,不能實現(xiàn)水平擴展榆芦。
Quartz插件:可以對特定需要進行擴展,比如增加觸發(fā)器和任務執(zhí)行日志喘鸟,任務依賴串行處理場景匆绣,可參考:quartz插件——實現(xiàn)任務之間的串行調(diào)度
2.3.4 缺陷和不足
(1)需要把任務信息持久化到業(yè)務數(shù)據(jù)表撵渡,和業(yè)務有耦合宁脊。
(2)調(diào)度邏輯和執(zhí)行邏輯并存于同一個項目中勺爱,在機器性能固定的情況下笙以,業(yè)務和調(diào)度之間不可避免地會相互影響。
(3)quartz集群模式下频鉴,是通過數(shù)據(jù)庫獨占鎖來唯一獲取任務弄匕,任務執(zhí)行并沒有實現(xiàn)完善的負載均衡機制督函。
2.4 輕量級神器 XXL-JOB
2.4.1 基本介紹
XXL-JOB是一個輕量級分布式任務調(diào)度平臺,主打特點是平臺化嚣镜,易部署爬迟,開發(fā)迅速、學習簡單菊匿、輕量級付呕、易擴展,代碼仍在持續(xù)更新中跌捆。
“調(diào)度中心”是任務調(diào)度控制臺徽职,平臺自身并不承擔業(yè)務邏輯,只是負責任務的統(tǒng)一管理和調(diào)度執(zhí)行佩厚,并且提供任務管理平臺姆钉, “執(zhí)行器” 負責接收“調(diào)度中心”的調(diào)度并執(zhí)行,可直接部署執(zhí)行器抄瓦,也可以將執(zhí)行器集成到現(xiàn)有業(yè)務項目中育韩。 通過將任務的調(diào)度控制和任務的執(zhí)行解耦,業(yè)務使用只需要關注業(yè)務邏輯的開發(fā)闺鲸。
主要提供了任務的動態(tài)配置管理、任務監(jiān)控和統(tǒng)計報表以及調(diào)度日志幾大功能模塊埃叭,支持多種運行模式和路由策略摸恍,可基于對應執(zhí)行器機器集群數(shù)量進行簡單分片數(shù)據(jù)處理。
2.4.2 原理解析
2.1.0版本前核心調(diào)度模塊都是基于quartz框架赤屋,2.1.0版本開始自研調(diào)度組件立镶,移除quartz依賴 ,使用時間輪調(diào)度类早。
2.4.3 實踐說明
詳細配置和介紹參考官方文檔
2.4.3.1 demo使用:
示例1:實現(xiàn)簡單任務配置媚媒,只需要繼承IJobHandler 抽象類,并聲明注解
@JobHandler(value="offlineTaskJobHandler")
@Component
public class OfflineTaskJobHandler extends IJobHandler {
@Reference(check = false,version = "cms-dev",group="cms-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log(" offlineTaskJobHandler start.");
try {
offlineTaskExecutorFacade.executeOfflineTask();
} catch (Exception e) {
XxlJobLogger.log("offlineTaskJobHandler-->exception." , e);
return FAIL;
}
XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end.");
return SUCCESS;
}
}
示例2:分片廣播任務涩僻。
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片參數(shù)
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數(shù):當前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業(yè)務邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
2.4.3.2 整合dubbo
(1)引入dubbo-spring-boot-starter和業(yè)務facade jar包依賴缭召。
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.demo.service</groupId>
<artifactId>xxx-facade</artifactId>
<version>1.9-SNAPSHOT</version>
</dependency>
(2)配置文件加入dubbo消費端配置(可根據(jù)環(huán)境定義多個配置文件,通過profile切換)逆日。
## Dubbo 服務消費者配置
spring.dubbo.application.name=xxl-job
spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183
spring.dubbo.port=20880
spring.dubbo.version=demo
spring.dubbo.group=demo-service
(3)代碼中通過@Reference注入facade接口即可嵌巷。
@Reference(check = false,version = demo,group = demo-service)
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
(4)啟動程序加入@EnableDubboConfiguration注解。
@SpringBootApplication
@EnableDubboConfiguration
public class XxlJobExecutorApplication {
public static void main(String[] args) {
SpringApplication.run(XxlJobExecutorApplication.class, args);
}
}
2.4.4 任務可視化配置
內(nèi)置了平臺項目室抽,方便了開發(fā)者對任務的管理和執(zhí)行日志的監(jiān)控搪哪,并提供了一些便于測試的功能。
2.4.5 擴展
(1)任務監(jiān)控和報表的優(yōu)化坪圾。
(2)任務報警方式的擴展晓折,比如加入告警中心惑朦,提供內(nèi)部消息,短信告警漓概。
(3)對實際業(yè)務內(nèi)部執(zhí)行出現(xiàn)異常情況下的不同監(jiān)控告警和重試策略漾月。
2.5 高可用 Elastic-Job
2.5.1 基本介紹
Elastic-Job是一個分布式調(diào)度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成垛耳。
Elastic-Job-Lite定位為輕量級無中心化解決方案栅屏,使用jar包的形式提供分布式任務的協(xié)調(diào)服務。
Elastic-Job-Cloud使用Mesos + Docker的解決方案堂鲜,額外提供資源治理栈雳、應用分發(fā)以及進程隔離等服務。
可惜的是已經(jīng)兩年沒有迭代更新記錄缔莲。
2.5.2 原理解析
2.5.3 實踐說明
2.5.3.1 demo使用
(1)安裝zookeeper哥纫,配置注冊中心config,配置文件加入注冊中心zk的配置痴奏。
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
注冊中心zk的配置:
spring.application.name=demo_elasticjob
regCenter.serverList=localhost:2181
regCenter.namespace=demo_elasticjob
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd
(2)配置數(shù)據(jù)源config蛀骇,并配置文件中加入數(shù)據(jù)源配置。
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceProperties {
private String url;
private String username;
private String password;
@Bean
@Primary
public DataSource getDataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
}
數(shù)據(jù)源配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd
(3)配置事件config读拆。
@Configuration
public class JobEventConfig {
@Autowired
private DataSource dataSource;
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}
(4)為了便于靈活配置不同的任務觸發(fā)事件擅憔,加入ElasticSimpleJob注解。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
@AliasFor("cron")
String value() default "";
@AliasFor("value")
String cron() default "";
String jobName() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
}
(5)對配置進行初始化檐晕。
@Configuration
@ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")
public class ElasticJobAutoConfiguration {
@Value("${regCenter.serverList}")
private String serverList;
@Value("${regCenter.namespace}")
private String namespace;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private DataSource dataSource;
@PostConstruct
public void initElasticJob() {
ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
regCenter.init();
Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
SimpleJob simpleJob = entry.getValue();
ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
jobScheduler.init();
}
}
}
(6)實現(xiàn) SimpleJob接口暑诸,按上文中方法整合dubbo, 完成業(yè)務邏輯。
@ElasticSimpleJob(
cron = "*/10 * * * * ?",
jobName = "OfflineTaskJob",
shardingTotalCount = 2,
jobParameter = "測試參數(shù)",
shardingItemParameters = "0=A,1=B")
@Component
public class MySimpleJob implements SimpleJob {
Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
@Reference(check = false, version = "cms-dev", group = "cms-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
@Override
public void execute(ShardingContext shardingContext) {
offlineTaskExecutorFacade.executeOfflineTask();
logger.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " +
"當前分片項: %s.當前參數(shù): %s," +
"作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
2.6 其余開源框架
(1) Saturn :Saturn是唯品會開源的一個分布式任務調(diào)度平臺辟灰,在Elastic Job的基礎上進行了改造个榕。
(2) SIA-TASK :是宜信開源的分布式任務調(diào)度平臺。
三芥喇、優(yōu)劣勢對比和業(yè)務場景適配思考
業(yè)務思考:
(1)豐富任務監(jiān)控數(shù)據(jù)和告警策略西采。
(2) 接入統(tǒng)一登錄和權限控制。
(3) 進一步簡化業(yè)務接入步驟继控。
四械馆、結語
對于并發(fā)場景不是特別高的系統(tǒng)來說,xxl-job配置部署簡單易用湿诊,不需要引入多余的組件狱杰,同時提供了可視化的控制臺,使用起來非常友好厅须,是一個比較好的選擇仿畸。希望直接利用開源分布式框架能力的系統(tǒng),建議根據(jù)自身的情況來進行合適的選型。