通過前面兩篇文章的鋪墊,終于到了Netty服務(wù)端啟動的核心流程崔赌,但涉及的方法十分多漆际,希望咱們看源碼之前淆珊,一定要有一個關(guān)注點,看源碼的過程中就重點留意所關(guān)注的東西奸汇,其他與核心流程的邏輯關(guān)系不大施符,甚至有很多看不懂的方法,可以先跳過擂找,只關(guān)注核心的東西就行戳吝。如果對于每一行代碼都要執(zhí)著的理解,這將會是一場災(zāi)難贯涎!很有可能會因為源碼的苦澀而半途而廢听哭!所以,看源碼的時候塘雳,一定要有關(guān)注點陆盘,看源碼的過程中不能迷失...
好了,我們看一下Netty服務(wù)端啟動的代碼
// 綁定端口并啟動
ChannelFuture f = b.bind(PORT).sync();
通過這么簡單的一行败明,就能夠?qū)etty服務(wù)端運行起來隘马,但其背后卻做了很多東西!(再次佩服Netty框架的設(shè)計者妻顶,使開發(fā)者可以優(yōu)雅簡單的運行一個網(wǎng)絡(luò)應(yīng)用程序)
好了酸员,廢話不多說,看源碼讳嘱!
public ChannelFuture bind(int inetPort) {
// InetSocketAddress類:socket編程基礎(chǔ)幔嗦,封裝ip和端口為套接字
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 創(chuàng)建Channel,注冊到對應(yīng)的eventLoop沥潭。異步返回Future對象
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 因為regFuture是異步返回的崭添,如果成功注冊到eventLoop,則直接調(diào)用doBind0方法綁定端口
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 如果regFuture對象還沒返回叛氨,則添加監(jiān)聽器呼渣,直到有返回內(nèi)容后棘伴,成功注冊則調(diào)用doBind0方法綁定端口
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
其中,doBind方法是啟動ServerBootstrap的核心屁置,咱們先來看一下initAndRegister
方法
// 創(chuàng)建Channel焊夸,注冊到對應(yīng)的eventLoop
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 創(chuàng)建channel對象
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
// 省略異常處理代碼
}
// 將channel注冊到eventLoop
ChannelFuture regFuture = config().group().register(channel);
// 省略異常處理代碼
return regFuture;
}
去掉異常處理的代碼,可以看到主要調(diào)用的方法是newChannel()
蓝角、init(channel)
阱穗、register(channel)
方法,簡單來說就是創(chuàng)建Channel-》初始化Channel-》注冊Channel
接下來繼續(xù)重點關(guān)注上面三個步驟的源碼
創(chuàng)建Channel
【Netty源碼系列】服務(wù)端啟動流程(二)創(chuàng)建并初始化ServerBootstrap對象文章中提到使鹅,Channel的創(chuàng)建是被封裝到 ReflectiveChannelFactory 類中揪阶,所以創(chuàng)建Channel是調(diào)用ReflectiveChannelFactory類的newChannel方法
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
constructor對象是在定義Channel類型的時候定義的,比如下面例子中的channel(NioServerSocketChannel.class)患朱,所以constructor.newInstance()實際上就是調(diào)用NioServerSocketChannel的無參構(gòu)造器
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 綁定線程池組
.channel(NioServerSocketChannel.class) // 服務(wù)端channel類型
.option(ChannelOption.SO_BACKLOG, 100) // TCP配置
.handler(new LoggingHandler(LogLevel.INFO)) // 服務(wù)端Handler
.childHandler(new ChannelInitializer<SocketChannel>() { // 客戶端Handler
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 調(diào)用JDK原生的方法鲁僚,創(chuàng)建ServerSocketChannel對象
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 調(diào)用父類的構(gòu)造器,封裝ServerSocketChannel對象為NioServerSocketChannel裁厅,并初始化相關(guān)的屬性冰沙,如:unSafe、pipeline等
super(null, channel, SelectionKey.OP_ACCEPT);
// NioServerSocketChannel相關(guān)配置
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
上面就是創(chuàng)建NioServerSocketChannel的流程执虹,實際上底層調(diào)用的是JDK原生代碼拓挥,通過原生代碼創(chuàng)建出ServerSocketChannel對象,然后在Netty內(nèi)部封裝為NioServerSocketChannel袋励,并初始化相關(guān)的屬性侥啤。
ps.請各位注意,NioServerSocketChannel構(gòu)造器中茬故,調(diào)用父級的流程請各位自行動手debug看下盖灸,也有不少重點需要注意,比如設(shè)置channel為非阻塞均牢,channel的id玄叠,unsafe苫费、pipeline是如何初始化等等
初始化channel
/**
* 1. 設(shè)置channel option
* 2. 設(shè)置channel attribute
* 3. 設(shè)置channel pipeline
* 3.1 pipeline 添加 handler
* @param channel
*/
@Override
void init(Channel channel) {
// 1. 設(shè)置channel option
setChannelOptions(channel, newOptionsArray(), logger);
// 2. 設(shè)置channel attribute
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
// 獲取channel的pipeline對象
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
// 設(shè)置socket channel的option
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
// 設(shè)置socket channel的attribute
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
// pipeline 添加 handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 為每一個ServerSocketChannel的pipeline添加ServerBootstrapAcceptor處理器监氢,該處理器專門用于接收客戶端請求的連接
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
初始化channel相關(guān)代碼我都加上注釋理逊,其實都比較簡單明了受啥,依次設(shè)置了ServerSocketChannel的option饮潦、attribute相關(guān)屬性银觅,除此之外首繁,還設(shè)置了pipeline內(nèi)的處理器坞琴。值得注意的是哨查,每個ServerSocketChannel都會添加ServerBootstrapAcceptor
處理器,這個處理器在客戶端請求的時候?qū)l(fā)揮很大的作用剧辐,簡單來說寒亥, 該處理器會將SocketChannel注冊到WorkerGroup的某一個EventLoop邮府,然后交給WorkerGroup的線程處理客戶端的請求。
讓我們回過頭溉奕,當(dāng)ServerSocketChannel初始化完成后褂傀,就會將當(dāng)前的ServerSocketChannel注冊到BossGroup的EventLoop,在 initAndRegister 方法中對應(yīng)的代碼是ChannelFuture regFuture = config().group().register(channel)
加勤。通過debug分析仙辟,實際上是調(diào)用SingleThreadEventLoop的register方法
注冊Channel
/**
* 將channel封裝成channelPromise對象
* @param channel
* @return
*/
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 將當(dāng)前channel注冊到eventLoop
promise.channel().unsafe().register(this, promise);
return promise;
}
通過上面源碼可以看到,promise.channel().unsafe().register(this, promise)
會將channel注冊到EventLoop上鳄梅,實際上是調(diào)用AbstractChannel&AbstractUnsafe的register方法叠国,那繼續(xù)往下看...
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略一些判斷邏輯
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 省略異常邏輯
}
}
}
private void register0(ChannelPromise promise) {
try {
// 省略一些判斷邏輯
boolean firstRegistration = neverRegistered;
// channel注冊到eventLoop的核心方法!4魇粟焊!
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 調(diào)用下一個channelInboundHandler的channelRegistered()方法
pipeline.fireChannelRegistered();
// 如果Channel處于活動狀態(tài)
if (isActive()) {
if (firstRegistration) {
// 調(diào)用下一個channelInbooundHandler的channelActive()方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 省略異常邏輯
}
}
通過一層層的debug,終于接近ServerSocketChannel注冊到eventLoop的底層實現(xiàn)啦校赤!繼續(xù)往下看doRegister()
方法吆玖。由于當(dāng)前channel是NioServerSocketChannel,所以doRegister方法是在AbstractNioChannel類中實現(xiàn)的
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
// 省略異常邏輯
}
}
}
我們終于知道在Netty中是如何將服務(wù)端ServerSocketChannel注冊到BossGroup的eventLoop上马篮。實際上是使用JDK原生的register方法注冊ServerSocketChannel到BossGroup的eventLoop的Selector上
目前為止沾乘,服務(wù)端啟動流程就可以啟動了嗎?不不不浑测,還差最后一步翅阵,綁定端口!
綁定端口————doBind0方法
綁定端口涉及到的類較多迁央,我這里就不一一展示出來掷匠,只展示最后一步綁定端口的相關(guān)方法,其完整的調(diào)用邏輯可以參考下面的時序圖
// NioServerSocketChannel類
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
通過調(diào)用NioServerSocketChannel類doBind的方法就可以綁定端口岖圈,源碼中可以發(fā)現(xiàn)實際上是調(diào)用JDK原生綁定端口方法讹语。以上就是服務(wù)端啟動的大致流程,希望可以通過上面的流程圖和源碼剖析過程中的注釋和解析可以幫助各位更加了解Netty服務(wù)端的啟動流程蜂科,這對使我們更加熟悉Netty框架顽决,并學(xué)習(xí)其中一些編程思想,幫助我們?nèi)粘>幋a過程中导匣,可以模仿這些優(yōu)秀框架的設(shè)計模式和思想寫出更加優(yōu)質(zhì)的代碼才菠!
如果覺得文章不錯的話,麻煩點個贊哈贡定,你的鼓勵就是我的動力赋访!對于文章有哪里不清楚或者有誤的地方,歡迎在評論區(qū)留言~