Netty 源碼剖析之 unSafe.read 方法

目錄:

  1. NioSocketChannel$NioSocketChannelUnsafe 的 read 方法
  2. 首先看 ByteBufAllocator
  3. 再看 RecvByteBufAllocator.Handle
  4. 兩者如何配合進(jìn)行內(nèi)存分配
  5. 如何讀取到 ByteBuf
  6. 總結(jié)

前言

在之前的文章 Netty 核心組件 Pipeline 源碼分析(二)一個(gè)請(qǐng)求的 pipeline 之旅中循诉,我們知道了當(dāng)客戶(hù)端請(qǐng)求進(jìn)來(lái)的時(shí)候,boss 線(xiàn)程會(huì)將 Socket 包裝后交給 worker 線(xiàn)程店茶,worker 線(xiàn)程會(huì)將這個(gè) Socket 注冊(cè) selector 的讀事件愈案,當(dāng)讀事件進(jìn)來(lái)的時(shí)候码泞,會(huì)調(diào)用 unsafe 的 read 方法悼沿,這個(gè)方法的主要作用是讀取 Socket 緩沖區(qū)的內(nèi)存,并包裝成 Netty 的 ByteBuf 對(duì)象饼暑,最后傳遞進(jìn) pipeline 中的所有節(jié)點(diǎn)完成處理婚被。

今天狡忙,我們就要好好的看看這個(gè) read方法的實(shí)現(xiàn)。

1. NioSocketChannel$NioSocketChannelUnsafe 的 read 方法

源碼如下:

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 用來(lái)處理內(nèi)存的分配:池化或者非池化 UnpooledByteBufAllocator
    final ByteBufAllocator allocator = config.getAllocator();
    // 用來(lái)計(jì)算此次讀循環(huán)應(yīng)該分配多少內(nèi)存 AdaptiveRecvByteBufAllocator 自適應(yīng)計(jì)算緩沖分配
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);// 重置為0

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {// 如果上一次讀到的字節(jié)數(shù)小于等于0址芯,清理引用和跳出循環(huán)
                // nothing was read. release the buffer.
                byteBuf.release();// 引用 -1
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;// 如果遠(yuǎn)程已經(jīng)關(guān)閉連接
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);//  totalMessages += amt;
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

代碼很長(zhǎng)灾茁,怎么辦呢?當(dāng)然是拆解谷炸,然后逐個(gè)擊破删顶。步驟如下:

  1. 獲取到 Channel 的 config 對(duì)象,并從該對(duì)象中獲取內(nèi)存分配器淑廊,還有"計(jì)算內(nèi)存分配器"逗余。
  2. 計(jì)算內(nèi)存分配器 重置。
  3. 進(jìn)入一個(gè)循環(huán)季惩,循環(huán)體的作用是:使用內(nèi)存分配器獲取數(shù)據(jù)容器-----ByteBuf录粱,調(diào)用 doReadBytes 方法將數(shù)據(jù)讀取到容器中,如果這次讀取什么都沒(méi)有或遠(yuǎn)程連接關(guān)閉画拾,則跳出循環(huán)啥繁。還有,如果滿(mǎn)足了跳出推薦青抛,也要結(jié)束循環(huán)旗闽,不能無(wú)限循環(huán),默認(rèn)16 次,默認(rèn)參數(shù)來(lái)自 AbstractNioByteChannel 的 屬性 ChannelMetadata 類(lèi)型的 METADATA 實(shí)例适室。每讀取一次就調(diào)用 pipeline 的 channelRead 方法嫡意,為什么呢?因?yàn)橛捎?TCP 傳輸如果包過(guò)大的話(huà)捣辆,丟失的風(fēng)險(xiǎn)會(huì)更大蔬螟,導(dǎo)致重傳,所以汽畴,大的數(shù)據(jù)流會(huì)分成多次傳輸旧巾。而 channelRead 方法也會(huì)被調(diào)用多次,因此忍些,使用 channelRead 方法的時(shí)候需要注意鲁猩,如果數(shù)據(jù)量大,最好將數(shù)據(jù)放入到緩存中罢坝,讀取完畢后绳匀,再進(jìn)行處理。
  4. 跳出循環(huán)后炸客,調(diào)用 allocHandle 的 readComplete 方法,表示讀取已完成戈钢,并記錄讀取記錄痹仙,用于下次分配合理內(nèi)存。
  5. 調(diào)用 pipeline 的方法殉了。

