Nodejs cluster 模塊

cluster 和 child_process 模塊子進(jìn)程的區(qū)別

child_process 執(zhí)行 shell 命令分衫、利用多進(jìn)程執(zhí)行代碼
cluster 通過多進(jìn)程 master、worker 實(shí)現(xiàn)多個(gè) HTTP 應(yīng)用服務(wù)器架構(gòu)

總結(jié)寫前面

cluster 模塊是 node 利用多進(jìn)程處理網(wǎng)絡(luò)連接的應(yīng)用架構(gòu)
cluster 通過進(jìn)程 IPC 通道共享主進(jìn)程的 server handle 句柄創(chuàng)建 socket 文件描述符 實(shí)現(xiàn)子進(jìn)程共同監(jiān)聽同一端口
cluster 在 http 網(wǎng)絡(luò)請(qǐng)求中采用 RoundRobin 輪詢的負(fù)載均衡方式對(duì) woker 進(jìn)行調(diào)度

框架圖

http createServer 時(shí) child 通過 IPC 通道獲取 master 的 server.handle 流程

多進(jìn)程通信煤傍,子進(jìn)程監(jiān)聽同一端口為什么不沖突

不同進(jìn)程之間的 server 通過 IPC 通道共享監(jiān)聽某個(gè)端口的 socket 連接句柄來解決沖突朦肘。

// master.js
const { createServer } = require('net')
const { fork } = require('child_process')
const cpus = require('os').cpus()

const netServer = createServer().listen(3000) // create TCP server
for (let i = 0; i < cpus.length; i++) {
  const worker = fork('worker.js')
  worker.send('server', netServer)
  console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}

// worker.js
const http = require('http')

const server = http.createServer((req, res) => { //   this.on('connection', connectionListener);
  res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})

let _handle
process.on('message', (msg, handle) => {
  if (msg !== 'server') return
  _handle = handle
  _handle.on('connection', (socket) => { // _http_server.js 中實(shí)現(xiàn), this.on('connection', connectionListener)
    server.emit('connection', socket); // 與子進(jìn)程 server 共享 socket 處理連接后執(zhí)行子進(jìn)程 http.createServer 的 callback
  })
})

server 共享 socket 過程

看下 createServer 的處理過程就可以知道 server.emit('connection', socket); 是如何共享 socket 并何時(shí)觸發(fā) createServer 回調(diào)對(duì)用戶進(jìn)行響應(yīng)的

// Server 構(gòu)造函數(shù)
function Server {
 ... 
  if (requestListener) {
    this.on('request', requestListener);
  }

  this.on('connection', connectionListener);
}
function connectionListener(socket) {
  defaultTriggerAsyncIdScope(
    getOrSetAsyncId(socket), connectionListenerInternal, this, socket
  );
}
function connectionListenerInternal(server, socket) {
// ...
  parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state)
}
function resOnFinish(req, res, socket, state, server) {
  // ...
  res.detachSocket(socket); // 關(guān)閉 socket
  // ...
}

function parserOnIncoming(server, socket, state, req, keepAlive) {
  // ...
  res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));

  if (req.headers.expect !== undefined &&
      (req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
  // ...
  } else {
    server.emit('request', req, res);
  }
}

cluster 源碼

// cluster.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master 進(jìn)程 ${process.pid} 正在運(yùn)行`);

  for (let i = 0; i < 1; i++) { // 衍生工作進(jìn)程。
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
  http.createServer((req, res) => res.end(`你好世界 ${process.pid}`)).listen(8000);
  console.log(`Worker 進(jìn)程 ${process.pid} 已啟動(dòng)`);
}

主進(jìn)程創(chuàng)建子進(jìn)程 cluster fork

createWorkerProcess 通過 child_process fork 創(chuàng)建子進(jìn)程源碼
在主進(jìn)程中 cluster.fork 通過 child_process fork 創(chuàng)建子進(jìn)程

