在java業(yè)務(wù)開發(fā)過程中瓤狐,經(jīng)常可能會有這樣的需求批幌,我需要在未來的某個時間點執(zhí)行一個任務(wù)础锐,而這個任務(wù)是一次性的。又或者是需要動態(tài)的創(chuàng)建一個時間線荧缘,在某個時間點對應(yīng)的做某一件事情皆警。而通過定時任務(wù)來做的話,很難達到這樣的功能截粗,只能通過一個短間隔的定時任務(wù)去一直判斷當(dāng)前時間信姓,從而執(zhí)行某個任務(wù)鸵隧。而如果是需要在未來某一時間點執(zhí)行某任務(wù)的時候,如果僅僅只是使用內(nèi)存來存這個未來的時間點意推,則會有進程重啟后丟失的風(fēng)險豆瘫。這里提供一種通用的時間線任務(wù)的實現(xiàn)方案供大家參考
實現(xiàn)思路
最核心的一個實現(xiàn)還是需要依賴一個定時器。這個定時器可以是1秒一次左痢,也可以是100毫秒一次靡羡。全局唯一的用于監(jiān)測時間的一個定時器。然后時間線存儲在redis里俊性,通過redis zset結(jié)構(gòu)來存儲略步,key為以服務(wù)名或者id生成的固定key,member為指定時間需要執(zhí)行任務(wù)的一些信息定页,score為時間線的時間戳趟薄。這樣設(shè)計,就可以通過對score范圍來拉取需要執(zhí)行的任務(wù)典徊。此外杭煎,還需要一個redis Set結(jié)構(gòu)來存儲正在執(zhí)行的任務(wù),并在執(zhí)行完成后remove成員卒落。這個set可以用于監(jiān)測是否有執(zhí)行異常的任務(wù)羡铲,是否需要自動或者是手動重試。
此外儡毕,因為要做成通用的也切,所以所執(zhí)行的方法名和參數(shù)也不能是固定,由于涉及一些方法腰湾,需要在指定實例里運行雷恃,比如spring里面的service bean,所以這里設(shè)計有兩種方案:
1费坊、實例獲取使用實例注冊的方式倒槐,在spring啟動后創(chuàng)建bean的時候,通過把bean本身注冊到一個自定義的bean中附井,并在bean里面用map存儲實例讨越,需要執(zhí)行的時候只需要用key去把實例取出來。由于是Object類型的示例永毅,所以需要用反射來獲取方法谎痢。獲取方法這個步驟,也可以在初始化的時候完成并緩存到一個map中卷雕,可以提升一些性能节猿。
2、實例不需要注冊,在創(chuàng)建時間任務(wù)的時候把this參數(shù)傳入滨嘱,通過反射獲取Method(反射內(nèi)容其實可以做緩存),并緩存實例和Method峰鄙。在執(zhí)行的時候再拿出來。
說了這么多太雨,可能理解的不是很明白吟榴,下面直接上偽代碼 :
代碼實現(xiàn)(上述方案二)
首先我們把這個實現(xiàn)時間線任務(wù)的類定義為Timer,下面是Timer的接口
public interface Timer {
/**
* 業(yè)務(wù)調(diào)用的方法,用于創(chuàng)建時間任務(wù)
*
* @param obj 執(zhí)行任務(wù)所在方法的示例
* @param method 方法名字
* @param futureTime 需要執(zhí)行任務(wù)的時間點
* @param args 方法的參數(shù),按順序
*/
void runAtFuture(Object obj, String method, Date futureTime, Object... args);
}
接口的實現(xiàn)方法,這里的邏輯囊扳,是通過示例和methord還有args吩翻,獲得并緩存執(zhí)行任務(wù)的示例和方法。并把任務(wù)信息和任務(wù)執(zhí)行時間節(jié)點插入到redis的zset锥咸。
@Override
public void runAt(Object obj, String method, Date at, Object... args) {
log.info("runAt: method {}, at {}, args {}, {}", method, at, args, args.length);
method = getMethodName(obj, method,args);
runAtTypeCheck(method, args);
var timerTask = new TimerTask();
timerTask .setAt(at);
timerTask .setMethod(method);
timerTask .setArgs(args);
timerTask .setUniqueKey(RandomStringUtils.randomAlphanumeric(8));
timerRedisSortedList.zadd(timerDto);
}
脈沖定時任務(wù)的時間
@Scheduled(initialDelay = 5000, fixedRate = 100)
public void schedule() {
if (this.handlerMethods.size() == 0) {
log.warn("this.handlerMethods.size() == 0");
return;
}
var now = new Date();
for (int i = 0; i < 10; i++) {
//這里為lua腳本
var timerTasks= timerRedisSortedList.zpop(now, 100);
if (timerTask.size() <= 0) {
return;
}
timerTasks.forEach(timerTask-> {
var uniqueKey = timerTask.getUniqueKey();
var methodOnceKey = String.format(TIMER_UNIQUE_FMT, Context.ActID, uniqueKey);
// 確保不會重復(fù)運行
if (redisWrap.done(methodOnceKey, 60)) {
return;
}
// invoke first, then zrem, do not throw exception
try {
//這個invoke里面的邏輯實現(xiàn)可以是通過緩存的實例和方法狭瞎,使用Method.invoke來執(zhí)行任務(wù)
this.invoke(timerTask.getMethod(),timerTask.getTaskName, timerTask.getArgs());
} catch (Exception ex) {
log.error("timer: {}, catch exception", timerTask, ex);
}
});
}
}
lua腳本參考
local key = KEYS[1]
local items = redis.call('ZRANGEBYSCORE', key, 0, ARGV[1], 'LIMIT', 0, ARGV[2])
for i = 1, table.getn(items) do
redis.call('ZREM', key, items[i])
end
return items
業(yè)務(wù)調(diào)用方式
@Autowired
private Timer timer;
public void xxxxx(){
xxxxxx;
xxxxx;
xxxxx;
timer.runAtFuture(this,"doSomeThing",new Date(xxxxx),new XXX(),new AAA(),new XXXX());
}
pubic void doSomeThing(XXX param1,AAA param2,XXXX param3){
}