背景
在調(diào)度系統(tǒng)中存在很多定時執(zhí)行的任務(wù)挫掏,這些任務(wù)有不同的執(zhí)行周期峦阁,比如有每分鐘捣郊、每小時挎塌、每天執(zhí)行一次的车吹,也有可能是半天執(zhí)行一次的菠劝,且這些任務(wù)之間需要建立依賴關(guān)系知押,組成一個數(shù)據(jù)處理流疚沐。
開發(fā)一個調(diào)度系統(tǒng)栽燕,首先需要解決這些不同周期任務(wù)相互依賴的問題罕袋,保證任務(wù)能夠根據(jù)配置的定時和依賴關(guān)系正確觸發(fā)執(zhí)行改淑。
任務(wù)周期
如下我們可以舉出這些周期類型
public enum JobCycle {
MINUTE(1), HOUR(2), DAY(3), WEEK(4), MONTH(5), YEAR(6), NONE(7);
private int code;
JobCycle(int code) {
this.code = code;
}
public static JobCycle of(int code) {
for (JobCycle jobCycle : JobCycle.values()) {
if (jobCycle.code == code) {
return jobCycle;
}
}
throw new IllegalArgumentException("unsupported job cycle " + code);
}
}
一個任務(wù)是哪種周期類型,可以由用戶設(shè)定的 cron
表達(dá)式判斷計(jì)算出來浴讯,而不是讓用戶指定周期類型朵夏,這樣容易造成周期類型與表達(dá)式不一致。
CronUtils
假如使用 Quartz 的語法 "1 0 3 * * ?"
表示每天 03:00:01
執(zhí)行榆纽,如果要判斷周期類型仰猖,需要將這個語義轉(zhuǎn)換成數(shù)字以便于計(jì)算,所以首先想到的是找一個工具來幫助我們計(jì)算出表達(dá)式所要表示的頻率奈籽。
使用 quartz
提供的 TriggerUtils.computeFireTimes
方法可以間接計(jì)算出 interval
public class CronUtils {
public static long intervalOf(String cron) {
return intervalOf(getCronTrigger(cron));
}
public static long intervalOf(CronTriggerImpl cronTrigger) {
List<Date> dates = computeFireTimes(cronTrigger, null, 2);
Date next = dates.get(0);
Date nextNext = dates.get(1);
return nextNext.getTime() - next.getTime();
}
private static CronTriggerImpl getCronTrigger(String cron) {
final CronTriggerImpl cronTrigger = new CronTriggerImpl();
try {
cronTrigger.setCronExpression(cron);
} catch (ParseException e) {
throw new RuntimeException("Cron expression is invalid");
}
return cronTrigger;
}
}
由此饥侵,我們可以進(jìn)一步判斷出一個任務(wù)的 JobCycle
public enum JobCycle {
....
private static final long minute = 60_000;
private static final long hour = 60 * minute;
private static final long day = 24 * hour;
private static final long week = 7 * day;
private static final long month = 28 * day;
private static final long year = 365 * day;
public static JobCycle from(Long interval) {
if (interval >= minute && interval < hour) return MINUTE;
if (interval >= hour && interval < day) return HOUR;
if (interval >= day && interval < week) return DAY;
if (interval >= week && interval < month) return WEEK;
if (interval >= month) return MONTH;
return NONE;
}
...
}
同周期依賴
在大數(shù)據(jù) ETL 任務(wù)中,絕大多數(shù)屬于天級的任務(wù)衣屏,即今天對昨天一整天的數(shù)據(jù)集成然后計(jì)算爆捞,這里隱含了兩種相關(guān)聯(lián)的時間
- 數(shù)據(jù)時間,如 hive 中的天分區(qū)
dt ='2019-11-12'
- 計(jì)算時間勾拉,即調(diào)度系統(tǒng)的調(diào)度時間
schedule_time
煮甥,或者quartz
的firetime
數(shù)據(jù)時間 = 調(diào)度時間 - 調(diào)度頻率
如果 P 任務(wù)每天02:01:04 執(zhí)行,C 任務(wù)每天 03:00:01 執(zhí)行
String parentCron = "4 1 2 * * ?"; //P
String childCron = "1 0 3 * * ?"; //C
C 依賴于 P藕赞, P <- C成肘,當(dāng) P 有如下執(zhí)行歷史時
TreeSet<TaskSuccessRecord> history = new TreeSet<>();
history.add(of(parentCron, parse("2019-11-09 02:01:04")));
history.add(of(parentCron, parse("2019-11-10 02:01:04")));
當(dāng) C 在 2019-11-10 03:00:01
觸發(fā)時,如何根據(jù) P 的成功的歷史記錄判斷依賴是否滿足斧蜕?C 可以運(yùn)行的前提是 P 的數(shù)據(jù)已經(jīng)準(zhǔn)備好双霍,對于天級的離線表來說表示 dt = 2019-11-09
分區(qū)生成,C 可以基于這個分區(qū)的數(shù)據(jù)生成 C 的 dt = 2019-11-09
分區(qū)批销,2019-11-09
即數(shù)據(jù)時間洒闸,但是判斷數(shù)據(jù)時間,比如檢查文件目錄有沒有生成均芽,或者檢查數(shù)據(jù)量比較復(fù)雜丘逸。
所以通常的做法是檢查調(diào)度時間,即在 2019-11-10 03:00:01
時掀宋,如果 P 的歷史中存在 2019-11-10
這一天運(yùn)行成功的記錄深纲,那么就認(rèn)為 2019-11-09
的數(shù)據(jù)已經(jīng)就緒,C 的依賴條件滿足劲妙。如上 CronUtils
有方法可以計(jì)算出兩個任務(wù)的周期都是天湃鹊,所以我們知道是在同一天父任務(wù)運(yùn)行成功就行了,但是如何確切判斷知道 2019-11-10 02:01:04
這個記錄呢镣奋?這個牽涉到如何根據(jù)一個任意時間計(jì)算一個任務(wù)的當(dāng)前币呵、下一個以及前一個調(diào)度時刻。
我們需要在 CronUtils 類中實(shí)現(xiàn)如下方法
/**
* Compute schedule time by a given point.
*
* pre of next
* -2 -1 sometime 0
* |__________________|________|____________|
* |____interval______|
*
*/
public class CronUtils {
public static LocalDateTime previousScheduleTimeOf(String cron, LocalDateTime sometime) {
return scheduleTime(cron, sometime, -2);
}
public static LocalDateTime scheduleTimeOf(String cron, LocalDateTime sometime) {
return scheduleTime(cron, sometime, -1);
}
public static LocalDateTime nextScheduleTimeOf(String cron, LocalDateTime sometime) {
return scheduleTime(cron, sometime, 0);
}
private static LocalDateTime scheduleTime(String cron, LocalDateTime sometime, int offset) {
CronTriggerImpl cronTrigger = getCronTrigger(cron);
long interval = intervalOf(cronTrigger);
Date from = from(sometime.atZone(systemDefault()).toInstant());
Date to = new Date(from.getTime() + interval);
List<Date> dates = computeFireTimesBetween(cronTrigger, null, from, to);
Date next = dates.get(0);
return ofEpochMilli(next.getTime() + interval * offset).atZone(systemDefault()).toLocalDateTime();
}
...
}
有了這些方法之后侨颈,我們可以計(jì)算出 2019-11-10 00:00:00
之后的第一個調(diào)度時間在 history
中存在就可以了
LocalDateTime parentScheduleTime = nextScheduleTimeOf(parentCron, parse(`2019-11-10 00:00:00`))
assertThat(history.has(of(parentCron, parentScheduleTime))).isTrue();
這里需要說一下 history 如果是一個 TreeSet 是沒有 has 方法余赢,可以使用 ceiling
來查找如下
@Test
public void tree_set_correct_search_method() {
TreeSet<Integer> set = new TreeSet<>();
set.add(1);
set.add(2);
set.add(4);
assertThat(set.ceiling(2)).isEqualTo(2); // ceiling includes equals
assertThat(set.higher(2)).isEqualTo(4);
}
有些任務(wù)可能剛好設(shè)置的是 0 點(diǎn)開始調(diào)度的掸驱,所以使用 ceiling
而不是 higher
。
之所以使用 Set
的原因是没佑,當(dāng)一個任務(wù)運(yùn)行多次毕贼,比如除系統(tǒng)調(diào)度運(yùn)行外,用戶可能會手動執(zhí)行蛤奢,Set
可以去重只保留一條記錄鬼癣,從而能簡化依賴判斷,使用 TreeSet
而不是其它 Set
的原因是數(shù)據(jù)可以模擬實(shí)際的調(diào)度的情景啤贩,按照調(diào)度時間有序待秃,且方便查找,因此成功記錄需要實(shí)現(xiàn) Comparator
接口痹屹。
@Data
public class TaskSuccessRecord implements Comparable<TaskSuccessRecord> {
private final LocalDateTime scheduleTime;
private final String cronExpression;
public static TaskSuccessRecord of(String cronExpression, LocalDateTime scheduleTime) {
requireNonNull(cronExpression, "Cron is null");
requireNonNull(scheduleTime, "Schedule time is null");
return new TaskSuccessRecord(cronExpression, scheduleTime);
}
public TaskSuccessRecord(String cronExpression, LocalDateTime scheduleTime) {
this.cronExpression = cronExpression;
this.scheduleTime = scheduleTime;
}
public long interval() {
return intervalOf(cronExpression);
}
@Override
public int compareTo(TaskSuccessRecord lastRecord) {
return scheduleTime.truncatedTo(SECONDS)
.compareTo(lastRecord.getScheduleTime()
.truncatedTo(SECONDS));
}
public boolean cronEquals(String cronExpression) {
return this.cronExpression.equals(cronExpression);
}
}
這樣我們應(yīng)該可以從 history
中找到成功記錄章郁,不過知道 2019-11-10 00:00:00
這個起始時間其實(shí)是需要計(jì)算出來的,即根據(jù) C 的調(diào)度時間 2019-11-10 03:00:01
計(jì)算出來志衍,先計(jì)算出 C 的周期暖庄,然后取 C 周期的起始時間,我們得增加如下方法
public enum JobCycle {
...
public static ChronoUnit truncateUnit(Long interval) {
switch (from(interval)) {
case MINUTE:
return MINUTES;
case HOUR:
return HOURS;
case DAY:
return DAYS;
case WEEK:
return WEEKS;
case MONTH:
return MONTHS;
case YEAR:
return YEARS;
}
return null;
}
}
...
然后
long interval = intervalOf(childCron);
ChronoUnit truncateUnit = truncateUnit(interval); //DAYS
parse(`2019-11-10 03:00:01` ).truncateTo(truncateUnit); //2019-11-10 00:00:00
大周期依賴小周期
依據(jù)二八法則楼肪,80% 任務(wù)可能都是天級的任務(wù)培廓,但是 20% 的任務(wù)可能都各種各樣,屬于不周的周期春叫,比如小時肩钠,周等,而且要互相依賴暂殖。
我們先繼續(xù)考查大周期依賴小周期价匠,比如 C 是天級,P 是小時級
String parentCron = "4 1 */1 * * ?"; //P 每小時 01:04 執(zhí)行
String childCron = "3 1 3 * * ?"; //C 每天 03:01:03 執(zhí)行
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 22:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-09 23:01:04"));
TaskSuccessRecord p3 = of(parentCron, parse("2019-11-10 00:01:04"));
TaskSuccessRecord p4 = of(parentCron, parse("2019-11-10 01:01:04"));
TaskSuccessRecord p5 = of(parentCron, parse("2019-11-10 02:01:04"));
TaskSuccessRecord p6 = of(parentCron, parse("2019-11-10 03:01:04"));
history.add(p1);
history.add(p2);
history.add(p3);
history.add(p4);
history.add(p5);
history.add(p6);
C 是一個天級的表呛每,一個分區(qū)代表一整天的數(shù)據(jù)踩窖,而 P 需要 24 個小時分區(qū)代表一整天的數(shù)據(jù),C 只需要 P 前一天的 23 小時的數(shù)據(jù)就緒即可莉给,P 2019-11-10 03:01:03
計(jì)算的是 23 的數(shù)據(jù)毙石,有了前一小節(jié)同周期依賴的經(jīng)驗(yàn)廉沮,我們可以很容易知道只需要判斷 p3 是否生成就好了颓遏。
因此,我們得到一個生成檢查點(diǎn)的規(guī)率滞时,需要用大周期生成
private LocalDateTime checkPointBase(String scheduleTimeStr, String theGreaterCycleCron) {
long interval = intervalOf(theGreaterCycleCron);
return parse(scheduleTimeStr).truncatedTo(truncateUnit(interval));
}
最后叁幢,檢查的方法是這樣的
TaskSuccessRecord checkPoint = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:03", childCron)));
assertThat(history.ceiling(checkPoint)).isEqualTo(p3);
小周期依賴大周期
對于以上的方法我們可以繼續(xù)對其它情況進(jìn)行檢查,比如小時依賴天任務(wù)
@Test
public void child_hour_parent_day() {
String parentCron = "3 1 3 * * ?"; //P 每天 03:01:03 執(zhí)行
String childCron = "2 1 */1 * * ?"; //C 每小時 01:02 執(zhí)行
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 03:01:03"));
history.add(p1);
TaskSuccessRecord check_point_1 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:02", parentCron)));
assertThat(history.ceiling(check_point_1)).isNull();
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 03:01:03"));
history.add(p2);
TaskSuccessRecord check_point_2 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 04:01:02", parentCron)));
assertThat(history.ceiling(check_point_2)).isEqualTo(p2);
}
以上也是可以成功檢查到的坪稽,但是實(shí)際情況更能會更復(fù)雜曼玩,比如父任務(wù)是 1,13 小時各運(yùn)行一次鳞骤,即半天運(yùn)行一次,但是子任務(wù)是每小運(yùn)行一次黍判,這個時候需要根據(jù)半天這個大周期來偏移豫尽,LocalDateTime 沒有直接的方法來 truncate 半天,因此我們需要修改一下 checkPointBase
方法
private LocalDateTime checkPointBase(String scheduleTimeStr, String theGreaterCycleCron) {
long interval = intervalOf(theGreaterCycleCron);
ChronoUnit truncateUnit = truncateUnit(interval);
Integer cycles = numberOfCycles(interval);
return parse(scheduleTimeStr).truncatedTo(truncateUnit).minus(cycles - 1, truncateUnit);
}
以及添加計(jì)算周期數(shù)的方法
public static Integer numberOfCycles(Long interval) {
return round(interval / from(interval).cycleInterval());
}
然后我們模擬上面的例子
@Test
public void child_hour_parent_hour_1_and_13() {
String parentCron = "4 1 1,13 * * ?";
String childCron = "3 1 */1 * ?";
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 13:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 01:01:04"));
history.add(p1);
TaskSuccessRecord check_point_1 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-09 13:01:03", parentCron)));
assertThat(history.ceiling(check_point_1)).isEqualTo(p1);
TaskSuccessRecord check_point_2 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-09 14:01:03", parentCron)));
assertThat(history.ceiling(check_point_2)).isEqualTo(p1);
}
更多情況測試
@Test
public void child_day_parent_hour_1_and_13() {
String parentCron = "4 1 1,13 * * ?";
String childCron = "3 1 3 * * ?";
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 13:01:04"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 01:01:04"));
history.add(p1);
TaskSuccessRecord checkPoint = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:03", childCron)));
assertThat(history.ceiling(checkPoint)).isNull();
history.add(p2);
assertThat(history.ceiling(checkPoint)).isEqualTo(p2);
}
@Test
public void child_hour_parent_minute() {
String parentCron = "3 */5 * * * ?";
String childCron = "4 5 */1 * * ?";
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-10 00:50:03"));
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 00:55:03"));
TaskSuccessRecord p3 = of(parentCron, parse("2019-11-10 01:05:03"));
history.add(p1);
history.add(p2);
TaskSuccessRecord checkPoint = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 01:05:04", childCron)));
assertThat(history.ceiling(checkPoint)).isNull();
history.add(p3);
assertThat(history.ceiling(checkPoint)).isEqualTo(p3);
}
@Test
public void child_hour_parent_day() {
String parentCron = "3 1 3 * * ?"; //P 每天 03:01:03 執(zhí)行
String childCron = "2 1 */1 * * ?"; //C 每小時 01:02 執(zhí)行
TaskSuccessRecord p1 = of(parentCron, parse("2019-11-09 03:01:03"));
history.add(p1);
TaskSuccessRecord check_point_1 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 03:01:02", parentCron)));
assertThat(history.ceiling(check_point_1)).isNull();
TaskSuccessRecord p2 = of(parentCron, parse("2019-11-10 03:01:03"));
history.add(p2);
TaskSuccessRecord check_point_2 = of(childCron,
nextScheduleTimeOf(parentCron, checkPointBase("2019-11-10 04:01:02", parentCron)));
assertThat(history.ceiling(check_point_2)).isEqualTo(p2);
}
其它問題
以上我們找到了一種能夠覆蓋多種場景的通用的檢查依賴的方法顷帖,可以簡化代碼的復(fù)雜度美旧,但仍然還有其它情況需要考慮,比如 cron 表達(dá)式中途變更之后贬墩,周期發(fā)生改變的情況榴嗅,比如天依賴小時,24 小時中有失敗的情況等陶舞。
其中 cron 表達(dá)式變化嗽测,如果變化之后的周期比之前小,歷史記錄是有效的肿孵,反之則需要重新開始依賴唠粥,可以試著推理看看。對于小時任務(wù)失敗的情況停做,一種解決辦法是自依賴厅贪,任務(wù)自己依賴自己的上一個周期,這種情況如果 23 時成功雅宾,表示全部成功养涮,自依賴也是調(diào)度系統(tǒng)需要支持的特性,它的依賴方式跟同周期依賴相似眉抬,但稍有差別贯吓。但是自依賴任務(wù)有失敗重跑比較耗時從而容易導(dǎo)致數(shù)據(jù)過度延遲的風(fēng)險,因此還是需要依賴判斷支持更細(xì)粒度的檢查蜀变。
不過總得來說悄谐,支持更多情況只需要在前面的方法的基礎(chǔ)上擴(kuò)展,是很容易實(shí)現(xiàn)的库北,不會對既有的結(jié)構(gòu)產(chǎn)生大的變化爬舰,或者可以期待博主有進(jìn)一步的更新,前面的例子中的代碼寒瓦,請?jiān)L問我的 github 項(xiàng)目 https://github.com/artiship/cyclic情屹,有問題歡迎留言交流。
寫在最后
找尋如上的方法得益于我對單元測試的使用杂腰,人的記憶據(jù)說只有 5 個槽, 比如我們短時記憶很容易記住 5 個數(shù)字垃你,超出 5 個就略顯困難了,在做復(fù)雜的推理時,過多的條件在腦中很難模擬惜颇,借助測試可以理清思路皆刺,其實(shí)測試很像是在做研究的過程,先提出一個假設(shè)凌摄,再尋找解決辦法羡蛾,再舉出很多情況來驗(yàn)證這個辦法是否通行,如此反復(fù)锨亏,這關(guān)乎科學(xué)林说。