Chapter 9 Advanced Asynchronous Recipes (高級(jí)異步編程方案)

現(xiàn)在我們了解到的設(shè)計(jì)模式能夠應(yīng)對(duì)很多情況潮针,不過針對(duì)一些特定的問題术荤,我們還有一些方案可以使?。就像做菜的菜譜?樣每篷,它提供了一個(gè)?概的步驟瓣戚,我們還可以?己創(chuàng)造一些,在這個(gè)章節(jié)焦读,我們主要提供以下情形的解決方案:

  • 需要進(jìn)行異步初始化的模塊
  • 以較小的開銷子库,在忙碌應(yīng)?用中實(shí)現(xiàn)?表現(xiàn)?的批量量異步緩存
  • 運(yùn)?可能阻塞事件循環(huán)或者可能削弱Node.js處理并發(fā)請(qǐng)求能?的,同步計(jì)算密集(CPUbound)操作

處理需要異步初始化模塊的方案

我們?cè)诘?章提到過矗晃,require()和module.exports是同步工作的仑嗅,這是同步API存在于核?模塊和很多npm的庫中的?個(gè)主要原因。這樣的方案提供了了更方便的轉(zhuǎn)換機(jī)制张症,這也是最初使用同步而不是異步方案開初始化模塊的原型仓技。
不幸的是,有時(shí)候同步API并不不好用俗他,特別是在需要連接?網(wǎng)絡(luò)進(jìn)?初始化的時(shí)候脖捻,需要花費(fèi)精力去配置參數(shù)、使?握?進(jìn)?連接等等兆衅。這種場景存在于許多數(shù)據(jù)庫驅(qū)動(dòng)和類似消息隊(duì)列這樣的中間件里地沮。

標(biāo)準(zhǔn)解決方案

舉例,我們需要通過請(qǐng)求羡亩,連接?個(gè)叫db的遠(yuǎn)程數(shù)據(jù)庫摩疑,我們通常有兩種選擇:

  • 在使?前先確保它已經(jīng)被初始化,否則就等它初始化完畢后再處理接下來的異步操作
const db = require('aDb');  //  The async module
module.exports = function findAll(tyoe, callback) {
  if (db.connected) {
    runFild();
  } else {
    db.once('connected', runFild);
  }
  function runFild() {
    db.findAll(type, callback);
  }
};
  • 使?依賴注?(Dependency Injection)?不是直接加載異步模塊夕春。延遲其他模塊的初始化直到它的依賴都加載完成未荒,這樣就轉(zhuǎn)移了了重點(diǎn)到別的地?
//  in module app.js
const db = require('aDb');
const findAllFactory = require('./findAll');
db.on('connected', function(){
  const findAll = findAllFactory(db);
});
//  in module findAll.js
module.exports = db => {
  return function findAll(type, callback) {
    db.findAll(type, callback);
  }
};

顯然第一種?方案就很不受歡迎,因?yàn)榘芏嘁?模板(boilerplate)
第?種?案有時(shí)候也不太好及志,在大型項(xiàng)目中使用依賴注入會(huì)很復(fù)雜片排,特別是需要手動(dòng)去初始化一些模塊的時(shí)候。不過使?用一些?持異步初始化的依賴注?容器會(huì)有些幫助速侈。
下?的第三種?案率寡,能夠輕易的幫助我們從初始化依賴?yán)锓蛛x出需要的模塊

預(yù)初始化隊(duì)列方案

方案的主要思想:把含未完成的初始化的模塊操作保存下來,一旦初始化步驟完成倚搬,就執(zhí)行行這些操作冶共。下?是?個(gè)例?:

  • asyncModule.js(?個(gè)需要異步初始化的模塊)
const asyncModule = module.exports;

asyncModule.initialized = false;
asyncModule.initialize = callback => {  //  一個(gè)延遲10s的初始化操作
  setTimeout(() => {
    asyncModule.initialized = true;
    callback();
  }, 10000);
};

asyncModule.tellMeSomething = callback => {
  process.nextTick(() => {
    if(!asyncModule.initialized) {
      return callback(
        new Error('I don\'t have anything to say right now') //如果還沒初始化
      );
    }
    callback(null, 'Current time is: ' + new Date());
  });
};
  • routes.js(一個(gè)HTTP請(qǐng)求的處理模塊)
