基于zookeeper實(shí)現(xiàn)任務(wù)調(diào)度(1)

最近公司發(fā)生了服務(wù)定時(shí)同步的機(jī)制異常森爽,看著公司的分布式任務(wù)調(diào)度組件馋评,想著自己實(shí)現(xiàn)一個(gè) github地址

業(yè)務(wù)背景

A模塊中的定時(shí)器每個(gè)30s去執(zhí)行一次任務(wù)(任務(wù)內(nèi)容是遠(yuǎn)程獲取消息并處理發(fā)送)。原定是A模塊部署到兩個(gè)服務(wù)器,但是目前A模塊獨(dú)立運(yùn)行兩個(gè)之后總是會(huì)獲取到相同的遠(yuǎn)程消息,然后重復(fù)執(zhí)行處理了兩次几晤。

實(shí)際場(chǎng)景

公司采用Spring quartz 建立定時(shí)任務(wù)模塊。 當(dāng)任務(wù)模塊進(jìn)行了分布式部署植阴,通常會(huì)出現(xiàn)定時(shí)任務(wù)重復(fù)執(zhí)行的情況蟹瘾。 怎么避免這種情況呢圾浅,是否可以構(gòu)建一個(gè)任務(wù)注冊(cè)中心,Quartz負(fù)責(zé)注冊(cè)任務(wù)憾朴,但不具體執(zhí)行任務(wù)內(nèi)的業(yè)務(wù)邏輯狸捕。


target.png

解決問題

任務(wù)注冊(cè)中心解決任務(wù)重復(fù)注冊(cè)的問題,同時(shí)將任務(wù)分配給若干處理器進(jìn)行具體的業(yè)務(wù)處理众雷,保證在同一個(gè)時(shí)間內(nèi)灸拍,一個(gè)任務(wù)只會(huì)被一個(gè)處理器進(jìn)行處理。

實(shí)現(xiàn)方式

利用ZooKeeper的Master選舉機(jī)制實(shí)現(xiàn)砾省。 注冊(cè)任務(wù)就相當(dāng)于在ZooKeeper中創(chuàng)建或更新一個(gè)節(jié)點(diǎn)鸡岗。通過更新節(jié)點(diǎn)的內(nèi)容,來記錄任務(wù)的執(zhí)行狀態(tài)编兄。

過程說明

任務(wù)調(diào)度轩性,從管理器分配任務(wù),根據(jù)不同的任務(wù)Id進(jìn)行注冊(cè)

public class ZKScheduleManager extends ThreadPoolTaskScheduler implements ApplicationContextAware {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    private static final int DEFAULT_POOL_SIZE = 20;

    private static final transient Logger LOGGER = LoggerFactory.getLogger(ZKScheduleManager.class);
    
    private final CountDownLatch downLatch = new CountDownLatch(1);

    private Map<String, String> zkConfig;
    
    protected ZKManager zkManager;

    private IScheduleDataManager scheduleDataManager;

    /**
     * 當(dāng)前調(diào)度服務(wù)的信息
     */
    protected ScheduleServer currenScheduleServer;

    /**
     * 是否啟動(dòng)調(diào)度管理狠鸳,如果只是做系統(tǒng)管理揣苏,應(yīng)該設(shè)置為false,對(duì)應(yīng)key值為onlyAdmin
     */
    public boolean start = true;

    /**
     * 心跳間隔
     */
    private int timerInterval = 1000;

    /**
     * 是否注冊(cè)成功
     */
    private boolean isScheduleServerRegister = true;

    private static ApplicationContext applicationcontext;
    
    private Map<String, Boolean> isOwnerMap = new ConcurrentHashMap<String, Boolean>();

    private Timer hearBeatTimer;
    private Lock initLock = new ReentrantLock();
    private boolean isStopSchedule = false;
    private Lock registerLock = new ReentrantLock();
    
    private List<TaskDefine> initTaskDefines = new ArrayList<TaskDefine>();
    
    private volatile String errorMessage = "No config Zookeeper connect information";
    private InitialThread initialThread;

    public ZKScheduleManager() {
        this.currenScheduleServer = ScheduleServer.createScheduleServer(null);
    }

