前言
之前的文章介紹了Netty的線程池NioEventLoopGroup的初始化過程,這次將分析Netty中同樣非常重要的一個(gè)東西ServerBootstrap乱豆。
ServerBootstrap使用片段
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Acceptor NIO Thread#%d").build();
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Reactor NIO Thread#%d").build();
this.bossGroup = new NioEventLoopGroup(numberOfThreads, bossThreadFactory);
this.workerGroup = new NioEventLoopGroup(numberOfThreads, workerThreadFactory);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(DeviceServerListener.this.timeoutSeconds));
pipeline.addLast("lineBasedFrameDecoder-" + maxLength, new LineBasedFrameDecoder(Integer.parseInt(maxLength)));// 按行('\n')解析成命令ByteBuf
pipeline.addLast("stringPluginMessageDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("stringToByteEncoder", new StringToByteEncoder());// 將JSON字符串類型消息轉(zhuǎn)換成ByteBuf
pipeline.addLast("deviceMessageDecoder", new DeviceMessageDecoder());// 將JSON字符串消息轉(zhuǎn)成deviceMessage對(duì)象
pipeline.addLast("deviceMessageEncoder", new DeviceMessageEncoder());// 將deviceMessage對(duì)象轉(zhuǎn)成JSON字符串
pipeline.addLast("deviceHeartBeatResponseHandler", new DeviceHeartBeatResponseHandler(heartTime));
pipeline.addLast("deviceAuthResponseHandler",
new DeviceAuthResponseHandler(DeviceServerListener.this.timeoutSeconds, DeviceServerListener.serverInstanceName));
pipeline.addLast("deviceMessageHandler", new DeviceMessageHandler());
// log.debug("Added Handler to Pipeline: {}", pipeline.names());
}
}).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
// Start the server. Bind and start to accept incoming connections.
this.channelFuture = bootstrap.bind(serverPort).sync();
這是Netty服務(wù)端比較標(biāo)準(zhǔn)的初始化片段,可以看到這其中ServerBootstrap
有著非常重要的戲份杀饵,它就像是Netty的啟動(dòng)器一樣莽囤,其中的NioEventLoopGroup
之前已經(jīng)分析過了谬擦,那么直接來看ServerBootstrap
的初始化切距。
源碼解析
從以上片段可以看到初始化時(shí)首先通過ServerBootstrap
的無參構(gòu)造函數(shù)創(chuàng)建一個(gè)對(duì)象,該構(gòu)造函數(shù)沒有任何的操作因此不做分析惨远。接著是該對(duì)象的一串鏈?zhǔn)秸{(diào)用bootstrap.group().channel().childHandler().option()
谜悟,我們來逐一看一下。
首先是group()
方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
這里將傳入的childGroup
對(duì)象賦值給ServerBootstrap
的childGroup
屬性北秽,然后調(diào)用了父類AbstractBootstrap
的group()
方法
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
由這里看出該方法將傳入的parentGroup
賦值給了group
屬性葡幸。
接著來看channel()
方法
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
這里調(diào)用了channelFactory(new BootstrapChannelFactory<C>(channelClass))
,來跟一下
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
這里完成了對(duì)AbstractBootstrap
類的channelFactory
屬性的賦值贺氓,賦值對(duì)象為new BootstrapChannelFactory<C>(channelClass)
蔚叨。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
這里的C
根據(jù)AbstractBootstrap
的類定義為Channel
的子類,因此實(shí)際傳入的channelClass
值為NioServerSocketChannel.class
辙培。
接著來看childHandler()
方法
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
沒有過多操作蔑水,也僅是對(duì)ServerBootstrap
的childHandler
屬性賦值,但這里傳入的childHandler
稍微有點(diǎn)復(fù)雜扬蕊。傳入的對(duì)象為new ChannelInitializer<SocketChannel>()
同時(shí)覆寫了其initChannel(SocketChannel ch)
方法搀别,該方法初始化了一個(gè)處理鏈用于處理接收和發(fā)送雙向的消息,這塊比較重要尾抑,后續(xù)將會(huì)單獨(dú)進(jìn)行分析歇父,在此暫不深入。
最后來看一下option()
方法
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
這里維護(hù)了一個(gè)options
數(shù)組將要設(shè)置的項(xiàng)都放進(jìn)了該數(shù)組中再愈,其定義如下
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
這里可以看出放入對(duì)象的key值限定為ChannelOption<?>
對(duì)象榜苫,大致都有下面這些
public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");//根據(jù)PooledByteBufAllocator或UnpooledByteBufAllocator設(shè)置對(duì)象池開啟與否(4.1版本開始默認(rèn)開)
public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");//設(shè)置接收池
public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");
public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");
public static final ChannelOption<Boolean> AUTO_READ = valueOf("AUTO_READ");
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");//允許發(fā)送廣播數(shù)據(jù)報(bào)
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");//是否啟用心跳保活機(jī)制
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");//發(fā)送緩沖區(qū)大小
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");//接收緩沖區(qū)大小
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");//是否允許重復(fù)使用本地地址和端口
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");//保證阻塞close()的調(diào)用翎冲,直到數(shù)據(jù)完全發(fā)送
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");//臨時(shí)存放已完成三次握手的請(qǐng)求隊(duì)列的最大長(zhǎng)度
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
public static final ChannelOption<Integer> IP_TOS = valueOf("IP_TOS");
public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
public static final ChannelOption<Integer> IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");//是否開啟Nagle算法垂睬,即是否等數(shù)據(jù)累積到一定程度再發(fā)送數(shù)據(jù)
這里很大一部分是tcp中的一些參數(shù)設(shè)置,比較常用的幾個(gè)參數(shù)都在源碼上做了注釋府适,其中對(duì)性能影響很大的一項(xiàng)是ALLOCATOR
羔飞,根據(jù)一些資料顯示開啟對(duì)象池性能遠(yuǎn)高于不開啟,而Netty從4.1版本開始也將默認(rèn)選項(xiàng)設(shè)為了開啟對(duì)象池檐春。
這里我們順便來看一下valueOf
方法的實(shí)現(xiàn)
public static <T> ChannelOption<T> valueOf(String name) {
checkNotNull(name, "name");
ChannelOption<T> option = names.get(name);
if (option == null) {
option = new ChannelOption<T>(name);
ChannelOption<T> old = names.putIfAbsent(name, option);
if (old != null) {
option = old;
}
}
return option;
}
可以看到這個(gè)方法主要是維護(hù)了names
這個(gè)數(shù)組逻淌,其定義如下
private static final ConcurrentMap<String, ChannelOption> names = PlatformDependent.newConcurrentHashMap();
來看一下PlatformDependent.newConcurrentHashMap()
的具體實(shí)現(xiàn)
public static <K, V> ConcurrentMap<K, V> newConcurrentHashMap() {
if (CAN_USE_CHM_V8) {
return new ConcurrentHashMapV8<K, V>();
} else {
return new ConcurrentHashMap<K, V>();
}
}
看到這里我又一次被震驚了,Netty自己實(shí)現(xiàn)了一個(gè)ConcurrentHashMapV8
用于應(yīng)對(duì)java8以下的版本疟暖,要知道這可是一個(gè)6000多行代碼的類卡儒。田柔。。不得不說Netty的開發(fā)者真的不愧優(yōu)化狂魔的稱號(hào)骨望。
分析完了bootstrap.group().channel().childHandler().option()
這一串鏈?zhǔn)秸{(diào)用硬爆,接著來看下一串bootstrap.bind(serverPort).sync()
。這里首先來看bind()
方法
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
這里根據(jù)端口號(hào)創(chuàng)建了一個(gè)InetSocketAddress
對(duì)象擎鸠,并調(diào)用了bind(SocketAddress localAddress)
方法
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
這里的validate()
方法校驗(yàn)了AbstractBootstrap
類的group
和channelFactory
屬性是否為空缀磕,然后執(zhí)行doBind()
方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();// 創(chuàng)建Channel并注冊(cè)到線程池
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 注冊(cè)完成且注冊(cè)成功
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 由于注冊(cè)是異步事件,可能此時(shí)沒有注冊(cè)完成劣光,那么使用異步操作
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 注冊(cè)過程中有異常則失敗
promise.setFailure(cause);
} else {
// 注冊(cè)完成且成功
promise.executor = channel.eventLoop();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
首先看一下regFuture
對(duì)象如何通過initAndRegister()
方法生成的
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 創(chuàng)建一個(gè)Channel
channel = channelFactory().newChannel();
// 初始化處理器Handler
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// Channel還沒有注冊(cè)到線程池袜蚕,使用默認(rèn)線程GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將channel注冊(cè)到Reactor線程池
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
channelFactory().newChannel()
這里調(diào)用的是AbstractBootstrap
的channelFactory
屬性的newChannel()
方法,根據(jù)之前賦值的channelFactory
對(duì)象的情況來看绢涡,這里最終得到的是NioServerSocketChannel
的一個(gè)實(shí)例對(duì)象牲剃。得到channel
對(duì)象后,接著調(diào)用init(channel)
進(jìn)行了初始化雄可,該方法由子類ServerBootstrap
實(shí)現(xiàn)凿傅。
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);//將options中逐一設(shè)置到DefaultChannelConfig對(duì)象上
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {//為了把ServerBootstrapAcceptor放在處理鏈的最末端,該類主要功能是將mainReactor接受的Channel傳遞給subReactor
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
這個(gè)方法看似很長(zhǎng)其實(shí)主要做了兩件事:一数苫、將父類和子類的options和attrs進(jìn)行賦值聪舒;二、構(gòu)建channel
的pipeline
屬性的處理鏈文判。這里主要來看一下addLast()
方法过椎,由DefaultChannelPipeline
進(jìn)行具體實(shí)現(xiàn),來看一下
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);//判斷handler是否已添加并且是否是sharable的
newCtx = newContext(group, filterName(name, handler), handler);//filterName(name, handler)自動(dòng)生成name戏仓,newContext方法對(duì)newCtx對(duì)象的handler疚宇、inbound、outbound赏殃、name敷待、pipeline屬性進(jìn)行了賦值
addLast0(newCtx);//將newCtx加到tail節(jié)點(diǎn)之前
if (!registered) {//一旦channel注冊(cè)到eventloop上就將registered置為true,并且不可再改變
newCtx.setAddPending();//CAS地修改newCtx的handlerState的值
callHandlerCallbackLater(newCtx, true);//添加newCtx到pendingHandlerCallbackHead的next節(jié)點(diǎn)
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
最終執(zhí)行的是三參的addLast()
方法并且傳入的group
和name
均為空仁热。該方法中主要根據(jù)channel
是否注冊(cè)到eventloop
上做不同的處理榜揖,初始化handler鏈。
回到initAndRegister()
方法中抗蠢,分析完了init(channel)
再來看另一個(gè)很重要的將channel
注冊(cè)到Reactor線程池的register()
方法举哟。該方法在NioEventLoopGroup
的父類MultithreadEventLoopGroup
中實(shí)現(xiàn)
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public EventExecutor next() {
return chooser.next();
}
這里的chooser
上一篇文章中有分析過,其根據(jù)線程池設(shè)置的線程數(shù)是否2的冪次方有不同的實(shí)現(xiàn)迅矛,而next()
方法返回的是數(shù)組中的一個(gè)NioEventLoop
對(duì)象妨猩,該用類的register()
方法,最終會(huì)返回一個(gè)注冊(cè)了該channel
的DefaultChannelPromise
對(duì)象秽褒,具體這里就不再深入了壶硅。
到此initAndRegister()
方法就分析完畢威兜,回到doBind()
方法來繼續(xù)分析doBind0()
方法
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
這里能看到,只有regFuture.isSuccess()
也就是channel
注冊(cè)成功時(shí)時(shí)才會(huì)執(zhí)行綁定操作channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
庐椒,否則直接向promise
寫注冊(cè)失敗椒舵,這里的promise
是一個(gè)DefaultChannelPromise
類型的對(duì)象,該類繼承自Future
约谈,可以認(rèn)為是一種特殊的Future
對(duì)象笔宿。bind(localAddress, promise)
方法最終會(huì)綁定在tail
節(jié)點(diǎn)上,最終交由DefaultChannelPipeline
的內(nèi)部類的unsafe
去進(jìn)行綁定窗宇,調(diào)用鏈非常深在此就不做展開了措伐。
最后跳回來看一下bootstrap.bind(serverPort).sync()
中的sync()
特纤,之前分析過bootstrap.bind(serverPort)
返回的是promise
军俊,因此sync()
方法由DefaultChannelPromise
實(shí)現(xiàn)
@Override
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
接著跟到DefaultPromise
類
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
可以看出正常情況下該方法會(huì)始終在while循環(huán)中,導(dǎo)致執(zhí)行到sync()
方法的線程阻塞捧存,因此bootstrap.bind(serverPort).sync()
以后的代碼都是不可達(dá)的粪躬,這點(diǎn)值得注意。
總結(jié)
ServerBootstrap
的使用方式非常固定,大部分常規(guī)使用都會(huì)應(yīng)用該初始化代碼模板。主要做的事情就是賦值ServerBootstrap
的各個(gè)屬性糊秆,并且創(chuàng)建Channel
剖淀、綁定用戶定義的Handler
、以及將該Channel
注冊(cè)到一個(gè)eventloop
中(這里特別要強(qiáng)調(diào)的是Channel
只能被綁定到一個(gè)eventloop
中)格粪,最后綁定本地端口監(jiān)聽I(yíng)O事件。