elastic-job-spring-boot
qq交流群:812321371
1 簡介
Elastic-Job
是一個(gè)分布式調(diào)度解決方案鳖枕,由兩個(gè)相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite
和Elastic-Job-Cloud
組成。Elastic-Job-Lite
定位為輕量級無中心化解決方案墙基,使用jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)阳啥。
基于quartz
定時(shí)任務(wù)框架為基礎(chǔ)的添谊,因此具備quartz
的大部分功能
使用zookeeper
做協(xié)調(diào),調(diào)度中心察迟,更加輕量級
支持任務(wù)的分片
支持彈性擴(kuò)容,可以水平擴(kuò)展, 當(dāng)任務(wù)再次運(yùn)行時(shí)斩狱,會檢查當(dāng)前的服務(wù)器數(shù)量,重新分片扎瓶,分片結(jié)束之后才會繼續(xù)執(zhí)行任務(wù)
失效轉(zhuǎn)移所踊,容錯(cuò)處理,當(dāng)一臺調(diào)度服務(wù)器宕機(jī)或者跟zookeeper
斷開連接之后概荷,會立即停止作業(yè)秕岛,然后再去尋找其他空閑的調(diào)度服務(wù)器,來運(yùn)行剩余的任務(wù)
提供運(yùn)維界面误证,可以管理作業(yè)和注冊中心继薛。
1.1 使用場景
由于項(xiàng)目為微服務(wù),單模塊可能在兩個(gè)實(shí)例以上的數(shù)量愈捅,定時(shí)器就會出現(xiàn)多實(shí)例同時(shí)執(zhí)行的情況遏考。
一般定時(shí)器缺少管理界面,無法監(jiān)控定時(shí)器是否執(zhí)行成功蓝谨。
市面上常見的解決方案為定時(shí)器加鎖的操作灌具,或者采用第3方分布式定時(shí)器。
分布式定時(shí)器有多種方案譬巫,比如阿里內(nèi)部的ScheduledX
咖楣,當(dāng)當(dāng)網(wǎng)的Elastic job
,個(gè)人開源的xxl-job
等芦昔。
1.2 功能列表
- 分布式調(diào)度協(xié)調(diào)
- 彈性擴(kuò)容縮容
- 失效轉(zhuǎn)移
- 錯(cuò)過執(zhí)行作業(yè)重觸發(fā)
- 作業(yè)分片一致性诱贿,保證同一分片在分布式環(huán)境中僅一個(gè)執(zhí)行實(shí)例
- 自診斷并修復(fù)分布式不穩(wěn)定造成的問題
- 支持并行調(diào)度
- 支持作業(yè)生命周期操作
- 豐富的作業(yè)類型
- Spring整合以及命名空間提供
- 運(yùn)維平臺
1.3 概念
分片:任務(wù)的分布式執(zhí)行,需要將一個(gè)任務(wù)拆分為多個(gè)獨(dú)立的任務(wù)項(xiàng)烟零,然后由分布式的服務(wù)器分別執(zhí)行某一個(gè)或幾個(gè)分片項(xiàng)瘪松。
例如:有一個(gè)遍歷數(shù)據(jù)庫某張表的作業(yè),現(xiàn)有2臺服務(wù)器锨阿。為了快速的執(zhí)行作業(yè)宵睦,那么每臺服務(wù)器應(yīng)執(zhí)行作業(yè)的50%
。 為滿足此需求墅诡,可將作業(yè)分成2片壳嚎,每臺服務(wù)器執(zhí)行1片桐智。作業(yè)遍歷數(shù)據(jù)的邏輯應(yīng)為:服務(wù)器A遍歷ID以奇數(shù)結(jié)尾的數(shù)據(jù);服務(wù)器B遍歷ID以偶數(shù)結(jié)尾的數(shù)據(jù)烟馅。 如果分成10片说庭,則作業(yè)遍歷數(shù)據(jù)的邏輯應(yīng)為:每片分到的分片項(xiàng)應(yīng)為ID%10,而服務(wù)器A被分配到分片項(xiàng)0,1,2,3,4
郑趁;服務(wù)器B被分配到分片項(xiàng)5,6,7,8,9
刊驴,直接的結(jié)果就是服務(wù)器A遍歷ID
以0-4
結(jié)尾的數(shù)據(jù);服務(wù)器B遍歷ID
以5-9
結(jié)尾的數(shù)據(jù)寡润。
歷史軌跡:Elastic-Job
提供了事件追蹤功能捆憎,可通過事件訂閱的方式處理調(diào)度過程的重要事件,用于查詢梭纹、統(tǒng)計(jì)和監(jiān)控躲惰。
1.4 封裝elasticjob
由于當(dāng)當(dāng)網(wǎng)Elastic job
處于1年間未更新階段,相關(guān)jar處于可以使用階段功能不全变抽〈〔Γ考慮到使用場景為多項(xiàng)目使用,將elastic-job-lite-spring
簡單封裝便于使用绍载。
2.使用說明:
2.1 添加依賴
ps:實(shí)際version版本請使用最新版
<dependency>
<groupId>com.purgeteam</groupId>
<artifactId>elasticjob-spring-boot-starter</artifactId>
<version>0.1.1.RELEASE</version>
</dependency>
2.2 配置
ps: 需要mysql
,zookeeper
支持诡宗,請?zhí)崆按罱ê谩?/p>
配置bootstrap.yml
或者application.yml
。
加入以下配置:
spring:
elasticjob:
datasource: # job需要的記錄數(shù)據(jù)源
url: jdbc:mysql://127.0.0.1:3306/batch_log?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: Rtqw123OpnmER
regCenter: # 注冊中心
serverList: 127.0.0.1:2181
namespace: elasticJobDemo
2.3 定時(shí)器實(shí)現(xiàn)方法編寫
創(chuàng)建定時(shí)器類(唯一不同的地方在于將@Scheduled
改為實(shí)現(xiàn)SimpleJob
接口即可)
定時(shí)器實(shí)現(xiàn)方法編寫在execute
方法里逛钻。
@Slf4j
@Component
public class MySimpleJob implements SimpleJob {
// @Scheduled(cron = "0 0/1 * * * ?")
@Override
public void execute(ShardingContext shardingContext) {
log.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " +
"當(dāng)前分片項(xiàng): %s.當(dāng)前參數(shù): %s," +
"作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s",
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
// 分片大致如下:根據(jù)配置的分片參數(shù)執(zhí)行相應(yīng)的邏輯
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
log:Thread ID: 66, 作業(yè)分片總數(shù): 1, 當(dāng)前分片項(xiàng): 0.當(dāng)前參數(shù): Beijing,作業(yè)名稱: PropertiesSimpleJob.作業(yè)自定義參數(shù): test
2.4 配置定時(shí)器
2.4.1 創(chuàng)建Configuration類
將ZookeeperRegistryCenter
和JobEventConfiguration
注入僚焦。
創(chuàng)建JobScheduler
@Bean(initMethod = "init")
。
在mySimpleJobScheduler
方法里先通過ElasticJobUtils#getLiteJobConfiguration
獲取LiteJobConfiguration
對象曙痘。
創(chuàng)建SpringJobScheduler
對象返回即可。
@Configuration
public class MyJobConfig {
// job 名稱
private static final String JOB_NAME = "MySimpleJob";
// 定時(shí)器cron參數(shù)
private static final String CRON = "0 0/1 * * * ?";
// 定時(shí)器分片
private static final int SHARDING_TOTAL_COUNT = 1;
// 分片參數(shù)
private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai,2=Guangzhou";
// 自定義參數(shù)
private static final String JOB_PARAMETERS = "parameter";
@Resource
private ZookeeperRegistryCenter regCenter;
@Resource
private JobEventConfiguration jobEventConfiguration;
@Bean(initMethod = "init")
public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {
LiteJobConfiguration liteJobConfiguration = ElasticJobUtils
.getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,
SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
// 參數(shù):1.定時(shí)器實(shí)例立肘,2.注冊中心類边坤,3.LiteJobConfiguration,
// 3.歷史軌跡(不需要可以省略)
return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);
}
}
ElasticJobUtils#getLiteJobConfiguration
參數(shù)簡介:
/**
* 獲取 {@link LiteJobConfiguration} 對象
*
* @param jobClass 定時(shí)器實(shí)現(xiàn)類
* @param jobName 定時(shí)器名稱
* @param cron 定時(shí)參數(shù)
* @param shardingTotalCount 作業(yè)分片總數(shù)
* @param shardingItemParameters 當(dāng)前參數(shù) 可以為null
* @param jobParameters 作業(yè)自定義參數(shù) 可以為null
* @return {@link LiteJobConfiguration}
*/
public static LiteJobConfiguration getLiteJobConfiguration(
final Class<? extends SimpleJob> jobClass,
final String jobName,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters) {
...
return ...;
}
2.4.2 簡化Configuration類
當(dāng)然也可以用下面的@Configuration
實(shí)現(xiàn)簡化,配置bootstrap.yml
或者application.yml
谅年。
spring:
elasticjob:
scheduled:
jobConfigMap: // 為map集合
PropertiesSimpleJob: // 定時(shí)器key名稱
jobName: PropertiesSimpleJob // job名稱
cron: 0 0/1 * * * ? // cron表達(dá)式
shardingTotalCount: 2 // 分片數(shù)量
shardingItemParameters: 0=123,1=332 // 分片參數(shù)
jobParameters: test // 自定義參數(shù)
注入SpringJobSchedulerFactory
茧痒,在propertiesSimpleJobScheduler
方法里調(diào)用gerSpringJobScheduler
方法即可。
@Configuration
public class PropertiesSimpleJobConfig {
@Resource
private SpringJobSchedulerFactory springJobSchedulerFactory;
@Bean(initMethod = "init")
public JobScheduler propertiesSimpleJobScheduler(final PropertiesSimpleJob job) {
// 參數(shù):1.定時(shí)器實(shí)例融蹂,2.配置名稱旺订,3.是否開啟歷史軌跡
return springJobSchedulerFactory.getSpringJobScheduler(job,"PropertiesSimpleJob", true);
}
}
2.4.3 注解方式配置(推薦方式)
ps:這個(gè)注解包含了上述方式,簡化定時(shí)器注入超燃。
繼承SimpleJob
實(shí)現(xiàn)方法execute
区拳。
在AnnotationSimpleJob
類上加入注解@ElasticJobScheduler
即可。
下面為完整注解意乓。
@Slf4j
@ElasticJobScheduler(
name = "AnnotationSimpleJob", // 定時(shí)器名稱
cron = "0/8 * * * * ?", // 定時(shí)器表達(dá)式
shardingTotalCount = 1, // 作業(yè)分片總數(shù) 默認(rèn)為1
shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou", // 分片序列號和參數(shù)用等號分隔 不需要參數(shù)可以不加
jobParameters = "123", // 作業(yè)自定義參數(shù) 不需要參數(shù)可以不加
isEvent = true // 是否開啟數(shù)據(jù)記錄 默認(rèn)為true
)
public class AnnotationSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " +
"當(dāng)前分片項(xiàng): %s.當(dāng)前參數(shù): %s," +
"作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s",
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
總結(jié)
分布式j(luò)ob可以解決多個(gè)項(xiàng)目同一個(gè)定時(shí)器都執(zhí)行的問題樱调,配合elastic-job控制臺可以直觀監(jiān)控定時(shí)器執(zhí)行情況等。
示例代碼地址:elastic-job-spring-boot
作者GitHub:
Purgeyao 歡迎關(guān)注
本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布!