Node.js 任務(wù)狀態(tài)監(jiān)控

在實(shí)際生產(chǎn)環(huán)境中,避免不了有很多后臺(tái)運(yùn)行的任務(wù)和定時(shí)任務(wù)志珍,對(duì)任務(wù)狀態(tài)的監(jiān)控與及時(shí)告警可以盡量減少程序出錯(cuò)時(shí)對(duì)用戶造成的影響淳地。針對(duì)常見的兩種任務(wù)類型:定時(shí)任務(wù)、守護(hù)進(jìn)程內(nèi)批處理任務(wù)纱扭,利用 Node.js child_process 實(shí)現(xiàn)了任務(wù)狀態(tài)的監(jiān)控、重啟與郵件告警儡遮。

思路

現(xiàn)在的互聯(lián)網(wǎng)已經(jīng)不是單機(jī)作戰(zhàn)的時(shí)代了乳蛾,分布式部署是非常常見的方式,一個(gè)項(xiàng)目中的任務(wù)可能運(yùn)行在多臺(tái)服務(wù)器上鄙币,我們的監(jiān)控平臺(tái)要做到重啟某個(gè)任務(wù)就需要知道任務(wù)運(yùn)行的具體服務(wù)器肃叶,針對(duì)這一個(gè)問題我們需要獲取到任務(wù)與服務(wù)器關(guān)系的確切信息,所以每臺(tái)運(yùn)行任務(wù)的服務(wù)器需要在啟動(dòng)任務(wù)時(shí)向任務(wù)狀態(tài)管理平臺(tái)注冊(cè)自己的信息十嘿。

任務(wù)狀態(tài)的維護(hù)依賴于任務(wù)運(yùn)行服務(wù)器的心跳上報(bào)因惭,每個(gè)任務(wù)設(shè)置一個(gè)超時(shí)時(shí)間,在任務(wù)啟動(dòng)時(shí)向任務(wù)狀態(tài)管理平臺(tái)發(fā)送開始運(yùn)行信號(hào)绩衷,在任務(wù)運(yùn)行結(jié)束后向管理平臺(tái)發(fā)送運(yùn)行完成信號(hào)蹦魔。任務(wù)管理平臺(tái)根據(jù)任務(wù)設(shè)置的超時(shí)時(shí)間激率,在超時(shí)后仍然沒有接收到任務(wù)完成信號(hào)則判定任務(wù)失敗,將任務(wù)失敗信號(hào)發(fā)送回任務(wù)運(yùn)行的服務(wù)器勿决。再有任務(wù)運(yùn)行服務(wù)器自行處理乒躺,如重啟任務(wù)或者結(jié)束任務(wù)等。

根據(jù)以上的邏輯低缩,實(shí)際需要就是在任務(wù)運(yùn)行的服務(wù)器實(shí)現(xiàn)一個(gè)任務(wù)調(diào)度功能與 HTTP 服務(wù)器用來監(jiān)聽管理平臺(tái)發(fā)送的信號(hào)嘉冒;在管理平臺(tái)這邊實(shí)現(xiàn)任務(wù)服務(wù)器信息注冊(cè)、任務(wù)狀態(tài)監(jiān)管與超時(shí)告警咆繁。文字表述比較晦澀讳推,具體流程可以參考一下的流程圖。


定時(shí)任務(wù)主進(jìn)程調(diào)度系統(tǒng)流程圖.png

定時(shí)任務(wù)狀態(tài)管理平臺(tái)流程圖.png

實(shí)現(xiàn)代碼

后續(xù)會(huì)把關(guān)鍵信息從代碼中抽離出來放到配置文件中玩般,然后放到 GitHub 上娜遵,暫時(shí)以貼代碼的形式簡(jiǎn)單展示一下。

// 任務(wù)運(yùn)行服務(wù)器調(diào)度系統(tǒng)
'use strict';

// 內(nèi)建模塊
const fork = require('child_process').fork;
const path = require('path');

// 第三方模塊
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');

class TaskStatusManagementClient {
    /**
     * 初始化 TaskStatusClient
     * @param {Object} taskClientConfig 服務(wù)器配置信息
     */
    constructor(taskClientConfig) {
        this.taskClientConfig = taskClientConfig;
        this.taskHomePath = taskClientConfig.taskHomePath;
        this.childsInstance = {};
        this.crontabJobsInstance = {};
    }

