目錄:
- NioSocketChannel$NioSocketChannelUnsafe 的 read 方法
- 首先看 ByteBufAllocator
- 再看 RecvByteBufAllocator.Handle
- 兩者如何配合進(jìn)行內(nèi)存分配
- 如何讀取到 ByteBuf
- 總結(jié)
前言
在之前的文章 Netty 核心組件 Pipeline 源碼分析(二)一個(gè)請(qǐng)求的 pipeline 之旅中循诉,我們知道了當(dāng)客戶(hù)端請(qǐng)求進(jìn)來(lái)的時(shí)候,boss 線(xiàn)程會(huì)將 Socket 包裝后交給 worker 線(xiàn)程店茶,worker 線(xiàn)程會(huì)將這個(gè) Socket 注冊(cè) selector 的讀事件愈案,當(dāng)讀事件進(jìn)來(lái)的時(shí)候码泞,會(huì)調(diào)用 unsafe 的 read 方法悼沿,這個(gè)方法的主要作用是讀取 Socket 緩沖區(qū)的內(nèi)存,并包裝成 Netty 的 ByteBuf 對(duì)象饼暑,最后傳遞進(jìn) pipeline 中的所有節(jié)點(diǎn)完成處理婚被。
今天狡忙,我們就要好好的看看這個(gè) read方法的實(shí)現(xiàn)。
1. NioSocketChannel$NioSocketChannelUnsafe 的 read 方法
源碼如下:
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 用來(lái)處理內(nèi)存的分配:池化或者非池化 UnpooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
// 用來(lái)計(jì)算此次讀循環(huán)應(yīng)該分配多少內(nèi)存 AdaptiveRecvByteBufAllocator 自適應(yīng)計(jì)算緩沖分配
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);// 重置為0
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {// 如果上一次讀到的字節(jié)數(shù)小于等于0址芯,清理引用和跳出循環(huán)
// nothing was read. release the buffer.
byteBuf.release();// 引用 -1
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;// 如果遠(yuǎn)程已經(jīng)關(guān)閉連接
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);// totalMessages += amt;
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
代碼很長(zhǎng)灾茁,怎么辦呢?當(dāng)然是拆解谷炸,然后逐個(gè)擊破删顶。步驟如下:
- 獲取到 Channel 的 config 對(duì)象,并從該對(duì)象中獲取內(nèi)存分配器淑廊,還有"計(jì)算內(nèi)存分配器"逗余。
- 將
計(jì)算內(nèi)存分配器
重置。 - 進(jìn)入一個(gè)循環(huán)季惩,循環(huán)體的作用是:使用內(nèi)存分配器獲取數(shù)據(jù)容器-----ByteBuf录粱,調(diào)用 doReadBytes 方法將數(shù)據(jù)讀取到容器中,如果這次讀取什么都沒(méi)有或遠(yuǎn)程連接關(guān)閉画拾,則跳出循環(huán)啥繁。還有,如果滿(mǎn)足了跳出推薦青抛,也要結(jié)束循環(huán)旗闽,不能無(wú)限循環(huán),默認(rèn)16 次,默認(rèn)參數(shù)來(lái)自 AbstractNioByteChannel 的 屬性 ChannelMetadata 類(lèi)型的 METADATA 實(shí)例适室。每讀取一次就調(diào)用 pipeline 的 channelRead 方法嫡意,為什么呢?因?yàn)橛捎?TCP 傳輸如果包過(guò)大的話(huà)捣辆,丟失的風(fēng)險(xiǎn)會(huì)更大蔬螟,導(dǎo)致重傳,所以汽畴,大的數(shù)據(jù)流會(huì)分成多次傳輸旧巾。而 channelRead 方法也會(huì)被調(diào)用多次,因此忍些,使用 channelRead 方法的時(shí)候需要注意鲁猩,如果數(shù)據(jù)量大,最好將數(shù)據(jù)放入到緩存中罢坝,讀取完畢后绳匀,再進(jìn)行處理。
- 跳出循環(huán)后炸客,調(diào)用
allocHandle
的 readComplete 方法,表示讀取已完成戈钢,并記錄讀取記錄痹仙,用于下次分配合理內(nèi)存。 - 調(diào)用 pipeline 的方法殉了。
接下來(lái)就一步步看开仰。
2. 首先看 ByteBufAllocator
首先看看這個(gè)節(jié)點(diǎn)的定義:
Implementations are responsible to allocate buffers. Implementations of this interface are expected to be hread-safe.
實(shí)現(xiàn)負(fù)責(zé)分配緩沖區(qū)。這個(gè)接口的實(shí)現(xiàn)應(yīng)該是線(xiàn)程安全的薪铜。
通過(guò)這個(gè)接口众弓,可以看出來(lái),主要作用是創(chuàng)建 ByteBuf隔箍,這個(gè) ByteBuf 是 Netty 用來(lái)替代 NIO 的 ByteBuffer 的谓娃,是存儲(chǔ)數(shù)據(jù)的緩存區(qū)。其中蜒滩,這個(gè)接口有一個(gè)默認(rèn)實(shí)現(xiàn) ByteBufUtil.DEFAULT_ALLOCATOR :該實(shí)現(xiàn)根據(jù)配置創(chuàng)建一個(gè) 池化或非池化的緩存區(qū)分配器滨达。該參數(shù)是 io.netty.allocator.type
。
同時(shí)俯艰,由于很多方法都是重載的捡遍,那就說(shuō)說(shuō)上面的主要方法作用:
buffer() // 返回一個(gè) ByteBuf 對(duì)象,默認(rèn)直接內(nèi)存竹握。如果平臺(tái)不支持画株,返回堆內(nèi)存。
heapBuffer()// 返回堆內(nèi)存緩存區(qū)
directBuffer()// 返回直接內(nèi)存緩沖區(qū)
compositeBuffer() // 返回一個(gè)復(fù)合緩沖區(qū)∥酱可能同時(shí)包含堆內(nèi)存和直接內(nèi)存蜈项。
ioBuffer() // 當(dāng)當(dāng)支持 Unsafe 時(shí),返回直接內(nèi)存的 Bytebuf良拼,否則返回返回基于堆內(nèi)存战得,當(dāng)使用 PreferHeapByteBufAllocator 時(shí)返回堆內(nèi)存
3. 再看 RecvByteBufAllocator.Handle
首先看這個(gè)接口:
上圖中, Handle 是 RecvByteBufAllocator 的內(nèi)部接口庸推。而 RecvByteBufAllocator 是如何定義的呢常侦?
Creates a new handle. The handle provides the actual operations and keeps the internal information which is required for predicting an optimal buffer capacity.
創(chuàng)建一個(gè)新的句柄。句柄提供了實(shí)際操作贬媒,并保留了用于預(yù)測(cè)最佳緩沖區(qū)容量所需的內(nèi)部信息聋亡。
該接口只定義了一個(gè)方法:newHandle()。
而 handle 的作用是什么呢际乘?
ByteBuf allocate(ByteBufAllocator alloc);//創(chuàng)建一個(gè)新的接收緩沖區(qū)坡倔,其容量可能大到足以讀取所有入站數(shù)據(jù)和小到數(shù)據(jù)足夠不浪費(fèi)它的空間。
int guess();// 猜測(cè)所需的緩沖區(qū)大小脖含,不進(jìn)行實(shí)際的分配
void reset(ChannelConfig config);// 每次開(kāi)始讀循環(huán)之前罪塔,重置相關(guān)屬性
void incMessagesRead(int numMessages);// 增加本地讀循環(huán)的次數(shù)
void lastBytesRead(int bytes); // 設(shè)置最后一次讀到的字節(jié)數(shù)
int lastBytesRead(); // 最后一次讀到的字節(jié)數(shù)
void attemptedBytesRead(int bytes); // 設(shè)置讀操作嘗試讀取的字節(jié)數(shù)
void attemptedBytesRead(); // 獲取嘗試讀取的字節(jié)數(shù)
boolean continueReading(); // 判斷是否需要繼續(xù)讀
void readComplete(); // 讀結(jié)束后調(diào)用
從上面的方法中,可以看出养葵,該接口的主要作用就是計(jì)算字節(jié)數(shù)征堪,如同 RecvByteBufAllocator 的文檔說(shuō)的那樣,根據(jù)預(yù)測(cè)和計(jì)算最佳大小的緩存區(qū)关拒,確保不浪費(fèi)佃蚜。
4. 兩者如何配合進(jìn)行內(nèi)存分配
在默認(rèn)的 config (NioSocketChannelConfig)中,allocator 來(lái)自 ByteBufAllocator 接口的默認(rèn)實(shí)例着绊,allocHandle 來(lái)自 AdaptiveRecvByteBufAllocator 自適應(yīng)循環(huán)緩存分配器 的內(nèi)部類(lèi) HandleImpl谐算。
好,知道了他們的默認(rèn)實(shí)現(xiàn)归露,我們一個(gè)方法看看洲脂。
首先看 reset 方法:
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
設(shè)置了上次獲取的最大消息讀取次數(shù)(默認(rèn)16),將之前計(jì)算的讀取消息總數(shù)歸零剧包。該方法如同他的名字腮考,歸零重置。
再看看 allocHandle.allocate(allocator) 方法的實(shí)現(xiàn)玄捕。
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
我們剛剛說(shuō)的 ioBuffer 方法踩蔚,該方法默認(rèn)返回直接內(nèi)存緩沖區(qū)。而 guess() 方法返回一個(gè)猜測(cè)的大小枚粘,一個(gè) nextReceiveBufferSize 屬性馅闽,默認(rèn) 1024,也就是說(shuō),默認(rèn)創(chuàng)建一個(gè) 1024 大小的直接內(nèi)存緩沖區(qū)福也。這個(gè)值的設(shè)定來(lái)自 HandleImpl 的構(gòu)造方法局骤,存儲(chǔ)在一個(gè) SIZE_TABLE 的數(shù)組中。
我們還是看看 RecvByteBufAllocator 的實(shí)現(xiàn)類(lèi) AdaptiveRecvByteBufAllocator 的具體內(nèi)容吧
static final int DEFAULT_MINIMUM = 64; // 緩存區(qū)最小值
static final int DEFAULT_INITIAL = 1024; // 緩沖區(qū)初始值
static final int DEFAULT_MAXIMUM = 65536; // 緩沖區(qū)最大值
private static final int INDEX_INCREMENT = 4;// 當(dāng)發(fā)現(xiàn)緩存過(guò)小暴凑,數(shù)組下標(biāo)自增值
private static final int INDEX_DECREMENT = 1;// 當(dāng)發(fā)現(xiàn)緩沖區(qū)過(guò)大峦甩,數(shù)組下標(biāo)自減值
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
樓主在上面的代碼中寫(xiě)了注釋?zhuān)@個(gè) SIZE_TABLE 的作用是存儲(chǔ)緩存區(qū)大小的一個(gè) int 數(shù)組,從 static 塊中可以看到现喳,這個(gè)數(shù)組從16開(kāi)始凯傲,同時(shí)遞增16,直到值到了 512嗦篱,也就是下標(biāo) 31 的地方冰单,遞增策略變?yōu)榱?每次 * 2,直到溢出灸促。最終的數(shù)組長(zhǎng)度為 53诫欠。而對(duì)應(yīng)的值接近 int 最大值。
好浴栽,回到 allocate 方法中荒叼,進(jìn)入到 ioBuffer 方法查看:
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
判斷,如果平臺(tái)支持 unSafe典鸡,就使用直接內(nèi)存被廓,否則使用堆內(nèi)存,初始大小就是我們剛剛說(shuō)的 1024椿每。而這個(gè)判斷的標(biāo)準(zhǔn)是:如果嘗試獲取 Unsafe 的時(shí)候有異常了,則賦值給一個(gè) UNSAFE_UNAVAILABILITY_CAUSE 對(duì)象英遭,否則賦值為 null间护,Netty 通過(guò)這個(gè)判 Null 確認(rèn)平臺(tái)是否支持 Unsafe。
我們繼續(xù)看看 directBuffer 方法的實(shí)現(xiàn):
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
//
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
//
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
由于方法層層遞進(jìn)挖诸,樓主將代碼合在一起汁尺,最終調(diào)用的是 newDirectBuffer,根據(jù) noCleaner 參數(shù)決定創(chuàng)建一個(gè) ByteBuf多律,這個(gè)屬性怎么來(lái)的呢痴突?當(dāng) unsafe 不是 null 的時(shí)候,會(huì)嘗試獲取 DirectByteBuffer 的構(gòu)造器狼荞,如果成功獲取辽装,則 noCleaner 屬性為 true。
這個(gè) noCleaner 屬性的詳細(xì)介紹請(qǐng)看這里Netty 內(nèi)存回收之 noCleaner 策略.
默認(rèn)情況下就是 true相味,那么拾积,也就是創(chuàng)建了一個(gè) InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 對(duì)象,該對(duì)象構(gòu)造如下:
@1
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@2
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@3
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setByteBuffer(allocateDirect(initialCapacity), false);
}
@4
static ByteBuffer newDirectBuffer(long address, int capacity) {
ObjectUtil.checkPositiveOrZero(capacity, "capacity");
return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
}
最終使用了 DirectByteBuffer 的構(gòu)造器進(jìn)行反射創(chuàng)建。而這個(gè)構(gòu)造器是沒(méi)有默認(rèn)的 new 創(chuàng)建的 Cleaner 對(duì)象的拓巧。因此稱(chēng)為 noCleaner斯碌。
創(chuàng)建完畢后,調(diào)用 setByteBuffer 肛度,將這個(gè) DirectByteBuffer 包裝一下傻唾。
回到 newDirectBuffer 方法。
最后根據(jù) disableLeakDetector 屬性判斷釋放進(jìn)行自動(dòng)內(nèi)存回收(也就是當(dāng)你忘記回收的時(shí)候承耿,幫你回收)冠骄,原理這里簡(jiǎn)單的說(shuō)一下,使用虛引用進(jìn)行跟蹤瘩绒。 FastThreadLocal 的內(nèi)存回收類(lèi)似猴抹。我們將在以后的文章中詳細(xì)說(shuō)明此策略。
到這里锁荔,創(chuàng)建 ByteBuf 的過(guò)程就結(jié)束了蟀给。
可以說(shuō),大部分工作都是 allocator 做的阳堕,allocHandle 的作用就是提供了如何分配一個(gè)合理的內(nèi)存的策略跋理。
5. 如何讀取到 ByteBuf
回到 read 方法,doReadBytes(byteBuf) 就是將 Channel 的內(nèi)容讀取到容器中恬总,并返回一個(gè)讀取到的字節(jié)數(shù)前普。
代碼如下:
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
獲取到 內(nèi)存預(yù)估器
,設(shè)置一個(gè) attemptedBytesRead 屬性為 ByteBuf 的可寫(xiě)字節(jié)數(shù)壹堰。這個(gè)參數(shù)可用于后面分配內(nèi)存時(shí)的一些考量拭卿。
然后調(diào)用 byteBuf.writeBytes()方法。傳入了 NIO 的 channel贱纠,還有剛剛的可寫(xiě)字節(jié)數(shù)峻厚。進(jìn)入到該方法查看:
@1
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
ensureWritable(length);
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
}
return writtenBytes;
}
首先對(duì)長(zhǎng)度進(jìn)行校驗(yàn),確弊缓福可寫(xiě)長(zhǎng)度大于0惠桃,如果被并發(fā)了導(dǎo)致容量不夠,將這個(gè)底層的 ByteBuffer 的容量增加傳入的長(zhǎng)度辖试。
關(guān)于 ByteBuf 的 wirteIndex 辜王,如下圖:
回到 writeBytes 方法,調(diào)用 setBytes 方法罐孝,將流中輸入寫(xiě)入到緩沖區(qū)呐馆。方法如下:
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
ensureAccessible();
ByteBuffer tmpBuf = internalNioBuffer();
tmpBuf.clear().position(index).limit(index + length);
try {
return in.read(tmpBuf);
} catch (ClosedChannelException ignored) {
return -1;
}
}
非常熟悉的 NIO 操作。
首先獲取到內(nèi)部 ByteBuffer 的共享緩沖區(qū)莲兢,賦值給臨時(shí)的 tmpNioBuf 屬性摹恰。然后返回這個(gè)引用辫继。將這個(gè)引用清空,并將指針移動(dòng)到給定 index 為止俗慈,然后 limit 方法設(shè)置緩存區(qū)大小姑宽。
最后調(diào)用 Channel 的 read 方法,將Channel 數(shù)據(jù)讀入到 ByteBuffer 中闺阱。讀的過(guò)程時(shí)線(xiàn)程安全的炮车,內(nèi)部使用了 synchronized 關(guān)鍵字控制寫(xiě)入 buffer 的過(guò)程。返回了讀到的字節(jié)數(shù)酣溃。
回到 writeBytes 方法瘦穆,得到字節(jié)數(shù)之后,將這個(gè)字節(jié)數(shù)追加到 writerIndex 屬性赊豌,表示可寫(xiě)字節(jié)變小了扛或。
回到 read 方法。allocHandle 得到讀取到的字節(jié)數(shù)碘饼,調(diào)用 lastBytesRead 方法熙兔,該方法的作用時(shí)調(diào)整下一次分配內(nèi)存的大小。進(jìn)入到該方法查看:
public void lastBytesRead(int bytes) {
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
if (bytes == attemptedBytesRead()) {
record(bytes);
}
super.lastBytesRead(bytes);
}
Netty 寫(xiě)了注釋?zhuān)?/p>
如果我們讀的內(nèi)容和我們要求的一樣多艾恼,我們應(yīng)該檢查一下是否需要增加下一個(gè)猜測(cè)的大小住涉。
這有助于在等待大量數(shù)據(jù)時(shí)更快地進(jìn)行調(diào)整,并且可以避免返回選擇器以檢查更多數(shù)據(jù)钠绍∮呱回到選擇器可以為大型數(shù)據(jù)傳輸添加顯著的延遲。
當(dāng)獲取的字節(jié)數(shù)和預(yù)估的一樣大柳爽,則需要進(jìn)行擴(kuò)容媳握。看看 record 方法實(shí)現(xiàn):
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
如果實(shí)際讀取到的字節(jié)數(shù)小于等于預(yù)估的字節(jié) 下標(biāo) - 2(排除2以下)磷脯,則將容量縮小一個(gè)下標(biāo)蛾找。如果實(shí)際讀取到的字節(jié)數(shù)大于等于預(yù)估的。則將下標(biāo)增加 4争拐,下次創(chuàng)建的 Buffer 容量也相應(yīng)增加腋粥。如果不滿(mǎn)足這兩個(gè)條件晦雨,什么都不做架曹。
回答 lastBytesRead 方法,該方法記錄了讀取到的總字節(jié)數(shù)并且更新了最后一次的讀取字節(jié)數(shù)闹瞧“笮郏總字節(jié)數(shù)會(huì)用來(lái)判斷是否可以結(jié)束讀取循環(huán)。如果什么都沒(méi)有讀到奥邮,將最多持續(xù)到讀 16(默認(rèn)) 次万牺。
回到 read 方法罗珍。
如果最后一次讀取到字節(jié)數(shù)小于等于0,跳出循環(huán)脚粟,不做 channelRead 操作覆旱。
反之,將 totalMessages 加1核无,這個(gè)就是用來(lái)記錄循環(huán)次數(shù)扣唱,判斷不能超過(guò) 16次。
調(diào)用 fireChannelRead 方法团南,方法結(jié)束后噪沙,將這個(gè) Buffer 的引用置為null,
判斷是否需要繼續(xù)讀取吐根,帶入如下:
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
幾個(gè)條件:
- 首先是否自動(dòng)讀取正歼。
- 且猜測(cè)是否還有更多數(shù)據(jù),如果實(shí)際讀取的和預(yù)估的一致拷橘,說(shuō)明可能還有數(shù)據(jù)沒(méi)讀局义,需要再次循環(huán)。
- 如果讀取次數(shù)為達(dá)到 16 次膜楷,繼續(xù)讀取旭咽。
- 如果讀取到的總數(shù)大于0,說(shuō)明有數(shù)據(jù)赌厅,繼續(xù)讀取穷绵。
這里的循環(huán)的主要原因就像我們剛剛說(shuō)的,TCP 傳輸過(guò)大數(shù)據(jù)容易丟包(帶寬限制)特愿,因此會(huì)將大包分好幾次傳輸仲墨,還有就是可能預(yù)估的緩沖區(qū)不夠大,沒(méi)有充分讀取 Channel 的內(nèi)容揍障。
6. 總結(jié)
從 NioSocketChannel$NioSocketChannelUnsafe
的實(shí)現(xiàn)看 read 方法目养。每個(gè) ByteBuf 都會(huì)由一個(gè) Config 實(shí)例中的 ByteBufAllocator 對(duì)象創(chuàng)建,池化或非池化毒嫡,直接內(nèi)存或堆內(nèi)存癌蚁,這些都根據(jù)系統(tǒng)是否支持或參數(shù)設(shè)置,底層使用的是 NIO 的 API兜畸。今天我們看的是非池化的直接內(nèi)存努释。同時(shí),為了節(jié)省內(nèi)存咬摇,為每個(gè) ByteBufAllocator 配置了一個(gè) handle伐蒂,用于計(jì)算和預(yù)估緩沖區(qū)大小。
還有一個(gè)需要注意的地方就是 noCleaner 策略肛鹏。這是 Netty 的一個(gè)優(yōu)化逸邦。針對(duì)默認(rèn)的直接內(nèi)存創(chuàng)建和銷(xiāo)毀做了優(yōu)化--------不使用 JDK 的 cleaner 策略恩沛。
最終讀取數(shù)據(jù)到封裝了 NIO ByteBuffer 實(shí)例的 Netty 的 ByteBuf 中,其中缕减,如果數(shù)據(jù)量超過(guò) 1024雷客,則會(huì)讀取超過(guò)兩次,但最多不超過(guò) 16 次桥狡, 這個(gè)次數(shù)可以設(shè)置佛纫,也就是說(shuō),可能會(huì)調(diào)用超過(guò)2次 fireChannelRead 方法总放,使用的時(shí)候需要注意(存起來(lái)一起在 ChannelReadComplete 使用之類(lèi)的方法)呈宇。
好,關(guān)于 Netty 讀取 Socket 數(shù)據(jù)到容器中的邏輯局雄,就到這里甥啄。
good luck!>娲睢蜈漓!