cluster 和 child_process 模塊子進(jìn)程的區(qū)別
child_process 執(zhí)行 shell 命令分衫、利用多進(jìn)程執(zhí)行代碼
cluster 通過多進(jìn)程 master、worker 實(shí)現(xiàn)多個(gè) HTTP 應(yīng)用服務(wù)器架構(gòu)
總結(jié)寫前面
cluster 模塊是 node 利用多進(jìn)程處理網(wǎng)絡(luò)連接的應(yīng)用架構(gòu)
cluster 通過進(jìn)程 IPC 通道共享主進(jìn)程的 server handle 句柄創(chuàng)建 socket 文件描述符 實(shí)現(xiàn)子進(jìn)程共同監(jiān)聽同一端口
cluster 在 http 網(wǎng)絡(luò)請(qǐng)求中采用 RoundRobin 輪詢的負(fù)載均衡方式對(duì) woker 進(jìn)行調(diào)度
框架圖
多進(jìn)程通信煤傍,子進(jìn)程監(jiān)聽同一端口為什么不沖突
不同進(jìn)程之間的 server 通過 IPC 通道共享監(jiān)聽某個(gè)端口的 socket 連接句柄來解決沖突朦肘。
// master.js
const { createServer } = require('net')
const { fork } = require('child_process')
const cpus = require('os').cpus()
const netServer = createServer().listen(3000) // create TCP server
for (let i = 0; i < cpus.length; i++) {
const worker = fork('worker.js')
worker.send('server', netServer)
console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}
// worker.js
const http = require('http')
const server = http.createServer((req, res) => { // this.on('connection', connectionListener);
res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})
let _handle
process.on('message', (msg, handle) => {
if (msg !== 'server') return
_handle = handle
_handle.on('connection', (socket) => { // _http_server.js 中實(shí)現(xiàn), this.on('connection', connectionListener)
server.emit('connection', socket); // 與子進(jìn)程 server 共享 socket 處理連接后執(zhí)行子進(jìn)程 http.createServer 的 callback
})
})
server 共享 socket 過程
看下 createServer
的處理過程就可以知道 server.emit('connection', socket);
是如何共享 socket 并何時(shí)觸發(fā) createServer
回調(diào)對(duì)用戶進(jìn)行響應(yīng)的
-
_http_server 創(chuàng)建 creatServer
Server 綁定 request 事件處理 (req, res) => {} 函數(shù)返回響應(yīng) 和 connection 事件 共享 socket饭弓。
// Server 構(gòu)造函數(shù)
function Server {
...
if (requestListener) {
this.on('request', requestListener);
}
this.on('connection', connectionListener);
}
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
function connectionListenerInternal(server, socket) {
// ...
parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state)
}
function resOnFinish(req, res, socket, state, server) {
// ...
res.detachSocket(socket); // 關(guān)閉 socket
// ...
}
function parserOnIncoming(server, socket, state, req, keepAlive) {
// ...
res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));
if (req.headers.expect !== undefined &&
(req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
// ...
} else {
server.emit('request', req, res);
}
}
cluster 源碼
// cluster.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`Master 進(jìn)程 ${process.pid} 正在運(yùn)行`);
for (let i = 0; i < 1; i++) { // 衍生工作進(jìn)程。
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
http.createServer((req, res) => res.end(`你好世界 ${process.pid}`)).listen(8000);
console.log(`Worker 進(jìn)程 ${process.pid} 已啟動(dòng)`);
}
主進(jìn)程創(chuàng)建子進(jìn)程 cluster fork
createWorkerProcess 通過 child_process fork 創(chuàng)建子進(jìn)程源碼
在主進(jìn)程中 cluster.fork 通過 child_process fork 創(chuàng)建子進(jìn)程
function createWorkerProcess(id, env) {
// ...
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
}
在子進(jìn)程中的 http.createServer
當(dāng)在子進(jìn)程通過 createServer 并 listen 端口時(shí)媒抠,net 模塊會(huì)根據(jù) isMaster 來當(dāng)前進(jìn)程是主進(jìn)程直接監(jiān)聽端口弟断,當(dāng)前進(jìn)程是子進(jìn)程則通過 IPC 通道獲取 master 進(jìn)程的服務(wù)器(server)句柄(handle),并監(jiān)聽(listen)它趴生。
listenInCluster
http.createServer().listen(port) 會(huì)執(zhí)行 net 模塊的 Server.prototype.listen 方法 調(diào)用 listenInCluster阀趴,在此方法中根據(jù) isMaster 判斷,子進(jìn)程時(shí)通過 cluster 模塊 cluster._getServer 方法與 master 建立 IPC 通道獲取 master 中 創(chuàng)建 server 的 handle 并在子進(jìn)程代碼中進(jìn)行 listen冲秽。
listenInCluster 源碼
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
if (cluster === undefined) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// 通過 IPC 通道獲取 master server 的 handle 進(jìn)行監(jiān)聽
cluster._getServer(server, serverQuery, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
err = checkBindError(err, port, handle);
if (err) {
const ex = exceptionWithHostPort(err, 'bind', address, port);
return server.emit('error', ex);
}
server._handle = handle; // 重用 master handle
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
子進(jìn)程 cluster._getServer
子進(jìn)程中 _getServer 向 master 通過 IPC 發(fā)送 node 內(nèi)部消息為 act: 'queryServer ' 的通信獲取 master handle
子進(jìn)程 cluster._getServer
// lib/internal/cluster/master.js
// obj 在 http 請(qǐng)求 TCP 連接中是 net 的 Server 實(shí)例舍咖,UDP 連接是 dgram 的 Socket 實(shí)例
cluster._getServer = function(obj, options, cb) {
...
const message = {
act: 'queryServer',
index,
data: null,
...options
};
message.address = address;
if (obj._getServerData)
message.data = obj._getServerData();
// 向 master 發(fā)送 querServer 消息
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket. UDP 連接處理方式
else
rr(reply, indexesKey, cb); // Round-robin. TCP 連接 rr 模式
});
obj.once('listening', () => {
...
}
}
主進(jìn)程 master 中 queryServer
// master 監(jiān)聽內(nèi)部消息
function onmessage(message, handle) {
const worker = this;
if (message.act === 'online')
online(worker);
else if (message.act === 'queryServer')
queryServer(worker, message); // queryServer
else if (message.act === 'listening')
listening(worker, message);
else if (message.act === 'exitedAfterDisconnect')
exitedAfterDisconnect(worker, message);
else if (message.act === 'close')
close(worker, message);
}
queryServer 當(dāng)不存在 RoundRobinHandle 實(shí)例時(shí)會(huì)創(chuàng)建一個(gè), 通過 RoundRobinHandle 原型 add 方法添加 woker 到 實(shí)例 this.all 屬性中,用來進(jìn)行 master 對(duì) worker 的負(fù)載均衡策略锉桑。
function queryServer(worker, message) {
...
let handle = handles.get(key);
// 創(chuàng)建 TCP RoundRobinHandle rr 實(shí)例, master 邏輯
if (handle === undefined) {
...
let constructor = RoundRobinHandle;
handle = new constructor(key, address, message);
handles.set(key, handle);
}
...
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
if (errno)
handles.delete(key); // Gives other workers a chance to retry.
// handle.add 后去執(zhí)行子進(jìn)程 queryServe 的 cb排霉,告知采用 UDP 或 TCP
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});
}
RoundRobinHandle 實(shí)例創(chuàng)建
創(chuàng)建 server,重寫 server._handle 句柄的 onconnection 改用輪詢的方式分發(fā)句柄給子進(jìn)程處理
在 master 進(jìn)程中會(huì)接收、傳遞請(qǐng)求給 worker 處理攻柠,RoundRobinHandle 的作用就是用來對(duì) woker 進(jìn)行分發(fā)球订、任務(wù)交接、調(diào)度的負(fù)載均衡策略瑰钮,同時(shí)進(jìn)程間共享的 TCP server handle 是在 RoundRobinHandle 實(shí)例創(chuàng)建時(shí)生成的冒滩。
- RoundRobinHandle 實(shí)例創(chuàng)建,重寫 server._handle.onconnection 處理請(qǐng)求
通過傳入無操作的回調(diào)用 net.createServer 創(chuàng)建 server 并監(jiān)聽端口浪谴,通過對(duì) server.once('listening') 的監(jiān)聽重寫 this.server._handle 的 onconnection开睡,當(dāng) server 的 handle 遇到 connection 事件時(shí)將會(huì)使用 RoundRobinHandle 實(shí)例的 distribute 進(jìn)行 handle 的分發(fā)
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
this.key = key;
this.all = new Map(); // 所有的 woker
this.free = new Map(); // 空閑可用的 woker
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail); // assert.fail typeof Function, 這里給了個(gè)沒用的 onconnection 回調(diào)用來生成 server
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) { // fd: undefined, port: 9000
this.server.listen({ // 監(jiān)聽 address port, 觸發(fā) listening 事件
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address); // UNIX socket path.
// 在調(diào)用 server.listen() 后綁定服務(wù)器時(shí)觸發(fā)。
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle); // 改寫 net.Server onconnection
this.server._handle = null;
this.server = null;
});
}
RoundRobinHandle 輪詢分配策略
RoundRobinHandle 通過輪詢分配 handle 給 woker 的負(fù)載策略共享 handle 的 socket 解決子進(jìn)程共同監(jiān)聽一個(gè)端口處理請(qǐng)求苟耻。
最后就回到文章中最開始 server 共享 socket 過程 中觸發(fā) createServer((req, res) => {}) 回調(diào)的內(nèi)容篇恒。
參考
源碼分析
cluster-base
cluster 模塊的主要功能實(shí)現(xiàn)
egg-cluster 模塊的實(shí)現(xiàn)
cluster 模塊是用來處理網(wǎng)絡(luò)連接的多進(jìn)程模塊,egg-cluster 通過 cluster 模塊對(duì) egg 進(jìn)行多進(jìn)程管理的基礎(chǔ)模塊
在 egg-cluster 中:
master 主進(jìn)程類似守護(hù)進(jìn)程在后臺(tái)執(zhí)行
agent 是由 child_process 模塊 fork 創(chuàng)建凶杖,當(dāng) master 退出時(shí)會(huì)優(yōu)雅的退出 agent 進(jìn)程(防止變?yōu)楣聝哼M(jìn)程被系統(tǒng) init 收養(yǎng) parentId: 0)
woker 是由 cluster 模塊 fork 創(chuàng)建胁艰,用來處理 http 請(qǐng)求
可以參考文章 egg-cluster