在實(shí)際生產(chǎn)環(huán)境中,避免不了有很多后臺(tái)運(yùn)行的任務(wù)和定時(shí)任務(wù)志珍,對(duì)任務(wù)狀態(tài)的監(jiān)控與及時(shí)告警可以盡量減少程序出錯(cuò)時(shí)對(duì)用戶造成的影響淳地。針對(duì)常見的兩種任務(wù)類型:定時(shí)任務(wù)、守護(hù)進(jìn)程內(nèi)批處理任務(wù)纱扭,利用 Node.js child_process 實(shí)現(xiàn)了任務(wù)狀態(tài)的監(jiān)控、重啟與郵件告警儡遮。
思路
現(xiàn)在的互聯(lián)網(wǎng)已經(jīng)不是單機(jī)作戰(zhàn)的時(shí)代了乳蛾,分布式部署是非常常見的方式,一個(gè)項(xiàng)目中的任務(wù)可能運(yùn)行在多臺(tái)服務(wù)器上鄙币,我們的監(jiān)控平臺(tái)要做到重啟某個(gè)任務(wù)就需要知道任務(wù)運(yùn)行的具體服務(wù)器肃叶,針對(duì)這一個(gè)問題我們需要獲取到任務(wù)與服務(wù)器關(guān)系的確切信息,所以每臺(tái)運(yùn)行任務(wù)的服務(wù)器需要在啟動(dòng)任務(wù)時(shí)向任務(wù)狀態(tài)管理平臺(tái)注冊(cè)自己的信息十嘿。
任務(wù)狀態(tài)的維護(hù)依賴于任務(wù)運(yùn)行服務(wù)器的心跳上報(bào)因惭,每個(gè)任務(wù)設(shè)置一個(gè)超時(shí)時(shí)間,在任務(wù)啟動(dòng)時(shí)向任務(wù)狀態(tài)管理平臺(tái)發(fā)送開始運(yùn)行信號(hào)绩衷,在任務(wù)運(yùn)行結(jié)束后向管理平臺(tái)發(fā)送運(yùn)行完成信號(hào)蹦魔。任務(wù)管理平臺(tái)根據(jù)任務(wù)設(shè)置的超時(shí)時(shí)間激率,在超時(shí)后仍然沒有接收到任務(wù)完成信號(hào)則判定任務(wù)失敗,將任務(wù)失敗信號(hào)發(fā)送回任務(wù)運(yùn)行的服務(wù)器勿决。再有任務(wù)運(yùn)行服務(wù)器自行處理乒躺,如重啟任務(wù)或者結(jié)束任務(wù)等。
根據(jù)以上的邏輯低缩,實(shí)際需要就是在任務(wù)運(yùn)行的服務(wù)器實(shí)現(xiàn)一個(gè)任務(wù)調(diào)度功能與 HTTP 服務(wù)器用來監(jiān)聽管理平臺(tái)發(fā)送的信號(hào)嘉冒;在管理平臺(tái)這邊實(shí)現(xiàn)任務(wù)服務(wù)器信息注冊(cè)、任務(wù)狀態(tài)監(jiān)管與超時(shí)告警咆繁。文字表述比較晦澀讳推,具體流程可以參考一下的流程圖。
實(shí)現(xiàn)代碼
后續(xù)會(huì)把關(guān)鍵信息從代碼中抽離出來放到配置文件中玩般,然后放到 GitHub 上娜遵,暫時(shí)以貼代碼的形式簡(jiǎn)單展示一下。
// 任務(wù)運(yùn)行服務(wù)器調(diào)度系統(tǒng)
'use strict';
// 內(nèi)建模塊
const fork = require('child_process').fork;
const path = require('path');
// 第三方模塊
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');
class TaskStatusManagementClient {
/**
* 初始化 TaskStatusClient
* @param {Object} taskClientConfig 服務(wù)器配置信息
*/
constructor(taskClientConfig) {
this.taskClientConfig = taskClientConfig;
this.taskHomePath = taskClientConfig.taskHomePath;
this.childsInstance = {};
this.crontabJobsInstance = {};
}
/**
* start TaskStatusClient
*/
start() {
this._process();
}
_process() {
let self = this;
// 根據(jù)服務(wù)器配置信息啟動(dòng)所有任務(wù)
for(let taskConfig of self.taskClientConfig.tasks) {
switch(taskConfig.type) {
case 'daemon': {
self._daemonTaskHandler(taskConfig);
break;
}
case 'crontab': {
if(taskConfig.crontabRule) {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
}
// 在程序退出時(shí)結(jié)束所有子進(jìn)程任務(wù)
process.on('exit', function (code) {
for(let child in self.childsInstance) {
if(self.childsInstance.hasOwnProperty(child)) {
self.childsInstance[child].kill('SIGHUP');
}
}
});
// 啟動(dòng) HTTP 服務(wù)器壤短,監(jiān)聽任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)
let app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.post('/', function (req, res) {
let body;
try {
body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
} catch (error) {
return res.status(400).json({ message: 'invalid json' });
}
res.status(200).json({ message: 'ok' });
// 收到任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)后重啟任務(wù)
let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
let taskIdentifier = '';
// daemon 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名
// crontab 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名 + 任務(wù) ID
switch (taskConfig.type) {
case 'daemon': {
taskIdentifier = taskConfig.name;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.daemonTaskHandler) {
self.handlers.daemonTaskHandler(taskConfig);
} else {
self._daemonTaskHandler(taskConfig);
}
break;
}
case 'crontab': {
taskIdentifier = taskConfig.name + taskConfig.id;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.crontabTaskHandler) {
self.handlers.crontabTaskHandler(taskConfig);
} else {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
});
app.listen(self.taskClientConfig.server.port, function () {
console.log('server start at: ' + self.taskClientConfig.server.port);
self._registerServer(self.taskClientConfig, function (error, result) {
if (error) {
console.log(error);
}
});
});
}
_daemonTaskHandler(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
expires: taskConfig.timeout
};
let child = fork(path.join(self.taskHomePath,taskConfig.file));
self.childsInstance[taskInfo.name] = child;
child.on('message', function (message) {
switch (message.signal) {
case 'start': {
taskInfo.id = message.id;
self._startTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
break;
}
case 'end': {
taskInfo.id = message.id;
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error)
}
});
break;
}
default: {
console.log('unknow signal');
break;
}
}
});
}
_crontabTaskHanlder(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
id: uuid.v4(),
expires: taskConfig.timeout
};
self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
self._startTask(taskInfo, function (error) {
if(error) {
console.log(error);
} else {
let child = fork(path.join(self.taskHomePath, taskConfig.file));
self.childsInstance[taskInfo.name + taskInfo.id] = child;
child.on('exit', function (code) {
// 子進(jìn)程退出 code 為 0设拟,代表正常退出,這時(shí)可以向監(jiān)控平臺(tái)發(fā)送任務(wù)已完成信號(hào)
if(code === 0) {
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
}
});
}
});
}, undefined, true);
}
_startTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_endTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_registerServer(taskClientConfig, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
method: 'POST',
timeout: 5000,
form: taskClientConfig
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
}
module.exports = TaskStatusManagementClient;
// 調(diào)度系統(tǒng)的使用
'use strict';
const config = {
// 監(jiān)控平臺(tái)信息久脯,必須
management: {
host: 'http://127.0.0.1',
port: 3000
},
// 當(dāng)前服務(wù)器信息纳胧,必須
server: {
host: 'http://127.0.0.1',
port: 3001
},
// 當(dāng)前服務(wù)器任務(wù)文件地址(絕對(duì)路徑),必須
taskHomePath: path.join(__dirname, 'tasks'),
// 任務(wù)配置信息帘撰,必須
tasks:[{
name: 'exampleTaskOne',
type: 'daemon',
file: 'example_task_one.js',
timeout: 10000
}, {
name: 'exampleTaskTwo',
type: 'crontab',
file: 'example_task_two.js',
crontabRule: '*/20 * * * * *', // 任務(wù)類型為 crontab 是此字段為必須
timeout: 10000
}]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();
監(jiān)控平臺(tái) HTTP 服務(wù)器比較簡(jiǎn)單跑慕,三個(gè) API,用來將服務(wù)器信息摧找、任務(wù)開始狀態(tài)核行、任務(wù)結(jié)束狀態(tài)寫入數(shù)據(jù)庫,這里就不在贅述蹬耘。
// 監(jiān)控平臺(tái)任務(wù)狀態(tài)監(jiān)管
'use strict';
const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
service: 'your email service',
auth: {
user: 'your email username',
pass: 'your email password'
}
});
const request = require('request');
const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三個(gè)為數(shù)據(jù)庫操作連接芝雪,不需要關(guān)注內(nèi)部代碼,不影響代碼閱讀
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');
/**
* 發(fā)送告警郵件
* @param {String} to 郵件接收者
* @param {String} subject 郵件主題
* @param {String} message 郵件內(nèi)容
* @param {Function} callback
*/
function sendWarningMessage(to, subject, message, callback) {
var options = {
'from': 'xxx@xxx.xxx',
'to': to,
'subject': subject,
'text': message,
'encoding': 'UTF-8'
};
mail.sendMail(options, function(error) {
console.log('send message to: ' + to);
return callback(error);
});
}
/**
* 處理執(zhí)行成功任務(wù)
* @param {String} task 任務(wù)狀態(tài)對(duì)象
* @param {Function} callback
*/
function handleSingleSucceedTask(task, callback) {
TaskResult.increaseTaskSuccessCount(task.name, function (error) {
return callback(error);
});
}
/**
* 處理執(zhí)行失敗的任務(wù)
* @param {String} task 任務(wù)狀態(tài)對(duì)象
* @param {Function} callback
*/
function handleSingleErrorTask(task, callback) {
async.waterfall([
// 增加任務(wù)執(zhí)行失敗次數(shù)
async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
function (count, callback) {
callback(undefined);
// 給調(diào)度系統(tǒng)發(fā)信號(hào)
TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
// 超過限制失敗次數(shù)综苔,發(fā)送告警郵件
if (count >= config.get('task.limitErrorCount')) {
sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定時(shí)任務(wù)告警', `${ serverHost }: "${ task.name }" 執(zhí)行失敗超過預(yù)定失敗次數(shù)`, function (error) {
if (error) {
logger.error(error);
}
});
TaskResult.resetTaskErrorCount(task.name, function (error) {
if (error) {
logger.error(error);
}
});
}
if (!error && serverHost) {
// send 'error' signal to task server
request({
uri: serverHost,
method: 'POST',
timeout: 5000,
form: {
name: task.name,
id: task.taskId,
pid: task.pid
}
}, function (error, response, body) {
if (error) {
logger.error(error);
}
});
}
});
}
], function (error) {
return callback(error);
});
}
/**
* 獲取所有已經(jīng)到達(dá)預(yù)定超時(shí)時(shí)間的任務(wù)并處理
*/
function process() {
let currentTime = new Date().getTime();
async.waterfall([
async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
// 并行處理每個(gè)任務(wù)
function (tasks, callback) {
if (tasks.length > 0) {
async.parallel(tasks.map(function (task) {
return function (callback) {
// 超時(shí)后任務(wù)狀態(tài)仍然為 running惩系,代表任務(wù)執(zhí)行失敗
if (task.status === 'running') {
handleSingleErrorTask(task, function (error) {
return callback(error);
});
} else {
handleSingleSucceedTask(task, function (error) {
return callback(error);
});
}
}
}), function (error) {
return callback(error);
});
} else {
return callback(new Error('no tasks need to exec'));
}
},
// 刪除已經(jīng)處理完成的任務(wù)
async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
], function (error) {
if (error && error.message === 'no tasks need to exec') {
let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
console.log(`no tasks, delay ${ delay }`);
setTimeout(process, config.get('task.noTaskDelayTime'));
} else if (error) {
logger.error(error);
process();
} else {
process();
}
});
}
module.exports = process;
實(shí)現(xiàn)代碼
后續(xù)會(huì)把關(guān)鍵信息從代碼中抽離出來放到配置文件中,然后放到 GitHub 上如筛,暫時(shí)以貼代碼的形式簡(jiǎn)單展示一下堡牡。
// 任務(wù)運(yùn)行服務(wù)器調(diào)度系統(tǒng)
'use strict';
// 內(nèi)建模塊
const fork = require('child_process').fork;
const path = require('path');
// 第三方模塊
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');
class TaskStatusManagementClient {
/**
* 初始化 TaskStatusClient
* @param {Object} taskClientConfig 服務(wù)器配置信息
*/
constructor(taskClientConfig) {
this.taskClientConfig = taskClientConfig;
this.taskHomePath = taskClientConfig.taskHomePath;
this.childsInstance = {};
this.crontabJobsInstance = {};
}
/**
* start TaskStatusClient
*/
start() {
this._process();
}
_process() {
let self = this;
// 根據(jù)服務(wù)器配置信息啟動(dòng)所有任務(wù)
for(let taskConfig of self.taskClientConfig.tasks) {
switch(taskConfig.type) {
case 'daemon': {
self._daemonTaskHandler(taskConfig);
break;
}
case 'crontab': {
if(taskConfig.crontabRule) {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
}
// 在程序退出時(shí)結(jié)束所有子進(jìn)程任務(wù)
process.on('exit', function (code) {
for(let child in self.childsInstance) {
if(self.childsInstance.hasOwnProperty(child)) {
self.childsInstance[child].kill('SIGHUP');
}
}
});
// 啟動(dòng) HTTP 服務(wù)器,監(jiān)聽任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)
let app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.post('/', function (req, res) {
let body;
try {
body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
} catch (error) {
return res.status(400).json({ message: 'invalid json' });
}
res.status(200).json({ message: 'ok' });
// 收到任務(wù)狀態(tài)監(jiān)控平臺(tái)發(fā)回的信號(hào)后重啟任務(wù)
let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
let taskIdentifier = '';
// daemon 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名
// crontab 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名 + 任務(wù) ID
switch (taskConfig.type) {
case 'daemon': {
taskIdentifier = taskConfig.name;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.daemonTaskHandler) {
self.handlers.daemonTaskHandler(taskConfig);
} else {
self._daemonTaskHandler(taskConfig);
}
break;
}
case 'crontab': {
taskIdentifier = taskConfig.name + taskConfig.id;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.crontabTaskHandler) {
self.handlers.crontabTaskHandler(taskConfig);
} else {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
});
app.listen(self.taskClientConfig.server.port, function () {
console.log('server start at: ' + self.taskClientConfig.server.port);
self._registerServer(self.taskClientConfig, function (error, result) {
if (error) {
console.log(error);
}
});
});
}
_daemonTaskHandler(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
expires: taskConfig.timeout
};
let child = fork(path.join(self.taskHomePath,taskConfig.file));
self.childsInstance[taskInfo.name] = child;
child.on('message', function (message) {
switch (message.signal) {
case 'start': {
taskInfo.id = message.id;
self._startTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
break;
}
case 'end': {
taskInfo.id = message.id;
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error)
}
});
break;
}
default: {
console.log('unknow signal');
break;
}
}
});
}
_crontabTaskHanlder(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
id: uuid.v4(),
expires: taskConfig.timeout
};
self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
self._startTask(taskInfo, function (error) {
if(error) {
console.log(error);
} else {
let child = fork(path.join(self.taskHomePath, taskConfig.file));
self.childsInstance[taskInfo.name + taskInfo.id] = child;
child.on('exit', function (code) {
// 子進(jìn)程退出 code 為 0杨刨,代表正常退出晤柄,這時(shí)可以向監(jiān)控平臺(tái)發(fā)送任務(wù)已完成信號(hào)
if(code === 0) {
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
}
});
}
});
}, undefined, true);
}
_startTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_endTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_registerServer(taskClientConfig, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
method: 'POST',
timeout: 5000,
form: taskClientConfig
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
}
module.exports = TaskStatusManagementClient;
// 調(diào)度系統(tǒng)的使用
'use strict';
const config = {
// 監(jiān)控平臺(tái)信息,必須
management: {
host: 'http://127.0.0.1',
port: 3000
},
// 當(dāng)前服務(wù)器信息妖胀,必須
server: {
host: 'http://127.0.0.1',
port: 3001
},
// 當(dāng)前服務(wù)器任務(wù)文件地址(絕對(duì)路徑)芥颈,必須
taskHomePath: path.join(__dirname, 'tasks'),
// 任務(wù)配置信息惠勒,必須
tasks:[{
name: 'exampleTaskOne',
type: 'daemon',
file: 'example_task_one.js',
timeout: 10000
}, {
name: 'exampleTaskTwo',
type: 'crontab',
file: 'example_task_two.js',
crontabRule: '*/20 * * * * *', // 任務(wù)類型為 crontab 是此字段為必須
timeout: 10000
}]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();
監(jiān)控平臺(tái) HTTP 服務(wù)器比較簡(jiǎn)單,三個(gè) API浇借,用來將服務(wù)器信息、任務(wù)開始狀態(tài)怕品、任務(wù)結(jié)束狀態(tài)寫入數(shù)據(jù)庫妇垢,這里就不在贅述。
// 監(jiān)控平臺(tái)任務(wù)狀態(tài)監(jiān)管
'use strict';
const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
service: 'your email service',
auth: {
user: 'your email username',
pass: 'your email password'
}
});
const request = require('request');
const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三個(gè)為數(shù)據(jù)庫操作連接肉康,不需要關(guān)注內(nèi)部代碼闯估,不影響代碼閱讀
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');
/**
* 發(fā)送告警郵件
* @param {String} to 郵件接收者
* @param {String} subject 郵件主題
* @param {String} message 郵件內(nèi)容
* @param {Function} callback
*/
function sendWarningMessage(to, subject, message, callback) {
var options = {
'from': 'xxx@xxx.xxx',
'to': to,
'subject': subject,
'text': message,
'encoding': 'UTF-8'
};
mail.sendMail(options, function(error) {
console.log('send message to: ' + to);
return callback(error);
});
}
/**
* 處理執(zhí)行成功任務(wù)
* @param {String} task 任務(wù)狀態(tài)對(duì)象
* @param {Function} callback
*/
function handleSingleSucceedTask(task, callback) {
TaskResult.increaseTaskSuccessCount(task.name, function (error) {
return callback(error);
});
}
/**
* 處理執(zhí)行失敗的任務(wù)
* @param {String} task 任務(wù)狀態(tài)對(duì)象
* @param {Function} callback
*/
function handleSingleErrorTask(task, callback) {
async.waterfall([
// 增加任務(wù)執(zhí)行失敗次數(shù)
async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
function (count, callback) {
callback(undefined);
// 給調(diào)度系統(tǒng)發(fā)信號(hào)
TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
// 超過限制失敗次數(shù),發(fā)送告警郵件
if (count >= config.get('task.limitErrorCount')) {
sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定時(shí)任務(wù)告警', `${ serverHost }: "${ task.name }" 執(zhí)行失敗超過預(yù)定失敗次數(shù)`, function (error) {
if (error) {
logger.error(error);
}
});
TaskResult.resetTaskErrorCount(task.name, function (error) {
if (error) {
logger.error(error);
}
});
}
if (!error && serverHost) {
// send 'error' signal to task server
request({
uri: serverHost,
method: 'POST',
timeout: 5000,
form: {
name: task.name,
id: task.taskId,
pid: task.pid
}
}, function (error, response, body) {
if (error) {
logger.error(error);
}
});
}
});
}
], function (error) {
return callback(error);
});
}
/**
* 獲取所有已經(jīng)到達(dá)預(yù)定超時(shí)時(shí)間的任務(wù)并處理
*/
function process() {
let currentTime = new Date().getTime();
async.waterfall([
async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
// 并行處理每個(gè)任務(wù)
function (tasks, callback) {
if (tasks.length > 0) {
async.parallel(tasks.map(function (task) {
return function (callback) {
// 超時(shí)后任務(wù)狀態(tài)仍然為 running吼和,代表任務(wù)執(zhí)行失敗
if (task.status === 'running') {
handleSingleErrorTask(task, function (error) {
return callback(error);
});
} else {
handleSingleSucceedTask(task, function (error) {
return callback(error);
});
}
}
}), function (error) {
return callback(error);
});
} else {
return callback(new Error('no tasks need to exec'));
}
},
// 刪除已經(jīng)處理完成的任務(wù)
async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
], function (error) {
if (error && error.message === 'no tasks need to exec') {
let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
console.log(`no tasks, delay ${ delay }`);
setTimeout(process, config.get('task.noTaskDelayTime'));
} else if (error) {
logger.error(error);
process();
} else {
process();
}
});
}
module.exports = process;