基于Netty源代碼版本:netty-all-4.1.33.Final
netty中的reactor線程
netty中最核心的東西莫過于兩種類型的reactor線程,可以看作netty中兩種類型的發(fā)動機,驅動著netty整個框架的運轉
一種類型的reactor線程是boos線程組蚯斯,專門用來接受新的連接搔驼,然后封裝成channel對象扔給worker線程組签赃;還有一種類型的reactor線程是worker線程組痪欲,專門用來處理連接的讀寫咬腋。
不管是boos線程還是worker線程,所做的事情均分為以下三個步驟
- 輪詢出注冊在selector上面的IO事件(select)
- 處理這些IO事件(process selected keys)
- 執(zhí)行異步task
對于boos線程來說向瓷,第一步輪詢出來的基本都是 accept 事件肠套,表示有新的連接,而worker線程輪詢出來的基本都是read/write事件猖任,表示網(wǎng)絡的讀寫事件
新連接的建立
簡單來說你稚,新連接的建立可以分為三個步驟:
- 1、檢測到有新的連接
- 2朱躺、將新的連接注冊到worker線程組
- 3刁赖、注冊新連接的讀事件
檢測到有新連接進入
我們已經(jīng)知道,當服務端綁啟動之后长搀,服務端的channel已經(jīng)注冊到boos reactor線程中宇弛,reactor不斷檢測有新的事件,直到檢測出有accept事件發(fā)生
NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//檢查該SelectionKey是否有效源请,如果無效枪芒,則關閉channel
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞谁尸,這樣可能會出現(xiàn)cpu 100%
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 如果準備好了WRITE則將緩沖區(qū)中的數(shù)據(jù)發(fā)送出去舅踪,如果緩沖區(qū)中數(shù)據(jù)都發(fā)送完成,則清除之前關注的OP_WRITE標記
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 如果準備好READ或ACCEPT則觸發(fā)unsafe.read() ,檢查是否為0良蛮,如上面的源碼英文注釋所說:解決JDK可能會產生死循環(huán)的一個bug抽碌。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況
- 1)OP_ACCEPT背镇,接受客戶端連接
- 2)OP_READ, 可讀事件, 即 Channel 中收到了新數(shù)據(jù)可供上層讀取咬展。
- 3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數(shù)據(jù)。
- 4)OP_CONNECT, 連接建立事件, 即 TCP 連接已經(jīng)建立, Channel 處于 active 狀態(tài)瞒斩。
主要來看下當boss線程 selector檢測到OP_ACCEPT事件時破婆,內部干了些什么。
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 如果準備好READ或ACCEPT則觸發(fā)unsafe.read() ,檢查是否為0胸囱,如上面的源碼英文注釋所說:解決JDK可能會產生死循環(huán)的一個bug祷舀。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
boos reactor線程已經(jīng)輪詢到 SelectionKey.OP_ACCEPT 事件,說明有新的連接進入烹笔,此時將調用channel的 unsafe來進行實際的操作,此時的channel為 NioServerSocketChannel裳扯,則unsafe為NioServerSocketChannel的屬性NioMessageUnsafe
那么,我們進入到它的read方法谤职,進入新連接處理的第二步
注冊到reactor線程
NioMessageUnsafe.java
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
調用 doReadMessages 方法不斷地讀取消息饰豺,用 readBuf 作為容器,這里允蜈,其實可以猜到讀取的是一個個連接冤吨,然后調用 pipeline.fireChannelRead()蒿柳,將每條新連接經(jīng)過一層服務端channel的洗禮,之后清理容器漩蟆,觸發(fā) pipeline.fireChannelReadComplete()
下面我們具體看下這兩個方法:
- 1垒探、doReadMessages(List)
- 2、pipeline.fireChannelRead(NioSocketChannel)
doReadMessages()
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
public final class SocketUtils {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
我們終于窺探到netty調用jdk底層nio的邊界serverSocketChannel.accept();怠李,由于netty中reactor線程第一步就掃描到有accept事件發(fā)生圾叼,因此,這里的accept方法是立即返回的捺癞,返回jdk底層nio創(chuàng)建的一條channel
ServerSocketChannel有阻塞和非阻塞兩種模式:
- a夷蚊、阻塞模式:ServerSocketChannel.accept() 方法監(jiān)聽新進來的連接,當 accept()方法返回的時候,它返回一個包含新進來的連接的 SocketChannel翘簇。阻塞模式下, accept()方法會一直阻塞到有新連接到達撬码。
- b、非阻塞模式:accept() 方法會立刻返回版保,如果還沒有新進來的連接,返回的將是null。 因此夫否,需要檢查返回的SocketChannel是否是null.
在NioServerSocketChannel的構造函數(shù)分析中彻犁,我們知道,其通過ch.configureBlocking(false);語句設置當前的ServerSocketChannel為非阻塞的凰慈。
在NioServerSocketChannel的構造函數(shù)分析中汞幢,我們知道,在其父類AbstractNioChannel的構造函數(shù)中通過ch.configureBlocking(false);語句設置當前的ServerSocketChannel為非阻塞的微谓。
netty將jdk的 SocketChannel 封裝成自定義的 NioSocketChannel森篷,加入到list里面,這樣外層就可以遍歷該list豺型,做后續(xù)處理
從上一篇文章中锡宋,我們已經(jīng)知道服務端的創(chuàng)建過程中會創(chuàng)建netty中一系列的核心組件叉钥,包括pipeline,unsafe等等,那么,接受一條新連接的時候是否也會創(chuàng)建這一系列的組件呢爹袁?
帶著這個疑問,我們跟進去
NioSocketChannel.java
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
我們重點分析 super(parent, socket)其屏,NioSocketChannel的父類為 AbstractNioByteChannel
AbstractNioByteChannel.java
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
這里证鸥,我們看到jdk nio里面熟悉的影子—— SelectionKey.OP_READ,一般在原生的jdk nio編程中娶眷,也會注冊這樣一個事件似嗤,表示對channel的讀感興趣
我們繼續(xù)往上,追蹤到AbstractNioByteChannel的父類 AbstractNioChannel, 這里届宠,我相信讀了上一篇文章你對于這部分代碼肯定是有印象的
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
在創(chuàng)建服務端channel的時候烁落,最終也會進入到這個方法壳咕,super(parent), 便是在AbstractChannel中創(chuàng)建一系列和該channel綁定的組件,如下
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
而這里的 readInterestOp 表示該channel關心的事件是 SelectionKey.OP_READ顽馋,后續(xù)會將該事件注冊到selector谓厘,之后設置該通道為非阻塞模式,在channel中創(chuàng)建 unsafe 和一條 pipeline
pipeline.fireChannelRead(NioSocketChannel)
對于 pipeline我們前面已經(jīng)了解過寸谜,在netty的各種類型的channel中竟稳,都會包含一個pipeline,字面意思是管道熊痴,我們可以理解為一條流水線工藝他爸,流水線工藝有起點,有結束果善,中間還有各種各樣的流水線關卡诊笤,一件物品,在流水線起點開始處理巾陕,經(jīng)過各個流水線關卡的加工讨跟,最終到流水線結束
對應到netty里面,流水線的開始就是HeadContxt鄙煤,流水線的結束就是TailConext晾匠,HeadContxt中調用Unsafe做具體的操作,TailConext中用于向用戶拋出pipeline中未處理異常以及對未處理消息的警告
通過前面的文章中梯刚,我們已經(jīng)知道在服務端的channel初始化時凉馆,在pipeline中,已經(jīng)自動添加了一個pipeline處理器 ServerBootstrapAcceptor, 并已經(jīng)將用戶代碼中設置的一系列的參數(shù)傳入了構造函數(shù)亡资,接下來澜共,我們就來看下ServerBootstrapAcceptor
ServerBootstrapAcceptor.java
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
前面的 pipeline.fireChannelRead(NioSocketChannel); 最終通過head->unsafe->ServerBootstrapAcceptor的調用鏈,調用到這里的 ServerBootstrapAcceptor 的channelRead方法锥腻,而 channelRead 一上來就把這里的msg強制轉換為 Channel
然后嗦董,拿到該channel,也就是我們之前new出來的 NioSocketChannel中對應的pipeline旷太,將用戶代碼中的 childHandler展懈,添加到pipeline,這里的 childHandler 在用戶代碼中的體現(xiàn)為
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new EchoServerHandler());
}
});
其實對應的是 ChannelInitializer供璧,到了這里存崖,NioSocketChannel中pipeline對應的處理器為 head->ChannelInitializer->tail,牢記睡毒,后面會再次提到来惧!
接著,設置 NioSocketChannel 對應的 attr和option演顾,然后進入到 childGroup.register(child)供搀,這里的childGroup就是我們在啟動代碼中new出來的NioEventLoopGroup
我們進入到NioEventLoopGroup的register方法隅居,代理到其父類MultithreadEventLoopGroup
MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
這里又扯出來一個 next()方法,我們跟進去
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
回到其父類
MultithreadEventExecutorGroup.java
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
@Override
public EventExecutor next() {
return chooser.next();
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
chooser = chooserFactory.newChooser(children);
}
這里的chooser對應的類為 EventExecutorChooser葛虐,字面意思為事件執(zhí)行器選擇器胎源,放到我們這里的上下文中的作用就是從worker reactor線程組中選擇一個reactor線程
@UnstableApi
public interface EventExecutorChooserFactory {
/**
* Returns a new {@link EventExecutorChooser}.
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
/**
* Chooses the next {@link EventExecutor} to use.
*/
@UnstableApi
interface EventExecutorChooser {
/**
* Returns the new {@link EventExecutor} to use.
*/
EventExecutor next();
}
}
chooser的實現(xiàn)有兩種
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
默認情況下,chooser通過 DefaultEventExecutorChooserFactory被創(chuàng)建屿脐,在創(chuàng)建reactor線程選擇器的時候涕蚤,會判斷reactor線程的個數(shù),如果是2的冪的诵,就創(chuàng)建PowerOfTowEventExecutorChooser万栅,否則,創(chuàng)建GenericEventExecutorChooser
兩種類型的選擇器在選擇reactor線程的時候西疤,都是通過Round-Robin的方式選擇reactor線程烦粒,唯一不同的是,PowerOfTowEventExecutorChooser是通過與運算代赁,而GenericEventExecutorChooser是通過取余運算扰她,與運算的效率要高于求余運算
選擇完一個reactor線程,即 NioEventLoop 之后管跺,我們回到注冊的地方
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
其實义黎,這里已經(jīng)和服務端啟動的過程一樣了,可以參考我前面的文章
AbstractChannel$AbstractUnsafe
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
和服務端啟動過程一樣豁跑,先是調用 doRegister();做真正的注冊過程,如下
AbstractNioChannel
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
將該條channel綁定到一個selector上去泻云,一個selector被一個reactor線程使用艇拍,后續(xù)該channel的事件輪詢,以及事件處理宠纯,異步task執(zhí)行都是由此reactor線程來負責
綁定完reactor線程之后卸夕,調用 pipeline.invokeHandlerAddedIfNeeded()
前面我們說到,到目前為止NioSocketChannel 的pipeline中有三個處理器婆瓜,head->ChannelInitializer->tail快集,最終會調用到 ChannelInitializer 的 handlerAdded 方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
handlerAdded方法調用 initChannel 方法之后,調用remove(ctx);將自身刪除
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
而這里的 initChannel 方法又是神馬玩意廉白?讓我們回到用戶方法个初,比如下面這段用戶代碼
用戶代碼
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new EchoServerHandler());
}
});
原來最終跑到我們自己的代碼里去了啊猴蹂!完了之后院溺,NioSocketChannel綁定的pipeline的處理器就包括 head->LoggingHandler->EchoServerHandler->tail
注冊讀事件
接下來,我們還剩下這些代碼沒有分析完
AbstractChannel$AbstractUnsafe
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
pipeline.fireChannelRegistered();磅轻,其實沒有干啥有意義的事情珍逸,最終無非是再調用一下業(yè)務pipeline中每個處理器的 ChannelHandlerAdded方法處理下回調
isActive()在連接已經(jīng)建立的情況下返回true逐虚,所以進入方法塊,進入到 pipeline.fireChannelActive();在這里我詳細步驟先省略谆膳,直接進入到關鍵環(huán)節(jié)
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
這里其實就是將 SelectionKey.OP_READ事件注冊到selector中去叭爱,表示這條通道已經(jīng)可以開始處理read事件了
至此,netty中關于新連接的處理已經(jīng)向你展示完了漱病,我們做下總結
- 1买雾、boos reactor線程輪詢到有新的連接進入
- 2、通過封裝jdk底層的channel創(chuàng)建 NioSocketChannel以及一系列的netty核心組件
- 3缨称、將該條連接通過chooser凝果,選擇一條worker reactor線程綁定上去
- 4、注冊讀事件睦尽,開始新連接的讀寫
參考:
https://www.cnblogs.com/java-chen-hao/p/11477358.html