開篇
我們使用netty源碼包netty-example中的EchoServer來分析使用netty作為網(wǎng)絡通信框架服務端的啟動過程
說明
所有的分析基于NIO
一段Server端啟動樣板代碼
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$".getBytes())));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
一般使用netty實現(xiàn)網(wǎng)絡通信的服務端都會實現(xiàn)類似上面的樣本代碼悠抹,下面我們對樣本代碼中的一些關鍵部分進行簡單解析
bossGroup
我們知道在經(jīng)典的NIO編程中计寇,Server端有一個專門的Acceptor線程負責接收用戶的連接請求,在netty中bossGroup就是為Acceptor提供線程的線程池(站在線程角度去理解)毛秘,一般bossGroup中只是包含一個線程workerGroup
每個被Server接收的連接也就是創(chuàng)建的SocketChannel會綁定到一個線程灾而,之后SocketChannel上發(fā)生的所有讀寫事件都是由其綁定的線程處理胡控,在netty中workerGroup就是給每個SocketChannel分配線程的線程池。關于netty的線程模型我在另一篇文章中有詳細的解析http://www.reibang.com/p/732f9dea34d7ServerBootstrap
Server端的引導類旁趟,下面講解ServerBootstrap的一些方法
- group方法設置acceptor和worker的線程池
- channel方法設置Server端的channel類型昼激,NioServerSocketChannel是server端的channel類型,它包裝了java nio中的ServerSocketChannel
- option()用來設置底層ServerSocketChannel一些tcp參數(shù)
- childOption()用來設置將來建立的SocketChannel的一些tcp參數(shù)
- handler()設置Server端事件處理鏈上的一個處理節(jié)點
- childHandler()用來設置被server端接受建立的SocketChannel事件處理鏈的一個處理節(jié)點
Server端啟動過程
Server端啟動的入口是serverBootstrap.bind(port)锡搜,bind過程分成兩個部分:
initAndRegister
主要執(zhí)行ServerSocketChannel的建立和初始化癣猾,下面詳細分析doBind
把Server端綁定到指定的ip和端口上
initAndRegister
//返回值是future類型,說明初始化是initAndRegister是異步的過程
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//channelFactory是ServerBootstrap根據(jù)用戶設置的NioServerSocketChannel.class生成的NioServerSocketChannel工廠
//通過channelFactory.newChannel()就可以創(chuàng)建出服務端的NioServerSocketChannel余爆,下面我們會詳細分析NioServerSocketChannel創(chuàng)建過程
channel = channelFactory.newChannel();
//對channel進行初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//上面完成了channel的初始化纷宇,這里實現(xiàn)對channel的注冊
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
對于Server端來說上面initAndRegister方法主要包含了三個核心過程
- 創(chuàng)建NioServerSocketChannel
- 初始化NioServerSocketChannel
- 注冊NioServerSocketChannel
我們分別來分析
-
創(chuàng)建NioServerSocketChannel
channelFactory通過反射調(diào)用NioServerSocketChannel類的無參數(shù)構造方法創(chuàng)建NioServerSocketChannel對象
public NioServerSocketChannel() {
//DEFAULT_SELECTOR_PROVIDER 是NioServerSocketChannel靜態(tài)常量,類型是SelectorProvider蛾方,
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket方法創(chuàng)建了java底層的ServerSocketChannel對象像捶,這里也就看出了NioServerSocketChannel就是netty對ServerSocketChannel的包裝
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a >#2308</a>.
*/
//使用SelectorProvider創(chuàng)建java底層的ServerSocketChannel
//寫過java nio的同學上陕,都應該記得之前創(chuàng)建ServerSocketChannel都是通過ServerSocketChannel.open()實現(xiàn)的
//其實ServerSocketChannel.open()源碼就是provider.openServerSocketChannel()
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
接來下NioServerSocketChannel會給自己和父類中的關鍵屬性初始化值
public NioServerSocketChannel(ServerSocketChannel channel) {
//初始化父類
super(null, channel, SelectionKey.OP_ACCEPT);
//這個NioServerSocketChannelConfig是NioServerSocketChannel綁定的配置類,比如設置了最大可以連續(xù)從channel讀多少次數(shù)據(jù)
//再比如設置了每次從channel讀取數(shù)據(jù)的時候拓春,每次讀取數(shù)據(jù)使用的byteBuf大小動態(tài)變化策略
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
我們看下各層父類的初始化
AbstractNioChannel
- 設置ch 為ServerSocketChannel
- 設置ServerSocketChannel感興趣的事件為OP_ACCEPT释簿,readInterestOp=16
- 設置ServerSocketChannel為非阻塞
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
AbstractChannel
- 創(chuàng)建unsafe為NioMessageUnsafe,unsafe是處理channel連接硼莽,綁定和channel上讀寫事件的核心類
- 創(chuàng)建channel的事件處理鏈DefaultChannelPipeline
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
到此NioServerSocketChannel的創(chuàng)建已經(jīng)基本完成了庶溶,關于netty pipeline的知識點可以查看我寫的另一邊文章 http://www.reibang.com/p/36803adcbc02
初始化NioServerSocketChannel --- init
直接上ServerBootstrap的init源代碼吧
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init方法核心的功能就是通過ChannelInitializer向NioServerSocketChannel綁定的pipeline添加了兩個handler
用戶配置的server handler
netty內(nèi)置的ServerBootstrapAcceptor,這個handler是一個InboundHandler懂鸵,它的主要功能是給連接請求創(chuàng)建的NioSocketChannel設置用戶在ServerBootstrap中指定的參數(shù)偏螺,添加用戶設置的childHandler,使用childGroup對NioSocketChannel進行注冊匆光。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//對于server端來說 這個msg 其實是一個NioSocketChannel
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//使用NioEventLoopGroup對NioSocketChannel進行注冊
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上面第二個handler添加到pipe中的方式和第一個handler添加的方式不同套像,ServerBootstrapAcceptor添加的方式是向NioServerSocketChannel綁定的NioEventLoop提交了runnable任務,這個任務實現(xiàn)的功能就是把ServerBootstrapAcceptor添加到pipeline中终息。因為在init的時候channel還沒有做register夺巩,所以這個地方觸發(fā)的handlerAdded事件被存放起來,等將來channel register成功之后才會繼續(xù)觸發(fā)相應的handlerAdded方法
NioServerSocketChannel 注冊
注冊主要的功能就是從bossGroup管理的NioEventLoop(正常情況bossGroup中只管理一個NioEventLoop)中取出一個NioEventLoop然后綁定到NioServerSocketChannel上周崭,最終是通過unsafe.register方法實現(xiàn),我們解析下unsafe.register的源代碼
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//NioServerSocketChannel綁定從bossGroup中分配的NioEventLoop
AbstractChannel.this.eventLoop = eventLoop;
//判斷當前運行線程和eventLoop綁定的線程是不是相同
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//向eventLoop提交一個執(zhí)行register0()的任務
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
我看到執(zhí)行register方法的線程會向NioServerSocketChannel綁定的NioEventLoop提交一個任務柳譬,這個時候eventLoop就會被以線程的方式啟動起來,具體啟動的過程我在http://www.reibang.com/p/732f9dea34d7有解析
- register0
方法register0會在NioServerSocketChannel綁定的線程中執(zhí)行
我解析下它的源代碼
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//這個方法是實現(xiàn)了java底層ServerSocketChannel向selector注冊的功能
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
//執(zhí)行之前被添加到pending handlerAdded事件鏈中的handlerAdded事件续镇,
//這會時候對于NioServerSocketChannel來說ServerBootstrapAcceptor會被添加到pipeline中
pipeline.invokeHandlerAddedIfNeeded();
//設置初始化和注冊成功
safeSetSuccess(promise);
//觸發(fā)pipeline上類型為InboundHandler handler的channelRegistered方法
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
//判斷ServerSocketChannel是不是已經(jīng)準備好接受連接請求了
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
//doRegister主要是完成了java底層的ServerSocketChannel向selector綁定
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//這里需要注意的是ServerSocketChannel在selector綁定的感興趣事件是0征绎,這是為什么呢,
//因為這個時候ServerSocketChannel還沒有綁定到具體的ip和端口上磨取,在下面的分析中我們會看到在channelActive事件中人柿,這個interestOps會被修改成16
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
到此NioServerSocketChannel注冊和初始化已經(jīng)完成了
接下來我們看下NioServerSocketChannel如何綁定到服務器的
doBind0
doBind0實現(xiàn)的是向NioServerSocketChannel綁定的NioEventLoop提交了channel綁定到指定ip和端口的任務
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
我們看下這個方法的調(diào)用鏈
channel.bind --> pipeline.bind --> tail.bind
tail.bind的源代碼如下:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
//找到符合執(zhí)行要求的OutboundHandler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
可以看到服務端的bind是從pipeline的尾部開始向頭部找符合要求的handler去執(zhí)行,在pipeline鏈中最后一個符合執(zhí)行要求的是pipeline的head節(jié)點,我們看HeadContext的bind方法
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
又看到了我們熟悉的unsafe忙厌,我們看下unsafe.bind源代碼
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
//這個地方是這段代碼的核心
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//當完成doBind方法后凫岖,這個判斷條件就會為真,然后就會向NioEventLoop提交一個任務逢净,
//這個任務的作用是從pipeline的head依次觸發(fā)pipeline上面所有handler的channelActive方法
//這里需要注意的是head的channelActive方法哥放,我在下面解析
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
doBind
我們看下NioServerSocketChannel.doBind的具體實現(xiàn)
protected void doBind(SocketAddress localAddress) throws Exception {
//針對不同JDK版本,ServerSocketChannel綁定到指定的ip和端口的方式不同
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
Head handler channelActive方法
我看到head handler的channelActive方法會先觸發(fā)pipeline上面別的handler的channelActive方法爹土,最后它還會執(zhí)行readIfIsAutoRead方法甥雕,這個方法的作用就是將ServerSocketChannel在selector上注冊的感興趣事件修改成OP_ACCEPT
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
我看下readIfIsAutoRead()的調(diào)用鏈
readIfIsAutoRead --> channel.read() --> pipeline.read() --> tail.read();
這個調(diào)用鏈是不是很熟悉,和上面channel.bind的調(diào)用鏈如出一轍
我們看下tail.read的源代碼
public ChannelHandlerContext read() {
//從pipeline中找到符合MASK_READ的OutboundHandler胀茵,最后會找到headContext
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeReadTask);
}
return this;
}
我再來分析下headContext.invokeRead()方法調(diào)用鏈
headContext.invokeRead() --> headHandler.read() -->unsafe.beginRead() -->channel.doBeginRead()
我看channel.doBeginRead()源代碼
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
//在channelActive事件發(fā)生之前interestOps為0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//修改selectionKey感興趣事件為readInterestOp社露,對NioServerSocketChannel來說readInterestOp為OP_ACCEPT
selectionKey.interestOps(interestOps | readInterestOp);
}
}
到此就完成了整個Server端啟動分析