我們帶著問題去看源碼艘款,下面是我的問題:
- serverSocketChannel是什么時候與selector綁定的持际,流程是什么
- 啟動的線程數(shù)有多少,什么時候new的蜘欲,什么時候start的
- 有那些地方需要用鎖
前提概要:netty代碼非常深,盡量寫詳細,貼出代碼中可以看todo的注解年碘。2.1章節(jié)為重點闷祥,提到了線程的創(chuàng)建與start
可以先看最后的總結(jié)再看凯砍。
服務(wù)端代碼
主要是分析
- NioEventLoopGroup bossGroup = new NioEventLoopGroup();
- ChannelFuture cf = bootstrap.bind(6666).sync();
public static void main(String[] args) {
//1.創(chuàng)建Boss NioEventLoopGroup , Worker NioEventLoopGroup
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.創(chuàng)建服務(wù)器端的啟動對象,配置參數(shù)
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//線程隊列得到連接個數(shù)
.option(ChannelOption.SO_BACKLOG, 128)
//設(shè)置保持活動鏈接狀態(tài)
.childOption(ChannelOption.SO_KEEPALIVE, true)
//給workGroup的EventLoop對應(yīng)的管道設(shè)置handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
//3.綁定端口并且同步
ChannelFuture cf = bootstrap.bind(6666).sync();
//4.對關(guān)閉通道進行監(jiān)聽
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
猜想
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
- 創(chuàng)建NioEventLoop
- 創(chuàng)建Selector
ChannelFuture cf = bootstrap.bind(6666).sync();
- 創(chuàng)建ServerSocketChannel
- 選出一個Selector于ServerSocketChannel綁定
實際
1.NioEventLoopGroup bossGroup = new NioEventLoopGroup();
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// TODO: 不指定的時候,線程數(shù)默認cpu核數(shù) *2
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//TODO:executor.execute(Runnable command) 使用ThreadFactory創(chuàng)建一個新線程并且start();
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// TODO: 創(chuàng)建NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// TODO: 根據(jù)nThreads線程數(shù)來選出選擇選擇器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// TODO:加一個監(jiān)聽器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//TODO: 做出一個只讀的集合
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
我覺得需要著重注意一下executor這個屬性類型ThreadPerTaskExecutor啦鸣,因為代碼中充斥了大量的使用這個,下面我們?nèi)タ碞ioEventLoop的創(chuàng)建诫给,時刻記得我們的問題什么時候創(chuàng)建的線程,什么時候啟動的線程扑毡?
1.2 children[i] = newChild(executor, args);
這里調(diào)用的邏輯為NioEventLoopGroup類中的
創(chuàng)建好的NioEventLoop
- 所以這個時候并沒有創(chuàng)建NioEventLoop里的線程
- 創(chuàng)建了Selector
- 有一個隊列底層使用:Mpsc.newMpscQueue();苦掘,底層使用MpscUnboundedArrayQueue.class,傳值1024
- 有一個threadLock
-
exector較NioEventLoopGroup的exector又包裝了一層 挺邀,exector類型為ThreadExecutorMap.class 泣矛。很顯然eventLoop.exector()的時候在開始的時候會來調(diào)用這個NioEventLoopGroup的exector中的ThreadFactory去newThread并且start()您朽。
- ThreadExecutorMap里面存儲:每個線程一個NioEventLoop
1.3 chooser = chooserFactory.newChooser(children);
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
根據(jù)線程數(shù)初始化選擇器,如果是偶數(shù)倍试,則選PowerOfTwoEventExecutorChooser(count &(len -1))涮母,這樣的效率比取余數(shù)效率更高叛本。
1 小總結(jié)
- NioEventLoopGroup除了有NioEventLoop的數(shù)組来候,還要一個只讀的集合吠勘。ServerBootstrap.class里面也有一個多有數(shù)據(jù)只讀的屬性ServerBootstrapConfig.class
- 這一步只是創(chuàng)建了Selector和choose
2.ChannelFuture cf = bootstrap.bind(6666).sync();
代碼太多了植锉,這里打算不粘貼全部源碼了俊庇,只寫出我覺得是重點的部分辉饱,可以打開源碼看文檔彭沼,搭配食用
調(diào)用io.netty.bootstrap.AbstractBootstrap#doBind
//TODO: 根據(jù)反射創(chuàng)建ServerSocketChannel,完成注冊
final ChannelFuture regFuture = initAndRegister();
//TODO: ServerSocketChannel地址綁定
doBind0(regFuture, channel, localAddress, promise);
2.1 final ChannelFuture regFuture = initAndRegister();
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//TODO: 反射初始化Channel于毙,
channel = channelFactory.newChannel();
//TODO: 小重點唯沮,初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//TODO: 沒有異常一般都會執(zhí)行到這部分邏輯
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
channel = channelFactory.newChannel();
初始化后的channel
在初始化的時候創(chuàng)建java的ServerSocketChannel,并且設(shè)置為非阻塞
io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
//TODO: 需要關(guān)注這個方法執(zhí)行的時間
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//TODO: 在pipline上添加配置的handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//TODO: 給pipLines中添加一個ServerBootstrapAcceptor的handler
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
別想錯了竿滨,這個時候還沒有執(zhí)行那個ChannelInitializer#initChannel方法于游。所以目前都還沒有執(zhí)行eventLoop.executor(..)
ChannelFuture regFuture = config().group().register(channel);
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
//TODO: 調(diào)用choose選出一個NioEventLoop注冊上去
return next().register(channel);
}
register方法最后會調(diào)用的邏輯在贰剥,io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
//TODO:當(dāng)前線程是main線程,返回false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//TODO:第一次調(diào)用調(diào)用 eventLoop.execute凛捏,這次會在NioEventLoop創(chuàng)建線程并且start
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//TODO:重點1
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
//TODO:重點2
pipeline.invokeHandlerAddedIfNeeded();
//TODO:重點3最欠,在doBind0()綁定地址哪里有講
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
重點1:io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//TODO:給這個channel注冊了0事件蚜点,注意不是OP_ACCEPT
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
重點2:最后邏輯調(diào)用io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
//todo:這個方法最后其實就是調(diào)用了pipLine.addLast(ChannelInitializer<Channel>#initChannel)
//完成最后NioServerSocketChannel的pipline中handler的初始化
task.execute();
task = task.next;
}
}
強勢總結(jié):
- 代碼執(zhí)行到現(xiàn)在NioServerSocketChannel初始化了褐耳。regregister0()里面的邏輯是channel與selector綁定陪拘,并且完成最后的pipline的初始化剪撬,但是異步執(zhí)行,所以需要看那個地方執(zhí)行了regregister0()
2.1 重點中的重點第一次調(diào)用eventLoop.executor()馍佑,所以分析邏輯
會調(diào)用io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
execute:
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// TODO: 加入NioEventLoop的taskQueue中拭荤,什么時候執(zhí)行等我揭曉舅世。
addTask(task);
//TODO:當(dāng)前線程仍然是main雏亚,則當(dāng)我們在NioEventLoop線程中exeutor(Runable)會直接加入隊列中
if (!inEventLoop) {
//TODO:重點
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
startThread:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
有點繞,仔細看箭頭:
結(jié)論:doStartThread()里面的這個Runable的方法在NioEventLoop中的線程.start()的時候調(diào)用的网持。則在threadFactory.newThread(這里的runable是doStartThread中new出來的).start()
看到現(xiàn)在強勢總結(jié):
- 線程也創(chuàng)建了但是仍然沒有看到要執(zhí)行register0()所以現(xiàn)在可以看doStartThread()中SingleThreadEventExecutor.this.run()萍倡,執(zhí)行這行代碼的是在NioEventLoop線程中的遣铝,所以這個this沒有問題吧莉擒。SingleThreadEventExecutor里面的代碼大致是輪訓(xùn)遍歷執(zhí)行Selector里準(zhǔn)備就緒的事件和taskQueue里面的事件填硕,在下一章會有介紹鹿鳖。
2.2 doBind0(regFuture, channel, localAddress, promise);
什么時候被調(diào)用的?
- 因為它是在 regFuture.addListener(new ChannelFutureListener() {})中的回調(diào)方法中的執(zhí)行的姻檀,所以他在 io.netty.channel.AbstractChannel.AbstractUnsafe#register0 的 safeSetSuccess(promise);行代碼被調(diào)用的绣版。
- 代碼非常深杂抽,邏輯處理:io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
最后還會執(zhí)行這一段代碼:
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
pipeline.fireChannelActive();代碼非常深:會執(zhí)行這段邏輯
io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//TODO:注冊16,即OP_ACCEPT事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
總結(jié)
那么Netty的服務(wù)端的啟動流程:
- NioEventLoopGroup bossGroup = new NioEventLoopGroup();
- 創(chuàng)建NioEventLoop杭朱,即創(chuàng)建Selector,Selector是NioEventLoop的一個屬性變量
- 創(chuàng)建choose : 輪訓(xùn)讓Channel綁定NioEventLoop的Selector上的算法
- ChannelFuture cf = bootstrap.bind(6666).sync();
- 通過反射創(chuàng)建NioServerSocketChannel
- NioServerSocketChannel初始化梦谜,添加一個回調(diào)函數(shù)為pipLine上加一個ServerBootstrapAcceptor的handler
- 添加一個回調(diào)函數(shù)綁定地址
- (在NioEventLoop線程中)NioServerSocketChannel注冊上NioEventLoop的Selector上唁桩,綁定的是0
- (在NioEventLoop線程中)調(diào)用之前的回調(diào)函數(shù)NioServerSocketChannel綁定地址
- (在NioEventLoop線程中) 為NioServerSocektChannel注冊O(shè)P_ACCEPT事件报辱。