接下來(lái)就一步步看开仰。

2. 首先看 ByteBufAllocator

首先看看這個(gè)節(jié)點(diǎn)的定義:

Implementations are responsible to allocate buffers. Implementations of this interface are expected to be hread-safe.
實(shí)現(xiàn)負(fù)責(zé)分配緩沖區(qū)。這個(gè)接口的實(shí)現(xiàn)應(yīng)該是線(xiàn)程安全的薪铜。

定義了如上方法

通過(guò)這個(gè)接口众弓,可以看出來(lái),主要作用是創(chuàng)建 ByteBuf隔箍,這個(gè) ByteBuf 是 Netty 用來(lái)替代 NIO 的 ByteBuffer 的谓娃,是存儲(chǔ)數(shù)據(jù)的緩存區(qū)。其中蜒滩,這個(gè)接口有一個(gè)默認(rèn)實(shí)現(xiàn) ByteBufUtil.DEFAULT_ALLOCATOR :該實(shí)現(xiàn)根據(jù)配置創(chuàng)建一個(gè) 池化或非池化的緩存區(qū)分配器滨达。該參數(shù)是 io.netty.allocator.type

同時(shí)俯艰,由于很多方法都是重載的捡遍,那就說(shuō)說(shuō)上面的主要方法作用:

buffer() // 返回一個(gè) ByteBuf 對(duì)象,默認(rèn)直接內(nèi)存竹握。如果平臺(tái)不支持画株,返回堆內(nèi)存。
heapBuffer()// 返回堆內(nèi)存緩存區(qū)
directBuffer()// 返回直接內(nèi)存緩沖區(qū)
compositeBuffer() // 返回一個(gè)復(fù)合緩沖區(qū)∥酱可能同時(shí)包含堆內(nèi)存和直接內(nèi)存蜈项。
ioBuffer() // 當(dāng)當(dāng)支持 Unsafe 時(shí),返回直接內(nèi)存的 Bytebuf良拼,否則返回返回基于堆內(nèi)存战得,當(dāng)使用 PreferHeapByteBufAllocator 時(shí)返回堆內(nèi)存

3. 再看 RecvByteBufAllocator.Handle

首先看這個(gè)接口:

上圖中, Handle 是 RecvByteBufAllocator 的內(nèi)部接口庸推。而 RecvByteBufAllocator 是如何定義的呢常侦?

Creates a new handle. The handle provides the actual operations and keeps the internal information which is required for predicting an optimal buffer capacity.
創(chuàng)建一個(gè)新的句柄。句柄提供了實(shí)際操作贬媒,并保留了用于預(yù)測(cè)最佳緩沖區(qū)容量所需的內(nèi)部信息聋亡。

該接口只定義了一個(gè)方法:newHandle()。

而 handle 的作用是什么呢际乘?


ByteBuf allocate(ByteBufAllocator alloc);//創(chuàng)建一個(gè)新的接收緩沖區(qū)坡倔,其容量可能大到足以讀取所有入站數(shù)據(jù)和小到數(shù)據(jù)足夠不浪費(fèi)它的空間。
int guess();// 猜測(cè)所需的緩沖區(qū)大小脖含,不進(jìn)行實(shí)際的分配
void reset(ChannelConfig config);// 每次開(kāi)始讀循環(huán)之前罪塔,重置相關(guān)屬性
void incMessagesRead(int numMessages);// 增加本地讀循環(huán)的次數(shù)
void lastBytesRead(int bytes); // 設(shè)置最后一次讀到的字節(jié)數(shù)
int lastBytesRead(); // 最后一次讀到的字節(jié)數(shù)
void attemptedBytesRead(int bytes); // 設(shè)置讀操作嘗試讀取的字節(jié)數(shù)
void attemptedBytesRead(); // 獲取嘗試讀取的字節(jié)數(shù)
boolean continueReading(); // 判斷是否需要繼續(xù)讀
void readComplete(); // 讀結(jié)束后調(diào)用

從上面的方法中,可以看出养葵,該接口的主要作用就是計(jì)算字節(jié)數(shù)征堪,如同 RecvByteBufAllocator 的文檔說(shuō)的那樣,根據(jù)預(yù)測(cè)和計(jì)算最佳大小的緩存區(qū)关拒,確保不浪費(fèi)佃蚜。

4. 兩者如何配合進(jìn)行內(nèi)存分配

