在實(shí)際的業(yè)務(wù)領(lǐng)域中,常常會異步處理一些任務(wù)鳞陨,這些任務(wù)的執(zhí)行時(shí)間可能會比我們的預(yù)期更長一點(diǎn)颠猴,這些任務(wù)有點(diǎn)需要及時(shí)處理,有的可能需要延時(shí)執(zhí)行坡贺。通常的做法是將這些任務(wù)放入某一種隊(duì)列里面官辈,把工作交給對應(yīng)的任務(wù)處理器去處理。這個(gè)隊(duì)列就是任務(wù)隊(duì)列遍坟,如先進(jìn)先出 (FIFO) 隊(duì)列钧萍、后進(jìn)先出 (LIFO) 隊(duì)列,優(yōu)先級 (Priority) 隊(duì)列以及延時(shí)(Delay)隊(duì)列.
先進(jìn)先出 (FIFO) 隊(duì)列
FIFO 隊(duì)列具有語義清晰政鼠,嚴(yán)格維持任務(wù)發(fā)送和接收的順序處理风瘦,易于實(shí)現(xiàn)等特點(diǎn)。Redis 的列表是簡單的字符串列表公般,按照插入順序排序万搔。允許用戶通過 LPUSH 和 RPUSH 以及 RPOP 和 LPOP 在列表的兩端推入和插入元素,此外還提供了 BRPOP 和BLPOP 連個(gè)阻塞命令官帘,非常適合用于 FIFO 隊(duì)列瞬雹。
定義任務(wù)實(shí)體
創(chuàng)建 taskModel.js 文件,該文件定義了任務(wù)實(shí)體刽虹。一個(gè)任務(wù)通常要包含酗捌,任務(wù)必要的標(biāo)識、任務(wù)的類型涌哲、任務(wù)的行為等等胖缤。
/**
* 任務(wù)實(shí)體
* New node file
*/
function Task(){
var id
var type;
var action;
this.setId = function(theyId){
id = theyId;
};
this.setType = function(theyType){
type = theyType;
};
this.setAction = function(theyAction){
action = theyAction;
};
this.getTaskInfo = function(){
return '我的任務(wù) ID 是: '+ this.id +", 任務(wù)類型 : "+ this.type+", 我的需要完成任務(wù)行為是:"+this.action;
};
}
module.exports = Task;
創(chuàng)建生產(chǎn)者和消費(fèi)者
創(chuàng)建 main.js 負(fù)責(zé)模擬生產(chǎn)者和消費(fèi)者,為了方便演示阀圾,這里分別使用 addTaskToQueue () 模擬生產(chǎn)者哪廓,getTaskFromQueue 模擬消費(fèi)者。
/**
* New node file
*/
var Task = require('./taskModel');
const schedule = require('node-schedule');
const app = require('express')();
const PORT = 6379,
HOST = '127.0.0.1',
OPTS = {}; // 配置項(xiàng)初烘,比如說設(shè)置密碼 {auth_pass:'123456'},
//生產(chǎn)者客戶端
const producterClient = require('redis').createClient(PORT, HOST, OPTS);
//消費(fèi)者客戶端
const consumerClient = producterClient.duplicate();
const QUEUE_NAME = 'queue:issue';
//添加任務(wù)到隊(duì)列
function addTaskToQueue(){
// 使用 redis 的 incr 生成 id
producterClient.incr('id', function(err, id) {
if(err){
console.log(err);
}else{
var task = new Task();
task.setId('Task:::'+id);
task.setType('MESSAGE')
task.setAction('打印任務(wù)創(chuàng)建的時(shí)間>>>創(chuàng)建時(shí)間:::'+ new Date());
producterClient.rpush([QUEUE_NAME, task.getTaskInfo()],function(err,reply) {
if(err){
console.log(err);
}else{
console.log('添加任務(wù):'+task.getTaskInfo());
}
});
}
});
}
//從任務(wù)隊(duì)列中獲取任務(wù)
function getTaskFromQueue(){
//使用 blpop 阻塞模式分俯,直到有數(shù)據(jù)返回哆料。blpop 命令每次只會從列表中
//彈出一個(gè)任務(wù),所以這保證了任務(wù)不會被重復(fù)執(zhí)行杏节。
producterClient.blpop(QUEUE_NAME, 4,function(err,reply) {
if(err){
console.log(err);
}else{
if(reply){
console.log('>>>>>>>>消費(fèi)任務(wù): '+ reply[1]);
}
}
});
}
// 生產(chǎn)者定時(shí)任務(wù)
let producterJob = schedule.scheduleJob('*/4 * * * * *', () => {
addTaskToQueue();
});
// 消費(fèi)者定時(shí)任務(wù)
let consumerJob = schedule.scheduleJob('*/2 * * * * *', () => {
getTaskFromQueue();
});
運(yùn)行結(jié)果:
總結(jié)
這里使用 List 的特性簡單的模擬了實(shí)現(xiàn)了 FIFO 隊(duì)列拢锹。實(shí)際應(yīng)用中任務(wù)的定義遠(yuǎn)遠(yuǎn)沒有這么簡單卒稳,除了將任務(wù)信息推送到隊(duì)列外他巨,還需要持久化任務(wù)信息。其次捻爷,還需要考慮到客戶端將取得任務(wù)后未及時(shí)處理宕機(jī)導(dǎo)致任務(wù)處理失敗也榄。再者司志,客戶端獲取任務(wù)的指令執(zhí)行成功但是客戶連接超時(shí)斷開連接導(dǎo)致任務(wù)丟失等情況。