const asyncModule = require('./asyncModule');

module.exports.say = (req, res) => {
  asyncModule.tellMeSomething((err, something) => {
    if(err) {
      res.writeHead(500);
      return res.end('Error:' + err.message);
    }
    res.writeHead(200);
    res.end('I say: ' + something);
  });
};

這個(gè)HTTP處理模塊(handler)主要是觸發(fā)剛剛的tellMeSomething()方法并將結(jié)果寫入HTTP響應(yīng)頭。可以看到這?捅僵,我們r(jià)equire()這個(gè)異步的asyncModule時(shí)并沒有檢測它是不是已經(jīng)初始化了家卖,這就可能導(dǎo)致問題。
讓我們?基本的http核?模塊庙楚,創(chuàng)建?個(gè)HTTP server

  • app.js
const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule.js');

asyncModule.initialize(() => {
  console.log('Async module initialized');
});

http.createServer((req, res) => {
  if (req.method === 'GET' && req.url === '/say') {
    return routes.say(req, res);
  }
  res.writeHead(404);
  res.end('Not found');
}).listen(8000, () => console.log('Started'));

這個(gè)app.js是程序的入?上荡,它觸發(fā)了AsyncModule的初始化、利用routes.say()創(chuàng)建了一個(gè)
HTTPServer馒闷。運(yùn)?一下酪捡,正如想象,如果我們?cè)谶\(yùn)行server時(shí)纳账,快一點(diǎn)打開http://localhost:8000/say (也就是調(diào)?了say()的時(shí)候逛薇,AsyncModule還沒有異步初始化完成),我們會(huì)在瀏覽器上看到:

image.png

本例的顯示取決于異步初始化的細(xì)節(jié)疏虫,如果慢?點(diǎn)等初始化結(jié)束永罚,我們就能看到正確的結(jié)果了。我們應(yīng)該盡量避免這種問題议薪,因?yàn)檫@可能導(dǎo)致丟失信息尤蛮、程序崩潰等更嚴(yán)重的問題。雖然初始化一般很快斯议,我們?般不會(huì)去注意失敗的請(qǐng)求产捞,但是對(duì)于加載頻繁和自動(dòng)設(shè)置(autoscale)的云服務(wù)情況來說,這可能導(dǎo)致阻塞等更多問題哼御。

?用預(yù)加載隊(duì)列列去包裝模塊

我們?隊(duì)列去存放未初始化的操作坯临,這有點(diǎn)像狀態(tài)模式(State Pattern)。存在兩種狀態(tài):?一種是把未初始化的操作排隊(duì)恋昼,另一種是當(dāng)初始化完成時(shí)看靠,再把最后的方法委托給原模塊(也就是AsyncModule)

  • AsyncModuleWarpper.js
const asyncModule = require('./asyncModule');

//The wrapper 給 activeState 設(shè)置分發(fā)操作
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = function() {
  activeState.initialize.apply(activeState, arguments);
};

asyncModuleWrapper.tellMeSomething = function() {
  activeState.tellMeSomething.apply(activeState, arguments);
};

//Module沒有初始化成功的時(shí)候的state
let pending = [];
let notInitializedState = {
  initialize: function(callback) {
    asyncModule.initialize(function() {\
      asyncModuleWrapper.initalized = true;
      activeState = initializedState; //  [1]更更新activeState的變量量

      pending.forEach(function(req) { //  [2]運(yùn)?行行存儲(chǔ)隊(duì)列列的操作
        asyncModule[req.method].apply(null, req.args);
      });
      pending = [];

      callback(); //  [3]觸發(fā)原來的回調(diào)
    });
  },

  tellMeSomething: function(callback) {
    return pending.push({
      method: 'tellMeSomething',
      args: arguments
    });
  }
};

//  The state to use when the module is initialized
let initializedState = asyncModule;

//  Set the initial state to the notInitializedState
let activeState = notInitializedState; // 設(shè)置更新activeState