在默認(rèn)的 config (NioSocketChannelConfig)中,allocator 來(lái)自 ByteBufAllocator 接口的默認(rèn)實(shí)例着绊,allocHandle 來(lái)自 AdaptiveRecvByteBufAllocator 自適應(yīng)循環(huán)緩存分配器 的內(nèi)部類(lèi) HandleImpl谐算。

好,知道了他們的默認(rèn)實(shí)現(xiàn)归露,我們一個(gè)方法看看洲脂。

首先看 reset 方法:

public void reset(ChannelConfig config) {
    this.config = config;
    maxMessagePerRead = maxMessagesPerRead();
    totalMessages = totalBytesRead = 0;
}

設(shè)置了上次獲取的最大消息讀取次數(shù)(默認(rèn)16),將之前計(jì)算的讀取消息總數(shù)歸零剧包。該方法如同他的名字腮考,歸零重置。

再看看 allocHandle.allocate(allocator) 方法的實(shí)現(xiàn)玄捕。

public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

我們剛剛說(shuō)的 ioBuffer 方法踩蔚,該方法默認(rèn)返回直接內(nèi)存緩沖區(qū)。而 guess() 方法返回一個(gè)猜測(cè)的大小枚粘,一個(gè) nextReceiveBufferSize 屬性馅闽,默認(rèn) 1024,也就是說(shuō),默認(rèn)創(chuàng)建一個(gè) 1024 大小的直接內(nèi)存緩沖區(qū)福也。這個(gè)值的設(shè)定來(lái)自 HandleImpl 的構(gòu)造方法局骤,存儲(chǔ)在一個(gè) SIZE_TABLE 的數(shù)組中。

我們還是看看 RecvByteBufAllocator 的實(shí)現(xiàn)類(lèi) AdaptiveRecvByteBufAllocator 的具體內(nèi)容吧

static final int DEFAULT_MINIMUM = 64; // 緩存區(qū)最小值
static final int DEFAULT_INITIAL = 1024; // 緩沖區(qū)初始值
static final int DEFAULT_MAXIMUM = 65536; // 緩沖區(qū)最大值

private static final int INDEX_INCREMENT = 4;// 當(dāng)發(fā)現(xiàn)緩存過(guò)小暴凑,數(shù)組下標(biāo)自增值
private static final int INDEX_DECREMENT = 1;// 當(dāng)發(fā)現(xiàn)緩沖區(qū)過(guò)大峦甩,數(shù)組下標(biāo)自減值

private static final int[] SIZE_TABLE;

static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

樓主在上面的代碼中寫(xiě)了注釋?zhuān)@個(gè) SIZE_TABLE 的作用是存儲(chǔ)緩存區(qū)大小的一個(gè) int 數(shù)組,從 static 塊中可以看到现喳,這個(gè)數(shù)組從16開(kāi)始凯傲,同時(shí)遞增16,直到值到了 512嗦篱,也就是下標(biāo) 31 的地方冰单,遞增策略變?yōu)榱?每次 * 2,直到溢出灸促。最終的數(shù)組長(zhǎng)度為 53诫欠。而對(duì)應(yīng)的值接近 int 最大值。

好浴栽,回到 allocate 方法中荒叼,進(jìn)入到 ioBuffer 方法查看:

public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

判斷,如果平臺(tái)支持 unSafe典鸡,就使用直接內(nèi)存被廓,否則使用堆內(nèi)存,初始大小就是我們剛剛說(shuō)的 1024椿每。而這個(gè)判斷的標(biāo)準(zhǔn)是:如果嘗試獲取 Unsafe 的時(shí)候有異常了,則賦值給一個(gè) UNSAFE_UNAVAILABILITY_CAUSE 對(duì)象英遭,否則賦值為 null间护,Netty 通過(guò)這個(gè)判 Null 確認(rèn)平臺(tái)是否支持 Unsafe。

我們繼續(xù)看看 directBuffer 方法的實(shí)現(xiàn):


public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
// 
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

由于方法層層遞進(jìn)挖诸,樓主將代碼合在一起汁尺,最終調(diào)用的是 newDirectBuffer,根據(jù) noCleaner 參數(shù)決定創(chuàng)建一個(gè) ByteBuf多律,這個(gè)屬性怎么來(lái)的呢痴突?當(dāng) unsafe 不是 null 的時(shí)候,會(huì)嘗試獲取 DirectByteBuffer 的構(gòu)造器狼荞,如果成功獲取辽装,則 noCleaner 屬性為 true。

