Netty 的啟動流程
服務(wù)端 Nio 創(chuàng)建步驟包括:1. 創(chuàng)建;2. 注冊鞍盗;3. 監(jiān)聽。
所以需要在 netty 啟動也需要完成以上步驟
創(chuàng)建
創(chuàng)建步驟分為:創(chuàng)建 selector,創(chuàng)建 ServerSocketChannel
1. 創(chuàng)建 Selector
在創(chuàng)建 NioEventLoopGroup 的時(shí)候涮阔,會對每個(gè) EventLoop 進(jìn)行初始化别伏,初始化 EventLoop時(shí)码秉,也初始化 EventLoop 里的 selector
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup 構(gòu)造方法調(diào)用鏈
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// executor 為 null
// SelectorProvider.provider() 方法使用單例模式創(chuàng)建SelectorProvider
// SelectorProvider 是后續(xù)創(chuàng)建 Selector 的類
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// DefaultSelectStrategyFactory.INSTANCE 通過單例模式創(chuàng)建一個(gè) DefaultSelectStrategyFactory
// DefaultSelectStrategyFactory 的用途是后續(xù)創(chuàng)建 SelectStrategy
// SelectStrategy 用來控制后續(xù) select 輪訓(xùn)的策略
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
// RejectedExecutionHandlers.reject() 定義了 NioEventLoop 里線程池的拒絕策略
// super 表示調(diào)用 NioEventLoopGroup 的父類構(gòu)造函數(shù)吆录,即 MultithreadEventLoopGroup
super(nThreads, executor, selectorProvider,
selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// args[0]: selectorProvider
// args[1]: selectStrategyFactory
// args[2]: rejectedExecutionHandler
// super 表示 調(diào)用父類構(gòu)造函數(shù),即 MultithreadEventExecutorGroup
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// DefaultEventExecutorChooserFactory.INSTANCE 通過單例創(chuàng)建DefaultEventExecutorChooserFactory
// DefaultEventExecutorChooserFactory 的作用是通過 EventExecutor 數(shù)組創(chuàng)建一個(gè) EventExecutorChooser
// EventExecutorChooser 的作用是通過不同的算法從 EventExecutor 數(shù)組 選擇一個(gè) EventExecutor
// NioEventLoop 繼承了 EventExecutor毡证,所以在 NioEventLoopGroup 里的 NioEventLoop數(shù)組即是EventExecutor數(shù)組
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
try{
// 創(chuàng)建 NioEventLoop
// 模板方法电爹,由 NioEventLoopGroup 實(shí)現(xiàn),selector 就在創(chuàng)建 NioEventLoop 時(shí)進(jìn)行創(chuàng)建
children[i] = newChild(executor, args);
}
...
}
}
// NioEventLoopGroup#newChild()
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 創(chuàng)建 NioEventLoop
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
接下來就是 NioEventLoop 的構(gòu)造函數(shù)
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 創(chuàng)建 selector,并返回一個(gè)selector 的元組對象料睛,此元組包含了一個(gè) unwrappedSelector 和 一個(gè) selector
// 默認(rèn)情況下 unwrappedSelector 和 selector 是相同的對象丐箩,都是java nio 原生的 selector
// 只有開啟了優(yōu)化配置摇邦,才會創(chuàng)建優(yōu)化了的 selector
// 但是即使優(yōu)化了selector,在注冊的時(shí)候依然使用 unwrappedSelector 這個(gè)原始的 selector
final SelectorTuple selectorTuple = openSelector();
// selector: 是被 Netty 優(yōu)化屎勘,由 Netty 繼承自 selector 接口實(shí)現(xiàn)的類
this.selector = selectorTuple.selector;
// unwrappedSelector: 沒有被 Netty 優(yōu)化的施籍,原始的JDK Nio selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
至此,便完成了 selector 的創(chuàng)建
接下來就是 ServerSocketChannel 的創(chuàng)建
2. ServerSocketChannel 的創(chuàng)建和注冊
- ServerSocketChannel 是Java 原始的 Channel概漱,在 Netty 中對這個(gè)ServerSocketChannel 進(jìn)行了擴(kuò)展丑慎,使用 NioServerSocketChannel 持有 ServerSocketChannel
// Java ServerSocketChannel 的在 NioServerSocketChannel 的位置
// NioServerSocketChannel -> AbstractNioMessageChannel -> AbstractNioChannel
// SelectableChannel 就是 java nio ServerSocketChannel 的接口類
private final SelectableChannel ch;
protected SelectableChannel javaChannel() {
return ch;
}
- 所以 ServerSocketChannel 的創(chuàng)建過程應(yīng)該在 NioServerSocketChannel 創(chuàng)建的時(shí)候進(jìn)行的
- 而 NioServerSocketChannel 則是在 ServerBootstrap中被創(chuàng)建的
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
...
bootstrap.bind(8080);
與 NioServerSocketChannel 相關(guān)代碼有兩個(gè)
-
bootstrap.channel(NioServerSocketChannel.class)
方法會創(chuàng)建一個(gè) channelFactory,channelFactory 是一個(gè) netty channel 的工廠方法瓤摧,用來創(chuàng)建 NioServerSocketChannel 實(shí)例
// AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
// ReflectiveChannelFactory 封裝了 channelClass竿裂,調(diào)用 ReflectiveChannelFactory#newChannel 方法時(shí)通過反射的方式實(shí)例化channelClass,返回channelClass類的對象實(shí)例
// channelFactory方法則將 ReflectiveChannelFactory 復(fù)制給 AbstractBootstrap的成員變量 channleFactory
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// ReflectiveChannelFactory 構(gòu)造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
}
...
}
-
bootstrap.bind(8080)
用來實(shí)際創(chuàng)建NioServerSocketChannel
對象實(shí)例
進(jìn)入 bootstrap.bind()
方法照弥,該方法實(shí)際上進(jìn)入了 ServerBootstrap 的父類 AbstractBootstrap的 bind 方法
// AbstractBootstrap#bind(int inetPort)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
// 驗(yàn)證 bossGroup 和 channelFactory 是否存在
validate();
// 真正進(jìn)行 bind 的方法
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化 NioServerSocketChannel ,并將 NioServerSocketChannel 里的注冊到 seletor 中就在這里
final ChannelFuture regFuture = initAndRegister();
...
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 調(diào)用 上面所說的 ReflectiveChannelFactory#newChannel() 方法
// 初始化并返回 NioServerSocketChannel
channel = channelFactory.newChannel();
init(channel);
}
...
ChannelFuture regFuture = config().group().register(channel);
}
//ReflectiveChannelFactory#newChannel()
@Override
public T newChannel() {
try {
// 相當(dāng)于 new NioServerSocketChannel();
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
接下來就是創(chuàng)建 NioServerSocketChannel 的流程
// NioServerSocketChannel 的構(gòu)造函數(shù)
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
// 創(chuàng)建有個(gè) java Nio ServerSocketChannel 實(shí)例
return provider.openServerSocketChannel();
...
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 調(diào)用父類的構(gòu)造方法腻异,即AbstractNioMessageChannel的構(gòu)造方法
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 調(diào)用父類的構(gòu)造方法,即 AbstractNioChannel 的構(gòu)造方法
// parent 為 null
// ch 為 ServerSocketChannel
// readInterestOp 為 SelectionKey.OP_ACCEPT
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// parent 為 null, 調(diào)用父類構(gòu)造函數(shù)这揣,即 AbstractChannel 的構(gòu)造函數(shù)
super(parent);
// 將 serverSocketChannel 賦值給 NioServerSocketChannel 的成員變量 ch
this.ch = ch;
// serverSocketChannel 只對 SelectionKey.OP_ACCEPT 感興趣
this.readInterestOp = readInterestOp;
try {
// 將 serverSocketChannel 設(shè)置為非阻塞
ch.configureBlocking(false);
}
...
}
protected AbstractChannel(Channel parent) {
// parent 為 null
this.parent = parent;
// NioServerSocketChannel的 id
id = newId();
// 最后會調(diào)用 AbstractNioByteChannel 的 newUnsafe() 方法悔常,返回 NioMessageUnsafe 對象
unsafe = newUnsafe();
// 熟悉的 pipeline,返回 DefaultChannelPipeline 對象
pipeline = newChannelPipeline();
}
所以,在創(chuàng)建 NioServerSocketChannel 時(shí)的 ServerSocketChannel 便已經(jīng)被創(chuàng)建了曾沈,并賦值給了成員變量 ch
接下來就是把這個(gè) ServerSocketChannel 注冊到 selector 中这嚣,回到剛才 AbstractBootstrap 的 initAndRegister 方法
final ChannelFuture initAndRegister() {
Channel channel = null;
// 完成了 ServerSocketChannel 的創(chuàng)建鸥昏,接下來就是將 ServerSocketChannel 注冊到 selector 中
// config().group() 會返回 bossGroup塞俱,即是一個(gè) NioEventLoopGroup 對象register(channel)
// 調(diào)用 config().group().register(channel) 就是調(diào)用 NioEventLoopGroup.register(channel)
// NioEventLoopGroup.register(channel) 方法的實(shí)現(xiàn)在其父類 MultithreadEventLoopGroup
ChannelFuture regFuture = config().group().register(channel);
}
// MultithreadEventLoopGroup#register
@Override
public ChannelFuture register(Channel channel) {
// next() 會使用 chooser(EventExecutorChooser) 選擇一個(gè) NioEventLoop
// 調(diào)用 NioEventLoop.register 方法,NioEventLoop.register 方法在 SingleThreadEventLoop.register 中實(shí)現(xiàn)
return next().register(channel);
}
// SingleThreadEventLoop#register
@Override
public ChannelFuture register(Channel channel) {
// 根據(jù) NioServerSocketChannle 生成一個(gè) DefaultChannelPromise
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
// promise.channel 返回 NioServerSocketChannel
// NioServerSocketChannel.unsafe() 返回一個(gè) Unsafe
// Unsafe.register 方法會進(jìn)入 NioByteUnsafe.register 方法吏垮, 而 NioByteUnsafe.register 方法的實(shí)現(xiàn)在 AbstractNioUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
// AbstractNioUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
// 判斷當(dāng)前線程是否是在本 eventLoop,顯然不是障涯,當(dāng)前線程是在 main thread,所以不進(jìn)入
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 不是在 eventLoop中膳汪,則創(chuàng)建一個(gè)線程唯蝶,放到 eventLoop中的線程池中,在線程池中調(diào)用register0() 方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
...
}
}
private void register0(ChannelPromise promise) {
...
// 此方法在 AbstractNioChannel中實(shí)現(xiàn)
doRegister();
...
}
// AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel() 即是 ServerSocketChannel
// eventLoop().unwrappedSelector() 即是 selector
// 0 表示 ServerSocketChannel 對任何事件都不感興趣遗嗽,后續(xù)還會另外注冊 Accept 事件到 selector
// this:表示把 NioServerSocketChannel 作為屬性綁定到 serverSocketChannel 中粘我,只要selector 返回了 serverSocketChannel 相關(guān)的事件,便會帶回來這個(gè) NioServerSocketChannel 對象
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
}
到目前為止痹换,
- selector 的創(chuàng)建征字,
- serverSocketChannel 的創(chuàng)建,
- serverSocketChannel 向 selector 注冊
已經(jīng)完成
還有
- 給 serverSocketChannel設(shè)置監(jiān)聽端口
- 注冊 serverSocketChannel accept事件 到 selector
- selector.調(diào)用 select() 方法輪詢的步驟
還沒完成
回到 AbstractBootstrap#dobind()
方法
private ChannelFuture doBind(final SocketAddress localAddress) {
// 上面的步驟創(chuàng)建了 NioServerSocketChannel娇豫,由于 NioServerSocketChannel注冊到 selector 是放在 eventLoop 的線程池異步進(jìn)行的匙姜,所以會返回一個(gè)regFuture 這個(gè)異步結(jié)果。
...
if (regFuture.isDone()) {
// 如果已經(jīng)注冊完成了NioServerSocketChannel冯痢,則開始 bind NioServerSocketChannel
// 具體的bind 方法在 doBind0() 中
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//拿到 NioServerSocketChannel 所在的 eventLoop氮昧,然后將將 bind 方法丟給 eventLoop 里的線程池進(jìn)行處理
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 調(diào)用 NioServerSocketChannel 的 bind 方法框杜,此方法的實(shí)現(xiàn)在 AbstractChannel 類
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
//AbstractChannel#bind
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 調(diào)用 DefaultChannelPipeline 的 bind 方法
return pipeline.bind(localAddress, promise);
}
// DefaultChannelPipeline#bind
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 調(diào)用 TailContext 的 bind 方法,該方法會調(diào)用 AbstractChannelHandlerContext.bind() 方法
// 該方法會去尋找pipeline 中所有的 executionMask 包含了 Mask_Bind 的 Outbound Handler袖肥,默認(rèn)情況下只有 HeadContext 才有咪辱,所以實(shí)際上調(diào)用的是 HeadContext 的 bind 方法
return tail.bind(localAddress, promise);
}
// HeadContext#bind
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 該 unsafe 是 NioServerSocketChannel 中的 NioMessageUnsafe 對象,NioMessageUnsafe.bind 方法的實(shí)現(xiàn)在其父類 AbstractUnsafe 中
unsafe.bind(localAddress, promise);
}
// AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
try {
// 調(diào)用 NioServerSocketChannle 的 doBind 方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
...
}
// NioServerSocketChannel#Bind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 調(diào)用 java nio 的 ServerSocketChannel 的 bind 方法
// backlog 表示等候排隊(duì)的連接隊(duì)列長度
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
至此椎组,ServerSocketChannel bind 監(jiān)聽端口也已經(jīng)完成
接下來還有
- 注冊 serverSocketChannel accept 事件 到 selector
- selector.調(diào)用 select() 方法輪詢的步驟
回到 AbstractUnsafe.bind
方法
// AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 觸發(fā) pipeline 中 ChannelHandler 的 channelActive() 方法
// 這里直接到 HeadContext 的 active() 方法
pipeline.fireChannelActive();
}
});
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
// 注冊 Accept 的事件
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
// 調(diào)用 NioServerSocketChannel 的 read 方法
// 該方法的實(shí)現(xiàn)在 AbstractChannel 中
channel.read();
}
}
//AbstractChannel#read
@Override
public Channel read() {
// 調(diào)用 DefaultChannelPipeline 的 read() 方法
pipeline.read();
return this;
}
//DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
// 該方法從 tail 開始梧乘,調(diào)用所有的 ChannelHandler 的 read 的方法
// 最終會到達(dá) HeadContext 的 read(ChannelHandlerContext ctx) 方法
tail.read();
return this;
}
// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
// 調(diào)用 NioMessageUnsafe.beginRead() 方法
// 該方法的實(shí)現(xiàn)在 AbstractUnsafe 中
unsafe.beginRead();
}
// AbstractUnsafe#beginRead()
@Override
public final void beginRead() {
...
try {
// 調(diào)用 AbstractChannel 的 doBeginRead() 方法
// 該方法的實(shí)現(xiàn)在 AbstractNioChannel
doBeginRead();
} catch (final Exception e) {
...
}
...
}
// AbstractNioChannel#doBeginRead()
@Override
protected void doBeginRead() throws Exception {
// 這個(gè) selectionKey 是在將 serverSocketChannel 注冊到 selector 時(shí)返回的
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// readInterestOp 是在創(chuàng)建NioServerSocketChannel 時(shí)指定的,值為 SelectionKey.OP_ACCEPT
if ((interestOps & readInterestOp) == 0) {
// 設(shè)置對 accept 事件感興趣
selectionKey.interestOps(interestOps | readInterestOp);
}
}
完成了 accept 注冊后庐杨,最后一部就到了select 輪詢
總結(jié)