這個也是樸大的一個解決異步并發(fā)
控制的包贞绵。
抄樸大的一段話者祖,來描述應用場景。
在node中屁商,我們可以十分方便地異步發(fā)起并行的調(diào)用烟很。使用下面的代碼,我們可以輕松發(fā)起100次異步調(diào)用
for(var i = 0;i < 100; i++){
async();
}
但是如果并發(fā)量過大蜡镶,我們的下層服務器將會吃不消雾袱。如果是對文件系統(tǒng)進行大量并發(fā)調(diào)用,操作系統(tǒng)的文件描述符數(shù)量會瞬間用光官还,拋出如下芹橡,錯誤
Error: EMFILE, too many open files
可以看出,同步I/O和異步I/O的顯著差距:同步I/O因為每一個I/O都是彼此阻塞的望伦,在循環(huán)體中林说,總是一個接著一個的調(diào)用,不會出現(xiàn)消耗文件描述符太多的情況屯伞,同時性能也一下, 對于異步I/O腿箩,雖然并發(fā)容易實現(xiàn),但是由于太容易實現(xiàn)劣摇,依然需要控制珠移。換言之,盡管是要壓榨底層系統(tǒng)的性能末融,但是需要給與一定的過載保護钧惧,以防止過猶不及。
bagpipe的應用呢勾习,就是維護一個隊列來控制并發(fā)浓瞪。
具體應用代碼如下:
var Bagpipe = require('bagpipe');
var bagpipe = new Bagpipe(10);
for(var i = 1; i < 10; i++){
bagpipe.push(async, function(){
});
}
bagpipe.on('full', function(length){
console.log('底層系統(tǒng)處理不能及時完成,隊列堵塞巧婶,目前隊列長度' + length);
});
下面就來看一看bagpipe的源碼吧乾颁。
這里樸大的bagpipe是繼承了nodejs的EventEmitter
構(gòu)造函數(shù)
var Bagpipe = function (limit, options) {
events.EventEmitter.call(this);
// 活躍的任務數(shù)(并發(fā)數(shù))
this.limit = limit;
// 當前活躍的任務數(shù)量
this.active = 0;
// 異步處理的隊列
this.queue = [];
// 一些配置信息
this.options = {
// 是否應用控制并發(fā)
disabled: false,
// 如果異步事件長度超過了queueLength是否還進入隊列
refuse: false,
// 根據(jù)limit來算隊列的長度,用于refuse
ratio: 1,
// 超時的時間粹舵,如果超過了這個時間任務失敗
timeout: null
};
if (typeof options === 'boolean') {
options = {
disabled: options
};
}
options = options || {};
for (var key in this.options) {
if (options.hasOwnProperty(key)) {
this.options[key] = options[key];
}
}
// queue length
this.queueLength = Math.round(this.limit * (this.options.ratio || 1));
};
util.inherits(Bagpipe, events.EventEmitter);
push 方法(添加異步任務)
Bagpipe.prototype.push = function (method) {
// 處理異步任務method之外的其他參數(shù)
var args = [].slice.call(arguments, 1);
// 異步任務的回調(diào)函數(shù)
var callback = args[args.length - 1];
if (typeof callback !== 'function') {
args.push(function () {});
}
// 如果不控制钮孵,或者limit小于1,那么就立即執(zhí)行
if (this.options.disabled || this.limit < 1) {
method.apply(null, args);
return this;
}
// 隊列長度也超過限制值時
if (this.queue.length < this.queueLength || !this.options.refuse) {
this.queue.push({
method: method,
args: args
});
} else {
var err = new Error('Too much async call in queue');
err.name = 'TooMuchAsyncCallError';
callback(err);
}
if (this.queue.length > 1) {
this.emit('full', this.queue.length);
}
// 執(zhí)行next方法眼滤,來檢測是否可以執(zhí)行一個異步任務
this.next();
return this;
};
next方法巴席,用于檢測是否可以并且有異步任務執(zhí)行
Bagpipe.prototype.next = function () {
var that = this;
// 活躍的任務數(shù)小于限制數(shù)
if (that.active < that.limit && that.queue.length) {
var req = that.queue.shift();
//執(zhí)行異步任務
that.run(req.method, req.args);
}
};
// 異步任務執(zhí)行成功結(jié)束,之后調(diào)用的內(nèi)部方法
Bagpipe.prototype._next = function () {
//活躍數(shù)減一
this.active--;
this.next();
};
執(zhí)行異步任務 run
Bagpipe.prototype.run = function (method, args) {
var that = this;
// 活躍數(shù)诅需,并行執(zhí)行的任務數(shù)加一
that.active++;
var callback = args[args.length - 1];
var timer = null;
var called = false;
// 添加超時邏輯
args[args.length - 1] = function (err) {
// anyway, clear the timer
// 如果有timer漾唉,就要clear掉這個timer
if (timer) {
clearTimeout(timer);
timer = null;
}
// 沒有過期之前
if (!called) {
that._next();
callback.apply(null, arguments);
// 過期了荧库,所以執(zhí)行拋錯
} else {
// pass the outdated error
if (err) {
that.emit('outdated', err);
}
}
};
// 設置一個timer,來防止超時
var timeout = that.options.timeout;
if (timeout) {
timer = setTimeout(function () {
// set called as true
called = true;
that._next();
// pass the exception
var err = new Error(timeout + 'ms timeout');
err.name = 'BagpipeTimeoutError';
err.data = {
name: method.name,
method: method.toString(),
args: args.slice(0, -1)
};
callback(err);
}, timeout);
}
// 執(zhí)行異步任務
method.apply(null, args);
};
其實在bagpipe中維護一個異步并發(fā)任務的隊列赵刑,來使得最大的并發(fā)數(shù)也小于limit的數(shù)分衫。