ServerBootstrap is similar to Bootstrap, except that it is used to start the server side. Here is how it is used:
public class TimeServer {
public void bind(int port) throws Exception {
// Configure the server-side NIO thread groups
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// Bind the port and wait synchronously until the bind succeeds
ChannelFuture f = b.bind(port).sync();
// Wait until the server's listening channel is closed
f.channel().closeFuture().sync();
} finally {
// Shut down gracefully and release the thread pool resources
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeServerHandler());
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
// Not a valid number: keep the default port
}
}
new TimeServer().bind(port);
}
}
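Unlike the Bootstrap covered in the previous article, two EventLoopGroups are passed in here: bossGroup accepts incoming connections, and workerGroup handles I/O events. Netty is an implementation of the Reactor pattern; for background on the Reactor pattern, see the NIO technical overview article.
The Channel type passed in here is NioServerSocketChannel. As with Bootstrap, the bind method calls the init method. Here is the implementation of init in ServerBootstrap: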
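The boss/worker split can be illustrated without Netty at all. The following is a minimal sketch using plain JDK executors, not Netty's actual implementation: the class name `BossWorkerSketch`, the method `dispatch`, and the round-robin assignment are assumptions made for illustration. A single "boss" thread hands each simulated connection to one of several single-threaded "workers", and each connection stays with the worker it was assigned to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BossWorkerSketch {

    /** Simulates accepting `connections` connections; returns connection id -> worker index. */
    static Map<Integer, Integer> dispatch(int connections, int workerCount)
            throws InterruptedException {
        // Hypothetical stand-in for bossGroup: one thread that only "accepts".
        ExecutorService boss = Executors.newSingleThreadExecutor();
        // Hypothetical stand-in for workerGroup: one thread per worker.
        List<ExecutorService> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
        Map<Integer, Integer> handledBy = new ConcurrentHashMap<>();

        boss.execute(() -> {
            for (int id = 0; id < connections; id++) {
                final int connection = id;
                // Assumption: round-robin assignment of connections to workers.
                final int worker = id % workerCount;
                // The worker thread, not the boss, does the per-connection work.
                workers.get(worker).execute(() -> handledBy.put(connection, worker));
            }
        });

        // Let the boss finish handing off, then let the workers drain their queues.
        boss.shutdown();
        boss.awaitTermination(5, TimeUnit.SECONDS);
        for (ExecutorService w : workers) {
            w.shutdown();
            w.awaitTermination(5, TimeUnit.SECONDS);
        }
        return handledBy;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(dispatch(4, 2));
    }
}
```

Calling `dispatch(4, 2)` assigns connections 0 and 2 to worker 0 and connections 1 and 3 to worker 1. The boss thread never does per-connection work itself, which is the point of keeping the two groups separate.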
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// Add a handler to the pipeline
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// Run the task on the channel's eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
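One detail in init is that childOptions and childAttrs are copied into arrays while holding the map's lock, and only those arrays are handed on. The sketch below mimics that step with plain JDK collections; `ChildOptionSnapshotSketch` and its methods are hypothetical names, and string keys stand in for `ChannelOption`. It shows that an option added after the snapshot is taken does not appear in the earlier snapshot.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

class ChildOptionSnapshotSketch {

    private final Map<String, Object> childOptions = new LinkedHashMap<>();

    /** Hypothetical counterpart of childOption(...): records an option under the map's lock. */
    void childOption(String name, Object value) {
        synchronized (childOptions) {
            childOptions.put(name, value);
        }
    }

    /** Copies the current entries into an array, as init does before creating the acceptor. */
    @SuppressWarnings("unchecked")
    Entry<String, Object>[] snapshot() {
        synchronized (childOptions) {
            return childOptions.entrySet().toArray(new Entry[0]);
        }
    }

    /** Returns {size of the snapshot taken first, size of a snapshot taken afterwards}. */
    static int[] demo() {
        ChildOptionSnapshotSketch bootstrap = new ChildOptionSnapshotSketch();
        bootstrap.childOption("SO_KEEPALIVE", Boolean.TRUE);
        Entry<String, Object>[] atInit = bootstrap.snapshot();   // taken "at bind time"
        bootstrap.childOption("TCP_NODELAY", Boolean.TRUE);      // added later
        return new int[] {atInit.length, bootstrap.snapshot().length};
    }

    public static void main(String[] args) {
        int[] sizes = demo();
        System.out.println(sizes[0] + " option(s) in the first snapshot, "
                + sizes[1] + " in the later one");
    }
}
```

Note that the copy is shallow: the array holds the map's own entry objects, so this sketch only demonstrates that entries added later are not part of an earlier snapshot.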
Note this piece of code:
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
This code adds a ServerBootstrapAcceptor object to the pipeline. ServerBootstrapAcceptor is itself a Handler; here is its definition:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}
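ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter and handles inbound events. Its main job is as follows: after the server accepts a connection, a NioSocketChannel object is passed to channelRead as the msg parameter and stored in the variable child. The childHandler that was passed in from TimeServer is added to child's pipeline, the configured child options and attributes are applied, and child is then registered with childGroup, which is the workerGroup defined in TimeServer. If registration fails, the child channel is forcibly closed.
Accepting the connection is implemented in the doReadMessages method of NioServerSocketChannel: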
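The sequence in channelRead can be condensed into a small model. This is a sketch under stated assumptions, not Netty code: `ChildChannel`, `WorkerGroup`, and the string-based pipeline are hypothetical stand-ins for `Channel`, `EventLoopGroup`, and `ChannelPipeline`, and setting a `closed` flag stands in for `forceClose`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AcceptorSketch {

    /** Hypothetical stand-in for an accepted child channel. */
    static final class ChildChannel {
        final List<String> pipeline = new ArrayList<>();
        final Map<String, Object> options = new LinkedHashMap<>();
        String registeredWith;   // name of the group that owns this channel, if any
        boolean closed;
    }

    /** Hypothetical stand-in for the child EventLoopGroup. */
    static final class WorkerGroup {
        final String name;
        final boolean acceptingRegistrations;

        WorkerGroup(String name, boolean acceptingRegistrations) {
            this.name = name;
            this.acceptingRegistrations = acceptingRegistrations;
        }

        void register(ChildChannel child) {
            if (!acceptingRegistrations) {
                throw new IllegalStateException(name + " is not accepting registrations");
            }
            child.registeredWith = name;
        }
    }

    /** Same order of steps as channelRead: add handler, apply options, register, close on failure. */
    static boolean channelRead(ChildChannel child, String childHandler,
                               Map<String, Object> childOptions, WorkerGroup childGroup) {
        child.pipeline.add(childHandler);
        child.options.putAll(childOptions);
        try {
            childGroup.register(child);
            return true;
        } catch (RuntimeException e) {
            child.closed = true;   // stand-in for forceClose(child, t)
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("SO_KEEPALIVE", Boolean.TRUE);
        ChildChannel child = new ChildChannel();
        boolean ok = channelRead(child, "TimeServerHandler", options,
                new WorkerGroup("workerGroup", true));
        System.out.println(ok + " " + child.pipeline + " " + child.registeredWith);
    }
}
```

In the real handler, registration is asynchronous and failure is reported through a ChannelFutureListener; the synchronous try/catch here keeps only the control flow.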
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
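doReadMessages is called from the read method of NioMessageUnsafe, which then executes pipeline.fireChannelRead(readBuf.get(i)); to pass along the new NioSocketChannel (wrapping the accepted SocketChannel). This object is the msg parameter of the channelRead method. At this point the connection has been accepted, and all further communication on the channel is handled by childGroup, which is the workerGroup in TimeServer.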
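The `ch != null` check in doReadMessages exists because a non-blocking `ServerSocketChannel.accept()` returns null when no connection is pending. The following sketch reproduces that behavior with plain java.nio on the loopback interface. `AcceptSketch`, `readOne`, and `demo` are hypothetical names, and the short polling loop is a simplification: Netty waits for OP_ACCEPT readiness on a Selector rather than polling.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class AcceptSketch {

    /** Same shape as doReadMessages: adds the accepted channel to buf and returns 1, else 0. */
    static int readOne(ServerSocketChannel server, List<SocketChannel> buf) throws Exception {
        SocketChannel ch = server.accept();   // non-blocking mode: null if nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    /** Returns {result with no client connected, result after a client has connected}. */
    static int[] demo() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            List<SocketChannel> buf = new ArrayList<>();
            int before = readOne(server, buf);   // nobody has connected yet

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int after = 0;
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                // Poll briefly in case the connection is not yet visible to accept().
                while (after == 0 && System.nanoTime() < deadline) {
                    after = readOne(server, buf);
                    if (after == 0) {
                        Thread.sleep(10);
                    }
                }
                for (SocketChannel accepted : buf) {
                    accepted.close();
                }
                return new int[] {before, after};
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] r = demo();
        System.out.println("before connect: " + r[0] + ", after connect: " + r[1]);
    }
}
```

The first call returns 0 because accept() yields null, and the call made after the client connects returns 1 with the accepted channel in the buffer, which corresponds to the list that NioMessageUnsafe.read then feeds into fireChannelRead.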