Netty 那些事兒 ——— 關(guān)于 “Netty 發(fā)送大數(shù)據(jù)包時 觸發(fā)寫空閑超時” 的一些思考

本文是Netty文集中“Netty 那些事兒”系列的文章登澜。主要結(jié)合在開發(fā)實戰(zhàn)中侍芝,我們遇到的一些“奇奇怪怪”的問題鹃锈,以及如何正確且更好的使用Netty框架拌汇,并會對Netty中涉及的重要設(shè)計理念進行介紹缚甩。

本文是筆者和朋友(筆名:oojeek)一起討論該問題的一個記錄谱净。文章以討論過程中的思路來展現(xiàn)(也是我們解決問題的思路路線),因此可能會有些亂擅威。再者壕探,如果對Netty寫數(shù)據(jù)流程不了解的朋友,可以先閱讀Netty 源碼解析 ——— writeAndFlush流程分析該篇文章郊丛,下面的討論中會涉及不少這篇文章提及的概念李请。

問題

起因是這樣的,朋友倒騰了個發(fā)送大數(shù)據(jù)包的demo厉熟,結(jié)果發(fā)現(xiàn)在發(fā)送大數(shù)據(jù)包時导盅,寫空閑超時事件被觸發(fā)了。即便在設(shè)置了IdleStateHandler的observeOutput屬性為true的情況下揍瑟,依舊會發(fā)送在寫一個大數(shù)據(jù)包的過程中白翻,寫空閑超時事件被觸發(fā)。
先來簡單看看朋友的demo绢片,我們來看幾個關(guān)鍵類

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addFirst("idleStateHandler", new IdleStateHandler(true,9, 2, 11, TimeUnit.SECONDS));
        pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE,
                0, 4, 0, 4, true));
        pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new MyClientHandler());
    }
}

我們定義了一個IdleStateHandler滤馍,并且設(shè)置了observeOutput屬性為true(即,第一個參數(shù))底循,以及設(shè)置了寫空閑超時時間為2秒(即纪蜒,第二個參數(shù))。

public class MyClientHandler extends SimpleChannelInboundHandler<String> {
    private String tempString;

    public MyClientHandler() {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 1024 * 1024; i++) {
            builder.append("abcdefghijklmnopqrstuvwxyz");
        }
        tempString = builder.toString();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(LocalDateTime.now().toString() + "----" + ctx.channel().remoteAddress().toString() + "----" + msg.length());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        sendData(ctx);
    }

    private void sendData(ChannelHandlerContext ctx) {
        if (!ctx.channel().isActive())
        {
            System.out.println("channel inactive...");
            ctx.close();
            return;
        }

        System.out.println("send a pack of data ...");

        long tickCount = System.currentTimeMillis();
        ChannelFuture future = ctx.writeAndFlush(tempString);
        ChannelPromise promise = (ChannelPromise)future;
        promise.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                System.out.println("send completed");
                sendData(ctx);
            }
        });
        System.out.println("Time elapse:" + (System.currentTimeMillis() - tickCount));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        //super.exceptionCaught(ctx, cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //System.out.println(LocalDateTime.now().toString());
        if (evt == IdleStateEvent.READER_IDLE_STATE_EVENT) {
            System.out.println("READER_IDLE_STATE_EVENT");
        } else if (evt == IdleStateEvent.WRITER_IDLE_STATE_EVENT){
            // for heartbit
            System.out.println("WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString());
            //ctx.writeAndFlush("ACK");
        } else if (evt == IdleStateEvent.ALL_IDLE_STATE_EVENT) {
            //System.out.println("ALL_IDLE_STATE_EVENT");
        } else if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
            System.out.println("FIRST_READER_IDLE_STATE_EVENT");
        } else if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
            //System.out.println("FIRST_WRITER_IDLE_STATE_EVENT");
        } else if (evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT) {
            //System.out.println("FIRST_ALL_IDLE_STATE_EVENT");
        }
        //super.userEventTriggered(ctx, evt);
    }
}

這里此叠,定義了一個27262976字節(jié)大小的tempString數(shù)據(jù)纯续,用于發(fā)送。并實現(xiàn)了userEventTriggered方法灭袁,當(dāng)寫空閑超時事件發(fā)送時猬错,會打印一條『"WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString()』信息。

