概述
Elastic-Job是一個(gè)分布式調(diào)度解決方案订讼,由兩個(gè)相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成钧汹。
官方文檔地址:http://elasticjob.io/docs/elastic-job-lite/00-overview/
提供了如下功能
- 分布式調(diào)度協(xié)調(diào)
- 彈性擴(kuò)容縮容
- 失效轉(zhuǎn)移
- 錯(cuò)過執(zhí)行作業(yè)重觸發(fā)
- 作業(yè)分片一致性,保證同一分片在分布式環(huán)境中僅一個(gè)執(zhí)行實(shí)例
- 自診斷并修復(fù)分布式不穩(wěn)定造成的問題
- 支持并行調(diào)度
- 支持作業(yè)生命周期操作
- 豐富的作業(yè)類型
- Spring整合以及命名空間提供
- 運(yùn)維平臺(tái)
單機(jī)定時(shí)任務(wù)
通常我們在開發(fā)過程中或多或少會(huì)運(yùn)用到定時(shí)任務(wù)层亿。常用單機(jī)定時(shí)任務(wù)我們常用的包括:
- java自帶的timer
- ScheduledExecutorService
- spring定時(shí)任務(wù)
- Quartz 也可支持分布式
單機(jī)定時(shí)任務(wù)的缺點(diǎn)
- 缺乏高可用性,單機(jī)式的定時(shí)任務(wù)調(diào)度只能在一臺(tái)機(jī)器上運(yùn)行桦卒,程序或系統(tǒng)異常將導(dǎo)致功能不可用
- 單機(jī)處理極限,單機(jī)的分布式調(diào)度只能在一臺(tái)機(jī)器上執(zhí)行匿又,收到單機(jī)CUP內(nèi)存等的限制
elastic-job的使用場景
elastic-job就是一個(gè)分布式的定時(shí)任務(wù)調(diào)度,我們能用它做什么?
例如有個(gè)需求需要在每天晚上統(tǒng)計(jì)當(dāng)天的訂單訂單情況,單機(jī)模式下我們會(huì)寫一段程序去處理方灾,假如有10萬個(gè)訂單,處理成功需要耗時(shí)很長時(shí)間琳省,且在機(jī)器瓶頸下可能會(huì)導(dǎo)致內(nèi)存不足等情況迎吵。若使用elastic-job,我們則可以考慮將定時(shí)任務(wù)進(jìn)行分片,讓其分布在不同的機(jī)器上運(yùn)行针贬,提供可用性击费,減少單機(jī)失敗帶來的功能不可用。例如當(dāng)下有2臺(tái)機(jī)器桦他,將任務(wù)分成4片蔫巩,則每臺(tái)機(jī)器處理其中2片任務(wù)谆棱。
使用實(shí)例初探
根據(jù)上圖,實(shí)現(xiàn)將任務(wù)分成4片圆仔,分別在2臺(tái)機(jī)器上執(zhí)行
任務(wù)類
//具體執(zhí)行的任務(wù)類
public class MyElasticJob implements SimpleJob {
public final static Logger logger = LoggerFactory.getLogger(MyElasticJob.class);
@Override
public void execute(ShardingContext shardingContext) {
int shardingTotalCount = shardingContext.getShardingTotalCount();
String shardingParameter = shardingContext.getShardingParameter();
//任務(wù)的分片項(xiàng)垃瞧,從0開始遞增例如有四個(gè)分片則序號為0~4
int shardingItem = shardingContext.getShardingItem();
//分片帶的參數(shù)例如0=A,1=B,2=C,3=D
String jobParamter = shardingContext.getJobParameter();
logger.info("shardingItem =" + shardingItem + " , shardingTotalCount=" + shardingTotalCount + " , shardingParameter =" + shardingParameter + " ,jobParamter =" + jobParamter);
//System.out.println("shardingItem =" + shardingItem + " , shardingTotalCount=" + shardingTotalCount + " , shardingParameter =" + shardingParameter + " ,jobParamter =" + jobParamter);
switch (shardingItem){
case 0:
doLongJob("食品訂單");
break;
case 1:
doSmallJob("電腦訂單");
break;
case 2:
doLongJob("服裝訂單");
break;
case 3:
doSmallJob("機(jī)械訂單");
break;
}
}
public void doLongJob(String msg){
logger.info("========長時(shí)間任務(wù)=======" + msg);
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doSmallJob(String msg){
logger.info("========短時(shí)間任務(wù)=======" + msg);
}
}
配置類
public class FastDemo {
private static CoordinatorRegistryCenter createRegistryCenter(){
//elasticjob采用zookeeper進(jìn)行任務(wù)的調(diào)度,根據(jù)搶主的方式實(shí)現(xiàn)同一時(shí)刻只有一個(gè)任務(wù)在一臺(tái)機(jī)器上執(zhí)行,保證不重復(fù)坪郭,這里配置zookeeper的地址
CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.0.103:2181","elastic-job-demo"));
registryCenter.init();
return registryCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
//設(shè)置任務(wù)每15秒執(zhí)行一次个从,一共分成4片,elastic-job會(huì)獲取當(dāng)前這個(gè)任務(wù)一共在多少臺(tái)服務(wù)器上進(jìn)行平均分配歪沃,例如我將war包分別放在了128嗦锐,129機(jī)器上,任務(wù)總片為4,則按照elasticjob的默認(rèn)分配策略沪曙,128將執(zhí)行第0奕污,1片任務(wù),129執(zhí)行第2液走,3片任務(wù)
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
.newBuilder("demoSimpleJob", "0/20 * * * * ?", 4)
.shardingItemParameters("0=A,1=B,2=C,3=D")
.jobParameter("xuzy")
.failover(true) //設(shè)置失效轉(zhuǎn)移,當(dāng)一臺(tái)機(jī)器掛了以后他的分片會(huì)讓其他臺(tái)服務(wù)器執(zhí)行
.build();
// 定義SIMPLE類型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
// 定義Lite作業(yè)根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
public static void initJob(){
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
}
ServletContextLTest類
public class ServletContextLTest implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
FastDemo.initJob();
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}
web.xml
<listener>
<listener-class>com.xzy.elasticjob.servletListener.ServletContextLTest</listener-class>
</listener>
將項(xiàng)目打包成war分別放在128,129上面分別觀察日志
128日志
[root@server-1 bin]# taif -f /var/logs/elasticjob.log
-bash: taif: command not found
[root@server-1 bin]# tail -f /var/logs/elasticjob.log
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 1 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
2019-04-27 02:12:03 [ INFO] - org.quartz.impl.StdSchedulerFactory -StdSchedulerFactory.java(1339) -Quartz scheduler 'demoSimpleJob' initialized from an externally provided properties instance.
2019-04-27 02:12:03 [ INFO] - org.quartz.impl.StdSchedulerFactory -StdSchedulerFactory.java(1343) -Quartz scheduler version: 2.2.1
2019-04-27 02:12:04 [ INFO] - org.quartz.core.QuartzScheduler -QuartzScheduler.java(575) -Scheduler demoSimpleJob_$_NON_CLUSTERED started.
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =2 , shardingTotalCount=4 , shardingParameter =C ,jobParamter =xuzy
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========長時(shí)間任務(wù)=======服裝訂單
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =3 , shardingTotalCount=4 , shardingParameter =D ,jobParamter =xuzy
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短時(shí)間任務(wù)=======機(jī)械訂單
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =2 , shardingTotalCount=4 , shardingParameter =C ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========長時(shí)間任務(wù)=======服裝訂單
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =3 , shardingTotalCount=4 , shardingParameter =D ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短時(shí)間任務(wù)=======機(jī)械訂單
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =2 , shardingTotalCount=4 , shardingParameter =C ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========長時(shí)間任務(wù)=======服裝訂單
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =3 , shardingTotalCount=4 , shardingParameter =D ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短時(shí)間任務(wù)=======機(jī)械訂單
129日志
[root@server-2 bin]# tail -f /var/logs/elasticjob.log
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短時(shí)間任務(wù)=======電腦訂單
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =0 , shardingTotalCount=4 , shardingParameter =A ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========長時(shí)間任務(wù)=======食品訂單
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短時(shí)間任務(wù)=======電腦訂單
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =0 , shardingTotalCount=4 , shardingParameter =A ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========長時(shí)間任務(wù)=======食品訂單
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短時(shí)間任務(wù)=======電腦訂單
2019-04-27 02:15:45 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =0 , shardingTotalCount=4 , shardingParameter =A ,jobParamter =xuzy
2019-04-27 02:15:45 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========長時(shí)間任務(wù)=======食品訂單
2019-04-27 02:15:45 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
可以看到128,129分別處理了不同的數(shù)據(jù)分片碳默,期間假如128服務(wù)器掛了,由于配置了失效轉(zhuǎn)移缘眶,原來128的服裝訂單嘱根、機(jī)械訂單任務(wù)將轉(zhuǎn)移給129繼續(xù)執(zhí)行
elastic-job作業(yè)調(diào)度-zookeeper
elastic-job是通過zookeeper進(jìn)行任務(wù)協(xié)調(diào)和故障轉(zhuǎn)移。
當(dāng)啟動(dòng)項(xiàng)目后會(huì)發(fā)現(xiàn)任務(wù)在zookeeper注冊了節(jié)點(diǎn)磅崭,如下,首先在根據(jù)我們的配置創(chuàng)建了/elastic-job-demo/demoSimpleJob節(jié)點(diǎn)儿子,對應(yīng)的config,instances, leader, servers和sharding
- config節(jié)點(diǎn)
config節(jié)點(diǎn)記錄了任務(wù)的配置信息瓦哎,包含執(zhí)行類砸喻,cron表達(dá)式,分片算法類蒋譬,分片數(shù)量割岛,分片參數(shù)。默認(rèn)狀態(tài)下犯助,如果你修改了Job的配置比如cron表達(dá)式癣漆,分片數(shù)量等是不會(huì)更新到zookeeper上去的,除非你把參數(shù)overwrite修改成true或者使用rmr /elastic-job-demo/demoSimpleJob命令刪除節(jié)點(diǎn)并重新啟動(dòng)創(chuàng)建
{
"jobName": "demoSimpleJob", //任務(wù)名稱
"jobClass": "com.xzy.elasticjob.MyElasticJob", //具體執(zhí)行類
"jobType": "SIMPLE", //任務(wù)類型
"cron": "0/20 * * * * ?", //任務(wù)實(shí)行時(shí)間corn
"shardingTotalCount": 4, //總分片數(shù)
"shardingItemParameters": "0\u003dA,1\u003dB,2\u003dC,3\u003dD", //分片參數(shù)
"jobParameter": "xuzy", //任務(wù)參數(shù)
"failover": true, //是否失效轉(zhuǎn)移
"misfire": true,
"description": "",
"jobProperties": {
"job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",//默認(rèn)的異常處理類
"executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler" //默認(rèn)的作業(yè)處理線程池類
},
"monitorExecution": true,
"maxTimeDiffSeconds": -1,
"monitorPort": -1,
"jobShardingStrategyClass": "",
"reconcileIntervalMinutes": 10,
"disabled": false, //作業(yè)是否禁止啟動(dòng)
"overwrite": false //本地配置是否可覆蓋注冊中心配置
}
-
instances節(jié)點(diǎn)
同一個(gè)Job下的elastic-job的部署實(shí)例剂买。一臺(tái)機(jī)器上可以啟動(dòng)多個(gè)Job實(shí)例惠爽,也就是Jar包。instances的命名是IP+@-@+PID
如圖瞬哼,demoSimpleJob有兩個(gè)實(shí)例婚肆,地址如下
image.png -
leader節(jié)點(diǎn)
任務(wù)實(shí)例的主節(jié)點(diǎn)信息,通過zookeeper的主節(jié)點(diǎn)選舉坐慰,選出來的主節(jié)點(diǎn)信息较性。下面的子節(jié)點(diǎn)分為election,sharding和failover三個(gè)子節(jié)點(diǎn)。分別用于主節(jié)點(diǎn)選舉赞咙,分片和失效轉(zhuǎn)移處理责循。election下面的instance節(jié)點(diǎn)顯式了當(dāng)前主節(jié)點(diǎn)的實(shí)例ID:jobInstanceId。latch節(jié)點(diǎn)也是一個(gè)永久節(jié)點(diǎn)用于選舉時(shí)候的實(shí)現(xiàn)分布式鎖攀操。sharding節(jié)點(diǎn)下面有一個(gè)臨時(shí)節(jié)點(diǎn)院仿,necessary,是否需要重新分片的標(biāo)記速和。如果分片總數(shù)變化意蛀,或任務(wù)實(shí)例節(jié)點(diǎn)上下線或啟用/禁用,以及主節(jié)點(diǎn)選舉健芭,都會(huì)觸發(fā)設(shè)置重分片標(biāo)記县钥,主節(jié)點(diǎn)會(huì)進(jìn)行分片計(jì)算
image.png -
servers節(jié)點(diǎn)
記錄了任務(wù)實(shí)例的信息
image.png -
sharding節(jié)點(diǎn)
任務(wù)的分片信息,子節(jié)點(diǎn)是分片項(xiàng)序號慈迈,從零開始若贮,至分片總數(shù)減一。從這個(gè)節(jié)點(diǎn)可以看出哪個(gè)分片在哪個(gè)實(shí)例上運(yùn)行
image.png
image.png
elastic-job-lite-console
elastic-job-lite-console是elasticjob提供的管理工具
安裝方法
- 在https://github.com/miguangying/elastic-job-lite-console下載zip文件并壓縮得到tar.gz痒留。如果是linux則將tar.gz放到linux解壓谴麦,可以得到start.sh,點(diǎn)擊執(zhí)行伸头。如果是window則再次解壓tar.gz得到start.bat直接點(diǎn)擊執(zhí)行
- 登錄用戶名密碼root/root,默認(rèn)端口8899
-
進(jìn)入首頁后添加注冊中心配置命名空間為代碼上的命名空間匾效,頁面上可以查看每個(gè)作業(yè)的運(yùn)行情況和進(jìn)行手動(dòng)觸發(fā)
image.png
image.png
image.png
image.png