當(dāng)開始初始化的時(shí)候,我們提供了一個(gè)回調(diào)代理液肌,它讓我們的warpper知道什么時(shí)候已經(jīng)初始化好了挟炬,應(yīng)該進(jìn)?接下來的操作了。
可?代碼嗦哆,如果沒有初始化完成谤祖,tellMeSomething會(huì)放在pending隊(duì)列里等完成初始化再調(diào)?。
而且老速,通過這個(gè)模式我們可以清晰的知道AsyncModule這個(gè)模塊有沒有被初始化完成粥喜,因?yàn)橥瓿芍髸?huì)切換狀態(tài)為initializedState(倒數(shù)第二?代碼)
運(yùn)行一下:

image.png

通過隊(duì)列我們可以把為可能含初始化模塊的操作給掛起,這是?個(gè)更穩(wěn)健的行為橘券。
核?在于:如果要初始化異步模塊额湘,我們讓需要的操作排隊(duì)等待直到初始化完成卿吐。
在自然情況下
剛剛這種模式在很多數(shù)據(jù)庫驅(qū)動(dòng)和ORM庫中經(jīng)常使用,最著名的就是MongoDB的ORM庫
Moogoose了锋华,這樣就不需要等待數(shù)據(jù)庫的連接情況嗡官,因?yàn)檫@都?動(dòng)被放入隊(duì)列里,并且在初始化建立完成后執(zhí)?供置,這種方案增強(qiáng)了了API的可用性谨湘。

處理批量異步緩存的方案

在高負(fù)載的應(yīng)用中,緩存在web里里起到很重要的作用芥丧。無論是靜態(tài)的文件資源還是數(shù)據(jù)庫的查詢結(jié)果。在這個(gè)章節(jié)我們了解高吞吐量下的異步操作緩存坊罢。

暫時(shí)沒有緩存和批量量處理理的Server

這是一個(gè)管理理銷售情況的情景续担,JSON數(shù)據(jù)格式:transactionId{ amount, item}

  • totalSales.js
const level = require('level');
const sublevel = require('level-sublevel');

const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');

module.exports = function totalSales(item, callback) {
    console.log('totalSales() invoked');
    let sum = 0;
    salesDb.createValueStream() // [1] 從數(shù)據(jù)庫流式輸?入數(shù)據(jù)
    .on('data', data => {
      if(!item || data.item === item) { // [2] 如果同樣商品,求和
        sum += data.amount;
      }
    })
    .on('end', () => {
      callback(null, sum); // [3]返回剛剛的求和結(jié)果
    });
};

這種方案顯然表現(xiàn)不大好活孩,我們沒有用index去區(qū)分transaction物遇,求和計(jì)算也沒有?更?便的map或者reduce。接下來暴露接?給HTTP server:

  • app.js
const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');
//const totalSales = require('./totalSalesBatch');
//const totalSales = require('./totalSalesCache');

