Netty Data Stream Handling - write
上篇文章中介紹了Netty是讀數(shù)據(jù)的流程:EventLoop不停的select IO督函;一旦發(fā)現(xiàn)OP_READ可用則利用Channel.Unsafe讀取數(shù)據(jù)触幼,并把數(shù)據(jù)傳給Pipeline墩划;Pipeline拿到數(shù)據(jù)化撕,并交由內(nèi)部的Handler處理譬正。還有上篇文章也說了Pipeline中的兩種數(shù)據(jù)流向:inbound和outbound。read操作符合inbound流向盛撑;而write則符合outbound劉翔碎节。這些知識對我們理解write操作大有裨益,因為write操作在Pipeline中是read的反向操作抵卫。
那我們開始介紹Netty的write寫數(shù)據(jù)流程狮荔。先找到EchoServerHandler的channelRead方法,在這個示例中介粘,它會把讀到的數(shù)據(jù)再寫回客戶端:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
這個方法直接調(diào)用了ChannelHandlerContext的write方法:
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
沒什么好解釋的殖氏,給出write的調(diào)用鏈:
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
invokeWrite0接下來會調(diào)用HeadContext中的write方法(沒什么好解釋的了,read的反向操作):
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
這個方法調(diào)用了Unsafe的write方法姻采,至此write操作從Pipeline中走完了雅采,接下來才是重頭戲。我們來看這個AbstractUnsafe實現(xiàn)的write方法:
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
// 1
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// 2
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 3
outboundBuffer.addMessage(msg, size, promise);
}
直接調(diào)用write寫數(shù)據(jù)的時候慨亲,并不是直接寫到channel中婚瓜,而是先寫到緩沖區(qū)里,也就是ChannelOutboundBuffer刑棵。當調(diào)用調(diào)用flushflush才開始向channel寫數(shù)據(jù)巴刻。ChannelOutboundBuffer是一個無界鏈表,如果不停的向緩沖區(qū)寫入數(shù)據(jù)可能會導致內(nèi)存溢出蛉签。因此ChannelOutboundBuffer檢測當newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()
時胡陪,buffer便不可寫。同時通知Pipeline Writablity Changed:pipeline.fireChannelWritabilityChanged();
碍舍。我們在實現(xiàn)自己的Handler時可以重新實現(xiàn)該方法督弓。
我們接著看下filterOutboundMessage這個方法:
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
在講解ByteBuf的文章中提到過關(guān)于使用直接內(nèi)存還是堆內(nèi)存的最佳實踐:在IO通信的線程中操作ByteBuf應(yīng)使用DirectBuffer(省去內(nèi)存拷貝的成本),在后端業(yè)務(wù)邏輯中操作ByteBuf應(yīng)使用HeapBuffer(不用擔心內(nèi)存泄露)乒验。
這個方法首先檢查msg類型是不是ByteBuf,如果是ByteBuf則檢查是不是使用了直接內(nèi)存蒂阱,如果沒有則把基于堆的內(nèi)存換成直接內(nèi)存锻全。
我們接著分析ChannelOutboundBuffer的addMessage方法:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(entry.pendingSize, false);
}
這個方法做了兩件事情:把msg加入到鏈表中;調(diào)用incrementPendingOutboundBytes方法录煤。我們來看這個方法:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
首先鳄厌,先更新緩沖區(qū)大小,接著判斷緩沖區(qū)是否大于最高水位妈踊,如果是則設(shè)置buffer為unWritable(默認的高水位線大小是64K)了嚎。setUnwritable方法內(nèi)通知了Pipeline Writablity Changed,這里不貼代碼了自己看去吧。
到這的話write方法是執(zhí)行完了歪泳,但是數(shù)據(jù)仍然留在內(nèi)存中萝勤。接下來我們看flush方法。跳過Pipeline直接來看AbstractUnsafe的flush:
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
這個方法首先調(diào)用了outboundBuffer的addFlush方法呐伞,我們先看下addFlush做了什么:
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
unflushedEntry表示緩沖區(qū)鏈表中第一個未被flush的元素敌卓。如果這個變量為null的話,表示當前緩沖區(qū)已被flush伶氢。for循環(huán)中首先把entry的promise設(shè)置為Uncancellable(Promise繼承自Future趟径,F(xiàn)uture是可以cancel的),然后增加flushed計數(shù)癣防。decrementPendingOutboundBytes方法中有邏輯檢查目前Buffer是否低于低水位蜗巧,如果是則重置Buffer為可寫。
我們接著看flush0方法蕾盯,flush0主要是調(diào)用NioSocketChannel的doWrite方法:
protected void flush0() {
///...
doWrite(outboundBuffer);
///...
}
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
// 1
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
switch (nioBufferCnt) {
case 0:
super.doWrite(in);
return;
case 1:
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
// 2
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
///...
}
in.removeBytes(writtenBytes);
if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
break;
}
}
}
doWrite方法首先獲取Buffer中緩存的消息幕屹,并將Netty ByteBuf轉(zhuǎn)成Nio ByteBuffer;然后把數(shù)據(jù)寫入Nio SocketChannel刑枝。寫數(shù)據(jù)的操作放到了一個for循環(huán)中香嗓。因為Netty的一個緩沖數(shù)據(jù)可能不會一次性的刷到Channel中,如果只作一次write操作就返回装畅,那么很有可能余下的數(shù)據(jù)要等到下次OP_WRITE Selection Key可用才能全部寫完靠娱。這期間經(jīng)歷了一次昂貴的IO操作,所以這個for循環(huán)又是CPU時間和IO時間的一個精心分配掠兄。
至此像云,Netty寫數(shù)據(jù)的流程分析完畢。