在上篇文章《Netty的啟動(dòng)過(guò)程一》中屯耸,我們講述了Netty服務(wù)端boss線程的啟動(dòng)過(guò)程且预,但是worker線程是如何啟動(dòng)的還是未知的蜈项。我們知道了boss線程是在ServerBootstrap的bind方法中啟動(dòng)的定硝,再回到上篇文章中Netty的啟動(dòng)代碼段瘪菌,在NioEventLoopGroup的初始化方法和ServerBootstrap的bind方法中間還隔了很多代碼撒会,這些源碼都還沒(méi)看的,我們現(xiàn)在來(lái)看看這些源碼师妙。
繼NioEventLoopGroup初始化后诵肛,服務(wù)端便創(chuàng)建了一個(gè)ServerBootstrap實(shí)例,這個(gè)類是服務(wù)端Netty特有的啟動(dòng)類默穴,客戶端的為Bootstrap怔檩;接下來(lái)便把boss線程組和worker線程組分別賦給了ServerBootstrap的group和childGroup變量,注意worker線程組是賦給了childGroup蓄诽;接下來(lái)便是設(shè)置一些參數(shù)薛训,比如channel,option仑氛,childOption乙埃,handler闸英,childHandler,注意帶child的和沒(méi)帶child的區(qū)別:帶child的基本是設(shè)置 ServerChannel 的子 channel 的選項(xiàng)介袜,即沒(méi)帶child的基本都是對(duì)boss線程而言的甫何,而帶child的基本都是對(duì)worker線程而言的。
這里需要注意channel(NioServerSocketChannel.class)一句遇伞,它是指設(shè)置boss線程channel類型沛豌。
接下來(lái)要了解下Netty的ChannelPipeline和ChannelHandler的關(guān)系了,這里引用《游戲之網(wǎng)絡(luò)進(jìn)階》的一幅圖:
pipeline 是一個(gè)負(fù)責(zé)處理網(wǎng)絡(luò)事件的職責(zé)鏈赃额,負(fù)責(zé)管理和執(zhí)行 ChannelHandler加派,即負(fù)責(zé)消息入站和出站的流程。
在上篇文章中跳芳,我們知道了在啟動(dòng)boss線程后芍锦,雖然boss線程在for循環(huán)中無(wú)限循環(huán),但是是沒(méi)有進(jìn)入到后面的if分支的SelectionKey.OP_ACCEPT中的飞盆,只有先進(jìn)了這里娄琉,才會(huì)啟動(dòng)服務(wù)端的worker線程:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
...
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
因此,我們?cè)侔褦帱c(diǎn)打在threadFactory.newThread(command).start()中吓歇,然后啟動(dòng)客戶端去連接服務(wù)端孽水,看下它的調(diào)用堆棧是怎樣的:
從上篇可知,當(dāng)每次有客戶端連接時(shí)城看,此時(shí)readyOps=16女气,繼而啟動(dòng)worker線程;每次讀取客戶端數(shù)據(jù)時(shí)测柠,此時(shí)readyOps=1炼鞠,繼而worker線程讀取數(shù)據(jù);很明顯轰胁,Netty是以readyOps的值區(qū)分連接和讀寫數(shù)據(jù)的谒主,那么readyOps又是如何設(shè)置的呢?看代碼赃阀,readyOps取自于SelectionKey霎肯,而SelectionKey取自于SelectionKey[]數(shù)組,而SelectionKey[]
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
來(lái)自selectedKeys.flip()榛斯,flip()實(shí)現(xiàn)如下:
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;
keysBSize = 0;
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
即SelectionKey[]來(lái)自keysA或keysB地址观游,而上述processSelectedKeys方法處于NioEventLoop的無(wú)限循環(huán)中,即boss線程(worker線程)的無(wú)限循環(huán)中:
@Override
protected void run() {
for (;;) {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
...
}
processSelectedKeys();
}
}
也就是說(shuō)肖抱,boss線程在無(wú)限循環(huán)SelectionKey[]即keysA或keysB的值备典,當(dāng)讀到SelectionKey不為空時(shí),也就讀到了readyOps值意述,根據(jù)readyOps值提佣,就知道客戶端是什么操作了吮蛹,證據(jù)如下:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
}
}
現(xiàn)在知道了worker線程啟動(dòng)和讀寫數(shù)據(jù)跟這個(gè)readyOps值有關(guān),那這個(gè)值又是如何設(shè)置進(jìn)去的呢拌屏?我們已知SelectionKey[]來(lái)自于keysA或keysB潮针,那么我們?nèi)炙阉鬟@兩個(gè)變量看怎么用的,就知道它是如何設(shè)置值的了倚喂。
可見每篷,keysA或keysB唯一設(shè)置值的地方是在add方法中,因此我們?cè)赼dd方法中打上斷點(diǎn)端圈,啟動(dòng)客戶端去連接焦读,就應(yīng)該知道SelectionKey[]值是如何設(shè)置的了。
果然舱权,當(dāng)客戶端請(qǐng)求連接服務(wù)端時(shí)矗晃,在boss線程中,進(jìn)入了此斷點(diǎn)宴倍,而且SelectionKey的readyOps設(shè)置成了16张症,后續(xù)在processSelectedKey方法中,boss線程就是根據(jù)此readyOps值再啟動(dòng)worker線程的鸵贬。而且由調(diào)用堆椝姿可知,它正是在boss無(wú)限循環(huán)的run()方法中進(jìn)入了select(wakenUp.getAndSet(false))方法阔逼,查詢是否有就緒的IO事件(讀寫兆衅,連接等),有即設(shè)置keysA或keysB的SelectionKey值颜价。而這些SelectionKey值是Netty監(jiān)聽到了這些IO事件涯保,封裝進(jìn)SelectionKey的。根據(jù)操作系統(tǒng)的不同而封裝過(guò)程不同周伦。
Netty 基于 Selector 對(duì)象實(shí)現(xiàn) I/O 多路復(fù)用,通過(guò) Selector 一個(gè)線程可以監(jiān)聽多個(gè)連接的 Channel 事件未荒。
當(dāng)向一個(gè) Selector 中注冊(cè) Channel 后专挪,Selector 內(nèi)部的機(jī)制就可以自動(dòng)不斷地查詢(Select) 這些注冊(cè)的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫片排,網(wǎng)絡(luò)連接完成等)寨腔,這樣程序就可以很簡(jiǎn)單地使用一個(gè)線程高效地管理多個(gè) Channel 。
摘自:《新手入門:目前為止最透徹的的Netty高性能原理和框架架構(gòu)解析》
同理率寡,當(dāng)客戶端發(fā)數(shù)據(jù)給服務(wù)端時(shí)迫卢,也進(jìn)入了此斷點(diǎn),而且SelectionKey的readyOps設(shè)置成了1冶共,只是此時(shí)是在worker線程中了乾蛤。
現(xiàn)在知道了worker線程啟動(dòng)的原因每界,但是過(guò)程是怎樣的呢?
我們?nèi)栽趖hreadFactory.newThread(command).start()處打上斷點(diǎn)家卖,由上篇可知眨层,第一次進(jìn)入此斷點(diǎn),Netty啟動(dòng)了boss線程上荡,第二次進(jìn)入此斷點(diǎn)即啟動(dòng)了worker線程趴樱,現(xiàn)在我們來(lái)看下第二次進(jìn)入此斷點(diǎn)的情況(請(qǐng)查看上圖->客戶端連接啟動(dòng)worker線程.png):
由圖片堆棧打印所知,在boss線程中酪捡,首先由readyOps=16叁征,進(jìn)入了NioMessageUnsafe.read()方法,如下:
@Override
public void read() {
...
do {
//讀取SocketChannel消息/事件逛薇,封裝進(jìn)readBuf捺疼,實(shí)際上是封裝worker線程channel,供后續(xù)worker線程注冊(cè)此channel
int localRead = doReadMessages(readBuf);
...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//處理readBuf事件金刁,實(shí)際上是為worker線程添加新channel帅涂,初始化childHandler,pipeline及參數(shù)等信息
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
這里有兩個(gè)重要方法尤蛮,一為doReadMessages(readBuf)媳友,主要是封裝NioSocketChannel,以供worker線程添加channel和監(jiān)聽SelectionKey.OP_READ事件用:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
再看boss線程中NioSocketChannel繼承關(guān)系:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
NioSocketChannel繼承自AbstractNioByteChannel产捞,注意在這里先定義了SelectionKey.OP_READ操作醇锚,以供worker線程監(jiān)聽此事件:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);//為后續(xù)worker線程監(jiān)聽SelectionKey.OP_READ事件
}
另一為pipeline.fireChannelRead(readBuf.get(i))方法,在經(jīng)歷NioServerSocketChannel的pipeline中首尾handler的read方法坯临,最終來(lái)到了ServerBootstrapAcceptor的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
由此焊唬,在childGroup.register(child)中,注冊(cè)了此channel(NioSocketChannel)看靠,并設(shè)置了pipeline赶促,參數(shù)等其他信息。
此后挟炬,在后續(xù)的register方法中鸥滨,由eventLoop.execute方法,啟動(dòng)了worker線程谤祖,也是由MultithreadEventLoopGroup中的register方法婿滓,以next()限制了worker線程數(shù)量。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;//將channel和eventLoop關(guān)聯(lián)起來(lái)粥喜,即將channel和worker線程關(guān)聯(lián)起來(lái)
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
并在register0方法中凸主,將netty的niochannel綁定到j(luò)ava原生的selectkey參數(shù)上,并告知worker線程pipeline各handler channel的注冊(cè)和激活事件额湘。
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();//將netty的niochannel綁定到j(luò)ava原生的selectkey參數(shù)上
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();//告知pipeline中各handler有channel注冊(cè)
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();//告知pipeline中各handler有channel激活
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
看doRegister()方法卿吐,在AbstractNioChannel下內(nèi)部抽象類AbstractNioUnsafe的doRegister()方法中:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//如果觸發(fā)了讀事件的SelectKey,netty通過(guò)調(diào)用 SelectKey的attachment()方法就可以獲取channel了
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
將netty的channel綁定到j(luò)ava原生的selectkey參數(shù)上旁舰,如果觸發(fā)了讀事件的SelectKey,netty通過(guò)調(diào)用 SelectKey的attachment()方法就可以獲取channel了(見processSelectedKeysOptimized方法k.attachment())但两。
現(xiàn)在鬓梅,worker線程如何啟動(dòng)的也知道了,那么worker線程是如何讀取數(shù)據(jù)的呢谨湘?
這次绽快,我們把斷點(diǎn)打在if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)一句,然后啟動(dòng)客戶端連接服務(wù)端并給服務(wù)端發(fā)數(shù)據(jù)紧阔,這時(shí)堆棧為:
把該圖與上面“客戶端連接啟動(dòng)worker線程.png”對(duì)比坊罢,啟動(dòng)worker線程前,readyOps=16擅耽,此時(shí)是在boss線程中活孩,實(shí)際用的unsafe是NioMessageUnsafe.read();讀取客戶端數(shù)據(jù)時(shí)乖仇,readyOps=1憾儒,此時(shí)是在worker線程中,實(shí)際用的是NioByteUnsafe.read()乃沙。此后起趾,經(jīng)歷worker線程的pipeline,將數(shù)據(jù)發(fā)至用戶自定義的handler警儒,這便完成了對(duì)客戶端數(shù)據(jù)的讀取训裆。
那NioMessageUnsafe是如何來(lái)的呢?
其實(shí)NioMessageUnsafe來(lái)自ServerBootstrap的bind方法蜀铲,跟下去边琉,在AbstractBootstrap的initAndRegister()方法中,調(diào)用channelFactory.newChannel()方法用反射實(shí)例化了boss線程的NioServerSocketChannel记劝。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
ChannelFuture regFuture = config().group().register(channel);
...
return regFuture;
}
證據(jù)如下变姨,在初始化ServerBootstrap時(shí),有這樣一句bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)厌丑,它是指設(shè)置boss線程channel類型钳恕。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
在上面設(shè)置了ServerBootstrap的channelFactory,反射類為NioServerSocketChannel蹄衷,再以newChannel()方法實(shí)例化了NioServerSocketChannel,最終會(huì)來(lái)到這里:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
在這里設(shè)置了boss線程將監(jiān)聽SelectionKey.OP_ACCEPT事件厘肮,再看它的super方法愧口,NioServerSocketChannel繼承自AbstractNioMessageChannel,而AbstractNioMessageChannel也繼承自AbstractNioChannel类茂,AbstractNioChannel又繼承自AbstractChannel耍属,最終也會(huì)來(lái)到這里:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在這里unsafe = newUnsafe()托嚣,調(diào)用本身抽象方法newUnsafe()實(shí)例化了本身Unsafe屬性,從以上的繼承關(guān)系鏈中有個(gè)AbstractNioMessageChannel類厚骗,因此此處實(shí)際調(diào)用的是AbstractNioMessageChannel的newUnsafe() 方法示启,該方法中new了一個(gè)內(nèi)部類NioMessageUnsafe實(shí)例,該內(nèi)部類繼承了AbstractNioUnsafe领舰。NioMessageUnsafe即來(lái)自于此夫嗓。
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
...
do {
//讀取SocketChannel消息/事件,封裝進(jìn)readBuf冲秽,實(shí)際上是封裝worker線程channel舍咖,供后續(xù)worker線程注冊(cè)此channel
int localRead = doReadMessages(readBuf);
...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//處理readBuf事件,實(shí)際上是為worker線程添加新channel锉桑,初始化childHandler排霉,pipeline及參數(shù)等信息
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
NioByteUnsafe又是如何來(lái)的呢?
其實(shí)NioByteUnsafe來(lái)自于NioMessageUnsafe.read()方法民轴,該方法中有兩個(gè)重要方法之一doReadMessages(readBuf)攻柠,作用主要是封裝NioSocketChannel,以供worker線程添加channel和監(jiān)聽SelectionKey.OP_READ事件用后裸,我們?cè)谇懊鎸⑺欀亮薃bstractNioByteChannel瑰钮,繼續(xù)跟下去會(huì)發(fā)現(xiàn)AbstractNioByteChannel又繼承自AbstractNioChannel:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
AbstractNioChannel繼承自AbstractChannel:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在這里unsafe = newUnsafe(),調(diào)用本身抽象方法newUnsafe()實(shí)例化了本身Unsafe屬性轻抱,從以上的繼承關(guān)系鏈中有個(gè)NioSocketChannel類飞涂,因此此處實(shí)際調(diào)用的是NioSocketChannel的newUnsafe() 方法,該方法中new了一個(gè)內(nèi)部類NioSocketChannelUnsafe實(shí)例祈搜,該內(nèi)部類繼承了NioByteUnsafe较店。NioByteUnsafe即來(lái)自于此。
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
}
這樣容燕,worker線程的啟動(dòng)過(guò)程也講完了梁呈。
包括上篇文章《Netty的啟動(dòng)過(guò)程一》,大致講解了Netty服務(wù)端是如何啟動(dòng)boss線程和worker線程的蘸秘,如何讀取數(shù)據(jù)的官卡,但也僅是主要的枝干代碼,細(xì)節(jié)之處還有很多沒(méi)講全醋虏,還有很多重要組件寻咒,它們的功能及實(shí)現(xiàn)都沒(méi)講的。這兩篇文章的主要目的颈嚼,是以一個(gè)Netty新手的角度講解如何看Netty源碼毛秘,那就是大膽去猜,去驗(yàn)證,去查資料叫挟,去看別人思路艰匙,還有就是多打斷點(diǎn)去調(diào)試,不要想著一次全搞懂抹恳,而是多看多查多驗(yàn)證去彌補(bǔ)以前沒(méi)看到的员凝,沒(méi)看懂的,并不斷糾正以前錯(cuò)誤認(rèn)識(shí)的奋献,所謂Netty之大健霹,一鍋燉不下,其余的只能在后續(xù)文章慢慢講解了秽荞,這里先弄懂個(gè)大概即可骤公。