http.createServer((req, res) => {
  const query = url.parse(req.url, true).query;
  totalSales(query.item, (err, sum) => {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, () => console.log('Started'));

接下來訪問http://localhost:8000/?item=book憾儒,順?造一點(diǎn)假數(shù)據(jù)更方便查看

批量處理異步請(qǐng)求

最基本的方案是使用相同的API處理?系列的調(diào)用询兴,通過回調(diào)的方式:


image.png

圖示兩個(gè)客戶端(可能是兩個(gè)不同的對(duì)象或者兩個(gè)不同的web請(qǐng)求)觸發(fā)了相同輸入的的異步操作,在這第一個(gè)圖中起趾,它們單獨(dú)發(fā)起?己的異步操作執(zhí)?完畢后通過回調(diào)返回诗舰。第?張圖針對(duì)這種相同輸?的異步操作進(jìn)?了批量處理,當(dāng)操作完成時(shí)训裆,通知兩個(gè)客戶端眶根,這種方式優(yōu)化了應(yīng)?的加載,避免了了使?復(fù)雜的緩存機(jī)制边琉,也避免了這種機(jī)制帶來的內(nèi)存管理和失效問題属百。


image.png

在這個(gè)web server使用批量處理請(qǐng)求

主要采?隊(duì)列的方案,把回調(diào)加?隊(duì)列变姨,等異步操作完成之后?塊兒觸發(fā)

  • totalSalesBatch.js
const totalSales = require('./totalSales');
const queues = {};

module.exports = function totalSalesBatch(item, callback) {
  if(queues[item]) { // [1] 隊(duì)列列存在族扰,請(qǐng)求正在執(zhí)?行行,回調(diào)加?入隊(duì)列列
    console.log('Batching operation');
    return queues[item].push(callback);
  }
  queues[item] = [callback]; // [2]我們需建?立新請(qǐng)求因?yàn)殛?duì)列列有內(nèi)容了了
  totalSales(item, (err, res) => {
    const queue = queues[item]; // [3]獲得操作結(jié)果后?一個(gè)個(gè)觸發(fā)回調(diào)
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};
緩存異步請(qǐng)求

剛剛的批量處理方案在著快速API和少量批處理請(qǐng)求的情況下會(huì)有點(diǎn)問題定欧。在這種情況下渔呵,采?緩存模式是比較好的方案。
緩存模式的核心思想是:請(qǐng)求完成后我們就把結(jié)果保存在緩存里(可以是一個(gè)變量忧额、?個(gè)數(shù)據(jù)庫入口等)厘肮,下一次就不請(qǐng)求了,直接從緩存里取出
緩存模式很常見睦番,但真正厲害的是把緩存模式和批處理模式結(jié)合起來使用:


image.png

由圖可知类茂,沒有緩存時(shí)耍属,和批處理模式是差不多的,不過當(dāng)請(qǐng)求完成后巩检,結(jié)果會(huì)被放入緩存里厚骗,方便下次直接從緩存里提取。

在這個(gè)web server使用緩存請(qǐng)求

  • totalSalesCache.js (在批處理的基礎(chǔ)上加上緩存層)
const totalSales = require('./totalSales');
const queues = {};
const cache = {};

module.exports = function totalSalesBatch(item, callback) {
    const cached = cache[item];
    if (cached) { //如果緩存兢哭,?用回調(diào)返回领舰,注意?用的nextTick哦
      console.log('Cache hit');
      return process.nextTick(callback.bind(null, null, cached));
    }
    if (queues[item]) {
      console.log('Batching operation');
      return queues[item].push(callback);
    }
    queues[item] = [callback];
    totalSales(item, (err, res) => {
    if (!err) {
      cache[item] = res;
        setTimeout(() => {
        delete cache[item];
      }, 30 * 1000); //30 seconds expiry 設(shè)置了了超時(shí)時(shí)間
    }
    const queue = queues[item];
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};

緩存函數(shù)調(diào)?結(jié)果是實(shí)現(xiàn)記憶的?種方式,在npm中你可以找到很多方便用于異步存儲(chǔ)記憶的?具迟螺,比如memoizee

緩存機(jī)制的具體實(shí)現(xiàn)

在實(shí)際應(yīng)用中冲秽,我們需要更高級(jí)的存儲(chǔ)和釋放機(jī)制,有以下的原因:

  • 緩存大量的結(jié)果會(huì)消耗大量內(nèi)存矩父,我們可以采用LRU(Least Recently Used)算法來只緩存最近的緩存的結(jié)果
  • 當(dāng)應(yīng)用通過多進(jìn)程執(zhí)?時(shí)锉桑,不同server實(shí)例的緩存結(jié)果可能不一樣,解決方案是共享緩存窍株,流
    ?的方案有Redis和Memcached
  • 可以手動(dòng)的清理緩存民轴,或者進(jìn)?時(shí)間限制,從而避免時(shí)間過期緩存球订。雖然這會(huì)有點(diǎn)不易管理后裸。

使用Promise進(jìn)行批處理和緩存

在這種情況下,使用Promise有以下幾種優(yōu)勢:

  • 一個(gè)Promise可以用多個(gè)then()方法冒滩,在這里?便執(zhí)行批處理請(qǐng)求
  • then()方法最多觸發(fā)一次微驶,在這里剛好只緩存一次結(jié)果
  • resolved的Promise也可以使用then(),在這里?便持久的獲取緩存值
  • then()方法是異步調(diào)用的旦部,著這里?便異步返回結(jié)果
    在這里祈搜,我們使用Promise包裝totalSale()接口:
  • totalSalesPromises.js
// [1]引?入promisification模塊,使返回Promise?而不不是回調(diào)
const pify = require('pify');
const totalSales = pify(require('./totalSales'));
const cache = {};

module.exports = function totalSalesPromises(item) {
  if (cache[item]) { // [2]檢測緩存情況
    return cache[item];
  }
  cache[item] = totalSales(item) // [3]新建Promise
    .then(res => {
      // [4]resolve這個(gè)Promise之后士八,設(shè)計(jì)緩存清理理
      setTimeout(() => {delete cache[item]}, 30 * 1000);
      //30 seconds expiry
      return res;
    })
    .catch(err => { // [5]Promise被reject容燕,刪除緩存拋出錯(cuò)誤
      delete cache[item];
      throw err;
    });
    return cache[item]; // [6]返回緩存結(jié)果
};

可見使用Promise之后代碼變得優(yōu)雅簡潔,并且同時(shí)使用了批處理和緩存

處理計(jì)算密集型(CPU-bound)任務(wù)的方案

在第?章我們知道婚度,通過觸發(fā)異步操作我們讓堆椪好兀回到事件循環(huán),從而可以自由處理其它的請(qǐng)求蝗茁。
但是如果運(yùn)?一個(gè)同步請(qǐng)求時(shí)間很?(比如計(jì)算密集型任務(wù))醋虏,它可能久久不返回給事件循環(huán),因?yàn)樗谥囟壤肅PU而不是在頻繁使?I/O操作

解決子集求和問題(subset sum problem)

子集求和問題哮翘,即求一個(gè)集合的非空子集滿足和為0
最簡單的解決方案是把?集進(jìn)行排列組合颈嚼,這有2^n的復(fù)雜度,這就很計(jì)算復(fù)雜了

  • subsetSum.js實(shí)現(xiàn)該算法
const EventEmitter = require('events').EventEmitter;
//繼承EventEmitter饭寺,每次實(shí)現(xiàn)匹配就就觸發(fā)?一下事件
class SubsetSum extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }
  //實(shí)現(xiàn)可能的組合阻课,注意它是同步的
  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combine(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }
  //?一旦實(shí)現(xiàn)組合之后叫挟,確定是否匹配,匹配就emit?一個(gè)match事件
  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => (prev + item), 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    this._combine(this.set, []); //觸發(fā)同步的排列列組合
    this.emit('end'); //所有排列列組合結(jié)果計(jì)算之后觸發(fā)end
  }
}
module.exports = SubsetSum;
  • app.js 把剛才的SubsetSum放在HTTP server里執(zhí)?
const http = require('http');
const SubsetSum = require('./subsetSum');
//const SubsetSum = require('./subsetSumDefer');
//const SubsetSum = require('./subsetSumFork');

http.createServer((req, res) => {
  const url = require('url').parse(req.url, true);
  if(url.pathname === '/subsetSum') {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    subsetSum.on('match', match => {
      res.write('Match: ' + JSON.stringify(match) + '\n');
    });
    subsetSum.on('end', () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end('I\m alive!\n');
  }
}).listen(8000, () => console.log('Started'));

通過事件機(jī)制限煞,我們知道抹恳,當(dāng)算法執(zhí)行完畢時(shí),也就是要等?會(huì)兒署驻,會(huì)?自動(dòng)返回結(jié)果:


image.png

可以看到奋献,在算法返回結(jié)果時(shí)請(qǐng)求?直被掛起,也就是暫時(shí)都不會(huì)顯示I'm alive旺上,Node.js的單線程就這樣被一個(gè)?的同步計(jì)算操作阻塞瓶蚂。

使?用setImmediate實(shí)現(xiàn)交錯(cuò)

通常來說,計(jì)算密集型(CPU-bound)的算法都是由一系列步驟組成抚官,可能是遞歸調(diào)?扬跋、循環(huán)或者是和其他的組合。所以?個(gè)簡單的解決?案就是讓每一個(gè)(或者一定數(shù)量)步驟執(zhí)行完成之后把執(zhí)行權(quán)交給事件循環(huán)凌节。
核心思想是:通過setImmediate實(shí)現(xiàn)異步任務(wù)和密集計(jì)算的交錯(cuò)執(zhí)?

采?交錯(cuò)執(zhí)?的?集求和問題

  • subsetSumDefer.js 在剛剛的基礎(chǔ)上修改
class SubsetSumDefer extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }
  //實(shí)現(xiàn)交錯(cuò)執(zhí)行的核心函數(shù)
  _combineInterleaved(set, subset) {
    this.runningCombine++; //需要新的參數(shù)來計(jì)數(shù)
    setImmediate(() => {
      this._combine(set, subset);
      if(--this.runningCombine === 0) {
        this.emit('end');
      }
    });
  }

  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combineInterleaved(set.slice(i + 1), newSubset); //替換為step
      this._processSubset(newSubset);
    }
  }

  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => prev + item, 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    this.runningCombine = 0; //計(jì)數(shù)參數(shù)確認(rèn)計(jì)算完畢
    this._combineInterleaved(this.set, []);
  }
}