然后啟動程序茸歧,連接的服務(wù)端是朋友的騰訊云倦炒,服務(wù)器做了帶寬限制,限制為1M软瞎,以重現(xiàn)問題逢唤。

運行程序的過程中,發(fā)現(xiàn)涤浇,當(dāng)大數(shù)據(jù)包(即鳖藕,27262976字節(jié)大小的tempString)在發(fā)送的過程中,寫空閑超時不斷的被觸發(fā)調(diào)用只锭。并且我們自定義handler中只發(fā)送了一個數(shù)據(jù)包著恩,但到了底層卻有兩個數(shù)據(jù)包發(fā)送出去了。

然后就此情況我們開始了討論。喉誊。邀摆。

尋找問題發(fā)送的根源

首先,IdleStateHandler的write操作確實確實只是將listener加到了write操作的listener集合中伍茄,write操作本身不會去修改lastWriteTime栋盹。

然后,我們曉得flush是一個出站操作敷矫,最終ChannelPipeline的head會對其進行處理贞盯。head底層會調(diào)用NioSocketChannel.doWrite()方法來將數(shù)據(jù)刷新到socket中。

doWrite()操作是一個寫循環(huán)操作沪饺。第一次循環(huán):

nioBufferCnt為2躏敢;說明有2個待發(fā)送的ByteBuf。
expectedWrittenBytes:27262980整葡。這個字段表示本次flush操作我們希望寫出去的數(shù)據(jù)大小件余,也就是之前我們write操作已經(jīng)寫入的數(shù)據(jù)。即:
ChannelFuture future = ctx.writeAndFlush(tempString);

public MyClientHandler() {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 1024 * 1024; i++) {
            builder.append("abcdefghijklmnopqrstuvwxyz");
        }
        tempString = builder.toString();
    }

為什么是2個待發(fā)送的ByteBuf了遭居?
這和我們定義了『pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));』有關(guān):

所以啼器,在經(jīng)過LengthFieldPrepender編碼器處理后,傳入給下一個ChannelOutboundHandler的待處理數(shù)據(jù)已經(jīng)是2個ByteBuf了俱萍。即端壳,如下:
在encode()函數(shù)調(diào)用完,是會將編碼后的結(jié)果集合中的ByteBuf依次的調(diào)用ctx.write()操作來傳遞給下一個ChannelOutboundHandler枪蘑。這里我們暫時只要知道兩次write操作最終會將兩個ByteBuf給添加到ChannelOutboundBuffer中损谦。一個ByteBuf的capacity是4,里面記錄了我們要發(fā)送的消息的大性榔摹照捡;另一個ByteBuf就是我們要發(fā)送的數(shù)據(jù)了。


知道了為什么有2個待發(fā)送的ByteBuf话侧,我們繼續(xù)看doWrite()操作中寫數(shù)據(jù)的操作:
這里主要完成了一次寫操作栗精,config().getWriteSpinCount()為16,也就是一次寫操作會最多執(zhí)行16次的SocketChannel.write操作來將數(shù)據(jù)寫到網(wǎng)絡(luò)中瞻鹏。每次ch.write完都會進行相應(yīng)的『expectedWrittenBytes -= localWrittenBytes;』操作悲立。如果在最后expectedWrittenBytes依舊大于0,則說明在這16次的socket寫操作后依舊還有未寫完的數(shù)據(jù)等待被繼續(xù)寫新博,那么done就會為false薪夕,那么就會將flush操作封裝為一個task提交至NioEventLoop的任務(wù)隊列中,在下一次事件循環(huán)時繼續(xù)發(fā)送未發(fā)完的數(shù)據(jù)叭披;否則若所有的數(shù)據(jù)都寫完了寥殖,done會被置為true玩讳。注意涩蜘,ch.write操作會返回本次寫操作寫出的字節(jié)數(shù)嚼贡,但該方法返回0時,即localWrittenBytes為0同诫,則說明底層的寫緩沖區(qū)已經(jīng)滿了(這里應(yīng)該指的是linux底層的寫緩沖區(qū)滿了)粤策,這是就會將setOpWrite置為true,此時因為數(shù)據(jù)還沒寫完done還是false误窖。那么這種情況下就會注冊當(dāng)前SocketChannel的寫事件:
當(dāng)?shù)讓泳彌_區(qū)有空余空間時就會觸發(fā)這個寫事件叮盘,繼續(xù)將為寫完的數(shù)據(jù)發(fā)送出去。


