? elasticjob是使用zk做分布式協(xié)調(diào)颤殴,包括分片參數(shù)配置,再分片二拐,選主節(jié)點服鹅,作業(yè)狀態(tài)等一系列配置和運行狀態(tài)的數(shù)據(jù)都是保存在zk中,包括console是直接通過修改zk節(jié)點數(shù)據(jù)百新,使配置項直接生效的企软。
那么,當(dāng)zk節(jié)點參數(shù)發(fā)生變化吟孙,elasticJob是如何感知的澜倦?
? 在初始化JobScheduler對象中的schedulerFacade該屬性的時候,構(gòu)造方法中曾經(jīng)初始化過ListenterManager杰妓,該對象主要做了一件事情,就是監(jiān)控各zk節(jié)點數(shù)據(jù)的變化碘勉。
? 在這張圖中巷挥,能看到所有的對zk節(jié)點監(jiān)控的listener都是通過一些Listener去實現(xiàn)的⊙槊遥看類圖:
從類圖上看倍宾,所有的zk節(jié)點的監(jiān)控都是實現(xiàn)TreeCacheListener接口的雏节,那么TreeCacheListener這個接口到底是什么?
? 在curator中高职,提供了三種緩存方式钩乍,PathCache,NodeCache怔锌,TreeCache寥粹,并支持對這三種緩存做監(jiān)控,進而間接監(jiān)控了zk節(jié)點埃元。分別如下:
-
Path Cache 可以監(jiān)控某個節(jié)點的子節(jié)點
- PathChildrenCacheListener 節(jié)點監(jiān)聽器接口
-
Node Cache 監(jiān)控某一個特定節(jié)點
- NodeCacheListener 節(jié)點監(jiān)聽器接口
-
Tree Cache 除了監(jiān)控節(jié)點涝涤,還可以監(jiān)控該節(jié)點的子節(jié)點,像Path Cache和Node Cache的組合岛杀,監(jiān)控整個樹
- TreeCacheListener 節(jié)點監(jiān)聽器接口
? 再回頭看看上篇博客阔拳,在elastic-Job中,是使用treeCache去緩存數(shù)據(jù)的类嗤,并且在JobScheduler.init()方法中糊肠,調(diào)用了JobRegistry.getInstance().registerJob()方法,該方法里又調(diào)用了regCenter.addCacheData("/" + jobName);
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
//在這里注冊作業(yè)遗锣,添加TreeCache
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
schedulerMap.put(jobName, jobScheduleController);
regCenterMap.put(jobName, regCenter);
//添加一條TreeCache
regCenter.addCacheData("/" + jobName);
}
? 在作業(yè)注冊的時候添加了一個treeCache罪针,實現(xiàn)對這個jobname命名的節(jié)點及其子節(jié)點監(jiān)控。所以黄伊,當(dāng)該節(jié)點及其子節(jié)點發(fā)生變化泪酱,所有監(jiān)聽該treeCache的Listener都會有感知。各個Listenter根據(jù)自己的邏輯判斷是否是該節(jié)點變化还最,EventType是不是自己關(guān)心的類型等等做自己的義務(wù)邏輯墓阀。
class CronSettingAndJobEventChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getCron());
}
}
}
//AbstractListenerManager.java
protected void addDataListener(final TreeCacheListener listener) {
jobNodeStorage.addDataListener(listener);
}
//JobNodeStorage.java
public void addDataListener(final TreeCacheListener listener) {
TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
cache.getListenable().addListener(listener);
}
比如,CronSettingAndJobEventChangedJobListener該Listener拓轻,當(dāng)節(jié)點數(shù)據(jù)發(fā)生變化斯撮,比如修改了zk上保存的cron表達式,首先該Listener會去判斷是不是config節(jié)點扶叉,該節(jié)點發(fā)生的類型是否是update類型勿锅,是否又修改過節(jié)點信息,該job是不是已經(jīng)停止枣氧,倘若符合這幾個條件溢十,就重啟rescheduleJob該作業(yè)對應(yīng)的調(diào)度器,使job的相關(guān)配置重新生效达吞。