這個(gè) noCleaner 屬性的詳細(xì)介紹請(qǐng)看這里Netty 內(nèi)存回收之 noCleaner 策略.

默認(rèn)情況下就是 true相味,那么拾积,也就是創(chuàng)建了一個(gè) InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 對(duì)象,該對(duì)象構(gòu)造如下:

@1
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
        UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(alloc, initialCapacity, maxCapacity);
}

@2
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(alloc, initialCapacity, maxCapacity);
}

@3
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    if (initialCapacity < 0) {
        throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
    }
    if (maxCapacity < 0) {
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
    }
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}

@4
static ByteBuffer newDirectBuffer(long address, int capacity) {
    ObjectUtil.checkPositiveOrZero(capacity, "capacity");
    return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
}

最終使用了 DirectByteBuffer 的構(gòu)造器進(jìn)行反射創(chuàng)建。而這個(gè)構(gòu)造器是沒(méi)有默認(rèn)的 new 創(chuàng)建的 Cleaner 對(duì)象的拓巧。因此稱(chēng)為 noCleaner斯碌。

創(chuàng)建完畢后,調(diào)用 setByteBuffer 肛度,將這個(gè) DirectByteBuffer 包裝一下傻唾。

回到 newDirectBuffer 方法。

最后根據(jù) disableLeakDetector 屬性判斷釋放進(jìn)行自動(dòng)內(nèi)存回收(也就是當(dāng)你忘記回收的時(shí)候承耿,幫你回收)冠骄,原理這里簡(jiǎn)單的說(shuō)一下,使用虛引用進(jìn)行跟蹤瘩绒。 FastThreadLocal 的內(nèi)存回收類(lèi)似猴抹。我們將在以后的文章中詳細(xì)說(shuō)明此策略。

到這里锁荔,創(chuàng)建 ByteBuf 的過(guò)程就結(jié)束了蟀给。

可以說(shuō),大部分工作都是 allocator 做的阳堕,allocHandle 的作用就是提供了如何分配一個(gè)合理的內(nèi)存的策略跋理。

5. 如何讀取到 ByteBuf

回到 read 方法,doReadBytes(byteBuf) 就是將 Channel 的內(nèi)容讀取到容器中恬总,并返回一個(gè)讀取到的字節(jié)數(shù)前普。

代碼如下:

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

獲取到 內(nèi)存預(yù)估器,設(shè)置一個(gè) attemptedBytesRead 屬性為 ByteBuf 的可寫(xiě)字節(jié)數(shù)壹堰。這個(gè)參數(shù)可用于后面分配內(nèi)存時(shí)的一些考量拭卿。

然后調(diào)用 byteBuf.writeBytes()方法。傳入了 NIO 的 channel贱纠,還有剛剛的可寫(xiě)字節(jié)數(shù)峻厚。進(jìn)入到該方法查看:

@1
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

首先對(duì)長(zhǎng)度進(jìn)行校驗(yàn),確弊缓福可寫(xiě)長(zhǎng)度大于0惠桃,如果被并發(fā)了導(dǎo)致容量不夠,將這個(gè)底層的 ByteBuffer 的容量增加傳入的長(zhǎng)度辖试。

關(guān)于 ByteBuf 的 wirteIndex 辜王,如下圖:

回到 writeBytes 方法,調(diào)用 setBytes 方法罐孝,將流中輸入寫(xiě)入到緩沖區(qū)呐馆。方法如下:

public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

非常熟悉的 NIO 操作。

首先獲取到內(nèi)部 ByteBuffer 的共享緩沖區(qū)莲兢,賦值給臨時(shí)的 tmpNioBuf 屬性摹恰。然后返回這個(gè)引用辫继。將這個(gè)引用清空,并將指針移動(dòng)到給定 index 為止俗慈,然后 limit 方法設(shè)置緩存區(qū)大小姑宽。

最后調(diào)用 Channel 的 read 方法,將Channel 數(shù)據(jù)讀入到 ByteBuffer 中闺阱。讀的過(guò)程時(shí)線(xiàn)程安全的炮车,內(nèi)部使用了 synchronized 關(guān)鍵字控制寫(xiě)入 buffer 的過(guò)程。返回了讀到的字節(jié)數(shù)酣溃。

回到 writeBytes 方法瘦穆,得到字節(jié)數(shù)之后,將這個(gè)字節(jié)數(shù)追加到 writerIndex 屬性赊豌,表示可寫(xiě)字節(jié)變小了扛或。

