background
netty 是一個異步事件驅(qū)動的網(wǎng)絡(luò)通信層框架,其官方文檔的解釋為
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
我們在新美大消息推送系統(tǒng)sailfish(日均推送消息量50億)肛冶,新美大移動端代理優(yōu)化系統(tǒng)shark(日均吞吐量30億)中,均選擇了netty作為底層網(wǎng)絡(luò)通信框架枫浙。
既然兩大如此重要的系統(tǒng)底層都使用到了netty,所以必然要對netty的機制滤馍,甚至源碼了若指掌端蛆,于是床三,便催生了netty源碼系列文章纠炮。后面月趟,我會通過一系列的主題把我從netty源碼里所學(xué)到的毫無保留地介紹給你,源碼基于4.1.6.Final
why netty
netty底層基于jdk的NIO恢口,我們?yōu)槭裁床恢苯踊趈dk的nio或者其他nio框架孝宗?下面是我總結(jié)出來的原因
1.使用jdk自帶的nio需要了解太多的概念,編程復(fù)雜
2.netty底層IO模型隨意切換耕肩,而這一切只需要做微小的改動
3.netty自帶的拆包解包因妇,異常檢測等機制讓你從nio的繁重細節(jié)中脫離出來,讓你只需要關(guān)心業(yè)務(wù)邏輯
4.netty解決了jdk的很多包括空輪訓(xùn)在內(nèi)的bug
5.netty底層對線程猿诸,selector做了很多細小的優(yōu)化婚被,精心設(shè)計的reactor線程做到非常高效的并發(fā)處理
6.自帶各種協(xié)議棧讓你處理任何一種通用協(xié)議都幾乎不用親自動手
7.netty社區(qū)活躍,遇到問題隨時郵件列表或者issue
8.netty已經(jīng)歷各大rpc框架梳虽,消息中間件址芯,分布式通信中間件線上的廣泛驗證,健壯性無比強大
dive into netty
了解了這么多窜觉,今天我們就從一個例子出來谷炸,開始我們的netty源碼之旅。
本篇主要講述的是netty是如何綁定端口竖螃,啟動服務(wù)淑廊。啟動服務(wù)的過程中,你將會了解到netty各大核心組件特咆,我先不會細講這些組件,而是會告訴你各大組件是怎么串起來組成netty的核心
example
下面是一個非常簡單的服務(wù)端啟動代碼
public final class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}
簡單的幾行代碼就能開啟一個服務(wù)端录粱,端口綁定在8888腻格,使用nio模式,下面講下每一個步驟的處理細節(jié)
EventLoopGroup
已經(jīng)在我的其他文章中詳細剖析過啥繁,說白了菜职,就是一個死循環(huán),不停地檢測IO事件旗闽,處理IO事件酬核,執(zhí)行任務(wù)
ServerBootstrap
是服務(wù)端的一個啟動輔助類,通過給他設(shè)置一系列參數(shù)來綁定端口啟動服務(wù)
group(bossGroup, workerGroup)
我們需要兩種類型的人干活适室,一個是老板嫡意,一個是工人,老板負(fù)責(zé)從外面接活捣辆,接到的活分配給工人干蔬螟,放到這里,bossGroup
的作用就是不斷地accept到新的連接汽畴,將新的連接丟給workerGroup
來處理
.channel(NioServerSocketChannel.class)
表示服務(wù)端啟動的是nio相關(guān)的channel旧巾,channel在netty里面是一大核心概念耸序,可以理解為一條channel就是一個連接或者一個服務(wù)端bind動作,后面會細說
.handler(new SimpleServerHandler()
表示服務(wù)器啟動過程中鲁猩,需要經(jīng)過哪些流程坎怪,這里SimpleServerHandler
最終的頂層接口為ChannelHander
,是netty的一大核心概念廓握,表示數(shù)據(jù)流經(jīng)過的處理器搅窿,可以理解為流水線上的每一道關(guān)卡
childHandler(new ChannelInitializer<SocketChannel>)...
表示一條新的連接進來之后,該怎么處理疾棵,也就是上面所說的戈钢,老板如何給工人配活
ChannelFuture f = b.bind(8888).sync();
這里就是真正的啟動過程了,綁定8888端口是尔,等待服務(wù)器啟動完畢殉了,才會進入下行代碼
f.channel().closeFuture().sync();
等待服務(wù)端關(guān)閉socket
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
關(guān)閉兩組死循環(huán)
上述代碼可以很輕松地再本地跑起來,最終控制臺的輸出為:
handlerAdded
channelRegistered
channelActive
關(guān)于為什么會順序輸出這些拟枚,深入分析之后其實很easy
深入細節(jié)
ServerBootstrap
一系列的參數(shù)配置其實沒啥好講的薪铜,無非就是使用method chaining的方式將啟動服務(wù)器需要的參數(shù)保存到filed。我們的重點落入到下面這段代碼
b.bind(8888).sync();
這里說一句:我們剛開始看源碼恩溅,對細節(jié)沒那么清楚的情況下可以借助IDE的debug功能隔箍,step by step,one step one test或者二分test的方式脚乡,來確定哪行代碼是最終啟動服務(wù)的入口蜒滩,在這里,我們已經(jīng)確定了bind方法是入口奶稠,我們跟進去俯艰,分析
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
通過端口號創(chuàng)建一個 InetSocketAddress
,然后繼續(xù)bind
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
validate()
驗證服務(wù)啟動需要的必要參數(shù)锌订,然后調(diào)用doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
//...
final ChannelFuture regFuture = initAndRegister();
//...
final Channel channel = regFuture.channel();
//...
doBind0(regFuture, channel, localAddress, promise);
//...
return promise;
}
這里竹握,我去掉了細枝末節(jié),讓我們專注于核心方法辆飘,其實就兩大核心一個是 initAndRegister()
啦辐,以及doBind0()
其實,從方法名上面我們已經(jīng)可以略窺一二蜈项,init->初始化芹关,register->注冊,那么到底要注冊到什么呢战得?聯(lián)系到nio里面輪詢器的注冊充边,可能是把某個東西初始化好了之后注冊到selector上面去,最后bind,像是在本地綁定端口號浇冰,帶著這些猜測贬媒,我們深入下去
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
// ...
channel = channelFactory.newChannel();
//...
init(channel);
//...
ChannelFuture regFuture = config().group().register(channel);
//...
return regFuture;
}
我們還是專注于核心代碼,拋開邊角料肘习,我們看到 initAndRegister()
做了幾件事情
1.new一個channel
2.init這個channel
3.將這個channel register到某個對象
我們逐步分析這三件事情
1.new一個channel
我們首先要搞懂channel的定義际乘,netty官方對channel的描述如下
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
這里的channel,由于是在服務(wù)啟動的時候創(chuàng)建漂佩,我們可以和普通Socket編程中的ServerSocket對應(yīng)上脖含,表示服務(wù)端綁定的時候經(jīng)過的一條流水線
我們發(fā)現(xiàn)這條channel是通過一個 channelFactory
new出來的,channelFactory
的接口很簡單
public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
/**
* Creates a new channel.
*/
@Override
T newChannel();
}
就一個方法投蝉,我們查看channelFactory被賦值的地方
AbstractBootstrap.java
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
在這里被賦值养葵,我們層層回溯,查看該函數(shù)被調(diào)用的地方瘩缆,發(fā)現(xiàn)最終是在這個函數(shù)中关拒,ChannelFactory被new出
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
這里,我們的demo程序調(diào)用channel(channelClass)
方法的時候庸娱,將channelClass
作為ReflectiveChannelFactory
的構(gòu)造函數(shù)創(chuàng)建出一個ReflectiveChannelFactory
demo端的代碼如下:
.channel(NioServerSocketChannel.class);
然后回到本節(jié)最開始
channelFactory.newChannel();
我們就可以推斷出着绊,最終是調(diào)用到 ReflectiveChannelFactory.newChannel()
方法,跟進
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
看到clazz.newInstance();
熟尉,我們明白了归露,原來是通過反射的方式來創(chuàng)建一個對象,而這個class就是我們在ServerBootstrap
中傳入的NioServerSocketChannel.class
結(jié)果斤儿,繞了一圈剧包,最終創(chuàng)建channel相當(dāng)于調(diào)用默認(rèn)構(gòu)造函數(shù)new出一個 NioServerSocketChannel
對象
這里提一下,讀源碼細節(jié)往果,有兩種讀的方式玄捕,一種是回溯,比如用到某個對象的時候可以逐層追溯棚放,一定會找到該對象的最開始被創(chuàng)建的代碼區(qū)塊,還有一種方式就是自頂向下馅闽,逐層分析飘蚯,一般用在分析某個具體的方法,庖丁解牛福也,最后拼接出完整的流程
接下來我們就可以將重心放到 NioServerSocketChannel
的默認(rèn)構(gòu)造函數(shù)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
//...
return provider.openServerSocketChannel();
}
通過SelectorProvider.openServerSocketChannel()
創(chuàng)建一條server端channel局骤,然后進入到以下方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
這里第一行代碼就跑到父類里面去了,第二行暴凑,new出來一個 NioServerSocketChannelConfig
峦甩,其頂層接口為 ChannelConfig
,netty官方的描述如下
A set of configuration properties of a Channel.
基本可以判定,ChannelConfig
也是netty里面的一大核心模塊凯傲,初次看源碼犬辰,看到這里,我們大可不必深挖這個對象冰单,而是在用到的時候再回來深究幌缝,只要記住,這個對象在創(chuàng)建NioServerSocketChannel
對象的時候被創(chuàng)建即可
我們繼續(xù)追蹤到 NioServerSocketChannel
的父類
AbstractNioMessageChannel.java
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
繼續(xù)往上追
AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
//...
ch.configureBlocking(false);
//...
}
這里诫欠,簡單地將前面 provider.openServerSocketChannel();
創(chuàng)建出來的 ServerSocketChannel
保存到成員變量涵卵,然后調(diào)用ch.configureBlocking(false);
設(shè)置該channel為非阻塞模式,標(biāo)準(zhǔn)的jdk nio編程的玩法
這里的 readInterestOp
即前面層層傳入的 SelectionKey.OP_ACCEPT
荒叼,接下來重點分析 super(parent);
(這里的parent其實是null轿偎,由前面寫死傳入)
AbstractChannel.java
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
到了這里,又new出來三大組件被廓,賦值到成員變量坏晦,分別為
id = newId();
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
id是netty中每條channel的唯一標(biāo)識,這里不細展開伊者,接著
unsafe = newUnsafe();
protected abstract AbstractUnsafe newUnsafe();
查看Unsafe的定義
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread
成功捕捉netty的又一大組件英遭,我們可以先不用管TA是干嘛的,只需要知道這里的 newUnsafe
方法最終屬于類NioServerSocketChannel
中
最后
pipeline = newChannelPipeline();
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
初次看這段代碼亦渗,可能并不知道 DefaultChannelPipeline
是干嘛用的挖诸,我們?nèi)匀皇褂蒙厦娴姆绞剑榭错攲咏涌?code>ChannelPipeline的定義
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel
從該類的文檔中可以看出法精,該接口基本上又是netty的一大核心模塊
到了這里多律,我們總算把一個服務(wù)端channel創(chuàng)建完畢了,將這些細節(jié)串起來的時候搂蜓,我們順帶提取出netty的幾大基本組件狼荞,先總結(jié)如下
- Channel
- ChannelConfig
- ChannelId
- Unsafe
- Pipeline
- ChannelHander
初次看代碼的時候,我們的目標(biāo)是跟到服務(wù)器啟動的那一行代碼帮碰,我們先把以上這幾個組件記下來相味,等代碼跟完,我們就可以自頂向下殉挽,逐層分析丰涉,我會放到后面源碼系列中去深入到每個組件
總結(jié)一下,用戶調(diào)用方法 Bootstrap.bind(port)
第一步就是通過反射的方式new一個NioServerSocketChannel
對象斯碌,并且在new的過程中創(chuàng)建了一系列的核心組件一死,僅此而已,并無他傻唾,真正的啟動我們還需要繼續(xù)跟
2.init這個channel
到了這里投慈,你最好跳到文章最開始的地方回憶一下,第一步newChannel完畢,這里就對這個channel做init伪煤,init方法具體干啥加袋,我們深入
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
初次看到這個方法,可能會覺得带族,哇塞锁荔,老長了,這可這么看蝙砌?還記得我們前面所說的嗎阳堕,庖丁解牛,逐步拆解择克,最后歸一恬总,下面是我的拆解步驟
1.設(shè)置option和attr
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
通過這里我們可以看到,這里先調(diào)用options0()
以及attrs0()
肚邢,然后將得到的options和attrs注入到channelConfig或者channel中壹堰,關(guān)于option和attr是干嘛用的,其實你現(xiàn)在不用了解得那么深入骡湖,只需要查看最頂層接口ChannelOption
以及查看一下channel的具體繼承關(guān)系贱纠,就可以了解,我把這兩個也放到后面的源碼分析系列再講
2.設(shè)置新接入channel的option和attr
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
這里响蕴,和上面類似谆焊,只不過不是設(shè)置當(dāng)前channel的這兩個屬性,而是對應(yīng)到新進來連接對應(yīng)的channel浦夷,由于我們這篇文章只關(guān)心到server如何啟動辖试,接入連接放到下一篇文章中詳細剖析
3.加入新連接處理器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
到了最后一步,p.addLast()
向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor
劈狐,從名字上就可以看出來罐孝,這是一個接入器,專門接受新請求肥缔,把新的請求扔給某個事件循環(huán)器莲兢,我們先不做過多分析
來,我們總結(jié)一下续膳,我們發(fā)現(xiàn)其實init也沒有啟動服務(wù)怒见,只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器姑宽,用來專門接受新連接,我們還得繼續(xù)往下跟
3.將這個channel register到某個對象
這一步闺阱,我們是分析如下方法
ChannelFuture regFuture = config().group().register(channel);
調(diào)用到 NioEventLoop
中的register
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
好了炮车,到了這一步,還記得這里的unsafe()
返回的應(yīng)該是什么對象嗎?不記得的話可以看下前面關(guān)于unsafe的描述瘦穆,或者最快的方式就是debug到這邊纪隙,跟到register方法里面,看看是哪種類型的unsafe
我們跟進去之后發(fā)現(xiàn)是
AbstractUnsafe.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
AbstractChannel.this.eventLoop = eventLoop;
// ...
register0(promise);
}
這里我們依然只需要focus重點扛或,先將EventLoop事件循環(huán)器綁定到該NioServerSocketChannel上绵咱,然后調(diào)用 register0()
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
這一段其實也很清晰,先調(diào)用 doRegister();
熙兔,具體干啥待會再講悲伶,然后調(diào)用invokeHandlerAddedIfNeeded()
, 于是乎,控制臺第一行打印出來的就是
handlerAdded
關(guān)于最終是如何調(diào)用到的住涉,我們后面詳細剖析pipeline的時候再講
然后調(diào)用 pipeline.fireChannelRegistered();
調(diào)用之后麸锉,控制臺的顯示為
handlerAdded
channelRegistered
繼續(xù)往下跟
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
讀到這,你可能會想當(dāng)然地以為舆声,控制臺最后一行
pipeline.fireChannelActive();
由這行代碼輸出花沉,我們不妨先看一下 isActive()
方法
@Override
public boolean isActive() {
return javaChannel().socket().isBound();
}
最終調(diào)用到j(luò)dk中
ServerSocket.java
/**
* Returns the binding state of the ServerSocket.
*
* @return true if the ServerSocket succesfuly bound to an address
* @since 1.4
*/
public boolean isBound() {
// Before 1.3 ServerSockets were always bound during creation
return bound || oldImpl;
}
這里isBound()
返回false,但是從目前我們跟下來的流程看媳握,我們并沒有將一個ServerSocket綁定到一個address碱屁,所以 isActive()
返回false,我們沒有成功進入到pipeline.fireChannelActive();
方法蛾找,那么最后一行到底是誰輸出的呢娩脾,我們有點抓狂,其實腋粥,只要熟練運用IDE晦雨,要定位函數(shù)調(diào)用棧,無比簡單
下面是我用intellij定位函數(shù)調(diào)用的具體方法
我們先在最終輸出文字的這一行代碼處打一個斷點隘冲,然后debug闹瞧,運行到這一行,intellij自動給我們拉起了調(diào)用棧展辞,我們唯一要做的事奥邮,就是移動方向鍵,就能看到函數(shù)的完整的調(diào)用鏈
如果你看到方法的最近的發(fā)起端是一個線程Runnable的run方法罗珍,那么就在提交Runnable對象方法的地方打一個斷點洽腺,去掉其他斷點,重新debug覆旱,比如我們首次debug發(fā)現(xiàn)調(diào)用棧中的最近的一個Runnable如下
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
我們停在了這一行pipeline.fireChannelActive();
, 我們想看最初始的調(diào)用蘸朋,就得跳出來,斷點打到 if (!wasActive && isActive())
扣唱,因為netty里面很多任務(wù)執(zhí)行都是異步線程即reactor線程調(diào)用的(具體可以看reactor線程三部曲中的最后一曲)藕坯,如果我們要查看最先發(fā)起的方法調(diào)用团南,我們必須得查看Runnable被提交的地方,逐次遞歸下去炼彪,就能找到那行"消失的代碼"
最終吐根,通過這種方式,終于找到了 pipeline.fireChannelActive();
的發(fā)起調(diào)用的代碼辐马,不巧拷橘,剛好就是下面的doBind0()
方法
doBind0()
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
我們發(fā)現(xiàn),在調(diào)用doBind0(...)
方法的時候喜爷,是通過包裝一個Runnable進行異步化的冗疮,關(guān)于異步化task,可以看下我前面的文章贞奋,netty源碼分析之揭開reactor線程的面紗(三)
好赌厅,接下來我們進入到channel.bind()
方法
AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
發(fā)現(xiàn)是調(diào)用pipeline的bind方法
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
相信你對tail是什么不是很了解,可以翻到最開始轿塔,tail在創(chuàng)建pipeline的時候出現(xiàn)過特愿,關(guān)于pipeline和tail對應(yīng)的類,我后面源碼系列會詳細解說勾缭,這里揍障,你要想知道接下來代碼的走向,唯一一個比較好的方式就是debug 單步進入俩由,篇幅原因毒嫡,我就不詳細展開
最后,我們來到了如下區(qū)域
HeadContext.java
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
這里的unsafe就是前面提到的 AbstractUnsafe
, 準(zhǔn)確點幻梯,應(yīng)該是 NioMessageUnsafe
我們進入到它的bind方法
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ...
boolean wasActive = isActive();
// ...
doBind(localAddress);
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
顯然按照正常流程兜畸,我們前面已經(jīng)分析到 isActive();
方法返回false,進入到 doBind()
之后碘梢,如果channel被激活了咬摇,就發(fā)起pipeline.fireChannelActive();
調(diào)用,最終調(diào)用到用戶方法煞躬,在控制臺打印出了最后一行肛鹏,所以到了這里,你應(yīng)該清楚為什么最終會在控制臺按順序打印出那三行字了吧
doBind()
方法也很簡單
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
//noinspection Since15
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
最終調(diào)到了jdk里面的bind方法恩沛,這行代碼過后在扰,正常情況下,就真正進行了端口的綁定雷客。
另外芒珠,通過自頂向下的方式分析,在調(diào)用pipeline.fireChannelActive();
方法的時候搅裙,會調(diào)用到如下方法
HeadContext.java
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
進入 readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
分析isAutoRead
方法
private volatile int autoRead = 1;
public boolean isAutoRead() {
return autoRead == 1;
}
由此可見妓局,isAutoRead
方法默認(rèn)返回true总放,于是進入到以下方法
public Channel read() {
pipeline.read();
return this;
}
最終調(diào)用到
AbstractNioUnsafe.java
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
這里的this.selectionKey
就是我們在前面register步驟返回的對象,前面我們在register的時候好爬,注冊測ops是0
回憶一下注冊
AbstractNioChannel
selectionKey = javaChannel().register(eventLoop().selector, 0, this)
這里相當(dāng)于把注冊過的ops取出來,通過了if條件甥啄,然后調(diào)用
selectionKey.interestOps(interestOps | readInterestOp);
而這里的 readInterestOp
就是前面newChannel的時候傳入的SelectionKey.OP_ACCEPT
存炮,又是標(biāo)準(zhǔn)的jdk nio的玩法,到此蜈漓,你需要了解的細節(jié)基本已經(jīng)差不多了穆桂,就這樣結(jié)束吧!
summary
最后融虽,我們來做下總結(jié)享完,netty啟動一個服務(wù)所經(jīng)過的流程
1.設(shè)置啟動類參數(shù),最重要的就是設(shè)置channel
2.創(chuàng)建server對應(yīng)的channel有额,創(chuàng)建各大組件般又,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
3.初始化server對應(yīng)的channel,設(shè)置一些attr巍佑,option茴迁,以及設(shè)置子channel的attr,option萤衰,給server的channel添加新channel接入器堕义,并出發(fā)addHandler,register等事件
4.調(diào)用到j(luò)dk底層做端口綁定,并觸發(fā)active事件脆栋,active觸發(fā)的時候倦卖,真正做服務(wù)端口綁定
另外,文章中閱讀源碼的思路詳細或許也可以給你帶來一些幫助椿争。
如果你想從零到一深入學(xué)習(xí) Netty怕膛,可以加我微信(備注:簡書專享)。
贈送一份目前正在掘金售賣的小冊一份