    /**
     * start TaskStatusClient
     */
    start() {
        this._process();
    }

    _process() {
        let self = this;

        // 根據(jù)服務(wù)器配置信息啟動(dòng)所有任務(wù)
        for(let taskConfig of self.taskClientConfig.tasks) {
            switch(taskConfig.type) {
                case 'daemon': {
                    self._daemonTaskHandler(taskConfig);
                    break;
                }
                case 'crontab': {
                    if(taskConfig.crontabRule) {
                        self._crontabTaskHanlder(taskConfig);
                    }
                    break;
                }
                default: {
                    console.log('unknow task type');
                    break;
                }
            }
        }

        // 在程序退出時(shí)結(jié)束所有子進(jìn)程任務(wù)
        process.on('exit', function (code) {
            for(let child in self.childsInstance) {
                if(self.childsInstance.hasOwnProperty(child)) {
                    self.childsInstance[child].kill('SIGHUP');
                }
            }
        });

        // 啟動(dòng) HTTP 服務(wù)器壤短,監(jiān)聽任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)
        let app = express();
        app.use(bodyParser.json());
        app.use(bodyParser.urlencoded({ extended: false }));
        app.post('/', function (req, res) {
            let body;
            try {
                body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
            } catch (error) {
                return res.status(400).json({ message: 'invalid json' });
            }
            res.status(200).json({ message: 'ok' });
            
            // 收到任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)后重啟任務(wù)
            let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
            let taskIdentifier = '';
            // daemon 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名
            // crontab 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名 + 任務(wù) ID
            switch (taskConfig.type) {
                case 'daemon': {
                    taskIdentifier = taskConfig.name;
                    self.childsInstance[taskIdentifier].kill('SIGHUP');
                    delete self.childsInstance[taskIdentifier];
                    if(self.handlers.daemonTaskHandler) {
                        self.handlers.daemonTaskHandler(taskConfig);
                    } else {
                        self._daemonTaskHandler(taskConfig);
                    }
                    break;
                }
                case 'crontab': {
                    taskIdentifier = taskConfig.name + taskConfig.id;
                    self.childsInstance[taskIdentifier].kill('SIGHUP');
                    delete self.childsInstance[taskIdentifier];
                    if(self.handlers.crontabTaskHandler) {
                        self.handlers.crontabTaskHandler(taskConfig);
                    } else {
                        self._crontabTaskHanlder(taskConfig);
                    }
                    break;
                }
                default: {
                    console.log('unknow task type');
                    break;
                }
            }

        });
        app.listen(self.taskClientConfig.server.port, function () {
            console.log('server start at: ' + self.taskClientConfig.server.port);
            self._registerServer(self.taskClientConfig, function (error, result) {
                if (error) {
                    console.log(error);
                }
            });
        });
    }

    _daemonTaskHandler(taskConfig) {
        let self = this;
        let taskInfo = {
            appName: self.appName,
            name: taskConfig.name,
            expires: taskConfig.timeout
        };
        let child = fork(path.join(self.taskHomePath,taskConfig.file));
        self.childsInstance[taskInfo.name] = child;
        child.on('message', function (message) {
            switch (message.signal) {
                case 'start': {
                    taskInfo.id = message.id;
                    self._startTask(taskInfo, function (error) {
                        if (error) {
                            console.log(error);
                        }
                    });
                    break;
                }
                case 'end': {
                    taskInfo.id = message.id;
                    self._endTask(taskInfo, function (error) {
                        if (error) {
                            console.log(error)
                        }
                    });
                    break;
                }
                default: {
                    console.log('unknow signal');
                    break;
                }
            }
        });
    }