回到 read 方法。allocHandle 得到讀取到的字節(jié)數(shù)碘饼,調(diào)用 lastBytesRead 方法熙兔,該方法的作用時(shí)調(diào)整下一次分配內(nèi)存的大小。進(jìn)入到該方法查看:

public void lastBytesRead(int bytes) {
    // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
    // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
    // the selector to check for more data. Going back to the selector can add significant latency for large
    // data transfers.
    if (bytes == attemptedBytesRead()) {
        record(bytes);
    }
    super.lastBytesRead(bytes);
}

Netty 寫(xiě)了注釋?zhuān)?/p>

如果我們讀的內(nèi)容和我們要求的一樣多艾恼,我們應(yīng)該檢查一下是否需要增加下一個(gè)猜測(cè)的大小住涉。
這有助于在等待大量數(shù)據(jù)時(shí)更快地進(jìn)行調(diào)整,并且可以避免返回選擇器以檢查更多數(shù)據(jù)钠绍∮呱回到選擇器可以為大型數(shù)據(jù)傳輸添加顯著的延遲。

當(dāng)獲取的字節(jié)數(shù)和預(yù)估的一樣大柳爽,則需要進(jìn)行擴(kuò)容媳握。看看 record 方法實(shí)現(xiàn):

private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

如果實(shí)際讀取到的字節(jié)數(shù)小于等于預(yù)估的字節(jié) 下標(biāo) - 2(排除2以下)磷脯,則將容量縮小一個(gè)下標(biāo)蛾找。如果實(shí)際讀取到的字節(jié)數(shù)大于等于預(yù)估的。則將下標(biāo)增加 4争拐,下次創(chuàng)建的 Buffer 容量也相應(yīng)增加腋粥。如果不滿(mǎn)足這兩個(gè)條件晦雨,什么都不做架曹。

回答 lastBytesRead 方法,該方法記錄了讀取到的總字節(jié)數(shù)并且更新了最后一次的讀取字節(jié)數(shù)闹瞧“笮郏總字節(jié)數(shù)會(huì)用來(lái)判斷是否可以結(jié)束讀取循環(huán)。如果什么都沒(méi)有讀到奥邮,將最多持續(xù)到讀 16(默認(rèn)) 次万牺。

回到 read 方法罗珍。

如果最后一次讀取到字節(jié)數(shù)小于等于0,跳出循環(huán)脚粟,不做 channelRead 操作覆旱。
反之,將 totalMessages 加1核无,這個(gè)就是用來(lái)記錄循環(huán)次數(shù)扣唱,判斷不能超過(guò) 16次。

調(diào)用 fireChannelRead 方法团南,方法結(jié)束后噪沙,將這個(gè) Buffer 的引用置為null,

判斷是否需要繼續(xù)讀取吐根,帶入如下:

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead > 0;
}

幾個(gè)條件:

  1. 首先是否自動(dòng)讀取正歼。
  2. 且猜測(cè)是否還有更多數(shù)據(jù),如果實(shí)際讀取的和預(yù)估的一致拷橘,說(shuō)明可能還有數(shù)據(jù)沒(méi)讀局义,需要再次循環(huán)。
  3. 如果讀取次數(shù)為達(dá)到 16 次膜楷,繼續(xù)讀取旭咽。
  4. 如果讀取到的總數(shù)大于0,說(shuō)明有數(shù)據(jù)赌厅,繼續(xù)讀取穷绵。

這里的循環(huán)的主要原因就像我們剛剛說(shuō)的,TCP 傳輸過(guò)大數(shù)據(jù)容易丟包(帶寬限制)特愿,因此會(huì)將大包分好幾次傳輸仲墨,還有就是可能預(yù)估的緩沖區(qū)不夠大,沒(méi)有充分讀取 Channel 的內(nèi)容揍障。

6. 總結(jié)

NioSocketChannel$NioSocketChannelUnsafe 的實(shí)現(xiàn)看 read 方法目养。每個(gè) ByteBuf 都會(huì)由一個(gè) Config 實(shí)例中的 ByteBufAllocator 對(duì)象創(chuàng)建,池化或非池化毒嫡,直接內(nèi)存或堆內(nèi)存癌蚁,這些都根據(jù)系統(tǒng)是否支持或參數(shù)設(shè)置,底層使用的是 NIO 的 API兜畸。今天我們看的是非池化的直接內(nèi)存努释。同時(shí),為了節(jié)省內(nèi)存咬摇,為每個(gè) ByteBufAllocator 配置了一個(gè) handle伐蒂,用于計(jì)算和預(yù)估緩沖區(qū)大小。

