// 1. 配置 bossGroup 和 workerGroup
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
final EventLoopGroup workerGroup = new NioEventLoopGroup();
// 2. 創(chuàng)建業(yè)務(wù)邏輯處理器
final EchoServerHandler serverHandler = new EchoServerHandler();
// 3. 創(chuàng)建并配置服務(wù)端啟動輔助類 ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
// 4. 阻塞綁定端口
ChannelFuture f = b.bind(8081).sync();
// 5. 為服務(wù)端關(guān)閉的 ChannelFuture 添加監(jiān)聽器讹弯,用于實(shí)現(xiàn)優(yōu)雅關(guān)閉
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
一、代碼執(zhí)行流程梯形圖
/*********************************************** 1. 創(chuàng)建 NioEventLoopGroup ***********************************************/
new NioEventLoopGroup
--> SelectorProvider.provider() -- args
--> new DefaultSelectStrategyFactory() -- args
--> RejectedExecutionHandlers.reject() -- args
<!-- 1.1 創(chuàng)建 EventExecutor 選擇器工廠 -->
--> new DefaultEventExecutorChooserFactory()
--> MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
<!-- 1.2 創(chuàng)建線程工廠 -->
--> ThreadPerTaskExecutor(newDefaultThreadFactory())
<!-- 1.3 創(chuàng)建 EventExecutor 數(shù)組并且循環(huán)實(shí)例化數(shù)組元素 -->
--> EventExecutor[] children = new EventExecutor[nThreads]
--> children[i] = newChild(executor, args)
--> NioEventLoopGroup.newChild(Executor executor, Object... args)
<!-- 1.3.1 創(chuàng)建 select 策略 -->
--> DefaultSelectStrategyFactory.newSelectStrategy()
--> SelectStrategy INSTANCE = new DefaultSelectStrategy()
--> new NioEventLoop(NioEventLoopGroup parent,
Executor executor,
SelectorProvider selectorProvider,
SelectStrategy strategy,
RejectedExecutionHandler rejectedExecutionHandler)
--> SingleThreadEventExecutor(EventExecutorGroup parent,
Executor executor,
boolean addTaskWakesUp,
int maxPendingTasks,
RejectedExecutionHandler rejectedHandler)
<!-- 1.3.2 創(chuàng)建 taskQueue拦坠,用于存放非 NioEventLoop 線程提交的 task -->
--> Queue<Runnable> taskQueue = NioEventLoop.newTaskQueue(int maxPendingTasks)
--> PlatformDependent.<Runnable>newMpscQueue()
--> SingleThreadEventLoop.Queue<Runnable> tailTasks = NioEventLoop.newTaskQueue(int maxPendingTasks)
<!-- 1.3.3 創(chuàng)建 Selector -->
--> Selector selector = provider.openSelector() // 簡化锭魔,netty 有優(yōu)化過
<!-- 1.4 創(chuàng)建 -->
--> EventExecutorChooserFactory.EventExecutorChooser chooser = chooserFactory.newChooser(children)
--> new PowerOfTwoEventExecutorChooser(executors)
最終的 NioEventLoopGroup 實(shí)例:
-- EventExecutor[] children = new EventExecutor[1](實(shí)例是 NioEventLoop)
-- Thread thread = null // NIO 線程
-- Executor executor = new ThreadPerTaskExecutor(new DefaultThreadFactory)// 線程創(chuàng)建器
-- Selector selector = 根據(jù)系統(tǒng)創(chuàng)建不同的 selector // selector 選擇器
-- SelectedSelectionKeySet selectedKeys // 存儲被選中的 SelectionKey 列表
-- SelectionKey[] keys
-- SelectStrategy selectStrategy = new DefaultSelectStrategy()
-- int ioRatio = 50 // selected和隊(duì)列中任務(wù)的執(zhí)行時(shí)間比例
-- Queue<Runnable> taskQueue = new MpscUnboundedArrayQueue<T>(1024) // 任務(wù)隊(duì)列
-- Queue<Runnable> tailTasks = new MpscUnboundedArrayQueue<T>(1024)
-- PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = null
-- RejectedExecutionHandler rejectedExecutionHandler // 回絕策略
-- EventExecutorGroup parent = this 即 NioEventLoopGroup 實(shí)例 // 所屬的 NioEventLoopGroup
-- DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser // 線程選擇器:從children中選擇一個(gè)EventExecutor實(shí)例
-- EventExecutor[] children = new EventExecutor[1]
/*********************************************** 2. 創(chuàng)建并設(shè)置 ServerBootstrap ***********************************************/
new ServerBootstrap()
--> Map<ChannelOption<?>, Object> options = new LinkedHashMap<>()
--> Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<>()
--> Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>()
--> Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<>()
--> ServerBootstrapConfig config = new ServerBootstrapConfig(this)
// 設(shè)置 group
ServerBootstrap.group(bossGroup, childGroup)
--> EventLoopGroup group = bossGroup
--> EventLoopGroup childGroup = childGroup
// 設(shè)置 channel
ServerBootstrap.channel(NioServerSocketChannel.class)
--> ChannelFactory channelFactory = new ReflectiveChannelFactory(Class<? extends T> clazz) // 設(shè)置channel創(chuàng)建工廠梁厉,反射建立 channel
// 設(shè)置 option
ServerBootstrap.option(ChannelOption.SO_BACKLOG, 100)
--> Map<ChannelOption<?>, Object> options.put
// 設(shè)置 handler
ServerBootstrap.handler(new LoggingHandler(LogLevel.INFO))
--> ChannelHandler handler = LoggingHandler實(shí)例
// 設(shè)置 childHandler
ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>{})
--> ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>(){}
--> ChannelHandler childHandler = channelInitializer
/*********************************************** 3. bind ***********************************************/
ServerBootstrap.bind(int inetPort)
--> AbstractBootstrap.doBind(SocketAddress localAddress)
--> ChannelFuture regFuture = initAndRegister()
/********** 3.1 創(chuàng)建 NioServerSocketChannel *********/
--> Channel channel = channelFactory.newChannel() // channelFactory=ReflectiveChannelFactory
--> new NioServerSocketChannel()
--> newSocket(SelectorProvider provider)
--> provider.openServerSocketChannel() // 創(chuàng)建 java.nio.channels.ServerSocketChannel
--> NioServerSocketChannel(ServerSocketChannel channel)
--> AbstractChannel(Channel parent) // parent=null
--> CloseFuture closeFuture = new CloseFuture(this)
--> ChannelId id = DefaultChannelId.newInstance()
--> Unsafe unsafe = new NioMessageUnsafe() // 每一個(gè) Channel 都有一個(gè) Unsafe 對象
--> ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this)
--> List<Object> readBuf = new ArrayList<Object>()
--> DefaultChannelPipeline pipeline = new DefaultChannelPipeline(this) // 每一個(gè) Channel 都有一個(gè) ChannelPipeline 對象
--> this.channel = = this
--> AbstractChannelHandlerContext tail = new TailContext(this)
--> boolean inbound = true
--> boolean outbound = false
--> this.pipeline = pipeline
--> AbstractChannelHandlerContext head = new HeadContext(this)
--> boolean inbound = true
--> boolean outbound = true
--> this.pipeline = pipeline
--> Unsafe unsafe = pipeline.channel().unsafe() // NioMessageUnsafe
--> head<->tail 組建雙鏈表 // 每一個(gè) ChannelPipeline 對象都有一條 ChannelHandlerContext 組成的雙向鏈表滑废,每一個(gè)handler都由ChannelHandlerContext包裹
--> SelectableChannel ch = channel // channel = java.nio.channels.ServerSocketChannel
--> int readInterestOp = 16 // SelectionKey.OP_ACCEPT
--> ch.configureBlocking(false) // 配置 ServerSocketChannel 為非阻塞
--> ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket()) // tcp 參數(shù)配置類
--> ServerSocket javaSocket = javaChannel().socket() // ServerSocketChannel.socket(), 創(chuàng)建 ServerSocket
--> RecvByteBufAllocator rcvBufAllocator = new AdaptiveRecvByteBufAllocator()
--> ByteBufAllocator allocator = PooledByteBufAllocator(preferDirect = true)
--> int connectTimeoutMillis = 30000
--> WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT // low=32 * 1024 hign=64 * 1024
--> Channel channel = this困乒,即 NioServerSocketChannel
/********** 3.2 初始化 NioServerSocketChannel 屬性并添加 acceptorInitializer *********/
--> ServerBootstrap.init(Channel channel)
--> channel.config().setOption((ChannelOption<Object>) option, value) // ServerSocketChannelConfig 設(shè)置 option 配置
--> channel.attr(key).set(e.getValue()) // 設(shè)置 attr
--> ChannelInitializer acceptorInitializer = new ChannelInitializer<Channel>() {} // 非常重要
--> channel.pipeline().addLast(acceptorInitializer)
--> AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, childExecutor(group), name, handler)
// this=pipeline, handler=acceptorInitializer, childExecutor(group)=null
--> boolean inbound = true
--> boolean outbound = false
--> this.pipeline = pipeline
--> ChannelHandler handler = acceptorInitializer
--> addLast0(newCtx) // 將 acceptorInitializer 加入鏈表:head <-> acceptorInitializer <-> tail
--> callHandlerCallbackLater(newCtx, true) // 創(chuàng)建 new PendingHandlerAddedTask(ctx) 加入 PendingHandlerCallback 鏈表
/********** 3.3 執(zhí)行注冊:此時(shí)啟動Nio線程 + 注冊 channel 到 selector *********/
--> ChannelFuture MultithreadEventLoopGroup.register(Channel channel) // channel=NioServerSocketChannel
--> EventExecutor eventLoop = PowerOfTwoEventExecutorChooser.next()
--> SingleThreadEventLoop.register(Channel channel)
--> new DefaultChannelPromise(channel, this)
--> EventExecutor executor = eventLoop
--> Channel channel = NioServerSocketChannel
--> promise.channel().unsafe().register(this, promise) // this=eventLoop
--> AbstractChannel$AbstractUnsafe.register(this, promise)
--> EventLoop eventLoop = this // 同一個(gè) channel 只由一個(gè) EventLoop 來處理
--> 因?yàn)楫?dāng)前線程main不是當(dāng)前 eventLoop 的那條NioEventLoop線程寂屏,所以將 register0(promise) 封裝為 task,丟入 eventLoop 任務(wù)隊(duì)列
--> SingleThreadEventExecutor.execute(Runnable task) // task register0
--> addTask(Runnable task)
--> offerTask(Runnable task)
--> taskQueue.offer(task) // 如果添加失斈嚷А(例如隊(duì)列容量滿了)迁霎,則使用回絕處理器,此處是拋出RejectedExecutionException
--> startThread()
--> 封裝線程啟動task 該任務(wù)就是 Runnable command百宇,command 內(nèi)容如下
// 該task 是在下一步創(chuàng)建線程并啟動之后執(zhí)行的
--> NioEventLoop.Thread thread = 下一步創(chuàng)建出來的線程
--> NioEventLoop.run()
--> processSelectedKeys()
--> runAllTasks(long timeoutNanos)
--> fetchFromScheduledTaskQueue() // 將到期的 scheduledTaskQueue 中的 task 加入 taskQueue
--> Runnable task = pollTask() // taskQueue.poll()
--> safeExecute(task) // task.run() 此時(shí)會執(zhí)行 register0 任務(wù)
--> afterRunningAllTasks()
--> runAllTasksFrom(tailTasks) // 執(zhí)行 tailTasks 中的所有 task
--> ThreadPerTaskExecutor.execute(Runnable command) // threadFactory.newThread(command).start() 創(chuàng)建線程并啟動
--> wakeup(boolean inEventLoop) // inEventLoop=false
--> selector.wakeup() // 喚醒阻塞
/********** 3.4 執(zhí)行bind *********/
--> Channel channel = regFuture.channel()
--> ChannelPromise promise = channel.newPromise()
--> pipeline.newPromise()
--> new DefaultChannelPromise(channel)
--> doBind0(regFuture, channel, localAddress, promise)
--> 將 channel.bind 封裝為 task考廉,丟入 eventLoop 任務(wù)隊(duì)列
--> channel.eventLoop().execute(bind任務(wù)) // bind task
register0 task
--> doRegister()
--> selectionKey = javaChannel().register(Selector sel, // eventLoop().unwrappedSelector()
int ops, // 0
Object att) // this
--> pipeline.invokeHandlerAddedIfNeeded() // 會執(zhí)行一些 handlerAdded() 事件
--> callHandlerAddedForAllHandlers() // 執(zhí)行 pendingHandlerCallback 鏈表
--> acceptorInitializer.initChannel(final Channel ch)
--> pipeline.addLast(config.handler()) // handler配置
--> pipeline.addLast(new ServerBootstrapAcceptor(...)) // 添加 ServerBootstrapAcceptor
--> 從 pipeline 刪除 acceptorInitializer(因?yàn)榇藭r(shí)的 acceptorInitializer 的職責(zé)已經(jīng)結(jié)束)
--> pipeline.fireChannelRegistered() // 會執(zhí)行一些 channelRegister() 事件
bind task
--> doBind() // javaChannel().bind(localAddress, config.getBacklog())
--> pipeline.fireChannelActive()
--> ctx.fireChannelActive() // 執(zhí)行 channelActive() 事件
--> readIfIsAutoRead()
--> NioMessageUnsafe.beginRead()
--> selectionKey.interestOps(interestOps | readInterestOp) // 為 NioServerSocketChannel 綁定 ACCEPT 事件
總結(jié):
- 創(chuàng)建 NioEventLoopGroup
- 創(chuàng)建一個(gè) EventExecutor[],并且實(shí)例化其內(nèi)的每一個(gè)元素為 NioEventLoop(EventExecutor 是 NioEventLoop 的子類)
- 每個(gè) NioEventLoop 都包含一條線程 Thread恳谎,Netty 默認(rèn)是 FastThreadLocalThread芝此,此處為 null憋肖,后續(xù)在執(zhí)行注冊任務(wù)的時(shí)候因痛,會賦值并且開啟線程(每個(gè) NioEventLoop 都包含事先被創(chuàng)建好的 ThreadPerTaskExecutor,該線程執(zhí)行器用于線程 FastThreadLocalThread 的創(chuàng)建)
- 每個(gè) NioEventLoop 都包含一個(gè) Selector岸更,用于死循環(huán)進(jìn)行 NIO 感興趣事件的監(jiān)聽
- 每個(gè) NioEventLoop 都包含一個(gè) taskQueue鸵膏,用于存放非 NioEventLoop 線程提交的任務(wù)
- 每個(gè) NioEventLoop 都包含一個(gè) scheduledTaskQueue,用于存放定時(shí)任務(wù)(例如連接超時(shí)任務(wù))怎炊,此處為 null - 懶創(chuàng)建
- 每個(gè) NioEventLoop 都包含一個(gè) ioRatio谭企,用于決定 IO 事件和隊(duì)列任務(wù)的執(zhí)行時(shí)間比例
- 創(chuàng)建 NioEventLoop 選擇器,用于后續(xù)從 EventExecutor[] 選擇一個(gè) NioEventLoop
- 創(chuàng)建并設(shè)置 ServerBootstrap
- options评肆、attrs债查、handler 都是針對 ServerSocketChannel 起作用的;
- childOptions瓜挽、childAttrs盹廷、childHandler 都是針對由 ServerSocketChannel.accept() 出來的 SocketChannel 起作用的
- 設(shè)置 channel 時(shí),創(chuàng)建了 ReflectiveChannelFactory久橙,用于反射創(chuàng)建 NioServerSocketChannel
- 綁定操作
- 使用 ReflectiveChannelFactory 反射創(chuàng)建 NioServerSocketChannel
- 創(chuàng)建 java.nio.channels.ServerSocketChannel俄占,NioServerSocketChannel 是其包裝類
- 為 NioServerSocketChannel 設(shè)置唯一ID
- 為 NioServerSocketChannel 設(shè)置 Unsafe 實(shí)例(服務(wù)端是 NioMessageUnsafe),Unsafe 是真正的進(jìn)行底層 IO 操作的類 - 每一個(gè) Channel 都有一個(gè) Unsafe 對象
- 為 NioServerSocketChannel 設(shè)置 ChannelPipeline 對象 - 每一個(gè) Channel 都有一個(gè) ChannelPipeline 對象淆衷,每一個(gè) ChannelPipeline 都包含一條由 ChannelHandlerContext 組成的雙向鏈表(這條雙向鏈表至少有兩個(gè)節(jié)點(diǎn) HeadContext 和 TailContext)缸榄,每個(gè) ChannelHandlerContext 內(nèi)部都包裹著一個(gè) ChannelHandler。
- 設(shè)置 java.nio.channels.ServerSocketChannel 為非阻塞
- 記錄感興趣事件為 ACCEPT 事件
- 初始化 NioServerSocketChannel 屬性并添加 acceptorInitializer
- 設(shè)置 options祝拯、attrs 到 NioServerSocketChannel
- 創(chuàng)建 acceptorInitializer 并添加其到 NioServerSocketChannel 的 ChannelPipeline 對象中甚带,此時(shí)的 ChannelPipeline 鏈?zhǔn)?
HeadContext <-> acceptorInitializer <-> TailContext
,acceptorInitializer 包含一個(gè) ServerBootstrapAcceptor
- 執(zhí)行注冊:此時(shí)啟動Nio線程 + 注冊 channel 到 selector
- 使用 NioEventLoopGroup 的線程選擇器從 bossGroup 中選出一個(gè) NioEventLoop
X
- 最終調(diào)用 NioMessageUnsafe 執(zhí)行注冊,由于當(dāng)前執(zhí)行線程不是當(dāng)前 eventLoop 的那條 NioEventLoop 線程欲低,所以創(chuàng)建注冊任務(wù)辕宏,并加入
X
的 taskQueue 中,然后創(chuàng)建線程砾莱,賦值給X
的 thread 屬性瑞筐,之后啟動線程,執(zhí)行 NIO 死循環(huán)- NIO 死循環(huán)會按照 ioRatio 計(jì)算出來的時(shí)間比分別執(zhí)行 “處理 NIO 感興趣的事件” 和 “處理隊(duì)列中的任務(wù)”(會將到期的 scheduledTaskQueue 中的 task 加入 taskQueue腊瑟,之后統(tǒng)一從 taskQueue pollTask)聚假,此時(shí)會執(zhí)行注冊任務(wù)
- 注冊任務(wù):
- 將 NioServerSocketChannel 中的 java.nio.channels.ServerSocketChannel 注冊到
X
的 Selector 上,選擇鍵為0(此時(shí)不監(jiān)聽任何事件)闰非,attachment 為 NioServerSocketChannel 本身- pipeline.invokeHandlerAddedIfNeeded() 會執(zhí)行到 acceptorInitializer膘格,首先執(zhí)行其 initChannel,此時(shí)創(chuàng)建
ServerBootstrapAcceptor
并添加到 ChannelPipeline财松,如下HeadContext <-> acceptorInitializer <-> ServerBootstrapAcceptor <-> TailContext
瘪贱,最后刪除 acceptorInitializer,最終的 ChannelPipeline 鏈?zhǔn)?HeadContext <-> ServerBootstrapAcceptor <-> TailContext
(其中辆毡,ServerBootstrapAcceptor 中存儲著 childOptions菜秦、childAttrs、childHandler 以及 workerGroup舶掖,后續(xù)有客戶端進(jìn)行連接時(shí)球昨,服務(wù)端會監(jiān)聽到 ACCEPT 事件,進(jìn)而會使用 ServerBootstrapAcceptor 做一些邏輯)- 注冊完畢之后 執(zhí)行 channelRegister() 事件
- 綁定
- 與注冊操作一樣綁定操作也是非當(dāng)前 Channel 所屬的 NioEventLoop 線程發(fā)起的眨攘,所以也要封裝為任務(wù)主慰,加入到
X
的 taskQueue 中(每加入隊(duì)列一個(gè)任務(wù),都會做一次 selector.wakeup 操作鲫售,起到及時(shí)執(zhí)行任務(wù)的作用)
- 綁定任務(wù)的內(nèi)容:pipeline.fireChannelActive()
- 執(zhí)行 channelActive() 事件
- 為 NioServerSocketChannel 設(shè)置感興趣的監(jiān)聽鍵為創(chuàng)建 NioServerSocketChannel 時(shí)所存儲的 ACCEPT 事件
注意:
- bind 一定要發(fā)生在 register 之后共螺,但是 Netty 中所有的執(zhí)行都是異步的,register 也不例外情竹,那么怎么保證 bind 一定發(fā)生在 register 之后藐不,Netty 使用為 regsiter 添加執(zhí)行完成的回調(diào)監(jiān)聽器,在該監(jiān)聽器中完成 bind 操作鲤妥。
- 一個(gè) Channel 的事情只能由一個(gè) NioEventLoop 來操作佳吞,所以 EventExecutor#inEventLoop() 不是判斷當(dāng)前線程是不是 NioEventLoop 中的那條線程,而是判斷是否是當(dāng)前所操作的 Channel 所屬的 NioEventLoop 的那條線程棉安。在 accept 的操作過程中底扳,調(diào)用注冊的線程是 NioServerSocketChannel 的那條線程,而注冊的 Channel 是 SocketChannel贡耽,所以還是要包裝成任務(wù)衷模,添加到 SocketChannel 所屬的那條 NioEventLoop 中鹊汛。
- 當(dāng) j 是 2的n次方的時(shí)候,i % j == i &(j-1)阱冶,后者效率更高刁憋,PowerOfTwoEventExecutorChooser 是后者,也是默認(rèn)木蹬;GenericEventExecutorChooser 是前者至耻。