官方的starter目前不支持注解聲明job待错。
這里做了一些擴展
- 支持注解聲明job
- 支持多級注解讥此,一個service聲明多個job
- 支持手工增加,刪除,觸發(fā)job
https://github.com/kangarooxin/elastic-job3-spring-boot-starter
使用說明:
- 在pom.xml 添加引用
<dependency>
<groupId>io.github.kangarooxin</groupId>
<artifactId>elastic-job3-spring-boot-starter</artifactId>
<version>1.0.1</version>
</dependency>
- 配置
#reg-center
elasticjob.reg-center.server-lists=localhost:2181
elasticjob.reg-center.namespace=${spring.application.name}.elastic-job
- 使用注解
@Service
@ElasticJobScheduler(cron = "0/5 * * * * ?", shardingTotalCount = 4, name = "SimpleElasticJob",
shardingItemParameters = "0=0,1=0,2=1,3=1", jobParameters = "parameter")
@Slf4j
public class SimpleElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("Thread ID: {}, ShardingTotalCount: {}, ShardingItem: {}, ShardingParameter: {}, JobName: {}, JobParameter: {}",
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
);
}
}
- 支持多級注解
@Service
@ElasticJobScheduler(cron = "0/5 * * * * ?", shardingTotalCount = 4, name = "SimpleElasticJobLevel1",
shardingItemParameters = "0=0,1=0,2=1,3=1", jobParameters = "parameter")
@ElasticJobScheduler(cron = "0 */1 * * * ?", shardingTotalCount = 4, name = "SimpleElasticJobLevel2",
shardingItemParameters = "0=0,1=0,2=1,3=1", jobParameters = "parameter")
@Slf4j
public class MultipleElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("Thread ID: {}, ShardingTotalCount: {}, ShardingItem: {}, ShardingParameter: {}, JobName: {}, JobParameter: {}",
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
);
}
}
- 使用ElasticJobService,可以手工增加,刪除,觸發(fā)job
public interface ElasticJobService {
/**
* 添加簡單作業(yè)
*
* @param elasticJob
* @param jobName
* @param shardingTotalCount
* @param cron
*/
JobBootstrap addJob(ElasticJob elasticJob, String jobName, int shardingTotalCount, String cron);
/**
* 添加簡單作業(yè)
*
* @param elasticJob
* @param jobName
* @param shardingTotalCount
* @param cron
* @param jobParameter
* @param shardingItemParameters
*/
JobBootstrap addJob(ElasticJob elasticJob, String jobName, int shardingTotalCount, String cron, String jobParameter, String... shardingItemParameters);
/**
* 添加簡單作業(yè)
*
* @param jobConfiguration
* @param jobBootstrapBeanName 非必填
* @param elasticJob
* @return
*/
JobBootstrap addJob(JobConfiguration jobConfiguration, String jobBootstrapBeanName, ElasticJob elasticJob);
/**
* 添加數(shù)據(jù)流作業(yè)
*
* @param jobConfiguration
* @param jobBootstrapBeanName
* @param elasticJob
* @param streamingProcess 是否開啟流式處理 默認(rèn)false
* @return
*/
<T> JobBootstrap addDataFlowJob(JobConfiguration jobConfiguration, String jobBootstrapBeanName, DataflowJob<T> elasticJob, boolean streamingProcess);
/**
* 添加定時Script調(diào)度
*
* @param jobConfiguration
* @param scriptCommendLine
*/
JobBootstrap addScriptJob(JobConfiguration jobConfiguration, String jobBootstrapBeanName, String scriptCommendLine);
/**
* 添加定時Http調(diào)度
*
* @param jobConfiguration
* @param httpProp
*/
JobBootstrap addHttpJob(JobConfiguration jobConfiguration, String jobBootstrapBeanName, HttpJobProp httpProp);
/**
* 移除job
*
* @param jobName
*/
void removeJob(String jobName);
/**
* 移除job
*
* @param jobName
* @param jobBootstrapBeanName
*/
void removeJob(String jobName, String jobBootstrapBeanName);
/**
* get job configuration.
*
* @param jobName job name
* @return job configuration
*/
JobConfigurationPOJO getJobConfiguration(String jobName);
/**
* Update job configuration.
*
* @param jobConfig job configuration
*/
void updateJobConfiguration(JobConfigurationPOJO jobConfig);
/**
* Remove job configuration.
*
* @param jobName job name
*/
void removeJobConfiguration(String jobName);
/**
* Trigger job to run at once.
*
* <p>Job will not start until it does not conflict with the last running job, and this tag will be automatically cleaned up after it starts.</p>
*
* @param jobName job name
*/
void trigger(String jobName);
/**
* Get jobs total count.
*
* @return jobs total count.
*/
int getJobsTotalCount();
/**
* Get all jobs brief info.
*
* @return all jobs brief info.
*/
Collection<JobBriefInfo> getAllJobsBriefInfo();
/**
* Get job brief info.
*
* @param jobName job name
* @return job brief info
*/
JobBriefInfo getJobBriefInfo(String jobName);
}
- 更多配置
#配置策略陈肛,自定義策略參考官方文檔定義后配置到這里即可
#錯誤策略. LOG,THROW,IGNORE,EMAIL,WECHAT,DINGTALK
elasticjob.job-error-handler-type=LOG
#分片策略. AVG_ALLOCATION,ODEVITY,ROUND_ROBIN
elasticjob.job-sharding-strategy-type=AVG_ALLOCATION
#線程策略. CPU,SINGLE_THREAD
elasticjob.job-executor-service-handler-type=CPU
#追蹤
elasticjob.tracing.type=RDB
#multiple datasource
elasticjob.tracing.data-source-bean-name=db1MasterSlaveRoutingDatasource
#配置郵件提醒策略屬性
elasticjob.props.email.host=host
elasticjob.props.email.port=465
elasticjob.props.email.username=username
elasticjob.props.email.password=password
elasticjob.props.email.useSsl=true
elasticjob.props.email.subject=ElasticJob error message
elasticjob.props.email.from=from@xxx.xx
elasticjob.props.email.to=to1@xxx.xx,to2@xxx.xx
elasticjob.props.email.cc=cc@xxx.xx
elasticjob.props.email.bcc=bcc@xxx.xx
elasticjob.props.email.debug=false
#配置微信提醒策略屬性
elasticjob.props.wechat.webhook=you_webhook
elasticjob.props.wechat.connectTimeout=3000
elasticjob.props.wechat.readTimeout=5000
#配置釘釘提醒策略屬性
elasticjob.props.dingtalk.webhook=you_webhook
elasticjob.props.dingtalk.keyword=you_keyword
elasticjob.props.dingtalk.secret=you_secret
elasticjob.props.dingtalk.connectTimeout=3000
elasticjob.props.dingtalk.readTimeout=5000