NettyClient端的channel為NioSocketChannel业踏,通過writeAndFlush方法將數據發(fā)送到NettyServer端划煮。
NioSocketChannel.writeAndFlush->AbstractChannel.writeAndFlush->pipeline.writeAndFlush->tail.writeAndFlush->AbstractChannelHandlerContext.writeAndFlush->AbstractChannelHandlerContext.write
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();(1)
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(msg, promise);(2)
} else {
next.invokeWrite(msg, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, promise);
} else {
task = WriteTask.newInstance(next, msg, promise);
}
safeExecute(executor, task, promise, msg);
}
}
(1)處從TailContext向前找第一個outbounnd context灌灾,業(yè)務handler一般為inbound,因此會找到HeadContext卖毁。
(2)執(zhí)行HeadContext.invokeWriteAndFlush方法羡藐。
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
最終會調用HeadContext的write和flush方法贩毕,如下:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
先看unsafe的write方法:
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
/**
* pipeline.estimatorHandle()會得到DefaultMessageSizeEstimator,再調用size(msg)方法用于計算該msg的可寫到網絡的字節(jié)的數量仆嗦,即該buf的可讀字節(jié)數量
* 如果msg為ByteBuf辉阶,那么返回該buf的可讀字節(jié)數,即可以發(fā)送出去的字節(jié)數
*/
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);(3)
}
(3)處的方法如下瘩扼,主要是將msg放到ChannelOutboundBuffer的雙向鏈表里:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
/**
* 添加第1個msg時睛藻,將添加的msg包成entry,作為鏈表tail
* 因為entry還沒flush邢隧,所以flushedEntry設為null
*/
flushedEntry = null;
tailEntry = entry;
} else {
/**
* tail不為null店印,說明已經有msg在之前add進來了
* 將新add的entry連在之前tail的后面,作為新的tail
*/
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
/**
* 第1次add的entry倒慧,作為unflushedEntry
*/
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
放入第1個entry1后按摘,雙向鏈表為:
entry1->null
指針指向為:
flushedEntry->null
unflushedEntry->entry1(本次添加的entry)
tailEntry->entry1
放入第2個entry2后,雙向鏈表為:
entry1->entry2->null
指針指向為:
flushedEntry->null
unflushedEntry->entry1
tailEntry->entry2(本次添加的entry)
再看unsafe的flush方法:
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();(4)
flush0();(5)
}
(4)處的方法為:
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
執(zhí)行addFlush前后纫谅,雙向鏈表的指針情況如下:
addFlush前指針指向為:
flushedEntry->null
unflushedEntry->entry1
tailEntry->entry1
addFlush后指針指向為:
flushedEntry->entry1
unflushedEntry->null
tailEntry->entry1
addFlush操作主要是從unflushedEntry開始鏈表后面掃炫贤,直到掃到null,并且置unflushedEntry為null付秕,將掃過的entry的promise的result都置為UNCANCELLABLE兰珍。
(5)處的方法為:
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);(6)
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
(6)處的方法為AbstractNioByteChannel的doWrite方法:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
/**
* 當發(fā)送緩沖區(qū)已經滿,沒能將完整數據包發(fā)送出去時询吴,需要設置setOpWrite為true掠河,
* 這樣在因為緩沖區(qū)滿不能將完整數據包發(fā)送出去提前退出循環(huán)后重新設置寫感興趣事件,
* 待下次可寫事件發(fā)生時繼續(xù)把剩下的數據包發(fā)送出去猛计。
*/
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
/**
* 這里的localFlushedAmount是真正寫入到channel的字節(jié)數
*/
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
/**
* 寫入的字節(jié)數為0唠摹,說明channel的寫緩沖區(qū)已滿,不能再寫入
*/
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
/**
* buf不可讀說明寫入buf的數據已經全部發(fā)送到channel
*/
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transfered() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
doWrite方法執(zhí)行的操作主要為循環(huán)從ChannelOutboundBuffer中取出msg寫到channel的socket發(fā)送緩沖區(qū)奉瘤,直到寫完ChannelOutboundBuffer的所有msg勾拉,但有時因為socket發(fā)送緩沖區(qū)滿了不能及時發(fā)出去,那么設置setOpWrite為true,標識寫半包情況藕赞,退出循環(huán)后要注冊channel的write感興趣事件成肘,等待下次可寫了繼續(xù)發(fā)。
對于server端斧蜕,在NioEventLoop select循環(huán)中獲取到讀事件双霍,解析好client發(fā)送來的數據并響應給client,同樣是執(zhí)行NioSocketChannel.writeAndFlush操作惩激,與client端一樣店煞,不在詳細闡述蟹演。