原文出處http://cmsblogs.com/ 『chenssy』
轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處颖榜,謝謝!
上篇博客(【死磕Netty】----Netty的核心組件及其設(shè)計(jì)),了解了 Netty 的核心組件及其設(shè)計(jì),但是這些都是零散的,不成體系姥闭。那么 Netty 是如何利用這些組件構(gòu)建成一個(gè)高性能的異步通信框架。通過(guò)這篇博客可以初步了解越走。
下面先來(lái)一段 Netty 服務(wù)端的代碼:
public class NettyServer {
public void bind(int port){
// 創(chuàng)建EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(); //創(chuàng)建BOSS線程組 用于服務(wù)端接受客戶端的連接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //創(chuàng)建WORK線程組 用于進(jìn)行SocketChannel的網(wǎng)絡(luò)讀寫(xiě)
try {
// 創(chuàng)建ServerBootStrap實(shí)例
// ServerBootstrap 用于啟動(dòng)NIO服務(wù)端的輔助啟動(dòng)類棚品,目的是降低服務(wù)端的開(kāi)發(fā)復(fù)雜度
ServerBootstrap b = new ServerBootstrap();
// 綁定Reactor線程池
b.group(bossGroup, workerGroup)
// 設(shè)置并綁定服務(wù)端Channel
// 指定所使用的NIO傳輸?shù)腃hannel
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
@Override
protected void initChannel(Channel ch) throws Exception {
//do something
}
});
// 綁定端口,同步等待成功
ChannelFuture future = b.bind(port).sync();
// 等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 優(yōu)雅地關(guān)閉
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class LoggingServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("loggin-channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("loggin-channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("loggin-handlerAdded");
}
}
public static void main(String[] args){
new NettyServer().bind(8899);
}
}
上面代碼為 Netty 服務(wù)器端的完整代碼廊敌,在整個(gè)服務(wù)端代碼中會(huì)涉及如下幾個(gè)核心類铜跑。
ServerBootstrap
ServerBootstrap 為 Netty 服務(wù)端的啟動(dòng)輔助類,它提供了一系列的方法用于設(shè)置服務(wù)端啟動(dòng)相關(guān)的參數(shù)骡澈。
Channel
Channel 為 Netty 網(wǎng)絡(luò)操作抽象類锅纺,它定義了一組功能,其提供的 API 大大降低了直接使用 Socket 類的復(fù)雜性肋殴。當(dāng)然它也不僅僅只是包括了網(wǎng)絡(luò) IO 操作的基本功能囤锉,還包括一些與 Netty 框架相關(guān)的功能坦弟,包括獲取該 Channel 的 EventLoop 等等。
EventLoopGroup
EventLoopGroup 為 Netty 的 Reactor 線程池官地,它實(shí)際上就是 EventLoop 的容器减拭,而 EventLoop 為 Netty 的核心抽象類,它的主要職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器 Selector 上的 Channel区丑。
ChannelHandler
ChannelHandler 作為 Netty 的主要組件,它主要負(fù)責(zé) I/O 事件或者 I/O 操作進(jìn)行攔截和處理修陡,它可以選擇性地?cái)r截和處理自己感覺(jué)興趣的事件沧侥,也可以透?jìng)骱徒K止事件的傳遞。
ChannelPipeline
ChannelPipeline 是 ChannelHandler 鏈的容器魄鸦,它負(fù)責(zé) ChannelHandler 的管理和事件攔截與調(diào)度宴杀。每當(dāng)新建一個(gè) Channel 都會(huì)分配一個(gè)新的 ChannelPepeline,同時(shí)這種關(guān)聯(lián)是永久性的拾因。
以上是簡(jiǎn)要介紹旺罢,詳細(xì)介紹請(qǐng)參考(【死磕Netty】-----Netty的核心組件及其設(shè)計(jì))
服務(wù)端創(chuàng)建流程
Netty 服務(wù)端創(chuàng)建的時(shí)序圖,如下(摘自《Netty權(quán)威指南(第二版)》)
主要步驟為:
- 創(chuàng)建 ServerBootstrap 實(shí)例
- 設(shè)置并綁定 Reactor 線程池
- 設(shè)置并綁定服務(wù)端 Channel
- 創(chuàng)建并初始化 ChannelPipeline
- 添加并設(shè)置 ChannelHandler
- 綁定并啟動(dòng)監(jiān)聽(tīng)端口
服務(wù)端源碼分析
1绢记、創(chuàng)建兩個(gè)EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup 為 BOSS 線程組扁达,用于服務(wù)端接受客戶端的連接, workerGroup 為 worker 線程組,用于進(jìn)行 SocketChannel 的網(wǎng)絡(luò)讀寫(xiě)蠢熄。當(dāng)然也可以創(chuàng)建一個(gè)并共享跪解。
2、創(chuàng)建ServerBootstrap實(shí)例
ServerBootstrap b = new ServerBootstrap();
ServerBootStrap為Netty服務(wù)端的啟動(dòng)引導(dǎo)類签孔,用于幫助用戶快速配置叉讥、啟動(dòng)服務(wù)端服務(wù)。提供的方法如下:
方法名稱 | 方法描述 |
---|---|
group |
設(shè)置 ServerBootstrap 要用的 EventLoopGroup |
channel |
設(shè)置將要被實(shí)例化的 ServerChannel 類 |
option |
實(shí)例化的 ServerChannel 的配置項(xiàng) |
childHandler |
設(shè)置并添加 ChannelHandler |
bind |
綁定 ServerChannel |
ServerBootStrap底層采用裝飾者模式饥追。
關(guān)于 ServerBootStrap 我們后續(xù)做詳細(xì)分析图仓。
3、設(shè)置并綁定Reactor線程池
調(diào)用 group()
方法但绕,為 ServerBootstrap 實(shí)例設(shè)置并綁定 Reactor 線程池救崔。
b.group(bossGroup, workerGroup)
EventLoopGroup 為 Netty 線程池,它實(shí)際上就是 EventLoop 的數(shù)組容器捏顺。EventLoop 的職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器 Selector 上的 Channel帚豪,Selector 的輪詢操作由綁定的 EventLoop 線程 run 方法驅(qū)動(dòng),在一個(gè)循環(huán)體內(nèi)循環(huán)執(zhí)行草丧。通俗點(diǎn)講就是一個(gè)死循環(huán)狸臣,不斷的檢測(cè) I/O 事件、處理 I/O 事件昌执。
這里設(shè)置了兩個(gè)group烛亦,這個(gè)其實(shí)有點(diǎn)兒像我們工作一樣诈泼。需要兩類型的工人,一個(gè)老板(bossGroup),一個(gè)工人(workerGroup)煤禽,老板負(fù)責(zé)從外面接活铐达,工人則負(fù)責(zé)死命干活(尼瑪,和我上家公司一模一樣)檬果。所以這里 bossGroup 的作用就是不斷地接收新的連接瓮孙,接收之后就丟給 workerGroup 來(lái)處理,workerGroup 負(fù)責(zé)干活就行(負(fù)責(zé)客戶端連接的 IO 操作)选脊。
源碼如下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup); // 綁定boosGroup
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup; // 綁定workerGroup
return this;
}
其中父 EventLoopGroup 傳遞到父類的構(gòu)造函數(shù)中:
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
4杭抠、設(shè)置并綁定服務(wù)端Channel
綁定線程池后,則需要設(shè)置 channel 類型恳啥,服務(wù)端用的是 NioServerSocketChannel 偏灿。
.channel(NioServerSocketChannel.class)
調(diào)用 ServerBootstrap.channel
方法用于設(shè)置服務(wù)端使用的 Channel,傳遞一個(gè) NioServerSocketChannel Class對(duì)象钝的,Netty通過(guò)工廠類翁垂,利用反射創(chuàng)建NioServerSocketChannel 對(duì)象,如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
channelFactory()
用于設(shè)置 Channel 工廠的:
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
這里傳遞的是 ReflectiveChannelFactory硝桩,其源代碼如下:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
//需要?jiǎng)?chuàng)建 channel 的時(shí)候沿猜,該方法將被調(diào)用
@Override
public T newChannel() {
try {
// 反射創(chuàng)建對(duì)應(yīng) channel
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
確定服務(wù)端的 Channel(NioServerSocketChannel)后,調(diào)用 option()
方法設(shè)置 Channel 參數(shù)碗脊,作為服務(wù)端邢疙,主要是設(shè)置TCP的backlog參數(shù),如下:
.option(ChannelOption.SO_BACKLOG, 1024)
option()
源碼如下:
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
五望薄、添加并設(shè)置ChannelHandler
設(shè)置完 Channel 參數(shù)后疟游,用戶可以為啟動(dòng)輔助類和其父類分別指定 Handler。
.handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
//省略代碼
})
這兩個(gè) Handler 不一樣痕支,前者(handler()
)設(shè)置的 Handler 是服務(wù)端 NioServerSocketChannel的颁虐,后者(childHandler()
)設(shè)置的 Handler 是屬于每一個(gè)新建的 NioSocketChannel 的。跟蹤源代碼會(huì)發(fā)現(xiàn)兩種所處的類不一樣卧须,handler 位于 AbstractBootstrap 中另绩,childHandler 位于 ServerBootstrap 中,如下:
// AbstractBootstrap
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
// ServerBootstrap
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的花嘶,所有連接該監(jiān)聽(tīng)端口的客戶端都會(huì)執(zhí)行它笋籽,父類 AbstractBootstrap 中的 Handler 是一個(gè)工廠類,它為每一個(gè)新接入的客戶端都創(chuàng)建一個(gè)新的 Handler椭员。如下圖(《Netty權(quán)威指南(第二版)》):
[圖片上傳失敗...(image-bcab2c-1512393901237)]
六车海、綁定端口,啟動(dòng)服務(wù)
服務(wù)端最后一步隘击,綁定端口并啟動(dòng)服務(wù)侍芝,如下:
ChannelFuture future = b.bind(port).sync();
調(diào)用 ServerBootstrap 的 bind()
方法進(jìn)行端口綁定:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
首先調(diào)用 validate()
方法進(jìn)行參數(shù)校驗(yàn)研铆,然后調(diào)用 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并注冊(cè)一個(gè)Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 注冊(cè)成功
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 調(diào)用doBind0綁定
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
該方法涉及內(nèi)容較多,我們分解來(lái)看州叠,如下:
- 首先通過(guò)
initAndRegister()
得到一個(gè) ChannelFuture 對(duì)象 regFuture棵红; - 根據(jù)得到的 regFuture 對(duì)象判斷該對(duì)象是否拋出異常 (
regFuture.cause()
),如果是咧栗,直接返回逆甜; - 根據(jù)
regFuture.isDone()
判斷initAndRegister()
是否執(zhí)行完畢,如果執(zhí)行完成致板,則調(diào)用doBind0
交煞; - 若
initAndRegister()
沒(méi)有執(zhí)行完畢,則向 regFuture 對(duì)象添加一個(gè) ChannelFutureListener 監(jiān)聽(tīng)可岂,當(dāng)initAndRegister()
執(zhí)行完畢后會(huì)調(diào)用operationComplete()
,在operationComplete()
中依然會(huì)判斷 ChannelFuture 是否拋出異常翰灾,如果沒(méi)有則調(diào)用doBind0
進(jìn)行綁定缕粹。
按照上面的步驟我們一步一步來(lái)剖析 doBind()
方法。
initAndRegister()
執(zhí)行 initAndRegister()
會(huì)得到一個(gè) ChannelFuture 對(duì)象 regFuture纸淮,代碼如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 新建一個(gè)Channel
channel = channelFactory.newChannel();
// 初始化Channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// /向EventLoopGroup中注冊(cè)一個(gè)channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
首先調(diào)用 newChannel()
新建一個(gè)Channel平斩,這里是NioServerSocketChannel,還記前面 4咽块、設(shè)置并綁定服務(wù)端Channel(.channel(NioServerSocketChannel.class)
)中 設(shè)置的Channel工廠類么绘面?在這里派上用處了。在上面提到了通過(guò)反射的機(jī)制我們可以得到一個(gè) NioServerSocketChannel 類的實(shí)例侈沪。那么 NioServerSocketChannel 到底是一個(gè)什么東西呢揭璃?如下圖:
上圖是 NioServerSocketChannel 的繼承體系結(jié)構(gòu)圖, NioServerSocketChannel 在構(gòu)造函數(shù)中會(huì)依靠父類來(lái)完成一項(xiàng)一項(xiàng)的初始化工作亭罪。先看 NioServerSocketChannel 構(gòu)造函數(shù)瘦馍。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket()
方法較為簡(jiǎn)單,它是利用 SelectorProvider.openServerSocketChannel()
应役,產(chǎn)生一個(gè) ServerSocketChannel 對(duì)象情组。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
該構(gòu)造函數(shù)首先是調(diào)用父類的構(gòu)造方法,然后設(shè)置 config屬性箩祥。父類構(gòu)造方法如下:
// AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
// AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
通過(guò) super()
院崇,一層一層往上,直到 AbstractChannel袍祖。我們從最上層解析底瓣。
- AbstractChannel 設(shè)置了 unsafe (
unsafe = newUnsafe()
)和 pipeline(pipeline = newChannelPipeline()
); - AbstractNioChannel 將當(dāng)前 ServerSocketChannel 設(shè)置成了非阻塞(
ch.configureBlocking(false);
)蕉陋,同時(shí)設(shè)置SelectionKey.OP_ACCEPT事件(this.readInterestOp = readInterestOp;
readInterestOp 值由 NioServerSocketChannel 中傳遞)键耕; - NioServerSocketChannel 設(shè)置 config屬性(
config = new NioServerSocketChannelConfig(this, javaChannel().socket())
)。
所以
channel = channelFactory.newChannel()
通過(guò)反射機(jī)制產(chǎn)生了 NioServerSocketChannel 類實(shí)例柑营。同時(shí)該實(shí)例設(shè)置了NioMessageUnsafe屈雄、DefaultChannelPipeline、非阻塞官套、SelectionKey.OP_ACCEPT事件 和 NioServerSocketChannelConfig 屬性酒奶。
看完了 channelFactory.newChannel();
,我們?cè)倏?init()
奶赔。
void init(Channel channel) throws Exception {
// 設(shè)置配置的option參數(shù)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 獲取綁定的pipeline
ChannelPipeline p = channel.pipeline();
// 準(zhǔn)備child用到的4個(gè)part
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 為NioServerSocketChannel的pipeline添加一個(gè)初始化Handler,
// 當(dāng)NioServerSocketChannel在EventLoop注冊(cè)成功時(shí)惋嚎,該handler的init方法將被調(diào)用
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
//如果用戶配置過(guò)Handler
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 為NioServerSocketChannel的pipeline添加ServerBootstrapAcceptor處理器
// 該Handler主要用來(lái)將新創(chuàng)建的NioSocketChannel注冊(cè)到EventLoopGroup中
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
其實(shí)整個(gè)過(guò)程可以分為三個(gè)步驟:
- 設(shè)置 Channel 的 option 和 attr;
- 獲取綁定的 pipeline站刑,然后為 NioServerSocketChanne l綁定的 pipeline 添加 Handler另伍;
- 將用于服務(wù)端注冊(cè)的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中。ServerBootstrapAcceptor 為一個(gè)接入器绞旅,專門接受新請(qǐng)求摆尝,把新的請(qǐng)求扔給某個(gè)事件循環(huán)器。
至此初始化部分已經(jīng)結(jié)束因悲,我們?cè)倏醋?cè)部分堕汞,
// /向EventLoopGroup中注冊(cè)一個(gè)channel
ChannelFuture regFuture = config().group().register(channel);
注冊(cè)方法的調(diào)用位于 initAndRegister()
方法中。注意這里的 group()
返回的是前面的 boss NioEvenLoopGroup晃琳,它繼承 MultithreadEventLoopGroup讯检,調(diào)用的 register()
,也是 MultithreadEventLoopGroup 中的卫旱。如下:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
調(diào)用 next()
方法從 EventLoopGroup 中獲取下一個(gè) EventLoop人灼,調(diào)用 register()
方法注冊(cè):
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
將Channel和EventLoop封裝成一個(gè)DefaultChannelPromise對(duì)象,然后調(diào)用register()方法顾翼。DefaultChannelPromis為ChannelPromise的默認(rèn)實(shí)現(xiàn)挡毅,而ChannelPromisee繼承Future,具備異步執(zhí)行結(jié)構(gòu)暴构,綁定Channel跪呈,所以又具備了監(jiān)聽(tīng)的能力,故而ChannelPromis是Netty異步執(zhí)行的核心接口取逾。
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
首先獲取 channel 的 unsafe 對(duì)象耗绿,該 unsafe 對(duì)象就是在之前設(shè)置過(guò)得。然后調(diào)用 register()
方法砾隅,如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// 必須要保證注冊(cè)是由該EventLoop發(fā)起的
if (eventLoop.inEventLoop()) {
register0(promise); // 注冊(cè)
} else {
// 如果不是單獨(dú)封裝成一個(gè)task異步執(zhí)行
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
過(guò)程如下:
- 首先通過(guò)
isRegistered()
判斷該 Channel 是否已經(jīng)注冊(cè)到 EventLoop 中误阻; - 通過(guò)
eventLoop.inEventLoop()
來(lái)判斷當(dāng)前線程是否為該 EventLoop 自身發(fā)起的,如果是,則調(diào)用register0()
直接注冊(cè)究反; - 如果不是寻定,說(shuō)明該 EventLoop 中的線程此時(shí)沒(méi)有執(zhí)行權(quán),則需要新建一個(gè)線程精耐,單獨(dú)封裝一個(gè) Task狼速,而該 Task 的主要任務(wù)則是執(zhí)行
register0()
。
無(wú)論當(dāng)前 EventLoop 的線程是否擁有執(zhí)行權(quán)卦停,最終都會(huì)要執(zhí)行 register0()
向胡,如下:
private void register0(ChannelPromise promise) {
try {
// 確保 Channel 處于 open
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 真正的注冊(cè)動(dòng)作
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); //設(shè)置注冊(cè)結(jié)果為成功
pipeline.fireChannelRegistered();
if (isActive()) {
//如果是首次注冊(cè),發(fā)起 pipeline 的 fireChannelActive
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
如果 Channel 處于 open 狀態(tài),則調(diào)用 doRegister()
方法完成注冊(cè)惊完,然后將注冊(cè)結(jié)果設(shè)置為成功僵芹。最后判斷如果是首次注冊(cè)且處于激活狀態(tài),則發(fā)起 pipeline 的 fireChannelActive()
小槐。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 注冊(cè)到NIOEventLoop的Selector上
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
這里注冊(cè)時(shí) ops 設(shè)置的是 0拇派,也就是說(shuō) ServerSocketChannel 僅僅只是表示了注冊(cè)成功,還不能監(jiān)聽(tīng)任何網(wǎng)絡(luò)操作凿跳,這樣做的目的是(摘自《Netty權(quán)威指南(第二版)》):
- 注冊(cè)方式是多態(tài)的件豌,它既可以被 NIOServerSocketChannel 用來(lái)監(jiān)聽(tīng)客戶端的連接接入,也可以注冊(cè) SocketChannel 用來(lái)監(jiān)聽(tīng)網(wǎng)絡(luò)讀或者寫(xiě)操作拄显。
- 通過(guò)
SelectionKey.interestOps(int ops)
方法可以方便地修改監(jiān)聽(tīng)操作位苟径。所以案站,此處注冊(cè)需要獲取 SelectionKey 并給 AbstractNIOChannel 的成員變量 selectionKey 賦值躬审。
由于這里 ops 設(shè)置為 0,所以還不能監(jiān)聽(tīng)讀寫(xiě)事件蟆盐。調(diào)用 doRegister()
后承边,然后調(diào)用pipeline.invokeHandlerAddedIfNeeded();
,這個(gè)時(shí)候控制臺(tái)會(huì)出現(xiàn) loggin-handlerAdded
石挂,內(nèi)部如何調(diào)用博助,我們?cè)谄饰?pipeline 時(shí)再做詳細(xì)分析。然后將注冊(cè)結(jié)果設(shè)置為成功(safeSetSuccess(promise)
)痹愚。調(diào)用 pipeline.fireChannelRegistered();
這個(gè)時(shí)候控制臺(tái)會(huì)打印 loggin-channelRegistered
富岳。這里簡(jiǎn)單分析下該方法。
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
pipeline 維護(hù)著 handle 鏈表拯腮,事件會(huì)在 NioServerSocketChannel 的 pipeline 中傳播窖式。最終都會(huì)調(diào)用 next.invokeChannelRegistered()
,如下:
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
在 invokeChannelRegistered()
會(huì)調(diào)用我們?cè)谇懊嬖O(shè)置的 handler (還記得簽名的 handler(new LoggingServerHandler()
)么)的 channelRegistered()
动壤,這個(gè)時(shí)候控制臺(tái)應(yīng)該會(huì)打印 loggin-channelRegistered
萝喘。
到這里initAndRegister() (final ChannelFuture regFuture = initAndRegister();)
就分析完畢了,該方法主要做如下三件事:
- 通過(guò)反射產(chǎn)生了一個(gè) NioServerSocketChannle 對(duì)象;
- 調(diào)用
init(channel)
完成初始化工作阁簸; - 將NioServerSocketChannel進(jìn)行了注冊(cè)爬早。
initAndRegister()
篇幅較長(zhǎng),分析完畢了启妹,我們?cè)俜祷氐?code>doBind(final SocketAddress localAddress)筛严。在 doBind(final SocketAddress localAddress)
中如果 initAndRegister()
執(zhí)行完成,則 regFuture.isDone()
則為 true翅溺,執(zhí)行 doBind0()
脑漫。如果沒(méi)有執(zhí)行完成,則會(huì)注冊(cè)一個(gè)監(jiān)聽(tīng) ChannelFutureListener咙崎,當(dāng) initAndRegister()
完成后优幸,會(huì)調(diào)用該監(jiān)聽(tīng)的 operationComplete()
方法,最終目的還是執(zhí)行 doBind0()
褪猛。故而我們下面分析 doBind0()
到底做了些什么网杆。源碼如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
doBind0()
較為簡(jiǎn)單,首先new 一個(gè)線程 task伊滋,然后將該任務(wù)提交到 NioEventLoop 中進(jìn)行處理碳却,我們先看 execute()
。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
調(diào)用 inEventLoop()
判斷當(dāng)前線程是否為該 NioEventLoop 所關(guān)聯(lián)的線程笑旺,如果是昼浦,則調(diào)用 addTask()
將任務(wù) task 添加到隊(duì)列中,如果不是筒主,則先啟動(dòng)線程关噪,在調(diào)用 addTask()
將任務(wù) task 添加到隊(duì)列中。addTask()
如下:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
offerTask()
添加到隊(duì)列中:
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
task 添加到任務(wù)隊(duì)列 taskQueue成功后乌妙,執(zhí)行任務(wù)會(huì)調(diào)用如下方法:
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel 首先調(diào)用 bind()
完成 channel 與端口的綁定使兔,如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
tail 在 DefaultChannelPipeline 中定義:final AbstractChannelHandlerContext tail;
有 tail 就會(huì)有 head ,在 DefaultChannelPipeline 中維護(hù)這一個(gè) AbstractChannelHandlerContext 節(jié)點(diǎn)的雙向鏈表藤韵,該鏈表是實(shí)現(xiàn) Pipeline 機(jī)制的關(guān)鍵虐沥,更多詳情會(huì)在 ChannelPipeline 中做詳細(xì)說(shuō)明。bind()
最終會(huì)調(diào)用 DefaultChannelPipeline 的 bind()
方法泽艘。如下:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
首先對(duì) localAddress 欲险、 promise 進(jìn)行校驗(yàn),符合規(guī)范則調(diào)用 findContextOutbound()
匹涮,該方法用于在 pipeline 中獲取 AbstractChannelHandlerContext 雙向鏈表中的一個(gè)節(jié)點(diǎn)天试,如下:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
從該方法可以看出,所獲取的節(jié)點(diǎn)是從 tail 開(kāi)始遍歷焕盟,獲取第一個(gè)節(jié)點(diǎn)屬性 outbound 為 true 的節(jié)點(diǎn)秋秤。其實(shí)該節(jié)點(diǎn)是 AbstractChannelHandlerContext 雙向鏈表的 head 節(jié)點(diǎn)宏粤。獲取該節(jié)點(diǎn)后,調(diào)用 invokeBind()
灼卢,如下:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
handler()
返回的是 HeadContext 對(duì)象绍哎,然后調(diào)用其bind()
,如下:
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe 定義在 HeadContext 中鞋真,在構(gòu)造函數(shù)中初始化(unsafe = pipeline.channel().unsafe();
)崇堰,調(diào)用 bind()
如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 最核心方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
內(nèi)部調(diào)用 doBind()
,該方法為綁定中最核心的方法涩咖,位于 NioServerSocketChannel 中海诲,如下:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
javaChannel()
返回的是 NioServerSocketChannel 實(shí)例初始化時(shí)所產(chǎn)生的 Java NIO ServerSocketChannel 實(shí)例(ServerSocketChannelImple實(shí)例),然后調(diào)用其 bind()
檩互,如下:
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
Object var3 = this.lock;
synchronized(this.lock) {
if(!this.isOpen()) {
throw new ClosedChannelException();
} else if(this.isBound()) {
throw new AlreadyBoundException();
} else {
InetSocketAddress var4 = var1 == null?new InetSocketAddress(0):Net.checkAddress(var1);
SecurityManager var5 = System.getSecurityManager();
if(var5 != null) {
var5.checkListen(var4.getPort());
}
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1?50:var2);
Object var6 = this.stateLock;
synchronized(this.stateLock) {
this.localAddress = Net.localAddress(this.fd);
}
return this;
}
}
}
該方法屬于 Java NIO 層次的特幔,該方法涉及到服務(wù)端端口的綁定,端口的監(jiān)聽(tīng)闸昨,這些內(nèi)容在后續(xù)的 Channel 時(shí)做詳細(xì)介紹蚯斯。
到這里就真正完成了服務(wù)端端口的綁定。
這篇博客比較長(zhǎng)饵较,大體上從源碼層次稍微解讀了 Netty 服務(wù)端的啟動(dòng)過(guò)程拍嵌,當(dāng)中涉及到 Netty 的各個(gè)核心組件,只能籠統(tǒng)來(lái)描述服務(wù)端的啟動(dòng)過(guò)程循诉,具體的細(xì)節(jié)部分還需要后續(xù)做詳細(xì)分析横辆,而且其中有多個(gè)點(diǎn)還是懵懵懂懂,相信在后面對(duì) Netty 的分析過(guò)程會(huì)一一解答茄猫。
謝謝閱讀狈蚤,祝好!!!
參考資料
- 《Netty權(quán)威指南(第二版)》
- 《Netty IN ACTION》
- Netty源碼分析:服務(wù)端啟動(dòng)全過(guò)程(篇幅很長(zhǎng))
歡迎掃一掃我的公眾號(hào)關(guān)注 — 及時(shí)得到博客訂閱哦!