前言
在 Netty 源碼剖析之 unSafe.read 方法 一文中边翁,我們研究了 read 方法的實現(xiàn)怖现,這是讀取內(nèi)容到容器,再看看 Netty 是如何將內(nèi)容從容器輸出 Channel 的吧流椒。
1. ctx.writeAndFlush 方法
當(dāng)我們調(diào)用此方法時垦巴,會從當(dāng)前節(jié)點找上一個 outbound 節(jié)點,進行邑彪,并調(diào)用下個節(jié)點的 write 方法瞧毙。具體看代碼:
@1
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise()); // 創(chuàng)建了一個默認(rèn)的 DefaultChannelPromise 實例,返回的就是這個實例。
@2
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (isNotValidPromise(promise, true)) {//判斷 promise 有效性
ReferenceCountUtil.release(msg);// 釋放內(nèi)存
return promise;
}
write(msg, true, promise);
return promise;
}
@3
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
最終調(diào)用的就是 @3 方法宙彪。找到上一個 outbound 節(jié)點矩动,判斷他的節(jié)點是否時當(dāng)前線程。如果是释漆,則會直接調(diào)用悲没,泛著,將后面的工作封裝成一個任務(wù)放進 mpsc 隊列男图,供當(dāng)前線程稍后執(zhí)行示姿。這個 任務(wù)的 run 方法如下:
public final void run() {
try {
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally {
ctx = null;
msg = null;
promise = null;
handle.recycle(this);
}
}
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}
最終執(zhí)行的是 invokeWrite 方法。
我們看看如果直接執(zhí)行會如何處理逊笆。
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
先執(zhí)行 invokeWrite0 方法進行 write栈戳,然后 flush。
最終執(zhí)行的是 unSafe 的 write 方法览露。注意:當(dāng)前節(jié)點已經(jīng)到了 Head 節(jié)點荧琼。
詳細(xì)說說該方法。
2. unSafe 的 write 方法
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
方法步驟如下:
- 判斷 outboundBuffer 有效性差牛。
- 將 ByteBuf 過濾成池化或者線程局部直接內(nèi)存(如果不是直接內(nèi)存的話)命锄。
- 預(yù)估當(dāng)前 ByteBuf 大小(就是可讀字節(jié)數(shù))偏化。
- 將 ByteBuf 包裝成一個 Entry 節(jié)點放入到 outboundBuffer 的單向鏈表中脐恩。
這里有一個地方需要注意一下, filterOutboundMessage 方法侦讨。
如果是直接內(nèi)存的話驶冒,就直接返回了 ,反之調(diào)用 newDirectBuffer 方法韵卤。我們猜想肯定是重新包裝成直接內(nèi)存骗污,利用直接內(nèi)存令拷貝的特性,提升性能沈条。
看看該方法內(nèi)部邏輯:
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}
- 首先判斷可讀字節(jié)數(shù)需忿,如果是0,之際返回一個空的 Buffer蜡歹。
- 獲取該 Channel 的 ByteBufAllocator 屋厘,如果是直接內(nèi)存且池化,則分配一個直接內(nèi)存月而,將舊的 Buffer 寫入到新的中汗洒,釋放舊的 Buffer。返回新的直接內(nèi)存 Buffer父款。
- 反之溢谤,從 FastThreadLocal 中返回一個可重用的直接內(nèi)存 Buffer瞻凤,后面和上面的操作一樣,寫入溯香,刪除舊的鲫构,返回新的。注意玫坛,這里返回的可重用的 Buffer结笨,當(dāng)調(diào)用他的 release 方法的時候,實際上是歸還到了 FastThreadLocal 中湿镀。對象池的最佳實踐炕吸。
關(guān)于 addMessage 方法,我們將在另一篇文章Netty 出站緩沖區(qū) ChannelOutboundBuffer 源碼解析(isWritable 屬性的重要性)詳細(xì)闡述勉痴,這里只需要知道赫模,他放入了一個 出站緩存中就行了。
3. unSafe 的 flush 方法
重點是 outboundBuffer.addFlush() 方法和 flush0 方法蒸矛。
這兩個方法在 Netty 出站緩沖區(qū) ChannelOutboundBuffer 源碼解析(isWritable 屬性的重要性)
這里只需要知道瀑罗,addFlush 方法將剛剛添加進出站 buffer 的數(shù)據(jù)進行檢查,并準(zhǔn)備寫入 Socket雏掠。
flush0 做真正的寫入操作斩祭,其中,調(diào)用了 JDK 的 Socket 的 write 方法乡话,將 ByteBuf 封裝的 ByteBuffer 寫到 Socket 中摧玫。
總結(jié)
可以看到,數(shù)據(jù)真正的寫出還是調(diào)用了 head 的節(jié)點中 unsafe 的 write 方法和 flush 方法绑青,其中诬像,write 只是將數(shù)據(jù)寫入到了出站緩沖區(qū),并且闸婴,write 方法可以調(diào)用多次坏挠,flush 才是真正的寫入到 Socket。而更詳細(xì)的細(xì)節(jié)邪乍,可以查看我的另一篇文章Netty 出站緩沖區(qū) ChannelOutboundBuffer 源碼解析(isWritable 屬性的重要性)降狠。
good luck !D缗贰!柏肪!