交錯(cuò)執(zhí)行模式的思考

盡管剛剛的模式解決了阻塞的問題,但是它并不不是一個(gè)很好的模式洒试,因?yàn)檠舆t同步操作會(huì)帶來事件開銷倍奢,這可能導(dǎo)致?大的影響。特別是當(dāng)我們想盡快響應(yīng)用戶時(shí)垒棋,不不希望等待太久卒煞。我們可以設(shè)置step的數(shù)量,但這并不能從根本上解決問題叼架。
不過也不能因?yàn)樗袝r(shí)間開銷就完全放棄這種模式畔裕,實(shí)際上,如果同步操作順利乖订,偶爾執(zhí)行行異步的操作扮饶,這種利用setImmediate的模式還是可以被認(rèn)可的
注意process.nextTick() 不適用與這種交錯(cuò)的模式,正如第一章講到乍构,它在I/O之前執(zhí)?操作甜无,頻繁的調(diào)用可能導(dǎo)致I/O饑餓。

使?多進(jìn)程

另?個(gè)防止事件循環(huán)阻塞的方案是使用?進(jìn)程哥遮,從而不把昂貴的計(jì)算密集型任務(wù)運(yùn)行在主進(jìn)程上岂丘,這樣做有以下好處:

  • 同步的任務(wù)能夠全心的去運(yùn)?,不用進(jìn)?交錯(cuò)執(zhí)?
  • 使?多進(jìn)程會(huì)比setImmediate更簡單眠饮,而且不?考慮主程序的規(guī)模?小
  • 如果需要更好的性能奥帘,我們甚?至可以用更底層的語言去實(shí)現(xiàn)(比如C)
    Node.js有很多操作外部進(jìn)程的工具庫可以使用,比如可以使用child_process模塊仪召。甚至外部進(jìn)程可以是Node.js程序寨蹋,我們只需要做相關(guān)的連接松蒜。child_process.fork()方法創(chuàng)建的子進(jìn)程會(huì)?動(dòng)的創(chuàng)建進(jìn)程間的交流通道,還可以??一個(gè)類似EventEmitter的接?實(shí)現(xiàn)進(jìn)程間的信息交互

