bagpipe 源碼解析

這個也是樸大的一個解決異步并發(fā)
控制的包贞绵。
抄樸大的一段話者祖,來描述應用場景。
在node中屁商,我們可以十分方便地異步發(fā)起并行的調(diào)用烟很。使用下面的代碼,我們可以輕松發(fā)起100次異步調(diào)用

for(var i = 0;i < 100; i++){
  async();
}

但是如果并發(fā)量過大蜡镶,我們的下層服務器將會吃不消雾袱。如果是對文件系統(tǒng)進行大量并發(fā)調(diào)用,操作系統(tǒng)的文件描述符數(shù)量會瞬間用光官还,拋出如下芹橡,錯誤

Error: EMFILE, too many open files

可以看出,同步I/O和異步I/O的顯著差距:同步I/O因為每一個I/O都是彼此阻塞的望伦,在循環(huán)體中林说,總是一個接著一個的調(diào)用,不會出現(xiàn)消耗文件描述符太多的情況屯伞,同時性能也一下, 對于異步I/O腿箩,雖然并發(fā)容易實現(xiàn),但是由于太容易實現(xiàn)劣摇,依然需要控制珠移。換言之,盡管是要壓榨底層系統(tǒng)的性能末融,但是需要給與一定的過載保護钧惧,以防止過猶不及。
bagpipe的應用呢勾习,就是維護一個隊列來控制并發(fā)浓瞪。
具體應用代碼如下:

var Bagpipe = require('bagpipe');
var bagpipe = new Bagpipe(10);
for(var i = 1; i < 10; i++){
  bagpipe.push(async,  function(){
  });
}
bagpipe.on('full', function(length){
  console.log('底層系統(tǒng)處理不能及時完成,隊列堵塞巧婶,目前隊列長度' + length);
});

下面就來看一看bagpipe的源碼吧乾颁。
這里樸大的bagpipe是繼承了nodejs的EventEmitter

構(gòu)造函數(shù)

var Bagpipe = function (limit, options) {
  events.EventEmitter.call(this);
  // 活躍的任務數(shù)(并發(fā)數(shù))
  this.limit = limit;
  // 當前活躍的任務數(shù)量
  this.active = 0;
  // 異步處理的隊列
  this.queue = [];
  // 一些配置信息
  this.options = {
    // 是否應用控制并發(fā)
    disabled: false,
    // 如果異步事件長度超過了queueLength是否還進入隊列
    refuse: false,
    // 根據(jù)limit來算隊列的長度,用于refuse
    ratio: 1,
    // 超時的時間粹舵,如果超過了這個時間任務失敗
    timeout: null
  };
  if (typeof options === 'boolean') {
    options = {
      disabled: options
    };
  }
  options = options || {};
  for (var key in this.options) {
    if (options.hasOwnProperty(key)) {
      this.options[key] = options[key];
    }
  }
  // queue length
  this.queueLength = Math.round(this.limit * (this.options.ratio || 1));
};
util.inherits(Bagpipe, events.EventEmitter);

push 方法(添加異步任務)

Bagpipe.prototype.push = function (method) {
  // 處理異步任務method之外的其他參數(shù)
  var args = [].slice.call(arguments, 1);
  // 異步任務的回調(diào)函數(shù)
  var callback = args[args.length - 1];
  if (typeof callback !== 'function') {
    args.push(function () {});
  }
  // 如果不控制钮孵,或者limit小于1,那么就立即執(zhí)行
  if (this.options.disabled || this.limit < 1) {
    method.apply(null, args);
    return this;
  }

  // 隊列長度也超過限制值時
  if (this.queue.length < this.queueLength || !this.options.refuse) {
    this.queue.push({
      method: method,
      args: args
    });
  } else {
    var err = new Error('Too much async call in queue');
    err.name = 'TooMuchAsyncCallError';
    callback(err);
  }

  if (this.queue.length > 1) {
    this.emit('full', this.queue.length);
  }
  // 執(zhí)行next方法眼滤,來檢測是否可以執(zhí)行一個異步任務
  this.next();
  return this;
};

next方法巴席,用于檢測是否可以并且有異步任務執(zhí)行

