先來看下Channel相關(guān)類圖:
為了便于理解棚亩,上面的類圖對層次關(guān)系做了一定的簡化陪蜻。
Channel接口定義了Netty中網(wǎng)絡(luò)IO最頂層的框架慎陵。AbstractChannel是Channel接口的骨架實現(xiàn),這個類中定義了channel的幾個重要成員脑溢,id(ChannelId)僵朗,unsafe(Unsafe),pipeline(DefaultChannelPipeline)屑彻,eventLoop(EventLoop)验庙。服務(wù)端channel(NioServerSocketChannel)和客戶端channel(NioSocketChannel)都會逐層的調(diào)用父類構(gòu)造函數(shù),從而創(chuàng)建創(chuàng)建或綁定上述幾個成員變量社牲。AbstractNioChannel主要作用是負責(zé)Nio相關(guān)的部分粪薛,使用selector的方式監(jiān)聽讀寫事件。AbstractNioChannel有成員變量SelectionKey搏恤,成員變量SelectableChannel(保存底層jdk的channel)违寿,成員變量readInterestOp(OP_READ或OP_ACCEPT事件)湃交。
AbstractNioChannel的構(gòu)造函數(shù):
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch; // 保存底層jdk的channel
this.readInterestOp = readInterestOp; // 保存感興趣的事件
try {
ch.configureBlocking(false); // 設(shè)置jdk的底層channel為非阻塞模式
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
NioSocketChannel和NioServerSocketChannel注冊事件區(qū)別
接下來就是兩大陣營服務(wù)端channel(AbstractNioMessageChannel,NioServerSocketChannel)和客戶端channel(AbstractNioByteChannel藤巢,NioServerChannel)搞莺。它們都繼承了AbstractNioChannel,說明它們都是通過selector輪詢IO事件的掂咒,它們之間最大的區(qū)別是它們向selector注冊的IO事件不同才沧。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT); // 服務(wù)端channel注冊O(shè)P_ACCEPT事件
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket); // 調(diào)用 AbstractNioByteChannel 的構(gòu)造函數(shù)
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ); // 客戶端channel注冊O(shè)P_READ事件,關(guān)心數(shù)據(jù)的讀寫
}
NioSocketChannel和NioServerSocketChannel抽象讀事件的區(qū)別
服務(wù)端channel和客戶端channel的另一個區(qū)別是底層的Unsafe不同绍刮。Unsafe負責(zé)具體實現(xiàn)是客戶端channel還是客戶端channel的協(xié)議温圆。服務(wù)端channel對應(yīng)的是NioMessageUnsafe,客戶端channel對應(yīng)的是NioByteUnsafe(NioSocketChannelUnsafe繼承自它)录淡。
從源碼中來分析客戶端unsafe的創(chuàng)建:
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe(); // 客戶端channel直接創(chuàng)建unsafe
}
// NioSocketChannelUnsafe 只有一個準(zhǔn)備關(guān)閉的方法捌木,大部分功能還是來自于 NioByteUnsafe
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
// We need to cancel this key of the channel so we may not end up in a eventloop spin
// because we try to read or write until the actual close happens which may be later due
// SO_LINGER handling.
// See https://github.com/netty/netty/issues/4449
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
// Ignore the error as the underlying channel may be closed in the meantime and so
// getSoLinger() may produce an exception. In this case we just return null.
// See https://github.com/netty/netty/issues/4449
}
return null;
}
}
服務(wù)端channel的unsafe油坝,在AbstractNioMessageChannel中可以看到:
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe(); // 服務(wù)端channel創(chuàng)建unsafe
}
服務(wù)端channel和客戶端channel的第三個不同:讀取的內(nèi)容不同嫉戚。服務(wù)端channel的讀是讀取一條新的連接;客戶端channel的讀是讀取IO數(shù)據(jù)澈圈。
我們來看服務(wù)端channel讀事件相關(guān)的源碼彬檀,NioMessageUnsafe的read方法:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 最核心的功能doReadMessage,也就是讀取一條連接
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
protected int doReadMessages(List<Object> buf) throws Exception {
// 核心代碼
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
// 服務(wù)端channel瞬女,accept客戶端連接
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
來看客戶端channel的讀事件窍帝,NioByteUnsafe的read方法:
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
// 從這里看出,客戶端channel是讀字節(jié)
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
NioSocketChannel和NioServerSocketChannel綁定channelConfig的區(qū)別
最后一點诽偷,服務(wù)端channel和客戶端channel綁定的channeConfig不同坤学。