    public void init() throws Exception {
        if(this.zkConfig != null){
            for (Map.Entry<String, String> e : this.zkConfig.entrySet()) {
                ConsoleManager.properties.put(e.getKey(), e.getValue());
            }
        }
        if(ConsoleManager.properties.containsKey("onlyClient")){
            String val = String.valueOf(ConsoleManager.properties.get("onlyClient"));
            if(StringUtils.isNotBlank(val)){
                start = Boolean.valueOf(val);
            }
        }
        this.setPoolSize(DEFAULT_POOL_SIZE);
        if(ConsoleManager.properties.containsKey("poolSize")){
            String val = String.valueOf(ConsoleManager.properties.get("poolSize"));
            if(StringUtils.isNotBlank(val)){
                this.setPoolSize(Integer.valueOf(val));
            }
        }
        System.out.println("properties:"+ConsoleManager.properties);
        this.init(ConsoleManager.properties);
    }

    public void init(Properties p) throws Exception {
        if (this.initialThread != null) {
            this.initialThread.stopThread();
        }
        this.initLock.lock();
        try {
            this.scheduleDataManager = null;
            if (this.zkManager != null) {
                this.zkManager.close();
            }
            this.zkManager = new ZKManager(p);
            this.errorMessage = "Zookeeper connecting ......"
                    + this.zkManager.getConnectStr();
            initialThread = new InitialThread(this);
            initialThread.setName("ScheduleManager-initialThread");
            initialThread.start();
        } finally {
            this.initLock.unlock();
        }
    }