Bagpipe.prototype.next = function () {
  var that = this;
  // 活躍的任務數(shù)小于限制數(shù)
  if (that.active < that.limit && that.queue.length) {
    var req = that.queue.shift();
    //執(zhí)行異步任務
    that.run(req.method, req.args);
  }
};

// 異步任務執(zhí)行成功結(jié)束,之后調(diào)用的內(nèi)部方法
Bagpipe.prototype._next = function () {
  //活躍數(shù)減一
  this.active--;
  this.next();
};

執(zhí)行異步任務 run

Bagpipe.prototype.run = function (method, args) {
  var that = this;
  // 活躍數(shù)诅需,并行執(zhí)行的任務數(shù)加一
  that.active++;
  var callback = args[args.length - 1];
  var timer = null;
  var called = false;

  // 添加超時邏輯
  args[args.length - 1] = function (err) {
    // anyway, clear the timer
    // 如果有timer漾唉,就要clear掉這個timer
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    // 沒有過期之前
    if (!called) {
      that._next();
      callback.apply(null, arguments);
    // 過期了荧库,所以執(zhí)行拋錯
    } else {
      // pass the outdated error
      if (err) {
        that.emit('outdated', err);
      }
    }
  };

  // 設置一個timer,來防止超時
  var timeout = that.options.timeout;
  if (timeout) {
    timer = setTimeout(function () {
      // set called as true
      called = true;
      that._next();
      // pass the exception
      var err = new Error(timeout + 'ms timeout');
      err.name = 'BagpipeTimeoutError';
      err.data = {
        name: method.name,
        method: method.toString(),
        args: args.slice(0, -1)
      };
      callback(err);
    }, timeout);
  }
  // 執(zhí)行異步任務
  method.apply(null, args);
};

其實在bagpipe中維護一個異步并發(fā)任務的隊列赵刑,來使得最大的并發(fā)數(shù)也小于limit的數(shù)分衫。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市般此,隨后出現(xiàn)的幾起案子蚪战,更是在濱河造成了極大的恐慌,老刑警劉巖铐懊,帶你破解...
    沈念sama閱讀 221,331評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邀桑,死亡現(xiàn)場離奇詭異,居然都是意外死亡科乎,警方通過查閱死者的電腦和手機壁畸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,372評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來茅茂,“玉大人捏萍,你說我怎么就攤上這事】障校” “怎么了令杈?”我有些...
    開封第一講書人閱讀 167,755評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長进副。 經(jīng)常有香客問我这揣,道長悔常,這世上最難降的妖魔是什么影斑? 我笑而不...
    開封第一講書人閱讀 59,528評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮机打,結(jié)果婚禮上矫户,老公的妹妹穿的比我還像新娘。我一直安慰自己残邀,他們只是感情好皆辽,可當我...
    茶點故事閱讀 68,526評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著芥挣,像睡著了一般驱闷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上空免,一...
    開封第一講書人閱讀 52,166評論 1 308
  • 那天空另,我揣著相機與錄音,去河邊找鬼蹋砚。 笑死扼菠,一個胖子當著我的面吹牛摄杂,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播循榆,決...
    沈念sama閱讀 40,768評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼析恢,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了秧饮?” 一聲冷哼從身側(cè)響起映挂,我...
    開封第一講書人閱讀 39,664評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎盗尸,沒想到半個月后袖肥,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,205評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡振劳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,290評論 3 340
  • 正文 我和宋清朗相戀三年椎组,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片历恐。...
    茶點故事閱讀 40,435評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡寸癌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出弱贼,到底是詐尸還是另有隱情蒸苇,我是刑警寧澤,帶...
    沈念sama閱讀 36,126評論 5 349
  • 正文 年R本政府宣布吮旅,位于F島的核電站溪烤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏庇勃。R本人自食惡果不足惜檬嘀,卻給世界環(huán)境...
    茶點故事閱讀 41,804評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望责嚷。 院中可真熱鬧鸳兽,春花似錦、人聲如沸罕拂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,276評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽爆班。三九已至衷掷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間柿菩,已是汗流浹背戚嗅。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人渡处。 一個月前我還...
    沈念sama閱讀 48,818評論 3 376
  • 正文 我出身青樓镜悉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親医瘫。 傳聞我的和親對象是個殘疾皇子侣肄,可洞房花燭夜當晚...
    茶點故事閱讀 45,442評論 2 359

推薦閱讀更多精彩內(nèi)容