還有一個(gè)需要注意的地方就是 noCleaner 策略肛鹏。這是 Netty 的一個(gè)優(yōu)化逸邦。針對(duì)默認(rèn)的直接內(nèi)存創(chuàng)建和銷(xiāo)毀做了優(yōu)化--------不使用 JDK 的 cleaner 策略恩沛。

最終讀取數(shù)據(jù)到封裝了 NIO ByteBuffer 實(shí)例的 Netty 的 ByteBuf 中,其中缕减,如果數(shù)據(jù)量超過(guò) 1024雷客,則會(huì)讀取超過(guò)兩次,但最多不超過(guò) 16 次桥狡, 這個(gè)次數(shù)可以設(shè)置佛纫,也就是說(shuō),可能會(huì)調(diào)用超過(guò)2次 fireChannelRead 方法总放,使用的時(shí)候需要注意(存起來(lái)一起在 ChannelReadComplete 使用之類(lèi)的方法)呈宇。

好,關(guān)于 Netty 讀取 Socket 數(shù)據(jù)到容器中的邏輯局雄,就到這里甥啄。

good luck!>娲睢蜈漓!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市宫盔,隨后出現(xiàn)的幾起案子融虽,更是在濱河造成了極大的恐慌,老刑警劉巖灼芭,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件有额,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡彼绷,警方通過(guò)查閱死者的電腦和手機(jī)巍佑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)寄悯,“玉大人萤衰,你說(shuō)我怎么就攤上這事〔卵” “怎么了脆栋?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)洒擦。 經(jīng)常有香客問(wèn)我椿争,道長(zhǎng),這世上最難降的妖魔是什么秘遏? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任丘薛,我火速辦了婚禮嘉竟,結(jié)果婚禮上邦危,老公的妹妹穿的比我還像新娘洋侨。我一直安慰自己,他們只是感情好倦蚪,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布希坚。 她就那樣靜靜地躺著,像睡著了一般陵且。 火紅的嫁衣襯著肌膚如雪裁僧。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天慕购,我揣著相機(jī)與錄音聊疲,去河邊找鬼。 笑死沪悲,一個(gè)胖子當(dāng)著我的面吹牛获洲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播殿如,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼贡珊,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了涉馁?” 一聲冷哼從身側(cè)響起门岔,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎烤送,沒(méi)想到半個(gè)月后寒随,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡帮坚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年牢裳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叶沛。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蒲讯,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出灰署,到底是詐尸還是另有隱情判帮,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布溉箕,位于F島的核電站晦墙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏肴茄。R本人自食惡果不足惜晌畅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望寡痰。 院中可真熱鬧抗楔,春花似錦棋凳、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至入热,卻和暖如春拍棕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背勺良。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工绰播, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人尚困。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓幅垮,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親尾组。 傳聞我的和親對(duì)象是個(gè)殘疾皇子忙芒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 本文是Netty文集中“Netty in action”系列的文章。主要是對(duì)Norman Maurer and M...
    tomas家的小撥浪鼓閱讀 3,194評(píng)論 0 7
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理讳侨,服務(wù)發(fā)現(xiàn)呵萨,斷路器,智...
    卡卡羅2017閱讀 134,628評(píng)論 18 139
  • 前言 netty源碼分析之pipeline(一)中跨跨,我們已經(jīng)了解了pipeline在netty中所處的角色潮峦,像是一...
    簡(jiǎn)書(shū)閃電俠閱讀 16,152評(píng)論 15 35
  • 法語(yǔ),你在等待一個(gè)孩子嗎勇婴?=你懷孕了嗎忱嘹?世界上最美的語(yǔ)言,法語(yǔ)耕渴,原來(lái)不單指它的發(fā)音拘悦,還體現(xiàn)在他們字面語(yǔ)言上的浪漫,
    Anita2018閱讀 60評(píng)論 0 0
  • 今天用了三個(gè)多小時(shí)填了喜閱教師公益行動(dòng)問(wèn)卷表橱脸,復(fù)制鏈接了幾次础米,不知道成功沒(méi)有?7-12題是自己用訊飛語(yǔ)記口...
    笨南瓜閱讀 248評(píng)論 0 0