一、netty服務(wù)啟動(dòng)分析
EventLoopGroup boss = new NioEventLoopGroup();//類圖疗绣,繼承線程池ScheduledExecutorService
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射構(gòu)造NioServerSocketChannel實(shí)例
//backlog指定了內(nèi)核為此套接口排隊(duì)的最大連接個(gè)數(shù),對(duì)于給定的監(jiān)聽套接口,內(nèi)核要維護(hù)兩個(gè)隊(duì)列:未鏈接隊(duì)列和已連接隊(duì)列军俊,根據(jù)TCP三路握手過程中三個(gè)分節(jié)來分隔這兩個(gè)隊(duì)列
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler與childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler1());
ch.pipeline().addLast(new MyChannelHandler2());
ch.pipeline().addLast(new MyChannelHandler3());
}
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法實(shí)現(xiàn)
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//啟動(dòng)成功
}
});
f.channel().closeFuture().sync();
創(chuàng)建 ServerBootstrap 實(shí)例
設(shè)置并綁定 Reactor 線程池
設(shè)置并綁定服務(wù)端 Channel
創(chuàng)建并初始化 ChannelPipeline
添加并設(shè)置 ChannelHandler
綁定并啟動(dòng)監(jiān)聽端口
二、netty服務(wù)啟動(dòng)代碼分析
1捧存、創(chuàng)建兩個(gè)EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup 為 BOSS 線程組粪躬,用于服務(wù)端接受客戶端的連接, workerGroup 為 worker 線程組,用于進(jìn)行 SocketChannel 的網(wǎng)絡(luò)讀寫矗蕊。當(dāng)然也可以創(chuàng)建一個(gè)線程組短蜕,共享使用。
2傻咖、創(chuàng)建ServerBootstrap實(shí)例
ServerBootStrap為Netty服務(wù)端的啟動(dòng)引導(dǎo)類朋魔,用于幫助用戶快速配置、啟動(dòng)服務(wù)端服務(wù)卿操。提供的方法如下:
ServerBootStrap底層采用裝飾者模式警检。
3、設(shè)置并綁定Reactor線程池
b.group(bossGroup, workerGroup)
EventLoopGroup 為 Netty 線程池害淤,它實(shí)際上就是 EventLoop 的數(shù)組容器扇雕。EventLoop 的職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器 Selector 上的 Channel,Selector 的輪詢操作由綁定的 EventLoop 線程 run 方法驅(qū)動(dòng)窥摄,在一個(gè)循環(huán)體內(nèi)循環(huán)執(zhí)行镶奉。通俗點(diǎn)講就是一個(gè)死循環(huán),不斷的檢測(cè) I/O 事件崭放、處理 I/O 事件哨苛。
這里設(shè)置了兩個(gè)group,這個(gè)其實(shí)有點(diǎn)兒像我們工作一樣币砂。需要兩類型的工人建峭,一個(gè)老板(bossGroup),一個(gè)工人(workerGroup),老板負(fù)責(zé)從外面接活决摧,工人則負(fù)責(zé)死命干活亿蒸。所以這里 bossGroup 的作用就是不斷地接收新的連接凑兰,接收之后就丟給 workerGroup 來處理,workerGroup 負(fù)責(zé)干活就行(負(fù)責(zé)客戶端連接的 IO 操作)边锁。
4姑食、設(shè)置并綁定服務(wù)端Channel
.channel(NioServerSocketChannel.class)
調(diào)用 ServerBootstrap.channel 方法用于設(shè)置服務(wù)端使用的 Channel,傳遞一個(gè) NioServerSocketChannel Class對(duì)象砚蓬,Netty通過工廠類矢门,利用反射創(chuàng)建NioServerSocketChannel 對(duì)象,如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
//最終調(diào)用構(gòu)造函數(shù)
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
5灰蛙、添加并設(shè)置ChannelHandler
.handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
//省略代碼
})
ServerBootstrap 中的 Handler(childHandler()) 是 NioServerSocketChannel 使用的祟剔,所有連接該監(jiān)聽端口的客戶端都會(huì)執(zhí)行它,父類 AbstractBootstrap 中的 Handler 是一個(gè)工廠類摩梧,它為每一個(gè)新接入的客戶端都創(chuàng)建一個(gè)新的 Handler物延。
handler在server初始化它時(shí)就會(huì)執(zhí)行,而childHandler會(huì)在客戶端成功connect后才執(zhí)行仅父,這是兩者的區(qū)別叛薯。
6、綁定端口笙纤,啟動(dòng)服務(wù)
ChannelFuture future = b.bind(port).sync();
主要步驟:
負(fù)責(zé)創(chuàng)建服務(wù)端的NioServerSocketChannel實(shí)例耗溜;
為NioServerSocketChannel的pipeline添加handler;
注冊(cè)NioServerSocketChannel到selector省容;
二抖拴、源碼詳解
AbstractBootstrap類doBind方法,綁定端口入口
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //初始化與注冊(cè)
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
initAndRegister方法,創(chuàng)建和初始化channel
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel(); //創(chuàng)建服務(wù)端Channel
init(channel);//初始化Channel
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel); //注冊(cè)selector
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
#channelFactory.newChannel()通過反射創(chuàng)建實(shí)例
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
//clazz由AbstractBootstrap.channel方法傳入腥椒,bootstrap.channel(NioServerSocketChannel.class);
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
#AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
#NioServerSocketChannel構(gòu)造函數(shù)
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);//調(diào)用AbstractNioChannel構(gòu)造函數(shù)
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
#AbstractNioChannel構(gòu)造函數(shù)
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);//非阻塞方式
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
#init方法阿宅,初始化channel參數(shù),添加handler
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {//服務(wù)端Channel屬性
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();//傳入hander
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
//默認(rèn)添加的ServerBootstrapAcceptor的hander笼蛛,連接接入器處理新鏈接接入時(shí)洒放,初始化Options和Attrs
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
register方法,注冊(cè)selector
#unsafe類register方法
//注冊(cè)到Reactor線程的多路復(fù)用器上監(jiān)聽新客戶端的接入
public final void register(final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {//是否在當(dāng)前eventLoop中
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {//不在當(dāng)前eventLoop中滨砍,異步執(zhí)行
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
#unsafe類register0方法
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
}
}
#AbstractNioChannel類doRegister方法
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
//獲取selectionKey 往湿,通過SelectionKey的interestOps(int ops)方法可以修改監(jiān)聽操作位,注冊(cè)O(shè)P_ACCEPT(16)到多路復(fù)用器上
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
附錄:
ChannelOption參數(shù)說明
1惋戏、ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG對(duì)應(yīng)的是tcp/ip協(xié)議listen函數(shù)中的backlog參數(shù)领追,函數(shù)listen(int socketfd,int backlog)用來初始化服務(wù)端可連接隊(duì)列,
服務(wù)端處理客戶端連接請(qǐng)求是順序處理的日川,所以同一時(shí)間只能處理一個(gè)客戶端連接蔓腐,多個(gè)客戶端來的時(shí)候矩乐,服務(wù)端將不能處理的客戶端連接請(qǐng)求放在隊(duì)列中等待處理龄句,backlog參數(shù)指定了隊(duì)列的大小
2回论、ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR對(duì)應(yīng)于套接字選項(xiàng)中的SO_REUSEADDR,這個(gè)參數(shù)表示允許重復(fù)使用本地地址和端口分歇,
比如傀蓉,某個(gè)服務(wù)器進(jìn)程占用了TCP的80端口進(jìn)行監(jiān)聽,此時(shí)再次監(jiān)聽該端口就會(huì)返回錯(cuò)誤职抡,使用該參數(shù)就可以解決問題葬燎,該參數(shù)允許共用該端口,這個(gè)在服務(wù)器程序中比較常使用缚甩,
比如某個(gè)進(jìn)程非正常退出谱净,該程序占用的端口可能要被占用一段時(shí)間才能允許其他進(jìn)程使用,而且程序死掉以后擅威,內(nèi)核一需要一定的時(shí)間才能夠釋放此端口壕探,不設(shè)置SO_REUSEADDR
就無(wú)法正常使用該端口。
3郊丛、ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_KEEPALIVE李请,該參數(shù)用于設(shè)置TCP連接,當(dāng)設(shè)置該選項(xiàng)以后厉熟,連接會(huì)測(cè)試鏈接的狀態(tài)导盅,這個(gè)選項(xiàng)用于可能長(zhǎng)時(shí)間沒有數(shù)據(jù)交流的
連接。當(dāng)設(shè)置該選項(xiàng)以后揍瑟,如果在兩小時(shí)內(nèi)沒有數(shù)據(jù)的通信時(shí)白翻,TCP會(huì)自動(dòng)發(fā)送一個(gè)活動(dòng)探測(cè)數(shù)據(jù)報(bào)文。
4月培、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_SNDBUF嘁字,ChannelOption.SO_RCVBUF參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_RCVBUF這兩個(gè)參數(shù)用于操作接收緩沖區(qū)和發(fā)送緩沖區(qū)
的大小,接收緩沖區(qū)用于保存網(wǎng)絡(luò)協(xié)議站內(nèi)收到的數(shù)據(jù)杉畜,直到應(yīng)用程序讀取成功纪蜒,發(fā)送緩沖區(qū)用于保存發(fā)送數(shù)據(jù),直到發(fā)送成功此叠。
5纯续、ChannelOption.SO_LINGER
ChannelOption.SO_LINGER參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_LINGER,Linux內(nèi)核默認(rèn)的處理方式是當(dāng)用戶調(diào)用close()方法的時(shí)候,函數(shù)返回灭袁,在可能的情況下猬错,盡量發(fā)送數(shù)據(jù),不一定保證
會(huì)發(fā)生剩余的數(shù)據(jù)茸歧,造成了數(shù)據(jù)的不確定性倦炒,使用SO_LINGER可以阻塞close()的調(diào)用時(shí)間,直到數(shù)據(jù)完全發(fā)送
6软瞎、ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)
Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送逢唤,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到了拉讯,組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該方式有效提高網(wǎng)絡(luò)的有效
負(fù)載鳖藕,但是卻造成了延時(shí)魔慷,而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸著恩,于TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK院尔,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送