    _crontabTaskHanlder(taskConfig) {
        let self = this;
        let taskInfo = {
            appName: self.appName,
            name: taskConfig.name,
            id: uuid.v4(),
            expires: taskConfig.timeout
        };
        self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
            self._startTask(taskInfo, function (error) {
                if(error) {
                    console.log(error);
                } else {
                    let child = fork(path.join(self.taskHomePath, taskConfig.file));
                    self.childsInstance[taskInfo.name + taskInfo.id] = child;
                    child.on('exit', function (code) {
                        // 子進(jìn)程退出 code 為 0设拟,代表正常退出,這時(shí)可以向監(jiān)控平臺(tái)發(fā)送任務(wù)已完成信號(hào)
                        if(code === 0) {
                            self._endTask(taskInfo, function (error) {
                                if (error) {
                                    console.log(error);
                                }
                            });
                        }
                    });
                }
            });
        }, undefined, true);
    }

    _startTask(taskInfo, callback) {
        let requestOptions = {
            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
            method: 'POST',
            timeout: 5000,
            form: taskInfo
        };
        request(requestOptions, function (error, response, body) {
            return callback(error, body);
        });
    }

    _endTask(taskInfo, callback) {
        let requestOptions = {
            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
            method: 'POST',
            timeout: 5000,
            form: taskInfo
        };
        request(requestOptions, function (error, response, body) {
            return callback(error, body);
        });
    }

    _registerServer(taskClientConfig, callback) {
        let requestOptions = {
            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
            method: 'POST',
            timeout: 5000,
            form: taskClientConfig
        };
        request(requestOptions, function (error, response, body) {
            return callback(error, body);
        });
    }
}

module.exports = TaskStatusManagementClient;
// 調(diào)度系統(tǒng)的使用
'use strict';

const config = {
    // 監(jiān)控平臺(tái)信息久脯,必須
    management: {
        host: 'http://127.0.0.1',
        port: 3000
    },
    // 當(dāng)前服務(wù)器信息纳胧,必須
    server: {
        host: 'http://127.0.0.1',
        port: 3001
    },
    // 當(dāng)前服務(wù)器任務(wù)文件地址(絕對(duì)路徑),必須
    taskHomePath: path.join(__dirname, 'tasks'),
    // 任務(wù)配置信息帘撰,必須
    tasks:[{
        name: 'exampleTaskOne',
        type: 'daemon',
        file: 'example_task_one.js',
        timeout: 10000
    }, {
        name: 'exampleTaskTwo',
        type: 'crontab',
        file: 'example_task_two.js',
        crontabRule: '*/20 * * * * *',  // 任務(wù)類型為 crontab 是此字段為必須
        timeout: 10000
    }]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();

監(jiān)控平臺(tái) HTTP 服務(wù)器比較簡(jiǎn)單跑慕,三個(gè) API,用來將服務(wù)器信息摧找、任務(wù)開始狀態(tài)核行、任務(wù)結(jié)束狀態(tài)寫入數(shù)據(jù)庫,這里就不在贅述蹬耘。

// 監(jiān)控平臺(tái)任務(wù)狀態(tài)監(jiān)管
'use strict';

const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
    service: 'your email service',
    auth: {
        user: 'your email username',
        pass: 'your email password'
    }
});
const request = require('request');

const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三個(gè)為數(shù)據(jù)庫操作連接芝雪,不需要關(guān)注內(nèi)部代碼,不影響代碼閱讀
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');

/**
 * 發(fā)送告警郵件
 * @param {String} to 郵件接收者
 * @param {String} subject 郵件主題
 * @param {String} message 郵件內(nèi)容
 * @param {Function} callback
 */
function sendWarningMessage(to, subject, message, callback) {
    var options = {
        'from': 'xxx@xxx.xxx',
        'to': to,
        'subject': subject,
        'text': message,
        'encoding': 'UTF-8'
    };
    mail.sendMail(options, function(error) {
        console.log('send message to: ' + to);
        return callback(error);
    });
}

/**
 * 處理執(zhí)行成功任務(wù)
 * @param {String} task 任務(wù)狀態(tài)對(duì)象
 * @param {Function} callback
 */
function handleSingleSucceedTask(task, callback) {
        TaskResult.increaseTaskSuccessCount(task.name, function (error) {
            return callback(error);
        });
}

/**
 * 處理執(zhí)行失敗的任務(wù)
 * @param {String} task 任務(wù)狀態(tài)對(duì)象
 * @param {Function} callback
 */