最后霹俺,我們來看doWrite()操作中的『in.removeBytes(writtenBytes);』操作
『 if (readableBytes <= writtenBytes) 』這個if判斷表示:本次socket的write操作(這里是真的是網(wǎng)絡(luò)通信寫操作了)已經(jīng)寫出去的字節(jié)數(shù) 大于 了當(dāng)前ByteBuf包可讀取的字節(jié)數(shù)柔吼。 這說明,當(dāng)前這個包中所有的可寫的數(shù)據(jù)都已經(jīng)寫完了(SocketChannel.write(bytebuffer)??這是將byteBuffer中的數(shù)據(jù)讀出來丙唧,然后寫入到socketChannel中)愈魏,既然當(dāng)前這個ByteBuf的數(shù)據(jù)都寫完了,那么久可以將其刪除了想际。即培漏,調(diào)用『remove()』操作,這個操作就會將回調(diào)已經(jīng)注冊到ByteBuf的promise上的所有l(wèi)isteners胡本,這里包括了“IdleStateHandler 的 writeListener(該listener就會完成對lastWriteTime的更新)”牌柄。『remove()』操作還會將當(dāng)前的ByteBuf指向下一個待處理的ByteBuf侧甫。

『目前珊佣,我們可以先理解為,write操作的數(shù)據(jù)最終都會放到ChannelOutboundBuffer中披粟,其中有兩個屬性private Entry unflushedEntry彩扔、private Entry flushedEntry。它們都是用Entry對象通過next指針來維護的一個單向鏈表僻爽。
unflushedEntry表示還未刷新的ByteBuf的鏈表頭虫碉;flushedEntry表示調(diào)用flush()操作時將會進行刷新的ByteBuf的鏈表頭。
在write的時候會將ByteBuf封裝為一個Entry對象放到unflushedEntry的尾部胸梆。當(dāng)調(diào)用flush時敦捧,就會將unflushedEntry賦值給flushedEntry,然后將unflushedEntry置null碰镜。
同時current()返回當(dāng)前正在處理的Entry對象(Entry中封裝了ByteBuf)』

到此為止兢卵,第一個ByteBuf,即記錄著我們要發(fā)送消息長度大小的ByteBuf就發(fā)送出去了绪颖,并且觸發(fā)了一次“IdleStateHandler 的 writeListener”的調(diào)用秽荤。

那么,第二個ByteBuf就是我們的大數(shù)據(jù)包了。

通過上面的分析窃款,我們知道大數(shù)據(jù)包走的是else流程课兄。也就是說,本次真實寫出去的數(shù)據(jù) 比 當(dāng)前這個ByteBuf的可讀取數(shù)據(jù)要小晨继。也就說明烟阐,當(dāng)前這個ByteBuf還沒有被完全的寫完。因此并不會通過調(diào)用『remove()』操作來觸發(fā)“IdleStateHandler 的 writeListener”的回調(diào)紊扬。直到整個大數(shù)據(jù)包所有的內(nèi)容都寫出去了蜒茄,那么這是if(readableBytes <= writtenBytes)才會為真,這是才會去觸發(fā)“IdleStateHandler 的 writeListener”的回調(diào)餐屎。
也就是說檀葛,只有在一個ByteBuf的數(shù)據(jù)全部都寫完了之后,才會去觸發(fā)所有注冊到這個write操作上的GenericFutureListener的回調(diào)腹缩。
netty其實有提供了一個ChannelProgressiveFuture來監(jiān)控數(shù)據(jù)的發(fā)送過程屿聋,它可以實現(xiàn)在一個大數(shù)據(jù)發(fā)送的過程中回調(diào)注冊到其上的ChannelProgressiveFutureListener,比如:

        ChannelProgressivePromise progressivePromise =  ctx.channel().newProgressivePromise();
        progressivePromise.addListener(new ChannelProgressiveFutureListener(){
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
                System.out.println("數(shù)據(jù)正在發(fā)送中庆聘。胜臊。。");
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                System.out.println("數(shù)據(jù)已經(jīng)發(fā)送完了伙判!");
            }
        });

        ctx.writeAndFlush(tempString, progressivePromise);



