本系列Netty源碼解析文章基于 4.1.56.Final版本
我們接著上篇文章一文搞懂Netty發(fā)送數(shù)據(jù)全流程 | 你想知道的細(xì)節(jié)全在這里繼續(xù)講解 Netty 的 flush 全流程疾棵。
4. flush
從前面 Netty 對(duì) write 事件的處理過程中舞肆,我們可以看到當(dāng)用戶調(diào)用 ctx.write(msg) 方法之后,Netty 只是將用戶要發(fā)送的數(shù)據(jù)臨時(shí)寫到 channel 對(duì)應(yīng)的待發(fā)送緩沖隊(duì)列 ChannelOutboundBuffer 中,然而并不會(huì)將數(shù)據(jù)寫入 Socket 中筛峭。
而當(dāng)一次 read 事件完成之后,我們會(huì)調(diào)用 ctx.flush() 方法將 ChannelOutboundBuffer 中的待發(fā)送數(shù)據(jù)寫入 Socket 中的發(fā)送緩沖區(qū)中物蝙,從而將數(shù)據(jù)發(fā)送出去泛豪。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//本次OP_READ事件處理完畢
ctx.flush();
}
}
4.1 flush事件的傳播
flush 事件和 write 事件一樣都是 oubound 事件,所以它們的傳播方向都是從后往前在 pipeline 中傳播捎泻。
觸發(fā) flush 事件傳播的同樣也有兩個(gè)方法:
channelHandlerContext.flush()
:flush事件會(huì)從當(dāng)前 channelHandler 開始在 pipeline 中向前傳播直到 headContext飒炎。channelHandlerContext.channel().flush()
:flush 事件會(huì)從 pipeline 的尾結(jié)點(diǎn) tailContext 處開始向前傳播直到 headContext。
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext flush() {
//向前查找覆蓋flush方法的Outbound類型的ChannelHandler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
//獲取執(zhí)行ChannelHandler的executor,在初始化pipeline的時(shí)候設(shè)置笆豁,默認(rèn)為Reactor線程
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
}
return this;
}
}
這里的邏輯和 write 事件傳播的邏輯基本一樣郎汪,也是首先通過findContextOutbound(MASK_FLUSH) 方法從當(dāng)前 ChannelHandler 開始從 pipeline 中向前查找出第一個(gè) ChannelOutboundHandler 類型的并且實(shí)現(xiàn) flush 事件回調(diào)方法的 ChannelHandler 。注意這里傳入的執(zhí)行資格掩碼為 MASK_FLUSH闯狱。
執(zhí)行ChannelHandler中事件回調(diào)方法的線程必須是通過pipeline#addLast(EventExecutorGroup group, ChannelHandler... handlers)
為 ChannelHandler 指定的 executor煞赢。如果不指定,默認(rèn)的 executor 為 channel 綁定的 reactor 線程哄孤。
如果當(dāng)前線程不是 ChannelHandler 指定的 executor槽袄,則需要將 invokeFlush() 方法的調(diào)用封裝成 Task 交給指定的 executor 執(zhí)行缔俄。
4.1.1 觸發(fā)nextChannelHandler的flush方法回調(diào)
private void invokeFlush() {
if (invokeHandler()) {
invokeFlush0();
} else {
//如果該ChannelHandler并沒有加入到pipeline中則繼續(xù)向前傳遞flush事件
flush();
}
}
這里和 write 事件的相關(guān)處理一樣侦另,首先也是需要調(diào)用 invokeHandler() 方法來判斷這個(gè) nextChannelHandler 是否在 pipeline 中被正確的初始化。
如果 nextChannelHandler 中的 handlerAdded 方法并沒有被回調(diào)過波俄,那么這里就只能跳過 nextChannelHandler,并調(diào)用 ChannelHandlerContext#flush 方法繼續(xù)向前傳播flush事件蛾默。
如果 nextChannelHandler 中的 handlerAdded 方法已經(jīng)被回調(diào)過懦铺,說明 nextChannelHandler 在 pipeline 中已經(jīng)被正確的初始化好,則直接調(diào)用nextChannelHandler 的 flush 事件回調(diào)方法趴生。
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
這里有一點(diǎn)和 write 事件處理不同的是阀趴,當(dāng)調(diào)用 nextChannelHandler 的 flush 回調(diào)出現(xiàn)異常的時(shí)候,會(huì)觸發(fā) nextChannelHandler 的 exceptionCaught 回調(diào)苍匆。
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(....相關(guān)日志打印......);
} else if (logger.isWarnEnabled()) {
logger.warn(...相關(guān)日志打印......));
}
}
} else {
fireExceptionCaught(cause);
}
}
而其他 outbound 類事件比如 write 事件在傳播的過程中發(fā)生異常刘急,只是回調(diào)通知相關(guān)的 ChannelFuture。并不會(huì)觸發(fā) exceptionCaught 事件的傳播浸踩。
4.2 flush事件的處理
最終flush事件會(huì)在pipeline中一直向前傳播至HeadContext中叔汁,并在 HeadContext 里調(diào)用 channel 的 unsafe 類完成 flush 事件的最終處理邏輯。
final class HeadContext extends AbstractChannelHandlerContext {
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
}
下面就真正到了 Netty 處理 flush 事件的地方检碗。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//channel以關(guān)閉
if (outboundBuffer == null) {
return;
}
//將flushedEntry指針指向ChannelOutboundBuffer頭結(jié)點(diǎn)据块,此時(shí)變?yōu)榧磳⒁猣lush進(jìn)Socket的數(shù)據(jù)隊(duì)列
outboundBuffer.addFlush();
//將待寫數(shù)據(jù)寫進(jìn)Socket
flush0();
}
}
4.2.1 ChannelOutboundBuffer#addFlush
這里就到了真正要發(fā)送數(shù)據(jù)的時(shí)候了,在 addFlush 方法中會(huì)將 flushedEntry 指針指向 unflushedEntry 指針表示的第一個(gè)未被 flush 的 Entry 節(jié)點(diǎn)折剃。并將 unflushedEntry 指針置為空另假,準(zhǔn)備開始 flush 發(fā)送數(shù)據(jù)流程。
此時(shí) ChannelOutboundBuffer 由待發(fā)送數(shù)據(jù)的緩沖隊(duì)列變?yōu)榱思磳⒁?flush 進(jìn) Socket 的數(shù)據(jù)隊(duì)列
這樣在 flushedEntry 與 tailEntry 之間的 Entry 節(jié)點(diǎn)即為本次 flush 操作需要發(fā)送的數(shù)據(jù)范圍怕犁。
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
//如果當(dāng)前entry對(duì)應(yīng)的write操作被用戶取消边篮,則釋放msg,并降低channelOutboundBuffer水位線
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
在 flush 發(fā)送數(shù)據(jù)流程開始時(shí)奏甫,數(shù)據(jù)的發(fā)送流程就不能被取消了戈轿,在這之前我們都是可以通過 ChannelPromise 取消數(shù)據(jù)發(fā)送流程的。
所以這里需要對(duì) ChannelOutboundBuffer 中所有 Entry 節(jié)點(diǎn)包裹的 ChannelPromise 設(shè)置為不可取消狀態(tài)阵子。
public interface Promise<V> extends Future<V> {
/**
* 設(shè)置當(dāng)前future為不可取消狀態(tài)
*
* 返回true的情況:
* 1:成功的將future設(shè)置為uncancellable
* 2:當(dāng)future已經(jīng)成功完成
*
* 返回false的情況:
* 1:future已經(jīng)被取消思杯,則不能在設(shè)置 uncancellable 狀態(tài)
*
*/
boolean setUncancellable();
}
如果這里的 setUncancellable() 方法返回 false 則說明在這之前用戶已經(jīng)將 ChannelPromise 取消掉了,接下來就需要調(diào)用 entry.cancel() 方法來釋放為待發(fā)送數(shù)據(jù) msg 分配的堆外內(nèi)存挠进。
static final class Entry {
//write操作是否被取消
boolean cancelled;
int cancel() {
if (!cancelled) {
cancelled = true;
int pSize = pendingSize;
// release message and replace with an empty buffer
ReferenceCountUtil.safeRelease(msg);
msg = Unpooled.EMPTY_BUFFER;
pendingSize = 0;
total = 0;
progress = 0;
bufs = null;
buf = null;
return pSize;
}
return 0;
}
}
當(dāng) Entry 對(duì)象被取消后色乾,就需要減少 ChannelOutboundBuffer 的內(nèi)存占用總量的水位線 totalPendingSize。
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
//水位線指針.ChannelOutboundBuffer中的待發(fā)送數(shù)據(jù)的內(nèi)存占用總量 : 所有Entry對(duì)象本身所占用內(nèi)存大小 + 所有待發(fā)送數(shù)據(jù)的大小
private volatile long totalPendingSize;
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
當(dāng)更新后的水位線低于低水位線 DEFAULT_LOW_WATER_MARK = 32 * 1024
時(shí)领突,就將當(dāng)前 channel 設(shè)置為可寫狀態(tài)暖璧。
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
當(dāng) Channel 的狀態(tài)是第一次從不可寫狀態(tài)變?yōu)榭蓪憼顟B(tài)時(shí),Netty 會(huì)在 pipeline 中再次觸發(fā) ChannelWritabilityChanged 事件的傳播攘须。
4.2.2 發(fā)送數(shù)據(jù)前的最后檢查---flush0
flush0 方法這里主要做的事情就是檢查當(dāng) channel 的狀態(tài)是否正常漆撞,如果 channel 狀態(tài)一切正常,則調(diào)用 doWrite 方法發(fā)送數(shù)據(jù)于宙。
protected abstract class AbstractUnsafe implements Unsafe {
//是否正在進(jìn)行flush操作
private boolean inFlush0;
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//channel已經(jīng)關(guān)閉或者outboundBuffer為空
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
if (!isActive()) {
try {
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
//當(dāng)前channel處于disConnected狀態(tài) 通知promise 寫入失敗 并觸發(fā)channelWritabilityChanged事件
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
//當(dāng)前channel處于關(guān)閉狀態(tài) 通知promise 寫入失敗 但不觸發(fā)channelWritabilityChanged事件
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}
try {
//寫入Socket
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
inFlush0 = false;
}
}
}
outboundBuffer == null || outboundBuffer.isEmpty()
:如果 channel 已經(jīng)關(guān)閉了或者對(duì)應(yīng)寫緩沖區(qū)中沒有任何數(shù)據(jù)浮驳,那么就停止發(fā)送流程,直接 return捞魁。!isActive()
:如果當(dāng)前channel處于非活躍狀態(tài)至会,則需要調(diào)用outboundBuffer#failFlushed
通知 ChannelOutboundBuffer 中所有待發(fā)送操作對(duì)應(yīng)的 channelPromise 向用戶線程報(bào)告發(fā)送失敗。并將待發(fā)送數(shù)據(jù) Entry 對(duì)象從 ChannelOutboundBuffer 中刪除谱俭,并釋放待發(fā)送數(shù)據(jù)空間奉件,回收 Entry 對(duì)象實(shí)例。
還記得我們?cè)?a target="_blank">《Netty如何高效接收網(wǎng)絡(luò)連接》一文中提到過的 NioSocketChannel 的 active 狀態(tài)有哪些條件嗎昆著?县貌?
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
NioSocketChannel 處于 active 狀態(tài)的條件必須是當(dāng)前 NioSocketChannel 是 open 的同時(shí)處于 connected 狀態(tài)。
-
!isActive() && isOpen()
:說明當(dāng)前 channel 處于 disConnected 狀態(tài)凑懂,這時(shí)通知給用戶 channelPromise 的異常類型為 NotYetConnectedException ,并釋放所有待發(fā)送數(shù)據(jù)占用的堆外內(nèi)存煤痕,如果此時(shí)內(nèi)存占用量低于低水位線,則設(shè)置 channel 為可寫狀態(tài)接谨,并觸發(fā) channelWritabilityChanged 事件摆碉。
當(dāng) channel 處于 disConnected 狀態(tài)時(shí),用戶可以進(jìn)行 write 操作但不能進(jìn)行 flush 操作脓豪。
!isActive() && !isOpen()
:說明當(dāng)前 channel 處于關(guān)閉狀態(tài)巷帝,這時(shí)通知給用戶 channelPromise 的異常類型為 newClosedChannelException ,因?yàn)?channel 已經(jīng)關(guān)閉扫夜,所以這里并不會(huì)觸發(fā) channelWritabilityChanged 事件楞泼。當(dāng) channel 的這些異常狀態(tài)校驗(yàn)通過之后,則調(diào)用 doWrite 方法將 ChannelOutboundBuffer 中的待發(fā)送數(shù)據(jù)寫進(jìn)底層 Socket 中历谍。
4.2.2.1 ChannelOutboundBuffer#failFlushed
public final class ChannelOutboundBuffer {
private boolean inFail;
void failFlushed(Throwable cause, boolean notify) {
if (inFail) {
return;
}
try {
inFail = true;
for (;;) {
if (!remove0(cause, notify)) {
break;
}
}
} finally {
inFail = false;
}
}
}
該方法用于在 Netty 在發(fā)送數(shù)據(jù)的時(shí)候现拒,如果發(fā)現(xiàn)當(dāng)前 channel 處于非活躍狀態(tài),則將 ChannelOutboundBuffer 中 flushedEntry 與tailEntry 之間的 Entry 對(duì)象節(jié)點(diǎn)全部刪除望侈,并釋放發(fā)送數(shù)據(jù)占用的內(nèi)存空間印蔬,同時(shí)回收 Entry 對(duì)象實(shí)例。
4.2.2.2 ChannelOutboundBuffer#remove0
private boolean remove0(Throwable cause, boolean notifyWritability) {
Entry e = flushedEntry;
if (e == null) {
//清空當(dāng)前reactor線程緩存的所有待發(fā)送數(shù)據(jù)
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
//從channelOutboundBuffer中刪除該Entry節(jié)點(diǎn)
removeEntry(e);
if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before.
//釋放msg所占用的內(nèi)存空間
ReferenceCountUtil.safeRelease(msg);
//編輯promise發(fā)送失敗脱衙,并通知相應(yīng)的Lisener
safeFail(promise, cause);
//由于msg得到釋放侥猬,所以需要降低channelOutboundBuffer中的內(nèi)存占用水位線,并根據(jù)notifyWritability決定是否觸發(fā)ChannelWritabilityChanged事件
decrementPendingOutboundBytes(size, false, notifyWritability);
}
// recycle the entry
//回收Entry實(shí)例對(duì)象
e.recycle();
return true;
}
當(dāng)一個(gè) Entry 節(jié)點(diǎn)需要從 ChannelOutboundBuffer 中清除時(shí)捐韩,Netty 需要釋放該 Entry 節(jié)點(diǎn)中包裹的發(fā)送數(shù)據(jù) msg 所占用的內(nèi)存空間退唠。并標(biāo)記對(duì)應(yīng)的 promise 為失敗同時(shí)通知對(duì)應(yīng)的 listener ,由于 msg 得到釋放荤胁,所以需要降低 channelOutboundBuffer 中的內(nèi)存占用水位線瞧预,并根據(jù) boolean notifyWritability
決定是否觸發(fā) ChannelWritabilityChanged 事件。最后需要將該 Entry 實(shí)例回收至 Recycler 對(duì)象池中。
5. 終于開始真正地發(fā)送數(shù)據(jù)了垢油!
來到這里我們就真正進(jìn)入到了 Netty 發(fā)送數(shù)據(jù)的核心處理邏輯盆驹,在《Netty如何高效接收網(wǎng)絡(luò)數(shù)據(jù)》一文中,筆者詳細(xì)介紹了 Netty 讀取數(shù)據(jù)的核心流程滩愁,Netty 會(huì)在一個(gè) read loop 中不斷循環(huán)讀取 Socket 中的數(shù)據(jù)直到數(shù)據(jù)讀取完畢或者讀取次數(shù)已滿 16 次躯喇,當(dāng)循環(huán)讀取了 16 次還沒有讀取完畢時(shí),Netty 就不能在繼續(xù)讀了硝枉,因?yàn)?Netty 要保證 Reactor 線程可以均勻的處理注冊(cè)在它上邊的所有 Channel 中的 IO 事件廉丽。剩下未讀取的數(shù)據(jù)等到下一次 read loop 在開始讀取。
除此之外妻味,在每次 read loop 開始之前正压,Netty 都會(huì)分配一個(gè)初始化大小為 2048 的 DirectByteBuffer 來裝載從 Socket 中讀取到的數(shù)據(jù),當(dāng)整個(gè) read loop 結(jié)束時(shí)责球,會(huì)根據(jù)本次讀取數(shù)據(jù)的總量來判斷是否為該 DirectByteBuffer 進(jìn)行擴(kuò)容或者縮容蔑匣,目的是在下一次 read loop 的時(shí)候可以為其分配一個(gè)容量大小合適的 DirectByteBuffer 。
其實(shí) Netty 對(duì)發(fā)送數(shù)據(jù)的處理和對(duì)讀取數(shù)據(jù)的處理核心邏輯都是一樣的棕诵,這里大家可以將這兩篇文章結(jié)合對(duì)比著看裁良。
但發(fā)送數(shù)據(jù)的細(xì)節(jié)會(huì)多一些,也會(huì)更復(fù)雜一些校套,由于這塊邏輯整體稍微比較復(fù)雜价脾,所以我們接下來還是分模塊進(jìn)行解析:
5.1 發(fā)送數(shù)據(jù)前的準(zhǔn)備工作
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//獲取NioSocketChannel中封裝的jdk nio底層socketChannel
SocketChannel ch = javaChannel();
//最大寫入次數(shù) 默認(rèn)為16 目的是為了保證SubReactor可以平均的處理注冊(cè)其上的所有Channel
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// 如果全部數(shù)據(jù)已經(jīng)寫完 則移除OP_WRITE事件并直接退出writeLoop
clearOpWrite();
return;
}
// SO_SNDBUF設(shè)置的發(fā)送緩沖區(qū)大小 * 2 作為 最大寫入字節(jié)數(shù) 293976 = 146988 << 1
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 將ChannelOutboundBuffer中緩存的DirectBuffer轉(zhuǎn)換成JDK NIO 的 ByteBuffer
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// ChannelOutboundBuffer中總共的DirectBuffer數(shù)
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
.........向底層jdk nio socketChannel發(fā)送數(shù)據(jù).........
}
} while (writeSpinCount > 0);
............處理本輪write loop未寫完的情況.......
}
這部分內(nèi)容為 Netty 開始發(fā)送數(shù)據(jù)之前的準(zhǔn)備工作:
5.1.1 獲取write loop最大發(fā)送循環(huán)次數(shù)
從當(dāng)前 NioSocketChannel 的配置類 NioSocketChannelConfig 中獲取 write loop 最大循環(huán)寫入次數(shù),默認(rèn)為 16笛匙。但也可以通過下面的方式進(jìn)行自定義設(shè)置侨把。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.......
.childOption(ChannelOption.WRITE_SPIN_COUNT,自定義數(shù)值)
5.1.2 處理在一輪write loop中就發(fā)送完數(shù)據(jù)的情況
進(jìn)入 write loop 之后首先需要判斷當(dāng)前 ChannelOutboundBuffer 中的數(shù)據(jù)是否已經(jīng)寫完了 in.isEmpty())
,如果全部寫完就需要清除當(dāng)前 Channel 在 Reactor 上注冊(cè)的 OP_WRITE 事件妹孙。
這里大家可能會(huì)有疑問秋柄,目前我們還沒有注冊(cè) OP_WRITE 事件到 Reactor 上,為啥要清除呢蠢正? 別著急骇笔,筆者會(huì)在后面為大家揭曉答案。
5.1.3 獲取本次write loop 最大允許發(fā)送字節(jié)數(shù)
從 ChannelConfig 中獲取本次 write loop 最大允許發(fā)送的字節(jié)數(shù)
maxBytesPerGatheringWrite 嚣崭。初始值為 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1
笨触,最小值為 2048。
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
//293976 = 146988 << 1
//SO_SNDBUF設(shè)置的發(fā)送緩沖區(qū)大小 * 2 作為 最大寫入字節(jié)數(shù)
//最小值為2048
private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();
}
private void calculateMaxBytesPerGatheringWrite() {
// 293976 = 146988 << 1
// SO_SNDBUF設(shè)置的發(fā)送緩沖區(qū)大小 * 2 作為 最大寫入字節(jié)數(shù)
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(newSendBufferSize);
}
}
}
我們可以通過如下的方式自定義配置 Socket 發(fā)送緩沖區(qū)大小雹舀。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.......
.childOption(ChannelOption.SO_SNDBUF,自定義數(shù)值)
5.1.4 將待發(fā)送數(shù)據(jù)轉(zhuǎn)換成 JDK NIO ByteBuffer
由于最終 Netty 會(huì)調(diào)用 JDK NIO 的 SocketChannel 發(fā)送數(shù)據(jù)芦劣,所以這里需要首先將當(dāng)前 Channel 中的寫緩沖隊(duì)列 ChannelOutboundBuffer 里存儲(chǔ)的 DirectByteBuffer( Netty 中的 ByteBuffer 實(shí)現(xiàn))轉(zhuǎn)換成 JDK NIO 的 ByteBuffer 類型。最終將轉(zhuǎn)換后的待發(fā)送數(shù)據(jù)存儲(chǔ)在 ByteBuffer[] nioBuffers 數(shù)組中说榆。這里通過調(diào)用 ChannelOutboundBuffer#nioBuffers
方法完成以上 ByteBuffer 類型的轉(zhuǎn)換虚吟。
maxBytesPerGatheringWrite
:表示本次 write loop 中最多從 ChannelOutboundBuffer 中轉(zhuǎn)換 maxBytesPerGatheringWrite 個(gè)字節(jié)出來寸认。也就是本次 write loop 最多能發(fā)送多少字節(jié)。1024
: 本次 write loop 最多轉(zhuǎn)換 1024 個(gè) ByteBuffer( JDK NIO 實(shí)現(xiàn))串慰。也就是說本次 write loop 最多批量發(fā)送多少個(gè) ByteBuffer 废麻。
通過 ChannelOutboundBuffer#nioBufferCount()
獲取本次 write loop 總共需要發(fā)送的 ByteBuffer 數(shù)量 nioBufferCnt 。注意這里已經(jīng)變成了 JDK NIO 實(shí)現(xiàn)的 ByteBuffer 了模庐。
詳細(xì)的 ByteBuffer 類型轉(zhuǎn)換過程,筆者會(huì)在專門講解 Buffer 設(shè)計(jì)的時(shí)候?yàn)榇蠹胰婕?xì)致地講解油宜,這里我們還是主要聚焦于發(fā)送數(shù)據(jù)流程的主線掂碱。
當(dāng)做完這些發(fā)送前的準(zhǔn)備工作之后,接下來 Netty 就開始向 JDK NIO SocketChannel 發(fā)送這些已經(jīng)轉(zhuǎn)換好的 JDK NIO ByteBuffer 了慎冤。
5.2 向JDK NIO SocketChannel發(fā)送數(shù)據(jù)
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
.........將待發(fā)送數(shù)據(jù)轉(zhuǎn)換到JDK NIO ByteBuffer中.........
//本次write loop中需要發(fā)送的 JDK ByteBuffer個(gè)數(shù)
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
//這里主要是針對(duì) 網(wǎng)絡(luò)傳輸文件數(shù)據(jù) 的處理 FileRegion
writeSpinCount -= doWrite0(in);
break;
case 1: {
.........處理單個(gè)NioByteBuffer發(fā)送的情況......
break;
}
default: {
.........批量處理多個(gè)NioByteBuffers發(fā)送的情況......
break;
}
}
} while (writeSpinCount > 0);
............處理本輪write loop未寫完的情況.......
}
這里大家可能對(duì) nioBufferCnt == 0
的情況比較有疑惑疼燥,明明之前已經(jīng)校驗(yàn)過ChannelOutboundBuffer 不為空了,為什么這里從 ChannelOutboundBuffer 中獲取到的 nioBuffer 個(gè)數(shù)依然為 0 呢蚁堤?
在前邊我們介紹 Netty 對(duì) write 事件的處理過程時(shí)提過醉者, ChannelOutboundBuffer 中只支持 ByteBuf 類型和 FileRegion 類型,其中 ByteBuf 類型用于裝載普通的發(fā)送數(shù)據(jù)披诗,而 FileRegion 類型用于通過零拷貝的方式網(wǎng)絡(luò)傳輸文件撬即。
而這里 ChannelOutboundBuffer 雖然不為空,但是裝載的 NioByteBuffer 個(gè)數(shù)卻為 0 說明 ChannelOutboundBuffer 中裝載的是 FileRegion 類型呈队,當(dāng)前正在進(jìn)行網(wǎng)絡(luò)文件的傳輸剥槐。
case 0
的分支主要就是用于處理網(wǎng)絡(luò)文件傳輸?shù)那闆r。
5.2.1 零拷貝發(fā)送網(wǎng)絡(luò)文件
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg == null) {
return 0;
}
return doWriteInternal(in, in.current());
}
這里需要特別注意的是用于文件傳輸?shù)姆椒?doWriteInternal 中的返回值宪摧,理解這些返回值的具體情況有助于我們理解后面 write loop 的邏輯走向粒竖。
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
..............忽略............
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
//文件已經(jīng)傳輸完畢
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
//零拷貝的方式傳輸文件
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
//走到這里表示 此時(shí)Socket已經(jīng)寫不進(jìn)去了 退出writeLoop,注冊(cè)O(shè)P_WRITE事件
return WRITE_STATUS_SNDBUF_FULL;
}
最終會(huì)在 doWriteFileRegion 方法中通過 FileChannel#transferTo
方法底層用到的系統(tǒng)調(diào)用為 sendFile
實(shí)現(xiàn)零拷貝網(wǎng)絡(luò)文件的傳輸几于。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected long doWriteFileRegion(FileRegion region) throws Exception {
final long position = region.transferred();
return region.transferTo(javaChannel(), position);
}
}
關(guān)于 Netty 中涉及到的零拷貝蕊苗,筆者會(huì)有一篇專門的文章為大家講解沿彭,本文的主題我們還是先聚焦于把發(fā)送流程的主線打通朽砰。
我們繼續(xù)回到發(fā)送數(shù)據(jù)流程主線上來~~
case 0:
//這里主要是針對(duì) 網(wǎng)絡(luò)傳輸文件數(shù)據(jù) 的處理 FileRegion
writeSpinCount -= doWrite0(in);
break;
region.transferred() >= region.count()
:表示當(dāng)前 FileRegion 中的文件數(shù)據(jù)已經(jīng)傳輸完畢。那么在這種情況下本次 write loop 沒有寫入任何數(shù)據(jù)到 Socket 喉刘,所以返回 0 锅移,writeSpinCount - 0 意思就是本次 write loop 不算,繼續(xù)循環(huán)饱搏。localFlushedAmount > 0
:表示本 write loop 中寫入了一些數(shù)據(jù)到 Socket 中非剃,會(huì)有返回 1,writeSpinCount - 1 減少一次 write loop 次數(shù)推沸。localFlushedAmount <= 0
:表示當(dāng)前 Socket 發(fā)送緩沖區(qū)已滿备绽,無法寫入數(shù)據(jù)券坞,那么就返回WRITE_STATUS_SNDBUF_FULL = Integer.MAX_VALUE
。
writeSpinCount - Integer.MAX_VALUE
必然是負(fù)數(shù)肺素,直接退出循環(huán)恨锚,向 Reactor 注冊(cè) OP_WRITE 事件并退出 flush 流程。等 Socket 發(fā)送緩沖區(qū)可寫了倍靡,Reactor 會(huì)通知 channel 繼續(xù)發(fā)送文件數(shù)據(jù)猴伶。記住這里,我們后面還會(huì)提到塌西。
5.2.2 發(fā)送普通數(shù)據(jù)
剩下兩個(gè) case 1 和 default 分支主要就是處理 ByteBuffer 裝載的普通數(shù)據(jù)發(fā)送邏輯他挎。
其中 case 1 表示當(dāng)前 Channel 的 ChannelOutboundBuffer 中只包含了一個(gè) NioByteBuffer 的情況。
default 表示當(dāng)前 Channel 的 ChannelOutboundBuffer 中包含了多個(gè) NioByteBuffers 的情況捡需。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
.........將待發(fā)送數(shù)據(jù)轉(zhuǎn)換到JDK NIO ByteBuffer中.........
//本次write loop中需要發(fā)送的 JDK ByteBuffer個(gè)數(shù)
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
..........處理網(wǎng)絡(luò)文件傳輸.........
case 1: {
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
//如果當(dāng)前Socket發(fā)送緩沖區(qū)滿了寫不進(jìn)去了办桨,則注冊(cè)O(shè)P_WRITE事件,等待Socket發(fā)送緩沖區(qū)可寫時(shí) 在寫
// SubReactor在處理OP_WRITE事件時(shí)站辉,直接調(diào)用flush方法
incompleteWrite(true);
return;
}
//根據(jù)當(dāng)前實(shí)際寫入情況調(diào)整 maxBytesPerGatheringWrite數(shù)值
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
//如果ChannelOutboundBuffer中的某個(gè)Entry被全部寫入 則刪除該Entry
// 如果Entry被寫入了一部分 還有一部分未寫入 則更新Entry中的readIndex 等待下次writeLoop繼續(xù)寫入
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// ChannelOutboundBuffer中總共待寫入數(shù)據(jù)的字節(jié)數(shù)
long attemptedBytes = in.nioBufferSize();
//批量寫入
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
//根據(jù)實(shí)際寫入情況調(diào)整一次寫入數(shù)據(jù)大小的最大值
// maxBytesPerGatheringWrite決定每次可以從channelOutboundBuffer中獲取多少發(fā)送數(shù)據(jù)
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
//移除全部寫完的BUffer呢撞,如果只寫了部分?jǐn)?shù)據(jù)則更新buffer的readerIndex,下一個(gè)writeLoop寫入
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
............處理本輪write loop未寫完的情況.......
}
case 1 和 default 這兩個(gè)分支在處理發(fā)送數(shù)據(jù)時(shí)的邏輯是一樣的饰剥,唯一的區(qū)別就是 case 1 是處理單個(gè) NioByteBuffer 的發(fā)送殊霞,而 default 分支是批量處理多個(gè) NioByteBuffers 的發(fā)送。
下面筆者就以經(jīng)常被觸發(fā)到的 default 分支為例來為大家講述 Netty 在處理數(shù)據(jù)發(fā)送時(shí)的邏輯細(xì)節(jié):
首先從當(dāng)前 NioSocketChannel 中的 ChannelOutboundBuffer 中獲取本次 write loop 需要發(fā)送的字節(jié)總量 attemptedBytes 汰蓉。這個(gè) nioBufferSize 是在前邊介紹
ChannelOutboundBuffer#nioBuffers
方法轉(zhuǎn)換 JDK NIO ByteBuffer 類型時(shí)被計(jì)算出來的脓鹃。調(diào)用 JDK NIO 原生 SocketChannel 批量發(fā)送 nioBuffers 中的數(shù)據(jù)。并獲取到本次 write loop 一共批量發(fā)送了多少字節(jié) localWrittenBytes 古沥。
/**
* @throws NotYetConnectedException
* If this channel is not yet connected
*/
public abstract long write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
-
localWrittenBytes <= 0
表示當(dāng)前 Socket 的寫緩存區(qū) SEND_BUF 已滿瘸右,寫不進(jìn)數(shù)據(jù)了。那么就需要向當(dāng)前 NioSocketChannel 對(duì)應(yīng)的 Reactor 注冊(cè) OP_WRITE 事件岩齿,并停止當(dāng)前 flush 流程太颤。當(dāng) Socket 的寫緩沖區(qū)有容量可寫時(shí),epoll 會(huì)通知 reactor 線程繼續(xù)寫入盹沈。
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
//這里處理還沒寫滿16次 但是socket緩沖區(qū)已滿寫不進(jìn)去的情況 注冊(cè)write事件
//什么時(shí)候socket可寫了龄章, epoll會(huì)通知reactor線程繼續(xù)寫
setOpWrite();
} else {
...........目前還不需要關(guān)注這里.......
}
}
向 Reactor 注冊(cè) OP_WRITE 事件:
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
關(guān)于通過位運(yùn)算來向 IO 事件集合 interestOps 添加監(jiān)聽 IO 事件的用法,在前邊的文章中乞封,筆者已經(jīng)多次介紹過了做裙,這里不再重復(fù)。
- 根據(jù)本次 write loop 向 Socket 寫緩沖區(qū)寫入數(shù)據(jù)的情況肃晚,來調(diào)整下次 write loop 最大寫入字節(jié)數(shù)锚贱。maxBytesPerGatheringWrite 決定每次 write loop 可以從 channelOutboundBuffer 中最多獲取多少發(fā)送數(shù)據(jù)。初始值為
SO_SNDBUF大小 * 2 = 293976 = 146988 << 1
关串,最小值為 2048拧廊。
public static final int MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD = 4096;
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
由于操作系統(tǒng)會(huì)動(dòng)態(tài)調(diào)整 SO_SNDBUF 的大小监徘,所以這里 netty 也需要根據(jù)操作系統(tǒng)的動(dòng)態(tài)調(diào)整做出相應(yīng)的調(diào)整,目的是盡量多的去寫入數(shù)據(jù)吧碾。
attempted == written
表示本次 write loop 嘗試寫入的數(shù)據(jù)能全部寫入到 Socket 的寫緩沖區(qū)中凰盔,那么下次 write loop 就應(yīng)該嘗試去寫入更多的數(shù)據(jù)。
那么這里的更多具體是多少呢倦春?
Netty 會(huì)將本次寫入的數(shù)據(jù)量 written 擴(kuò)大兩倍户敬,如果擴(kuò)大兩倍后的寫入量大于本次 write loop 的最大限制寫入量 maxBytesPerGatheringWrite,說明用戶的寫入需求很猛烈睁本,Netty當(dāng)然要滿足這樣的猛烈需求尿庐,那么就將當(dāng)前 NioSocketChannelConfig 中的 maxBytesPerGatheringWrite 更新為本次 write loop 兩倍的寫入量大小。
在下次 write loop 寫入數(shù)據(jù)的時(shí)候添履,就會(huì)嘗試從 ChannelOutboundBuffer 中加載最多 written * 2
大小的字節(jié)數(shù)。
如果擴(kuò)大兩倍后的寫入量依然小于等于本次 write loop 的最大限制寫入量 maxBytesPerGatheringWrite脑又,說明用戶的寫入需求還不是很猛烈暮胧,Netty 繼續(xù)維持本次 maxBytesPerGatheringWrite 數(shù)值不變。
如果本次寫入的數(shù)據(jù)還不及嘗試寫入數(shù)據(jù)的 1 / 2 :written < attempted >>> 1
问麸。說明當(dāng)前 Socket 寫緩沖區(qū)的可寫容量不是很多了往衷,下一次 write loop 就不要寫這么多了嘗試減少下次寫入的量將下次 write loop 要寫入的數(shù)據(jù)減小為 attempted 的1 / 2。當(dāng)然也不能無限制的減小严卖,最小值不能低于 2048席舍。
這里可以結(jié)合筆者前邊的文章《一文聊透ByteBuffer動(dòng)態(tài)自適應(yīng)擴(kuò)縮容機(jī)制》中介紹到的 read loop 場(chǎng)景中的擴(kuò)縮容一起對(duì)比著看。
read loop 中的擴(kuò)縮容觸發(fā)時(shí)機(jī)是在一個(gè)完整的 read loop 結(jié)束時(shí)候觸發(fā)哮笆。而 write loop 中擴(kuò)縮容的觸發(fā)時(shí)機(jī)是在每次 write loop 發(fā)送完數(shù)據(jù)后来颤,立即觸發(fā)擴(kuò)縮容判斷。
- 當(dāng)本次 write loop 批量發(fā)送完 ChannelOutboundBuffer 中的數(shù)據(jù)之后稠肘,最后調(diào)用
in.removeBytes(localWrittenBytes)
從 ChannelOutboundBuffer 中移除全部寫完的 Entry 福铅,如果只發(fā)送了 Entry 的部分?jǐn)?shù)據(jù)則更新 Entry 對(duì)象中封裝的 DirectByteBuffer 的 readerIndex,等待下一次 write loop 寫入项阴。
到這里滑黔,write loop 中的發(fā)送數(shù)據(jù)的邏輯就介紹完了,接下來 Netty 會(huì)在 write loop 中循環(huán)地發(fā)送數(shù)據(jù)直到寫滿 16 次或者數(shù)據(jù)發(fā)送完畢环揽。
還有一種退出 write loop 的情況就是當(dāng) Socket 中的寫緩沖區(qū)滿了略荡,無法在寫入時(shí)。Netty 會(huì)退出 write loop 并向 reactor 注冊(cè) OP_WRITE 事件歉胶。
但這其中還隱藏著一種情況就是如果 write loop 已經(jīng)寫滿 16 次但還沒寫完數(shù)據(jù)并且此時(shí) Socket 寫緩沖區(qū)還沒有滿汛兜,還可以繼續(xù)在寫。那 Netty 會(huì)如何處理這種情況呢通今?
6. 處理Socket可寫但已經(jīng)寫滿16次還沒寫完的情況
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
.........將待發(fā)送數(shù)據(jù)轉(zhuǎn)換到JDK NIO ByteBuffer中.........
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
//這里主要是針對(duì) 網(wǎng)絡(luò)傳輸文件數(shù)據(jù) 的處理 FileRegion
writeSpinCount -= doWrite0(in);
break;
case 1: {
.....發(fā)送單個(gè)nioBuffer....
}
default: {
.....批量發(fā)送多個(gè)nioBuffers......
}
}
} while (writeSpinCount > 0);
//處理write loop結(jié)束 但數(shù)據(jù)還沒寫完的情況
incompleteWrite(writeSpinCount < 0);
}
當(dāng) write loop 結(jié)束后序无,這時(shí) writeSpinCount 的值會(huì)有兩種情況:
-
writeSpinCount < 0
:這種情況有點(diǎn)不好理解验毡,我們?cè)诮榻B Netty 通過零拷貝的方式傳輸網(wǎng)絡(luò)文件也就是這里的 case 0 分支邏輯時(shí),詳細(xì)介紹了 doWrite0 方法的幾種返回值帝嗡,當(dāng) Netty 在傳輸文件的過程中發(fā)現(xiàn) Socket 緩沖區(qū)已滿無法在繼續(xù)寫入數(shù)據(jù)時(shí)晶通,會(huì)返回WRITE_STATUS_SNDBUF_FULL = Integer.MAX_VALUE
,這就使得writeSpinCount的值 < 0
哟玷。隨后 break 掉 write loop 來到incompleteWrite(writeSpinCount < 0)
方法中狮辽,最后會(huì)在 incompleteWrite 方法中向 reactor 注冊(cè) OP_WRITE 事件。當(dāng) Socket 緩沖區(qū)變得可寫時(shí)巢寡,epoll 會(huì)通知 reactor 線程繼續(xù)發(fā)送文件喉脖。
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
//這里處理還沒寫滿16次 但是socket緩沖區(qū)已滿寫不進(jìn)去的情況 注冊(cè)write事件
// 什么時(shí)候socket可寫了, epoll會(huì)通知reactor線程繼續(xù)寫
setOpWrite();
} else {
..............
}
}
-
writeSpinCount == 0
: 這種情況很好理解抑月,就是已經(jīng)寫滿了 16 次树叽,但是還沒寫完,同時(shí) Socket 的寫緩沖區(qū)未滿谦絮,還可以繼續(xù)寫入题诵。這種情況下即使 Socket 還可以繼續(xù)寫入,Netty 也不會(huì)再去寫了层皱,因?yàn)閳?zhí)行 flush 操作的是 reactor 線程性锭,而 reactor 線程負(fù)責(zé)執(zhí)行注冊(cè)在它上邊的所有 channel 的 IO 操作,Netty 不會(huì)允許 reactor 線程一直在一個(gè) channel 上執(zhí)行 IO 操作叫胖,reactor 線程的執(zhí)行時(shí)間需要均勻的分配到每個(gè) channel 上草冈。所以這里 Netty 會(huì)停下,轉(zhuǎn)而去處理其他 channel 上的 IO 事件瓮增。
那么還沒寫完的數(shù)據(jù)怎棱,Netty會(huì)如何處理呢?
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
//這里處理還沒寫滿16次 但是socket緩沖區(qū)已滿寫不進(jìn)去的情況 注冊(cè)write事件
// 什么時(shí)候socket可寫了绷跑, epoll會(huì)通知reactor線程繼續(xù)寫
setOpWrite();
} else {
//這里處理的是socket緩沖區(qū)依然可寫蹄殃,但是寫了16次還沒寫完,這時(shí)就不能在寫了你踩,reactor線程需要處理其他channel上的io事件
//因?yàn)榇藭r(shí)socket是可寫的诅岩,必須清除op_write事件,否則會(huì)一直不停地被通知
clearOpWrite();
//如果本次writeLoop還沒寫完带膜,則提交flushTask到reactor
eventLoop().execute(flushTask);
}
這個(gè)方法的 if 分支邏輯吩谦,我們?cè)诮榻Bdo {.....}while()
循環(huán)體 write loop 中發(fā)送邏輯時(shí)已經(jīng)提過,在 write loop 循環(huán)發(fā)送數(shù)據(jù)的過程中膝藕,如果發(fā)現(xiàn) Socket 緩沖區(qū)已滿式廷,無法寫入數(shù)據(jù)時(shí)( localWrittenBytes <= 0),則需要向 reactor 注冊(cè) OP_WRITE 事件芭挽,等到 Socket 緩沖區(qū)變?yōu)榭蓪憼顟B(tài)時(shí)滑废,epoll 會(huì)通知 reactor 線程繼續(xù)寫入剩下的數(shù)據(jù)蝗肪。
do {
.........將待發(fā)送數(shù)據(jù)轉(zhuǎn)換到JDK NIO ByteBuffer中.........
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
writeSpinCount -= doWrite0(in);
break;
case 1: {
.....發(fā)送單個(gè)nioBuffer....
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
.................省略..............
break;
}
default: {
.....批量發(fā)送多個(gè)nioBuffers......
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
.................省略..............
break;
}
}
} while (writeSpinCount > 0);
注意 if 分支處理的情況是還沒寫滿 16 次,但是 Socket 緩沖區(qū)已滿蠕趁,無法寫入的情況薛闪。
而 else 分支正是處理我們這里正在討論的情況即 Socket 緩沖區(qū)是可寫的,但是已經(jīng)寫滿 16 次俺陋,在本輪 write loop 中不能再繼續(xù)寫入的情況豁延。
這時(shí) Netty 會(huì)將 channel 中剩下的待寫數(shù)據(jù)的 flush 操作封裝程 flushTask,丟進(jìn) reactor 的普通任務(wù)隊(duì)列中腊状,等待 reactor 執(zhí)行完其他 channel 上的 io 操作后在回過頭來執(zhí)行未寫完的 flush 任務(wù)诱咏。
忘記 Reactor 整體運(yùn)行邏輯的同學(xué),可以在回看下筆者的這篇文章《一文聊透Netty核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu)》
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
((AbstractNioUnsafe) unsafe()).flush0();
}
};
這里我們看到 flushTask 中的任務(wù)是直接再次調(diào)用 flush0 繼續(xù)回到發(fā)送數(shù)據(jù)的邏輯流程中缴挖。
細(xì)心的同學(xué)可能會(huì)有疑問袋狞,為什么這里不在繼續(xù)注冊(cè) OP_WRITE 事件而是通過向 reactor 提交一個(gè) flushTask 來完成 channel 中剩下數(shù)據(jù)的寫入呢?
原因是這里我們講的 else 分支是用來處理 Socket 緩沖區(qū)未滿還是可寫的映屋,但是由于用戶本次要發(fā)送的數(shù)據(jù)太多苟鸯,導(dǎo)致寫了 16 次還沒寫完的情形。
既然當(dāng)前 Socket 緩沖區(qū)是可寫的秧荆,我們就不能注冊(cè) OP_WRITE 事件倔毙,否則這里一直會(huì)不停地收到 epoll 的通知埃仪。因?yàn)?JDK NIO Selector 默認(rèn)的是 epoll 的水平觸發(fā)乙濒。
忘記水平觸發(fā)和邊緣觸發(fā)這兩種 epoll 工作模式的同學(xué),可以在回看下筆者的這篇文章《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》
所以這里只能向 reactor 提交 flushTask 來繼續(xù)完成剩下數(shù)據(jù)的寫入卵蛉,而不能注冊(cè) OP_WRITE 事件颁股。
注意:只有當(dāng) Socket 緩沖區(qū)已滿導(dǎo)致無法寫入時(shí),Netty 才會(huì)去注冊(cè) OP_WRITE 事件傻丝。這和我們之前介紹的 OP_ACCEPT 事件和 OP_READ 事件的注冊(cè)時(shí)機(jī)是不同的甘有。
這里大家可能還會(huì)有另一個(gè)疑問,就是為什么在向 reactor 提交 flushTask 之前需要清理 OP_WRITE 事件呢葡缰? 我們并沒有注冊(cè) OP_WRITE 事件呀~~
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
......省略......
} else {
clearOpWrite();
eventLoop().execute(flushTask);
}
在為大家解答這個(gè)疑問之前亏掀,筆者先為大家介紹下 Netty 是如何處理 OP_WRITE 事件的,當(dāng)大家明白了 OP_WRITE 事件的處理邏輯后泛释,這個(gè)疑問就自然解開了滤愕。
7. OP_WRITE事件的處理
在《一文聊透Netty核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu)》一文中,我們介紹過怜校,當(dāng) Reactor 監(jiān)聽到 channel 上有 IO 事件發(fā)生后间影,最終會(huì)在 processSelectedKey 方法中處理 channel 上的 IO 事件,其中 OP_ACCEPT 事件和 OP_READ 事件的處理過程茄茁,筆者已經(jīng)在之前的系列文章中介紹過了魂贬,這里我們聚焦于 OP_WRITE 事件的處理巩割。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.............省略.......
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
......處理connect事件......
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
........處理accept和read事件.........
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
這里我們看到當(dāng) OP_WRITE 事件發(fā)生后,Netty 直接調(diào)用 channel 的 forceFlush 方法付燥。
@Override
public final void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}
其實(shí) forceFlush 方法中并沒有什么特殊的邏輯宣谈,直接調(diào)用 flush0 方法再次發(fā)起 flush 操作繼續(xù) channel 中剩下數(shù)據(jù)的寫入。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
clearOpWrite();
return;
}
.........將待發(fā)送數(shù)據(jù)轉(zhuǎn)換到JDK NIO ByteBuffer中.........
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
......傳輸網(wǎng)絡(luò)文件........
case 1: {
.....發(fā)送單個(gè)nioBuffer....
}
default: {
.....批量發(fā)送多個(gè)nioBuffers......
}
}
} while (writeSpinCount > 0);
//處理write loop結(jié)束 但數(shù)據(jù)還沒寫完的情況
incompleteWrite(writeSpinCount < 0);
}
注意這里的 clearOpWrite() 方法机蔗,由于 channel 上的 OP_WRITE 事件就緒蒲祈,表明此時(shí) Socket 緩沖區(qū)變?yōu)榭蓪憼顟B(tài),從而 Reactor 線程再次來到了 flush 流程中萝嘁。
當(dāng) ChannelOutboundBuffer 中的數(shù)據(jù)全部寫完后 in.isEmpty() 梆掸,就需要清理 OP_WRITE 事件,因?yàn)榇藭r(shí) Socket 緩沖區(qū)是可寫的牙言,這種情況下當(dāng)數(shù)據(jù)全部寫完后酸钦,就需要取消對(duì) OP_WRITE 事件的監(jiān)聽,否則 epoll 會(huì)不斷的通知 Reactor咱枉。
同理在 incompleteWrite 方法的 else 分支也需要執(zhí)行 clearOpWrite() 方法取消對(duì) OP_WRITE 事件的監(jiān)聽卑硫。
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
// 這里處理還沒寫滿16次 但是socket緩沖區(qū)已滿寫不進(jìn)去的情況 注冊(cè)write事件
// 什么時(shí)候socket可寫了, epoll會(huì)通知reactor線程繼續(xù)寫
setOpWrite();
} else {
// 必須清除OP_WRITE事件蚕断,此時(shí)Socket對(duì)應(yīng)的緩沖區(qū)依然是可寫的欢伏,只不過當(dāng)前channel寫夠了16次,被SubReactor限制了亿乳。
// 這樣SubReactor可以騰出手來處理其他channel上的IO事件硝拧。這里如果不清除OP_WRITE事件,則會(huì)一直被通知葛假。
clearOpWrite();
//如果本次writeLoop還沒寫完障陶,則提交flushTask到SubReactor
//釋放SubReactor讓其可以繼續(xù)處理其他Channel上的IO事件
eventLoop().execute(flushTask);
}
}
8. writeAndFlush
在我們講完了 write 事件和 flush 事件的處理過程之后,writeAndFlush 就變得很簡(jiǎn)單了聊训,它就是把 write 和 flush 流程結(jié)合起來抱究,先觸發(fā) write 事件然后在觸發(fā) flush 事件。
下面我們來看下 writeAndFlush 的具體邏輯處理:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
//此處的msg就是Netty在read loop中從NioSocketChannel中讀取到ByteBuffer
ctx.writeAndFlush(msg);
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
}
這里可以看到 writeAndFlush 方法的處理入口和 write 事件的處理入口是一樣的带斑。唯一不同的是入口處理函數(shù) write 方法的 boolean flush 入?yún)⒉煌乃拢?writeAndFlush 的處理中 flush = true。
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
................省略檢查promise的有效性...............
//flush = true 表示channelHandler中調(diào)用的是writeAndFlush方法勋磕,這里需要找到pipeline中覆蓋write或者flush方法的channelHandler
//flush = false 表示調(diào)用的是write方法妈候,只需要找到pipeline中覆蓋write方法的channelHandler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
//用于檢查內(nèi)存泄露
final Object m = pipeline.touch(msg, next);
//獲取下一個(gè)要被執(zhí)行的channelHandler的executor
EventExecutor executor = next.executor();
//確保OutBound事件由ChannelHandler指定的executor執(zhí)行
if (executor.inEventLoop()) {
//如果當(dāng)前線程正是channelHandler指定的executor則直接執(zhí)行
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//如果當(dāng)前線程不是ChannelHandler指定的executor,則封裝成異步任務(wù)提交給指定executor執(zhí)行,注意這里的executor不一定是reactor線程朋凉。
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
由于在 writeAndFlush 流程的處理中州丹,flush 標(biāo)志被設(shè)置為 true,所以這里有兩個(gè)地方會(huì)和 write 事件的處理有所不同。
-
findContextOutbound( MASK_WRITE | MASK_FLUSH )
:這里在 pipeline 中向前查找的 ChanneOutboundHandler 需要實(shí)現(xiàn) write 方法或者 flush 方法墓毒。這里需要注意的是 write 方法和 flush 方法只需要實(shí)現(xiàn)其中一個(gè)即可滿足查找條件吓揪。因?yàn)橐话阄覀冏远x ChannelOutboundHandler 時(shí),都會(huì)繼承 ChannelOutboundHandlerAdapter 類所计,而在 ChannelInboundHandlerAdapter 類中對(duì)于這些 outbound 事件都會(huì)有默認(rèn)的實(shí)現(xiàn)柠辞。
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
這樣在后面?zhèn)鞑?write 事件或者 flush 事件的時(shí)候,我們通過上面邏輯找出的 ChannelOutboundHandler 中可能只實(shí)現(xiàn)了一個(gè) flush 方法或者 write 方法主胧。不過這樣沒關(guān)系叭首,如果這里在傳播 outbound 事件的過程中,發(fā)現(xiàn)找出的 ChannelOutboundHandler 中并沒有實(shí)現(xiàn)對(duì)應(yīng)的 outbound 事件回調(diào)函數(shù)踪栋,那么就直接調(diào)用在 ChannelOutboundHandlerAdapter 中的默認(rèn)實(shí)現(xiàn)焙格。
- 在向前傳播 writeAndFlush 事件的時(shí)候會(huì)通過調(diào)用 ChannelHandlerContext 的 invokeWriteAndFlush 方法,先傳播 write 事件 然后在傳播 flush 事件夷都。
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//向前傳遞write事件
invokeWrite0(msg, promise);
//向前傳遞flush事件
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//調(diào)用當(dāng)前ChannelHandler中的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
這里我們看到了 writeAndFlush 的核心處理邏輯眷唉,首先向前傳播 write 事件,經(jīng)過 write 事件的流程處理后囤官,最后向前傳播 flush 事件冬阳。
根據(jù)前邊的介紹,這里在向前傳播 write 事件的時(shí)候党饮,可能查找出的 ChannelOutboundHandler 只是實(shí)現(xiàn)了 flush 方法肝陪,不過沒關(guān)系,這里會(huì)直接調(diào)用 write 方法在 ChannelOutboundHandlerAdapter 父類中的默認(rèn)實(shí)現(xiàn)刑顺。同理 flush 也是一樣氯窍。
總結(jié)
到這里,Netty 處理數(shù)據(jù)發(fā)送的整個(gè)完整流程捏检,筆者就為大家詳細(xì)地介紹完了荞驴,可以看到 Netty 在處理讀取數(shù)據(jù)和處理發(fā)送數(shù)據(jù)的過程中不皆,雖然核心邏輯都差不多贯城,但是發(fā)送數(shù)據(jù)的過程明顯細(xì)節(jié)比較多,而且更加復(fù)雜一些霹娄。
這里筆者將讀取數(shù)據(jù)和發(fā)送數(shù)據(jù)的不同之處總結(jié)如下幾點(diǎn)供大家回憶對(duì)比:
在每次 read loop 之前能犯,會(huì)分配一個(gè)大小固定的 diretByteBuffer 用來裝載讀取數(shù)據(jù)。每輪 read loop 完全結(jié)束之后犬耻,才會(huì)決定是否對(duì)下一輪的讀取過程分配的 directByteBuffer 進(jìn)行擴(kuò)容或者縮容踩晶。
在每次 write loop 之前,都會(huì)獲取本次 write loop 最大能夠?qū)懭氲淖止?jié)數(shù)枕磁,根據(jù)這個(gè)最大寫入字節(jié)數(shù)從 ChannelOutboundBuffer 中轉(zhuǎn)換 JDK NIO ByteBuffer 渡蜻。每次寫入 Socket 之后都需要重新評(píng)估是否對(duì)這個(gè)最大寫入字節(jié)數(shù)進(jìn)行擴(kuò)容或者縮容。
read loop 和 write loop 都被默認(rèn)限定最多執(zhí)行 16 次。
在一個(gè)完整的 read loop 中茸苇,如果還讀取不完數(shù)據(jù)排苍,直接退出。等到 reactor 線程執(zhí)行完其他 channel 上的 IO 事件再來讀取未讀完的數(shù)據(jù)学密。
-
而在一個(gè)完整的 write loop 中淘衙,數(shù)據(jù)發(fā)送不完,則分兩種情況腻暮。
- Socket 緩沖區(qū)滿無法在繼續(xù)寫入彤守。這時(shí)就需要向 reactor 注冊(cè) OP_WRITE 事件。等 Socket 緩沖區(qū)變的可寫時(shí)哭靖,epoll 通知 reactor 線程繼續(xù)發(fā)送具垫。
- Socket 緩沖區(qū)可寫,但是由于發(fā)送數(shù)據(jù)太多试幽,導(dǎo)致雖然寫滿 16 次但依然沒有寫完急迂。這時(shí)就直接向 reactor 丟一個(gè) flushTask 進(jìn)去,等到 reactor 線程執(zhí)行完其他 channel 上的 IO 事件勘天,在回過頭來執(zhí)行 flushTask龙助。
OP_READ 事件的注冊(cè)是在 NioSocketChannel 被注冊(cè)到對(duì)應(yīng)的 Reactor 中時(shí)就會(huì)注冊(cè)。而 OP_WRITE 事件只會(huì)在 Socket 緩沖區(qū)滿的時(shí)候才會(huì)被注冊(cè)康震。當(dāng) Socket 緩沖區(qū)再次變得可寫時(shí)燎含,要記得取消 OP_WRITE 事件的監(jiān)聽。否則的話就會(huì)一直被通知腿短。
好了屏箍,本文的全部內(nèi)容就到這里了,我們下篇文章見~~~~