    private void rewriteScheduleInfo() throws Exception {
        registerLock.lock();
        try {
            if (this.isStopSchedule) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("外部命令終止調(diào)度,不在注冊(cè)調(diào)度服務(wù),避免遺留垃圾數(shù)據(jù):"
                            + currenScheduleServer.getUuid());
                }
                return;
            }
            // 先發(fā)送心跳信息
            if (errorMessage != null) {
                this.currenScheduleServer.setDealInfoDesc(errorMessage);
            }
            if (!this.scheduleDataManager
                    .refreshScheduleServer(this.currenScheduleServer)) {
                // 更新信息失敗件舵,清除內(nèi)存數(shù)據(jù)后重新注冊(cè)
                this.clearMemoInfo();
                this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer);
            }
            isScheduleServerRegister = true;
        } finally {
            registerLock.unlock();
        }
    }

    /**
     * 清除內(nèi)存中所有的已經(jīng)取得的數(shù)據(jù)和任務(wù)隊(duì)列,在心態(tài)更新失敗卸察,或者發(fā)現(xiàn)注冊(cè)中心的調(diào)度信息被刪除
     */
    public void clearMemoInfo() {
        try {

        } finally {
        }

    }

    /**
     * 根據(jù)當(dāng)前調(diào)度服務(wù)器的信息,重新計(jì)算分配所有的調(diào)度任務(wù)
     * 任務(wù)的分配是需要加鎖铅祸,避免數(shù)據(jù)分配錯(cuò)誤坑质。為了避免數(shù)據(jù)鎖帶來的負(fù)面作用,通過版本號(hào)來達(dá)到鎖的目的
     * 
     * 1临梗、獲取任務(wù)狀態(tài)的版本號(hào) 2洪乍、獲取所有的服務(wù)器注冊(cè)信息和任務(wù)隊(duì)列信息 3、清除已經(jīng)超過心跳周期的服務(wù)器注冊(cè)信息 3夜焦、重新計(jì)算任務(wù)分配
     * 4、更新任務(wù)狀態(tài)的版本號(hào)【樂觀鎖】 5岂贩、根系任務(wù)隊(duì)列的分配信息
     * 
     * @throws Exception
     */
    public void assignScheduleTask() throws Exception {
        scheduleDataManager.clearExpireScheduleServer();
        List<String> serverList = scheduleDataManager.loadScheduleServerNames();
        if (!scheduleDataManager.isLeader(this.currenScheduleServer.getUuid(),
                serverList)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(this.currenScheduleServer.getUuid()
                        + ":不是負(fù)責(zé)任務(wù)分配的Leader,直接返回");
            }
            return;
        }
        //黑名單
        for(String ip:zkManager.getIpBlacklist()){
            int index = serverList.indexOf(ip);
            if (index > -1){
                serverList.remove(index);
            }
        }
        // 設(shè)置初始化成功標(biāo)準(zhǔn)茫经,避免在leader轉(zhuǎn)換的時(shí)候,新增的線程組初始化失敗
        scheduleDataManager.assignTask(this.currenScheduleServer.getUuid(), serverList);
    }

    /**
     * 定時(shí)向數(shù)據(jù)配置中心更新當(dāng)前服務(wù)器的心跳信息萎津。 如果發(fā)現(xiàn)本次更新的時(shí)間如果已經(jīng)超過了卸伞,服務(wù)器死亡的心跳周期,則不能在向服務(wù)器更新信息锉屈。
     * 而應(yīng)該當(dāng)作新的服務(wù)器荤傲,進(jìn)行重新注冊(cè)。
     * 
     * @throws Exception
     */
    public void refreshScheduleServer() throws Exception {
        try {
            rewriteScheduleInfo();
            // 如果任務(wù)信息沒有初始化成功颈渊,不做任務(wù)相關(guān)的處理
            if (!this.isScheduleServerRegister) {
                return;
            }

            // 重新分配任務(wù)
            this.assignScheduleTask();
            // 檢查本地任務(wù)
            this.checkLocalTask();
        } catch (Throwable e) {
            // 清除內(nèi)存中所有的已經(jīng)取得的數(shù)據(jù)和任務(wù)隊(duì)列,避免心跳線程失敗時(shí)候?qū)е碌臄?shù)據(jù)重復(fù)
            this.clearMemoInfo();
            if (e instanceof Exception) {
                throw (Exception) e;
            } else {
                throw new Exception(e.getMessage(), e);
            }
        }
    }
    
    public void checkLocalTask() throws Exception {
        // 檢查系統(tǒng)任務(wù)執(zhí)行情況
        scheduleDataManager.checkLocalTask(this.currenScheduleServer.getUuid());
    }

    /**
     * 在Zk狀態(tài)正常后回調(diào)數(shù)據(jù)初始化
     * 
     * @throws Exception
     */
    public void initialData() throws Exception {
        this.zkManager.initial();
        this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
        checkScheduleDataManager();
        if (this.start) {
            // 注冊(cè)調(diào)度管理器
            this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer);
            if (hearBeatTimer == null) {
                hearBeatTimer = new Timer("ScheduleManager-"
                        + this.currenScheduleServer.getUuid() + "-HearBeat");
            }
            hearBeatTimer.schedule(new HeartBeatTimerTask(this), 1000, this.timerInterval);
            
            //初始化啟動(dòng)數(shù)據(jù)
            if(initTaskDefines != null && initTaskDefines.size() > 0){
                for(TaskDefine taskDefine : initTaskDefines){
                    scheduleDataManager.addTask(taskDefine);
                }
            }
        }
    }
    
    private Runnable taskWrapper(final Runnable task){
        return new Runnable(){
            public void run(){
                TaskDefine taskDefine = resolveTaskName(task);
                String name = taskDefine.stringKey();
                if(StringUtils.isNotEmpty(name)){
                    boolean isOwner = false;
                    boolean isRunning = true;
                    try {
                        if(!isScheduleServerRegister){
                            Thread.sleep(1000);
                        }
                        if(zkManager.checkZookeeperState()){
                            isOwner = scheduleDataManager.isOwner(name, currenScheduleServer.getUuid());
                            isOwnerMap.put(name, isOwner);
                            isRunning = scheduleDataManager.isRunning(name);
                        }else{
                            // 如果zk不可用遂黍,使用歷史數(shù)據(jù)
                            if(null != isOwnerMap){
                                isOwner = isOwnerMap.get(name);
                            }
                        }
                        if(isOwner && isRunning){
                            String msg = null;
                            try {
                                task.run();
                                LOGGER.info("Cron job has been executed.");
                            } catch (Exception e) {
                                msg = e.getLocalizedMessage();
                            }
                            scheduleDataManager.saveRunningInfo(name, currenScheduleServer.getUuid(), taskDefine.getRunTimes(), msg);
                        }
                    } catch (Exception e) {
                        LOGGER.error("Check task owner error.", e);
                    }
                }
            }
        };
    }
    
    private TaskDefine resolveTaskName(final Runnable task) {
        Method targetMethod = null;
        TaskDefine taskDefine = new TaskDefine();
        if(task instanceof ScheduledMethodRunnable){
            ScheduledMethodRunnable runnable = (ScheduledMethodRunnable)task;
            taskDefine.setType(TaskDefine.TYPE_UNCODE_SINGLE_TASK);
            taskDefine.valueOf(runnable.getTaskDefine());
            taskDefine.setRunTimes(runnable.getRunTimes());
        }else if(task instanceof ScheduledDistributedSubRunnable){
            ScheduledDistributedSubRunnable runnable = (ScheduledDistributedSubRunnable)task;
            taskDefine.setType(TaskDefine.TYPE_UNCODE_MULTI_SUB_TASK);
            taskDefine.valueOf(runnable.getTaskDefine());
            taskDefine.setRunTimes(runnable.getRunTimes());
        }else if(task instanceof ScheduledDistributedMainRunnable){
            ScheduledDistributedMainRunnable runnable = (ScheduledDistributedMainRunnable)task;
            taskDefine.valueOf(runnable.getTaskDefine());
            taskDefine.setRunTimes(runnable.getRunTimes());
            taskDefine.setType(TaskDefine.TYPE_UNCODE_MULTI_MAIN_TASK);
        }else{
            org.springframework.scheduling.support.ScheduledMethodRunnable springScheduledMethodRunnable = (org.springframework.scheduling.support.ScheduledMethodRunnable)task;
            targetMethod = springScheduledMethodRunnable.getMethod();
            taskDefine.setType(TaskDefine.TYPE_SPRING_TASK);
            String[] beanNames = applicationcontext.getBeanNamesForType(targetMethod.getDeclaringClass());
            if(null != beanNames && StringUtils.isNotEmpty(beanNames[0])){
                taskDefine.setTargetBean(beanNames[0]);
                taskDefine.setTargetMethod(targetMethod.getName());
            }
        }
        
        return taskDefine;
    }

    class HeartBeatTimerTask extends java.util.TimerTask {
        private transient final Logger log = LoggerFactory.getLogger(HeartBeatTimerTask.class);
        ZKScheduleManager manager;

        public HeartBeatTimerTask(ZKScheduleManager aManager) {
            manager = aManager;
        }

        public void run() {
            try {
                Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
                manager.refreshScheduleServer();
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    }

    class InitialThread extends Thread {
        private transient Logger log = LoggerFactory.getLogger(InitialThread.class);
        ZKScheduleManager sm;

        public InitialThread(ZKScheduleManager sm) {
            this.sm = sm;
        }

        boolean isStop = false;

        public void stopThread() {
            this.isStop = true;
        }

        @Override
        public void run() {
            sm.initLock.lock();
            try {
                int count = 0;
                while (!sm.zkManager.checkZookeeperState()) {
                    count = count + 1;
                    if (count % 50 == 0) {
                        sm.errorMessage = "Zookeeper connecting ......"
                                + sm.zkManager.getConnectStr() + " spendTime:"
                                + count * 20 + "(ms)";
                        log.error(sm.errorMessage);
                    }
                    Thread.sleep(20);
                    if (this.isStop) {
                        return;
                    }
                }
                sm.initialData();
            } catch (Throwable e) {
                log.error(e.getMessage(), e);
            } finally {
                sm.initLock.unlock();
            }

        }

    }

    public IScheduleDataManager getScheduleDataManager() {
        return scheduleDataManager;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationcontext)
            throws BeansException {
        ZKScheduleManager.applicationcontext = applicationcontext;
    }
    
    public void setZkManager(ZKManager zkManager) {
        this.zkManager = zkManager;
    }
    
    public ZKManager getZkManager() {
        return zkManager;
    }

    public void setZkConfig(Map<String, String> zkConfig) {
        this.zkConfig = zkConfig;
    }
    
    /**
     * 使用fixedRate的方式提交任務(wù)調(diào)度請(qǐng)求
     * <pre>
     * 任務(wù)首次啟動(dòng)時(shí)間未設(shè)置终佛,任務(wù)池將會(huì)盡可能早的啟動(dòng)任務(wù) 
     * </pre>
     * 
     * @param task 待執(zhí)行的任務(wù) 
     * @param period 兩次任務(wù)啟動(dòng)時(shí)間之間的間隔時(shí)間,默認(rèn)單位是毫秒
     * @return 任務(wù)句柄
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
        try {
            TaskDefine taskDefine = resolveTaskName(task);
            taskDefine.setPeriod(period);
            checkScheduleDataManager();
            boolean rt = isUncodeTask(task);
            if(rt == false){
                scheduleDataManager.addTask(taskDefine);
            }
            LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.stringKey() + "]");
        } catch (Exception e) {
            LOGGER.error("update task error", e);
        }
        return super.scheduleAtFixedRate(taskWrapper(task), period);
    }
    
    /**
     * 提交任務(wù)調(diào)度請(qǐng)求 
     * 
     * @param task 待執(zhí)行任務(wù)  
     * @param trigger 使用Trigger指定任務(wù)調(diào)度規(guī)則
     * @return 任務(wù)句柄
     */
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
        try {
            TaskDefine taskDefine = resolveTaskName(task);
            String cronEx = trigger.toString();
            int index = cronEx.indexOf(":");
            if(index >= 0){
                cronEx = cronEx.substring(index + 1);
                taskDefine.setCronExpression(cronEx.trim());
            }
            checkScheduleDataManager();
            boolean rt = isUncodeTask(task);
            if(rt == false){
                scheduleDataManager.addTask(taskDefine);
            }
            LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
        } catch (Exception e) {
            LOGGER.error("update task error", e);
        }
        return super.schedule(taskWrapper(task), trigger);
    }

    /**
     * 提交任務(wù)調(diào)度請(qǐng)求
     * <pre>
     * 注意任務(wù)只執(zhí)行一次雾家,使用startTime指定其啟動(dòng)時(shí)間  
     * </pre>
     * @param task 待執(zhí)行任務(wù)
     * @param startTime 任務(wù)啟動(dòng)時(shí)間  
     * @return 任務(wù)句柄
     */
    public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
        try {
            TaskDefine taskDefine = resolveTaskName(task);
            taskDefine.setStartTime(startTime);
            checkScheduleDataManager();
            boolean rt = isUncodeTask(task);
            if(rt == false){
                scheduleDataManager.addTask(taskDefine);
            }
            LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
        } catch (Exception e) {
            LOGGER.error("update task error", e);
        }
        return super.schedule(taskWrapper(task), startTime);
    }

    private void checkScheduleDataManager() throws InterruptedException {
        if(scheduleDataManager == null){
            downLatch.await(1000, TimeUnit.MILLISECONDS);
        }else{
            downLatch.countDown();
        }
    }
    
    private boolean isUncodeTask(Runnable task){
        if(task instanceof ScheduledMethodRunnable){
            return true;
        }else if(task instanceof ScheduledDistributedSubRunnable){
            return true;
        }else if(task instanceof ScheduledDistributedMainRunnable){
            return true;
        }
        return false;
    }

    /**
     * 使用fixedRate的方式提交任務(wù)調(diào)度請(qǐng)求
     * <pre>
     * 任務(wù)首次啟動(dòng)時(shí)間由傳入?yún)?shù)指定 
     * </pre>
     * @param task 待執(zhí)行的任務(wù) 
     * @param startTime 任務(wù)啟動(dòng)時(shí)間  
     * @param period 兩次任務(wù)啟動(dòng)時(shí)間之間的間隔時(shí)間铃彰,默認(rèn)單位是毫秒
     * @return 任務(wù)句柄
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
        try {
            TaskDefine taskDefine = resolveTaskName(task);
            taskDefine.setStartTime(startTime);
            taskDefine.setPeriod(period);
            checkScheduleDataManager();
            boolean rt = isUncodeTask(task);
            if(rt == false){
                scheduleDataManager.addTask(taskDefine);
            }
            LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
        } catch (Exception e) {
            LOGGER.error("update task error", e);
        }
        return super.scheduleAtFixedRate(taskWrapper(task), startTime, period);
    }
    

    /**
     *  使用fixedDelay的方式提交任務(wù)調(diào)度請(qǐng)求
     * <pre>
     *  任務(wù)首次啟動(dòng)時(shí)間由傳入?yún)?shù)指定 
     * </pre>
     * @param task 待執(zhí)行任務(wù)
     * @param startTime 任務(wù)啟動(dòng)時(shí)間
     * @param delay 上一次任務(wù)結(jié)束時(shí)間與下一次任務(wù)開始時(shí)間的間隔時(shí)間,單位默認(rèn)是毫秒 
     * @return 任務(wù)句柄
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
        try {
            TaskDefine taskDefine = resolveTaskName(task);
            taskDefine.setStartTime(startTime);
            taskDefine.setPeriod(delay);
            checkScheduleDataManager();
            boolean rt = isUncodeTask(task);
            if(rt == false){
                scheduleDataManager.addTask(taskDefine);
            }
            LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
        } catch (Exception e) {
            LOGGER.error("update task error", e);
        }
        return super.scheduleWithFixedDelay(taskWrapper(task), startTime, delay);
    }

    
    /**
     * 使用fixedDelay的方式提交任務(wù)調(diào)度請(qǐng)求
     * <pre>
     * 任務(wù)首次啟動(dòng)時(shí)間未設(shè)置芯咧,任務(wù)池將會(huì)盡可能早的啟動(dòng)任務(wù) 
     * </pre>
     * @param task 待執(zhí)行任務(wù)
     * @param delay 上一次任務(wù)結(jié)束時(shí)間與下一次任務(wù)開始時(shí)間的間隔時(shí)間牙捉,單位默認(rèn)是毫秒 
     * @return 任務(wù)句柄
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
        try {
            TaskDefine taskDefine = resolveTaskName(task);
            taskDefine.setPeriod(delay);
            checkScheduleDataManager();
            boolean rt = isUncodeTask(task);
            if(rt == false){
                scheduleDataManager.addTask(taskDefine);
            }
            LOGGER.debug(currenScheduleServer.getUuid() +":自動(dòng)向集群注冊(cè)任務(wù)[" + taskDefine.getSingalKey() + "]");
        } catch (Exception e) {
            LOGGER.error("update task error", e);
        }
        return super.scheduleWithFixedDelay(taskWrapper(task), delay);
    }
    
    public boolean checkAdminUser(String account, String password){
        if(StringUtils.isBlank(account) || StringUtils.isBlank(password)){
            return false;
        }
        String name = zkConfig.get(ZKManager.KEYS.userName.key);
        String pwd = zkConfig.get(ZKManager.KEYS.password.key);
        if(account.equals(name) && password.equals(pwd)){
            return true;
        }
        return false;
    }
    
    public String getScheduleServerUUid(){
        if(null != currenScheduleServer){
            return currenScheduleServer.getUuid();
        }
        return null;
    }

    public Map<String, Boolean> getIsOwnerMap() {
        return isOwnerMap;
    }

    public static ApplicationContext getApplicationcontext() {
        return ZKScheduleManager.applicationcontext;
    }
    
    public void setInitTaskDefines(List<TaskDefine> initTaskDefines) {
        this.initTaskDefines = initTaskDefines;
    }

    public void destroy() {
        try {
            if (this.initialThread != null) {
                this.initialThread.stopThread();
            }

            if (this.scheduleDataManager != null) {
                this.scheduleDataManager.clearExpireScheduleServer();
            }
            if (this.hearBeatTimer != null) {
                this.hearBeatTimer.cancel();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (this.zkManager != null) {
                try {
                    this.zkManager.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

}

任務(wù)注冊(cè)

public class ConsoleManager {
    
    private static transient Logger log = LoggerFactory.getLogger(ConsoleManager.class);
    
//    private static Gson GSON = new GsonBuilder().create();

    private static ZKScheduleManager scheduleManager;
    
    static Properties properties = new Properties();
    
    public static void setProperties(Properties prop){
        properties.putAll(prop);
    }
    
    public static ZKScheduleManager getScheduleManager() throws Exception {
        if(null == ConsoleManager.scheduleManager){
            synchronized(ConsoleManager.class) {
                ConsoleManager.scheduleManager = ZKScheduleManager.getApplicationcontext().getBean(ZKScheduleManager.class);
            }
        }
        return ConsoleManager.scheduleManager;
    }

    /**
     * 添加任務(wù)
     * @param taskDefine 任務(wù)定義
     */
    public static void addScheduleTask(TaskDefine taskDefine) {
        try {
            log.info("添加任務(wù):"+taskDefine.getSingalKey());
            ConsoleManager.getScheduleManager().getScheduleDataManager().addTask(taskDefine);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    
    /**
     * 刪除任務(wù)
     * @param taskDefine 任務(wù)定義
     */
    public static void delScheduleTask(TaskDefine taskDefine) {
        try {
            ConsoleManager.scheduleManager.getScheduleDataManager().delTask(taskDefine);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    
    /**
     * 不可用
     * @param targetBean bean名稱
     * @param targetMethod 方法名稱
     */
    @Deprecated
    public static void delScheduleTask(String targetBean, String targetMethod) {
        try {
            ConsoleManager.scheduleManager.getScheduleDataManager().delTask(targetBean, targetMethod);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    
    /**
     * 修改任務(wù)
     * @param taskDefine 任務(wù)定義
     */
    public static void updateScheduleTask(TaskDefine taskDefine) {
        try {
            ConsoleManager.scheduleManager.getScheduleDataManager().updateTask(taskDefine);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    
    /**
     * 查詢所有任務(wù)列表
     * @return 任務(wù)列表
     */
    public static List<TaskDefine> queryScheduleTask() {
        List<TaskDefine> taskDefines = new ArrayList<TaskDefine>();
        try {
            List<TaskDefine> tasks = ConsoleManager.getScheduleManager().getScheduleDataManager().selectTask();
            taskDefines.addAll(tasks);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return taskDefines;
    }
    
    /**
     * 任務(wù)是否存在
     * @param taskDefine  任務(wù)定義
     * @return 是或否
     * @throws Exception 異常
     */
    public static boolean isExistsTask(TaskDefine taskDefine) throws Exception{
            return ConsoleManager.scheduleManager.getScheduleDataManager().isExistsTask(taskDefine);
    }
    
    /**
     * 根據(jù)標(biāo)識(shí)查詢相關(guān)任務(wù)
     * @param taskDefine 任務(wù)定義
     * @return 任務(wù)信息
     * @throws Exception 異常
     */
    public static TaskDefine queryScheduleTask(TaskDefine taskDefine) throws Exception{
        return ConsoleManager.scheduleManager.getScheduleDataManager().selectTask(taskDefine);
    }
    
    /**
     * 判斷當(dāng)前任務(wù)是否屬于當(dāng)前節(jié)點(diǎn)
     * @param taskDefine 任務(wù)定義
     * @return 是或否
     * @throws Exception 異常
     */
    public static boolean isOwner(TaskDefine taskDefine) throws Exception{
        return ConsoleManager.scheduleManager.getScheduleDataManager().isOwner(taskDefine.getSingalKey(), 
                ConsoleManager.getScheduleManager().getScheduleServerUUid());
    }
    
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市敬飒,隨后出現(xiàn)的幾起案子邪铲,更是在濱河造成了極大的恐慌,老刑警劉巖无拗,帶你破解...
    沈念sama閱讀 216,544評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件带到,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蓝纲,警方通過查閱死者的電腦和手機(jī)阴孟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來税迷,“玉大人永丝,你說我怎么就攤上這事〖” “怎么了慕嚷?”我有些...
    開封第一講書人閱讀 162,764評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)毕泌。 經(jīng)常有香客問我喝检,道長(zhǎng),這世上最難降的妖魔是什么撼泛? 我笑而不...
    開封第一講書人閱讀 58,193評(píng)論 1 292
  • 正文 為了忘掉前任挠说,我火速辦了婚禮,結(jié)果婚禮上愿题,老公的妹妹穿的比我還像新娘损俭。我一直安慰自己,他們只是感情好潘酗,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評(píng)論 6 388
  • 文/花漫 我一把揭開白布杆兵。 她就那樣靜靜地躺著,像睡著了一般仔夺。 火紅的嫁衣襯著肌膚如雪琐脏。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,182評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音日裙,去河邊找鬼吹艇。 笑死,一個(gè)胖子當(dāng)著我的面吹牛阅签,可吹牛的內(nèi)容都是我干的掐暮。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼政钟,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼路克!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起养交,我...
    開封第一講書人閱讀 38,917評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤精算,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后碎连,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體灰羽,經(jīng)...
    沈念sama閱讀 45,329評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評(píng)論 2 332
  • 正文 我和宋清朗相戀三年鱼辙,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了廉嚼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,722評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡倒戏,死狀恐怖怠噪,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情杜跷,我是刑警寧澤傍念,帶...
    沈念sama閱讀 35,425評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站葛闷,受9級(jí)特大地震影響憋槐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜淑趾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評(píng)論 3 326
  • 文/蒙蒙 一阳仔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扣泊,春花似錦驳概、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)更卒。三九已至等孵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蹂空,已是汗流浹背俯萌。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工果录, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咐熙。 一個(gè)月前我還...
    沈念sama閱讀 47,729評(píng)論 2 368
  • 正文 我出身青樓弱恒,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親棋恼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子返弹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評(píng)論 2 353