最后象对。說明下,當(dāng)將大數(shù)據(jù)包拆成一個個小包發(fā)送時宴抚,為什么不會導(dǎo)致寫空閑超時的觸發(fā)勒魔。
因為當(dāng)大數(shù)據(jù)包被拆分成一個個小包發(fā)送時,每個小數(shù)據(jù)包就是一個ByteBuf菇曲,每個ByteBuf待寫出的數(shù)據(jù)量就很小冠绢,比如本例中,我一個ByteBuf就是一個長度為26的英文字符串常潮,那么每次寫操作完成后在removeBytes()操作:

總是進入if為true的語句塊中弟胀。所以會不同的觸發(fā)“IdleStateHandler 的 writeListener”以更新lastWriteTime。


到目前為止喊式,我們已經(jīng)知道導(dǎo)致寫空閑超時的原因所在了孵户。這時我們可以想到的解決方案有:
① 用變量來記錄是否正在發(fā)送中,如果在發(fā)送中岔留,即使寫空閑超時被觸發(fā)也不發(fā)送心跳
② 將打包拆分成小包的方式

更進一步

但是夏哭,我們還有一個疑惑未解決,那就是IdleStateHandler類中observeOutput屬性到底是干啥用的献联?
我們先來看看observeOutput屬性在IdleStateHandler中的使用:
首先在doc文檔中竖配,對observeOutput屬性的描述是“在訪問寫空閑超時時何址,字節(jié)消費是否會被考慮進去,默認(rèn)為false”进胯,也就是說用爪,當(dāng)字節(jié)被消費時,寫空閑超時事件否非該被觸發(fā)龄减。
從上文项钮,我們已經(jīng)得知班眯,只有在每次真正寫完一個Bytebuf后希停,該ByteBuf的異步寫操作才算是完成,那么才會去觸發(fā)該異步寫操作上的listener署隘,也就是這是才會修改IdleStateHandler的lastWriteTime屬性宠能。
起初,我們以為如果將“observeOutput”屬性設(shè)置為true磁餐,那么即使ByteBuf包沒有被完全寫完违崇,但是已經(jīng)有字節(jié)數(shù)據(jù)在被寫出了,那么此時也不應(yīng)該觸發(fā)寫空閑超時事件诊霹。但羞延,結(jié)果卻是寫空閑超時事件依舊被觸發(fā)了。這是為什么了脾还?

下面我們就來好好說說“observeOutput”屬性的作用伴箩,首先我們來看看IdleStateHandler中observeOutput的使用:
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
    if (observeOutput) {

        // We can take this shortcut if the ChannelPromises that got passed into write()
        // appear to complete. It indicates "change" on message level and we simply assume
        // that there's change happening on byte level. If the user doesn't observe channel
        // writability events then they'll eventually OOME and there's clearly a different
        // problem and idleness is least of their concerns.
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            lastChangeCheckTimeStamp = lastWriteTime;

            // But this applies only if it's the non-first call.
            if (!first) {
                return true;
            }
        }

        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();

        if (buf != null) {
            int messageHashCode = System.identityHashCode(buf.current());
            long pendingWriteBytes = buf.totalPendingWriteBytes();

            if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                lastMessageHashCode = messageHashCode;
                lastPendingWriteBytes = pendingWriteBytes;

                if (!first) {
                    return true;
                }
            }
        }
    }

    return false;
}

這里“observeOutput”為true情況下,主要會根據(jù)三對數(shù)值的比較情況來覺得輸出是否有改變鄙漏,① lastChangeCheckTimeStamp 與 lastWriteTime嗤谚;② messageHashCode 與 lastMessageHashCode;③ pendingWriteBytes 與 lastPendingWriteBytes怔蚌;
① 和 ② 都好理解巩步,最讓我們困惑的是③,也就是說桦踊,pendingWriteBytes屬性并未像我們猜測的那樣隨著ByteBuf中的數(shù)據(jù)的寫出而改變椅野。 這又是為什么了?