function handleSingleErrorTask(task, callback) {
    async.waterfall([
        // 增加任務(wù)執(zhí)行失敗次數(shù)
        async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
        async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
        function (count, callback) {
            callback(undefined);
            // 給調(diào)度系統(tǒng)發(fā)信號(hào)
            TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
                // 超過限制失敗次數(shù)综苔,發(fā)送告警郵件
                if (count >= config.get('task.limitErrorCount')) {
                    sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定時(shí)任務(wù)告警', `${ serverHost }: "${ task.name }" 執(zhí)行失敗超過預(yù)定失敗次數(shù)`,  function (error) {
                        if (error) {
                            logger.error(error);
                        }
                    });
                    TaskResult.resetTaskErrorCount(task.name, function (error) {
                        if (error) {
                            logger.error(error);
                        }
                    });
                }
                if (!error && serverHost) {
                    // send 'error' signal to task server
                    request({
                        uri: serverHost,
                        method: 'POST',
                        timeout: 5000,
                        form: {
                            name: task.name,
                            id: task.taskId,
                            pid: task.pid
                        }
                    }, function (error, response, body) {
                        if (error) {
                            logger.error(error);
                        }
                    });
                }
            });
        }
    ], function (error) {
        return callback(error);
    });
}

/**
 * 獲取所有已經(jīng)到達(dá)預(yù)定超時(shí)時(shí)間的任務(wù)并處理
 */
function process() {
    let currentTime = new Date().getTime();
    async.waterfall([
        async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
        // 并行處理每個(gè)任務(wù)
        function (tasks, callback) {
            if (tasks.length > 0) {
                async.parallel(tasks.map(function (task) {
                    return function (callback) {
                        // 超時(shí)后任務(wù)狀態(tài)仍然為 running惩系,代表任務(wù)執(zhí)行失敗
                        if (task.status === 'running') {
                            handleSingleErrorTask(task, function (error) {
                                return callback(error);
                            });
                        } else {
                            handleSingleSucceedTask(task, function (error) {
                                return callback(error);
                            });
                        }
                    }
                }), function (error) {
                    return callback(error);
                });
            } else {
                return callback(new Error('no tasks need to exec'));
            }
        },
        // 刪除已經(jīng)處理完成的任務(wù)
        async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
    ], function (error) {
        if (error && error.message === 'no tasks need to exec') {
            let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
            console.log(`no tasks, delay ${ delay }`);
            setTimeout(process, config.get('task.noTaskDelayTime'));
        } else if (error) {
            logger.error(error);
            process();
        } else {
            process();
        }
    });
}

module.exports = process;
定時(shí)任務(wù)狀態(tài)管理平臺(tái)流程圖.png

實(shí)現(xiàn)代碼

后續(xù)會(huì)把關(guān)鍵信息從代碼中抽離出來放到配置文件中,然后放到 GitHub 上如筛,暫時(shí)以貼代碼的形式簡(jiǎn)單展示一下堡牡。

// 任務(wù)運(yùn)行服務(wù)器調(diào)度系統(tǒng)
'use strict';

// 內(nèi)建模塊
const fork = require('child_process').fork;
const path = require('path');

// 第三方模塊
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');

class TaskStatusManagementClient {
    /**
     * 初始化 TaskStatusClient
     * @param {Object} taskClientConfig 服務(wù)器配置信息
     */
    constructor(taskClientConfig) {
        this.taskClientConfig = taskClientConfig;
        this.taskHomePath = taskClientConfig.taskHomePath;
        this.childsInstance = {};
        this.crontabJobsInstance = {};
    }

    /**
     * start TaskStatusClient
     */
    start() {
        this._process();
    }

