Bootstrap 是 Netty 提供的一個便利的工廠類, 我們可以通過它來完成 Netty 的客戶端或服務器端的 Netty 初始化陪白。
下面從客戶端和服務器端分別分析一下 Netty 的程序是如何啟動的终佛。
客戶端 BootStrap
首先,讓我們從客戶端的代碼開始:
public class ChatClient {
public ChatClient connect(int port,String host,final String nickName){
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
...
}
});
//發(fā)起同步連接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//關閉,釋放線程資源
group.shutdownGracefully();
}
return this;
}
public static void main(String[] args) {
new ChatClient().connect(8080, "localhost","wei");
}
}
從上面的客戶端代碼雖然簡單, 但是卻展示了 Netty 客戶端初始化時所需的所有內容:
1、EventLoopGroup:不論是服務器端還是客戶端, 都必須指定 EventLoopGroup。
在這個例子中, 指定了NioEventLoopGroup, 表示一個 NIO 的 EventLoopGroup氓轰。關于EventLoopGroup的詳細源碼解析,會在以后的章節(jié)詳解浸卦。2署鸡、ChannelType: 指定 Channel 的類型。 因為是客戶端限嫌,因此使用了 NioSocketChannel靴庆。
在 Netty 中,Channel 是一個 Socket 的抽象萤皂,它為用戶提供了關于 Socket 狀態(tài)(是否是連接還是斷開)以及對 Socket的讀寫等操作撒穷。每當 Netty 建立了一個連接后, 都創(chuàng)建一個對應的 Channel 實例。除了 TCP 協(xié)議以外裆熙,Netty 還支持很多其他的連接協(xié)議, 并且每種協(xié)議還有 NIO(非阻塞 IO)和 OIO(Old-IO, 即傳統(tǒng)的阻塞 IO)版本的區(qū)別端礼。不同協(xié)議不同的阻塞類型的連接都有不同的 Channel 類型與之對應下面是一些常用的 Channel類型:
NioSocketChannel: 異步非阻塞的客戶端 TCP Socket 連接。
NioServerSocketChannel: 異步非阻塞的服務器端 TCP Socket 連接入录。
NioDatagramChannel: 異步非阻塞的 UDP 連接蛤奥。
NioSctpChannel: 異步的客戶端 Sctp(Stream Control Transmission Protocol,流控制傳輸協(xié)議)連接僚稿。
NioSctpServerChannel: 異步的 Sctp 服務器端連接凡桥。
OioSocketChannel: 同步阻塞的客戶端 TCP Socket 連接。
OioServerSocketChannel: 同步阻塞的服務器端 TCP Socket 連接蚀同。
OioDatagramChannel: 同步阻塞的 UDP 連接缅刽。
OioSctpChannel: 同步的 Sctp 服務器端連接。
OioSctpServerChannel: 同步的客戶端 TCP Socket 連接蠢络。3衰猛、Handler: 設置處理數(shù)據(jù)的 Handler。
NioSocketChannel 的創(chuàng)建
下面我們繼續(xù)深入代碼刹孔,看一下客戶端通過 Bootstrap 啟動后啡省,都做了哪些工作?我們看一下 NioSocketChannel 的類層次結構如下:
回到我們在客戶端連接代碼的初始化 Bootstrap 中調用了一個 channel()方法髓霞,傳入的參數(shù)是 NioSocketChannel.class, 在這個方法中其實就是初始化了一個 ReflectiveChannelFactory 的對象:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
} else {
return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
}
}
而 ReflectiveChannelFactory 實現(xiàn)了 ChannelFactory 接口, 它提供了唯一的方法, 即 newChannel()方法卦睹,ChannelFactory, 顧名思義, 就是創(chuàng)建 Channel 的工廠類。進入到 ReflectiveChannelFactory 的 newChannel()方法中, 我們看到其實現(xiàn)代碼如下:
public T newChannel() {
try {
return (Channel)this.clazz.newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
}
}
根據(jù)上面代碼的提示方库,我們就可以得出:
1结序、Bootstrap 中的 ChannelFactory 實現(xiàn)類是 ReflectiveChannelFactory。
2纵潦、通過 channel()方法創(chuàng)建的 Channel 具體類型是 NioSocketChannel徐鹤。
Channel 的實例化過程其實就是調用 ChannelFactory 的 newChannel()方法配喳,而實例化的 Channel 具體類型又是和初始化 Bootstrap 時傳入的 channel()方法的參數(shù)相關。
因此對于客戶端的 Bootstrap 而言凳干,創(chuàng)建的 Channel 實例就是NioSocketChannel。
客戶端 Channel 的初始化
前面我們已經(jīng)知道了如何設置一個 Channel 的類型被济,并且了解到 Channel 是通過 ChannelFactory 的 newChannel()方法來實例化的, 那么 ChannelFactory 的 newChannel()方法在哪里調用呢?繼續(xù)跟蹤, 我們發(fā)現(xiàn)其調用鏈如下:
在 AbstractBootstrap 的 initAndRegister()中調用了 ChannelFactory()的 newChannel()來創(chuàng)建一個 NioSocketChannel的實例救赐,其源碼如下:
final ChannelFuture initAndRegister() {
// 去掉非關鍵代碼
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
// 去掉非關鍵代碼
return regFuture;
}
在 newChannel()方法中,利用反射機制調用類對象的 newInstance()方法來創(chuàng)建一個新的 Channel 實例只磷,相當于調用NioSocketChannel 的默認構造器经磅。NioSocketChannel 的默認構造器代碼如下:
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
這里的代碼比較關鍵,我們看到钮追,在這個構造器中會調用 newSocket()來打開一個新的 Java NIO 的 SocketChannel:
private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a socket.", var2);
}
}
緊接著會調用父類, 即 AbstractNioByteChannel 的構造器:
public NioSocketChannel(java.nio.channels.SocketChannel socket) {
this((Channel)null, socket);
}
public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) {
super(parent, socket);//調用父類
this.config = new NioSocketChannel.NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, 1);
}
并傳入?yún)?shù) parent 為 null, ch 為剛才調用 newSocket()創(chuàng)建的 Java NIO 的 SocketChannel 對象, 因此新創(chuàng)建的NioSocketChannel 對象中 parent 暫時是 null预厌。
接著點super
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ));
}
接著會繼續(xù)調用父類 AbstractNioChannel 的構造器,并傳入實際參數(shù) readInterestOp = SelectionKey.OP_READ:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
// 省略...catch 塊
// 設置 Java NIO SocketChannel 為非阻塞的
ch.configureBlocking(false);
}
然后繼續(xù)調用父類 AbstractChannel 的構造器:
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
至此, NioSocketChannel 就初始化就完成了, 我們可以稍微總結一下 NioSocketChannel 初始化所做的工作內容:
- 1元媚、調用 NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打開一個新的 Java NIOSocketChannel轧叽。
- 2、AbstractChannel(Channel parent)中需要初始化的屬性:
parent:屬性置為 null刊棕。
id:每個 Channel 都擁有一個唯一的 id炭晒。
unsafe:通過 newUnsafe()實例化一個 unsafe 對象,它的類型是 AbstractNioByteChannel.NioByteUnsafe 內部類甥角。
pipeline:是 new DefaultChannelPipeline(this)新創(chuàng)建的實例网严。 - 3、AbstractNioChannel 中的屬性:
ch:賦值為 Java SocketChannel嗤无,即 NioSocketChannel 的 newSocket()方法返回的 Java NIO SocketChannel震束。
readInterestOp:賦值為 SelectionKey.OP_READ
ch:被配置為非阻塞,即調用 ch.configureBlocking(false)当犯。 - 4垢村、NioSocketChannel 中的屬性:
config = new NioSocketChannelConfig(this, socket.socket())
Unsafe 字段的初始化
我們簡單地提到了,在實例化 NioSocketChannel 的過程中灶壶,會在父類 AbstractChannel 的構造方法中調用
newUnsafe()來獲取一個 unsafe 實例肝断。那么 unsafe 是怎么初始化的呢? 它的作用是什么?
其實 unsafe 特別關鍵,它封裝了對 Java 底層 Socket 的操作驰凛,因此實際上是溝通 Netty 上層和 Java 底層的重要的橋梁胸懈。那么我們下面看一下 Unsafe 接口所提供的方法吧:
interface Unsafe {
RecvByteBufAllocator.Handle recvBufAllocHandle();
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void deregister(ChannelPromise promise);
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
從源碼中可以看出, 這些方法其實都是對應到相關的 Java 底層的 Socket 的操作。
繼續(xù)回到 AbstractChannel 的構造方法中恰响,在這里調用了 newUnsafe()獲取一個新的 unsafe 對象,而 newUnsafe()方法在 NioSocketChannel 中被重寫了趣钱。來看代碼:
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
NioSocketChannel 的 newUnsafe()方法會返回一個 NioSocketChannelUnsafe 實例。從這里我們就可以確定了胚宦,在實例化的 NioSocketChannel 中的 unsafe 字段首有,其實是一個 NioSocketChannelUnsafe 的實例燕垃。
Pipeline 的初始化
上面我們分析了 NioSocketChannel 的大體初始化過程, 但是我們漏掉了一個關鍵的部分,即 ChannelPipeline 的初始化井联。在 Pipeline 的注釋說明中寫到“Each channel has its own pipeline and it is created automatically when a new channel is created.”卜壕,我們知道,在實例化一個 Channel 時,必然都要實例化一個 ChannelPipeline烙常。而我們確實在AbstractChannel 的構造器看到了 pipeline 字段被初始化為 DefaultChannelPipeline 的實例轴捎。
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
那么我們就來看一下,DefaultChannelPipeline 構造器做了哪些工作蚕脏。
protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
DefaultChannelPipeline 的構造器需要傳入一個 channel侦副,而這個 channel 其實就是我們實例化的 NioSocketChannel,DefaultChannelPipeline 會將這個 NioSocketChannel 對象保存在 channel 字段中驼鞭。
DefaultChannelPipeline 中還有兩個特殊的字段秦驯,即 head 和 tail,這兩個字段是雙向鏈表的頭和尾挣棕。其實在 DefaultChannelPipeline 中译隘,維護了一個以AbstractChannelHandlerContext 為節(jié)點元素的雙向鏈表,這個鏈表是 Netty 實現(xiàn) Pipeline 機制的關鍵洛心。關于DefaultChannelPipeline 中的雙向鏈表以及它所起的作用细燎,暫不講解,后續(xù)再做深入分析皂甘。
先看看 HeadContext的類繼承層次結構如下所示:
TailContext 的繼承層次結構如下所示:
我們可以看到忙干,鏈表中 head 是一個 ChannelOutboundHandler鸿市,而 tail 則是一個 ChannelInboundHandler晦炊。
接著看HeadContext 的構造器:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
this.unsafe = pipeline.channel().unsafe();
this.setAddComplete();
}
它調用了父類 AbstractChannelHandlerContext 的構造器栈暇,并傳入?yún)?shù) inbound = false,outbound = true渐夸。而
TailContext 的構造器與 HeadContext 的相反嗤锉,它調用了父類 AbstractChannelHandlerContext 的構造器,并傳入?yún)?shù)inbound = true墓塌,outbound = false瘟忱。
即 header 是一個 OutBoundHandler,而 tail 是一個 InBoundHandler苫幢,關于這一特征访诱,大家要特別注意,先記住韩肝,因為在Netty 的 Pipeline 中触菜,會反復用到 inbound 和 outbound 這兩個屬性。
EventLoop 的初始化
回到最開始的 ChatClient 用戶代碼中哀峻,我們在一開始就實例化了一個 NioEventLoopGroup 對象涡相,因此我們就從它的構造器中追蹤一下 EventLoop 的初始化過程哲泊。首先來看一下 NioEventLoopGroup 的類繼承層次:
NioEventLoopGroup 有幾個重載的構造器,不過內容都沒有太大的區(qū)別催蝗,最終都是調用的父類 MultithreadEventLoopGroup的構造器:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
其中有個有意思的地方是切威,如果我們傳入的線程數(shù) nThreads 是 0,那么 Netty 會為我們設置默認的線程數(shù)
DEFAULT_EVENT_LOOP_THREADS丙号,而這個默認的線程數(shù)是怎么確定的呢?
其實很簡單牢屋,在靜態(tài)代碼塊中,會首先確定 DEFAULT_EVENT_LOOP_THREADS 的值:
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
}
Netty 首先會從系統(tǒng)屬性中獲取"io.netty.eventLoopThreads"的值槽袄,如果我們沒有設置的話,那么就返回默認值:即處理器核心數(shù) * 2锋谐”槌撸回到 MultithreadEventLoopGroup 構造器中會繼續(xù)調用父類 MultithreadEventExecutorGroup 的構造器:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 去掉了參數(shù)檢查,異常處理等代碼
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
// 去掉了 try...catch...finally 代碼塊
children[i] = newChild(executor, args);
}
chooser = chooserFactory.newChooser(children);
// 去掉了包裝處理的代碼
}
我們可以繼續(xù)跟蹤到 newChooser 方法里面看看其實現(xiàn)邏輯涮拗,具體代碼如下:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
//去掉了定義全局變量的代碼
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
上面的代碼邏輯主要表達的意思是:即如果 nThreads 是 2 的冪乾戏,則使用 PowerOfTwoEventExecutorChooser,否則
使用 GenericEventExecutorChooser三热。這兩個 Chooser 都重寫 next()方法鼓择。next()方法的主要功能就是將數(shù)組索引循環(huán)位移,如下圖所示:
當索引移動最后一個位置時就漾,再調用 next()方法就會將索引位置重新指向 0呐能。
這個運算邏輯其實很簡單,就是每次讓索引自增后和數(shù)組長度取模:idx.getAndIncrement() % executors.length抑堡。但是就連一個非常簡單的數(shù)組索引運算摆出,Netty 都幫我們做了優(yōu)化。因為在計算機底層首妖,&與比%運算效率更高偎漫。
好了,分析到這里我們應該已經(jīng)非常清楚 MultithreadEventExecutorGroup 中的處理邏輯有缆,簡單做一個總結:
- 1象踊、創(chuàng)建一個大小為 nThreads 的 SingleThreadEventExecutor 數(shù)組。
- 2棚壁、根據(jù) nThreads 的大小杯矩,創(chuàng)建不同的 Chooser,即如果 nThreads 是 2 的冪袖外,則使用
PowerOfTwoEventExecutorChooser菊碟,反之使用 GenericEventExecutorChooser。不論使用哪個 Chooser在刺,它們的功能都是一樣的逆害,即從 children 數(shù)組中選出一個合適的 EventExecutor 實例头镊。 - 3、調用 newChhild()方法初始化 children 數(shù)組魄幕。
根據(jù)上面的代碼相艇,我們也能知道:MultithreadEventExecutorGroup 內部維護了一個 EventExecutor 數(shù)組,而 Netty 的EventLoopGroup的實現(xiàn)機制其實就建立在MultithreadEventExecutorGroup之上纯陨。每當Netty需要一個EventLoop 時, 會調用 next()方法獲取一個可用的 EventLoop坛芽。
上面代碼的最后一部分是 newChild()方法,這個是一個抽象方法翼抠,它的任務是實例化 EventLoop 對象咙轩。我們跟蹤一下它的代碼∫跤保可以發(fā)現(xiàn)活喊。這個方法在 NioEventLoopGroup 類中有實現(xiàn),其內容很簡單:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
其實邏輯很簡單就是實例化一個 NioEventLoop 對象, 然后返回 NioEventLoop 對象量愧。
最后總結一下整個 EventLoopGroup 的初始化過程:
- 1钾菊、EventLoopGroup(其實是 MultithreadEventExecutorGroup)內部維護一個類型為 EventExecutor children 數(shù)組, 其大小是 nThreads,這樣就構成了一個線程池。
- 2偎肃、如果我們在實例化 NioEventLoopGroup 時煞烫,如果指定線程池大小,則 nThreads 就是指定的值累颂,反之是處理器核心數(shù) * 2滞详。
- 3、MultithreadEventExecutorGroup 中會調用 newChild()抽象方法來初始化 children 數(shù)組紊馏。
- 4茵宪、抽象方法 newChild()是在 NioEventLoopGroup 中實現(xiàn)的,它返回一個 NioEventLoop 實例瘦棋。
- 5稀火、NioEventLoop 屬性賦值:
provider:在 NioEventLoopGroup 構造器中通過 SelectorProvider.provider()獲取一個 SelectorProvider。
selector:在 NioEventLoop 構造器中通過調用通過 provider.openSelector()方法獲取一個 selector 對象赌朋。
Channel 注冊到 Selector
在前面的分析中凰狞,我們提到 Channel 會在 Bootstrap 的 initAndRegister()中進行初始化,但是這個方法還會將初始化好的 Channel 注冊到 NioEventLoop 的 selector 中沛慢。接下來我們來分析一下 Channel 注冊的過程赡若。
再回顧一下 AbstractBootstrap 的 initAndRegister()方法:
final ChannelFuture initAndRegister() {
// 刪除了非關鍵代碼
Channelchannel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
當 Channel 初始化后,緊接著會調用 group().register()方法來向 selector 注冊 Channel团甲。我們繼續(xù)跟蹤的話逾冬,會發(fā)現(xiàn)其調用鏈如下:
通過跟蹤調用鏈, 最終我們發(fā)現(xiàn)是調用到了 unsafe 的 register 方法,那么接下來我們就仔細看一下
AbstractChannel$AbstractUnsafe.register()方法中到底做了什么?
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略了條件判斷和錯誤處理的代碼
AbstractChannel.this.eventLoop = eventLoop;
register0(promise);
}
首先,將 eventLoop 賦值給 Channel 的 eventLoop 屬性身腻,而我們知道這個 eventLoop 對象其實是
MultithreadEventLoopGroup 的 next()方法獲取的产还,根據(jù)我們前面的分析,我們可以確定 next()方法返回的 eventLoop對象是 NioEventLoop 實例嘀趟。register()方法接著調用了 register0()方法:
private void register0(ChannelPromise promise) {
// 省略了非關鍵代碼
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
}
}
}
register0()方法又調用了 AbstractNioChannel 的 doRegister()方法:
protected void doRegister() throws Exception {
// 省略了錯誤處理的代碼
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
看到 javaChannel()這個方法在前面我們已經(jīng)知道了脐区,它返回的是一個 Java NIO 的 SocketChannel 對象,這里我們將這個 SocketChannel 注冊到與 eventLoop 關聯(lián)的 selector 上了她按。
我們總結一下 Channel 的注冊過程:
- 1牛隅、首先在 AbstractBootstrap 的 initAndRegister()方法中, 通過 group().register(channel),調用
MultithreadEventLoopGroup 的 register()方法酌泰。 - 2媒佣、在 MultithreadEventLoopGroup 的 register()中,調用 next()方法獲取一個可用的 SingleThreadEventLoop, 然后調用它的 register()方法陵刹。
- 3默伍、在 SingleThreadEventLoop 的 register()方法中,調用 channel.unsafe().register(this, promise)方法來獲取channel 的 unsafe()底層操作對象授霸,然后調用 unsafe 的 register()。
- 4际插、在 AbstractUnsafe 的 register()方法中, 調用 register0()方法注冊 Channel 對象碘耳。
- 5、在 AbstractUnsafe 的 register0()方法中框弛,調用 AbstractNioChannel 的 doRegister()方法辛辨。
- 6、AbstractNioChannel 的 doRegister()方法通過 javaChannel().register(eventLoop().selector, 0, this)將 Channel對應的 Java NIO 的 SocketChannel 注冊到一個 eventLoop 的 selector 中瑟枫,并且將當前 Channel 作為 attachment 與SocketChannel 關聯(lián)斗搞。
總的來說,Channel 注冊過程所做的工作就是將 Channel 與對應的 EventLoop 關聯(lián)慷妙,因此這也體現(xiàn)了僻焚,在 Netty中,每個 Channel 都會關聯(lián)一個特定的 EventLoop膝擂,并且這個 Channel 中的所有 IO 操作都是在這個 EventLoop 中執(zhí)行的虑啤;當關聯(lián)好 Channel 和 EventLoop 后,會繼續(xù)調用底層 Java NIO 的 SocketChannel 對象的 register()方法架馋,將底層 Java NIO 的 SocketChannel 注冊到指定的 selector 中狞山。通過這兩步,就完成了 Netty 對 Channel 的注冊過程叉寂。
Handler 的添加過程
Netty 有一個強大和靈活之處就是基于 Pipeline 的自定義 handler 機制萍启。基于此,我們可以像添加插件一樣自由組合各種各樣的 handler 來完成業(yè)務邏輯勘纯。
例如我們需要處理 HTTP 數(shù)據(jù)局服,那么就可以在 pipeline 前添加一個針對 HTTP編、解碼的 Handler屡律,然后接著添加我們自己的業(yè)務邏輯的 handler腌逢,這樣網(wǎng)絡上的數(shù)據(jù)流就向通過一個管道一樣, 從不同的 handler 中流過并進行編、解碼超埋,最終在到達我們自定義的 handler 中搏讶。
這個 pipeline 機制是這么的強大,那么它是怎么實現(xiàn)的呢? 在此不作詳細講解霍殴,先體驗一下自定義的 handler 是如何以及何時添加到 ChannelPipeline中的媒惕。首先我們看一下如下的用戶代碼片段:
// 此處省略 N 句代碼
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatClientHandler(nickName));
}
});
這個代碼片段就是實現(xiàn)了 handler 的添加功能。我們看到来庭,Bootstrap 的 handler()方法接收一個 ChannelHandler妒蔚,而我們傳的參數(shù)是一個派生于抽象類 ChannelInitializer 的匿名類,它當然也實現(xiàn)了 ChannelHandler 接口月弛。我們來看一下肴盏,ChannelInitializer 類內到底有什么玄機:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
} else {
ctx.fireChannelRegistered();
}
}
// 這個方法在 channelRegistered 中被調用
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
remove(ctx);
return false;
}
// 省略...
}
ChannelInitializer 是一個抽象類,它有一個抽象的方法 initChannel()帽衙,我們看到的匿名類正是實現(xiàn)了這個方法菜皂,并在這個方法中添加的自定義的 handler 的。那么 initChannel()是哪里被調用的呢?其實是在 ChannelInitializer 的channelRegistered()方法中厉萝。
接下來關注一下 channelRegistered()方法恍飘。從上面的源碼中,我們可以看到谴垫,在 channelRegistered()方法中章母,會調用 initChannel()方法,將自定義的 handler 添加到 ChannelPipeline 中翩剪,然后調用 ctx.pipeline().remove(this)方法將自己從 ChannelPipeline 中刪除乳怎。上面的分析過程,如下圖片所示:
一開始前弯,ChannelPipeline 中只有三個 handler舞肆,分別是:head、tail 和我們添加的 ChannelInitializer博杖。
接著 initChannel()方法調用后椿胯,添加了自定義的 handler:
最后將 ChannelInitializer 刪除:
分析到這里,我們已經(jīng)簡單了解了自定義的 handler 是如何添加到 ChannelPipeline 中的
客戶端發(fā)起連接請求
經(jīng)過上面的各種分析后剃根,我們大致已經(jīng)了解 Netty 客戶端初始化時哩盲,所做的工作,那么接下來我們就直奔主題,分析一下客戶端是如何發(fā)起 TCP 連接的廉油?
首先惠险,客戶端通過調用 Bootstrap 的 connect()方法進行連接。在 connect()方法中抒线,會進行一些參數(shù)檢查后班巩,最終調用的是 doConnect()方法,其代碼實現(xiàn)如下:
private static void doConnect(
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
在 doConnect()方法中嘶炭,會 eventLoop 線程中調用 Channel 的 connect()方法抱慌,而這個 Channel 的具體類型實際就是NioSocketChannel,前面已經(jīng)分析過了眨猎。繼續(xù)跟蹤到 channel.connect()方法中抑进,我們發(fā)現(xiàn)它調用的是DefaultChannelPipeline 的 connect()方法,pipeline 的 connect()方法代碼如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
tail 我們已經(jīng)分析過, 是一個 TailContext 的實例睡陪,而 TailContext 又是 AbstractChannelHandlerContext 的子類寺渗,并且沒有實現(xiàn) connect()方法,因此這里調用的其實是 AbstractChannelHandlerContext 的 connect()方法兰迫,我們看一下這個方法的實現(xiàn):
public ChannelFuture connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
// 刪除參數(shù)檢查的代碼
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
上面的代碼中有一個關鍵的地方信殊,即 final AbstractChannelHandlerContext next = findContextOutbound(),這里調用 findContextOutbound()方法汁果,從 DefaultChannelPipeline 內的雙向鏈表的 tail 開始涡拘,不斷向前找到第一個 outbound為 true 的 AbstractChannelHandlerContext,然后調用它的 invokeConnect()方法须鼎,其代碼如下:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
// 忽略 try...catch 塊
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}
前面我們有提到鲸伴,在 DefaultChannelPipeline 的構造器中府蔗,實例化了兩個對象:head 和 tail晋控,并形成了雙向鏈表的頭和尾。head 是 HeadContext 的實例姓赤,它實現(xiàn)了 ChannelOutboundHandler 接口赡译,并且它的 outbound 設置為 true。
因此在 findContextOutbound()方法中不铆,找到的 AbstractChannelHandlerContext 對象其實就是 head蝌焚。進而在invokeConnect()方法中, 我們向上轉換為 ChannelOutboundHandler 就是沒問題的了。而又因為 HeadContext 重寫了 connect()方法誓斥,因此實際上調用的是 HeadContext 的 connect()方法只洒。我們接著跟蹤到 HeadContext 的 connect()方法,看其代碼如下:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
這個 connect()方法很簡單劳坑,只是調用了 unsafe 的 connect()方法毕谴。回顧一下 HeadContext 的構造器, 我們發(fā)現(xiàn)這個unsafe 其實就是 pipeline.channel().unsafe()返回的 Channel 的 unsafe 字段。到這里為止涝开,我們應該已經(jīng)知道, 其實是AbstractNioByteChannel.NioByteUnsafe 內部類兜了一大圈圈循帐。最后,我們找到創(chuàng)建 Socket 連接的關鍵代碼繼續(xù)跟蹤舀武,其實調用的就是 AbstractNioUnsafe 的 connect()方法:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 省去前面的判斷
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
// 此處省略 N 行代碼
}
}
在這個 connect()方法中拄养,又調用了 doConnect()方法。注意:這個方法并不是 AbstractNioUnsafe 的方法银舱,而是AbstractNioChannel 的抽象方法瘪匿。doConnect()方法是在 NioSocketChannel 中實現(xiàn)的,因此進入到 NioSocketChannel的 doConnect()方法中:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
我們終于看到的最關鍵的部分了纵朋,慶祝一下柿顶!上面的代碼不用多說,首先是獲取 Java NIO 的 SocketChannel操软,獲取NioSocketChannel 的 newSocket()返回的 SocketChannel 對象嘁锯;然后調用 SocketChannel 的 connect()方法完成 JavaNIO 底層的 Socket 的連接。
最后總結一下聂薪,客戶端 BootStrap 發(fā)起連接請求的流程可以用如下時序圖直觀地展示:
服務端 ServerBootStrap
在分析客戶端的代碼時家乘,我們已經(jīng)對 Bootstrap 啟動 Netty 有了一個大致的認識,那么接下來分析服務器端時藏澳,就會相對簡單一些了仁锯。首先還是來看一下服務器端的啟動代碼:
public class ChatServer {
public void start(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
...
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
System.out.println("服務已啟動,監(jiān)聽端口" + port + "");
// 綁定端口,開始接收進來的連接
ChannelFuture f = b.bind(port).sync();
// 等待服務器 socket 關閉 在這個例子中翔悠,這不會發(fā)生业崖,但你可以優(yōu)雅地關閉你的服務器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("服務已關閉");
}
}
public static void main(String[] args) {
try {
new ChatServer().start(8080);
} catch (Exception e) {
e.printStackTrace();
}
}
}
服務端基本寫法和客戶端的代碼相比蓄愁,沒有很大的差別双炕,基本上也是進行了如下幾個部分的初始化:
1、EventLoopGroup:不論是服務器端還是客戶端撮抓,都必須指定 EventLoopGroup妇斤。在上面的代碼中,指定了NioEventLoopGroup丹拯,表示一個 NIO 的 EventLoopGroup站超,不過服務器端需要指定兩個 EventLoopGroup,一個是bossGroup乖酬,用于處理客戶端的連接請求死相;另一個是 workerGroup,用于處理與各個客戶端連接的 IO 操作。
2咬像、ChannelType: 指定 Channel 的類型算撮。因為是服務器端双肤,因此使用了 NioServerSocketChannel。
3钮惠、Handler:設置數(shù)據(jù)處理器茅糜。
NioServerSocktChannel 的創(chuàng)建
我們在分析客戶端 Channel 初始化過程時已經(jīng)提到,Channel 是對 Java 底層 Socket 連接的抽象素挽,并且知道了客戶端Channel 的具體類型是 NioSocketChannel蔑赘,那么,服務端的 Channel 類型就是 NioServerSocketChannel 了预明。
那么接下來我們按照分析客戶端的流程對服務器端的代碼也同樣地分析一遍珊泳,這樣也方便我們對比一下服務器端和客戶端有哪些不一樣的地方热某。
通過前面的分析, 我們已經(jīng)知道了,在客戶端中,Channel 類型的指定是在初始化時,通過
Bootstrap 的 channel()方法設置的辫塌,服務端也是同樣的方式合住。
再看服務端代碼西设,我們調用了 ServerBootstarap 的 channel(NioServerSocketChannel.class)方法察绷,傳的參數(shù)是NioServerSocketChannel.class 對象。
如此术辐,按照客戶端代碼同樣的流程砚尽,我們可以確定 NioServerSocketChannel 的
實例化也是通過 ReflectiveChannelFactory 工廠類來完成的,而 ReflectiveChannelFactory 中的 clazz 字段被賦值為NioServerSocketChannel.class辉词,因此當調用 ReflectiveChannelFactory 的 newChannel()方法必孤,就能獲取到一個NioServerSocketChannel 的實例。newChannel()方法的源代碼如下:
public T newChannel() {
// 刪除了 try 塊
return clazz.newInstance();
}
最后我們也來總結一下:
1瑞躺、ServerBootstrap 中的 ChannelFactory 的實現(xiàn)類是 ReflectiveChannelFactory 類敷搪。
2、創(chuàng)建的 Channel 具體類型是 NioServerSocketChannel幢哨。
Channel 的實例化過程赡勘,其實就是調用 ChannelFactory 的 newChannel()方法,而實例化的 Channel 具體類型就是初始化 ServerBootstrap 時傳給 channel()方法的實參嘱么。因此狮含,上面代碼案例中的服務端 ServerBootstrap, 創(chuàng)建的 Channel實例就是 NioServerSocketChannel 的實例顽悼。
服務端 Channel 的初始化
接下來我們來分析 NioServerSocketChannel 的實例化過程曼振,先看一下 NioServerSocketChannel 的類層次結構圖:
首先,我們來跟蹤一下 NioServerSocketChannel 的默認構造蔚龙,和 NioSocketChannel 類似冰评,構造器都是調用 newSocket()來打開一個 Java 的 NIO Socket。不過需要注意的是, 客戶端的 newSocket()方法調用的是 openSocketChannel()木羹,而服務端的 newSocket()調用的是 openServerSocketChannel()甲雅。顧名思義解孙,一個是客戶端的 Java SocketChannel,一個是服務器端的 Java ServerSocketChannel抛人,來看代碼:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}
接下來會調用重載構造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
這個構造方法中弛姜,調用父類構造方法時傳入的參數(shù)是 SelectionKey.OP_ACCEPT。作為對比妖枚,我們回顧一下廷臼,在客戶端的 Channel 初始化時,傳入的參數(shù)是 SelectionKey.OP_READ绝页。在服務啟動后需要監(jiān)聽客戶端的連接請求荠商,因此在這里我們設置 SelectionKey.OP_ACCEPT,也就是通知 selector 我們對客戶端的連接請求感興趣续誉。
接著和客戶端對比分析一下莱没,會逐級地調用父類的構造器 NioServerSocketChannel -> AbstractNioMessageChannel -> AbstractNioChannel -> AbstractChannel。同樣的酷鸦,在 AbstractChannel 中實例化一個 unsafe 和 pipeline:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
不過饰躲,在這里需要注意的是,客戶端的 unsafe 是 AbstractNioByteChannel#NioByteUnsafe 的實例臼隔,而服務端的 unsafe是 AbstractNioMessageChannel.AbstractNioUnsafe 的實例属铁。因為 AbstractNioMessageChannel 重寫了 newUnsafe()方法,其源代碼如下:
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
最后總結一下, 在 NioServerSocketChannel 實例化過程中的執(zhí)行邏輯:
- 1躬翁、調用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)方法打開一個新的 Java NIOServerSocketChannel
- 2焦蘑、AbstractChannel 初始化被賦值是屬性:
parent:設置為 null
unsafe:通過newUnsafe()實例化一個unsafe對象, 類型是AbstractNioMessageChannel#AbstractNioUnsafe
pipeline:創(chuàng)建實例 DefaultChannelPipeline 實例 - 3、AbstractNioChannel 中被賦值的屬性:
ch:賦值為 Java NIO 的 ServerSocketChannel盒发,調用 NioServerSocketChannel 的 newSocket()方法獲取例嘱。
readInterestOp:默認賦值為 SelectionKey.OP_ACCEPT。
ch 設置為非阻塞宁舰,調用 ch.configureBlocking(false)方法拼卵。 - 4、NioServerSocketChannel 中被賦值的屬性:
config = new NioServerSocketChannelConfig(this, javaChannel().socket())
ChannelPipeline 初始化
服務端 ChannelPipeline 的初始化和客戶端一致
服務端 Channel 注冊到 Selector
服務端 Channel 的注冊過程和客戶端一致
bossGroup 與 workerGroup
在客戶端的時候蛮艰,我們初始化了一個 EventLoopGroup 對象腋腮,而在服務端的初始化時,我們設置了兩個
EventLoopGroup壤蚜,一個是 bossGroup即寡,另一個是 workerGroup。那么這兩個 EventLoopGroup 都是干什么用的呢? 接下來我們詳細探究一下袜刷。
其實聪富,bossGroup 只用于服務端的 accept,也就是用于處理客戶端新連接接入請求著蟹。我們可
以把 Netty 比作一個餐館墩蔓,bossGroup 就像一個大堂經(jīng)理梢莽,當客戶來到餐館吃時,大堂經(jīng)理就會引導顧客就坐奸披,為顧客端茶送水等昏名。而 workerGroup 就是實際上干活的廚師,它們負責客戶端連接通道的 IO 操作:當大堂經(jīng)歷接待顧客后阵面,顧客可以稍做休息, 而此時后廚里的廚師們(workerGroup)就開始忙碌地準備飯菜了葡粒。關于 bossGroup 與workerGroup 的關系,我們可以用如下圖來展示:
首先膜钓,服務端的 bossGroup 不斷地監(jiān)聽是否有客戶端的連接嗽交,當發(fā)現(xiàn)有一個新的客戶端連接到來時,bossGroup 就會為此連接初始化各項資源颂斜,然后從 workerGroup 中選出一個 EventLoop 綁定到此客戶端連接中夫壁。那么接下來的服務器與客戶端的交互過程就全部在此分配的 EventLoop 中完成∥执口說無憑盒让,我們還是以源碼說話吧。
首先在 ServerBootstrap 初始化時司蔬,調用了 b.group(bossGroup, workerGroup)設置了兩個 EventLoopGroup邑茄,我們跟蹤進去以后會看到:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
// 此處省略 N 行代碼
this.childGroup = childGroup;
return this;
}
顯然,這個方法初始化了兩個字段俊啼,一個是 group = parentGroup肺缕。它是在 super.group(parentGroup)中完成初始化的,另一個是 childGroup = childGroup授帕。接著從應用程序的啟動代碼來看調用了 b.bind()方法來監(jiān)聽一個本地端口同木。
bind()方法會觸發(fā)如下調用鏈:
AbstractBootstrap.bind() -> AbstractBootstrap.doBind() -> AbstractBootstrap.initAndRegister()
源碼看到到這里為止,我們發(fā)現(xiàn) AbstractBootstrap 的 initAndRegister()方法已經(jīng)是我們的老朋友了跛十,我們在分析客戶端程序時和它打過很多交道彤路,現(xiàn)在再來回顧一下這個方法吧:
final ChannelFuture initAndRegister() {
// 省略異常判斷
Channel channel = channelFactory.newChannel();
init(channel);
// 省略非關鍵代碼
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
這里 group()方法返回的是上面我們提到的 bossGroup,而這里的 channel 其實就是 NioServerSocketChannel 的實例芥映,因此我們可以猜測 group().register(channel)將 bossGroup 和 NioServerSocketChannel 應該就關聯(lián)起來了洲尊。那么workerGroup 具體是在哪里與 NioServerSocketChannel 關聯(lián)的呢?我們繼續(xù)往下看 init(channel)方法:
void init(Channel channel) throws Exception {
// 省略參數(shù)判斷
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// 省略非關鍵代碼
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
實際上 init()方法在 ServerBootstrap 中被重寫了奈偏,從上面的代碼片段中我們看到坞嘀,它為 pipeline 中添加了一個
ChannelInitializer,而這個 ChannelInitializer 中添加了一個非常關鍵的 ServerBootstrapAcceptor 的 handler。關于handler 的添加與初始化的過程霎苗,以后再詳細分析∧房裕現(xiàn)在榛做,我們來關注一下 ServerBootstrapAcceptor類唁盏。在 ServerBootstrapAcceptor 中重寫了 channelRead()方法内狸,其主要代碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// 省略非關鍵代碼
childGroup.register(child).addListener(...);
}
ServerBootstrapAcceptor 中的 childGroup 是構造此對象是傳入的 currentChildGroup,也就是 workerGroup 對象厘擂。
而這里的 Channel 是 NioSocketChannel 的實例昆淡,因此這里的 childGroup 的 register()方法就是將 workerGroup 中的某個 EventLoop 和 NioSocketChannel 關聯(lián)上了。既然如此刽严,那么現(xiàn)在的問題是 ServerBootstrapAcceptor 的channelRead()方法是在哪里被調用的呢? 其實當一個 client 連接到 server 時昂灵,Java 底層 NIO 的 ServerSocketChannel就會有一個 SelectionKey.OP_ACCEPT 的事件就緒,接著就會調用到 NioServerSocketChannel 的 doReadMessages()方法:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
// 省略異常處理
buf.add(new NioSocketChannel(this, ch));
return 1;
// 省略錯誤處理
}
在 doReadMessages()方法中舞萄,通過調用 javaChannel().accept()方法獲取到客戶端新連接的 SocketChannel 對象眨补,緊接著就實例化一個 NioSocketChannel,并且傳入 NioServerSocketChannel 對象(即 this)倒脓。由此可知撑螺,我們創(chuàng)建的這個NioSocketChannel 的父類 Channel 就是 NioServerSocketChannel 實例。接下來就經(jīng)由 Netty 的 ChannelPipeline 機制崎弃,將讀取事件逐級發(fā)送到各個 handler 中甘晤,于是就會觸發(fā)前面我們提到的 ServerBootstrapAcceptor 的 channelRead()方法。
服務端 Selector 事件輪詢
再回到服務端 ServerBootstrap 的啟動代碼饲做,是從 bind()方法開始的线婚。ServerBootstrap 的 bind()方法實際上就是其父類 AbstractBootstrap 的 bind()方法,來看代碼:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在 doBind0()方法中盆均,調用的是 EventLoop 的 execute()方法塞弊,我們繼續(xù)跟進去:
public void execute(Runnable task) {
//省略了空判斷
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
//省略刪除任務的邏輯
}
//省略判斷邏輯
}
在 execute()主要就是創(chuàng)建線程,將線程添加到 EventLoop 的無鎖化串行任務隊列泪姨。我們重點關注 startThread()方法居砖,繼續(xù)看源代碼:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
//省略部分代碼
SingleThreadEventExecutor.this.run();
//省略部分代碼
}
我們發(fā)現(xiàn) startThread()最終調用的是 SingleThreadEventExecutor.this.run()方法,這個 this 就是 NioEventLoop 對象:
終于看到似曾相識的代碼驴娃。上面代碼主要就是用一個死循環(huán)奏候,在不斷地輪詢 SelectionKey。select()方法唇敞,主要用來解決 JDK 空輪訓 Bug蔗草,而 processSelectedKeys()就是針對不同的輪詢事件進行處理。如果客戶端有數(shù)據(jù)寫入疆柔,最終也會調用 AbstractNioMessageChannel 的 doReadMessages()方法咒精。
總結一下:
1、Netty 中 Selector 事件輪詢是從 EventLoop 的 execute()方法開始的旷档。
2模叙、在 EventLoop 的 execute()方法中,會為每一個任務創(chuàng)建一個獨立的線程鞋屈,并保存到無鎖化串行任務隊列范咨。
3故觅、線程任務隊列的每個任務實際調用的是 NioEventLoop 的 run()方法。
4渠啊、在 run 方法中調用 processSelectedKeys()處理輪詢事件输吏。
Netty 解決 JDK 空輪訓 Bug
各位應該早有耳聞臭名昭著的 Java NIO epoll 的 bug,它會導致 Selector 空輪詢替蛉,最終導致 CPU 100%贯溅。官方聲稱在JDK1.6 版本的 update18 修復了該問題,但是直到 JDK1.7 版本該問題仍舊存在躲查,只不過該 BUG 發(fā)生概率降低了一些而已它浅,它并沒有被根本解決。出現(xiàn)此 Bug 是因為當 Selector 的輪詢結果為空镣煮,也沒有 wakeup 或新消息處理罚缕,則發(fā)生空輪詢,CPU 使用率達到 100%怎静。我們來看下這個問題在 issue 中的原始描述:
This is an issue with poll (and epoll) on Linux. If a file descriptor for a connected socket is polled with a request
event mask of 0, and if the connection is abruptly terminated (RST) then the poll wakes up with the POLLHUP (and
maybe POLLERR) bit set in the returned event set. The implication of this behaviour is that Selector will wakeup and
as the interest set for the SocketChannel is 0 it means there aren't any selected events and the select method
returns 0.
具體解釋為:在部分 Linux 的 2.6 的 kernel 中邮弹,poll 和 epoll 對于突然中斷的連接 socket 會對返回的 eventSet 事件集合置為 POLLHUP,也可能是 POLLERR蚓聘,eventSet 事件集合發(fā)生了變化腌乡,這就可能導致 Selector 會被喚醒。
這是與操作系統(tǒng)機制有關系的夜牡,JDK 雖然僅僅是一個兼容各個操作系統(tǒng)平臺的軟件与纽,但很遺憾在 JDK5 和 JDK6 最初的版本中(嚴格意義上來將,JDK 部分版本都是)塘装,這個問題并沒有解決急迂,而將這個帽子拋給了操作系統(tǒng)方,這也就是這個 bug 最終一直到 2013 年才最終修復的原因蹦肴。
在 Netty 中最終的解決辦法是:創(chuàng)建一個新的 Selector僚碎,將可用事件重新注冊到新的 Selector 中來終止空輪訓∫趸希回顧事件輪詢的關鍵代碼:
protected void run() {
for (; ; ) {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
//省略 select 的喚醒邏輯
default:
}
//事件輪詢處理邏輯
}
}
前面我們有提到 select()方法解決了 JDK 空輪訓的 Bug勺阐,它到底是如何解決的呢?下面我們來一探究竟矛双,進入 select()方法的源碼:
public final class NioEventLoop extends SingleThreadEventLoop {
...
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
//省略判斷代碼
SELECTOR_AUTO_REBUILD_THRESHOLD =selectorAutoRebuildThreshold;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
long currentTimeNanos = System.nanoTime();
for (; ; ) {
//省略非關鍵代碼
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
//省略非關鍵代碼
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 日志打印代碼
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
//省略非關鍵代碼
}
}
從上面的代碼中可以看出,Selector 每一次輪詢都計數(shù) selectCnt++渊抽,開始輪詢會計時賦值給 timeoutMillis,輪詢完成會計時賦值給 time议忽,這兩個時間差會有一個時間差懒闷,而這個時間差就是每次輪詢所消耗的時間。從上面的的邏輯看出,如果每次輪詢消耗的時間為 0愤估,且重復次數(shù)超過 512 次帮辟,則調用 rebuildSelector()方法,即重構 Selector灵疮。我們跟進到源碼中就會發(fā)現(xiàn):
public void rebuildSelector() {
//省略判斷語句
rebuildSelector0();
}
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
newSelectorTuple = openSelector();
//省略非關鍵代碼
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key : oldSelector.keys()) {
//省略非關鍵代碼和異常處理
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
}
//省略非關鍵代碼
}
在 rebuildSelector()方法中织阅,主要做了三件事情:
1壳繁、創(chuàng)建一個新的 Selector震捣。
2、將原來 Selector 中注冊的事件全部取消闹炉。
3蒿赢、將可用事件重新注冊到新的 Selector 中,并激活渣触。
Netty 對 Selector 中 KeySet 的優(yōu)化
分析完 Netty 對 JDK 空輪訓 Bug 的解決方案羡棵,接下來我們再來看一個很有意思的細節(jié)。Netty 對 Selector 中存儲SelectionKey 的 HashSet 也做了優(yōu)化嗅钻。在前面的分析中皂冰,Netty 對 Selector 有重構,創(chuàng)建一個新的 Selector 其實是調用 openSelector()方法养篓,來看代碼:
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
newSelectorTuple = openSelector();
//省略非關鍵代碼
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
//省略異常處理代碼
unwrappedSelector = provider.openSelector();
//省略非關鍵代碼
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
//省略異常處理代碼
}
});
//省略非關鍵代碼
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
//省略非關鍵代碼
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
//省略異常處理代碼
}
});
//省略非關鍵代碼
}
上面代碼的主要功能就是利用反射機制秃流,獲取到 JDK 底層的 Selector 的 class 對象,用反射方法從 class 對象中獲得兩個字段 selectedKeys 和 publicSelectedKeys柳弄,這兩個字段就是用來存儲已注冊事件的舶胀。然后,將這個兩個對象重新賦值為 Netty 創(chuàng)建的 SelectedSelectionKeySet碧注,是不是有種“偷梁換柱”的感覺嚣伐?
我們先來看 selectedKeys 和 publicSelectedKeys 到底是什么類型,打開 SelectorImpl 的源碼萍丐,看起構造方法:
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
//省略非關鍵代碼
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
//省略非關鍵代碼
}
...
}
我們發(fā)現(xiàn) selectedKeys 和 publicSelectedKeys 就是 HashSet轩端。下面我們再來看 Netty 創(chuàng)建的 SelectedSelectionKeySet對象的源代碼:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public int size() {
return size;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<SelectionKey> iterator() {
throw new UnsupportedOperationException();
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
源碼篇幅不長 ,但很精辟逝变。SelectedSelectionKeySet 同樣繼承了 AbstractSet船万,因此賦值給 selectedKeys 和publicSelectedKeys 不存在類型強制轉換的問題。細心的小伙伴應該已經(jīng)發(fā)現(xiàn)在 SelectedSelectionKeySet 中禁用了remove()方法骨田、contains()方法和 iterator()方法耿导,只保留 add()方法,而且底層存儲結構用的是數(shù)組 SelectionKey[] keys态贤。
那么舱呻,Netty 為什么要這樣設計呢?主要目的還是簡化我們在輪詢事件時的操作,不需要每次輪詢都要移除 key箱吕。
Handler 的添加過程
服務端 handler 的添加過程和客戶端的有點區(qū)別芥驳,跟 EventLoopGroup 一樣服務端的 handler 也有兩個:一個是通過handler()方法設置的 handler,另一個是通過 childHandler()方法設置的 childHandler茬高。通過前面的 bossGroup 和workerGroup 的分析兆旬,其實我們在這里可以大膽地猜測:handler 與 accept 過程有關。即 handler 負責處理客戶端新連接接入的請求怎栽;而 childHandler 就是負責和客戶端連接的 IO 交互丽猬。那么實際上是不是這樣的呢?我們繼續(xù)用代碼來證明熏瞄。在前面章節(jié)我們已經(jīng)了解 ServerBootstrap 重寫了 init()方法脚祟,在這個方法中也添加了 handler:
void init(Channel channel) throws Exception {
// 省去邏輯判斷
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在上面代碼的 initChannel()方法中,首先通過 handler()方法獲取一個 handler强饮,如果獲取的 handler 不為空由桌,則添加到pipeline 中。然后接著邮丰,添加了一個 ServerBootstrapAcceptor 的實例行您。那么這里的 handler()方法返回的是哪個對象呢? 其實它返回的是 handler 字段,而這個字段就是我們在服務器端的啟動代碼中設置的:
b.group(bossGroup, workerGroup)
那么這個時候, pipeline 中的 handler 情況如下:
根據(jù)我們原來客戶端代碼的分析來剪廉,我們指定 channel 綁定到 eventLoop(在這里是指 NioServerSocketChannel 綁定到 bossGroup)后娃循,會在 pipeline 中觸發(fā)fireChannelRegistered事件,接著就會觸發(fā)對 ChannelInitializer的 initChannel()方法的調用妈经。因此在綁定完成后淮野,此時的 pipeline 的內如下:
在前面我們分析 bossGroup 和 workerGroup 時,已經(jīng)知道了 ServerBootstrapAcceptor 的 channelRead()方法會為新建的 Channel 設置 handler 并注冊到一個 eventLoop 中骤星,即:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// 省去非關鍵代碼
childGroup.register(child).addListener(...);
}
而這里的 childHandler 就是我們在服務器端啟動代碼中設置的 handler:
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel (SocketChannel ch) throws Exception {
...
}
})
后續(xù)的步驟我們基本上已經(jīng)清楚了潭袱,當客戶端連接 Channel 注冊后嘉抓,就會觸發(fā) ChannelInitializer 的 initChannel()方法的調用截汪。最后我們來總結一下服務端 handler 與 childHandler 的區(qū)別與聯(lián)系:
1医咨、在服務器 NioServerSocketChannel 的 pipeline 中添加的是 handler 與 ServerBootstrapAcceptor委造。
2跑筝、當有新的客戶端連接請求時,調用 ServerBootstrapAcceptor 的 channelRead()方法創(chuàng)建此連接的
NioSocketChannel 并添加 childHandler 到 NioSocketChannel 對應的 pipeline 中,并將此 channel 綁定到
workerGroup 中的某個 eventLoop 中稀并。
3仅颇、handler 是在 accept 階段起作用,它處理客戶端的連接請求碘举。
4忘瓦、childHandler 是在客戶端連接建立以后起作用,它負責客戶端連接的 IO 交互引颈。
最后來看一張圖耕皮,加深理解。下圖描述了服務端從啟動初始化到有新連接接入的變化過程:
——學自咕泡學院