function createWorkerProcess(id, env) {
// ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

在子進(jìn)程中的 http.createServer

當(dāng)在子進(jìn)程通過 createServer 并 listen 端口時(shí)媒抠,net 模塊會(huì)根據(jù) isMaster 來當(dāng)前進(jìn)程是主進(jìn)程直接監(jiān)聽端口弟断,當(dāng)前進(jìn)程是子進(jìn)程則通過 IPC 通道獲取 master 進(jìn)程的服務(wù)器(server)句柄(handle),并監(jiān)聽(listen)它趴生。

http createServer 時(shí) child 通過 IPC 通道獲取 master 的 server.handle 流程

listenInCluster

http.createServer().listen(port) 會(huì)執(zhí)行 net 模塊的 Server.prototype.listen 方法 調(diào)用 listenInCluster阀趴,在此方法中根據(jù) isMaster 判斷,子進(jìn)程時(shí)通過 cluster 模塊 cluster._getServer 方法與 master 建立 IPC 通道獲取 master 中 創(chuàng)建 server 的 handle 并在子進(jìn)程代碼中進(jìn)行 listen冲秽。
listenInCluster 源碼

function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // 通過 IPC 通道獲取 master server 的 handle 進(jìn)行監(jiān)聽
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    server._handle = handle; // 重用 master handle

    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

子進(jìn)程 cluster._getServer

子進(jìn)程中 _getServer 向 master 通過 IPC 發(fā)送 node 內(nèi)部消息為 act: 'queryServer ' 的通信獲取 master handle
子進(jìn)程 cluster._getServer

// lib/internal/cluster/master.js
// obj 在 http 請(qǐng)求 TCP 連接中是 net 的 Server 實(shí)例舍咖,UDP 連接是 dgram 的 Socket 實(shí)例
cluster._getServer = function(obj, options, cb) {
  ...
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  if (obj._getServerData)
    message.data = obj._getServerData();
  // 向 master 發(fā)送 querServer 消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket. UDP 連接處理方式
    else
      rr(reply, indexesKey, cb);              // Round-robin. TCP 連接 rr 模式
  });

  obj.once('listening', () => {
    ...
  }
}

主進(jìn)程 master 中 queryServer

// master 監(jiān)聽內(nèi)部消息
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message); // queryServer
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

queryServer 當(dāng)不存在 RoundRobinHandle 實(shí)例時(shí)會(huì)創(chuàng)建一個(gè), 通過 RoundRobinHandle 原型 add 方法添加 woker 到 實(shí)例 this.all 屬性中,用來進(jìn)行 master 對(duì) worker 的負(fù)載均衡策略锉桑。