    _process() {
        let self = this;

        // 根據(jù)服務(wù)器配置信息啟動(dòng)所有任務(wù)
        for(let taskConfig of self.taskClientConfig.tasks) {
            switch(taskConfig.type) {
                case 'daemon': {
                    self._daemonTaskHandler(taskConfig);
                    break;
                }
                case 'crontab': {
                    if(taskConfig.crontabRule) {
                        self._crontabTaskHanlder(taskConfig);
                    }
                    break;
                }
                default: {
                    console.log('unknow task type');
                    break;
                }
            }
        }

        // 在程序退出時(shí)結(jié)束所有子進(jìn)程任務(wù)
        process.on('exit', function (code) {
            for(let child in self.childsInstance) {
                if(self.childsInstance.hasOwnProperty(child)) {
                    self.childsInstance[child].kill('SIGHUP');
                }
            }
        });

        // 啟動(dòng) HTTP 服務(wù)器,監(jiān)聽任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)
        let app = express();
        app.use(bodyParser.json());
        app.use(bodyParser.urlencoded({ extended: false }));
        app.post('/', function (req, res) {
            let body;
            try {
                body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
            } catch (error) {
                return res.status(400).json({ message: 'invalid json' });
            }
            res.status(200).json({ message: 'ok' });
            
            // 收到任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)后重啟任務(wù)
            let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
            let taskIdentifier = '';
            // daemon 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名
            // crontab 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名 + 任務(wù) ID
            switch (taskConfig.type) {
                case 'daemon': {
                    taskIdentifier = taskConfig.name;
                    self.childsInstance[taskIdentifier].kill('SIGHUP');
                    delete self.childsInstance[taskIdentifier];
                    if(self.handlers.daemonTaskHandler) {
                        self.handlers.daemonTaskHandler(taskConfig);
                    } else {
                        self._daemonTaskHandler(taskConfig);
                    }
                    break;
                }
                case 'crontab': {
                    taskIdentifier = taskConfig.name + taskConfig.id;
                    self.childsInstance[taskIdentifier].kill('SIGHUP');
                    delete self.childsInstance[taskIdentifier];
                    if(self.handlers.crontabTaskHandler) {
                        self.handlers.crontabTaskHandler(taskConfig);
                    } else {
                        self._crontabTaskHanlder(taskConfig);
                    }
                    break;
                }
                default: {
                    console.log('unknow task type');
                    break;
                }
            }

        });
        app.listen(self.taskClientConfig.server.port, function () {
            console.log('server start at: ' + self.taskClientConfig.server.port);
            self._registerServer(self.taskClientConfig, function (error, result) {
                if (error) {
                    console.log(error);
                }
            });
        });
    }

    _daemonTaskHandler(taskConfig) {
        let self = this;
        let taskInfo = {
            appName: self.appName,
            name: taskConfig.name,
            expires: taskConfig.timeout
        };
        let child = fork(path.join(self.taskHomePath,taskConfig.file));
        self.childsInstance[taskInfo.name] = child;
        child.on('message', function (message) {
            switch (message.signal) {
                case 'start': {
                    taskInfo.id = message.id;
                    self._startTask(taskInfo, function (error) {
                        if (error) {
                            console.log(error);
                        }
                    });
                    break;
                }
                case 'end': {
                    taskInfo.id = message.id;
                    self._endTask(taskInfo, function (error) {
                        if (error) {
                            console.log(error)
                        }
                    });
                    break;
                }
                default: {
                    console.log('unknow signal');
                    break;
                }
            }
        });
    }

    _crontabTaskHanlder(taskConfig) {
        let self = this;
        let taskInfo = {
            appName: self.appName,
            name: taskConfig.name,
            id: uuid.v4(),
            expires: taskConfig.timeout
        };
        self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
            self._startTask(taskInfo, function (error) {
                if(error) {
                    console.log(error);
                } else {
                    let child = fork(path.join(self.taskHomePath, taskConfig.file));
                    self.childsInstance[taskInfo.name + taskInfo.id] = child;
                    child.on('exit', function (code) {
                        // 子進(jìn)程退出 code 為 0杨刨,代表正常退出晤柄,這時(shí)可以向監(jiān)控平臺(tái)發(fā)送任務(wù)已完成信號(hào)
                        if(code === 0) {
                            self._endTask(taskInfo, function (error) {
                                if (error) {
                                    console.log(error);
                                }
                            });
                        }
                    });
                }
            });
        }, undefined, true);
    }

    _startTask(taskInfo, callback) {
        let requestOptions = {
            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
            method: 'POST',
            timeout: 5000,
            form: taskInfo
        };
        request(requestOptions, function (error, response, body) {
            return callback(error, body);
        });
    }

    _endTask(taskInfo, callback) {
        let requestOptions = {
            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
            method: 'POST',
            timeout: 5000,
            form: taskInfo
        };
        request(requestOptions, function (error, response, body) {
            return callback(error, body);
        });
    }

    _registerServer(taskClientConfig, callback) {
        let requestOptions = {
            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
            method: 'POST',
            timeout: 5000,
            form: taskClientConfig
        };
        request(requestOptions, function (error, response, body) {
            return callback(error, body);
        });
    }
}

