本文是Netty文集中“Netty 那些事兒”系列的文章登澜。主要結(jié)合在開發(fā)實戰(zhàn)中侍芝,我們遇到的一些“奇奇怪怪”的問題鹃锈,以及如何正確且更好的使用Netty框架拌汇,并會對Netty中涉及的重要設(shè)計理念進行介紹缚甩。
本文是筆者和朋友(筆名:oojeek)一起討論該問題的一個記錄谱净。文章以討論過程中的思路來展現(xiàn)(也是我們解決問題的思路路線),因此可能會有些亂擅威。再者壕探,如果對Netty寫數(shù)據(jù)流程不了解的朋友,可以先閱讀Netty 源碼解析 ——— writeAndFlush流程分析該篇文章郊丛,下面的討論中會涉及不少這篇文章提及的概念李请。
問題
起因是這樣的,朋友倒騰了個發(fā)送大數(shù)據(jù)包的demo厉熟,結(jié)果發(fā)現(xiàn)在發(fā)送大數(shù)據(jù)包時导盅,寫空閑超時事件被觸發(fā)了。即便在設(shè)置了IdleStateHandler的observeOutput屬性為true的情況下揍瑟,依舊會發(fā)送在寫一個大數(shù)據(jù)包的過程中白翻,寫空閑超時事件被觸發(fā)。
先來簡單看看朋友的demo绢片,我們來看幾個關(guān)鍵類
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst("idleStateHandler", new IdleStateHandler(true,9, 2, 11, TimeUnit.SECONDS));
pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE,
0, 4, 0, 4, true));
pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}
我們定義了一個IdleStateHandler滤馍,并且設(shè)置了observeOutput屬性為true(即,第一個參數(shù))底循,以及設(shè)置了寫空閑超時時間為2秒(即纪蜒,第二個參數(shù))。
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
private String tempString;
public MyClientHandler() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1024 * 1024; i++) {
builder.append("abcdefghijklmnopqrstuvwxyz");
}
tempString = builder.toString();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(LocalDateTime.now().toString() + "----" + ctx.channel().remoteAddress().toString() + "----" + msg.length());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sendData(ctx);
}
private void sendData(ChannelHandlerContext ctx) {
if (!ctx.channel().isActive())
{
System.out.println("channel inactive...");
ctx.close();
return;
}
System.out.println("send a pack of data ...");
long tickCount = System.currentTimeMillis();
ChannelFuture future = ctx.writeAndFlush(tempString);
ChannelPromise promise = (ChannelPromise)future;
promise.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
System.out.println("send completed");
sendData(ctx);
}
});
System.out.println("Time elapse:" + (System.currentTimeMillis() - tickCount));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
//super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//System.out.println(LocalDateTime.now().toString());
if (evt == IdleStateEvent.READER_IDLE_STATE_EVENT) {
System.out.println("READER_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.WRITER_IDLE_STATE_EVENT){
// for heartbit
System.out.println("WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString());
//ctx.writeAndFlush("ACK");
} else if (evt == IdleStateEvent.ALL_IDLE_STATE_EVENT) {
//System.out.println("ALL_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
System.out.println("FIRST_READER_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
//System.out.println("FIRST_WRITER_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT) {
//System.out.println("FIRST_ALL_IDLE_STATE_EVENT");
}
//super.userEventTriggered(ctx, evt);
}
}
這里此叠,定義了一個27262976字節(jié)大小的tempString數(shù)據(jù)纯续,用于發(fā)送。并實現(xiàn)了userEventTriggered方法灭袁,當(dāng)寫空閑超時事件發(fā)送時猬错,會打印一條『"WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString()』信息。
然后啟動程序茸歧,連接的服務(wù)端是朋友的騰訊云倦炒,服務(wù)器做了帶寬限制,限制為1M软瞎,以重現(xiàn)問題逢唤。
運行程序的過程中,發(fā)現(xiàn)涤浇,當(dāng)大數(shù)據(jù)包(即鳖藕,27262976字節(jié)大小的tempString)在發(fā)送的過程中,寫空閑超時不斷的被觸發(fā)調(diào)用只锭。并且我們自定義handler中只發(fā)送了一個數(shù)據(jù)包著恩,但到了底層卻有兩個數(shù)據(jù)包發(fā)送出去了。
然后就此情況我們開始了討論。喉誊。邀摆。
尋找問題發(fā)送的根源
首先,IdleStateHandler的write操作確實確實只是將listener加到了write操作的listener集合中伍茄,write操作本身不會去修改lastWriteTime栋盹。
然后,我們曉得flush是一個出站操作敷矫,最終ChannelPipeline的head會對其進行處理贞盯。head底層會調(diào)用NioSocketChannel.doWrite()方法來將數(shù)據(jù)刷新到socket中。
doWrite()操作是一個寫循環(huán)操作沪饺。第一次循環(huán):
expectedWrittenBytes:27262980整葡。這個字段表示本次flush操作我們希望寫出去的數(shù)據(jù)大小件余,也就是之前我們write操作已經(jīng)寫入的數(shù)據(jù)。即:
ChannelFuture future = ctx.writeAndFlush(tempString);
public MyClientHandler() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1024 * 1024; i++) {
builder.append("abcdefghijklmnopqrstuvwxyz");
}
tempString = builder.toString();
}
為什么是2個待發(fā)送的ByteBuf了遭居?
這和我們定義了『pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));』有關(guān):
知道了為什么有2個待發(fā)送的ByteBuf话侧,我們繼續(xù)看doWrite()操作中寫數(shù)據(jù)的操作:
最后霹俺,我們來看doWrite()操作中的『in.removeBytes(writtenBytes);』操作
『目前珊佣,我們可以先理解為,write操作的數(shù)據(jù)最終都會放到ChannelOutboundBuffer中披粟,其中有兩個屬性private Entry unflushedEntry彩扔、private Entry flushedEntry。它們都是用Entry對象通過next指針來維護的一個單向鏈表僻爽。
unflushedEntry表示還未刷新的ByteBuf的鏈表頭虫碉;flushedEntry表示調(diào)用flush()操作時將會進行刷新的ByteBuf的鏈表頭。
在write的時候會將ByteBuf封裝為一個Entry對象放到unflushedEntry的尾部胸梆。當(dāng)調(diào)用flush時敦捧,就會將unflushedEntry賦值給flushedEntry,然后將unflushedEntry置null碰镜。
同時current()返回當(dāng)前正在處理的Entry對象(Entry中封裝了ByteBuf)』
到此為止兢卵,第一個ByteBuf,即記錄著我們要發(fā)送消息長度大小的ByteBuf就發(fā)送出去了绪颖,并且觸發(fā)了一次“IdleStateHandler 的 writeListener”的調(diào)用秽荤。
那么,第二個ByteBuf就是我們的大數(shù)據(jù)包了。通過上面的分析窃款,我們知道大數(shù)據(jù)包走的是else流程课兄。也就是說,本次真實寫出去的數(shù)據(jù) 比 當(dāng)前這個ByteBuf的可讀取數(shù)據(jù)要小晨继。也就說明烟阐,當(dāng)前這個ByteBuf還沒有被完全的寫完。因此并不會通過調(diào)用『remove()』操作來觸發(fā)“IdleStateHandler 的 writeListener”的回調(diào)紊扬。直到整個大數(shù)據(jù)包所有的內(nèi)容都寫出去了蜒茄,那么這是if(readableBytes <= writtenBytes)才會為真,這是才會去觸發(fā)“IdleStateHandler 的 writeListener”的回調(diào)餐屎。
也就是說檀葛,只有在一個ByteBuf的數(shù)據(jù)全部都寫完了之后,才會去觸發(fā)所有注冊到這個write操作上的GenericFutureListener的回調(diào)腹缩。
netty其實有提供了一個ChannelProgressiveFuture來監(jiān)控數(shù)據(jù)的發(fā)送過程屿聋,它可以實現(xiàn)在一個大數(shù)據(jù)發(fā)送的過程中回調(diào)注冊到其上的ChannelProgressiveFutureListener,比如:
ChannelProgressivePromise progressivePromise = ctx.channel().newProgressivePromise();
progressivePromise.addListener(new ChannelProgressiveFutureListener(){
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
System.out.println("數(shù)據(jù)正在發(fā)送中庆聘。胜臊。。");
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println("數(shù)據(jù)已經(jīng)發(fā)送完了伙判!");
}
});
ctx.writeAndFlush(tempString, progressivePromise);
最后象对。說明下,當(dāng)將大數(shù)據(jù)包拆成一個個小包發(fā)送時宴抚,為什么不會導(dǎo)致寫空閑超時的觸發(fā)勒魔。
因為當(dāng)大數(shù)據(jù)包被拆分成一個個小包發(fā)送時,每個小數(shù)據(jù)包就是一個ByteBuf菇曲,每個ByteBuf待寫出的數(shù)據(jù)量就很小冠绢,比如本例中,我一個ByteBuf就是一個長度為26的英文字符串常潮,那么每次寫操作完成后在removeBytes()操作:
到目前為止喊式,我們已經(jīng)知道導(dǎo)致寫空閑超時的原因所在了孵户。這時我們可以想到的解決方案有:
① 用變量來記錄是否正在發(fā)送中,如果在發(fā)送中岔留,即使寫空閑超時被觸發(fā)也不發(fā)送心跳
② 將打包拆分成小包的方式
更進一步
但是夏哭,我們還有一個疑惑未解決,那就是IdleStateHandler類中observeOutput屬性到底是干啥用的献联?
我們先來看看observeOutput屬性在IdleStateHandler中的使用:
首先在doc文檔中竖配,對observeOutput屬性的描述是“在訪問寫空閑超時時何址,字節(jié)消費是否會被考慮進去,默認(rèn)為false”进胯,也就是說用爪,當(dāng)字節(jié)被消費時,寫空閑超時事件否非該被觸發(fā)龄减。
從上文项钮,我們已經(jīng)得知班眯,只有在每次真正寫完一個Bytebuf后希停,該ByteBuf的異步寫操作才算是完成,那么才會去觸發(fā)該異步寫操作上的listener署隘,也就是這是才會修改IdleStateHandler的lastWriteTime屬性宠能。
起初,我們以為如果將“observeOutput”屬性設(shè)置為true磁餐,那么即使ByteBuf包沒有被完全寫完违崇,但是已經(jīng)有字節(jié)數(shù)據(jù)在被寫出了,那么此時也不應(yīng)該觸發(fā)寫空閑超時事件诊霹。但羞延,結(jié)果卻是寫空閑超時事件依舊被觸發(fā)了。這是為什么了脾还?
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// But this applies only if it's the non-first call.
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes();
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
if (!first) {
return true;
}
}
}
}
return false;
}
這里“observeOutput”為true情況下,主要會根據(jù)三對數(shù)值的比較情況來覺得輸出是否有改變鄙漏,① lastChangeCheckTimeStamp 與 lastWriteTime嗤谚;② messageHashCode 與 lastMessageHashCode;③ pendingWriteBytes 與 lastPendingWriteBytes怔蚌;
① 和 ② 都好理解巩步,最讓我們困惑的是③,也就是說桦踊,pendingWriteBytes屬性并未像我們猜測的那樣隨著ByteBuf中的數(shù)據(jù)的寫出而改變椅野。 這又是為什么了?
為了解決這個問題籍胯,我們通過反向思考來嘗試的解決竟闪。即,這個值(pendingWriteBytes)是在什么情況下會被修改芒炼?
ChannelOutboundBuffer:
好了暗挑,現(xiàn)在我們知道笋除,其實pendingWriteBytes實際上也是在一個ByteBuf都寫出后才會被修改的。炸裆。垃它。 那么問題又來了,既然是這樣烹看,那么這個pendingWriteBytes又有什么用了国拇?或者說observeOutput屬性的使用到底是在什么場景下?惯殊?
這個問題其實在hasOutputChanged方法注解的github issues 6150中給出了討論酱吝。
目前能得到的結(jié)論是observeOutput屬性是為了issues 6150問題所提供的解決方案,而這個問題是在通過HTTP2協(xié)議進行數(shù)據(jù)發(fā)送時導(dǎo)致的土思,討論中提及netty在對HTTP2傳輸協(xié)議進行數(shù)據(jù)傳輸時可能會將多個數(shù)據(jù)包整合正一個包發(fā)送導(dǎo)致寫空閑超時事件被觸發(fā)了(因為务热,該問題與本文的問題并無關(guān)聯(lián),所以不做具體說明)己儒。但是通過github issues 6150討論中崎岂,我們得知了netty之所以不提供在寫一個大數(shù)據(jù)包的過程中修改pendingWriteBytes的原因(即,netty不支持某個ByteBuf中寫出部分?jǐn)?shù)據(jù)就修改ChannelOutboundBuffer中totalPendingSize值闪湾。)冲甘,這是為了防止ABA問題。
② 因為如果不是以ByteBuf或者FileRegion為單位修改pending bytes的話胳嘲,可能出現(xiàn)ABA問題厂僧。即,因為write操作可以由多個不同的線程來操作(非EventLoop線程)了牛,這可能導(dǎo)致EventLoop線程在進行該OutboundBuffer中ByteBuf的flush操作時颜屠,其他線程再往這個OutboundBuffer中加數(shù)據(jù)辰妙,這可能使得最終pending bytes的值并沒有改變,但實際上pending bytes是改變過的了甫窟,這樣就會使得判斷錯誤密浑。(PS:目前NIO傳輸時,寫完一個ByteBuf就會觸發(fā)該ByteBuf的listener粗井,那么lastWriteTime就會被修改尔破,此時根本不會進入)
③ 而另一個ABA問題是,如果保持了ByteBuf的引用浇衬,如果使用池的ByteBuf的話(默認(rèn)懒构,Netty就是使用池的ByteBuf),如果我們存儲OutBoundBuffer中的當(dāng)前的(鏈表頭)的那個ByteBuf對象的引用径玖,在每次寫空閑超時事件中判斷這個ByteBuf對象的hashCode與上一次調(diào)用時的值做比較來得出是否是同一個ByteBuf痴脾。??這種情況也可能出現(xiàn)ABA問題颤介,正式因為ByteBuf是池的梳星,那么就可能在寫空閑超時事件回調(diào)方法中存有的ByteBuf引用還是一樣的,但實際上是被回收后再次分配出去的滚朵,因此是邏輯上來說是不一樣的ByteBuf對象了冤灾。
- “observeOutput” 字段的使用場景:
當(dāng)在寫一個大數(shù)據(jù)包的時候,且該在寫超時已不是第一次觸發(fā)的時候(即辕近,first 為 false)韵吨,這個大數(shù)據(jù)包還沒寫完。但在此時移宅,我們已經(jīng)有 ch.write(data)了其數(shù)據(jù)了归粉,這會導(dǎo)致『pendingWriteBytes != lastPendingWriteBytes == true』(因為,channelOutboundBytes 只有在一個 ByteBuf 都寫出去后漏峰,即糠悼,寫到 socket 的寫緩沖后。才會減少其totalPendingWriteBytes 的值浅乔。這樣在??這個場景中倔喂,在我們自此write一個data的時候,totalPendingWriteBytes的值會增加)靖苇,因此來表示 outputChanged席噩。也就是說,observeOutput 觀察的是贤壁,是否有新的寫數(shù)據(jù)操作悼枢,而非對已經(jīng)操作的write的數(shù)據(jù)的觀察!F⒉稹馒索!
解決方案
好了给梅,到目前為止,我們已經(jīng)知道為什么我們使用“observeOutput”屬性無法達(dá)到我們預(yù)計的效果了双揪。那么动羽,關(guān)于發(fā)送大數(shù)據(jù)包我們到底可以做處理了。渔期。
這里运吓,我們覺得可以采用的一個方式是,使用“ChunkedWriteHandler”來實現(xiàn)大數(shù)據(jù)包的傳輸疯趟。
這個ChunkedWriteHandler又是怎么突然跑出來的拘哨。。是這樣的信峻,其實之前我們也不曉得有這個類倦青,或者說因為了解不深給忽略了它。正好在解決這個問題的間隙盹舞,將Netty的寫數(shù)據(jù)操作給過了邊产镐,在這其中發(fā)現(xiàn)了Netty自身目前僅對ChunkedWriteHandler和HTTP2的提供了WriteBufferWaterMark的支持,其余的需要我們程序自行添加支持踢步。而WriteBufferWaterMark通常就是為了控制有大量待寫出數(shù)據(jù)的情況下對寫出流量進行控制的一個方式癣亚,這看似和我們的大數(shù)據(jù)包寫出還是有些個關(guān)系的。因此获印,我們通過doc簡單了解了下ChunkedWriteHandler的使用述雾,發(fā)現(xiàn)確實是個可行的方式。在經(jīng)過測試后也如預(yù)期般達(dá)到了我們要的效果兼丰!下面玻孟,我們就來說說如果通過ChunkedWriteHandler來實現(xiàn)大數(shù)據(jù)包發(fā)送的發(fā)送。
這里對ChunkedWriteHandler做一個簡單的介紹:
ChunkedWriteHandler:一個handler鳍征,用于支持異步寫大數(shù)據(jù)流并且不需要消耗大量內(nèi)存也不會導(dǎo)致內(nèi)存溢出錯誤( OutOfMemoryError )黍翎。
ChunkedWriteHandler僅支持ChunkedInput類型的消息。也就是說蟆技,僅當(dāng)消息類型是ChunkedInput時才能實現(xiàn)ChunkedWriteHandler提供的大數(shù)據(jù)包傳輸功能(ChunkedInput是一個不確定長度的數(shù)據(jù)流)玩敏。
ChunkedWriteHandler中維護了一個待發(fā)送的數(shù)據(jù)包消息隊列(Queue<PendingWrite> queue,其中PendingWrite封裝了你待發(fā)送的消息以及異步寫操作的promise)
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(new PendingWrite(msg, promise));
}
這樣使得你write的數(shù)據(jù)包在經(jīng)過ChunkedWriteHandler的時候质礼,會先被存儲到這個消息隊列中旺聚,并不會立即放入到ChannelOutboundBuffer里。
而當(dāng)你執(zhí)行flush操作時眶蕉,ChunkedWriteHandler會依次取出消息隊列中的大數(shù)據(jù)包砰粹,然后拆分成一個個小數(shù)據(jù)包(ByteBuf)后發(fā)給下游的ChannelOutboundHandler,并且在每次發(fā)送完一個ByteBuf包后都會立即執(zhí)行依次ctx.flush()操作將該ByteBuf發(fā)送到網(wǎng)絡(luò)中。但也正是因為碱璃,ChunkedWriteHandler將一個大數(shù)據(jù)包拆分成了一個個小數(shù)據(jù)包放入底層的ChannelOutboundBuffer進行傳輸弄痹,這使得你對大數(shù)據(jù)包的異步寫操作注冊的listener在底層的ChannelOutboundBuffer已經(jīng)無法得到并且回調(diào)了,這就需要我們通過程序來進行狀態(tài)的管理以保持我們原有邏輯的正確性嵌器。
ChunkedWriteHandler會為每個發(fā)送的小數(shù)據(jù)包注冊一個listener肛真,這個listener會在小數(shù)據(jù)包成功發(fā)送完成后調(diào)用原始大數(shù)據(jù)包的GenericProgressiveFutureListener,上面我們已經(jīng)說了通過GenericProgressiveFutureListener我們可以監(jiān)控數(shù)據(jù)包的發(fā)送進度(通過回調(diào)operationProgressed方法實現(xiàn))爽航,以及在大數(shù)據(jù)包發(fā)送完后得到一個通知(通過回調(diào)operationComplete方法實現(xiàn))蚓让。因此,我們可以在operationComplete回調(diào)方法中對寫原始大數(shù)據(jù)包的異步操作上注冊的listener進行回調(diào)(通過表示寫異步操作promise完成來實現(xiàn))讥珍。
值得一提的時历极,ChunkedWriteHandler對將大數(shù)據(jù)包拆分成小數(shù)據(jù)包發(fā)往下游進行的操作是受WriteBufferWaterMark控制的,當(dāng)寫緩沖區(qū)中的數(shù)據(jù)數(shù)量超過了設(shè)置的高水位標(biāo)志衷佃,那么Channel#isWritable()方法將開始返回false趟卸,那么此時ChunkedWriteHandler就不會繼續(xù)拆分大數(shù)據(jù)包。然后當(dāng)寫緩沖區(qū)中的字節(jié)數(shù)量減少至小于了低水位標(biāo)志氏义,Channel#isWritable()方法會重新開始返回true锄列,而此時ChunkedWriteHandler會繼續(xù)拆分未拆分完的大數(shù)據(jù)包,繼續(xù)數(shù)據(jù)的寫操作觅赊。
絮絮叨叨了這么多右蕊,來看看具體的實現(xiàn):
首先修改了MyClientInitializer:
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst("idleStateHandler", new IdleStateHandler(true, 9, 2, 11, TimeUnit.SECONDS));
pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE,
0, 4, 0, 4, true));
pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
pipeline.addLast("myClientChunkHandler", new MyClientChunkHandler());
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}
在StringEncoder和LengthFieldPrepender兩個編碼器間添加了MyClientChunkHandler和ChunkedWriteHandler。
MyChunkedWriteHandler是一個出站處理器吮螺,它會完成將StringEncoder編碼后的大數(shù)據(jù)包類型轉(zhuǎn)換成ChannelInputStream類型,以使得其后的ChunkedWriteHandler能夠?qū)υ摯髷?shù)據(jù)包實現(xiàn)拆分分發(fā)的作用帕翻。
接下來是自定義的 MyClientChunkHandler鸠补,用于將我們的待發(fā)送的大數(shù)據(jù)包類型轉(zhuǎn)換成ChunkedInput類型,以使得ChunkedWriteHandler能夠發(fā)揮作用嘀掸。
public class MyServerChunkHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf)msg;
ByteInputStream in = new ByteInputStream();
byte[] data = null;
if(buf.hasArray()) {
System.out.println("+++ is array");
data = buf.array().clone();
} else {
System.out.println("--- is direct");
data = new byte[buf.readableBytes()];
buf.writeBytes(data);
}
// System.out.println("===== data length : " + data.length);
in.setBuf(data);
ChunkedStream stream = new ChunkedStream(in);
ReferenceCountUtil.release(msg);
ctx.write(stream, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
??實現(xiàn)了將數(shù)據(jù)包類型轉(zhuǎn)換為ByteInputStream類型紫岩,傳遞個下一個ChannelOutboundHandler(也就是ChunkedWriteHandler)
后記
本次問題和朋友陸陸續(xù)續(xù)的討論了兩個晚上,印象還是比較深刻的睬塌。在第一次討論問題的時候泉蝌,我們對Netty的寫數(shù)據(jù)流程也沒有比較清晰的概念。后面將這塊流程補上后揩晴,再重新回來看待問題勋陪,感覺又清晰了不少,再者對于IdleStateHandler的observeOutput屬性確實是比較容易讓人誤解硫兰。如果沒有去翻查github和源碼的話诅愚,不容易明白這個屬性真正的用意。但也正是因為對Netty寫數(shù)據(jù)流程的梳理劫映,讓我們發(fā)現(xiàn)了一直忽略ChunkedWriteHandler违孝,也讓這個問題有了現(xiàn)在的這個解決方案刹前。當(dāng)然,可能隨著后面進一步深入的學(xué)習(xí)雌桑,我們會發(fā)現(xiàn)更好的解決方案喇喉,那么到時候也會繼續(xù)分享的。
若文章有任何錯誤校坑,望大家不吝指教:)