這一篇介紹了如何Channel如何接受SelectionKey.OP_ACCEPT的流程,接下就是接受SelectionKey.OP_READ的流程
1.NioEventLoop的processSelectedKey
當(dāng)readyOps為SelectionKey.OP_ACCEPT則進(jìn)入AbstractNioMessageChannel中的unsafe的read流程
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
2. 回顧ServerSocketChannel的accept方法
我們先回顧下nio中ServerSocketChannel的accept方法,在接受到SelectionKey.OP_ACCEPT時(shí),調(diào)用該方法獲取新連接的SocketChannel,并將SelectionKey.OP_READ注冊(cè)到selector當(dāng)中去,Netty中封裝勢(shì)必有這樣處理的代碼
private void handleInput(final SelectionKey key) throws IOException {
if (key.isValid()) {
// 處理新接入的請(qǐng)求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
}
}
2. NioMessageUnsafe的read方法
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
read方法最需要關(guān)注的readBuf,readBuf是NioSocketChannel的數(shù)組,其由doReadMessages處理獲取,doReadMessages中調(diào)用了SocketUtils.accept方法,其內(nèi)部調(diào)用了ServerSocketChannel的accept方法,NioSocketChannel對(duì)ServerSocketChannel進(jìn)行了包裝
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
//SocketUtils
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
3.NioSocketChannel
NioSocketChannel調(diào)用了父類AbstractNioByteChannel的構(gòu)造函數(shù),其監(jiān)聽(tīng)的事件是SelectionKey.OP_READ
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
4.ServerBootstrapAcceptor中register
ServerBootstrapAcceptor是在ServerBootstrap初始化時(shí)作為一個(gè)ChannelInboundHandler添加到Pineline中,其用于處理接收的Channel
在channelRead方法中調(diào)用了EventLoopGroup的register方法,將Channel注冊(cè)到work線程中(監(jiān)聽(tīng)SelectionKey.OP_READ事件)
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
參考:
Netty 接受請(qǐng)求過(guò)程源碼分析 (基于4.1.23)
Netty源碼分析-Server端啟動(dòng)