看了cluster不明白他是怎么搞得。為什么master進(jìn)程沒(méi)有監(jiān)聽(tīng)端口號(hào),就能實(shí)現(xiàn)集群模狭∩┧冢看了下資料和源碼读虏,這里做一下簡(jiǎn)單的總結(jié)责静。。
1.cluster.isMaster怎么識(shí)別的盖桥?主進(jìn)程fork的時(shí)候給到子進(jìn)程一個(gè)NODE_UNIQUE_ID灾螃。所以只要環(huán)境變量有這個(gè)參數(shù)就是子進(jìn)程。
2.主進(jìn)程沒(méi)有創(chuàng)建服務(wù)器的語(yǔ)句揩徊,到底有沒(méi)有創(chuàng)建腰鬼?都知道http基于tcp傳輸層的,既然主進(jìn)程沒(méi)有創(chuàng)建服務(wù)器的語(yǔ)句塑荒,是否是通過(guò)子進(jìn)程的相關(guān)操作實(shí)現(xiàn)的熄赡。這里翻開(kāi)nodejs源碼對(duì)于listen的實(shí)現(xiàn):
Server.prototype.listen=function(...args) {
varnormalized=normalizeArgs(args);
varoptions=normalized[0];
varcb=normalized[1];
if(this._handle) {
thrownewerrors.Error('ERR_SERVER_ALREADY_LISTEN');
}
varhasCallback=(cb!==null);
if(hasCallback) {
this.once('listening', cb);
}
varbacklogFromArgs=
// (handle, backlog) or (path, backlog) or (port, backlog)
toNumber(args.length>1&&args[1])||
toNumber(args.length>2&&args[2]);// (port, host, backlog)
options=options._handle||options.handle||options;
// (handle[, backlog][, cb]) where handle is an object with a handle
if(optionsinstanceofTCP) {
this._handle=options;
this[async_id_symbol]=this._handle.getAsyncId();
listenInCluster(this,null,-1,-1, backlogFromArgs);
return this;
}
// (handle[, backlog][, cb]) where handle is an object with a fd
if(typeofoptions.fd==='number'&&options.fd>=0) {
listenInCluster(this,null,null,null, backlogFromArgs, options.fd);
return this;
}
// ([port][, host][, backlog][, cb]) where port is omitted,
// that is, listen(), listen(null), listen(cb), or listen(null, cb)
// or (options[, cb]) where options.port is explicitly set as undefined or
// null, bind to an arbitrary unused port
if(args.length===0|| typeofargs[0]==='function'||
(typeofoptions.port==='undefined'&&'port'inoptions)||
options.port===null) {
options.port=0;
}
// ([port][, host][, backlog][, cb]) where port is specified
// or (options[, cb]) where options.port is specified
// or if options.port is normalized as 0 before
varbacklog;
if(typeofoptions.port==='number'|| typeofoptions.port==='string') {
if(!isLegalPort(options.port)) {
thrownewRangeError('"port" argument must be >= 0 and < 65536');
}
backlog=options.backlog||backlogFromArgs;
// start TCP server listening on host:port
if(options.host) {
lookupAndListen(this, options.port|0, options.host, backlog,
options.exclusive);
}else{// Undefined host, listens on unspecified address
// Default addressType 4 will be used to search for master server
listenInCluster(this,null, options.port|0,4,
backlog,undefined, options.exclusive);
}
return this;
}
// (path[, backlog][, cb]) or (options[, cb])
// where path or options.path is a UNIX domain socket or Windows pipe
if(options.path&&isPipeName(options.path)) {
varpipeName=this._pipeName=options.path;
backlog=options.backlog||backlogFromArgs;
listenInCluster(this, pipeName,-1,-1,
backlog,undefined, options.exclusive);
return this;
}
thrownewError('Invalid listen argument: '+util.inspect(options));
};
可以看出參數(shù)可以是很多類型,這里針對(duì)端口進(jìn)行分析齿税,發(fā)現(xiàn)參數(shù)是端口時(shí)彼硫,執(zhí)行了listenInCluster()函數(shù)然,翻開(kāi)這個(gè)listenInCluster函數(shù)凌箕,發(fā)現(xiàn)代碼是:
function listenInCluster(server, address, port, addressType,
backlog, fd, exclusive) {
exclusive= !!exclusive;
if(cluster===null) cluster=require('cluster');
if(cluster.isMaster||exclusive) {
// Will create a new handle
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
server._listen2(address, port, addressType, backlog, fd);
return;
}
constserverQuery={
address:address,
port:port,
addressType:addressType,
fd:fd,
flags:0
};
// Get the master's server handle, and listen on it
cluster._getServer(server, serverQuery, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
err=checkBindError(err, port, handle);
if(err) {
varex=exceptionWithHostPort(err,'bind', address, port);
returnserver.emit('error', ex);
}
// Reuse master's server handle
server._handle=handle;
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
server._listen2(address, port, addressType, backlog, fd);
}
}
這個(gè)代碼很有意思拧篮,如果是主進(jìn)程執(zhí)行server._listen2(address, port, addressType, backlog, fd);然后return
如果手機(jī)子進(jìn)程發(fā)送serverQuery然后執(zhí)行回調(diào)listenOnMasterHandle,這個(gè)回調(diào)里也是server._listen2(address, port, addressType, backlog, fd);
cluster_getServer的源碼是:
cluster._getServer = function(obj, options, cb) {
const indexesKey = [options.address,
options.port,
options.addressType,
options.fd ].join(':');
if (indexes[indexesKey] === undefined)
indexes[indexesKey] = 0;
else
indexes[indexesKey]++;
const message = util._extend({
act: 'queryServer',
index: indexes[indexesKey],
data: null
}, options);
// Set custom data on handle (i.e. tls tickets key)
if (obj._getServerData)
message.data = obj._getServerData();
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
obj.once('listening', () => {
cluster.worker.state = 'listening';
const address = obj.address();
message.act = 'listening';
message.port = address && address.port || options.port;
send(message);
});
};
大概的意思是將這個(gè)worker的信息比如端口什么的發(fā)給主進(jìn)程牵舱,按照內(nèi)容起個(gè)服務(wù)器串绩,然后就是
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
有句柄的話分享這個(gè)shock,沒(méi)有的話執(zhí)行rr函數(shù):
function rr(message, indexesKey, cb) {
if (message.errno)
return cb(message.errno, null);
var key = message.key;
function listen(backlog) {
// TODO(bnoordhuis) Send a message to the master that tells it to
// update the backlog size. The actual backlog should probably be
// the largest requested size by any worker.
return 0;
}
function close() {
// lib/net.js treats server._handle.close() as effectively synchronous.
// That means there is a time window between the call to close() and
// the ack by the master process in which we can still receive handles.
// onconnection() below handles that by sending those handles back to
// the master.
if (key === undefined)
return;
send({ act: 'close', key });
delete handles[key];
delete indexes[indexesKey];
key = undefined;
}
function getsockname(out) {
if (key)
util._extend(out, message.sockname);
return 0;
}
// Faux handle. Mimics a TCPWrap with just enough fidelity to get away
// with it. Fools net.Server into thinking that it's backed by a real
// handle. Use a noop function for ref() and unref() because the control
// channel is going to keep the worker alive anyway.
const handle = { close, listen, ref: noop, unref: noop };
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
assert(handles[key] === undefined);
handles[key] = handle;
cb(0, handle);
}
發(fā)現(xiàn)這個(gè)listen函數(shù) return了并沒(méi)有操作芜壁,將函數(shù)給到handles赏参,然后callback出去,由上面的server._handle=handle接收沿盅。所以其實(shí)工作進(jìn)程的監(jiān)聽(tīng)被hack了,并沒(méi)有操作纫溃。腰涧。
這里再看下server._listen2,源碼中Server.prototype._listen2=setupListenHandle;那么看下setupListenHandle的實(shí)現(xiàn)吧:
function setupListenHandle(address, port, addressType, backlog, fd) {
debug('setupListenHandle', address, port, addressType, backlog, fd);
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if(this._handle) {
debug('setupListenHandle: have a handle already');
}else{
debug('setupListenHandle: create a handle');
varrval=null;
// Try to bind to the unspecified IPv6 address, see if IPv6 is available
if(!address&& typeoffd!=='number') {
rval=createServerHandle('::', port,6, fd);
if(typeofrval==='number') {
rval=null;
address='0.0.0.0';
addressType=4;
}else{
address='::';
addressType=6;
}
}
if(rval===null)
rval=createServerHandle(address, port, addressType, fd);
if(typeofrval==='number') {
varerror=exceptionWithHostPort(rval,'listen', address, port);
process.nextTick(emitErrorNT,this, error);
return;
}
this._handle=rval;
}
this[async_id_symbol]=getNewAsyncId(this._handle);
this._handle.onconnection=onconnection;
this._handle.owner=this;
// Use a backlog of 512 entries. We pass 511 to the listen() call because
// the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
// which will thus give us a backlog of 512 entries.
varerr=this._handle.listen(backlog||511);
if(err) {
varex=exceptionWithHostPort(err,'listen', address, port);
this._handle.close();
this._handle=null;
nextTick(this[async_id_symbol], emitErrorNT,this, ex);
return;
}
// generate connection key, this should be unique to the connection
this._connectionKey=addressType+':'+address+':'+port;
// unref the handle if the server was unref'ed prior to listening
if(this._unref)
this.unref();
nextTick(this[async_id_symbol], emitListeningNT,this);
}
這里發(fā)現(xiàn)代碼成功了執(zhí)行的時(shí)createServerHandle函數(shù)紊浩,聽(tīng)名字是創(chuàng)造socket句柄 窖铡,如果_handle存在就不創(chuàng)建,不存在創(chuàng)建socket坊谁,如前面所寫(xiě)子進(jìn)程已經(jīng)創(chuàng)建了socket费彼,所以不會(huì)再創(chuàng)建socket,所以子進(jìn)程雖然listen了口芍,但是其實(shí)只是表面的而已箍铲。具體服務(wù)器如何接收客戶端請(qǐng)求,涉及到c鬓椭,不會(huì)c颠猴,應(yīng)該是調(diào)用一些底層的東西實(shí)現(xiàn)的吧关划。