前言
在前面的文章中妇押,我們已經(jīng)詳細(xì)闡述了事件和異常傳播在netty中的實(shí)現(xiàn),(netty源碼分析之pipeline(一),netty源碼分析之pipeline(二)),其中有一類事件我們在實(shí)際編碼中用得最多揭朝,那就是 write
或者writeAndFlush
,也就是我們今天的主要內(nèi)容
主要內(nèi)容
本文分以下幾個(gè)部分闡述一個(gè)java對象最后是如何轉(zhuǎn)變成字節(jié)流潭袱,寫到socket緩沖區(qū)中去的
- pipeline中的標(biāo)準(zhǔn)鏈表結(jié)構(gòu)
- java對象編碼過程
- write:寫隊(duì)列
- flush:刷新寫隊(duì)列
- writeAndFlush: 寫隊(duì)列并刷新
pipeline中的標(biāo)準(zhǔn)鏈表結(jié)構(gòu)
一個(gè)標(biāo)準(zhǔn)的pipeline鏈?zhǔn)浇Y(jié)構(gòu)如下(我們省去了異常處理Handler)
數(shù)據(jù)從head節(jié)點(diǎn)流入屯换,先拆包,然后解碼成業(yè)務(wù)對象彤悔,最后經(jīng)過業(yè)務(wù)Handler處理晕窑,調(diào)用write,將結(jié)果對象寫出去杨赤。而寫的過程先通過tail節(jié)點(diǎn),然后通過encoder節(jié)點(diǎn)將對象編碼成ByteBuf植捎,最后將該ByteBuf對象傳遞到head節(jié)點(diǎn)阳柔,調(diào)用底層的Unsafe寫到j(luò)dk底層管道
java對象編碼過程
為什么我們在pipeline中添加了encoder節(jié)點(diǎn),java對象就轉(zhuǎn)換成netty可以處理的ByteBuf济锄,寫到管道里霍转?
我們先看下調(diào)用write
的code
BusinessHandler
protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
Response response = doBusiness(request);
if (response != null) {
ctx.channel().write(response);
}
}
業(yè)務(wù)處理器接受到請求之后,做一些業(yè)務(wù)處理,返回一個(gè)Response
角虫,然后,response在pipeline中傳遞均驶,落到 Encoder
節(jié)點(diǎn)枫虏,下面是 Encoder
的處理流程
Encoder
public class Encoder extends MessageToByteEncoder<Response> {
@Override
protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
out.writeByte(response.getVersion());
out.writeInt(4 + response.getData().length);
out.writeBytes(response.getData());
}
}
Encoder的處理流程很簡單爬虱,按照簡單自定義協(xié)議跑筝,將java對象 Response
寫到傳入的參數(shù) out
中瞒滴,這個(gè)out
到底是什么?
為了回答這個(gè)問題妓忍,我們需要了解到 Response
對象,從 BusinessHandler
傳入到 MessageToByteEncoder
的時(shí)候定罢,首先是傳入到 write
方法
MessageToByteEncoder
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判斷當(dāng)前Handelr是否能處理寫入的消息
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
// 強(qiáng)制換換
I cast = (I) msg;
// 分配一段ButeBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 調(diào)用encode旁瘫,這里就調(diào)回到 `Encoder` 這個(gè)Handelr中
encode(ctx, cast, buf);
} finally {
// 既然自定義java對象轉(zhuǎn)換成ByteBuf了境蜕,那么這個(gè)對象就已經(jīng)無用了,釋放掉
// (當(dāng)傳入的msg類型是ByteBuf的時(shí)候粱年,就不需要自己手動(dòng)釋放了)
ReferenceCountUtil.release(cast);
}
// 如果buf中寫入了數(shù)據(jù),就把buf傳到下一個(gè)節(jié)點(diǎn)
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 否則完箩,釋放buf拉队,將空數(shù)據(jù)傳到下一個(gè)節(jié)點(diǎn)
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果當(dāng)前節(jié)點(diǎn)不能處理傳入的對象,直接扔給下一個(gè)節(jié)點(diǎn)處理
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 當(dāng)buf在pipeline中處理完之后秩彤,釋放
if (buf != null) {
buf.release();
}
}
}
其實(shí)事哭,這一小節(jié)的內(nèi)容,在前面的博文中降盹,已經(jīng)提到過谤辜,這里价捧,我們詳細(xì)闡述一下Encoder是如何處理傳入的java對象的
1.判斷當(dāng)前Handler是否能處理寫入的消息涡戳,如果能處理,進(jìn)入下面的流程椎眯,否則胳岂,直接扔給下一個(gè)節(jié)點(diǎn)處理
2.將對象強(qiáng)制轉(zhuǎn)換成Encoder
可以處理的 Response
對象
3.分配一個(gè)ByteBuf
4.調(diào)用encoder,即進(jìn)入到 Encoder
的 encode
方法乳丰,該方法是用戶代碼,用戶將數(shù)據(jù)寫入ByteBuf
5.既然自定義java對象轉(zhuǎn)換成ByteBuf了汞斧,那么這個(gè)對象就已經(jīng)無用了什燕,釋放掉,(當(dāng)傳入的msg類型是ByteBuf的時(shí)候庙睡,就不需要自己手動(dòng)釋放了)
6.如果buf中寫入了數(shù)據(jù)技俐,就把buf傳到下一個(gè)節(jié)點(diǎn),否則啡邑,釋放buf井赌,將空數(shù)據(jù)傳到下一個(gè)節(jié)點(diǎn)
7.最后,當(dāng)buf在pipeline中處理完之后仇穗,釋放節(jié)點(diǎn)
總結(jié)一點(diǎn)就是仪缸,Encoder
節(jié)點(diǎn)分配一個(gè)ByteBuf,調(diào)用encode
方法列肢,將java對象根據(jù)自定義協(xié)議寫入到ByteBuf恰画,然后再把ByteBuf傳入到下一個(gè)節(jié)點(diǎn)宾茂,在我們的例子中,最終會(huì)傳入到head節(jié)點(diǎn)
HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
這里的msg就是前面在Encoder
節(jié)點(diǎn)中拴还,載有java對象數(shù)據(jù)的自定義ByteBuf對象跨晴,進(jìn)入下一節(jié)
write:寫隊(duì)列
AbstractChannel
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
1.首先,調(diào)用 assertEventLoop
確保該方法的調(diào)用是在reactor線程中片林,關(guān)于reactor線程可以查看我前面的文章
2.然后,調(diào)用 filterOutboundMessage()
方法费封,將待寫入的對象過濾焕妙,把非ByteBuf
對象和FileRegion
過濾,把所有的非直接內(nèi)存轉(zhuǎn)換成直接內(nèi)存DirectBuffer
AbstractNioByteChannel
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
3.接下來弓摘,估算出需要寫入的ByteBuf的size
4.最后焚鹊,調(diào)用 ChannelOutboundBuffer
的addMessage(msg, size, promise)
方法,所以韧献,接下來末患,我們需要重點(diǎn)看一下這個(gè)方法干了什么事情
ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 創(chuàng)建一個(gè)待寫出的消息節(jié)點(diǎn)
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(size, false);
}
想要理解上面這段代碼,必須得掌握寫緩存中的幾個(gè)消息指針锤窑,如下圖
ChannelOutboundBuffer 里面的數(shù)據(jù)結(jié)構(gòu)是一個(gè)單鏈表結(jié)構(gòu)璧针,每個(gè)節(jié)點(diǎn)是一個(gè) Entry
,Entry
里面包含了待寫出ByteBuf
以及消息回調(diào) promise
渊啰,下面分別是三個(gè)指針的作用
1.flushedEntry 指針表示第一個(gè)被寫到操作系統(tǒng)Socket緩沖區(qū)中的節(jié)點(diǎn)
2.unFlushedEntry 指針表示第一個(gè)未被寫入到操作系統(tǒng)Socket緩沖區(qū)中的節(jié)點(diǎn)
3.tailEntry指針表示ChannelOutboundBuffer緩沖區(qū)的最后一個(gè)節(jié)點(diǎn)
初次調(diào)用 addMessage
之后探橱,各個(gè)指針的情況為
fushedEntry
指向空,unFushedEntry
和 tailEntry
都指向新加入的節(jié)點(diǎn)
第二次調(diào)用 addMessage
之后虽抄,各個(gè)指針的情況為
第n次調(diào)用 addMessage
之后走搁,各個(gè)指針的情況為
可以看到,調(diào)用n次addMessage
迈窟,flushedEntry指針一直指向NULL私植,表示現(xiàn)在還未有節(jié)點(diǎn)需要寫出到Socket緩沖區(qū),而unFushedEntry
之后有n個(gè)節(jié)點(diǎn)车酣,表示當(dāng)前還有n個(gè)節(jié)點(diǎn)尚未寫出到Socket緩沖區(qū)中去
flush:刷新寫隊(duì)列
不管調(diào)用channel.flush()
曲稼,還是ctx.flush()
,最終都會(huì)落地到pipeline中的head節(jié)點(diǎn)
HeadContext
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
之后進(jìn)入到AbstractUnsafe
AbstractUnsafe
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
flush方法中湖员,先調(diào)用
ChannelOutboundBuffer
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
可以結(jié)合前面的圖來看贫悄,首先拿到 unflushedEntry
指針,然后將 flushedEntry
指向unflushedEntry
所指向的節(jié)點(diǎn)娘摔,調(diào)用完畢之后窄坦,三個(gè)指針的情況如下所示
接下來,調(diào)用 flush0();
AbstractUnsafe
protected void flush0() {
doWrite(outboundBuffer);
}
發(fā)現(xiàn)這里的核心代碼就一個(gè) doWrite,繼續(xù)跟
AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
// 拿到第一個(gè)需要flush的節(jié)點(diǎn)的數(shù)據(jù)
Object msg = in.current();
if (msg instanceof ByteBuf) {
// 強(qiáng)轉(zhuǎn)為ByteBuf鸭津,若發(fā)現(xiàn)沒有數(shù)據(jù)可讀彤侍,直接刪除該節(jié)點(diǎn)
ByteBuf buf = (ByteBuf) msg;
boolean done = false;
long flushedAmount = 0;
// 拿到自旋鎖迭代次數(shù)
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
// 自旋,將當(dāng)前節(jié)點(diǎn)寫出
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
// 寫完之后逆趋,將當(dāng)前節(jié)點(diǎn)刪除
if (done) {
in.remove();
} else {
break;
}
}
}
}
這里略微有點(diǎn)復(fù)雜盏阶,我們分析一下
1.第一步,調(diào)用current()
先拿到第一個(gè)需要flush的節(jié)點(diǎn)的數(shù)據(jù)
ChannelOutBoundBuffer
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
2.第二步,拿到自旋鎖的迭代次數(shù)
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
關(guān)于為什么要用自旋鎖闻书,netty的文檔已經(jīng)解釋得很清楚名斟,這里不過多解釋
ChannelConfig
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
3.自旋的方式將ByteBuf寫出到j(luò)dk nio的Channel
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
doWriteBytes
方法跟進(jìn)去
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
我們發(fā)現(xiàn),出現(xiàn)了 javaChannel()
魄眉,表明已經(jīng)進(jìn)入到了jdk nio Channel的領(lǐng)域砰盐,有關(guān)netty中ByteBuf的介紹不打算在這里展開
4.刪除該節(jié)點(diǎn)
節(jié)點(diǎn)的數(shù)據(jù)已經(jīng)寫入完畢,接下來就需要?jiǎng)h除該節(jié)點(diǎn)
ChannelOutBoundBuffer
public boolean remove() {
Entry e = flushedEntry;
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
}
// recycle the entry
e.recycle();
return true;
}
首先拿到當(dāng)前被flush掉的節(jié)點(diǎn)(flushedEntry所指)杆融,然后拿到該節(jié)點(diǎn)的回調(diào)對象 ChannelPromise
, 調(diào)用 removeEntry()
方法移除該節(jié)點(diǎn)
private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
這里的remove是邏輯移除楞卡,只是將flushedEntry指針移到下個(gè)節(jié)點(diǎn),調(diào)用完畢之后脾歇,節(jié)點(diǎn)圖示如下
隨后蒋腮,釋放該節(jié)點(diǎn)數(shù)據(jù)的內(nèi)存,調(diào)用 safeSuccess
進(jìn)行回調(diào)藕各,用戶代碼可以在回調(diào)里面做一些記錄池摧,下面是一段Example
用戶代碼
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
// 回調(diào)
}
})
最后,調(diào)用 recycle
方法激况,將當(dāng)前節(jié)點(diǎn)回收
writeAndFlush: 寫隊(duì)列并刷新
理解了write和flush這兩個(gè)過程作彤,writeAndFlush
也就不難了
writeAndFlush在某個(gè)Handler中被調(diào)用之后,最終會(huì)落到 TailContext
節(jié)點(diǎn)乌逐,見 netty源碼分析之pipeline(二)
TailContext
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
}
}
可以看到竭讳,最終,通過一個(gè)boolean變量浙踢,表示是調(diào)用 invokeWriteAndFlush
绢慢,還是 invokeWrite
,invokeWrite
便是我們上文中的write
過程
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
invokeFlush0();
}
可以看到洛波,最終調(diào)用的底層方法和單獨(dú)調(diào)用 write
和 flush
是一樣的
private void invokeWrite(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
}
private void invokeFlush(Object msg, ChannelPromise promise) {
invokeFlush0(msg, promise);
}
由此看來胰舆,invokeWriteAndFlush
基本等價(jià)于write
方法之后再來一次flush
另外,由于對端消費(fèi)不及時(shí)導(dǎo)致writeAndFlush引發(fā)頻繁O(jiān)ld GC的問題和解決思路可以看下 一次netty"引發(fā)的"詭異old gc問題排查過程
總結(jié)
1.pipeline中的編碼器原理是創(chuàng)建一個(gè)ByteBuf,將java對象轉(zhuǎn)換為ByteBuf蹬挤,然后再把ByteBuf繼續(xù)向前傳遞
2.調(diào)用write方法并沒有將數(shù)據(jù)寫到Socket緩沖區(qū)中缚窿,而是寫到了一個(gè)單向鏈表的數(shù)據(jù)結(jié)構(gòu)中,flush才是真正的寫出
3.writeAndFlush等價(jià)于先將數(shù)據(jù)寫到netty的緩沖區(qū)焰扳,再將netty緩沖區(qū)中的數(shù)據(jù)寫到Socket緩沖區(qū)中倦零,寫的過程與并發(fā)編程類似误续,用自旋鎖保證寫成功
4.netty中的緩沖區(qū)中的ByteBuf為DirectByteBuf
如果你覺得看的不過癮,想系統(tǒng)學(xué)習(xí)Netty原理扫茅,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html