在上一篇文章中對(duì)于客戶端的啟動(dòng)做了闡述,在本文則將對(duì)服務(wù)端的啟動(dòng)做說明坚俗。其實(shí)服務(wù)端和客戶端啟動(dòng)的過程是比較相似的僚楞,如果對(duì)客戶端啟動(dòng)比較了解,那么接下來的旅程將會(huì)比較輕松劈愚。
同樣的,我們先看下服務(wù)端的代碼:
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
- EventLoopGroup: 指定了兩個(gè)闻妓,bossGroup和workGroup菌羽,bossGroup是為了處理請(qǐng)求鏈接,而workGroup是為了處理與各個(gè)客戶端的操作由缆。
- ChannelType: 指定 Channel 的類型. 因?yàn)槭强蛻舳? 因此使用了 NioServerSocketChannel.
- Handler: 設(shè)置數(shù)據(jù)的處理器注祖。
接下來讓我們看看服務(wù)端啟動(dòng)的基本流程:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
創(chuàng)建了兩個(gè)NioEventLoopGroup,這兩個(gè)對(duì)象可以看做是傳統(tǒng)IO編程模型的兩大線程組均唉,bossGroup表示監(jiān)聽端口是晨,accept 新連接的線程組,workerGroup表示處理每一條連接的數(shù)據(jù)讀寫的線程組
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
給引導(dǎo)類配置兩大線程組舔箭,這個(gè)引導(dǎo)類的線程模型也就定型了罩缴。
.channel(NioServerSocketChannel.class)
和客戶端一樣蚊逢,在這里設(shè)置IO模型,為NIO箫章。
.option(ChannelOption.SO_BACKLOG, 100)
這個(gè)是給服務(wù)端的channel配置一些屬性烙荷,SO_BACKLOG表示系統(tǒng)用于臨時(shí)存放已完成三次握手的請(qǐng)求的隊(duì)列的最大長度,如果連接建立頻繁檬寂,服務(wù)器處理創(chuàng)建新連接較慢终抽,可以適當(dāng)調(diào)大這個(gè)參數(shù)。
.handler(new LoggingHandler(LogLevel.INFO))
handler()用于指定在服務(wù)端啟動(dòng)過程中的一些邏輯桶至。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
我們調(diào)用childHandler()方法昼伴,給這個(gè)引導(dǎo)類創(chuàng)建一個(gè)ChannelInitializer,這里主要就是定義后續(xù)每條連接的數(shù)據(jù)讀寫镣屹,業(yè)務(wù)處理邏輯圃郊。
EventLoopGroup初始化
服務(wù)端的EventLoopGroup初始化和客戶端的是有區(qū)別的,bossGroup是負(fù)責(zé)監(jiān)聽客戶端連接的女蜈,如果它發(fā)現(xiàn)客戶端有連接來到時(shí)持舆,會(huì)初始化各項(xiàng)資源,接著在workerGroup中取出一個(gè)EeventLoop綁定到這個(gè)客戶端中鞭光,處理這個(gè)客戶端的各種請(qǐng)求吏廉。具體的初始化過程在上一篇文章中已經(jīng)有比較詳細(xì)的說明,這里就不再詳述惰许。比較需要注意的是二者是在哪里和channel關(guān)聯(lián)起來的席覆。首先來看下一下路徑:
ServerBootstrap.bind()-->AbstractBootstrap.doBind()-->AbstractBootstrap.initAndRegister()
仔細(xì)看下initAndRegister()方法中的關(guān)鍵代碼:
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
從上述代碼可以看出,這部分和客戶端的實(shí)現(xiàn)如出一轍汹买,要注意的是config().group()返回的是我們初始化EventLoopGroup的哪個(gè)佩伤,這個(gè)從b.group(bossGroup, workerGroup)方法中可以看出:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
得出結(jié)論: config().group().register(channel);中的group是parentGroup,即bossGroup晦毙,用于負(fù)責(zé)監(jiān)聽客戶端連接生巡。跟蹤register的結(jié)果自然和客戶端的一樣,最終代碼如下:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
即是NioServerSocketChannsl與bossGroup關(guān)聯(lián)上见妒,注冊(cè)到一個(gè)selector孤荣。接下來我們需要找到workerGroup是在哪里與NioSocketChannel關(guān)聯(lián)是在哪里。繼續(xù)往下看须揣。
init(channel)
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
從上面代碼可以看出盐股,ChannelPipeline會(huì)添加一個(gè)handler ServerBootstrapAcceptor,在ServerBootstrapAcceptor會(huì)重寫一個(gè)方法:channelRead耻卡,請(qǐng)看channelRead方法關(guān)鍵代碼:
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
這里的childGroup即是currentChildGroup為workGroup疯汁,在這里workGroup會(huì)和某個(gè)channel關(guān)聯(lián),完成channel注冊(cè)到selector卵酪。所以接下來的疑問在于channelRead方法是在哪里調(diào)用的幌蚊。這里可以提下谤碳,會(huì)調(diào)用NioEventLoop.run方法,這個(gè)方法中調(diào)用processSelectedKeys方法溢豆,最后會(huì)調(diào)用到unsafe.read()中的doReadMessages蜒简,在這個(gè)方法中buf.add(new NioSocketChannel(this, ch));再結(jié)合一下代碼可知,最后通過pipeline機(jī)制會(huì)調(diào)用到channelRead沫换。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
handler的添加
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
自定義handler的添加過程和客戶端的如出一轍臭蚁,差別在于此處的handler和childHandler分別設(shè)置了handler和childHandler最铁,handler字段與accept過程有關(guān), handler負(fù)責(zé)處理客戶端的連接請(qǐng)求; 而 childHandler 就是負(fù)責(zé)和客戶端的連接的 IO 交互讯赏。看代碼理解:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在上述的initChannel方法中通過config.handler()獲取到LoggingHandler冷尉,然后加入pipeline中漱挎,然后再加入ServerBootstrapAcceptor
最后形成結(jié)果如下:
然后ServerBootstrapAcceptor.channelRead方法中會(huì)為新建的Channel 設(shè)置 handler 并注冊(cè)到一個(gè)eventLoop。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上述的childHandler就是服務(wù)端啟動(dòng)的childHandler雀哨。當(dāng)這個(gè)客戶端連接 Channel 注冊(cè)后, 就會(huì)觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用, 此后的客戶端連接的 ChannelPipeline 狀態(tài)如下:
最后我們來總結(jié)一下服務(wù)器端的 handler 與 childHandler 的區(qū)別與聯(lián)系:
- 在服務(wù)器 NioServerSocketChannel 的 pipeline 中添加的是 handler 與 ServerBootstrapAcceptor.
- 當(dāng)有新的客戶端連接請(qǐng)求時(shí), ServerBootstrapAcceptor.channelRead 中負(fù)責(zé)新建此連接的 NioSocketChannel 并添加 childHandler 到 NioSocketChannel 對(duì)應(yīng)的 pipeline 中, 并將此 channel 綁定到 workerGroup 中的某個(gè) eventLoop 中.
- handler 是在 accept 階段起作用, 它處理客戶端的連接請(qǐng)求.
-
childHandler 是在客戶端連接建立以后起作用, 它負(fù)責(zé)客戶端連接的 IO 交互.