零呀闻、 整體流程
1化借、用戶在main線程啟動(dòng)執(zhí)行ServerSocketChannel的初始化
1)初始化一個(gè)NioServerSocketChannel channel
包括初始化Channel關(guān)聯(lián)的ch、pipeline捡多、unsafe蓖康、config。
nio.ServerSocketChannel ch = nio.SelectorProvider.openServerSocketChannel()局服;
ch.configBlocking(false);
2)將channel與ServerBootstrap的EventLoopGroup中的某個(gè)EventLoop child綁定
3)向child的任務(wù)執(zhí)行隊(duì)列中添加channel的register0事件
4)監(jiān)聽(tīng)register0事件的完成狀態(tài)钓瞭,完成時(shí)向child的任務(wù)執(zhí)行隊(duì)列中添加channel的bind事件
2驳遵、EventLoop線程中執(zhí)行register0事件
1)將NioServerSocketChannel的ch注冊(cè)到NioEventLoop child的selector上
SelectionKey selectionKey = ch.register(eventLoop().selector, 0, this);
2)標(biāo)識(shí)ChannelPromise狀態(tài)為success淫奔,觸發(fā)Listener將bind任務(wù)添加到EventLoop的執(zhí)行任務(wù)隊(duì)列
3)ServerSocketChannel channel產(chǎn)生ChannelRegisted事件
pipeline.fireChannelRegistered();
會(huì)導(dǎo)致ChannelInitialzer.channelRegisted()執(zhí)行,將ServerBootstrapAcceptor添加到channel的pipeline中堤结。
3唆迁、EventLoop線程中執(zhí)行Channel.bind事件
1)ServerSocketChannel綁定對(duì)應(yīng)服務(wù)端口鸭丛,監(jiān)聽(tīng)新的客戶端連接
javaChannel().socket().bind(localAddress, config.getBacklog());
2)ServerSocketChannel channel產(chǎn)生ChannelActive事件
pipeline.fireChannelActive();
ChannelActive事件由HeadContext處理,向ch中添加OP_ACCEPT事件監(jiān)聽(tīng)唐责。
selectionKey.interestOps(OP_ACCEPT);
一鳞溉、代碼入口
ServerBootstrap b = new ServerBootstrap();
// Start the server.
ChannelFuture f = b.bind(port).sync();
服務(wù)端啟動(dòng)時(shí)都會(huì)執(zhí)行上面的代碼,用來(lái)啟動(dòng)ServerSocketChannel監(jiān)聽(tīng)對(duì)應(yīng)端口鼠哥。
最終由AbstractBootstrap.doBind方法處理熟菲。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
二、用戶線程初始化NioServerSocketChannel
final Channel channel = channelFactory.newChannel();
從此行代碼開(kāi)始朴恳,ReflectiveChannelFactory通過(guò)反射的方式調(diào)用NioServerSocketChannel的構(gòu)造方法進(jìn)行channel的初始化抄罕。
1、NioServerSocketChannel的實(shí)例化
** 1)創(chuàng)建一個(gè)nio.ServerSocketChannel**
newSocket(SelectorProvider provider);
使用nio.SelectorProvider.openServerSocketChannel()創(chuàng)建一個(gè)nio.ServerSocketChannel ch于颖。
** 2)通過(guò)構(gòu)造方法實(shí)例化NioServerSocketChannel呆贿,并初始化相關(guān)的field**
parent = null
unsafe = new NioMessageUnsafe()
pipeline = new DefaultChannelPipeline(this) ->此處初始化一個(gè)DefaultChannelPipeline,并將pipeline和channel互相綁定
ch = ch ->同時(shí)將ch設(shè)置為非阻塞模式
readInterestOp = OP_ACCEPT
config = new ServerSocketChannelConfig(this, javaChannel().socket())
3)向channel的pipeline中添加Inbound處理器ChannelInitializer
init(channel);
由其父類(lèi)ServerBootstrap直接實(shí)現(xiàn)森渐。
- 設(shè)置channel的attr和options**
- 添加ChannelInitializer到pipeline中**
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
}
該處理器的*channelRegisted事件*回調(diào)方法會(huì)將*ServerBootstrapAcceptor*加入channel的pipeline中做入,并將自己從pipeline中移除。
4)ServerBootstrapAcceptor
- ServerBootstrapAcceptor也是一個(gè)Inbound處理器同衣,用于在Server端accept新的客戶端連接時(shí)竟块,向新生成的socketChannel中添加用戶定義的業(yè)務(wù)處理器。
- 其channelRead事件回調(diào)方法會(huì)將業(yè)務(wù)方往ServerBootstrap中添加的childHandler添加到socketChannel對(duì)應(yīng)的pipeline中乳怎。
- 對(duì)于Server端彩郊,channelRead事件被定義為server端accept到了新的socket連接。
2蚪缀、將NioServerSocketChannel注冊(cè)到ServerBootstrap的EventLoopGroup上
ChannelFuture regFuture = group().register(channel);
1)NioEventLoopGroup的chooser從其children中選出一個(gè)NioEventLoop child秫逝,調(diào)用其register()方法進(jìn)行channel注冊(cè);
2)實(shí)際將NioEventLoop child傳給NioServerSocketChannel的unsafe询枚,調(diào)用其register(EventLoop eventLoop, final ChannelPromise promise)方法完成注冊(cè)
- 將channel.eventLoop綁定為當(dāng)前NioEventLoop child违帆;
- 將AbstractUnsafe.register0(DefaultChannelPromise promise)任務(wù)加入EventLoop child的執(zhí)行任務(wù)隊(duì)列;
3金蜀、給標(biāo)識(shí)“register0任務(wù)”完成狀態(tài)ChannelFuture添加一個(gè)Listener
doBind0(regFuture, channel, localAddress, promise);
當(dāng)ChannelFuture完成時(shí)刷后,將NioServerSocketChannel.bind(SocketAddress localAddress, ChannelPromise promise)任務(wù)加入EventLoop child的執(zhí)行任務(wù)隊(duì)列
三、EventLoop線程中執(zhí)行register0事件
AbstractUnsafe.register0(ChannelPromise promise);
1渊抄、將NioServerSocketChannel的ch注冊(cè)到NioEventLoop child的selector上尝胆,同時(shí)將注冊(cè)得到的SelectionKey綁定為NioServerSocketChannel的selectionKey
doRegister();
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
netty的輪詢(xún)注冊(cè)機(jī)制其實(shí)是將AbstractNioChannel內(nèi)部的jdk類(lèi)對(duì)象SelectableChannel ch注冊(cè)到j(luò)dk類(lèi)對(duì)象Selector selector上去,并且將AbstractNioChannel channel作為SelectableChannel對(duì)象ch的一個(gè)attachment附屬上护桦,這樣在jdk輪詢(xún)出某個(gè)SelectableChannel有IO事件發(fā)生時(shí)含衔,就可以直接取出AbstractNioChannel進(jìn)行后續(xù)操作。
2、標(biāo)識(shí)ChannelPromise狀態(tài)為success贪染,觸發(fā)Listener將bind任務(wù)添加到EventLoop的執(zhí)行任務(wù)隊(duì)列
safeSetSuccess(promise);
3缓呛、ServerSocketChannel channel產(chǎn)生ChannelRegisted事件
pipeline.fireChannelRegistered();
會(huì)導(dǎo)致ChannelInitialzer.channelRegisted()執(zhí)行,將ServerBootstrapAcceptor添加到channel的pipeline中杭隙。
四哟绊、EventLoop線程中執(zhí)行Channel.bind事件
AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise);
1、ServerSocketChannel綁定對(duì)應(yīng)服務(wù)端口痰憎,監(jiān)聽(tīng)新的客戶端連接
javaChannel().socket().bind(localAddress, config.getBacklog());
2票髓、ServerSocketChannel channel產(chǎn)生ChannelActive事件
pipeline.fireChannelActive();
ChannelActive事件由HeadContext處理,最終調(diào)用了AbstractNioUnsafe.doBeginRead()方法铣耘,向ch中添加OP_READ事件監(jiān)聽(tīng)炬称。
selectionKey.interestOps(interestOps | readInterestOp);