導(dǎo)讀
原創(chuàng)文章些椒,轉(zhuǎn)載請(qǐng)注明出處。
本文源碼地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼
我們?cè)凇癇IO vs NIO”這篇文件中我們給出了使用jdk原生nio編寫的服務(wù)端Hello World免糕。還記得其中的關(guān)鍵步驟嗎,咱們?cè)賮?lái)溫習(xí)一下牌芋。
創(chuàng)建一個(gè)ServerSocketChannel
將ServerSocketChannel設(shè)置為非阻塞的
將ServerSocketChannel綁定到8000端口
將ServerSocketChannel注冊(cè)到selector上
今天我們就以這幾個(gè)關(guān)鍵步驟為目標(biāo)來(lái)看一下在netty中是怎么做的松逊,以及在這幾個(gè)步驟的中間netty又多做了哪些工作。
1 服務(wù)端引導(dǎo)代碼
以下代碼引導(dǎo)啟動(dòng)一個(gè)服務(wù)端棺棵,在以下文章中我們以“引導(dǎo)代碼”指代這段程序熄捍。這段代碼很簡(jiǎn)單,創(chuàng)建兩個(gè)EventLoopGroup
分別為bossGroup
和workerGroup
缚柏。創(chuàng)建一個(gè)ServerBoostrap
并將bossGroup
和workerGroup
傳入碟贾,配置一個(gè)handler
币喧,該handler
為監(jiān)聽端口這條連接所使用的handler
袱耽。接著又設(shè)置了一個(gè)childHandler
即新連接所使用的handler
,本篇文章我們不講新連接的接入史翘,所以這里的childHandler
里什么也沒做冀续。
運(yùn)行這段這段代碼將在控制臺(tái)打出如下結(jié)果。
HandlerAdded
ChannelRegistered
ChannelActive
/**
* 歡迎關(guān)注公眾號(hào)“種代碼“洪唐,獲取博主微信深入交流
*
* @author wangjianxin
*/
public class com.zhongdaima.netty.analysis.bootstrap.ServerBoot {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.attr(AttributeKey.valueOf("ChannelName"), "ServerChannel")
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelRegistered");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelActive");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded");
}
}).childHandler(new ChannelInboundHandlerAdapter(){
});
ChannelFuture f = b.bind(8000).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2 啟動(dòng)過程
我們從ChannelFuture f = b.bind(8000).sync()
的bind
方法往下跟到AbstractBootStrap
的doBind
方法,這中間的過程很簡(jiǎn)單问欠,就是將端口號(hào)封裝為SocketAddress
。
在doBind
內(nèi)的關(guān)鍵代碼有第一行的initAddRegister
方法溅潜,還有后面的doBind0
方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
if (regFuture.isDone()) {
doBind0(regFuture, channel, localAddress, promise);
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
} else {
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
進(jìn)入到initAndRegister
方法粗仓,initAddResgiter
方法中有3個(gè)關(guān)鍵步驟设捐,1是channelFactory.newChannel()
蚂斤,2是init(channel)
槐沼,3是config().group().register(channel)
。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//創(chuàng)建一個(gè)Channel
channel = channelFactory.newChannel();
//初始化Channel
init(channel);
} catch (Throwable t) {
}
//注冊(cè)Channel
ChannelFuture regFuture = config().group().register(channel);
...
}
整個(gè)doBind
方法被分成4個(gè)關(guān)鍵步驟纽窟,分別是:
channelFacotry.newChannel()
init(channel)
config().group().register(channel)
doBind0
接下來(lái)咱們分別來(lái)看這4個(gè)關(guān)鍵步驟臂港。
2.1 channelFacotry.newChannel()
新創(chuàng)建一個(gè)Channel
channelFacotry
是AbstractBootStrap
的一個(gè)屬性审孽,這個(gè)屬性在哪里被賦值呢浑娜,其實(shí)是在我們?cè)趩?dòng)時(shí)調(diào)用b.channel(NioServerSocketChannel)
時(shí)賦的值,這個(gè)方法在AbstractBootStrap
里搓萧,非常簡(jiǎn)單瘸洛,我們不再分析。最后的結(jié)果是channelFactory
被賦值為ReflectiveChannelFactory
石蔗,顧名思義就是用反射的方法創(chuàng)建Channel
养距,我看們一下其中的newChannel()
方法棍厌,很簡(jiǎn)單敬肚,clazz.newInstance
調(diào)用無(wú)參構(gòu)造方法創(chuàng)建實(shí)例艳馒。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
接下來(lái)咱們就看一下NioServerSocketChannel
的無(wú)參構(gòu)造方法,其中調(diào)用newSocket
方法創(chuàng)建了一個(gè)jdk的ServerSocketChannel
蝶锋。好了,咱們已經(jīng)看到了導(dǎo)讀中提到的第1步“創(chuàng)建一個(gè)ServerSocketChannel”,緊著把這個(gè)channel
傳遞給了父類的構(gòu)造方法兔港,還傳遞一個(gè)參數(shù)SelectionKey.OP_ACCEPT
,記住這個(gè)參數(shù)后面會(huì)提到飒赃。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
咱們接著跟到父類AbstractNioMessageChannel
的構(gòu)造方法,沒什么其他操作蔫慧,繼續(xù)調(diào)用父類的構(gòu)造方法睡扬。
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
接著跟下去卖怜,到了AbstractNioChannel
的構(gòu)造方法,在這里我們看到了ch.configureBlocking(false)
侮繁,至此我們看到了導(dǎo)讀中提到的第2步“將Channel設(shè)置為非阻塞的”娩贷。AbstractNioChannel
里又調(diào)用了父類的構(gòu)造方法,接著看下去品抽。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//將Channel設(shè)置為非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
到了AbstractChannel
的構(gòu)造方法圆恤,這里為Channel
創(chuàng)建了一個(gè)id羽历,一個(gè)Unsafe
還有一個(gè)PipeLine
秕磷。Unsafe
和PipeLine
咱們后面再講澎嚣。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
2.2 init(channel)
初始化Channel
我們回到AbstractBootstrap
的initAndRegister
方法易桃,接著往下看init(channel)
,這是個(gè)抽象方法驱富,實(shí)現(xiàn)在ServerBootstrap
里匹舞。
init
方法的主要邏輯是設(shè)置Channel
參數(shù)叫榕、屬性晰绎,并將我們?cè)谝龑?dǎo)代碼中所配置的Handler
添加進(jìn)去,最后又添加了一個(gè)ServerBootStrapAccptor
,顧名思義這是一個(gè)處理新連接接入的Handler
构资。
這個(gè)ServerBootStrapAccptor
在隨后的章節(jié)中我們會(huì)講迹淌,這里先略過唉窃。至于為什么調(diào)用ch.eventLoop().execute
而不是直接添加榔幸,這個(gè)我在代碼里有簡(jiǎn)要提示削咆,其實(shí)目前的版本拨齐,直接添加也是沒有問題的厦滤。這個(gè)我會(huì)在出視頻教程的時(shí)候給大家演示一下掏导,歡迎關(guān)注添瓷。
void init(Channel channel) throws Exception {
//設(shè)置Channel參數(shù)鳞贷,我們?cè)谝龑?dǎo)代碼中通過.option(ChannelOption.TCP_NODELAY, true)所設(shè)置的參數(shù)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
//設(shè)置Channel屬性,我們?cè)谝龑?dǎo)代碼中通過.attr(AttributeKey.valueOf("ChannelName"), "ServerChannel")所設(shè)置的屬性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
//p.addLast是同步調(diào)用,不管是不是EventLoop線程在執(zhí)行眷蚓,這個(gè)匿名的ChannelInitializer被立即添加進(jìn)PipeLine中
//但是這個(gè)匿名的ChannelInitializer的initChannel方法是被channelAdded方法調(diào)用的沙热,而channelAdded方法只能被EventLoop線程調(diào)用
//此時(shí)這個(gè)Channel還沒綁定EventLoop線程,所以這個(gè)匿名的ChannelInitializer的channelAdded方法的調(diào)用會(huì)被封裝成異步任務(wù)添加到PipeLine的pendingHandlerCallback鏈表中
//當(dāng)Channel綁定EventLoop以后會(huì)從pendingHandlerCallback鏈表中取出任務(wù)執(zhí)行爵川。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
//添加我們?cè)谝龑?dǎo)代碼中所配置的handler
pipeline.addLast(handler);
}
//有些同學(xué)對(duì)這個(gè)有疑問值依,為什么不直接pipeline.addLast,可以參考下面的issue,其實(shí)現(xiàn)在的版本已經(jīng)可以直接改成pipeline.addLast
//issue鏈接https://github.com/netty/netty/issues/5566
//為什么現(xiàn)在的版本可以直接改成pipeline.adLast呢风秤,關(guān)鍵在于ChannelInitializer的handlerAdded方法
//大家可以對(duì)比4.0.39.Final版本和4.1.6.Final版本的區(qū)別
//添加ServerBootStrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
2.3 config().group().register(channel)
綁定EventLoop
并向Selector
注冊(cè)Channel
我們回到AbstractBootstrap
的initAndRegister
方法疑苔,接著往下看到ChannelFuture regFuture = config().group().register(channel);
惦费,這里就是注冊(cè)Channel的地方了,咱們跟進(jìn)去看看。
config.group()
的返回是我們?cè)谝龑?dǎo)代碼中所設(shè)置的bossGroup
鳍贾,由于這里只有一個(gè)Channel
橡淑,所以bossEventLoopGroup
里面只需要1個(gè)EventLoop
就夠了。
跟到register(channel)
方法里看看斗埂,這個(gè)register
方法是抽象的男娄,具體實(shí)現(xiàn)在MultithreadEventLoopGroup
中,跟進(jìn)去围橡。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
next()
方法調(diào)用EventExecutorChooser
的next()
方法選擇一個(gè)EventLoop
。EventExecutorChooser
有兩個(gè)實(shí)現(xiàn)收擦,分別是PowerOfTowEventExecutorChooser
和GenericEventExecutorChooser
塞赂,這兩個(gè)Chooser
用的都是輪詢策略,只是輪詢算法不一樣仇哆。如果EventLoopGroup
內(nèi)的EventLoop
個(gè)數(shù)是2的冪讹剔,則用PowerOfTowEventExecutorChooser
,否則用GenericEventExecutorChooser
。
PowerOfTowEventExecutorChooser
使用位操作隅俘。
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
而GenericEventExecutorChooser
使用取余操作为居。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
從EventLoop
的選擇算法上我們可以看出,netty為了性能膳凝,無(wú)所不用其極蹬音。
chooser
屬性的賦值在MultithreadEventExecutorGroup
的構(gòu)造方法內(nèi)通過chooserFactory
創(chuàng)建的劫狠。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
chooser = chooserFactory.newChooser(children);
}
而chooserFactory
的賦值在MultithreadEventExecutorGroup
的另一個(gè)構(gòu)造方法內(nèi)。當(dāng)我們?cè)谝龑?dǎo)代碼中通過new NioEventLoopGroup(1)
創(chuàng)建EventLoopGroup
時(shí)最終會(huì)調(diào)用到這個(gè)構(gòu)造方法內(nèi)懦砂,默認(rèn)值為DefaultEventExecutorChooserFactory.INSTANCE
荞膘。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
next()
方法選出的EventLoop
就是個(gè)SingleThreadEventLoop
了,我們跟到SingleThreadEventLoop
的register
方法,最終調(diào)用的是unsafe
的register
方法弥激。
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
unsafe.register
方法在io.netty.channel.AbstractChannel.AbstractUnsafe
內(nèi)微服,我們跟下去看看。在register
方法中最主要的有兩件事,一是綁定eventloop
宝与,二是調(diào)用register0
方法习劫。此時(shí)的調(diào)用線程不是EventLoop
線程,會(huì)發(fā)起一個(gè)異步任務(wù)谤狡。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//綁定eventloop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
//此時(shí)我們不在EventLoop內(nèi)豌汇,也就是當(dāng)前線程非EventLoop線程,會(huì)走到這個(gè)分支
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
//調(diào)用子類的register0方法
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
register0
方法內(nèi)主要有3步操作。
第1步是doRegister()
斜做,這個(gè)咱們稍后說瓤逼。
第2步是pipeline.invokeHandlerAddedIfNeeded()
這一步是去完成那些在綁定EventLoop
之前觸發(fā)的添加handler
操作,比如我們添加了一個(gè)ChannelInitializer
,在ChannelInitalizer
的initChannel
方法中添加的Handler
精居,而initChannel
被channelAdded
方法調(diào)用靴姿,channelAdded
方法的調(diào)用必須在EventLoop
內(nèi),未綁定EventLoop
之前這個(gè)調(diào)用會(huì)被封裝成異步任務(wù)辈毯。
這些操作被放在pipeline
中的pendingHandlerCallbackHead
中,是個(gè)雙向鏈表唁影,具體請(qǐng)參考DefaultChannelPipeLine
的addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法哟沫。
這一步調(diào)用了咱們的引導(dǎo)程序中的System.out.println("HandlerAdded")
嗜诀,在控制臺(tái)打出"HandlerAdded"
隆敢。
第3步觸發(fā)ChannelRegistered
事件。這一步調(diào)用了咱們的引導(dǎo)程序中的System.out.println("ChannelRegistered")
,在控制臺(tái)打出"ChannelRegistered"
捣作。
好了券躁,到這里我們已經(jīng)知道了,為什么我們的引導(dǎo)程會(huì)先打出"HandlerAdded"
和"ChannelRegistered"
。
接著往下isActive()
最終調(diào)用是的jdk ServerSocket
類的isBound
方法卵贱,咱們不再貼出代碼键俱,讀者自行查看,很簡(jiǎn)單踪央,顯然這里我們還沒有完成端口綁定畅蹂,所以這個(gè)if
分支的代碼并不會(huì)執(zhí)行累贤。
private void register0(ChannelPromise promise) {
try {
//向Selector注冊(cè)Channel
doRegister();
//去完成那些在綁定EventLoop之前觸發(fā)的添加handler操作痹束,這些操作被放在pipeline中的pendingHandlerCallbackHead中祷嘶,是個(gè)鏈表烛谊,具體請(qǐng)參考`DefaultChannelPipeLine`的`addLast(EventExecutorGroup group, String name, ChannelHandler handler)`方法。
pipeline.invokeHandlerAddedIfNeeded();
//將promise設(shè)置為成功的
safeSetSuccess(promise);
//觸發(fā)ChannelRegistered事件
pipeline.fireChannelRegistered();
//這里并沒有Active双泪,因?yàn)榇藭r(shí)還沒完成端口綁定焙矛,所以這個(gè)if分支的代碼都不會(huì)執(zhí)行
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
接下來(lái)咱們跟進(jìn)去doRegister
方法,這是個(gè)抽象方法蟆盹,本例中方法實(shí)現(xiàn)在AbstractNioChannel
中。好了面哥,到這里我們終于看到了導(dǎo)讀中提到的第4步“向Selector
注冊(cè)Channel
”的操作尚卫。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
到了這里,我們?cè)趯?dǎo)讀中說的總共4步操作中怎爵,還有第3步?jīng)]有看到,在哪里呢,接著往下看狂秦。
2.4 綁定端口號(hào) doBind0
前文中我們說過doBind
方法內(nèi)有兩個(gè)重要調(diào)用initAndRegister
和doBind0
侧啼,initAndRegister
我們已經(jīng)分析完了,接下來(lái)看doBind0
符喝。由于initAndRegister
中register
是異步操作协饲,當(dāng)initAndRegister
返回時(shí),register
操作有可能完成了,也有可能沒完成膀篮,這里做了判斷誓竿,如果已經(jīng)完成則直接調(diào)用doBind0
涧偷,如果未完成,則將doBind0
放到regFuture
的Listener
中确封,等register
操作完成后隅肥,由EventLoop
線程來(lái)回調(diào)泛啸。
那么什么時(shí)候會(huì)回調(diào)Listener
呢吕粹,當(dāng)調(diào)用promise
的setSuccess
或者setFailure
時(shí)回調(diào)匹耕。還記得上文中的AbstractUnsafe.register0
方法嗎,其中有一個(gè)調(diào)用safeSetSuccess(promise)
既鞠,對(duì),就是這里了龄恋,很簡(jiǎn)單篙挽,我們不再贅述铣卡。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
} else {
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
那么又有讀者疑問了蝉仇,在這個(gè)if
判斷完成之后到添加Listener
之間的這個(gè)時(shí)間轿衔,promise
有可能已經(jīng)完成了害驹,Listener
可能不會(huì)回調(diào)了, 奧秘在DefaultPromise
的addListener(GenericFutureListener<? extends Future<? super V>> listener)
方法里,這里注冊(cè)完Listener
之后亥揖,如果發(fā)現(xiàn)promise
已經(jīng)完成了,那么將直接調(diào)用nofityListeners
方法向EventLoop
提交異步任務(wù)(此時(shí)已經(jīng)完成綁定EventLoop
),該異步任務(wù)即是回調(diào)剛剛注冊(cè)的Listener
旁趟。
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
咱們回歸正題橙困,去看doBind0
方法辟狈,這里調(diào)用了channel.bind
方法,具體實(shí)現(xiàn)在AbstractChannel
里壹蔓。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
AbstractChannel
里的bind
方法調(diào)用了pipeline.bind
亲雪,還記得一篇“Netty整體架構(gòu)”文章中的那張圖嗎义辕,咱們?cè)俅畏懦鰜?lái)终息。
bind
方法會(huì)首先調(diào)用Tail的bind
方法喳张,最終傳播到Head
的bind
方法销部,具體怎么傳播的酱虎,咱們講PipeLine
的時(shí)候再說读串。
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
這里咱們直接跟到HeadContext
的bind
方法, 我們看到又調(diào)用了unsafe
的bind
方法排监,前面我們看到Channel
在向Selector
注冊(cè)時(shí)最終也調(diào)用到了unsafe
杰捂。這里先跟大家說一下unsafe
是netty中最直接跟Channel
接觸的類舆床,對(duì)Channel
的所有操作最終都會(huì)落到unsafe
上嫁佳,具體詳情咱們后面講unsafe
的時(shí)候再說桦踊。
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
具體實(shí)現(xiàn)在AbstractUnsafe
中,bind方法中兩個(gè)重要操作殖演,一是調(diào)用doBind
方法綁定端口,這個(gè)稍后說俯在。二是觸發(fā)ChannelActive
事件娃惯,這一步有一個(gè)isActive
判斷,到這里我們已經(jīng)完成了端口綁定趾浅,所以是true。這一步調(diào)用了咱們引導(dǎo)程序中的System.out.println("ChannelActive")
在控制臺(tái)打印出"ChannelActive"
浅侨。
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
}
doBind
方法的實(shí)現(xiàn)在NioServerSocketChannel
中证膨,我們一起來(lái)看一下,至此導(dǎo)讀中提到的第3步操作“綁定端口”央勒,我們已經(jīng)看到了,服務(wù)端啟動(dòng)完成崔步。
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
2.4.1 注冊(cè)興趣事件在哪里
但是似乎是不是少了點(diǎn)什么,我們?cè)谑褂胘dk api編寫的時(shí)候灶似,向selector
注冊(cè)的時(shí)候慎陵,傳遞了興趣事件的,為什么我們沒有看到這里有興趣事件的注冊(cè)呢喻奥。我們繼續(xù)回到AbstractUnsafe
的bind
方法中席纽,最后調(diào)用了pipeline.fireChannelActive()
,下面是PipeLine
的fireChannleActive
方法撞蚕,調(diào)用了AbstractChannelHandlerContext.invokeChannelActive(head)
润梯,而這個(gè)head
就是我們的“netty整體架構(gòu)圖”中的HeadContext
。
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
HeadContext
中的channelActive
方法如下甥厦,奧秘在readIfIsAutoRead
里纺铭,readIfIsAutoRead
,最終調(diào)用了channel.read
刀疙。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
channel.read
方法的實(shí)現(xiàn)在AbstractChannel
中舶赔,調(diào)用到了pipeline.read
。
@Override
public Channel read() {
pipeline.read();
return this;
}
PipeLine
中的read
方法如下谦秧,調(diào)用了tail
的read
方法,最終這個(gè)調(diào)用會(huì)傳播到head
的read
方法疚鲤,具體的傳播過程锥累,等咱們講PipeLine
的時(shí)候再說。咱們直接去看HeadContext
的read
方法集歇。
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
HeadContext
的read
方法又調(diào)用到unsafe.beginRead()
桶略。
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
beginRead
方法的實(shí)現(xiàn)在AbstractUnsafe
中,這里調(diào)用了doBeginRead
诲宇。doBeginRead
方法的實(shí)現(xiàn)在AbstractNioChannel
中际歼。
@Override
public final void beginRead() {
try {
doBeginRead();
} catch (final Exception e) {
}
}
doBeginRead
方法的實(shí)現(xiàn)在AbstractNioChannel
中,這里修改了selectionKey
的興趣事件姑蓝,把已有的興趣事件interestOps
和readInterestOp
合并在一起重新設(shè)置鹅心。
interestOps
是現(xiàn)有的興趣事件,在上文中向Selector
注冊(cè)時(shí)的代碼里javaChannel().register(eventLoop().selector, 0, this)
它掂,所以interestOps
就是0巴帮。
readInterestOp
在哪里設(shè)置的呢溯泣,還記得本篇文章中新創(chuàng)建一個(gè)Channel
那一小節(jié)中嗎虐秋?
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
在NioServerSocketChannel
調(diào)用父類的構(gòu)造方法時(shí)傳遞了一個(gè)興趣事件參數(shù),值為SelectionKey.OP_ACCEPT
垃沦,至此客给,真相大白。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
九曲連環(huán)肢簿,我們終于找到了這么小小的一個(gè)點(diǎn)靶剑,為什么流程這么長(zhǎng)呢蜻拨,似乎很難理解,不要緊桩引,繼續(xù)關(guān)注我的文章缎讼,咱們講PipeLine
的時(shí)候會(huì)把這里講明白。
3 總結(jié)
netty服務(wù)端啟動(dòng)流程:
創(chuàng)建一個(gè)
Channel
實(shí)例坑匠,這個(gè)過程中將Channel
設(shè)置為非阻塞的血崭,為Channel
創(chuàng)建了PipeLine
和Unsafe
。初始化
Channel
厘灼,為Channel
設(shè)置參數(shù)和屬性夹纫,并添加ServerBootstrapAcceptor
這個(gè)特殊的Handler
。注冊(cè)
Channel
设凹,為Channel
綁定一個(gè)EventLoop
并向Selector
注冊(cè)Channel
舰讹。綁定端口。
關(guān)于作者
王建新闪朱,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師月匣,主要負(fù)責(zé)服務(wù)治理、RPC框架奋姿、分布式調(diào)用跟蹤桶错、監(jiān)控系統(tǒng)等。愛技術(shù)胀蛮、愛學(xué)習(xí)院刁,歡迎聯(lián)系交流。