function queryServer(worker, message) {
  ...
  let handle = handles.get(key);
  // 創(chuàng)建 TCP RoundRobinHandle rr 實(shí)例, master 邏輯
  if (handle === undefined) {
    ...
    let constructor = RoundRobinHandle;
    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }
  ...
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.
     // handle.add 后去執(zhí)行子進(jìn)程 queryServe 的 cb排霉,告知采用 UDP 或 TCP
    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

RoundRobinHandle 實(shí)例創(chuàng)建

創(chuàng)建 server,重寫 server._handle 句柄的 onconnection 改用輪詢的方式分發(fā)句柄給子進(jìn)程處理

在 master 進(jìn)程中會(huì)接收、傳遞請(qǐng)求給 worker 處理攻柠,RoundRobinHandle 的作用就是用來對(duì) woker 進(jìn)行分發(fā)球订、任務(wù)交接、調(diào)度的負(fù)載均衡策略瑰钮,同時(shí)進(jìn)程間共享的 TCP server handle 是在 RoundRobinHandle 實(shí)例創(chuàng)建時(shí)生成的冒滩。

  • RoundRobinHandle 實(shí)例創(chuàng)建,重寫 server._handle.onconnection 處理請(qǐng)求
    通過傳入無操作的回調(diào)用 net.createServer 創(chuàng)建 server 并監(jiān)聽端口浪谴,通過對(duì) server.once('listening') 的監(jiān)聽重寫 this.server._handle 的 onconnection开睡,當(dāng) server 的 handle 遇到 connection 事件時(shí)將會(huì)使用 RoundRobinHandle 實(shí)例的 distribute 進(jìn)行 handle 的分發(fā)
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new Map(); // 所有的 woker 
  this.free = new Map(); // 空閑可用的 woker
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail); // assert.fail typeof Function, 這里給了個(gè)沒用的 onconnection 回調(diào)用來生成 server

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) { // fd: undefined, port: 9000
    this.server.listen({ // 監(jiān)聽 address port, 觸發(fā) listening 事件
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.
  // 在調(diào)用 server.listen() 后綁定服務(wù)器時(shí)觸發(fā)。
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle); // 改寫 net.Server onconnection
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle 輪詢分配策略

RoundRobinHandle 通過輪詢分配 handle 給 woker 的負(fù)載策略共享 handle 的 socket 解決子進(jìn)程共同監(jiān)聽一個(gè)端口處理請(qǐng)求苟耻。



最后就回到文章中最開始 server 共享 socket 過程 中觸發(fā) createServer((req, res) => {}) 回調(diào)的內(nèi)容篇恒。

參考

源碼分析
cluster-base
cluster 模塊的主要功能實(shí)現(xiàn)

egg-cluster 模塊的實(shí)現(xiàn)

cluster 模塊是用來處理網(wǎng)絡(luò)連接的多進(jìn)程模塊,egg-cluster 通過 cluster 模塊對(duì) egg 進(jìn)行多進(jìn)程管理的基礎(chǔ)模塊
在 egg-cluster 中:
master 主進(jìn)程類似守護(hù)進(jìn)程在后臺(tái)執(zhí)行
agent 是由 child_process 模塊 fork 創(chuàng)建凶杖,當(dāng) master 退出時(shí)會(huì)優(yōu)雅的退出 agent 進(jìn)程(防止變?yōu)楣聝哼M(jìn)程被系統(tǒng) init 收養(yǎng) parentId: 0)
woker 是由 cluster 模塊 fork 創(chuàng)建胁艰,用來處理 http 請(qǐng)求
可以參考文章 egg-cluster

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市智蝠,隨后出現(xiàn)的幾起案子腾么,更是在濱河造成了極大的恐慌,老刑警劉巖杈湾,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件解虱,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡漆撞,警方通過查閱死者的電腦和手機(jī)饭寺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叫挟,“玉大人,你說我怎么就攤上這事限煞∧遥” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵署驻,是天一觀的道長(zhǎng)奋献。 經(jīng)常有香客問我,道長(zhǎng)旺上,這世上最難降的妖魔是什么瓶蚂? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮宣吱,結(jié)果婚禮上窃这,老公的妹妹穿的比我還像新娘。我一直安慰自己征候,他們只是感情好杭攻,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布祟敛。 她就那樣靜靜地躺著,像睡著了一般兆解。 火紅的嫁衣襯著肌膚如雪馆铁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天锅睛,我揣著相機(jī)與錄音埠巨,去河邊找鬼。 笑死现拒,一個(gè)胖子當(dāng)著我的面吹牛辣垒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播具练,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼乍构,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了扛点?” 一聲冷哼從身側(cè)響起哥遮,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎陵究,沒想到半個(gè)月后眠饮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡铜邮,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年仪召,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片松蒜。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扔茅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出秸苗,到底是詐尸還是另有隱情召娜,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布惊楼,位于F島的核電站玖瘸,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏檀咙。R本人自食惡果不足惜雅倒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望弧可。 院中可真熱鬧蔑匣,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至趴久,卻和暖如春丸相,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背彼棍。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工灭忠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人座硕。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓弛作,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親华匾。 傳聞我的和親對(duì)象是個(gè)殘疾皇子映琳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容