把子集求和任務(wù)交付其他進(jìn)程處理

核?思想是創(chuàng)建?個(gè)子進(jìn)程去處理同步任務(wù)钥庇,讓時(shí)間循環(huán)自由處理?絡(luò)請(qǐng)求牍鞠,下面提出可行的?案:

  1. 創(chuàng)建processPool.js 模塊來建立進(jìn)程池。因?yàn)殚_新進(jìn)程要花時(shí)間评姨,所以就讓他們?cè)谶M(jìn)程池?里運(yùn)行來等待請(qǐng)求难述,這樣可以節(jié)約時(shí)間和CPU。另外限制一下進(jìn)程的數(shù)量吐句,可以防止Dos攻擊
const fork = require('child_process').fork; //創(chuàng)建?子進(jìn)程
class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax;
    this.pool = []; //pool是?系列將被使?用的運(yùn)?的進(jìn)程
    this.active = []; //active包含現(xiàn)在正在使用的進(jìn)程
    this.waiting = []; //因?yàn)槿鄙龠M(jìn)程資源 等待的回調(diào)隊(duì)列
  }

  acquire(callback) {
  let worker;
  if(this.pool.length > 0) { // [1] 可?用的進(jìn)程就把它變?yōu)閍ctive
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }
  if(this.active.length >= this.poolMax) {
    // [2]缺少進(jìn)程資源 等待
    return this.waiting.push(callback);
  }
  worker = fork(this.file); // [3]如果進(jìn)程數(shù)還沒到最?大就再開?一個(gè)進(jìn)程
    this.active.push(worker);
    process.nextTick(callback.bind(null, null, worker));
  }
  
  release(worker) {
    if(this.waiting.length > 0) { // [1]如果請(qǐng)求隊(duì)列列?里里有胁后,則賦給worker
      const waitingCallback = this.waiting.shift();
      waitingCallback(null, worker);
    }
    this.active = this.active.filter(w => worker !== w);
    // [2]完成worker之后,放回池中
    this.pool.push(worker);
  }
}

