本文基于Netty 4
在討論Netty服務(wù)器啟動(dòng)之前俩檬,先回顧一下服務(wù)端使用Java nio selector的啟動(dòng)過程:
//1. 獲取服務(wù)端通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 綁定端口
ssChannel.bind(new InetSocketAddress(9898));
//3. 設(shè)置為非阻塞模式
ssChannel.configureBlocking(false);
Selector selector = Selector.open();
//4. 向監(jiān)聽器注冊(cè)accept事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
//5. 獲取監(jiān)聽器上所有的監(jiān)聽事件值
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
//注冊(cè)read事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
readMsg(channel);
} else if(...) {
...
}
it.remove();
}
}
Netty nio模式的啟動(dòng)過程:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast(new MyInboundHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
其實(shí),無論是Java nio,還是Netty nio,總體都包含兩個(gè)方面:1. 線程模型则酝;2. IO模型;下面先分析Netty的線程模型闰集。
Netty線程啟動(dòng)
NioEventLoopGroup是Netty線程的核心沽讹,下面看一下這個(gè)類的初始化
NioEventLoopGroup # 構(gòu)造方法
public NioEventLoopGroup() {
this(0);
}
//無參的構(gòu)造轉(zhuǎn)調(diào)到了這里
public NioEventLoopGroup(int nThreads) {
//這里的Executor傳入了null
this(nThreads, (Executor) null);
}
上面的構(gòu)造方法最終會(huì)調(diào)用父類的構(gòu)造方法
MultithreadEventLoopGroup # 構(gòu)造方法
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//這里若threads==0般卑,會(huì)初始化一個(gè)值;
//該初始值的算法 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2))
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
最終走到這里 MultithreadEventExecutorGroup # 構(gòu)造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//之前executor設(shè)為了null爽雄,這里會(huì)賦一個(gè)值蝠检,這個(gè)executor的作用是創(chuàng)建新 Thread并運(yùn)行任務(wù);后面會(huì)發(fā)現(xiàn)它的作用
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//這里會(huì)根據(jù)threads的值創(chuàng)建若干個(gè)EventExecutor
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//往里看一下children為何物
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
.........
}
}
}
//初始了chooser挚瘟,后面會(huì)使用該chooser選擇一個(gè)EventLoop
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
NioEventLoopGroup # newChild()
上面的children[] 最終保存的是NioEventLoop
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop中最核心的方法就是execute()
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//系統(tǒng)剛啟動(dòng)時(shí)叹谁,NioEventLoop中的Thread肯定為null,且state==ST_NOT_STARTED乘盖,那么肯定會(huì)走到else中
if (inEventLoop) {
addTask(task);
} else {
//這里會(huì)走到doStartThread()
startThread();
//startThread()方法如下
////////////////////////////////////////////
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
//改變EventLoop的狀態(tài)并啟動(dòng)真正干活的線程焰檩,CAS操作保證了每個(gè)EventLoop只啟動(dòng)一個(gè)線程
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
////////////////////////////////////////////
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void doStartThread() {
assert thread == null;
//這個(gè)executor就是上文在初始化時(shí)創(chuàng)建的 ThreadPerTaskExecutor
//ThreadPerTaskExecutor 會(huì)創(chuàng)建新的線程
executor.execute(new Runnable() {
@Override
public void run() {
//這個(gè)thread常用于inEventLoop()方法
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//run()為abstract方法,子類會(huì)實(shí)現(xiàn)
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
......
}
}
});
}
NioEventLoop # run()
該方法會(huì)進(jìn)入死循環(huán)订框,不停地按預(yù)先分配的時(shí)間比例處理IO任務(wù)和其他的例如scheduled任務(wù)
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//NIO多路復(fù)用選擇器
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
......
}
}
通過上面的介紹锅尘,我們知道了初始化NioEventLoopGroup時(shí)會(huì)創(chuàng)建nThreads個(gè)NioEventLoop,當(dāng)有新task到來時(shí)布蔗,會(huì)根據(jù)其所屬Chanenl選擇對(duì)應(yīng)的EventLoop執(zhí)行(execute())
線程已經(jīng)先行啟動(dòng)藤违,等著數(shù)據(jù)的到來;接下來看一下IO相關(guān)的初始化纵揍,Netty的IO模型是多路復(fù)用顿乒;
IO的啟動(dòng)
ServerBootstrap是IO的核心類,下面就從這個(gè)類入手泽谨,看一下IO的啟動(dòng)過程
1. 參數(shù)的設(shè)置
ServerBootstrap b = new ServerBootstrap();
//上面啟動(dòng)的線程(池)設(shè)置在這里
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
//tcp 三次握手使用的參數(shù)
b.option(ChannelOption.SO_BACKLOG, 128);
2. 綁定端口
在繼續(xù)深入源碼之前璧榄,需要區(qū)分一下register的含義;Java NIO的register一般指把ServerSocketChannel注冊(cè)到Selector上吧雹;在Netty中骨杂,注冊(cè)的含義不僅僅包含這層含義,還包括把NioServerSocketChannel與EventLoop關(guān)聯(lián)在一起
bind(port)會(huì)進(jìn)入AbstractBootstrap的doBind()中
AbstractBootstrap # doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
// 注冊(cè)已經(jīng)在EventLoop中進(jìn)行雄卷,因此下列方法可能和注冊(cè)過程并發(fā)進(jìn)行
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 注冊(cè)是通過線程池完成搓蚪,因此這里要判斷是否注冊(cè)完成
// 注冊(cè)過程可能很快,這里已經(jīng)完成
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 綁定本地端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 沒有完成注冊(cè)的話丁鹉,就注冊(cè)一個(gè)監(jiān)聽器妒潭,注冊(cè)完成后進(jìn)行回調(diào)
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
AbstractBootstrap # initAndRegister()
這個(gè)方法開始使用前面已經(jīng)初始化的NioEventLoopGroup
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//設(shè)置參數(shù)時(shí),有 b.channel(NioServerSocketChannel.class)揣钦,
//因此這里的channel實(shí)際就是 NioServerSocketChannel
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
......
return ......
}
//這里的group()實(shí)際返回的就是bossGroup雳灾,即上面創(chuàng)建的第一個(gè)NioEventLoopGroup,因此真正的注冊(cè)任務(wù)是由線程池來完成冯凹;
//只有當(dāng)EventLoop與Channel關(guān)聯(lián)在一起谎亩,才能算注冊(cè)成功
ChannelFuture regFuture = config().group().register(channel);
.......
return regFuture;
}
在真正注冊(cè)之前,會(huì)先實(shí)例化NioServerSocketChannel,接下來就看一下NioServerSocketChannel初始化過程匈庭;NioServerSocketChannel的繼承關(guān)系如下:
channelFactory.newChannel()會(huì)調(diào)用NioServerSocketChannel的構(gòu)造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
// NioServerSocketChannel 關(guān)心的事件是Accept夫凸,即新的客戶端連接
super(null, channel, SelectionKey.OP_ACCEPT);
//這里也會(huì)進(jìn)行內(nèi)存分配相關(guān)類的初始化
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
//每個(gè)channel都有一個(gè)id
id = newId();
//真正與下層交互的東東,比如讀寫socket等操作
unsafe = newUnsafe();
//初始化為DefaultChannelPipeline
pipeline = newChannelPipeline();
}
NioServerSocketChannel已初始化完畢嚎花,但還沒有與EventLoopGroup或EventLoop發(fā)生任何聯(lián)系
ServerBootstrap # init()
//上面已完成Channel的初始化,這里的channel就是 NioServerSocketChannel
void init(Channel channel) throws Exception {
//一些attrs呀洲、options紊选、handler以及childAttrs、childOptions道逗、childHandler的設(shè)置
.......
//這里就是上面創(chuàng)建的DefaultChannelPipeline
ChannelPipeline p = channel.pipeline();
//之前創(chuàng)建的第二個(gè)NioEventLoopGroup(workerGroup)
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//pipeline的鏈表最后加入了一個(gè)ChannelHandler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在addLast時(shí)兵罢,有一點(diǎn)要注意:如果還沒有完成注冊(cè),則把Handler保存在一個(gè)臨時(shí)變量中滓窍,等注冊(cè)完畢后再調(diào)用相應(yīng)方法卖词;如果已完成注冊(cè),則應(yīng)調(diào)用handlerAdded()等回調(diào)方法吏夯;
DefaultChannelPipeline # addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// 加入到Handler鏈中
addLast0(newCtx);
// 由前面的分析可知此蜈,系統(tǒng)剛啟動(dòng)時(shí)init()會(huì)走到這一步,此時(shí)并沒有完成注冊(cè)噪生,因此會(huì)進(jìn)入if裆赵;
if (!registered) {
newCtx.setAddPending();
//在注冊(cè)完成后,會(huì)調(diào)用pendingHandlerCallbackHead的相關(guān)方法
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
callHandlerCallbackLater()
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
// 注冊(cè)完成后跺嗽,按照鏈的順序依次調(diào)用
pending.next = task;
}
}
addLast()完成后战授,Pipeline中Handler鏈的順序如下:
init()完成后,開始異步注冊(cè)
AbstractBootStrap # initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
.......
}
// 異步注冊(cè)
// 使用chooser在EventLoopGroup中選擇一個(gè)EventLoop進(jìn)行注冊(cè)
ChannelFuture regFuture = config().group().register(channel);
......
return regFuture;
}
SingleThreadEventLoop # register()
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
AbstractChannel # AbstractUnsafe # register()
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// NioServerSocketChannel的eventLoop 賦了值
AbstractChannel.this.eventLoop = eventLoop;
// 系統(tǒng)剛啟動(dòng)時(shí)桨嫁,EventLoopGroup中所有的EventLoop都是未啟動(dòng)狀態(tài)植兰,EventLoop的thread屬性也為null
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 由上文可知,execute()方法用來提交任務(wù)璃吧,啟動(dòng)一個(gè)新的EventLoop
// 在此楣导,channel已經(jīng)和EventLoop產(chǎn)生了聯(lián)系
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
........
}
}
}
AbstractChannel # AbstractUnsafe # register0()
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// JDK的注冊(cè):將ServerSocketChannel注冊(cè)到selector上;有一點(diǎn)要注意畜挨,這里注冊(cè)的時(shí)候爷辙,并沒有注冊(cè)感興趣的事件;
// selectionKey = javaChannel().register(eventLoop().selector, 0, this);
doRegister();
neverRegistered = false;
registered = true;
// 到這里注冊(cè)完成
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
到這里可以看出Netty中完成注冊(cè)的標(biāo)識(shí)必須滿足下面三個(gè)條件:
- NioServerSocketChannel 的eventLoop 字段賦上了值
- 這個(gè)EventLoop要啟動(dòng)(新建的EventLoop是未啟動(dòng)狀態(tài))
- JDK級(jí)別的ServerSocketChannel 注冊(cè)到selector上(不一定有事件)
在前面我們準(zhǔn)備了一些動(dòng)作必須等注冊(cè)完成后才能觸發(fā)朦促,這里再回顧一下是哪些動(dòng)作:
a. AbstractBootstrap # doBind() 里注冊(cè)了一個(gè)監(jiān)聽器膝晾,注冊(cè)完成后調(diào)用;
b. DefaultChannelPipeline # addLast() 里注冊(cè)了一個(gè)回調(diào)鏈务冕,也是等注冊(cè)完成后調(diào)用血当,實(shí)際就是調(diào)用handlerAdded();
注冊(cè)后繼續(xù)做了如下動(dòng)作:
pipeline.invokeHandlerAddedIfNeeded();
// 設(shè)置成功標(biāo)志,這里會(huì)調(diào)用listener臊旭;前面注冊(cè)的ChannelFutureListener就會(huì)在此調(diào)用落恼,進(jìn)行本地端口的綁定;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
//
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
pipeline.invokeHandlerAddedIfNeeded()會(huì)走到下面代碼
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// 這里就是 上文添加的Handler鏈离熏;ChannelInitializer就是其中之一
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
// 這里會(huì)調(diào)用到ChannelInitializer的initChannel()
task.execute();
task = task.next;
}
}
ChannelInitializer # initChannel()
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.......
} finally {
remove(ctx);
}
return true;
}
return false;
}
上述操作會(huì)導(dǎo)致Pipeline中Handler鏈發(fā)生下面的變化:
safeSetSuccess(promise) 會(huì)走到如下代碼
DefaultPromise # trySuccess()
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
//調(diào)用listener佳谦;上文注冊(cè)的ChannelFutureListener就會(huì)在此被調(diào)用
notifyListeners();
return true;
}
return false;
}
到此Netty服務(wù)器已經(jīng)啟動(dòng),等待客戶端連接滋戳,啟動(dòng)大致流程如下:
觸發(fā)channelActive()
服務(wù)器注冊(cè)完成后钻蔑,Pipeline中Handler鏈的結(jié)構(gòu)如下,此時(shí)調(diào)用pipeline.fireChannelActive()
設(shè)置ACCEPT事件
DefaultChannelPipeline # fireChannelActive()
public final ChannelPipeline fireChannelActive() {
// 這里最終會(huì)走到HeadContext中
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
HeadContext # channelActive()
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//這個(gè)會(huì)沿著Handler鏈的方向調(diào)用下一個(gè)handler的方法
//在目前的Handler鏈中奸鸯,其實(shí)什么都沒做
ctx.fireChannelActive();
readIfIsAutoRead();
/////////////////////////////////////////// readIfIsAutoRead的方法體
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//這個(gè)是可以配置關(guān)閉的咪笑,默認(rèn)都是打開;這個(gè)開關(guān)與限流有關(guān)
// 這個(gè)方法最終走到AbstractNioChannel的doBeginRead()
channel.read();
}
}
///////////////////////////////////////////
}
AbstractNioChannel # doBeginRead()
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// readInterestOp在channel初始化時(shí)設(shè)為了accept
selectionKey.interestOps(interestOps | readInterestOp);
}
}
由上文可知娄涩,注冊(cè)時(shí)沒有注冊(cè)感興趣的事件 (selectionKey = javaChannel().register(eventLoop().selector, 0, this)窗怒,當(dāng)時(shí)傳入的是0)。這里才完成感興趣事件的設(shè)置(至于為什么滯后設(shè)置蓄拣,我覺得是要把所有相關(guān)的類都初始化完成后才注冊(cè)扬虚,否則在這個(gè)過程中有客戶端訪問,可能會(huì)出錯(cuò))球恤。此時(shí)EventLoop的循環(huán)才真正可以檢查IO事件孔轴。
接收客戶端連接
當(dāng)客戶端連接到來時(shí),EventLoop的循環(huán)方法(run())會(huì)檢測(cè)到這一事件碎捺,進(jìn)入processSelectedKey()方法
NioEventLoop # processSelectedKey()
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
try {
......
//處理 read or accept
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 調(diào)用NioMessageUnsafe的read()
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
NioMessageUnsafe # read()
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
///////////////////////////////////////////// NioServerSocketChannel.doReadMessages()
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
......
}
return 0;
/////////////////////////////////////////////
......
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
.......
} finally {
.......
}
}
這里會(huì)調(diào)用accept()接受客戶端連接并保存在list中路鹰;接著觸發(fā)了pipeline.fireChannelRead(),channelRead事件就會(huì)從HeadContext傳播到ServerBootstrapAcceptor
ServerBootstrapAcceptor # channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//msg 是前文accept()接收的NioSocketChannel
final Channel child = (Channel) msg;
// 添加ChannelInitializer
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//childGroup為workerGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上面的方法會(huì)將accept的SocketChannel進(jìn)行注冊(cè)收厨,與ServerSocketChannel的注冊(cè)唯一不同的是此處會(huì)把Handler與另一個(gè)NioEventLoopGroup關(guān)聯(lián)晋柱;下面的代碼是我們對(duì)這個(gè)socket的初始設(shè)置。
ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast(new MyInboundHandler());
}
});
Tips: Netty中將與accept()返回的socket有關(guān)的參數(shù)稱之為childXXX诵叁,比如childOption()就是給該socket設(shè)置參數(shù)雁竞,childHandler()是給該socket設(shè)置處理器
channelRead()后蚂斤,系統(tǒng)中就形成了一條新的Handler鏈翘簇,其結(jié)構(gòu)如下限寞;之后與客戶端的讀寫交互都由這個(gè)Handler鏈處理