為了解決這個問題籍胯,我們通過反向思考來嘗試的解決竟闪。即,這個值(pendingWriteBytes)是在什么情況下會被修改芒炼?
ChannelOutboundBuffer:

totalPendingSize表示的是所有待發(fā)送的ByteBuf的總長度瘫怜,接每次往ChannelOutboundBuffer添加一個ByteBuf的時候就會增加這個字段:
并且會在,每次發(fā)送完一個ByteBuf后本刽,調(diào)用『decrementPendingOutboundBytes(long size)』來減少totalPendingSize的值鲸湃,其中參數(shù)size為當(dāng)前發(fā)送出去的ByteBuf的數(shù)據(jù)大小赠涮。
確實是在每次寫完一個Bytebuf后才會調(diào)用一次decrementPendingOutboundBytes(long size)。

好了暗挑,現(xiàn)在我們知道笋除,其實pendingWriteBytes實際上也是在一個ByteBuf都寫出后才會被修改的。炸裆。垃它。 那么問題又來了,既然是這樣烹看,那么這個pendingWriteBytes又有什么用了国拇?或者說observeOutput屬性的使用到底是在什么場景下?惯殊?
這個問題其實在hasOutputChanged方法注解的github issues 6150中給出了討論酱吝。
目前能得到的結(jié)論是observeOutput屬性是為了issues 6150問題所提供的解決方案,而這個問題是在通過HTTP2協(xié)議進行數(shù)據(jù)發(fā)送時導(dǎo)致的土思,討論中提及netty在對HTTP2傳輸協(xié)議進行數(shù)據(jù)傳輸時可能會將多個數(shù)據(jù)包整合正一個包發(fā)送導(dǎo)致寫空閑超時事件被觸發(fā)了(因為务热,該問題與本文的問題并無關(guān)聯(lián),所以不做具體說明)己儒。但是通過github issues 6150討論中崎岂,我們得知了netty之所以不提供在寫一個大數(shù)據(jù)包的過程中修改pendingWriteBytes的原因(即,netty不支持某個ByteBuf中寫出部分?jǐn)?shù)據(jù)就修改ChannelOutboundBuffer中totalPendingSize值闪湾。)冲甘,這是為了防止ABA問題。