module.exports = ProcessPool;

進(jìn)程?直在運(yùn)行不停止嗦枢,想要減少常駐內(nèi)存的使用攀芯、增強(qiáng)代碼的健壯性,可以:

  • 在?段時(shí)間后清理閑置的進(jìn)程
  • 設(shè)計(jì)?種機(jī)制文虏,將無響應(yīng)和崩潰的進(jìn)程重啟
  1. 接下來的subsetSumFork.js把?集求和的任務(wù)分發(fā)到子進(jìn)程侣诺。它的作用是和?進(jìn)程交流然后傳輸結(jié)果
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
//初始化進(jìn)程池,并且設(shè)置最?可用的進(jìn)程數(shù)為2
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) { //接收sum和set的EventEmitter
    super();
    this.sum = sum;
    this.set = set;
  }

  start() { //觸發(fā)算法運(yùn)?行行
    workers.acquire((err, worker) => { // [1]從進(jìn)程池獲取?一個(gè)?子進(jìn)程
    worker.send({sum: this.sum, set: this.set}); //進(jìn)程通信的通道

    const onMessage = msg => {
      // [3]監(jiān)聽任務(wù)完成 刪掉onmessage然后放回進(jìn)程池
      if (msg.event === 'end') {
        worker.removeListener('message', onMessage);
        workers.release(worker);
      }
      this.emit(msg.event, msg.data); // [4]?無縫傳遞信息
    };
      worker.on('message', onMessage); // [2]監(jiān)聽信息
    });
  }
}

module.exports = SubsetSumFork;

send()?法在子進(jìn)程也是可用的氧秘,這也被cluster模塊用來實(shí)現(xiàn)多線程分發(fā)HTTP server

  1. 最后我們的subsetSumWorker.js 需要?一個(gè)worker(子進(jìn)程)來執(zhí)?子集求和的算法年鸳,并且把它的算法結(jié)果傳給?進(jìn)程。
const SubsetSum = require('./subsetSum');

process.on('message', msg => {
  // [1]從?進(jìn)程監(jiān)測信息丸相,?旦有信息我們就新建實(shí)例搔确,然后說明匹配
  const subsetSum = new SubsetSum(msg.sum, msg.set);

  subsetSum.on('match', data => {
    // [2]?一個(gè)對(duì)象封裝匹配結(jié)果傳給父進(jìn)程
    process.send({event: 'match', data: data});
  });
  
  subsetSum.on('end', data => {
    process.send({event: 'end', data: data});
  });
  
  subsetSum.start();
});

可以看到,我們重?了原來的subsetSum(同步版本)灭忠,但是這次由于我們單獨(dú)開了進(jìn)程膳算,所以不?擔(dān)心事件循環(huán)被阻塞。
綜上弛作,可以看到涕蜂,應(yīng)用程序的一部分可以交付外部進(jìn)程去處理的。
不過當(dāng)子進(jìn)程不是Node.js程序時(shí)缆蝉,進(jìn)程間通信的通道可能就不可?了宇葱。我們可以?己通過流式輸入輸出協(xié)議設(shè)計(jì)接口,可以參考child_process的實(shí)現(xiàn)

多進(jìn)程模式的思考

