在elastic-job的使用過(guò)程中,我們會(huì)遇到動(dòng)態(tài)添加定時(shí)任務(wù)的時(shí)候,但是官網(wǎng)上面并沒(méi)有對(duì)這塊內(nèi)容進(jìn)行說(shuō)明秧骑。按照我的理解以及官網(wǎng)上面elastic-job的框架圖,ej的定時(shí)任務(wù)其實(shí)是存儲(chǔ)在zookeeper的一個(gè)個(gè)節(jié)點(diǎn)上面扣囊,所以通過(guò)給zookeeper添加對(duì)應(yīng)的節(jié)點(diǎn)即可完成定時(shí)任務(wù)的添加動(dòng)作乎折。
下面上代碼:
import java.text.SimpleDateFormat;
import java.util.Date;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.exception.JobSystemException;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class DynamicAddJob implements SimpleJob{
private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
/***
* @param date 時(shí)間
* @return cron類型的日期
*/
public static String getCron(final Date date) {
SimpleDateFormat sdf = new SimpleDateFormat(CRON_DATE_FORMAT);
String formatTimeStr = "";
if (date != null) {
formatTimeStr = sdf.format(date);
}
return formatTimeStr;
}
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);
long now = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
String cron = getCron(new Date(now + (i + 1) * 50000));
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("dynamicDemoJob-" + i, cron, 2).build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, DynamicAddJob.class.getCanonicalName());
JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
try {
jobScheduler.init();
}catch (JobSystemException e){
e.printStackTrace();
}
}
}
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()){
case 0:
System.out.println("doing sharding 0...job name is "+shardingContext.getJobName());
// do something by sharding item 0
break;
case 1:
System.out.println("doing sharding 1...job name is "+shardingContext.getJobName());
// do something by sharding item 1
break;
}
}
}
這里用到比較重要的一個(gè)類是JobScheduler,這是lite-core里面一個(gè)比較核心的類侵歇,這個(gè)類其實(shí)就是我們的job骂澄,他的構(gòu)造方法包含以下參數(shù):
- CoordinatorRegistryCenter regCenter:注冊(cè)中心,這里是zookeeper
- LiteJobConfiguration liteJobConfig:定時(shí)任務(wù)的配置信息
這里可以看一下LiteJobConfiguration這個(gè)類惕虑,采用了設(shè)計(jì)模式中的建造者模式進(jìn)行構(gòu)建坟冲。可能看著會(huì)比較摸不著頭腦溃蔫,里面的Builder跟平時(shí)的不太一樣健提,這里我們需要知道的是ej的源碼采用了lombok這個(gè)代碼簡(jiǎn)化的工具,只需要通過(guò)注解的形式就能將我們平時(shí)所需要的get/set和構(gòu)造器的內(nèi)容在編譯時(shí)創(chuàng)建出來(lái)伟叛,不需要在代碼中體現(xiàn)私痹,能夠大大簡(jiǎn)化我們的代碼。
另外還遇到一個(gè)坑。這段代碼不能重復(fù)使用紊遵,第一次跑的時(shí)候沒(méi)問(wèn)題账千,過(guò)段時(shí)間再次跑這個(gè)代碼時(shí),會(huì)在init()處報(bào)錯(cuò)暗膜,原因是我們新建的job根本不能被fire匀奏,我跟了進(jìn)去。發(fā)現(xiàn)学搜,job的cron表達(dá)式表示的時(shí)間還是以前的時(shí)間攒射,這就奇怪了,明明我這邊配置了一個(gè)新的時(shí)間恒水。通過(guò)debug会放,進(jìn)入init方法中,發(fā)現(xiàn)他會(huì)更新job信息钉凌,而更新時(shí)咧最,會(huì)去zk上面load配置信息,而zk的znode節(jié)點(diǎn)是老的節(jié)點(diǎn)御雕,上面存儲(chǔ)的配置信息也是老的矢沿,所以這塊的cron表達(dá)式也是舊的時(shí)間,根本不會(huì)被執(zhí)行酸纲,下面貼出源碼捣鲸,供大家參考。
init()源碼:
/**
* 初始化作業(yè).
*/
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
updateJobConfiguration()的源碼如下:
/**
* 更新作業(yè)配置.
*
* @param liteJobConfig 作業(yè)配置
* @return 更新后的作業(yè)配置
*/
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
configService.persist(liteJobConfig);
return configService.load(false);
}
load()源碼如下:
/**
* 讀取作業(yè)配置.
*
* @param fromCache 是否從緩存中讀取
* @return 作業(yè)配置
*/
public LiteJobConfiguration load(final boolean fromCache) {
String result;
if (fromCache) {
result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
if (null == result) {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
} else {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
return LiteJobConfigurationGsonFactory.fromJson(result);
}
可以發(fā)現(xiàn)這塊load有兩種闽坡,一種是從緩存(這里的緩存使用Map來(lái)實(shí)現(xiàn)的TreeCache)中獲取getJobNodeData栽惶,一種是從注冊(cè)中心也就是zookeeper中獲取getJobNodeDataDirectly。load的時(shí)候疾嗅,根據(jù)的是zk的路徑外厂,其實(shí)也就是任務(wù)的jobName,所以我們要盡量避免任務(wù)名稱的重復(fù)代承。