下面我們對hasOutputChanged進行更深一步的說明响谓,來看看ABA問題可能出現(xiàn)的情況(也就是因為這些情況损合,netty不允許在一個ByteBuf未寫完的情況下就修改ChannelOutboundBuffer中totalPendingSize值):
① 因為IdelStateHandler是可以在非EventLoop現(xiàn)實上執(zhí)行的,也就是說寫空閑超時任務(wù)是可以在非EventLoop線程上執(zhí)行娘纷。這個代碼塊就適用于這種情況下的一個捷徑判斷嫁审。因為listener只會在NioEventLoop線程上執(zhí)行,也就是說赖晶,lastWriteTime只會在NioEventLoop線程上被修改律适。而WriterIdleTimeoutTask則是放到IdelStateHandler所對應(yīng)的Executor中的,當(dāng)IdelStateHandler所對應(yīng)的Executor和NioEventLoop不是同一個時遏插,就可能出現(xiàn)『lastChangeCheckTimeStamp != lastWriteTime』的情況(該判斷在WriterIdleTimeoutTask中被執(zhí)行捂贿,而lastWriteTime在NioEventLoop線程中被修改)。
② 因為如果不是以ByteBuf或者FileRegion為單位修改pending bytes的話胳嘲,可能出現(xiàn)ABA問題厂僧。即,因為write操作可以由多個不同的線程來操作(非EventLoop線程)了牛,這可能導(dǎo)致EventLoop線程在進行該OutboundBuffer中ByteBuf的flush操作時颜屠,其他線程再往這個OutboundBuffer中加數(shù)據(jù)辰妙,這可能使得最終pending bytes的值并沒有改變,但實際上pending bytes是改變過的了甫窟,這樣就會使得判斷錯誤密浑。(PS:目前NIO傳輸時,寫完一個ByteBuf就會觸發(fā)該ByteBuf的listener粗井,那么lastWriteTime就會被修改尔破,此時根本不會進入)
③ 而另一個ABA問題是,如果保持了ByteBuf的引用浇衬,如果使用池的ByteBuf的話(默認(rèn)懒构,Netty就是使用池的ByteBuf),如果我們存儲OutBoundBuffer中的當(dāng)前的(鏈表頭)的那個ByteBuf對象的引用径玖,在每次寫空閑超時事件中判斷這個ByteBuf對象的hashCode與上一次調(diào)用時的值做比較來得出是否是同一個ByteBuf痴脾。??這種情況也可能出現(xiàn)ABA問題颤介,正式因為ByteBuf是池的梳星,那么就可能在寫空閑超時事件回調(diào)方法中存有的ByteBuf引用還是一樣的,但實際上是被回收后再次分配出去的滚朵,因此是邏輯上來說是不一樣的ByteBuf對象了冤灾。


  • “observeOutput” 字段的使用場景:
    當(dāng)在寫一個大數(shù)據(jù)包的時候,且該在寫超時已不是第一次觸發(fā)的時候(即辕近,first 為 false)韵吨,這個大數(shù)據(jù)包還沒寫完。但在此時移宅,我們已經(jīng)有 ch.write(data)了其數(shù)據(jù)了归粉,這會導(dǎo)致『pendingWriteBytes != lastPendingWriteBytes == true』(因為,channelOutboundBytes 只有在一個 ByteBuf 都寫出去后漏峰,即糠悼,寫到 socket 的寫緩沖后。才會減少其totalPendingWriteBytes 的值浅乔。這樣在??這個場景中倔喂,在我們自此write一個data的時候,totalPendingWriteBytes的值會增加)靖苇,因此來表示 outputChanged席噩。也就是說,observeOutput 觀察的是贤壁,是否有新的寫數(shù)據(jù)操作悼枢,而非對已經(jīng)操作的write的數(shù)據(jù)的觀察!F⒉稹馒索!


解決方案

好了给梅,到目前為止,我們已經(jīng)知道為什么我們使用“observeOutput”屬性無法達(dá)到我們預(yù)計的效果了双揪。那么动羽,關(guān)于發(fā)送大數(shù)據(jù)包我們到底可以做處理了。渔期。
這里运吓,我們覺得可以采用的一個方式是,使用“ChunkedWriteHandler”來實現(xiàn)大數(shù)據(jù)包的傳輸疯趟。
這個ChunkedWriteHandler又是怎么突然跑出來的拘哨。。是這樣的信峻,其實之前我們也不曉得有這個類倦青,或者說因為了解不深給忽略了它。正好在解決這個問題的間隙盹舞,將Netty的寫數(shù)據(jù)操作給過了邊产镐,在這其中發(fā)現(xiàn)了Netty自身目前僅對ChunkedWriteHandler和HTTP2的提供了WriteBufferWaterMark的支持,其余的需要我們程序自行添加支持踢步。而WriteBufferWaterMark通常就是為了控制有大量待寫出數(shù)據(jù)的情況下對寫出流量進行控制的一個方式癣亚,這看似和我們的大數(shù)據(jù)包寫出還是有些個關(guān)系的。因此获印,我們通過doc簡單了解了下ChunkedWriteHandler的使用述雾,發(fā)現(xiàn)確實是個可行的方式。在經(jīng)過測試后也如預(yù)期般達(dá)到了我們要的效果兼丰!下面玻孟,我們就來說說如果通過ChunkedWriteHandler來實現(xiàn)大數(shù)據(jù)包發(fā)送的發(fā)送。