我們可以并發(fā)的開兩個(gè)?己求和任務(wù)刊头,如果開3個(gè)會(huì)掛起?個(gè)直到其中一個(gè)任務(wù)完成黍瞧,這是因?yàn)槲覀冎霸O(shè)置了進(jìn)程的最大數(shù)。
可?多進(jìn)程模式比交錯(cuò)執(zhí)?模式更高效有力原杂,不過印颤,因?yàn)閱我辉O(shè)備對(duì)資源有硬性限制,它并不不?便擴(kuò)展穿肄。所以我們可以通過多個(gè)設(shè)備去實(shí)現(xiàn)分發(fā)加載任務(wù)年局。這也是我們接下來會(huì)提到的分布式架構(gòu)模式际看。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市矢否,隨后出現(xiàn)的幾起案子仲闽,更是在濱河造成了極大的恐慌,老刑警劉巖僵朗,帶你破解...
    沈念sama閱讀 222,183評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赖欣,死亡現(xiàn)場離奇詭異,居然都是意外死亡验庙,警方通過查閱死者的電腦和手機(jī)顶吮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來粪薛,“玉大人悴了,你說我怎么就攤上這事∥ナ伲” “怎么了湃交?”我有些...
    開封第一講書人閱讀 168,766評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長藤巢。 經(jīng)常有香客問我巡揍,道長,這世上最難降的妖魔是什么菌瘪? 我笑而不...
    開封第一講書人閱讀 59,854評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮阱当,結(jié)果婚禮上俏扩,老公的妹妹穿的比我還像新娘。我一直安慰自己弊添,他們只是感情好录淡,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,871評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著油坝,像睡著了一般嫉戚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上澈圈,一...
    開封第一講書人閱讀 52,457評(píng)論 1 311
  • 那天彬檀,我揣著相機(jī)與錄音,去河邊找鬼瞬女。 笑死窍帝,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的诽偷。 我是一名探鬼主播坤学,決...
    沈念sama閱讀 40,999評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼疯坤,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了深浮?” 一聲冷哼從身側(cè)響起压怠,我...
    開封第一講書人閱讀 39,914評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎飞苇,沒想到半個(gè)月后菌瘫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,465評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡玄柠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,543評(píng)論 3 342
  • 正文 我和宋清朗相戀三年突梦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片羽利。...
    茶點(diǎn)故事閱讀 40,675評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宫患,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出这弧,到底是詐尸還是另有隱情娃闲,我是刑警寧澤,帶...
    沈念sama閱讀 36,354評(píng)論 5 351
  • 正文 年R本政府宣布匾浪,位于F島的核電站皇帮,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蛋辈。R本人自食惡果不足惜属拾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,029評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冷溶。 院中可真熱鬧渐白,春花似錦、人聲如沸逞频。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽苗胀。三九已至襟诸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間基协,已是汗流浹背歌亲。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評(píng)論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留堡掏,地道東北人应结。 一個(gè)月前我還...
    沈念sama閱讀 49,091評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鹅龄。 傳聞我的和親對(duì)象是個(gè)殘疾皇子揩慕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,685評(píng)論 2 360

推薦閱讀更多精彩內(nèi)容

  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,307評(píng)論 25 707
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)扮休,斷路器迎卤,智...
    卡卡羅2017閱讀 134,711評(píng)論 18 139
  • 在服務(wù)器端程序開發(fā)領(lǐng)域蜗搔,性能問題一直是備受關(guān)注的重點(diǎn)。業(yè)界有大量的框架八堡、組件樟凄、類庫都是以性能為賣點(diǎn)而廣為人知。然而...
    dreamer_lk閱讀 1,014評(píng)論 0 17
  • 初見它兄渺,如一汪春水沁人心脾缝龄。 盡管隔了一層玻璃,眼眸里亦滿是溫潤挂谍;罐身的纏枝牡丹花紋蘊(yùn)含了大家閨秀...
    悠悠蓮閱讀 194評(píng)論 2 5
  • 特別是我們?cè)谧鲅h(huán)操作的時(shí)候叔壤,一堆的循環(huán)列表,但是只有某一個(gè)值出問題口叙,一次次的循環(huán)調(diào)試起來很費(fèi)勁炼绘,這個(gè)時(shí)候,我們可...
    Devid閱讀 1,106評(píng)論 0 1