一、ChannelOutboundBuffer
1、定義
是AbstractUnsafe使用的數(shù)據(jù)結(jié)構(gòu),用來存儲待發(fā)送的數(shù)據(jù)拌消。
在channel.unsafe實例化時,ChannelOutboundBuffer一起被初始化安券。每個channel都有一個自己的ChannelOutboundBuffer墩崩。
2、ChannelOutboundBuffer中的field
Channel channel --> 所綁定的Channel
Entry flushedEntry --> 表示下一個要被flush的Entry
Entry unflushedEntry --> 表示下一次要flush截止的Entry
Entry tailEntry
int flushed
int nioBufferCount
int nioBufferSize
long totalPendingSize --> 已存儲的需要被write到socket發(fā)送緩存中的byte大小
int unwritable --> 表示當(dāng)前channel的待發(fā)送緩存是否可以繼續(xù)寫入數(shù)據(jù)
ChannelOutboundBuffer中維護(hù)了節(jié)點(diǎn)元素為Entry的單向鏈表侯勉。
Entry為待發(fā)送數(shù)據(jù)的抽象鹦筹,實際待發(fā)送數(shù)據(jù)保存在Entry的Object msg中。
二址貌、write的過程
1铐拐、入口
AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的write(Object msg)方法练对,最終都會由HeadContext.write方法執(zhí)行遍蟋,最終交由AbstractUnsafe.write(Object msg,ChannelPromise promise)實現(xiàn)。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
2螟凭、取得ChannelOutboundBuffer
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
3虚青、對要進(jìn)行write操作的數(shù)據(jù)msg,如果是ByteBuf類型則統(tǒng)一轉(zhuǎn)換為DirectByteBuf實現(xiàn)
msg = filterOutboundMessage(msg);
內(nèi)部調(diào)用AbstractNioByteChannel.filterOutboundMessage(Object msg)方法螺男,如果msg是ByteBuf類型棒厘,則將其轉(zhuǎn)換為DirectByteBuf的實現(xiàn)纵穿。(調(diào)用ByteBufAllocator.directBuffer(initialCapacity)分配一塊直接緩存空間并將原msg中的字節(jié)流放入)
4、 計算msg的大小
int size = pipeline.estimatorHandle().size(msg);
對于ByteBuf類型的msg奢人,直接調(diào)用readableBytes()方法谓媒。
5、將ByteBuf msg存入ChannelOutboundBuffer中
outboundBuffer.addMessage(msg, size, promise);
1)將msg封裝成Entry對象达传,并放入單向鏈表的尾部tailEntry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
tailEntry = entry;
netty使用基于thread-local的輕量級對象池Recycler對Entry進(jìn)行回收,避免多次實例化的垃圾回收和開銷迫筑。
2)更新ChannelOutboundBuffer的totalPendingSize宪赶,累加上本次新增的大小
incrementPendingOutboundBytes(size, false);
若totalPendingSize超過了channel的高水位線:
-將unwritable狀態(tài)更新為不可寫;
-執(zhí)行pipeline.fileChannelWritabilityChanged()脯燃;
三搂妻、flush過程
1、入口
AbstractChannel辕棚、DefaultChannelPipeline或AbstractChannelHandlerContext提供的flush()方法欲主,都會由HeadContext.flush(ChannelHandlerContext ctx)方法執(zhí)行,最終交由AbstractUnsafe.flush()實現(xiàn)逝嚎。
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
2扁瓢、更新ChannelOutboundBuffer中本次將要flush的Entry區(qū)間
outboundBuffer.addFlush();
將ChannelOutboundBuffer的unflushedEntry向后不斷移動到tailEntry,操作結(jié)束后本次要flush的鏈表區(qū)間就是flushedEntry->unflushedEntry补君。
3引几、檢查ChannelOutboundBuffer是否有待flush的數(shù)據(jù),如果沒有則直接返回挽铁,終止flush過程
4伟桅、將ChannelOutboundBuffer的要flush的鏈表區(qū)間數(shù)據(jù)寫入TCP發(fā)送緩沖區(qū)
NioByteUnsafe.doWrite(ChannelOutboundBuffer in)
對Entry鏈表區(qū)間中的每個Entry.msg(即ByteBuf)執(zhí)行以下邏輯--->
1)如果當(dāng)前flushedEntry為空,則將OP_WRITE事件從對應(yīng)Channel的interestOp中移除叽掘,跳出遍歷直接到步驟5)
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
2)采用類似自旋鎖的邏輯不斷調(diào)用NioSocketChannel.doWriteBytes(ByteBuf buf)楣铁,將Entry.msg(即ByteBuf)中的數(shù)據(jù)寫入套接字的發(fā)送緩沖區(qū)
-內(nèi)部調(diào)用ByteBuf.readBytes(Channel out, int length)接口;
-實際底層最終調(diào)用nio.SocketChannel.write(ByteBuffer src)更扁;
-如果自旋過程中出現(xiàn)nio.SocketChannel.write(ByteBuffer src)返回結(jié)果為0盖腕,說明此時TCP發(fā)送緩沖隊列已滿,則退出自旋write并將OP_WRITE添加到ch.selectionKey.interestOps中浓镜,等待TCP發(fā)送緩沖隊列可寫時重新出發(fā)write操作赊堪;
-如果Entry的自旋write達(dá)到一定次數(shù)還沒有將Entry中的數(shù)據(jù)寫完,則直接跳出鏈表遍歷操作竖哩,執(zhí)行最后的incompleteWrite哭廉;
3)構(gòu)造ChannelPromise通知當(dāng)前已write的數(shù)據(jù)進(jìn)度
in.progress(flushedAmount);
4)如果當(dāng)前flushedEntry中的數(shù)據(jù)已寫完,將Entry從ChannelOutboundBuffer中清理回收
in.remove();
-將當(dāng)前entry從Entry鏈表中刪除相叁;
-從totalPendingSize中減去entry已write出去的字節(jié)數(shù)遵绰;
-若totalPendingSize小于了channel的低水位線辽幌,將unwritable狀態(tài)更新為可寫,并調(diào)用pipeline.fileChannelWritabilityChanged()產(chǎn)生ChannelWritabilityChanged事件椿访。
<---對Entry鏈表區(qū)間中的每個Entry.msg(即ByteBuf)執(zhí)行以上邏輯
5)根據(jù)當(dāng)前socket的可寫狀態(tài)乌企,進(jìn)行后續(xù)操作
incompleteWrite(setOpWrite);
-若當(dāng)前TCP發(fā)送緩沖區(qū)已滿,則將OP_WRITE添加到ch.selectionKey.interestOps中成玫,等待TCP發(fā)送緩沖隊列可寫時重新觸發(fā)write操作加酵;
-若當(dāng)前TCP發(fā)送緩沖區(qū)未滿,構(gòu)造一個flush()事件哭当,等待EventLoop的下一個循環(huán)重新檢測ChannelOutboundBuffer中有無待flush的數(shù)據(jù)猪腕。