最近公司發(fā)生了服務(wù)定時(shí)同步的機(jī)制異常森爽,看著公司的分布式任務(wù)調(diào)度組件馋评,想著自己實(shí)現(xiàn)一個(gè) github地址
業(yè)務(wù)背景
A模塊中的定時(shí)器每個(gè)30s去執(zhí)行一次任務(wù)(任務(wù)內(nèi)容是遠(yuǎn)程獲取消息并處理發(fā)送)。原定是A模塊部署到兩個(gè)服務(wù)器,但是目前A模塊獨(dú)立運(yùn)行兩個(gè)之后總是會(huì)獲取到相同的遠(yuǎn)程消息,然后重復(fù)執(zhí)行處理了兩次几晤。
實(shí)際場(chǎng)景
公司采用Spring quartz 建立定時(shí)任務(wù)模塊。 當(dāng)任務(wù)模塊進(jìn)行了分布式部署植阴,通常會(huì)出現(xiàn)定時(shí)任務(wù)重復(fù)執(zhí)行的情況蟹瘾。 怎么避免這種情況呢圾浅,是否可以構(gòu)建一個(gè)任務(wù)注冊(cè)中心,Quartz負(fù)責(zé)注冊(cè)任務(wù)憾朴,但不具體執(zhí)行任務(wù)內(nèi)的業(yè)務(wù)邏輯狸捕。
解決問題
任務(wù)注冊(cè)中心解決任務(wù)重復(fù)注冊(cè)的問題,同時(shí)將任務(wù)分配給若干處理器進(jìn)行具體的業(yè)務(wù)處理众雷,保證在同一個(gè)時(shí)間內(nèi)灸拍,一個(gè)任務(wù)只會(huì)被一個(gè)處理器進(jìn)行處理。
實(shí)現(xiàn)方式
利用ZooKeeper的Master選舉機(jī)制實(shí)現(xiàn)砾省。 注冊(cè)任務(wù)就相當(dāng)于在ZooKeeper中創(chuàng)建或更新一個(gè)節(jié)點(diǎn)鸡岗。通過更新節(jié)點(diǎn)的內(nèi)容,來記錄任務(wù)的執(zhí)行狀態(tài)编兄。
過程說明
任務(wù)調(diào)度轩性,從管理器分配任務(wù),根據(jù)不同的任務(wù)Id進(jìn)行注冊(cè)
public class ZKScheduleManager extends ThreadPoolTaskScheduler implements ApplicationContextAware {
/**
*
*/
private static final long serialVersionUID = 1L;
private static final int DEFAULT_POOL_SIZE = 20;
private static final transient Logger LOGGER = LoggerFactory.getLogger(ZKScheduleManager.class);
private final CountDownLatch downLatch = new CountDownLatch(1);
private Map<String, String> zkConfig;
protected ZKManager zkManager;
private IScheduleDataManager scheduleDataManager;
/**
* 當(dāng)前調(diào)度服務(wù)的信息
*/
protected ScheduleServer currenScheduleServer;
/**
* 是否啟動(dòng)調(diào)度管理狠鸳,如果只是做系統(tǒng)管理揣苏,應(yīng)該設(shè)置為false,對(duì)應(yīng)key值為onlyAdmin
*/
public boolean start = true;
/**
* 心跳間隔
*/
private int timerInterval = 1000;
/**
* 是否注冊(cè)成功
*/
private boolean isScheduleServerRegister = true;
private static ApplicationContext applicationcontext;
private Map<String, Boolean> isOwnerMap = new ConcurrentHashMap<String, Boolean>();
private Timer hearBeatTimer;
private Lock initLock = new ReentrantLock();
private boolean isStopSchedule = false;
private Lock registerLock = new ReentrantLock();
private List<TaskDefine> initTaskDefines = new ArrayList<TaskDefine>();
private volatile String errorMessage = "No config Zookeeper connect information";
private InitialThread initialThread;
public ZKScheduleManager() {
this.currenScheduleServer = ScheduleServer.createScheduleServer(null);
}
public void init() throws Exception {
if(this.zkConfig != null){
for (Map.Entry<String, String> e : this.zkConfig.entrySet()) {
ConsoleManager.properties.put(e.getKey(), e.getValue());
}
}
if(ConsoleManager.properties.containsKey("onlyClient")){
String val = String.valueOf(ConsoleManager.properties.get("onlyClient"));
if(StringUtils.isNotBlank(val)){
start = Boolean.valueOf(val);
}
}
this.setPoolSize(DEFAULT_POOL_SIZE);
if(ConsoleManager.properties.containsKey("poolSize")){
String val = String.valueOf(ConsoleManager.properties.get("poolSize"));
if(StringUtils.isNotBlank(val)){
this.setPoolSize(Integer.valueOf(val));
}
}
System.out.println("properties:"+ConsoleManager.properties);
this.init(ConsoleManager.properties);
}
public void init(Properties p) throws Exception {
if (this.initialThread != null) {
this.initialThread.stopThread();
}
this.initLock.lock();
try {
this.scheduleDataManager = null;
if (this.zkManager != null) {
this.zkManager.close();
}
this.zkManager = new ZKManager(p);
this.errorMessage = "Zookeeper connecting ......"
+ this.zkManager.getConnectStr();
initialThread = new InitialThread(this);
initialThread.setName("ScheduleManager-initialThread");
initialThread.start();
} finally {
this.initLock.unlock();
}
}
private void rewriteScheduleInfo() throws Exception {
registerLock.lock();
try {
if (this.isStopSchedule) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("外部命令終止調(diào)度,不在注冊(cè)調(diào)度服務(wù),避免遺留垃圾數(shù)據(jù):"
+ currenScheduleServer.getUuid());
}
return;
}
// 先發(fā)送心跳信息
if (errorMessage != null) {
this.currenScheduleServer.setDealInfoDesc(errorMessage);
}
if (!this.scheduleDataManager
.refreshScheduleServer(this.currenScheduleServer)) {
// 更新信息失敗件舵,清除內(nèi)存數(shù)據(jù)后重新注冊(cè)
this.clearMemoInfo();
this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer);
}
isScheduleServerRegister = true;
} finally {
registerLock.unlock();
}
}
/**
* 清除內(nèi)存中所有的已經(jīng)取得的數(shù)據(jù)和任務(wù)隊(duì)列,在心態(tài)更新失敗卸察,或者發(fā)現(xiàn)注冊(cè)中心的調(diào)度信息被刪除
*/
public void clearMemoInfo() {
try {
} finally {
}
}
/**
* 根據(jù)當(dāng)前調(diào)度服務(wù)器的信息,重新計(jì)算分配所有的調(diào)度任務(wù)
* 任務(wù)的分配是需要加鎖铅祸,避免數(shù)據(jù)分配錯(cuò)誤坑质。為了避免數(shù)據(jù)鎖帶來的負(fù)面作用,通過版本號(hào)來達(dá)到鎖的目的
*
* 1临梗、獲取任務(wù)狀態(tài)的版本號(hào) 2洪乍、獲取所有的服務(wù)器注冊(cè)信息和任務(wù)隊(duì)列信息 3、清除已經(jīng)超過心跳周期的服務(wù)器注冊(cè)信息 3夜焦、重新計(jì)算任務(wù)分配
* 4、更新任務(wù)狀態(tài)的版本號(hào)【樂觀鎖】 5岂贩、根系任務(wù)隊(duì)列的分配信息
*
* @throws Exception
*/
public void assignScheduleTask() throws Exception {
scheduleDataManager.clearExpireScheduleServer();
List<String> serverList = scheduleDataManager.loadScheduleServerNames();
if (!scheduleDataManager.isLeader(this.currenScheduleServer.getUuid(),
serverList)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(this.currenScheduleServer.getUuid()
+ ":不是負(fù)責(zé)任務(wù)分配的Leader,直接返回");
}
return;
}
//黑名單
for(String ip:zkManager.getIpBlacklist()){
int index = serverList.indexOf(ip);
if (index > -1){
serverList.remove(index);
}
}
// 設(shè)置初始化成功標(biāo)準(zhǔn)茫经,避免在leader轉(zhuǎn)換的時(shí)候,新增的線程組初始化失敗
scheduleDataManager.assignTask(this.currenScheduleServer.getUuid(), serverList);
}
/**
* 定時(shí)向數(shù)據(jù)配置中心更新當(dāng)前服務(wù)器的心跳信息萎津。 如果發(fā)現(xiàn)本次更新的時(shí)間如果已經(jīng)超過了卸伞,服務(wù)器死亡的心跳周期,則不能在向服務(wù)器更新信息锉屈。
* 而應(yīng)該當(dāng)作新的服務(wù)器荤傲,進(jìn)行重新注冊(cè)。
*
* @throws Exception
*/
public void refreshScheduleServer() throws Exception {
try {
rewriteScheduleInfo();
// 如果任務(wù)信息沒有初始化成功颈渊,不做任務(wù)相關(guān)的處理
if (!this.isScheduleServerRegister) {
return;
}
// 重新分配任務(wù)
this.assignScheduleTask();
// 檢查本地任務(wù)
this.checkLocalTask();
} catch (Throwable e) {
// 清除內(nèi)存中所有的已經(jīng)取得的數(shù)據(jù)和任務(wù)隊(duì)列,避免心跳線程失敗時(shí)候?qū)е碌臄?shù)據(jù)重復(fù)
this.clearMemoInfo();
if (e instanceof Exception) {
throw (Exception) e;
} else {
throw new Exception(e.getMessage(), e);
}
}
}
public void checkLocalTask() throws Exception {
// 檢查系統(tǒng)任務(wù)執(zhí)行情況
scheduleDataManager.checkLocalTask(this.currenScheduleServer.getUuid());
}
/**
* 在Zk狀態(tài)正常后回調(diào)數(shù)據(jù)初始化
*
* @throws Exception
*/
public void initialData() throws Exception {
this.zkManager.initial();
this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
checkScheduleDataManager();
if (this.start) {
// 注冊(cè)調(diào)度管理器
this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer);
if (hearBeatTimer == null) {
hearBeatTimer = new Timer("ScheduleManager-"
+ this.currenScheduleServer.getUuid() + "-HearBeat");
}
hearBeatTimer.schedule(new HeartBeatTimerTask(this), 1000, this.timerInterval);
//初始化啟動(dòng)數(shù)據(jù)
if(initTaskDefines != null && initTaskDefines.size() > 0){
for(TaskDefine taskDefine : initTaskDefines){
scheduleDataManager.addTask(taskDefine);
}
}
}
}
private Runnable taskWrapper(final Runnable task){
return new Runnable(){
public void run(){
TaskDefine taskDefine = resolveTaskName(task);
String name = taskDefine.stringKey();
if(StringUtils.isNotEmpty(name)){
boolean isOwner = false;
boolean isRunning = true;
try {
if(!isScheduleServerRegister){
Thread.sleep(1000);
}
if(zkManager.checkZookeeperState()){
isOwner = scheduleDataManager.isOwner(name, currenScheduleServer.getUuid());
isOwnerMap.put(name, isOwner);
isRunning = scheduleDataManager.isRunning(name);
}else{
// 如果zk不可用遂黍,使用歷史數(shù)據(jù)
if(null != isOwnerMap){
isOwner = isOwnerMap.get(name);
}
}
if(isOwner && isRunning){
String msg = null;
try {
task.run();
LOGGER.info("Cron job has been executed.");
} catch (Exception e) {
msg = e.getLocalizedMessage();
}
scheduleDataManager.saveRunningInfo(name, currenScheduleServer.getUuid(), taskDefine.getRunTimes(), msg);
}
} catch (Exception e) {
LOGGER.error("Check task owner error.", e);
}
}
}
};
}
private TaskDefine resolveTaskName(final Runnable task) {
Method targetMethod = null;
TaskDefine taskDefine = new TaskDefine();
if(task instanceof ScheduledMethodRunnable){
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable)task;
taskDefine.setType(TaskDefine.TYPE_UNCODE_SINGLE_TASK);
taskDefine.valueOf(runnable.getTaskDefine());
taskDefine.setRunTimes(runnable.getRunTimes());
}else if(task instanceof ScheduledDistributedSubRunnable){
ScheduledDistributedSubRunnable runnable = (ScheduledDistributedSubRunnable)task;
taskDefine.setType(TaskDefine.TYPE_UNCODE_MULTI_SUB_TASK);
taskDefine.valueOf(runnable.getTaskDefine());
taskDefine.setRunTimes(runnable.getRunTimes());
}else if(task instanceof ScheduledDistributedMainRunnable){
ScheduledDistributedMainRunnable runnable = (ScheduledDistributedMainRunnable)task;
taskDefine.valueOf(runnable.getTaskDefine());
taskDefine.setRunTimes(runnable.getRunTimes());
taskDefine.setType(TaskDefine.TYPE_UNCODE_MULTI_MAIN_TASK);
}else{
org.springframework.scheduling.support.ScheduledMethodRunnable springScheduledMethodRunnable = (org.springframework.scheduling.support.ScheduledMethodRunnable)task;
targetMethod = springScheduledMethodRunnable.getMethod();
taskDefine.setType(TaskDefine.TYPE_SPRING_TASK);
String[] beanNames = applicationcontext.getBeanNamesForType(targetMethod.getDeclaringClass());
if(null != beanNames && StringUtils.isNotEmpty(beanNames[0])){
taskDefine.setTargetBean(beanNames[0]);
taskDefine.setTargetMethod(targetMethod.getName());
}
}
return taskDefine;
}
class HeartBeatTimerTask extends java.util.TimerTask {
private transient final Logger log = LoggerFactory.getLogger(HeartBeatTimerTask.class);
ZKScheduleManager manager;
public HeartBeatTimerTask(ZKScheduleManager aManager) {
manager = aManager;
}
public void run() {
try {
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
manager.refreshScheduleServer();
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
}
}
class InitialThread extends Thread {
private transient Logger log = LoggerFactory.getLogger(InitialThread.class);
ZKScheduleManager sm;
public InitialThread(ZKScheduleManager sm) {
this.sm = sm;
}
boolean isStop = false;
public void stopThread() {
this.isStop = true;
}
@Override
public void run() {
sm.initLock.lock();
try {
int count = 0;
while (!sm.zkManager.checkZookeeperState()) {
count = count + 1;
if (count % 50 == 0) {
sm.errorMessage = "Zookeeper connecting ......"
+ sm.zkManager.getConnectStr() + " spendTime:"
+ count * 20 + "(ms)";
log.error(sm.errorMessage);
}
Thread.sleep(20);
if (this.isStop) {
return;
}
}
sm.initialData();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
sm.initLock.unlock();
}
}
}
public IScheduleDataManager getScheduleDataManager() {
return scheduleDataManager;
}
@Override
public void setApplicationContext(ApplicationContext applicationcontext)
throws BeansException {
ZKScheduleManager.applicationcontext = applicationcontext;
}
public void setZkManager(ZKManager zkManager) {
this.zkManager = zkManager;
}
public ZKManager getZkManager() {
return zkManager;
}
public void setZkConfig(Map<String, String> zkConfig) {
this.zkConfig = zkConfig;
}
/**
* 使用fixedRate的方式提交任務(wù)調(diào)度請(qǐng)求
* <pre>
* 任務(wù)首次啟動(dòng)時(shí)間未設(shè)置终佛,任務(wù)池將會(huì)盡可能早的啟動(dòng)任務(wù)
* </pre>
*
* @param task 待執(zhí)行的任務(wù)
* @param period 兩次任務(wù)啟動(dòng)時(shí)間之間的間隔時(shí)間,默認(rèn)單位是毫秒
* @return 任務(wù)句柄
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
try {
TaskDefine taskDefine = resolveTaskName(task);
taskDefine.setPeriod(period);
checkScheduleDataManager();
boolean rt = isUncodeTask(task);
if(rt == false){
scheduleDataManager.addTask(taskDefine);
}
LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.stringKey() + "]");
} catch (Exception e) {
LOGGER.error("update task error", e);
}
return super.scheduleAtFixedRate(taskWrapper(task), period);
}
/**
* 提交任務(wù)調(diào)度請(qǐng)求
*
* @param task 待執(zhí)行任務(wù)
* @param trigger 使用Trigger指定任務(wù)調(diào)度規(guī)則
* @return 任務(wù)句柄
*/
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
TaskDefine taskDefine = resolveTaskName(task);
String cronEx = trigger.toString();
int index = cronEx.indexOf(":");
if(index >= 0){
cronEx = cronEx.substring(index + 1);
taskDefine.setCronExpression(cronEx.trim());
}
checkScheduleDataManager();
boolean rt = isUncodeTask(task);
if(rt == false){
scheduleDataManager.addTask(taskDefine);
}
LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
} catch (Exception e) {
LOGGER.error("update task error", e);
}
return super.schedule(taskWrapper(task), trigger);
}
/**
* 提交任務(wù)調(diào)度請(qǐng)求
* <pre>
* 注意任務(wù)只執(zhí)行一次雾家,使用startTime指定其啟動(dòng)時(shí)間
* </pre>
* @param task 待執(zhí)行任務(wù)
* @param startTime 任務(wù)啟動(dòng)時(shí)間
* @return 任務(wù)句柄
*/
public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
try {
TaskDefine taskDefine = resolveTaskName(task);
taskDefine.setStartTime(startTime);
checkScheduleDataManager();
boolean rt = isUncodeTask(task);
if(rt == false){
scheduleDataManager.addTask(taskDefine);
}
LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
} catch (Exception e) {
LOGGER.error("update task error", e);
}
return super.schedule(taskWrapper(task), startTime);
}
private void checkScheduleDataManager() throws InterruptedException {
if(scheduleDataManager == null){
downLatch.await(1000, TimeUnit.MILLISECONDS);
}else{
downLatch.countDown();
}
}
private boolean isUncodeTask(Runnable task){
if(task instanceof ScheduledMethodRunnable){
return true;
}else if(task instanceof ScheduledDistributedSubRunnable){
return true;
}else if(task instanceof ScheduledDistributedMainRunnable){
return true;
}
return false;
}
/**
* 使用fixedRate的方式提交任務(wù)調(diào)度請(qǐng)求
* <pre>
* 任務(wù)首次啟動(dòng)時(shí)間由傳入?yún)?shù)指定
* </pre>
* @param task 待執(zhí)行的任務(wù)
* @param startTime 任務(wù)啟動(dòng)時(shí)間
* @param period 兩次任務(wù)啟動(dòng)時(shí)間之間的間隔時(shí)間铃彰,默認(rèn)單位是毫秒
* @return 任務(wù)句柄
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
try {
TaskDefine taskDefine = resolveTaskName(task);
taskDefine.setStartTime(startTime);
taskDefine.setPeriod(period);
checkScheduleDataManager();
boolean rt = isUncodeTask(task);
if(rt == false){
scheduleDataManager.addTask(taskDefine);
}
LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
} catch (Exception e) {
LOGGER.error("update task error", e);
}
return super.scheduleAtFixedRate(taskWrapper(task), startTime, period);
}
/**
* 使用fixedDelay的方式提交任務(wù)調(diào)度請(qǐng)求
* <pre>
* 任務(wù)首次啟動(dòng)時(shí)間由傳入?yún)?shù)指定
* </pre>
* @param task 待執(zhí)行任務(wù)
* @param startTime 任務(wù)啟動(dòng)時(shí)間
* @param delay 上一次任務(wù)結(jié)束時(shí)間與下一次任務(wù)開始時(shí)間的間隔時(shí)間,單位默認(rèn)是毫秒
* @return 任務(wù)句柄
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
try {
TaskDefine taskDefine = resolveTaskName(task);
taskDefine.setStartTime(startTime);
taskDefine.setPeriod(delay);
checkScheduleDataManager();
boolean rt = isUncodeTask(task);
if(rt == false){
scheduleDataManager.addTask(taskDefine);
}
LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
} catch (Exception e) {
LOGGER.error("update task error", e);
}
return super.scheduleWithFixedDelay(taskWrapper(task), startTime, delay);
}
/**
* 使用fixedDelay的方式提交任務(wù)調(diào)度請(qǐng)求
* <pre>
* 任務(wù)首次啟動(dòng)時(shí)間未設(shè)置芯咧,任務(wù)池將會(huì)盡可能早的啟動(dòng)任務(wù)
* </pre>
* @param task 待執(zhí)行任務(wù)
* @param delay 上一次任務(wù)結(jié)束時(shí)間與下一次任務(wù)開始時(shí)間的間隔時(shí)間牙捉,單位默認(rèn)是毫秒
* @return 任務(wù)句柄
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
try {
TaskDefine taskDefine = resolveTaskName(task);
taskDefine.setPeriod(delay);
checkScheduleDataManager();
boolean rt = isUncodeTask(task);
if(rt == false){
scheduleDataManager.addTask(taskDefine);
}
LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
} catch (Exception e) {
LOGGER.error("update task error", e);
}
return super.scheduleWithFixedDelay(taskWrapper(task), delay);
}
public boolean checkAdminUser(String account, String password){
if(StringUtils.isBlank(account) || StringUtils.isBlank(password)){
return false;
}
String name = zkConfig.get(ZKManager.KEYS.userName.key);
String pwd = zkConfig.get(ZKManager.KEYS.password.key);
if(account.equals(name) && password.equals(pwd)){
return true;
}
return false;
}
public String getScheduleServerUUid(){
if(null != currenScheduleServer){
return currenScheduleServer.getUuid();
}
return null;
}
public Map<String, Boolean> getIsOwnerMap() {
return isOwnerMap;
}
public static ApplicationContext getApplicationcontext() {
return ZKScheduleManager.applicationcontext;
}
public void setInitTaskDefines(List<TaskDefine> initTaskDefines) {
this.initTaskDefines = initTaskDefines;
}
public void destroy() {
try {
if (this.initialThread != null) {
this.initialThread.stopThread();
}
if (this.scheduleDataManager != null) {
this.scheduleDataManager.clearExpireScheduleServer();
}
if (this.hearBeatTimer != null) {
this.hearBeatTimer.cancel();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (this.zkManager != null) {
try {
this.zkManager.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
任務(wù)注冊(cè)
public class ConsoleManager {
private static transient Logger log = LoggerFactory.getLogger(ConsoleManager.class);
// private static Gson GSON = new GsonBuilder().create();
private static ZKScheduleManager scheduleManager;
static Properties properties = new Properties();
public static void setProperties(Properties prop){
properties.putAll(prop);
}
public static ZKScheduleManager getScheduleManager() throws Exception {
if(null == ConsoleManager.scheduleManager){
synchronized(ConsoleManager.class) {
ConsoleManager.scheduleManager = ZKScheduleManager.getApplicationcontext().getBean(ZKScheduleManager.class);
}
}
return ConsoleManager.scheduleManager;
}
/**
* 添加任務(wù)
* @param taskDefine 任務(wù)定義
*/
public static void addScheduleTask(TaskDefine taskDefine) {
try {
log.info("添加任務(wù):"+taskDefine.getSingalKey());
ConsoleManager.getScheduleManager().getScheduleDataManager().addTask(taskDefine);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 刪除任務(wù)
* @param taskDefine 任務(wù)定義
*/
public static void delScheduleTask(TaskDefine taskDefine) {
try {
ConsoleManager.scheduleManager.getScheduleDataManager().delTask(taskDefine);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 不可用
* @param targetBean bean名稱
* @param targetMethod 方法名稱
*/
@Deprecated
public static void delScheduleTask(String targetBean, String targetMethod) {
try {
ConsoleManager.scheduleManager.getScheduleDataManager().delTask(targetBean, targetMethod);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 修改任務(wù)
* @param taskDefine 任務(wù)定義
*/
public static void updateScheduleTask(TaskDefine taskDefine) {
try {
ConsoleManager.scheduleManager.getScheduleDataManager().updateTask(taskDefine);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 查詢所有任務(wù)列表
* @return 任務(wù)列表
*/
public static List<TaskDefine> queryScheduleTask() {
List<TaskDefine> taskDefines = new ArrayList<TaskDefine>();
try {
List<TaskDefine> tasks = ConsoleManager.getScheduleManager().getScheduleDataManager().selectTask();
taskDefines.addAll(tasks);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return taskDefines;
}
/**
* 任務(wù)是否存在
* @param taskDefine 任務(wù)定義
* @return 是或否
* @throws Exception 異常
*/
public static boolean isExistsTask(TaskDefine taskDefine) throws Exception{
return ConsoleManager.scheduleManager.getScheduleDataManager().isExistsTask(taskDefine);
}
/**
* 根據(jù)標(biāo)識(shí)查詢相關(guān)任務(wù)
* @param taskDefine 任務(wù)定義
* @return 任務(wù)信息
* @throws Exception 異常
*/
public static TaskDefine queryScheduleTask(TaskDefine taskDefine) throws Exception{
return ConsoleManager.scheduleManager.getScheduleDataManager().selectTask(taskDefine);
}
/**
* 判斷當(dāng)前任務(wù)是否屬于當(dāng)前節(jié)點(diǎn)
* @param taskDefine 任務(wù)定義
* @return 是或否
* @throws Exception 異常
*/
public static boolean isOwner(TaskDefine taskDefine) throws Exception{
return ConsoleManager.scheduleManager.getScheduleDataManager().isOwner(taskDefine.getSingalKey(),
ConsoleManager.getScheduleManager().getScheduleServerUUid());
}
}