使用例子
為了讓node應(yīng)用能夠在多核服務(wù)器中提高性能技掏,node提供cluster
API署恍,用于創(chuàng)建多個工作進程隔显,然后由這些工作進程并行處理請求却妨。
// master.js
const cluster = require('cluster');
const cpusLen = require('os').cpus().length;
const path = require('path');
console.log(`主進程:${process.pid}`);
cluster.setupMaster({
exec: path.resolve(__dirname, './work.js'),
});
for (let i = 0; i < cpusLen; i++) {
cluster.fork();
}
// work.js
const http = require('http');
console.log(`工作進程:${process.pid}`);
http.createServer((req, res) => {
res.end('hello');
}).listen(8080);
上面例子中,使用cluster
創(chuàng)建多個工作進程括眠,這些工作進程能夠共用8080
端口彪标,我們請求localhost:8080
,請求任務(wù)會交給其中一個工作進程進行處理掷豺,該工作進程處理完成后捞烟,自行響應(yīng)請求。
端口占用問題
這里有個問題当船,前面例子中坷襟,出現(xiàn)多個進程監(jiān)聽相同的端口,為什么程序沒有報端口占用問題生年,由于socket套接字監(jiān)聽端口會有一個文件描述符婴程,而每個進程的文件描述符都不相同,無法讓多個進程都監(jiān)聽同一個端口抱婉,如下:
// master.js
const fork = require('child_process').fork;
const cpusLen = require('os').cpus().length;
const path = require('path');
console.log(`主進程:${process.pid}`);
for (let i = 0; i < cpusLen; i++) {
fork(path.resolve(__dirname, './work.js'));
}
// work.js
const http = require('http');
console.log(`工作進程:${process.pid}`);
http.createServer((req, res) => {
res.end('hello');
}).listen(8080);
當運行master.js
文件的時候档叔,會報端口被占用的問題(Error: listen EADDRINUSE: address already in use :::8080
)。
我們修改下蒸绩,只使用主進程監(jiān)聽端口衙四,主進程將請求套接字發(fā)放給工作進程,由工作進程來進行業(yè)務(wù)處理患亿。
// master.js
const fork = require('child_process').fork;
const cpusLen = require('os').cpus().length;
const path = require('path');
const net = require('net');
const server = net.createServer();
console.log(`主進程:${process.pid}`);
const works = [];
let current = 0
for (let i = 0; i < cpusLen; i++) {
works.push(fork(path.resolve(__dirname, './work.js')));
}
server.listen(8080, () => {
if (current > works.length - 1) current = 0
works[current++].send('server', server);
server.close();
});
// work.js
const http = require('http');
const server = http.createServer((req, res) => {
res.end('hello');
});
console.log(`工作進程:${process.pid}`);
process.on('message', (type, tcp) => {
if (type === 'server') {
tcp.on('connection', socket => {
server.emit('connection', socket)
});
}
})
實際上传蹈,cluster
新建的工作進程并沒有真正去監(jiān)聽端口押逼,在工作進程中的net server listen函數(shù)會被hack,工作進程調(diào)用listen惦界,不會有任何效果挑格。監(jiān)聽端口工作交給了主進程,該端口對應(yīng)的工作進程會被綁定到主進程中沾歪,當請求進來的時候漂彤,主進程會將請求的套接字下發(fā)給相應(yīng)的工作進程,工作進程再對請求進行處理灾搏。
接下來我們看看cluster
API
中的實現(xiàn)挫望,看下cluster
內(nèi)部是如何做到下面兩個功能:
- 主進程:對傳入的端口進行監(jiān)聽
- 工作進程:
- 主進程注冊當前工作進程,如果主進程是第一次監(jiān)聽此端口狂窑,就新建一個TCP服務(wù)器媳板,并將當前工作進程和TCP服務(wù)器綁定。
- hack掉工作進程中的listen函數(shù)泉哈,讓該進程不能監(jiān)聽端口
源碼解讀
本文使用的是node@14.15.4蛉幸。
// lib/cluster.js
'use strict';
const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary';
module.exports = require(`internal/cluster/${childOrPrimary}`);
這個是cluster
API入口,在引用cluster
的時候旨巷,程序首先會判斷環(huán)境變量中是否存在NODE_UNIQUE_ID
變量巨缘,來確定當前程序是在主進程運行還是工作進程中運行添忘。NODE_UNIQUE_ID
實際上就是一個自增的數(shù)字采呐,是工作進程的ID,后面會在創(chuàng)建工作進程相關(guān)代碼中看到搁骑,這里就不多做解釋了斧吐。
通過前面代碼我們知道,如果在主進程中引用cluster
仲器,程序?qū)С龅氖?code>internal/cluster/primary.js這文件煤率,因此我們先看看這個文件內(nèi)部的一些實現(xiàn)。
// internal/cluster/primary.js
// ...
const EventEmitter = require('events');
const cluster = new EventEmitter();
// 下面這三個參數(shù)會在node內(nèi)部功能實現(xiàn)的時候用到乏冀,之后我們看net源碼的時候會用到這些參數(shù)
cluster.isWorker = false; // 是否是工作進程
cluster.isMaster = true; // 是否是主進程
cluster.isPrimary = true; // 是否是主進程
module.exports = cluster;
cluster.setupPrimary = function(options) {
const settings = {
args: ArrayPrototypeSlice(process.argv, 2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false,
...cluster.settings,
...options
};
cluster.settings = settings;
// ...
}
cluster.setupMaster = cluster.setupPrimary;
cluster.fork = function(env) {
cluster.setupPrimary();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
}
const { fork } = require('child_process');
function createWorkerProcess(id, env) {
// 這里的NODE_UNIQUE_ID就是入口文件用來分辨當前進程類型用的
const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
// ...
return fork(cluster.settings.exec, cluster.settings.args, {
env: workerEnv,
// ...
});
}
cluster.fork
用來新建一個工作進程蝶糯,其內(nèi)部使用child_process
中的fork
函數(shù),來創(chuàng)建一個進程辆沦,創(chuàng)建的新進程默認會運行命令行中執(zhí)行的入口文件(process.argv[1])昼捍,當然我們也可以執(zhí)行luster.setupPrimary
或者cluster.setupMaster
并傳入exec
參數(shù)來修改工作進程執(zhí)行的文件。
我們再來簡單看下工作進程引用的internal/cluster/child.js
文件:
// internal/cluster/child.js
const EventEmitter = require('events');
const cluster = new EventEmitter();
module.exports = cluster;
// 這里定義的就是一個工作進程肢扯,后續(xù)會用到這里的參數(shù)
cluster.isWorker = true;
cluster.isMaster = false;
cluster.isPrimary = false;
cluster._getServer = function(obj, options, cb) {
// ...
};
// ...
這里我們主要記住工作進程中的cluster
有個_getServer
函數(shù)妒茬,后續(xù)流程走到這個函數(shù)的時候,會詳細看里面的代碼蔚晨。
接下來進入正題乍钻,看下net server listen
函數(shù):
// lib/net.js
Server.prototype.listen = function(...args) {
// ...
if (typeof options.port === 'number' || typeof options.port === 'string') {
// 如果是向最開始那種直接調(diào)用listen時直接傳入一個端口,就會直接進入else,我們也主要看else中的邏輯
if (options.host) {
// ...
} else {
// listen(8080, () => {...})調(diào)用方式银择,將運行這條分支
listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);
}
return this;
}
// ...
}
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
// ...
// 這里就用到cluster初始時寫入的isPrimary參數(shù)多糠,當前如果在主進程isPrimary就為true,反之為false欢摄。主進程會直接去執(zhí)行server._listen2函數(shù)熬丧,工作進程之后也會執(zhí)行這個函數(shù),等下一起看server._listen2內(nèi)部的功能怀挠。
if (cluster.isPrimary || exclusive) {
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
// 后面的代碼只有在工作進程中才會執(zhí)行
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// 這里執(zhí)行的是internal/cluster/child.js中的cluster._getServer析蝴,同時會傳入listenOnPrimaryHandle這個回調(diào)函數(shù),這個回調(diào)函數(shù)會在主進程添加端口監(jiān)聽绿淋,同時將工作進程綁定到對應(yīng)的TCP服務(wù)后才會執(zhí)行闷畸,里面工作就是對net server listen等函數(shù)進行hack。
cluster._getServer(server, serverQuery, listenOnPrimaryHandle);
function listenOnPrimaryHandle(err, handle) {
// ...
server._handle = handle;
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
// 等工作進程執(zhí)行這個函數(shù)的時候再一起講
Server.prototype._listen2 = setupListenHandle;
function setupListenHandle(...) {
// ...
}
從上面代碼中可以得知吞滞,主進程和工作進程中執(zhí)行net server listen
都會進入到一個setupListenHandle
函數(shù)中佑菩。不過區(qū)別是,主進程是直接執(zhí)行該函數(shù)裁赠,而工作進程需要先執(zhí)行cluster._getServer
函數(shù)殿漠,讓主進程監(jiān)聽工作進程端口,同時對listen
函數(shù)進行hack處理佩捞,然后再執(zhí)行setupListenHandle
函數(shù)绞幌。接下來我們看下cluster._getServer
函數(shù)的內(nèi)部實現(xiàn)。
// lib/internal/cluster/child.js
cluster._getServer = function(obj, options, cb) {
// ...
// 這個是工作進程第一次發(fā)送內(nèi)部消息的內(nèi)容一忱。
// 注意這里act值為queryServer
const message = {
act: 'queryServer',
index,
data: null,
...options
};
// ...
// send函數(shù)內(nèi)部使用IPC信道向工作進程發(fā)送內(nèi)部消息莲蜘。主進程在使用cluster.fork新建工作進程的時候,會讓工作進程監(jiān)聽內(nèi)部消息事件帘营,下面會展示具體代碼
// send調(diào)用傳入的回調(diào)函數(shù)會被寫入到lib/internal/cluster/utils.js文件中的callbacks map中票渠,等后面要用的時候,再提取出來芬迄。
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, index, cb);
else
// 這個函數(shù)內(nèi)部會定義一個listen函數(shù)问顷,用來hack net server listen函數(shù)
rr(reply, indexesKey, index, cb);
});
// ...
}
function send(message, cb) {
return sendHelper(process, message, null, cb);
}
// lib/internal/cluster/utils.js
// ...
const callbacks = new SafeMap();
let seq = 0;
function sendHelper(proc, message, handle, cb) {
message = { cmd: 'NODE_CLUSTER', ...message, seq };
if (typeof cb === 'function')
// 這里將傳入的回調(diào)函數(shù)記錄下來。
// 注意這里的key是遞增數(shù)字
callbacks.set(seq, cb);
seq += 1;
// 利用IPC信道禀梳,給當前工作進程發(fā)送內(nèi)部消息
return proc.send(message, handle);
}
// ...
工作進程中cluster._getServer
函數(shù)執(zhí)行杜窄,將生成一個回調(diào)函數(shù),將這個回調(diào)函數(shù)存放起來出皇,并且會使用IPC信道羞芍,向當前工作進程發(fā)送內(nèi)部消息。主進程執(zhí)行cluster.fork
生成工作進程的時候郊艘,會在工作進程中注冊internalMessage
事件荷科。接下來我們看下cluster.fork
中與工作進程注冊內(nèi)部消息事件的代碼唯咬。
// internal/cluster/primary.js
cluster.fork = function(env) {
// ...
// internal函數(shù)執(zhí)行會返回一個接收message對象的回調(diào)函數(shù)。
// 可以先看下lib/internal/cluster/utils.js中的internal函數(shù)畏浆,了解內(nèi)部的工作
worker.process.on('internalMessage', internal(worker, onmessage));
// ...
}
const methodMessageMapping = {
close,
exitedAfterDisconnect,
listening,
online,
queryServer,
};
// 第一次觸發(fā)internalMessage執(zhí)行的回調(diào)是這個函數(shù)胆胰。
// 此時message的act為queryServer
function onmessage(message, handle) {
// internal內(nèi)部在執(zhí)行onmessage時會將這個函數(shù)執(zhí)行上下文綁定到工作進程的work上
const worker = this;
// 工作進程傳入的
const fn = methodMessageMapping[message.act];
if (typeof fn === 'function')
fn(worker, message);
}
function queryServer(worker, message) {
// ...
}
// lib/internal/cluster/utils.js
// ...
const callbacks = new SafeMap();
function internal(worker, cb) {
return function onInternalMessage(message, handle) {
let fn = cb;
// 工作進程第一次發(fā)送內(nèi)部消息:ack為undefined,callback為undefined刻获,直接執(zhí)行internal調(diào)用傳入的onmessage函數(shù)蜀涨,message函數(shù)只是用于解析消息的,實際會執(zhí)行queryServer函數(shù)
// 工作進程第二次發(fā)送內(nèi)部消息:主進程queryServer函數(shù)執(zhí)行會用工作進程發(fā)送內(nèi)部消息蝎毡,并向message中添加ack參數(shù)厚柳,讓message.ack=message.seq
if (message.ack !== undefined) {
const callback = callbacks.get(message.ack);
if (callback !== undefined) {
fn = callback;
callbacks.delete(message.ack);
}
}
ReflectApply(fn, worker, arguments);
};
}
工作進程第一次發(fā)送內(nèi)部消息時,由于傳入的message.ack
(這里注意分清act
和ack
)為undefind
沐兵,因此沒辦法直接拿到cluster._getServer
中調(diào)用send
寫入的回調(diào)函數(shù)别垮,因此只能先執(zhí)行internal/cluster/primary.js
中的queryServer
函數(shù)。接下來看下queryServer
函數(shù)內(nèi)部邏輯扎谎。
// internal/cluster/primary.js
// hadles中存放的就是TCP服務(wù)器碳想。
// 主進程在代替工作進程監(jiān)聽端口生成新的TCP服務(wù)器前,
// 需要先判斷該服務(wù)器是否有創(chuàng)建毁靶,如果有胧奔,就直接復(fù)用之前的服務(wù)器,然后將工作進程綁定到相應(yīng)的服務(wù)器上预吆;如果沒有龙填,就新建一個TCP服務(wù)器,然后將工作進程綁定到新建的服務(wù)器上啡浊。
function queryServer(worker, message) {
// 這里key就是服務(wù)器的唯一標識
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
// 從現(xiàn)存的服務(wù)器中查看是否有當前需要的服務(wù)器
let handle = handles.get(key);
// 如果沒有需要的服務(wù)器觅够,就新建一個
if (handle === undefined) {
// ...
// RoundRobinHandle構(gòu)建函數(shù)中胶背,會新建一個TCP服務(wù)器
let constructor = RoundRobinHandle;
handle = new constructor(key, address, message);
// 將這個服務(wù)器存放起來
handles.set(key, handle);
}
if (!handle.data)
handle.data = message.data;
// 可以先看下下面關(guān)于RoundRobinHandle構(gòu)建函數(shù)的代碼巷嚣,了解內(nèi)部機制
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
if (errno)
handles.delete(key);
// 這里會向工作進程中發(fā)送第二次內(nèi)部消息。
// 這里只傳了worker和message钳吟,沒有傳入handle和cb
send(worker, {
errno,
key,
ack: message.seq, // 注意這里增加了ack屬性
data,
...reply
}, handle);
});
}
function send(worker, message, handle, cb) {
return sendHelper(worker.process, message, handle, cb);
}
// internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
// ...
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) {
this.server.listen({
port,
host: address,
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address);
// 當服務(wù)處于監(jiān)聽狀態(tài)廷粒,就會執(zhí)行這個回調(diào)。
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
// 注意:如果監(jiān)聽成功红且,就會將server刪除
this.server = null;
});
}
RoundRobinHandle.prototype.add = function(worker, send) {
const done = () => {
if (this.handle.getsockname) {
// ...
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
// ...
};
// 如果在add執(zhí)行前server就已經(jīng)處于listening狀態(tài)坝茎,this.server就會為null
if (this.server === null)
return done();
// 如果add執(zhí)行后,server才處于listening暇番,就會走到這里嗤放,始終都會執(zhí)行add調(diào)用時傳入的回調(diào)
this.server.once('listening', done);
}
在這一步,主進程替工作進程生成或者是獲取了一個可用的TCP服務(wù)器壁酬,并將工作進程與相應(yīng)的服務(wù)器綁定在一起(方便后續(xù)請求任務(wù)分配)次酌。當工作進程綁定完成以后恨课,就向工作進程中發(fā)送了第二次內(nèi)部消息,接下來我們再次進入lib/internal/cluster/utils.js
看看內(nèi)部流程:
// lib/internal/cluster/utils.js
const callbacks = new SafeMap();
function internal(worker, cb) {
// 注意這里handle為undefined
return function onInternalMessage(message, handle) {
let fn = cb;
// 第二次工作進程內(nèi)部消息執(zhí)行的時候message.ack已經(jīng)被賦值為message.seq
// 因此這次能夠獲取到之前l(fā)ib/cluster.child.js cluster._getServer函數(shù)執(zhí)行是調(diào)用send寫入的回調(diào)函數(shù)
if (message.ack !== undefined) {
const callback = callbacks.get(message.ack);
if (callback !== undefined) {
fn = callback;
callbacks.delete(message.ack);
}
}
ReflectApply(fn, worker, arguments);
};
}
工作進程第二次接受到內(nèi)部消息時岳服,cluster._getServer函數(shù)執(zhí)行是調(diào)用send寫入的回調(diào)函數(shù)會被執(zhí)行剂公,接下來看下send寫入的回調(diào)函數(shù)內(nèi)容:
// lib/internal/cluster/child.js
send(message, (reply, handle) => {
// 此時handle為undefined,流程會直接運行rr函數(shù)
if (handle)
shared(reply, handle, indexesKey, index, cb);
else
// 這里的cb是lib/net.js在執(zhí)行cluster._getServer時傳入listenOnPrimaryHandle函數(shù)吊宋,后面會介紹他的工作纲辽。
rr(reply, indexesKey, index, cb);
});
function rr(message, indexesKey, index, cb) {
let key = message.key;
// 這里定義的listen用于hack net server.listen,在工作進程中執(zhí)行l(wèi)isten璃搜,工作進程并不會真正去監(jiān)聽端口
function listen(backlog) {
return 0;
}
function close() {...}
function getsockname(out) {...}
const handle = { close, listen, ref: noop, unref: noop };
handles.set(key, handle);
// 執(zhí)行傳入的listenOnPrimaryHandle函數(shù)
cb(0, handle);
}
rr
函數(shù)執(zhí)行拖吼,會新建幾個與net server
中同名的函數(shù),并通過handle傳入listenOnPrimaryHandle
函數(shù)这吻。
// lib/net.js
function listenInCluster(...) {
cluster._getServer(server, serverQuery, listenOnPrimaryHandle);
// listenOnPrimaryHandle函數(shù)中將工作進程生成的server._handle對象替換成自定義的handle對象绿贞,后續(xù)server listen執(zhí)行的就是server._handle中的listen函數(shù),因此這里就完成了對工作進程中的listen函數(shù)hack
function listenOnPrimaryHandle(err, handle) {
// ...
// handle:{ listen: ..., close: ...., ... }
server._handle = handle;
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
下面看下server._listen2
函數(shù)執(zhí)行內(nèi)容
Server.prototype._listen2 = setupListenHandle;
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
// 忽略橘原,只要是從工作進程進來的籍铁,this._handle就是自己定義的對象內(nèi)容
if (this._handle) {
debug('setupListenHandle: have a handle already');
} else {
// 主進程會進入這一層邏輯,會在這里生成一個服務(wù)器
// ...
rval = createServerHandle(address, port, addressType, fd, flags);
// ...
this._handle = rval;
}
const err = this._handle.listen(backlog || 511);
// ...
}
至此趾断,工作進程端口監(jiān)聽相關(guān)的源碼就看完了拒名,現(xiàn)在差不多了解到工作進程中執(zhí)行net server listen
時,工作進程并不會真正去監(jiān)聽端口芋酌,端口監(jiān)聽工作始終會交給主進程來完成增显。主進程在接到工作進程發(fā)來的端口監(jiān)聽的時候,首先會判斷是否有相同的服務(wù)器脐帝,如果有同云,就直接將工作進程綁定到對應(yīng)的服務(wù)器上,這樣就不會出現(xiàn)端口被占用的問題堵腹;如果沒有對應(yīng)的服務(wù)器炸站,就生成一個新的服務(wù)。主進程接受到請求的時候疚顷,就會將請求任務(wù)分配給工作進程旱易,如何分配,就需要看具體使用的哪種負載均衡了腿堤。