這里對ChunkedWriteHandler做一個簡單的介紹:
ChunkedWriteHandler:一個handler鳍征,用于支持異步寫大數(shù)據(jù)流并且不需要消耗大量內(nèi)存也不會導(dǎo)致內(nèi)存溢出錯誤( OutOfMemoryError )黍翎。
ChunkedWriteHandler僅支持ChunkedInput類型的消息。也就是說蟆技,僅當(dāng)消息類型是ChunkedInput時才能實現(xiàn)ChunkedWriteHandler提供的大數(shù)據(jù)包傳輸功能(ChunkedInput是一個不確定長度的數(shù)據(jù)流)玩敏。
ChunkedWriteHandler中維護了一個待發(fā)送的數(shù)據(jù)包消息隊列(Queue<PendingWrite> queue,其中PendingWrite封裝了你待發(fā)送的消息以及異步寫操作的promise)

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    queue.add(new PendingWrite(msg, promise));
}

這樣使得你write的數(shù)據(jù)包在經(jīng)過ChunkedWriteHandler的時候质礼,會先被存儲到這個消息隊列中旺聚,并不會立即放入到ChannelOutboundBuffer里。

而當(dāng)你執(zhí)行flush操作時眶蕉,ChunkedWriteHandler會依次取出消息隊列中的大數(shù)據(jù)包砰粹,然后拆分成一個個小數(shù)據(jù)包(ByteBuf)后發(fā)給下游的ChannelOutboundHandler,并且在每次發(fā)送完一個ByteBuf包后都會立即執(zhí)行依次ctx.flush()操作將該ByteBuf發(fā)送到網(wǎng)絡(luò)中。

但也正是因為碱璃,ChunkedWriteHandler將一個大數(shù)據(jù)包拆分成了一個個小數(shù)據(jù)包放入底層的ChannelOutboundBuffer進行傳輸弄痹,這使得你對大數(shù)據(jù)包的異步寫操作注冊的listener在底層的ChannelOutboundBuffer已經(jīng)無法得到并且回調(diào)了,這就需要我們通過程序來進行狀態(tài)的管理以保持我們原有邏輯的正確性嵌器。
ChunkedWriteHandler會為每個發(fā)送的小數(shù)據(jù)包注冊一個listener肛真,這個listener會在小數(shù)據(jù)包成功發(fā)送完成后調(diào)用原始大數(shù)據(jù)包的GenericProgressiveFutureListener,上面我們已經(jīng)說了通過GenericProgressiveFutureListener我們可以監(jiān)控數(shù)據(jù)包的發(fā)送進度(通過回調(diào)operationProgressed方法實現(xiàn))爽航,以及在大數(shù)據(jù)包發(fā)送完后得到一個通知(通過回調(diào)operationComplete方法實現(xiàn))蚓让。因此,我們可以在operationComplete回調(diào)方法中對寫原始大數(shù)據(jù)包的異步操作上注冊的listener進行回調(diào)(通過表示寫異步操作promise完成來實現(xiàn))讥珍。

值得一提的時历极,ChunkedWriteHandler對將大數(shù)據(jù)包拆分成小數(shù)據(jù)包發(fā)往下游進行的操作是受WriteBufferWaterMark控制的,當(dāng)寫緩沖區(qū)中的數(shù)據(jù)數(shù)量超過了設(shè)置的高水位標(biāo)志衷佃,那么Channel#isWritable()方法將開始返回false趟卸,那么此時ChunkedWriteHandler就不會繼續(xù)拆分大數(shù)據(jù)包。然后當(dāng)寫緩沖區(qū)中的字節(jié)數(shù)量減少至小于了低水位標(biāo)志氏义,Channel#isWritable()方法會重新開始返回true锄列,而此時ChunkedWriteHandler會繼續(xù)拆分未拆分完的大數(shù)據(jù)包,繼續(xù)數(shù)據(jù)的寫操作觅赊。

絮絮叨叨了這么多右蕊,來看看具體的實現(xiàn):
首先修改了MyClientInitializer:

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addFirst("idleStateHandler", new IdleStateHandler(true, 9, 2, 11, TimeUnit.SECONDS));
        pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE,
                0, 4, 0, 4, true));
        pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
        pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
        pipeline.addLast("myClientChunkHandler", new MyClientChunkHandler());
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new MyClientHandler());
    }
}