module.exports = TaskStatusManagementClient;
// 調(diào)度系統(tǒng)的使用
'use strict';

const config = {
    // 監(jiān)控平臺(tái)信息,必須
    management: {
        host: 'http://127.0.0.1',
        port: 3000
    },
    // 當(dāng)前服務(wù)器信息妖胀,必須
    server: {
        host: 'http://127.0.0.1',
        port: 3001
    },
    // 當(dāng)前服務(wù)器任務(wù)文件地址(絕對(duì)路徑)芥颈,必須
    taskHomePath: path.join(__dirname, 'tasks'),
    // 任務(wù)配置信息惠勒,必須
    tasks:[{
        name: 'exampleTaskOne',
        type: 'daemon',
        file: 'example_task_one.js',
        timeout: 10000
    }, {
        name: 'exampleTaskTwo',
        type: 'crontab',
        file: 'example_task_two.js',
        crontabRule: '*/20 * * * * *',  // 任務(wù)類型為 crontab 是此字段為必須
        timeout: 10000
    }]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();

監(jiān)控平臺(tái) HTTP 服務(wù)器比較簡(jiǎn)單,三個(gè) API浇借,用來將服務(wù)器信息、任務(wù)開始狀態(tài)怕品、任務(wù)結(jié)束狀態(tài)寫入數(shù)據(jù)庫妇垢,這里就不在贅述。

// 監(jiān)控平臺(tái)任務(wù)狀態(tài)監(jiān)管
'use strict';

const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
    service: 'your email service',
    auth: {
        user: 'your email username',
        pass: 'your email password'
    }
});
const request = require('request');

const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三個(gè)為數(shù)據(jù)庫操作連接肉康,不需要關(guān)注內(nèi)部代碼闯估,不影響代碼閱讀
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');

/**
 * 發(fā)送告警郵件
 * @param {String} to 郵件接收者
 * @param {String} subject 郵件主題
 * @param {String} message 郵件內(nèi)容
 * @param {Function} callback
 */
function sendWarningMessage(to, subject, message, callback) {
    var options = {
        'from': 'xxx@xxx.xxx',
        'to': to,
        'subject': subject,
        'text': message,
        'encoding': 'UTF-8'
    };
    mail.sendMail(options, function(error) {
        console.log('send message to: ' + to);
        return callback(error);
    });
}

/**
 * 處理執(zhí)行成功任務(wù)
 * @param {String} task 任務(wù)狀態(tài)對(duì)象
 * @param {Function} callback
 */
function handleSingleSucceedTask(task, callback) {
        TaskResult.increaseTaskSuccessCount(task.name, function (error) {
            return callback(error);
        });
}

/**
 * 處理執(zhí)行失敗的任務(wù)
 * @param {String} task 任務(wù)狀態(tài)對(duì)象
 * @param {Function} callback
 */
function handleSingleErrorTask(task, callback) {
    async.waterfall([
        // 增加任務(wù)執(zhí)行失敗次數(shù)
        async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
        async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
        function (count, callback) {
            callback(undefined);
            // 給調(diào)度系統(tǒng)發(fā)信號(hào)
            TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
                // 超過限制失敗次數(shù),發(fā)送告警郵件
                if (count >= config.get('task.limitErrorCount')) {
                    sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定時(shí)任務(wù)告警', `${ serverHost }: "${ task.name }" 執(zhí)行失敗超過預(yù)定失敗次數(shù)`,  function (error) {
                        if (error) {
                            logger.error(error);
                        }
                    });
                    TaskResult.resetTaskErrorCount(task.name, function (error) {
                        if (error) {
                            logger.error(error);
                        }
                    });
                }
                if (!error && serverHost) {
                    // send 'error' signal to task server
                    request({
                        uri: serverHost,
                        method: 'POST',
                        timeout: 5000,
                        form: {
                            name: task.name,
                            id: task.taskId,
                            pid: task.pid
                        }
                    }, function (error, response, body) {
                        if (error) {
                            logger.error(error);
                        }
                    });
                }
            });
        }
    ], function (error) {
        return callback(error);
    });
}

