The previous chapter walked through how the client-side Bootstrap starts up. This chapter does the same for the server. The code comes from the echo example in the official Netty examples.
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
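Compared with the client code, the server code is largely the same; only parts of the configuration differ:
- configure the EventLoopGroup (NioEventLoopGroup);
- configure the Channel (NioServerSocketChannel);
- configure the Handler.
Configuring the EventLoopGroup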
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
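On the server side we configure two EventLoopGroups because Netty gives them different jobs: one accepts client connections, the other handles the I/O of the connections that were accepted. Keeping the two responsibilities apart lets each do its work more efficiently, as shown in the figure below.
The BossGroup is configured by calling the group method (not a constructor) of the parent class AbstractBootstrap: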
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
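In other words, the BossGroup we configured ends up in the group field that ServerBootstrap inherits from AbstractBootstrap, while the WorkerGroup is stored in ServerBootstrap's own childGroup field.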
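The two-level configuration above can be sketched without Netty. This is a minimal illustration, assuming plain strings in place of EventLoopGroups; MiniBootstrap and MiniServerBootstrap are hypothetical names, not Netty classes. It shows the self-typed `return self()` pattern and the "set only once" guard.

```java
import java.util.Objects;

// Hypothetical miniature of the AbstractBootstrap / ServerBootstrap pairing; not Netty code.
abstract class MiniBootstrap<B extends MiniBootstrap<B>> {
    private String group; // stands in for the boss EventLoopGroup

    public B group(String group) {
        Objects.requireNonNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    @SuppressWarnings("unchecked")
    private B self() {
        // The recursive type parameter lets fluent calls keep the subclass type.
        return (B) this;
    }

    public String group() {
        return group;
    }
}

final class MiniServerBootstrap extends MiniBootstrap<MiniServerBootstrap> {
    private String childGroup; // stands in for the worker EventLoopGroup

    public MiniServerBootstrap group(String parentGroup, String childGroup) {
        super.group(parentGroup); // the parent stores the boss group
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = Objects.requireNonNull(childGroup, "childGroup");
        return this;
    }

    public String childGroup() {
        return childGroup;
    }

    public static void main(String[] args) {
        MiniServerBootstrap b = new MiniServerBootstrap().group("boss", "worker");
        System.out.println(b.group() + " / " + b.childGroup());
    }
}
```

Calling group a second time on the same instance fails with IllegalStateException, which matches the guard in the excerpts above.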
Configuring the Channel
Next comes the Channel type. Because this is the server side, we pass NioServerSocketChannel.class:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
Here a ReflectiveChannelFactory is instantiated. As the name suggests, it is a factory that creates Channels through reflection. Its implementation, with exception handling omitted:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
this.constructor = clazz.getConstructor();
}
@Override
public T newChannel() {
return constructor.newInstance();
}
}
As the source shows, the class looks up the no-argument constructor of the Channel type it is given, and newChannel later uses that constructor to instantiate the Channel.
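The reflective-factory idea can be tried with the JDK alone. The sketch below is an assumption-laden miniature (ReflectiveFactory is a hypothetical name, and the exception wrapping is my own choice, not Netty's): it resolves the no-argument constructor once and reuses it for every instance.

```java
import java.lang.reflect.Constructor;

// JDK-only sketch of a reflective factory; the real Netty class differs in detail.
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Resolve the constructor once, so a missing one fails at configuration time.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance", e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getSimpleName());
    }
}
```

Looking the constructor up in the factory's own constructor means a class without a usable constructor is reported when the bootstrap is configured rather than when the first Channel is created.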
Once the ReflectiveChannelFactory has been created, the channelFactory method stores it as the bootstrap's ChannelFactory:
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
Configuring the handler
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
Here we configure a LoggingHandler with its log level set to LogLevel.INFO.
Configuring the childHandler
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
This uses ChannelInitializer, a special abstract ChannelHandler that Netty provides. We set it aside for now and come back to it later.
Binding
With every parameter in place, we reach the bind stage. Here is the implementation:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
// Validate that group and channelFactory are set; an IllegalStateException is thrown if either is null
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// Initialize and register the Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// Bind the local port
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
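The two branches of doBind (bind immediately if registration is already done, otherwise bind from a listener) can be sketched with the JDK's CompletableFuture. This is only an analogy, assuming CompletableFuture in place of Netty's ChannelFuture and ChannelPromise; BindAfterRegister and bindAfter are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy for "bind once registration has completed"; not Netty code.
final class BindAfterRegister {
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // whenComplete runs right away if registration is already done,
        // and later (when it completes) otherwise, covering both branches.
        registration.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause); // registration failed: fail the bind promise
            } else {
                promise.complete("bound to port " + port);
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> bind = bindAfter(pending, 8007);
        System.out.println("done before registration? " + bind.isDone());
        pending.complete(null);
        System.out.println(bind.join());
    }
}
```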
Start with initAndRegister. We already met this method in the Bootstrap chapter; here it is again:
final ChannelFuture initAndRegister() {
// ... some code omitted
Channel channel = channelFactory.newChannel();
init(channel);
return regFuture;
}
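The method calls newChannel on the ChannelFactory. As mentioned earlier, the ChannelFactory takes the given Channel class and instantiates it reflectively through its constructor, so the object created here is a NioServerSocketChannel. Next, look at the structure diagram of that class.
Now let's see what the default constructor of NioServerSocketChannel does.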
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
The default constructor of NioServerSocketChannel calls newSocket, which opens a server-side ServerSocketChannel, and eventually the constructor of the parent class AbstractNioChannel is invoked:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
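NioServerSocketChannel passes SelectionKey.OP_ACCEPT as its interest op, meaning it only listens for incoming connections. AbstractNioChannel stores that interest op and switches the channel to non-blocking mode. It also hands the parent argument up to its own superclass constructor; here is that implementation: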
protected AbstractChannel(Channel parent) {
this.parent = parent;
// Create a DefaultChannelId
id = newId();
// The Unsafe created here is a NioMessageUnsafe
unsafe = newUnsafe();
// Create a DefaultChannelPipeline (a doubly linked list of AbstractChannelHandlerContext nodes bounded by head and tail)
pipeline = newChannelPipeline();
}
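The JDK calls that these constructors wrap can be run on their own. The sketch below assumes nothing beyond java.nio; OpenServerChannel and describe are hypothetical names. It opens a server channel through the default SelectorProvider, switches it to non-blocking mode, and reports its state.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// JDK-only sketch of what newSocket(...) plus configureBlocking(false) amount to.
final class OpenServerChannel {
    static String describe() {
        try (ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel()) {
            ch.configureBlocking(false); // required before the channel can be used with a Selector
            boolean acceptOnly = ch.validOps() == SelectionKey.OP_ACCEPT;
            return "open=" + ch.isOpen()
                    + " blocking=" + ch.isBlocking()
                    + " bound=" + ch.socket().isBound()
                    + " acceptOnly=" + acceptOnly;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

At this stage the channel is open but not yet bound; binding happens much later in the startup flow.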
Back in initAndRegister, the next call is init. It is abstract in the parent class AbstractBootstrap, so ServerBootstrap supplies the implementation:
void init(Channel channel) {
// Apply the configured ChannelOptions to the Channel
setChannelOptions(channel, newOptionsArray(), logger);
// Apply the configured attributes to the Channel
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init adds a ChannelInitializer to the ChannelPipeline. Its initChannel method runs once the handler has been added and the Channel is registered. First, here is how addLast works:
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// Check whether the handler has already been added
checkMultiplicity(handler);
// Create a DefaultChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
// Link it into the doubly linked list
addLast0(newCtx);
// If registered is false, the Channel has not been registered with an EventLoop yet.
// In that case we add the context to the pipeline and queue a task that will call ChannelHandler.handlerAdded(...) once the Channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
Stepping through with a debugger shows that registered is false at this point, which means the Channel has not yet been registered with an EventLoop. So let's look at callHandlerCallbackLater:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
A PendingHandlerAddedTask is queued to run the callback later. Its implementation:
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
// ... try-catch omitted
executor.execute(this);
}
}
}
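The "queue the callback until the Channel is registered" mechanism can be modelled in a few lines. This is a simplified sketch, assuming handler names as strings and a single thread; DeferredCallbacks is a hypothetical class, not part of Netty.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of pending handlerAdded callbacks; not Netty code.
final class DeferredCallbacks {
    private static final class Node {
        final String handlerName;
        Node next;

        Node(String handlerName) {
            this.handlerName = handlerName;
        }
    }

    private final List<String> log = new ArrayList<>();
    private Node pendingHead;
    private boolean registered;

    void addLast(String handlerName) {
        if (!registered) {
            // Not registered yet: append a task to the tail of the singly linked list.
            Node task = new Node(handlerName);
            if (pendingHead == null) {
                pendingHead = task;
            } else {
                Node pending = pendingHead;
                while (pending.next != null) {
                    pending = pending.next;
                }
                pending.next = task;
            }
            return;
        }
        log.add("handlerAdded:" + handlerName);
    }

    void register() {
        registered = true;
        Node task = pendingHead;
        pendingHead = null; // the queue is only drained once
        while (task != null) {
            log.add("handlerAdded:" + task.handlerName);
            task = task.next;
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DeferredCallbacks pipeline = new DeferredCallbacks();
        pipeline.addLast("initializer");
        pipeline.register();
        System.out.println(pipeline.log());
    }
}
```

Handlers added before registration are notified in insertion order when register() runs; handlers added afterwards are notified immediately.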
The task is deferred and run asynchronously; the actual work is done by callHandlerAdded0:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
//...
ctx.callHandlerAdded();
}
Going one level deeper, into AbstractChannelHandlerContext#callHandlerAdded:
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
if (setAddComplete()) {
handler().handlerAdded(this);
}
}
Now it is clear: the method being invoked is ChannelInitializer#handlerAdded:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
// Clear the bookkeeping for this ChannelHandlerContext; initChannel(ctx) has already removed the ChannelInitializer from the pipeline
removeState(ctx);
}
}
}
It checks whether the NioServerSocketChannel has been registered and then calls initChannel, which is the anonymous initializer we saw in init:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
initChannel first obtains the ChannelPipeline. handler() returns the Handler we configured earlier, and if one was configured it is added to the ChannelPipeline; in our case that is the LoggingHandler. Finally, the EventLoop bound to the Channel runs an asynchronous task that adds a ServerBootstrapAcceptor to the ChannelPipeline. Its implementation:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// The configured WorkerGroup
private final EventLoopGroup childGroup;
// The configured childHandler
private final ChannelHandler childHandler;
// The configured childOptions
private final Entry<ChannelOption<?>, Object>[] childOptions;
// The configured childAttrs
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg is the accepted client connection, a Channel (NioSocketChannel)
final Channel child = (Channel) msg;
// Add the configured childHandler to the NioSocketChannel's ChannelPipeline
child.pipeline().addLast(childHandler);
// Apply the ChannelOptions
setChannelOptions(child, childOptions, logger);
// Apply the Channel attributes
setAttributes(child, childAttrs);
// Bind the Channel to an EventLoop
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
}
Curious readers may wonder when channelRead gets called. So did I, but we set that question aside for the moment.
Back in initAndRegister, execution continues:
//...
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
//...
The call to focus on is register(channel). group() returns the NioEventLoopGroup we configured earlier (the BossGroup), and register is implemented by its parent class MultithreadEventLoopGroup:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
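The article does not show how next() picks an EventLoop. A common approach, and the assumption made in this sketch, is round-robin selection over a fixed array; RoundRobinChooser is a hypothetical class written for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin chooser; an assumption about how next() might select, not Netty code.
final class RoundRobinChooser<T> {
    private final T[] items;
    private final AtomicInteger index = new AtomicInteger();

    @SafeVarargs
    RoundRobinChooser(T... items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("at least one item is required");
        }
        this.items = items.clone();
    }

    T next() {
        // floorMod keeps the position non-negative even after the counter overflows.
        return items[Math.floorMod(index.getAndIncrement(), items.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> workers = new RoundRobinChooser<>("loop-0", "loop-1");
        System.out.println(workers.next() + ", " + workers.next() + ", " + workers.next());
    }
}
```

With a group of size one, as in `new NioEventLoopGroup(1)` for the BossGroup, every call returns the same loop regardless of the selection strategy.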
next() returns a NioEventLoop, whose register method is in turn implemented by its parent class SingleThreadEventLoop:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
Here is the familiar Unsafe object again. Its concrete type is NioMessageUnsafe, and register is implemented by the parent class AbstractUnsafe:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
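The inEventLoop() check in register follows a general pattern: run the work inline when already on the loop thread, otherwise hand it to that thread. Here is a JDK-only sketch of the pattern, assuming a single-thread executor as the "event loop"; LoopDispatch and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch of "run inline if on the loop thread, else enqueue"; not Netty code.
final class LoopDispatch {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    LoopDispatch() {
        executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "mini-loop");
            thread.setDaemon(true);
            loopThread = thread; // remember which thread plays the event loop
            return thread;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void execute(Runnable task) {
        executor.execute(task);
    }

    CompletableFuture<String> register(String channel) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            promise.complete(channel + " inline"); // already on the loop thread
        } else {
            execute(() -> promise.complete(channel + " queued")); // hop onto the loop thread
        }
        return promise;
    }

    public static void main(String[] args) {
        LoopDispatch loop = new LoopDispatch();
        System.out.println(loop.register("server-channel").join());
    }
}
```

During server startup register is called from the main thread, so the queued branch is the one that runs.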
From here we only need to follow register0:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
Next, follow doRegister, which is implemented by the abstract class AbstractNioChannel:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// Register this channel with the Selector (the multiplexer)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
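Note that doRegister registers with an interest set of 0 and passes the channel itself as the attachment. The JDK-only sketch below shows what that looks like, and then sets OP_ACCEPT on the key afterwards. The article does not show where Netty later enables the interest op, so that second step is an assumption made for illustration; RegisterThenArm is a hypothetical class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch: register with no interest ops, then enable OP_ACCEPT on the key.
final class RegisterThenArm {
    static String demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            Object attachment = "server-channel"; // Netty attaches the channel object itself
            SelectionKey key = ch.register(selector, 0, attachment);
            int before = key.interestOps();
            // Assumed later step: start listening for connections.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            return before + "->" + key.interestOps() + " attachment=" + key.attachment();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The attachment is what lets the event loop map a ready SelectionKey back to its channel later on.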
Once initAndRegister has finished, execution continues to doBind0:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
The EventLoop bound to the Channel schedules an asynchronous task that calls channel.bind. The Channel here is a NioServerSocketChannel, so we follow that method:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
This delegates to the bind method of ChannelPipeline. Following it:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
It calls bind on the tail node (an AbstractChannelHandlerContext). Stepping in:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
return promise;
}
// The search ultimately ends at HeadContext
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Call HeadContext's bind method
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
AbstractChannelHandlerContext#invokeBind:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
We know the handler is HeadContext, so we can go straight to HeadContext#bind:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
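Outbound operations such as bind start at the tail and travel towards the head, skipping contexts that do not handle outbound calls. A minimal model of that backwards search follows, assuming a boolean flag in place of Netty's execution mask; MiniPipeline is a hypothetical class.

```java
// Minimal model of the tail-to-head search for an outbound handler; not Netty code.
final class MiniPipeline {
    private static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    private final Ctx head = new Ctx("head", true);  // the head handles outbound operations
    private final Ctx tail = new Ctx("tail", false); // the tail is inbound only

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
    }

    // Walk backwards from the tail until a context that handles outbound operations is found.
    String findOutboundFromTail() {
        Ctx ctx = tail;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx.name;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast("acceptor", false);
        System.out.println(pipeline.findOutboundFromTail());
    }
}
```

The loop always terminates because the head context handles outbound operations, which is why the walk can end no further back than the head.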
Internally this calls NioMessageUnsafe#bind, which was covered in an earlier chapter. Here we jump directly to the implementation in its parent class, AbstractUnsafe#bind:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
Ignoring everything else for now, look only at doBind. It is abstract in the parent class, and the concrete implementation is provided by the subclass NioServerSocketChannel:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
We are back at the familiar low-level Java API, with separate code paths depending on the Java version. The method binds the channel to the given address and port.
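The Java 7+ branch boils down to a single JDK call. The sketch below assumes a loopback address and port 0 (so the operating system picks a free port) to stay runnable anywhere; BindWithBacklog is a hypothetical class, and the backlog of 100 mirrors the SO_BACKLOG value from the echo example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch of binding a server channel with an explicit backlog.
final class BindWithBacklog {
    static int bindEphemeral(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Port 0 asks the operating system for any free port.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) ch.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindEphemeral(100));
    }
}
```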
That completes the ServerBootstrap startup flow. Next we look at what happens when a client connects to the server.
Start at NioEventLoop#run:
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
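The expression `ioTime * (100 - ioRatio) / ioRatio` in the loop above turns the time just spent on I/O into a time budget for queued tasks. A worked sketch of that arithmetic follows; IoRatioBudget is a hypothetical helper, and the range check is my own addition because the run loop handles ioRatio == 100 in a separate branch.

```java
// Worked arithmetic for the task time budget derived from ioRatio; illustrative helper only.
final class IoRatioBudget {
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 never reaches this formula in the loop shown above.
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // With ioRatio 50, tasks get as much time as I/O took.
        System.out.println(taskBudgetNanos(1_000_000L, 50));
        // With ioRatio 80, tasks get a quarter of the I/O time.
        System.out.println(taskBudgetNanos(8_000_000L, 80));
    }
}
```

A higher ioRatio therefore shrinks the share of each iteration that queued tasks may use.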
For now, look only at processSelectedKeys():
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
Following processSelectedKeysOptimized:
private void processSelectedKeysOptimized() {
// Iterate over all selected keys
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// The attachment holds the corresponding channel, so fetch the channel from it
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// Process that channel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
Go straight to processSelectedKey:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
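The dispatch in processSelectedKey is a series of bitmask tests on readyOps. The sketch below reproduces only that decision logic with the JDK's SelectionKey constants; ReadyOpsDecoder is a hypothetical class, and the returned strings name the Unsafe methods that the excerpt above calls.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Reproduces the order of the readyOps checks as a pure function; illustrative only.
final class ReadyOpsDecoder {
    static List<String> actions(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is also treated as a read.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actions(SelectionKey.OP_ACCEPT));
        System.out.println(actions(SelectionKey.OP_WRITE | SelectionKey.OP_READ));
    }
}
```

For the server channel, a new connection shows up as OP_ACCEPT, which is why accepting a client goes through unsafe.read().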
The focus now is unsafe.read. As established above, the Unsafe implementation here is NioMessageUnsafe:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// Accept incoming sockets
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
// Iterate over the messages and fire channelRead through the ChannelPipeline
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
doReadMessages accepts the incoming socket connection; if the SocketChannel is not null, it is added to buf.
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
// ... some code omitted
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
}
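The null check matters because a non-blocking ServerSocketChannel returns null from accept() when no connection is pending. The JDK-only sketch below shows that case; AcceptIntoBuffer is a hypothetical class, and the explicit `return 0` is my assumption about what the omitted part of the excerpt does when nothing was accepted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch of "accept one connection into a buffer"; not Netty code.
final class AcceptIntoBuffer {
    // Accepts at most one pending connection and returns how many were added to buf.
    static int readMessages(ServerSocketChannel server, List<Object> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0; // assumed fall-through result when no connection was accepted
    }

    static int demoWithoutClient() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return readMessages(server, new ArrayList<>());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted without a client: " + demoWithoutClient());
    }
}
```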
The result is stored in readBuf, a List<Object>: each client that connects adds one NioSocketChannel to the list. As the code shows, a non-null SocketChannel is wrapped in a new NioSocketChannel, so let's look at that class.
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
Following the parent class, AbstractNioByteChannel:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
Continue to the next parent class (AbstractNioChannel). Note that SelectionKey.OP_READ here means the channel listens for read events:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
There is yet another parent class (AbstractChannel). Following it:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
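Same as before: the usual three (id, unsafe and pipeline).
Finally the ChannelPipeline fires the channelRead event. Recall that ServerBootstrap's init added a ChannelInitializer, which in turn installed a ServerBootstrapAcceptor in the server Channel's pipeline. This is where it comes into play: ChannelPipeline's fireChannelRead eventually reaches the channelRead method of ServerBootstrapAcceptor: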
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg is a NioSocketChannel
final Channel child = (Channel) msg;
// Add the configured childHandler
child.pipeline().addLast(childHandler);
// Apply the configured childOptions
setChannelOptions(child, childOptions, logger);
// Apply the configured attributes
setAttributes(child, childAttrs);
try {
// childGroup is the WorkerGroup configured earlier; register binds the NioSocketChannel to one of its EventLoops
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
At this point the whole flow, from the Netty server starting up to it accepting client connections, should be clear.