在StringEncoder和LengthFieldPrepender兩個編碼器間添加了MyClientChunkHandler和ChunkedWriteHandler。
MyChunkedWriteHandler是一個出站處理器吮螺,它會完成將StringEncoder編碼后的大數(shù)據(jù)包類型轉(zhuǎn)換成ChannelInputStream類型,以使得其后的ChunkedWriteHandler能夠?qū)υ摯髷?shù)據(jù)包實現(xiàn)拆分分發(fā)的作用帕翻。

接下來是自定義的 MyClientChunkHandler鸠补,用于將我們的待發(fā)送的大數(shù)據(jù)包類型轉(zhuǎn)換成ChunkedInput類型,以使得ChunkedWriteHandler能夠發(fā)揮作用嘀掸。

public class MyServerChunkHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf)msg;
            ByteInputStream in = new ByteInputStream();
            byte[] data = null;
            if(buf.hasArray()) {
                System.out.println("+++ is array");
                data = buf.array().clone();
            } else {
                System.out.println("--- is direct");
                data = new byte[buf.readableBytes()];
                buf.writeBytes(data);

            }
//            System.out.println("===== data length : " + data.length);
            in.setBuf(data);
            ChunkedStream stream = new ChunkedStream(in);

            ReferenceCountUtil.release(msg);
            ctx.write(stream, promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }
}

??實現(xiàn)了將數(shù)據(jù)包類型轉(zhuǎn)換為ByteInputStream類型紫岩,傳遞個下一個ChannelOutboundHandler(也就是ChunkedWriteHandler)

后記

本次問題和朋友陸陸續(xù)續(xù)的討論了兩個晚上,印象還是比較深刻的睬塌。在第一次討論問題的時候泉蝌,我們對Netty的寫數(shù)據(jù)流程也沒有比較清晰的概念。后面將這塊流程補上后揩晴,再重新回來看待問題勋陪,感覺又清晰了不少,再者對于IdleStateHandler的observeOutput屬性確實是比較容易讓人誤解硫兰。如果沒有去翻查github和源碼的話诅愚,不容易明白這個屬性真正的用意。但也正是因為對Netty寫數(shù)據(jù)流程的梳理劫映,讓我們發(fā)現(xiàn)了一直忽略ChunkedWriteHandler违孝,也讓這個問題有了現(xiàn)在的這個解決方案刹前。當(dāng)然,可能隨著后面進一步深入的學(xué)習(xí)雌桑,我們會發(fā)現(xiàn)更好的解決方案喇喉,那么到時候也會繼續(xù)分享的。
若文章有任何錯誤校坑,望大家不吝指教:)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末轧飞,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子撒踪,更是在濱河造成了極大的恐慌过咬,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件制妄,死亡現(xiàn)場離奇詭異掸绞,居然都是意外死亡,警方通過查閱死者的電腦和手機耕捞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門衔掸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人俺抽,你說我怎么就攤上這事敞映。” “怎么了磷斧?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵振愿,是天一觀的道長。 經(jīng)常有香客問我弛饭,道長冕末,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任侣颂,我火速辦了婚禮档桃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘憔晒。我一直安慰自己藻肄,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布拒担。 她就那樣靜靜地躺著嘹屯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪澎蛛。 梳的紋絲不亂的頭發(fā)上抚垄,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機與錄音,去河邊找鬼呆馁。 笑死桐经,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的浙滤。 我是一名探鬼主播阴挣,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼纺腊!你這毒婦竟也來了畔咧?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤揖膜,失蹤者是張志新(化名)和其女友劉穎誓沸,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體壹粟,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡拜隧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了趁仙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洪添。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖雀费,靈堂內(nèi)的尸體忽然破棺而出干奢,到底是詐尸還是另有隱情,我是刑警寧澤盏袄,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布忿峻,位于F島的核電站,受9級特大地震影響貌矿,放射性物質(zhì)發(fā)生泄漏炭菌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一逛漫、第九天 我趴在偏房一處隱蔽的房頂上張望谅畅。 院中可真熱鬧滑蚯,春花似錦、人聲如沸但校。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至掰曾,卻和暖如春旭蠕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工掏熬, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留佑稠,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓旗芬,卻偏偏與公主長得像舌胶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子疮丛,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容