/**
 * 獲取所有已經(jīng)到達(dá)預(yù)定超時(shí)時(shí)間的任務(wù)并處理
 */
function process() {
    let currentTime = new Date().getTime();
    async.waterfall([
        async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
        // 并行處理每個(gè)任務(wù)
        function (tasks, callback) {
            if (tasks.length > 0) {
                async.parallel(tasks.map(function (task) {
                    return function (callback) {
                        // 超時(shí)后任務(wù)狀態(tài)仍然為 running吼和,代表任務(wù)執(zhí)行失敗
                        if (task.status === 'running') {
                            handleSingleErrorTask(task, function (error) {
                                return callback(error);
                            });
                        } else {
                            handleSingleSucceedTask(task, function (error) {
                                return callback(error);
                            });
                        }
                    }
                }), function (error) {
                    return callback(error);
                });
            } else {
                return callback(new Error('no tasks need to exec'));
            }
        },
        // 刪除已經(jīng)處理完成的任務(wù)
        async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
    ], function (error) {
        if (error && error.message === 'no tasks need to exec') {
            let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
            console.log(`no tasks, delay ${ delay }`);
            setTimeout(process, config.get('task.noTaskDelayTime'));
        } else if (error) {
            logger.error(error);
            process();
        } else {
            process();
        }
    });
}

module.exports = process;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末涨薪,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子炫乓,更是在濱河造成了極大的恐慌刚夺,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件末捣,死亡現(xiàn)場(chǎng)離奇詭異侠姑,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)箩做,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門莽红,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人邦邦,你說我怎么就攤上這事安吁。” “怎么了燃辖?”我有些...
    開封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵鬼店,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我黔龟,道長(zhǎng)薪韩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任捌锭,我火速辦了婚禮俘陷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘观谦。我一直安慰自己拉盾,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開白布豁状。 她就那樣靜靜地躺著捉偏,像睡著了一般倒得。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上夭禽,一...
    開封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天霞掺,我揣著相機(jī)與錄音,去河邊找鬼讹躯。 笑死菩彬,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的潮梯。 我是一名探鬼主播骗灶,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼秉馏!你這毒婦竟也來了耙旦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤萝究,失蹤者是張志新(化名)和其女友劉穎免都,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體帆竹,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡琴昆,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了馆揉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片业舍。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖升酣,靈堂內(nèi)的尸體忽然破棺而出舷暮,到底是詐尸還是另有隱情,我是刑警寧澤噩茄,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布下面,位于F島的核電站,受9級(jí)特大地震影響绩聘,放射性物質(zhì)發(fā)生泄漏沥割。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一凿菩、第九天 我趴在偏房一處隱蔽的房頂上張望机杜。 院中可真熱鬧,春花似錦衅谷、人聲如沸椒拗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蚀苛。三九已至在验,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間堵未,已是汗流浹背腋舌。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留渗蟹,地道東北人块饺。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像拙徽,于是被迫代替她去往敵國(guó)和親刨沦。 傳聞我的和親對(duì)象是個(gè)殘疾皇子诗宣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說閱讀 10,869評(píng)論 6 13
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理膘怕,服務(wù)發(fā)現(xiàn),斷路器召庞,智...
    卡卡羅2017閱讀 134,600評(píng)論 18 139
  • 本文轉(zhuǎn)自運(yùn)維之路(id:HuashengPeng001)訂閱號(hào) 近年來,隨著計(jì)算機(jī)技術(shù)的飛速發(fā)展诅诱,以及行業(yè)信息的共...
    大數(shù)據(jù)之心閱讀 5,546評(píng)論 0 34
  • 明月皎皎髓堪,蟬鳴喈喈。 既見良人娘荡,云胡不夷干旁? 明月昭昭,蟬鳴嘁嘁炮沐。 既見良人争群,云胡不瘳? 明月皓皓大年,蟬鳴不已换薄。 既見...
    布瓜先生閱讀 456評(píng)論 3 2
  • 堅(jiān)強(qiáng)是什么呢?每個(gè)人都認(rèn)為不一樣吧…… 我認(rèn)為很簡(jiǎn)單翔试,有一天你可以笑著講述那些曾經(jīng)讓你哭的過往
    韓昕閱讀 194評(píng)論 0 0