現(xiàn)在我們了解到的設(shè)計(jì)模式能夠應(yīng)對(duì)很多情況潮针,不過針對(duì)一些特定的問題术荤,我們還有一些方案可以使?。就像做菜的菜譜?樣每篷,它提供了一個(gè)?概的步驟瓣戚,我們還可以?己創(chuàng)造一些,在這個(gè)章節(jié)焦读,我們主要提供以下情形的解決方案:
- 需要進(jìn)行異步初始化的模塊
- 以較小的開銷子库,在忙碌應(yīng)?用中實(shí)現(xiàn)?表現(xiàn)?的批量量異步緩存
- 運(yùn)?可能阻塞事件循環(huán)或者可能削弱Node.js處理并發(fā)請(qǐng)求能?的,同步計(jì)算密集(CPUbound)操作
處理需要異步初始化模塊的方案
我們?cè)诘?章提到過矗晃,require()和module.exports是同步工作的仑嗅,這是同步API存在于核?模塊和很多npm的庫中的?個(gè)主要原因。這樣的方案提供了了更方便的轉(zhuǎn)換機(jī)制张症,這也是最初使用同步而不是異步方案開初始化模塊的原型仓技。
不幸的是,有時(shí)候同步API并不不好用俗他,特別是在需要連接?網(wǎng)絡(luò)進(jìn)?初始化的時(shí)候脖捻,需要花費(fèi)精力去配置參數(shù)、使?握?進(jìn)?連接等等兆衅。這種場景存在于許多數(shù)據(jù)庫驅(qū)動(dòng)和類似消息隊(duì)列這樣的中間件里地沮。
標(biāo)準(zhǔn)解決方案
舉例,我們需要通過請(qǐng)求羡亩,連接?個(gè)叫db的遠(yuǎn)程數(shù)據(jù)庫摩疑,我們通常有兩種選擇:
- 在使?前先確保它已經(jīng)被初始化,否則就等它初始化完畢后再處理接下來的異步操作
const db = require('aDb'); // The async module
module.exports = function findAll(tyoe, callback) {
if (db.connected) {
runFild();
} else {
db.once('connected', runFild);
}
function runFild() {
db.findAll(type, callback);
}
};
- 使?依賴注?(Dependency Injection)?不是直接加載異步模塊夕春。延遲其他模塊的初始化直到它的依賴都加載完成未荒,這樣就轉(zhuǎn)移了了重點(diǎn)到別的地?
// in module app.js
const db = require('aDb');
const findAllFactory = require('./findAll');
db.on('connected', function(){
const findAll = findAllFactory(db);
});
// in module findAll.js
module.exports = db => {
return function findAll(type, callback) {
db.findAll(type, callback);
}
};
顯然第一種?方案就很不受歡迎,因?yàn)榘芏嘁?模板(boilerplate)
第?種?案有時(shí)候也不太好及志,在大型項(xiàng)目中使用依賴注入會(huì)很復(fù)雜片排,特別是需要手動(dòng)去初始化一些模塊的時(shí)候。不過使?用一些?持異步初始化的依賴注?容器會(huì)有些幫助速侈。
下?的第三種?案率寡,能夠輕易的幫助我們從初始化依賴?yán)锓蛛x出需要的模塊
預(yù)初始化隊(duì)列方案
方案的主要思想:把含未完成的初始化的模塊操作保存下來,一旦初始化步驟完成倚搬,就執(zhí)行行這些操作冶共。下?是?個(gè)例?:
- asyncModule.js(?個(gè)需要異步初始化的模塊)
const asyncModule = module.exports;
asyncModule.initialized = false;
asyncModule.initialize = callback => { // 一個(gè)延遲10s的初始化操作
setTimeout(() => {
asyncModule.initialized = true;
callback();
}, 10000);
};
asyncModule.tellMeSomething = callback => {
process.nextTick(() => {
if(!asyncModule.initialized) {
return callback(
new Error('I don\'t have anything to say right now') //如果還沒初始化
);
}
callback(null, 'Current time is: ' + new Date());
});
};
- routes.js(一個(gè)HTTP請(qǐng)求的處理模塊)
const asyncModule = require('./asyncModule');
module.exports.say = (req, res) => {
asyncModule.tellMeSomething((err, something) => {
if(err) {
res.writeHead(500);
return res.end('Error:' + err.message);
}
res.writeHead(200);
res.end('I say: ' + something);
});
};
這個(gè)HTTP處理模塊(handler)主要是觸發(fā)剛剛的tellMeSomething()方法并將結(jié)果寫入HTTP響應(yīng)頭。可以看到這?捅僵,我們r(jià)equire()這個(gè)異步的asyncModule時(shí)并沒有檢測它是不是已經(jīng)初始化了家卖,這就可能導(dǎo)致問題。
讓我們?基本的http核?模塊庙楚,創(chuàng)建?個(gè)HTTP server
- app.js
const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule.js');
asyncModule.initialize(() => {
console.log('Async module initialized');
});
http.createServer((req, res) => {
if (req.method === 'GET' && req.url === '/say') {
return routes.say(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));
這個(gè)app.js是程序的入?上荡,它觸發(fā)了AsyncModule的初始化、利用routes.say()創(chuàng)建了一個(gè)
HTTPServer馒闷。運(yùn)?一下酪捡,正如想象,如果我們?cè)谶\(yùn)行server時(shí)纳账,快一點(diǎn)打開http://localhost:8000/say (也就是調(diào)?了say()的時(shí)候逛薇,AsyncModule還沒有異步初始化完成),我們會(huì)在瀏覽器上看到:
本例的顯示取決于異步初始化的細(xì)節(jié)疏虫,如果慢?點(diǎn)等初始化結(jié)束永罚,我們就能看到正確的結(jié)果了。我們應(yīng)該盡量避免這種問題议薪,因?yàn)檫@可能導(dǎo)致丟失信息尤蛮、程序崩潰等更嚴(yán)重的問題。雖然初始化一般很快斯议,我們?般不會(huì)去注意失敗的請(qǐng)求产捞,但是對(duì)于加載頻繁和自動(dòng)設(shè)置(autoscale)的云服務(wù)情況來說,這可能導(dǎo)致阻塞等更多問題哼御。
?用預(yù)加載隊(duì)列列去包裝模塊
我們?隊(duì)列去存放未初始化的操作坯临,這有點(diǎn)像狀態(tài)模式(State Pattern)。存在兩種狀態(tài):?一種是把未初始化的操作排隊(duì)恋昼,另一種是當(dāng)初始化完成時(shí)看靠,再把最后的方法委托給原模塊(也就是AsyncModule)
- AsyncModuleWarpper.js
const asyncModule = require('./asyncModule');
//The wrapper 給 activeState 設(shè)置分發(fā)操作
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = function() {
activeState.initialize.apply(activeState, arguments);
};
asyncModuleWrapper.tellMeSomething = function() {
activeState.tellMeSomething.apply(activeState, arguments);
};
//Module沒有初始化成功的時(shí)候的state
let pending = [];
let notInitializedState = {
initialize: function(callback) {
asyncModule.initialize(function() {\
asyncModuleWrapper.initalized = true;
activeState = initializedState; // [1]更更新activeState的變量量
pending.forEach(function(req) { // [2]運(yùn)?行行存儲(chǔ)隊(duì)列列的操作
asyncModule[req.method].apply(null, req.args);
});
pending = [];
callback(); // [3]觸發(fā)原來的回調(diào)
});
},
tellMeSomething: function(callback) {
return pending.push({
method: 'tellMeSomething',
args: arguments
});
}
};
// The state to use when the module is initialized
let initializedState = asyncModule;
// Set the initial state to the notInitializedState
let activeState = notInitializedState; // 設(shè)置更新activeState
當(dāng)開始初始化的時(shí)候,我們提供了一個(gè)回調(diào)代理液肌,它讓我們的warpper知道什么時(shí)候已經(jīng)初始化好了挟炬,應(yīng)該進(jìn)?接下來的操作了。
可?代碼嗦哆,如果沒有初始化完成谤祖,tellMeSomething會(huì)放在pending隊(duì)列里等完成初始化再調(diào)?。
而且老速,通過這個(gè)模式我們可以清晰的知道AsyncModule這個(gè)模塊有沒有被初始化完成粥喜,因?yàn)橥瓿芍髸?huì)切換狀態(tài)為initializedState(倒數(shù)第二?代碼)
運(yùn)行一下:
通過隊(duì)列我們可以把為可能含初始化模塊的操作給掛起,這是?個(gè)更穩(wěn)健的行為橘券。
核?在于:如果要初始化異步模塊额湘,我們讓需要的操作排隊(duì)等待直到初始化完成卿吐。
在自然情況下
剛剛這種模式在很多數(shù)據(jù)庫驅(qū)動(dòng)和ORM庫中經(jīng)常使用,最著名的就是MongoDB的ORM庫
Moogoose了锋华,這樣就不需要等待數(shù)據(jù)庫的連接情況嗡官,因?yàn)檫@都?動(dòng)被放入隊(duì)列里,并且在初始化建立完成后執(zhí)?供置,這種方案增強(qiáng)了了API的可用性谨湘。
處理批量異步緩存的方案
在高負(fù)載的應(yīng)用中,緩存在web里里起到很重要的作用芥丧。無論是靜態(tài)的文件資源還是數(shù)據(jù)庫的查詢結(jié)果。在這個(gè)章節(jié)我們了解高吞吐量下的異步操作緩存坊罢。
暫時(shí)沒有緩存和批量量處理理的Server
這是一個(gè)管理理銷售情況的情景续担,JSON數(shù)據(jù)格式:transactionId{ amount, item}
- totalSales.js
const level = require('level');
const sublevel = require('level-sublevel');
const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');
module.exports = function totalSales(item, callback) {
console.log('totalSales() invoked');
let sum = 0;
salesDb.createValueStream() // [1] 從數(shù)據(jù)庫流式輸?入數(shù)據(jù)
.on('data', data => {
if(!item || data.item === item) { // [2] 如果同樣商品,求和
sum += data.amount;
}
})
.on('end', () => {
callback(null, sum); // [3]返回剛剛的求和結(jié)果
});
};
這種方案顯然表現(xiàn)不大好活孩,我們沒有用index去區(qū)分transaction物遇,求和計(jì)算也沒有?更?便的map或者reduce。接下來暴露接?給HTTP server:
- app.js
const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');
//const totalSales = require('./totalSalesBatch');
//const totalSales = require('./totalSalesCache');
http.createServer((req, res) => {
const query = url.parse(req.url, true).query;
totalSales(query.item, (err, sum) => {
res.writeHead(200);
res.end(`Total sales for item ${query.item} is ${sum}`);
});
}).listen(8000, () => console.log('Started'));
接下來訪問http://localhost:8000/?item=book憾儒,順?造一點(diǎn)假數(shù)據(jù)更方便查看
批量處理異步請(qǐng)求
最基本的方案是使用相同的API處理?系列的調(diào)用询兴,通過回調(diào)的方式:
圖示兩個(gè)客戶端(可能是兩個(gè)不同的對(duì)象或者兩個(gè)不同的web請(qǐng)求)觸發(fā)了相同輸入的的異步操作,在這第一個(gè)圖中起趾,它們單獨(dú)發(fā)起?己的異步操作執(zhí)?完畢后通過回調(diào)返回诗舰。第?張圖針對(duì)這種相同輸?的異步操作進(jìn)?了批量處理,當(dāng)操作完成時(shí)训裆,通知兩個(gè)客戶端眶根,這種方式優(yōu)化了應(yīng)?的加載,避免了了使?復(fù)雜的緩存機(jī)制边琉,也避免了這種機(jī)制帶來的內(nèi)存管理和失效問題属百。
在這個(gè)web server使用批量處理請(qǐng)求
主要采?隊(duì)列的方案,把回調(diào)加?隊(duì)列变姨,等異步操作完成之后?塊兒觸發(fā)
- totalSalesBatch.js
const totalSales = require('./totalSales');
const queues = {};
module.exports = function totalSalesBatch(item, callback) {
if(queues[item]) { // [1] 隊(duì)列列存在族扰,請(qǐng)求正在執(zhí)?行行,回調(diào)加?入隊(duì)列列
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback]; // [2]我們需建?立新請(qǐng)求因?yàn)殛?duì)列列有內(nèi)容了了
totalSales(item, (err, res) => {
const queue = queues[item]; // [3]獲得操作結(jié)果后?一個(gè)個(gè)觸發(fā)回調(diào)
queues[item] = null;
queue.forEach(cb => cb(err, res));
});
};
緩存異步請(qǐng)求
剛剛的批量處理方案在著快速API和少量批處理請(qǐng)求的情況下會(huì)有點(diǎn)問題定欧。在這種情況下渔呵,采?緩存模式是比較好的方案。
緩存模式的核心思想是:請(qǐng)求完成后我們就把結(jié)果保存在緩存里(可以是一個(gè)變量忧额、?個(gè)數(shù)據(jù)庫入口等)厘肮,下一次就不請(qǐng)求了,直接從緩存里取出
緩存模式很常見睦番,但真正厲害的是把緩存模式和批處理模式結(jié)合起來使用:
由圖可知类茂,沒有緩存時(shí)耍属,和批處理模式是差不多的,不過當(dāng)請(qǐng)求完成后巩检,結(jié)果會(huì)被放入緩存里厚骗,方便下次直接從緩存里提取。
在這個(gè)web server使用緩存請(qǐng)求
- totalSalesCache.js (在批處理的基礎(chǔ)上加上緩存層)
const totalSales = require('./totalSales');
const queues = {};
const cache = {};
module.exports = function totalSalesBatch(item, callback) {
const cached = cache[item];
if (cached) { //如果緩存兢哭,?用回調(diào)返回领舰,注意?用的nextTick哦
console.log('Cache hit');
return process.nextTick(callback.bind(null, null, cached));
}
if (queues[item]) {
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback];
totalSales(item, (err, res) => {
if (!err) {
cache[item] = res;
setTimeout(() => {
delete cache[item];
}, 30 * 1000); //30 seconds expiry 設(shè)置了了超時(shí)時(shí)間
}
const queue = queues[item];
queues[item] = null;
queue.forEach(cb => cb(err, res));
});
};
緩存函數(shù)調(diào)?結(jié)果是實(shí)現(xiàn)記憶的?種方式,在npm中你可以找到很多方便用于異步存儲(chǔ)記憶的?具迟螺,比如memoizee
緩存機(jī)制的具體實(shí)現(xiàn)
在實(shí)際應(yīng)用中冲秽,我們需要更高級(jí)的存儲(chǔ)和釋放機(jī)制,有以下的原因:
- 緩存大量的結(jié)果會(huì)消耗大量內(nèi)存矩父,我們可以采用LRU(Least Recently Used)算法來只緩存最近的緩存的結(jié)果
- 當(dāng)應(yīng)用通過多進(jìn)程執(zhí)?時(shí)锉桑,不同server實(shí)例的緩存結(jié)果可能不一樣,解決方案是共享緩存窍株,流
?的方案有Redis和Memcached - 可以手動(dòng)的清理緩存民轴,或者進(jìn)?時(shí)間限制,從而避免時(shí)間過期緩存球订。雖然這會(huì)有點(diǎn)不易管理后裸。
使用Promise進(jìn)行批處理和緩存
在這種情況下,使用Promise有以下幾種優(yōu)勢:
- 一個(gè)Promise可以用多個(gè)then()方法冒滩,在這里?便執(zhí)行批處理請(qǐng)求
- then()方法最多觸發(fā)一次微驶,在這里剛好只緩存一次結(jié)果
- resolved的Promise也可以使用then(),在這里?便持久的獲取緩存值
- then()方法是異步調(diào)用的旦部,著這里?便異步返回結(jié)果
在這里祈搜,我們使用Promise包裝totalSale()接口: - totalSalesPromises.js
// [1]引?入promisification模塊,使返回Promise?而不不是回調(diào)
const pify = require('pify');
const totalSales = pify(require('./totalSales'));
const cache = {};
module.exports = function totalSalesPromises(item) {
if (cache[item]) { // [2]檢測緩存情況
return cache[item];
}
cache[item] = totalSales(item) // [3]新建Promise
.then(res => {
// [4]resolve這個(gè)Promise之后士八,設(shè)計(jì)緩存清理理
setTimeout(() => {delete cache[item]}, 30 * 1000);
//30 seconds expiry
return res;
})
.catch(err => { // [5]Promise被reject容燕,刪除緩存拋出錯(cuò)誤
delete cache[item];
throw err;
});
return cache[item]; // [6]返回緩存結(jié)果
};
可見使用Promise之后代碼變得優(yōu)雅簡潔,并且同時(shí)使用了批處理和緩存
處理計(jì)算密集型(CPU-bound)任務(wù)的方案
在第?章我們知道婚度,通過觸發(fā)異步操作我們讓堆椪好兀回到事件循環(huán),從而可以自由處理其它的請(qǐng)求蝗茁。
但是如果運(yùn)?一個(gè)同步請(qǐng)求時(shí)間很?(比如計(jì)算密集型任務(wù))醋虏,它可能久久不返回給事件循環(huán),因?yàn)樗谥囟壤肅PU而不是在頻繁使?I/O操作
解決子集求和問題(subset sum problem)
子集求和問題哮翘,即求一個(gè)集合的非空子集滿足和為0
最簡單的解決方案是把?集進(jìn)行排列組合颈嚼,這有2^n的復(fù)雜度,這就很計(jì)算復(fù)雜了
- subsetSum.js實(shí)現(xiàn)該算法
const EventEmitter = require('events').EventEmitter;
//繼承EventEmitter饭寺,每次實(shí)現(xiàn)匹配就就觸發(fā)?一下事件
class SubsetSum extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
}
//實(shí)現(xiàn)可能的組合阻课,注意它是同步的
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combine(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
//?一旦實(shí)現(xiàn)組合之后叫挟,確定是否匹配,匹配就emit?一個(gè)match事件
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => (prev + item), 0);
if(res == this.sum) {
this.emit('match', subset);
}
}
start() {
this._combine(this.set, []); //觸發(fā)同步的排列列組合
this.emit('end'); //所有排列列組合結(jié)果計(jì)算之后觸發(fā)end
}
}
module.exports = SubsetSum;
- app.js 把剛才的SubsetSum放在HTTP server里執(zhí)?
const http = require('http');
const SubsetSum = require('./subsetSum');
//const SubsetSum = require('./subsetSumDefer');
//const SubsetSum = require('./subsetSumFork');
http.createServer((req, res) => {
const url = require('url').parse(req.url, true);
if(url.pathname === '/subsetSum') {
const data = JSON.parse(url.query.data);
res.writeHead(200);
const subsetSum = new SubsetSum(url.query.sum, data);
subsetSum.on('match', match => {
res.write('Match: ' + JSON.stringify(match) + '\n');
});
subsetSum.on('end', () => res.end());
subsetSum.start();
} else {
res.writeHead(200);
res.end('I\m alive!\n');
}
}).listen(8000, () => console.log('Started'));
通過事件機(jī)制限煞,我們知道抹恳,當(dāng)算法執(zhí)行完畢時(shí),也就是要等?會(huì)兒署驻,會(huì)?自動(dòng)返回結(jié)果:
可以看到奋献,在算法返回結(jié)果時(shí)請(qǐng)求?直被掛起,也就是暫時(shí)都不會(huì)顯示I'm alive旺上,Node.js的單線程就這樣被一個(gè)?的同步計(jì)算操作阻塞瓶蚂。
使?用setImmediate實(shí)現(xiàn)交錯(cuò)
通常來說,計(jì)算密集型(CPU-bound)的算法都是由一系列步驟組成抚官,可能是遞歸調(diào)?扬跋、循環(huán)或者是和其他的組合。所以?個(gè)簡單的解決?案就是讓每一個(gè)(或者一定數(shù)量)步驟執(zhí)行完成之后把執(zhí)行權(quán)交給事件循環(huán)凌节。
核心思想是:通過setImmediate實(shí)現(xiàn)異步任務(wù)和密集計(jì)算的交錯(cuò)執(zhí)?
采?交錯(cuò)執(zhí)?的?集求和問題
- subsetSumDefer.js 在剛剛的基礎(chǔ)上修改
class SubsetSumDefer extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
}
//實(shí)現(xiàn)交錯(cuò)執(zhí)行的核心函數(shù)
_combineInterleaved(set, subset) {
this.runningCombine++; //需要新的參數(shù)來計(jì)數(shù)
setImmediate(() => {
this._combine(set, subset);
if(--this.runningCombine === 0) {
this.emit('end');
}
});
}
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combineInterleaved(set.slice(i + 1), newSubset); //替換為step
this._processSubset(newSubset);
}
}
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => prev + item, 0);
if(res == this.sum) {
this.emit('match', subset);
}
}
start() {
this.runningCombine = 0; //計(jì)數(shù)參數(shù)確認(rèn)計(jì)算完畢
this._combineInterleaved(this.set, []);
}
}
交錯(cuò)執(zhí)行模式的思考
盡管剛剛的模式解決了阻塞的問題,但是它并不不是一個(gè)很好的模式洒试,因?yàn)檠舆t同步操作會(huì)帶來事件開銷倍奢,這可能導(dǎo)致?大的影響。特別是當(dāng)我們想盡快響應(yīng)用戶時(shí)垒棋,不不希望等待太久卒煞。我們可以設(shè)置step的數(shù)量,但這并不能從根本上解決問題叼架。
不過也不能因?yàn)樗袝r(shí)間開銷就完全放棄這種模式畔裕,實(shí)際上,如果同步操作順利乖订,偶爾執(zhí)行行異步的操作扮饶,這種利用setImmediate的模式還是可以被認(rèn)可的
注意process.nextTick() 不適用與這種交錯(cuò)的模式,正如第一章講到乍构,它在I/O之前執(zhí)?操作甜无,頻繁的調(diào)用可能導(dǎo)致I/O饑餓。
使?多進(jìn)程
另?個(gè)防止事件循環(huán)阻塞的方案是使用?進(jìn)程哥遮,從而不把昂貴的計(jì)算密集型任務(wù)運(yùn)行在主進(jìn)程上岂丘,這樣做有以下好處:
- 同步的任務(wù)能夠全心的去運(yùn)?,不用進(jìn)?交錯(cuò)執(zhí)?
- 使?多進(jìn)程會(huì)比setImmediate更簡單眠饮,而且不?考慮主程序的規(guī)模?小
- 如果需要更好的性能奥帘,我們甚?至可以用更底層的語言去實(shí)現(xiàn)(比如C)
Node.js有很多操作外部進(jìn)程的工具庫可以使用,比如可以使用child_process模塊仪召。甚至外部進(jìn)程可以是Node.js程序寨蹋,我們只需要做相關(guān)的連接松蒜。child_process.fork()方法創(chuàng)建的子進(jìn)程會(huì)?動(dòng)的創(chuàng)建進(jìn)程間的交流通道,還可以??一個(gè)類似EventEmitter的接?實(shí)現(xiàn)進(jìn)程間的信息交互
把子集求和任務(wù)交付其他進(jìn)程處理
核?思想是創(chuàng)建?個(gè)子進(jìn)程去處理同步任務(wù)钥庇,讓時(shí)間循環(huán)自由處理?絡(luò)請(qǐng)求牍鞠,下面提出可行的?案:
- 創(chuàng)建processPool.js 模塊來建立進(jìn)程池。因?yàn)殚_新進(jìn)程要花時(shí)間评姨,所以就讓他們?cè)谶M(jìn)程池?里運(yùn)行來等待請(qǐng)求难述,這樣可以節(jié)約時(shí)間和CPU。另外限制一下進(jìn)程的數(shù)量吐句,可以防止Dos攻擊
const fork = require('child_process').fork; //創(chuàng)建?子進(jìn)程
class ProcessPool {
constructor(file, poolMax) {
this.file = file;
this.poolMax = poolMax;
this.pool = []; //pool是?系列將被使?用的運(yùn)?的進(jìn)程
this.active = []; //active包含現(xiàn)在正在使用的進(jìn)程
this.waiting = []; //因?yàn)槿鄙龠M(jìn)程資源 等待的回調(diào)隊(duì)列
}
acquire(callback) {
let worker;
if(this.pool.length > 0) { // [1] 可?用的進(jìn)程就把它變?yōu)閍ctive
worker = this.pool.pop();
this.active.push(worker);
return process.nextTick(callback.bind(null, null, worker));
}
if(this.active.length >= this.poolMax) {
// [2]缺少進(jìn)程資源 等待
return this.waiting.push(callback);
}
worker = fork(this.file); // [3]如果進(jìn)程數(shù)還沒到最?大就再開?一個(gè)進(jìn)程
this.active.push(worker);
process.nextTick(callback.bind(null, null, worker));
}
release(worker) {
if(this.waiting.length > 0) { // [1]如果請(qǐng)求隊(duì)列列?里里有胁后,則賦給worker
const waitingCallback = this.waiting.shift();
waitingCallback(null, worker);
}
this.active = this.active.filter(w => worker !== w);
// [2]完成worker之后,放回池中
this.pool.push(worker);
}
}
module.exports = ProcessPool;
進(jìn)程?直在運(yùn)行不停止嗦枢,想要減少常駐內(nèi)存的使用攀芯、增強(qiáng)代碼的健壯性,可以:
- 在?段時(shí)間后清理閑置的進(jìn)程
- 設(shè)計(jì)?種機(jī)制文虏,將無響應(yīng)和崩潰的進(jìn)程重啟
- 接下來的subsetSumFork.js把?集求和的任務(wù)分發(fā)到子進(jìn)程侣诺。它的作用是和?進(jìn)程交流然后傳輸結(jié)果
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
//初始化進(jìn)程池,并且設(shè)置最?可用的進(jìn)程數(shù)為2
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);
class SubsetSumFork extends EventEmitter {
constructor(sum, set) { //接收sum和set的EventEmitter
super();
this.sum = sum;
this.set = set;
}
start() { //觸發(fā)算法運(yùn)?行行
workers.acquire((err, worker) => { // [1]從進(jìn)程池獲取?一個(gè)?子進(jìn)程
worker.send({sum: this.sum, set: this.set}); //進(jìn)程通信的通道
const onMessage = msg => {
// [3]監(jiān)聽任務(wù)完成 刪掉onmessage然后放回進(jìn)程池
if (msg.event === 'end') {
worker.removeListener('message', onMessage);
workers.release(worker);
}
this.emit(msg.event, msg.data); // [4]?無縫傳遞信息
};
worker.on('message', onMessage); // [2]監(jiān)聽信息
});
}
}
module.exports = SubsetSumFork;
send()?法在子進(jìn)程也是可用的氧秘,這也被cluster模塊用來實(shí)現(xiàn)多線程分發(fā)HTTP server
- 最后我們的subsetSumWorker.js 需要?一個(gè)worker(子進(jìn)程)來執(zhí)?子集求和的算法年鸳,并且把它的算法結(jié)果傳給?進(jìn)程。
const SubsetSum = require('./subsetSum');
process.on('message', msg => {
// [1]從?進(jìn)程監(jiān)測信息丸相,?旦有信息我們就新建實(shí)例搔确,然后說明匹配
const subsetSum = new SubsetSum(msg.sum, msg.set);
subsetSum.on('match', data => {
// [2]?一個(gè)對(duì)象封裝匹配結(jié)果傳給父進(jìn)程
process.send({event: 'match', data: data});
});
subsetSum.on('end', data => {
process.send({event: 'end', data: data});
});
subsetSum.start();
});
可以看到,我們重?了原來的subsetSum(同步版本)灭忠,但是這次由于我們單獨(dú)開了進(jìn)程膳算,所以不?擔(dān)心事件循環(huán)被阻塞。
綜上弛作,可以看到涕蜂,應(yīng)用程序的一部分可以交付外部進(jìn)程去處理的。
不過當(dāng)子進(jìn)程不是Node.js程序時(shí)缆蝉,進(jìn)程間通信的通道可能就不可?了宇葱。我們可以?己通過流式輸入輸出協(xié)議設(shè)計(jì)接口,可以參考child_process的實(shí)現(xiàn)
多進(jìn)程模式的思考
我們可以并發(fā)的開兩個(gè)?己求和任務(wù)刊头,如果開3個(gè)會(huì)掛起?個(gè)直到其中一個(gè)任務(wù)完成黍瞧,這是因?yàn)槲覀冎霸O(shè)置了進(jìn)程的最大數(shù)。
可?多進(jìn)程模式比交錯(cuò)執(zhí)?模式更高效有力原杂,不過印颤,因?yàn)閱我辉O(shè)備對(duì)資源有硬性限制,它并不不?便擴(kuò)展穿肄。所以我們可以通過多個(gè)設(shè)備去實(shí)現(xiàn)分發(fā)加載任務(wù)年局。這也是我們接下來會(huì)提到的分布式架構(gòu)模式际看。