前言
??之前的文章分析到了服務(wù)端NioServerSocketChannel的創(chuàng)建注冊(cè)及注冊(cè)accept事件奈懒。到現(xiàn)在為止宪巨,關(guān)于服務(wù)端,我們還有多個(gè)疑問(wèn)未解開(kāi):例如當(dāng)有客戶端連接過(guò)來(lái)時(shí)捏卓,服務(wù)端要怎么處理,以及后續(xù)的讀寫(xiě)如何進(jìn)行的怠晴?之前的分析都是在服務(wù)端的父事件循環(huán)組中,那么子事件循環(huán)組又是怎么起作用的稿械?之前提到了子處理器childHandler被封裝在了ServerBootStrapAcceptor中添加到了流水線冲粤,那么子處理器又是怎么工作的,有沒(méi)有被添加到pipeline中梯捕?本文將對(duì)這些問(wèn)題進(jìn)行解答。
服務(wù)端接收到連接后做了什么
??我們不得不再次回到下面這個(gè)方法傀顾,當(dāng)有read或者accept事件到來(lái)時(shí),執(zhí)行unsaf.read()(對(duì)于連接事件和read事件寒砖,因?yàn)槠渌鶎?duì)應(yīng)的通道一個(gè)是NioServerSocketChannel一個(gè)是NioSocketChannel,兩者的unsafe是不同的入撒,對(duì)于連接事件,unsafe.read位于AbstractNioUnsafe中)茅逮。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
@Override
public void read() {
/*省略代碼*/
try {
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
/*省略代碼*/
} finally {
/*省略代碼*/
}
}
/*進(jìn)行連接的接收*/
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
??在doReadMessages方法中碉考,會(huì)通過(guò)NioServerSocketChannel內(nèi)部的ServerSocketChannel來(lái)完成連接的接受挺身,如果此時(shí)有連接進(jìn)來(lái)侯谁,那么會(huì)生成SocketChannel實(shí)例章钾,netty將其封裝為自身的NioSocketChannel實(shí)例加入到緩沖buf中。隨后會(huì)將緩沖區(qū)中內(nèi)容(新接收的連接)通過(guò)fireChannelRead進(jìn)行流水線傳輸惨撇。可以發(fā)現(xiàn)魁衙,NioServerSocketChannel中的ServerSocketChannel在接收到連接后會(huì)通過(guò)NioServerSocketChannel中的流水線的channelRead方法進(jìn)行傳輸株搔。
??隨后流水線channelRead執(zhí)行到ServerBootstrapAcceptor。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
??由于我們已經(jīng)知道傳入的msg是一個(gè)netty已接收連接的通道(實(shí)際封裝了SocketChannel的NioSocketChannel)纵隔,獲取到它的pipeline,并將childHandler添加到它的流水線中炮姨。如果我們給其配置的childHandler是ChannelInitializer的話捌刮,那么隨后還會(huì)將其他的處理器添加到流水線中。同樣的剑令,新接收的連接還沒(méi)有注冊(cè)糊啡,所以也需要將其注冊(cè)拄查,只不過(guò)在server端吁津,新接受的連接注冊(cè)到childGroup,也就是子事件循環(huán)組中,最終它會(huì)被注冊(cè)到子事件循環(huán)組中的某個(gè)NioEventLoop上碍脏,它之后的讀寫(xiě)的事件操作就與之前分析客戶端時(shí)相似梭依。
?? 在netty中NioServerSocketChannel負(fù)責(zé)接收連接,它注冊(cè)在父NioEventLoopGroup中役拴,而且他的通道的handler最后是一個(gè)ServerBootStrapAcceptor處理器。在接收到新連接時(shí)钾埂,出發(fā)fireChannelRead回調(diào)方法的執(zhí)行河闰,當(dāng)執(zhí)行到ServerBootStrapAcceptor時(shí),ServerBootStrapAcceptor將子處理器加入到新連接的通道的流水線中褥紫。那么以Echo Server這個(gè)例子來(lái)看姜性,兩種通道的流水線分別是:
1 NioServerSocketChannel的流水線(在前文已經(jīng)見(jiàn)到過(guò))
2 新連接到來(lái)后NioSocketChannel的流水線
??
*鏈接
1. Netty解析:第一個(gè)demo——Echo Server
2. Netty解析:NioEventLoopGroup事件循環(huán)組
3. Netty解析:NioSocketChannel、NioServerSocketChannel的創(chuàng)建及注冊(cè)
4. Netty解析:Handler髓考、Pipeline大動(dòng)脈及其在注冊(cè)過(guò)程中體現(xiàn)
5. Netty解析:connect/bind方法背后
6. Netty解析:服務(wù)端如何接受連接并后續(xù)處理讀寫(xiě)事件