占小狼. Please credit the original source when reposting. Thank you!
This article analyzes the startup process of a Netty server.
Netty is built on NIO, so it still depends on Selector, ServerSocketChannel, SocketChannel, SelectionKey and so on; Netty simply encapsulates them in its lower layers.
In a typical server example, everything starts with ServerBootstrap.
A ServerBootstrap instance needs two NioEventLoopGroup instances, split by responsibility into boss and work, each with its own job:
1. boss accepts incoming connections
2. work handles reads and writes on the accepted connections
NioEventLoopGroup
NioEventLoopGroup mainly manages the lifecycle of its eventLoops.
What is an eventLoop? For now, think of it as an internal worker thread; by default there are twice as many of them as there are processors.
NioEventLoopGroup constructors:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
MultithreadEventLoopGroup is the parent class of NioEventLoopGroup. Its constructor:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
Here DEFAULT_EVENT_LOOP_THREADS is twice the number of processors.
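To make the "0 means default" rule concrete, here is a minimal JDK-only sketch. The class name EventLoopThreads and the method resolve are made up for illustration, and the default is computed straight from Runtime.availableProcessors(), which is an assumption about how the value is derived rather than a copy of Netty's code.

```java
final class EventLoopThreads {
    // Assumption: "twice the processor count", with a floor of 1 as a safety net.
    static final int DEFAULT_EVENT_LOOP_THREADS =
            Math.max(1, Runtime.getRuntime().availableProcessors() * 2);

    // 0 selects the default; any other value is passed through unchanged,
    // just like the ternary in the MultithreadEventLoopGroup constructor.
    static int resolve(int nThreads) {
        return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads;
    }
}
```

So new NioEventLoopGroup() and new NioEventLoopGroup(0) end up with the same number of threads, while new NioEventLoopGroup(1) is the usual choice for a boss group that serves a single listening port.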
MultithreadEventExecutorGroup is the core class that manages the lifecycle of the eventLoops. First, a look at two of its fields:
1. children: an EventExecutor array that holds the eventLoops.
2. chooser: the strategy for picking one eventLoop from children.
Constructor:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
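1. The chooser is initialized with a different strategy depending on the array size: PowerOfTwoEventExecutorChooser if the size is a power of two, GenericEventExecutorChooser otherwise.
The method that checks whether a number is a power of two is rather neat.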
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
2. newChild is overridden: when the EventExecutors are initialized, the newChild method that actually runs is the one in NioEventLoopGroup, so the runtime type of each element of children is NioEventLoop.
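The two points above can be sketched with plain JDK code. The class ChooserSketch and its methods are hypothetical; only isPowerOfTwo is taken from the listing above. The mask-based and modulo-based selection are my reading of what the two chooser classes do, so treat them as an assumption rather than Netty's exact source.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {
    // (val & -val) isolates the lowest set bit; it equals val only when a single bit is set.
    // It is also true for 0, which does not matter here because nThreads <= 0 is rejected earlier.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private final AtomicInteger idx;
    private final int length;

    ChooserSketch(int length, int startIndex) {
        this.length = length;
        this.idx = new AtomicInteger(startIndex);
    }

    // Power-of-two length: a bit mask replaces the modulo and is never negative.
    int nextPowerOfTwo() {
        return idx.getAndIncrement() & (length - 1);
    }

    // Any other length: modulo, with Math.abs covering the negative counter after int overflow.
    int nextGeneric() {
        return Math.abs(idx.getAndIncrement() % length);
    }
}
```

Both variants hand out indexes in round-robin order; the power-of-two variant just avoids the division.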
Next, let's look at the NioEventLoop class.
NioEventLoop
Each eventLoop maintains a selector and a taskQueue, and handles both client requests and internal tasks such as ServerSocketChannel registration and ServerSocket binding.
NioEventLoop constructor:
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
Seeing selector = openSelector() should feel familiar. Set the selector aside for now and look at the SingleThreadEventLoop class.
SingleThreadEventLoop is the parent class of NioEventLoop. Its constructor:
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
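It does nothing except delegate to its parent...
Moving on to SingleThreadEventExecutor, the parent class of SingleThreadEventLoop.
As the class name suggests, this is a thread pool with exactly one thread. First, a few of its fields:
1. state: the current state of the thread pool
2. taskQueue: the queue that holds tasks
3. thread: the single thread the pool maintains
4. scheduledTaskQueue: defined in the parent class AbstractScheduledEventExecutor; holds tasks scheduled for delayed execution.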
...
Constructor:
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
threadProperties = new DefaultThreadProperties(thread);
taskQueue = newTaskQueue();
}
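The code is long, but what it does is simple:
1. It creates a thread whose body runs the run method of NioEventLoop; the thread is not started at this point, though.
2. It initializes taskQueue through newTaskQueue(), which by default creates a LinkedBlockingQueue.
At this point, the worker threads involved have all been initialized.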
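The "create the thread now, start it later" idea can be illustrated without Netty. The sketch below is a heavily simplified stand-in for SingleThreadEventExecutor: the class name LazySingleThreadExecutor, the stopped flag and the daemon setting are all assumptions made for the example, and the state machine, shutdown hooks and wakeup logic of the real class are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class LazySingleThreadExecutor {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final Thread thread;
    private volatile boolean stopped;

    LazySingleThreadExecutor() {
        // The thread object is created in the constructor but deliberately not started.
        thread = new Thread(this::runLoop, "sketch-event-loop");
        thread.setDaemon(true); // assumption: keeps the sketch from blocking JVM exit
    }

    boolean isStarted() {
        return started.get();
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // The first submission from another thread starts the loop thread, exactly once.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            thread.start();
        }
        taskQueue.add(task);
    }

    void shutdown() {
        // Stopping is itself a task, so it runs on the loop thread after earlier tasks.
        execute(() -> { stopped = true; });
    }

    private void runLoop() {
        try {
            while (!stopped) {
                taskQueue.take().run(); // block until a task arrives, then run it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Constructing the object costs one idle Thread instance; nothing runs until the first execute call arrives from outside the loop thread.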
ServerBootstrap
The server is started with serverBootstrap.bind(port). The process is as follows:
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
doBind is implemented as follows:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
1. initAndRegister returns a ChannelFuture, regFuture, which shows whether initAndRegister has finished and how it turned out.
2. If regFuture.isDone() is true, initAndRegister has already finished, so doBind0 is called directly to bind the socket.
3. Otherwise a ChannelFutureListener is added to regFuture; when initAndRegister finishes, operationComplete is invoked and calls doBind0 to bind the socket.
So the bind operation can only happen after initAndRegister has finished.
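The same "bind only after registration" ordering can be sketched with the JDK's CompletableFuture standing in for ChannelFuture. Everything here (BindAfterRegister, bind0, the log list) is hypothetical, and unlike Netty's doBind0 the sketch binds inline instead of handing the work to an eventLoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class BindAfterRegister {
    final List<String> log = new ArrayList<>();

    // regFuture plays the role of the registration future; the result plays the bind promise.
    CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture, int port) {
        if (regFuture.isCompletedExceptionally()) {
            return regFuture; // registration already failed, nothing to bind
        }
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Registration finished successfully: bind right away.
            bind0(port, promise);
        } else {
            // Registration still pending: bind (or fail) from the completion callback.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    bind0(port, promise);
                }
            });
        }
        return promise;
    }

    // Placeholder for the real socket bind; it only records that it happened.
    private void bind0(int port, CompletableFuture<Void> promise) {
        log.add("bind " + port);
        promise.complete(null);
    }
}
```

Whichever branch is taken, the caller gets a promise back immediately and "bind" is never logged before registration has succeeded.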
initAndRegister is implemented as follows:
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
1. Creates the server-side NioServerSocketChannel instance
2. Adds handlers to the NioServerSocketChannel's pipeline
3. Registers the NioServerSocketChannel with a selector
Most of this resembles what one would do with plain NIO.
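For comparison, here is roughly what the same startup looks like in plain NIO, using only JDK classes. The class and method names are made up, the backlog of 128 and the ephemeral loopback port are arbitrary choices for the sketch, and switching the interest set to OP_ACCEPT as the last step reflects my understanding of when Netty applies it, not something shown in the listings here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class PlainNioStartup {
    // Opens, registers and binds a server channel, then returns the key's interest ops.
    static int startOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a selector.
            server.configureBlocking(false);
            // Register first with an empty interest set (0), as doRegister() does.
            SelectionKey key = server.register(selector, 0);
            // Bind afterwards; port 0 lets the OS pick a free port (sketch-only choice).
            server.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            // Only now start listening for accept events.
            key.interestOps(SelectionKey.OP_ACCEPT);
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note the order: register with interest 0, bind, then enable OP_ACCEPT. It is the same order the Netty code in this article follows, just spread across several classes there.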
NioServerSocketChannel
It wraps NIO's ServerSocketChannel and SelectionKey.
Constructors:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
1. newSocket uses provider.openServerSocketChannel() to create the NIO ServerSocketChannel object.
2. SelectionKey.OP_ACCEPT is recorded as the event of interest.
AbstractNioMessageChannel constructor:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
It does nothing either...
AbstractNioChannel constructor:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
This puts the current ServerSocketChannel into non-blocking mode.
AbstractChannel constructor:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
1. Initializes unsafe. This Unsafe is not the JDK's low-level Unsafe class; it is responsible for low-level operations such as connect, register, read and write.
2. Initializes the pipeline. Every Channel has its own pipeline; when an event occurs, the pipeline invokes the appropriate handlers to process it.
The implementation details of unsafe and pipeline will be analyzed in later articles.
Back to ServerBootstrap's init(Channel channel) method, which adds handlers to the channel's pipeline.
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
1. Sets the channel's options and attrs.
2. Adds a ChannelInitializer object to the pipeline.
Once init has finished, the current channel has to be registered with the EventLoopGroup.
The ultimate goal is the same as in NIO: register the ServerSocketChannel with a selector so that client requests can be listened for. Here is how Netty does it:
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
public EventLoop next() {
return (EventLoop) super.next();
}
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
Because an EventLoopGroup maintains several eventLoops, next() picks the next eventLoop in round-robin order (this is the job of the chooser strategy) and then calls that eventLoop's register method.
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
...
channel.unsafe().register(this, promise);
return promise;
}
What is channel.unsafe()?
When NioServerSocketChannel is initialized, it creates a NioMessageUnsafe instance that implements the low-level register, read and write operations.
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
private void register0(ChannelPromise promise) {
try {
if (!ensureOpen(promise)) {
return;
}
Runnable postRegisterTask = doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (postRegisterTask != null) {
postRegisterTask.run();
}
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
if (!promise.tryFailure(t)) {
}
closeFuture.setClosed();
}
}
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
1. register0 is submitted to the eventLoop for execution; this is the moment the eventLoop's thread gets started.
2. doRegister() is where the actual NIO registration happens; javaChannel() returns the ServerSocketChannel.
protected Runnable doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return null;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
After the ServerSocketChannel has been registered, the pipeline is told to run fireChannelRegistered. The pipeline maintains a linked list of handlers; it walks the list and calls channelRegistered on each inbound handler, which eventually reaches the ChannelInitializer handler added in init.
public final void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
boolean removed = false;
boolean success = false;
try {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
removed = true;
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (!removed) {
ctx.pipeline().remove(this);
}
if (!success) {
ctx.close();
}
}
}
1. initChannel ends up adding ServerBootstrapAcceptor to the ServerSocketChannel's pipeline; it is responsible for accepting client connections.
2. The initializer handler is then removed from the pipeline.
3. fireChannelRegistered is triggered, so the channelRegistered method of any custom handler gets a chance to run.
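The "add the real handlers, then remove yourself" pattern from the three steps above can be shown with a toy pipeline. This is a sketch under loud assumptions: the pipeline is a plain List rather than Netty's linked contexts, and Handler, NamedHandler, Initializer and fireChannelRegistered are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class OneShotInitializerSketch {
    interface Handler {
        void channelRegistered(List<Handler> pipeline, List<String> log);
    }

    // An ordinary handler that only records that it saw the event.
    static final class NamedHandler implements Handler {
        final String name;
        NamedHandler(String name) { this.name = name; }
        @Override
        public void channelRegistered(List<Handler> pipeline, List<String> log) {
            log.add(name + ".channelRegistered");
        }
    }

    // Plays the role of ChannelInitializer: installs the acceptor, then removes itself.
    static final class Initializer implements Handler {
        @Override
        public void channelRegistered(List<Handler> pipeline, List<String> log) {
            pipeline.add(new NamedHandler("acceptor")); // step 1: add the real handler
            pipeline.remove(this);                      // step 2: the initializer is one-shot
            log.add("initializer removed");
        }
    }

    // Step 3: keep propagating the event to whatever handlers are in the pipeline now.
    static List<String> fireChannelRegistered(List<Handler> pipeline) {
        List<String> log = new ArrayList<>();
        int i = 0;
        while (i < pipeline.size()) {
            Handler h = pipeline.get(i);
            h.channelRegistered(pipeline, log);
            // Advance only if h is still at index i; if it removed itself, the next handler slid in.
            if (i < pipeline.size() && pipeline.get(i) == h) {
                i++;
            }
        }
        return log;
    }
}
```

After the first event the pipeline contains only the acceptor, so firing the event a second time never runs the initializer again.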
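So far the ServerSocketChannel has been initialized and registered with the selector, and the eventLoop thread has been started to run selector.select() and wait for client requests.
Attentive readers will have noticed that the ServerSocketChannel's socket has not been bound to the target port yet. How does Netty handle that part?
Netty runs the bind operation on the eventLoop as well.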
private static void doBind0(
final ChannelFuture regFuture,
final Channel channel,
final SocketAddress localAddress,
final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(promise, false);
return findContextOutbound().invokeBind(localAddress, promise);
}
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
invokeBind0(localAddress, promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeBind0(localAddress, promise);
}
});
}
return promise;
}
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
In the end it is unsafe that performs the actual bind to the port.
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
boolean wasActive = isActive();
...
doBind(localAddress);
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
}
}
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
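Once bind has completed and the ServerSocketChannel is registered, the pipeline's fireChannelActive method is triggered, so a custom handler can react at this point by implementing channelActive. As the code below shows, the event enters the pipeline at head; if no handler intercepts it, it ends at tail.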
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
channel.read() in turn triggers the following pipeline behavior:
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelPipeline read() {
tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
findContextOutbound().invokeRead();
return this;
}
private void invokeRead() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
invokeRead0();
} else {
Runnable task = invokeRead0Task;
if (task == null) {
invokeRead0Task = task = new Runnable() {
@Override
public void run() {
invokeRead0();
}
};
}
executor.execute(task);
}
}
private void invokeRead0() {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
Eventually a handler in the pipeline is found to execute read; by default this is head.
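The direction of travel is easy to lose in the listings: fireChannelActive starts at head, while bind and read start at tail and look for an outbound handler toward head. Here is a small doubly linked sketch of that rule. The names are hypothetical, every handler is assumed to forward the event, and treating head as outbound and tail as inbound is an assumption based on the casts to ChannelOutboundHandler in the code above.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineDirectionSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Assumption: head acts as an outbound handler, tail as an inbound handler.
    private final Ctx head = new Ctx("head", false, true);
    private final Ctx tail = new Ctx("tail", true, false);

    PipelineDirectionSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a handler just before tail, as addLast does.
    void addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
    }

    // Inbound events (channelRegistered, channelActive) walk from head toward tail.
    List<String> fireInbound() {
        List<String> visited = new ArrayList<>();
        for (Ctx c = head.next; c != null; c = c.next) {
            if (c.inbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }

    // Outbound operations (bind, read) walk from tail toward head.
    List<String> invokeOutbound() {
        List<String> visited = new ArrayList<>();
        for (Ctx c = tail.prev; c != null; c = c.prev) {
            if (c.outbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }
}
```

With no user-defined outbound handler in the pipeline, invokeOutbound visits only head, which matches the "by default this is head" remark.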
At this point the server has finished starting up.
END. This is 占小狼.