一臼勉、緩存空間分配器:ByteBufAllocator
ByteBufAllocator接口為ByteBuf分配器剪况,用于分配新的ByteBuf存儲IO數(shù)據(jù)冒签。
1闯两、ByteBufAllocator
接口定義了 ByteBuf ioBuffer(int initialCapacity) 方法俘枫,用于分配一個ByteBuf
2腥沽、AbstractByteBufAllocator
實(shí)現(xiàn)了ByteBuf ioBuffer(int initialCapacity)方法,根據(jù)系統(tǒng)配置實(shí)際調(diào)用自己的抽象方法
ByteBuf newDirectBuffer(int initialCapacity,int maxCapacity);
ByteBuf newHeapBuffer(int initialCapacity,int maxCapacity);
其子類UnpooledByteBufAllocator和PooledByteBufAllocator實(shí)現(xiàn)了上述抽象方法鸠蚪。
3今阳、UnpooledByteBufAllocator
非池化的ByteBuf分配器,具體實(shí)現(xiàn)了抽象方法邓嘹。
1)對于newDirectBuffer
根據(jù)平臺是否支持Unsafe方式酣栈,將實(shí)例化出一個UnpooledDirectByteBuf/UnpooledUnsafeDirectByteBuf。
UnpooledDirectByteBuf是一個基于NIO的Buffer汹押,其內(nèi)部持有一個NIO ByteBuffer buffer矿筝,并通過ByteBuffer.allocateDirect(initcapacity)方法進(jìn)行實(shí)例化。
2)對于newHeapBuffer
根據(jù)平臺是否支持Unsafe方式棚贾,將實(shí)例化出一個UnpooledHeapByteBuf/UnpooledUnsafeHeapByteBuf窖维。
UnpooledHeapByteBuf是一個基于java heap的Buffer,其內(nèi)部直接在java heap中申請byte[] array空間進(jìn)行IO數(shù)據(jù)存儲妙痹。
二铸史、接收緩存分配器:RecvByteBufAllocator
1、接口概述
RecvByteBufAllocator接口用于分配一塊大小合理的buffer空間怯伊,存儲Channel讀入的IO數(shù)據(jù)琳轿。具體功能交由內(nèi)部接口Handle定義。
interface Handle {
/**
* Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
* enough not to waste its space.
*/
ByteBuf allocate(ByteBufAllocator alloc);
/**
* Increment the number of messages that have been read for the current read loop.
* @param numMessages The amount to increment by.
*/
void incMessagesRead(int numMessages);
/**
* Set the bytes that have been read for the last read operation.
* This may be used to increment the number of bytes that have been read.
* @param bytes The number of bytes from the previous read operation. This may be negative if an read error
* occurs. If a negative value is seen it is expected to be return on the next call to
* {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
* to this class and is not required to be enforced in {@link #continueReading()}.
*/
void lastBytesRead(int bytes);
/**
* Determine if the current read loop should should continue.
* @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
*/
boolean continueReading();
/**
* The read has completed.
*/
void readComplete();
}
allocate(ByteBufAllocator alloc)方法用于創(chuàng)建存放讀入IO數(shù)據(jù)的ByteBuf耿芹。
readComplete()在讀操作完成后調(diào)用崭篡,在實(shí)現(xiàn)類HandleImpl中執(zhí)行record(int actualReadBytes)做了調(diào)整分配空間大小的邏輯。
2吧秕、實(shí)現(xiàn)類AdaptiveRecvByteBufAllocator
接口RecvByteBufAllocator的實(shí)現(xiàn)類琉闪,能根據(jù)前一次實(shí)際讀取的字節(jié)數(shù)量,自適應(yīng)調(diào)整當(dāng)前緩存分配的大小砸彬。
三颠毙、NioEventLoop處理OP_READ事件
1斯入、代碼入口
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//上面省略...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
//下面省略...
}
NioEventLoop在處理其selector監(jiān)聽到的OP_READ事件時,會執(zhí)行上面的代碼邏輯蛀蜜,將OP_READ事件最終交由Unsafe處理刻两,即執(zhí)行NioByteUnsafe.read()方法。
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
2涵防、執(zhí)行邏輯
1)獲取緩存分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
循環(huán)執(zhí)行以下邏輯直到跳出--->
2)分配緩存ByteBuf
ByteBuf byteBuf = allocHandle.allocate(allocator);
3)將數(shù)據(jù)從nio.SocketChannel讀取到byteBuf中
doReadBytes(byteBuf)
實(shí)際調(diào)用了NioSocketChannel.doReadBytes(ByteBuf byteBuf)方法闹伪。
進(jìn)一步調(diào)用ByteBuf.writeBytes(ScatteringByteChannel in, int length)方法。
最終底層調(diào)用nio的ReadableByteChannel.read(ByteBuffer dst)方法壮池。
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
若本次讀取到的數(shù)據(jù)長度==0偏瓤,表示本次OP_READ事件的數(shù)據(jù)已讀取完畢,退出循環(huán)椰憋。
若本次讀取到的數(shù)據(jù)長度<0厅克,表示對端已斷開socket連接,退出循環(huán)橙依,執(zhí)行NioByteUnsafe.closeOnRead()方法關(guān)閉Channel证舟,關(guān)閉Channel的過程中執(zhí)行了pipeline.fireChannelInactive()和pipeline.fireChannelUnregistered()。
4)觸發(fā)ChannelRead事件窗骑,將讀ByteBuf交給pipeline流轉(zhuǎn)
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
<----循環(huán)結(jié)束女责。結(jié)束條件:a、localReadAmount == 0或-1创译,b抵知、循環(huán)讀取到ByteBuf個數(shù)超過指定閾值
5)根據(jù)本次已讀字節(jié)數(shù),調(diào)整RecvByteBufAllocator的下次分配的緩存大小
allocHandle.readComplete();
6)觸發(fā)ChannelReadComplete事件
pipeline.fireChannelReadComplete();