自頂向下深入分析Netty(八)--CodecHandler

編解碼處理器作為Netty編程時(shí)必備的ChannelHandler,每個(gè)應(yīng)用都必不可少粟誓。Netty作為網(wǎng)絡(luò)應(yīng)用框架报咳,在網(wǎng)絡(luò)上的各個(gè)應(yīng)用之間不斷進(jìn)行數(shù)據(jù)交互。而網(wǎng)絡(luò)數(shù)據(jù)交換的基本單位是字節(jié)凌彬,所以需要將本應(yīng)用的POJO對(duì)象編碼為字節(jié)數(shù)據(jù)發(fā)送到其他應(yīng)用,或者將收到的其他應(yīng)用的字節(jié)數(shù)據(jù)解碼為本應(yīng)用可使用的POJO對(duì)象循衰。這一部分铲敛,又和JAVA中的序列化和反序列化對(duì)應(yīng)。幸運(yùn)的是会钝,有很多其他的開源工具(protobuf伐蒋,thrift,json迁酸,xml等等)可方便的處理POJO對(duì)象的序列化先鱼,可參見這個(gè)鏈接
在互聯(lián)網(wǎng)中奸鬓,Netty使用TCP/UDP協(xié)議傳輸數(shù)據(jù)焙畔。由于Netty基于異步事件處理以及TCP的一些特性,使得TCP數(shù)據(jù)包會(huì)發(fā)生粘包現(xiàn)象串远。想象這樣的情況宏多,客戶端與服務(wù)端建立連接后,連接發(fā)送了兩條消息:

    +------+   +------+
    | MSG1 |   | MSG2 |
    +------+   +------+

在互聯(lián)網(wǎng)上傳輸數(shù)據(jù)時(shí)澡罚,連續(xù)發(fā)送的兩條消息伸但,在服務(wù)端極有可能被合并為一條:

    +------------+
    | MSG1  MSG2 |
    +------------+

這還不是最壞的情況,由于路由器的拆包和重組留搔,可能收到這樣的兩個(gè)數(shù)據(jù)包:

    +----+     +---------+        +-------+    +-----+ 
    | MS |     |  G1MSG2 |  或者  | MSG1M |    | SG2 | 
    +----+     +---------+        +-------+    +-----+

而服務(wù)端要正確的識(shí)別出這樣的兩條消息更胖,就需要編碼器的正確工作。為了正確的識(shí)別出消息隔显,業(yè)界有以下幾種做法:

  1. 使用定界符分割消息却妨,一個(gè)特例是使用換行符分隔每條消息。
  2. 使用定長的消息荣月。
  3. 在消息的某些字段指明消息長度管呵。

明白了這些,進(jìn)入正題哺窄,分析Netty的編碼框架ByteToMessageDecoder捐下。

8.1 ByteToMessageDecoder

在分析之前,需要說明一點(diǎn):ByteToMessage容易引起誤解萌业,解碼結(jié)果Message會(huì)被認(rèn)為是JAVA對(duì)象POJO坷襟,但實(shí)際解碼結(jié)果是消息幀。也就是說該解碼器處理TCP的粘包現(xiàn)象生年,將網(wǎng)絡(luò)發(fā)送的字節(jié)流解碼為具有確定含義的消息幀婴程,之后的解碼器再將消息幀解碼為實(shí)際的POJO對(duì)象。
明白了這點(diǎn)抱婉,再次回顧兩條消息發(fā)送的最壞情況档叔,可知要正確取得兩條消息桌粉,需要一個(gè)內(nèi)存區(qū)域存儲(chǔ)消息,當(dāng)收到MS時(shí)繼續(xù)等待第二個(gè)包G1MSG2到達(dá)再進(jìn)行解碼操作衙四。在ByteToMessageDecoder中铃肯,這個(gè)內(nèi)存區(qū)域被抽象為Cumulator,直譯累積器传蹈,可自動(dòng)擴(kuò)容累積字節(jié)數(shù)據(jù)押逼,Netty將其定義為一個(gè)接口:

    public interface Cumulator {
        ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
    }

其中,兩個(gè)ByteBuf參數(shù)cumulation指已經(jīng)累積的字節(jié)數(shù)據(jù)惦界,in表示該次channelRead()讀取到的新數(shù)據(jù)挑格。返回ByteBuf為累積數(shù)據(jù)后的新累積區(qū)(必要時(shí)候自動(dòng)擴(kuò)容)。自動(dòng)擴(kuò)容的代碼如下:

    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, 
                                       int newReadBytes) {
        ByteBuf oldCumulation = cumulation;
        // 擴(kuò)容后新的緩沖區(qū)
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        // 舊的緩沖區(qū)釋放
        oldCumulation.release();
        return cumulation;
    }

自動(dòng)擴(kuò)容的方法簡單粗暴沾歪,直接使用大容量的Bytebuf替換舊的ByteBuf漂彤。Netty定義了兩個(gè)累積器,一個(gè)為MERGE_CUMULATOR

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            // 1.累積區(qū)容量不夠容納數(shù)據(jù)
            // 2.用戶使用了slice().retain()或duplicate().retain()使refCnt增加
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

可知瞬逊,兩種情況下會(huì)擴(kuò)容:

  1. 累積區(qū)容量不夠容納新讀入的數(shù)據(jù)
  2. 用戶使用了slice().retain()duplicate().retain()使refCnt增加并且大于1显歧,此時(shí)擴(kuò)容返回一個(gè)新的累積區(qū)ByteBuf仪或,方便用戶對(duì)老的累積區(qū)ByteBuf進(jìn)行后續(xù)處理确镊。

另一個(gè)累積器為COMPOSITE_CUMULATOR

    public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            if (cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
                buffer.writeBytes(in);
                in.release();
            } else {
                CompositeByteBuf composite;
                if (cumulation instanceof CompositeByteBuf) {
                    composite = (CompositeByteBuf) cumulation;
                } else {
                    composite = alloc.compositeBuffer(Integer.MAX_VALUE);
                    composite.addComponent(true, cumulation);
                }
                composite.addComponent(true, in);
                buffer = composite;
            }
            return buffer;
        }
    };

這個(gè)累積器只在第二種情況refCnt>1時(shí)擴(kuò)容,除此之外處理和MERGE_CUMULATOR一致范删,不同的是當(dāng)cumulation不是CompositeByteBuf時(shí)會(huì)創(chuàng)建新的同類CompositeByteBuf蕾域,這樣最后返回的ByteBuf必定是CompositeByteBuf。使用這個(gè)累積器后到旦,當(dāng)容量不夠時(shí)并不會(huì)進(jìn)行內(nèi)存復(fù)制旨巷,只會(huì)講新讀入的in加到CompositeByteBuf中。需要注意的是:此種情況下雖然不需內(nèi)存復(fù)制添忘,卻要求用戶維護(hù)復(fù)雜的索引采呐,在某些使用中可能慢于MERGE_CUMULATOR。故Netty默認(rèn)使用MERGE_CUMULATOR累積器搁骑。
累積器分析完畢斧吐,步入正題ByteToMessageDecoder,首先看類簽名:

    public abstract class ByteToMessageDecoder extends
                                ChannelInboundHandlerAdapter

該類是一個(gè)抽象類仲器,其中的抽象方法只有一個(gè)decode()

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, 
            List<Object> out) throws Exception;

用戶使用了該解碼框架后煤率,只需實(shí)現(xiàn)該方法就可定義自己的解碼器。參數(shù)in表示累積器已累積的數(shù)據(jù)乏冀,out表示本次可從累積數(shù)據(jù)解碼出的結(jié)果列表蝶糯,結(jié)果可為POJO對(duì)象或者ByteBuf等等Object。
關(guān)注一下成員變量辆沦,以便更好的分析:

    ByteBuf cumulation; // 累積區(qū)
    private Cumulator cumulator = MERGE_CUMULATOR; // 累積器
    // 設(shè)置為true后每個(gè)channelRead事件只解碼出一個(gè)結(jié)果
    private boolean singleDecode;   // 某些特殊協(xié)議使用
    private boolean decodeWasNull;  // 解碼結(jié)果為空
    private boolean first;  // 是否首個(gè)消息
    // 累積區(qū)不丟棄字節(jié)的最大次數(shù)昼捍,16次后開始丟棄
    private int discardAfterReads = 16;
    private int numReads;   // 累積區(qū)不丟棄字節(jié)的channelRead次數(shù)

下面识虚,直接進(jìn)入channelRead()事件處理:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 只對(duì)ByteBuf處理即只對(duì)字節(jié)數(shù)據(jù)進(jìn)行處理
        if (msg instanceof ByteBuf) {
            // 解碼結(jié)果列表
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null; // 累積區(qū)為空表示首次解碼
                if (first) {
                    // 首次解碼直接使用讀入的ByteBuf作為累積區(qū)
                    cumulation = data;
                } else {
                    // 非首次需要進(jìn)行字節(jié)數(shù)據(jù)累積
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out); // 解碼操作
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    // 此時(shí)累積區(qū)不再有字節(jié)數(shù)據(jù),已被處理完畢
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // 連續(xù)discardAfterReads次后
                    // 累積區(qū)還有字節(jié)數(shù)據(jù)妒茬,此時(shí)丟棄一部分?jǐn)?shù)據(jù)
                    numReads = 0;
                    discardSomeReadBytes(); // 丟棄一些已讀字節(jié)
                }

                int size = out.size();
                // 本次沒有解碼出數(shù)據(jù)舷礼,此時(shí)size=0
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size); // 觸發(fā)事件
                out.recycle();  // 回收解碼結(jié)果
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

解碼結(jié)果列表CodecOutputList是Netty定制的一個(gè)特殊列表,該列表在線程中被緩存郊闯,可循環(huán)使用來存儲(chǔ)解碼結(jié)果妻献,減少不必要的列表實(shí)例創(chuàng)建冕茅,從而提升性能捏浊。由于解碼結(jié)果需要頻繁存儲(chǔ)菇存,普通的ArrayList難以滿足該需求寝凌,故定制化了一個(gè)特殊列表与帆,由此可見Netty對(duì)優(yōu)化的極致追求国瓮。
注意finally塊的第一個(gè)if情況滿足時(shí)糕簿,即累積區(qū)的數(shù)據(jù)已被讀取完畢屈扎,請(qǐng)考慮釋放累積區(qū)的必要性怀挠。想象這樣的情況析蝴,當(dāng)一條消息被解碼完畢后,如果客戶端長時(shí)間不發(fā)送消息绿淋,那么闷畸,服務(wù)端保存該條消息的累積區(qū)將一直占據(jù)服務(wù)端內(nèi)存浪費(fèi)資源,所以必須釋放該累積區(qū)吞滞。
第二個(gè)if情況滿足時(shí)佑菩,即累積區(qū)的數(shù)據(jù)一直在channelRead讀取數(shù)據(jù)進(jìn)行累積和解碼,直到達(dá)到了discardAfterReads次(默認(rèn)16)裁赠,此時(shí)累積區(qū)依然還有數(shù)據(jù)殿漠。在這樣的情況下,Netty主動(dòng)丟棄一些字節(jié)佩捞,這是為了防止該累積區(qū)占用大量內(nèi)存甚至耗盡內(nèi)存引發(fā)OOM绞幌。
處理完這些情況后,最后統(tǒng)一觸發(fā)ChannelRead事件一忱,將解碼出的數(shù)據(jù)傳遞給下一個(gè)處理器莲蜘。注意:當(dāng)out=0時(shí),統(tǒng)一到一起被處理了掀潮。
再看細(xì)節(jié)的discardSomeReadBytes()fireChannelRead()

    protected final void discardSomeReadBytes() {
        if (cumulation != null && !first && cumulation.refCnt() == 1) {
            cumulation.discardSomeReadBytes();
        }
    }
    
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, 
                        int numElements) {
        for (int i = 0; i < numElements; i ++) {
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }
    }

代碼比較簡單菇夸,只需注意discardSomeReadBytes中,累積區(qū)的refCnt() == 1時(shí)才丟棄數(shù)據(jù)是因?yàn)椋喝绻脩羰褂昧?code>slice().retain()和duplicate().retain()使refCnt>1仪吧,表明該累積區(qū)還在被用戶使用庄新,丟棄數(shù)據(jù)可能導(dǎo)致用戶的困惑,所以須確定用戶不再使用該累積區(qū)的已讀數(shù)據(jù),此時(shí)才丟棄择诈。
下面分析解碼核心方法callDecode()

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    // 解碼出消息就立即處理械蹋,防止消息等待
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    
                    // 用戶主動(dòng)刪除該Handler,繼續(xù)操作in是不安全的
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);   // 子類需要實(shí)現(xiàn)的具體解碼步驟

                // 用戶主動(dòng)刪除該Handler羞芍,繼續(xù)操作in是不安全的
                if (ctx.isRemoved()) {
                    break; 
                }
                
                // 此時(shí)outSize都==0(這的代碼容易產(chǎn)生誤解 應(yīng)該直接使用0)
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        // 沒有解碼出消息哗戈,且沒讀取任何in數(shù)據(jù)
                        break;
                    } else {
                        // 讀取了一部份數(shù)據(jù)但沒有解碼出消息
                        // 說明需要更多的數(shù)據(jù),故繼續(xù)
                        continue;
                    }
                }

                // 運(yùn)行到這里outSize>0 說明已經(jīng)解碼出消息
                if (oldInputLength == in.readableBytes()) {
                    // 解碼出消息但是in的讀索引不變荷科,用戶的decode方法有Bug
                    throw new DecoderException(
                            "did not read anything but decoded a message.");
                }
                
                // 用戶設(shè)定一個(gè)channelRead事件只解碼一次
                if (isSingleDecode()) {
                    break; 
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

循環(huán)中的第一個(gè)if分支唯咬,檢查解碼結(jié)果,如果已經(jīng)解碼出消息則立即將消息傳播到下一個(gè)處理器進(jìn)行處理畏浆,這樣可使消息得到及時(shí)處理胆胰。在調(diào)用decode()方法的前后,都檢查該Handler是否被用戶從ChannelPipeline中刪除刻获,如果刪除則跳出解碼步驟不對(duì)輸入緩沖區(qū)in進(jìn)行操作蜀涨,因?yàn)槔^續(xù)操作in已經(jīng)不安全。解碼完成后蝎毡,對(duì)in解碼前后的讀索引進(jìn)行了檢查厚柳,防止用戶的錯(cuò)誤使用,如果用戶錯(cuò)誤使用將拋出異常沐兵。
至此别垮,核心的解碼框架已經(jīng)分析完畢,再看最后的一些邊角處理痒筒。首先是channelReadComplete()讀事件完成后的處理:

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        numReads = 0;   // 連續(xù)讀次數(shù)置0
        discardSomeReadBytes(); // 丟棄已讀數(shù)據(jù)宰闰,節(jié)約內(nèi)存
        if (decodeWasNull) {
            // 沒有解碼出結(jié)果茬贵,則期待更多數(shù)據(jù)讀入
            decodeWasNull = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        ctx.fireChannelReadComplete();
    }

如果channelRead()中沒有解碼出消息簿透,極有可能是數(shù)據(jù)不夠,由此調(diào)用ctx.read()期待讀入更多的數(shù)據(jù)解藻。如果設(shè)置了自動(dòng)讀取老充,將會(huì)在HeadHandler中調(diào)用ctx.read();沒有設(shè)置自動(dòng)讀取螟左,則需要此處顯式調(diào)用啡浊。
最后再看Handler從ChannelPipelien中移除的處理handlerRemoved():

    public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = cumulation;
        if (buf != null) {
            cumulation = null;  // 釋放累積區(qū),GC回收

            int readable = buf.readableBytes();
            if (readable > 0) {
                ByteBuf bytes = buf.readBytes(readable);
                buf.release();
                // 解碼器已被刪除故不再解碼胶背,只將數(shù)據(jù)傳播到下一個(gè)Handler
                ctx.fireChannelRead(bytes);
            } else {
                buf.release();
            }

            numReads = 0;   // 置0巷嚣,有可能被再次添加
            ctx.fireChannelReadComplete();
        }
        handlerRemoved0(ctx);   // 用戶可進(jìn)行的自定義處理
    }

當(dāng)解碼器被刪除時(shí),如果還有沒被解碼的數(shù)據(jù)钳吟,則將數(shù)據(jù)傳播到下一個(gè)處理器處理廷粒,防止丟失數(shù)據(jù)。此外,當(dāng)連接不再有效觸發(fā)channelInactive事件或者觸發(fā)ChannelInputShutdownEvent時(shí)坝茎,則會(huì)調(diào)用callDecode()解碼涤姊,如果解碼出消息,傳播到下一個(gè)處理器嗤放。這部分的代碼不再列出思喊。
至此,ByteToMessageDecoder解碼框架已分析完畢次酌,下面恨课,我們選用具體的實(shí)例進(jìn)行分析。

8.1.1 LineBasedFrameDecoder

基于行分隔的解碼器LineBasedFrameDecoder是一個(gè)特殊的分隔符解碼器岳服,該解碼器使用的分隔符為:windows的\r\n和類linux的\n庄呈。
首先看該類定義的成員變量:

    // 最大幀長度,超過此長度將拋出異常TooLongFrameException
    private final int maxLength;
    // 是否快速失敗派阱,true-檢測到幀長度過長立即拋出異常不在讀取整個(gè)幀
    // false-檢測到幀長度過長依然讀完整個(gè)幀再拋出異常
    private final boolean failFast;
    // 是否略過分隔符诬留,true-解碼結(jié)果不含分隔符
    private final boolean stripDelimiter;

    // 超過最大幀長度是否丟棄字節(jié)
    private boolean discarding;
    private int discardedBytes; // 丟棄的字節(jié)數(shù)

其中,前三個(gè)變量可由用戶根據(jù)實(shí)際情況配置贫母,后兩個(gè)變量解碼時(shí)使用文兑。
該子類覆蓋的解碼方法如下:

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, 
                   List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

其中又定義了decode(ctx, in)解碼出單個(gè)消息幀,事實(shí)上這也是其他編碼子類使用的方法腺劣。decode(ctx, in)方法處理很繞彎绿贞,只給出偽代碼:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) {
                // 此時(shí)已找到換行符
                if(!checkMaxLength()) {
                    return getFrame().retain();
                } 
                // 超過最大長度拋出異常
            } else {
                if (checkMaxLength()) {
                    // 設(shè)置true表示下一次解碼需要丟棄字節(jié)
                    discarding = true;  
                    if (failFast) {
                        // 拋出異常
                    }
                } 
            }
        } else {
            if (eol >= 0) {
                // 丟棄換行符以及之前的字節(jié)
                buffer.readerIndex(eol + delimLength);
            } else {
                // 丟棄收到的所有字節(jié)
                buffer.readerIndex(buffer.writerIndex());
            }
        }
    }

該方法需要結(jié)合解碼框架的while循環(huán)反復(fù)理解,每個(gè)if情況都是一次while循環(huán)橘原,而變量discarding就成為控制每次解碼流程的狀態(tài)量籍铁,注意其中的狀態(tài)轉(zhuǎn)移。(想法:使用狀態(tài)機(jī)實(shí)現(xiàn)趾断,則流程更清晰)

8.1.2 DelimiterBasedFrameDecoder

該解碼器是更通用的分隔符解碼器拒名,可支持多個(gè)分隔符,每個(gè)分隔符可為一個(gè)或多個(gè)字符芋酌。如果定義了多個(gè)分隔符增显,并且可解碼出多個(gè)消息幀,則選擇產(chǎn)生最小幀長的結(jié)果脐帝。例如同云,使用行分隔符\r\n\n分隔:

    +--------------+
    | ABC\nDEF\r\n |
    +--------------+

可有兩種結(jié)果:

    +-----+-----+              +----------+   
    | ABC | DEF |  (√)   和    | ABC\nDEF |  (×)
    +-----+-----+              +----------+

該編碼器可配置的變量與LineBasedFrameDecoder類似,只是多了一個(gè)ByteBuf[] delimiters用于配置具體的分隔符堵腹。
Netty在Delimiters類中定義了兩種默認(rèn)的分隔符炸站,分別是NULL分隔符和行分隔符:

    public static ByteBuf[] nulDelimiter() {
        return new ByteBuf[] {
                Unpooled.wrappedBuffer(new byte[] { 0 }) };
    }
    
    public static ByteBuf[] lineDelimiter() {
        return new ByteBuf[] {
                Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }),
                Unpooled.wrappedBuffer(new byte[] { '\n' }),
        };
    }

8.1.3 FixedLengthFrameDecoder

該解碼器十分簡單,按照固定長度frameLength解碼出消息幀疚顷。如下的數(shù)據(jù)幀解碼為固定長度3的消息幀示例如下:

    +---+----+------+----+      +-----+-----+-----+
    | A | BC | DEFG | HI |  ->  | ABC | DEF | GHI |
    +---+----+------+----+      +-----+-----+-----+

其中的解碼方法也十分簡單:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readSlice(frameLength).retain();
        }
    }

8.1.4 LengthFieldBasedFrameDecoder

基于長度字段的消息幀解碼器旱易,該解碼器可根據(jù)數(shù)據(jù)包中的長度字段動(dòng)態(tài)的解碼出消息幀。一個(gè)推薦的二進(jìn)制傳輸協(xié)議可設(shè)計(jì)為如下格式:

    +----------+------+----------+------+
    |  頭部長度 |  頭部 |  數(shù)據(jù)長度 | 數(shù)據(jù) |
    +----------+------+----------+------+

這樣的協(xié)議可滿足大多數(shù)場景使用,但不幸的是:很多情況下并不可以設(shè)計(jì)新的協(xié)議咒唆,往往要在老舊的協(xié)議上傳輸數(shù)據(jù)届垫。由此,Netty將該解碼器設(shè)計(jì)的十分通用全释,只要有類似的長度字段便能正確解碼出消息幀装处。當(dāng)然前提是:正確使用解碼器。
沒有什么是完美的浸船,由于該解碼器十分通用妄迁,所以有大量的配置變量:

    private final ByteOrder byteOrder;
    private final int maxFrameLength;
    private final boolean failFast;
    private final int lengthFieldOffset;
    private final int lengthFieldLength;
    private final int lengthAdjustment;
    private final int initialBytesToStrip;

變量byteOrder表示長度字段的字節(jié)序:大端或小端,默認(rèn)為大端李命。如果對(duì)字節(jié)序有疑問登淘,請(qǐng)查閱其他資料,不再贅述封字。maxFrameLengthfailFast與其他解碼器相同黔州,控制最大幀長度和快速失敗拋異常,注意:該解碼器failFast默認(rèn)為true阔籽。
接下來將重點(diǎn)介紹其它四個(gè)變量:

  1. lengthFieldOffset表示長度字段偏移量即在一個(gè)數(shù)據(jù)包中長度字段的具體下標(biāo)位置流妻。標(biāo)準(zhǔn)情況,該長度字段為數(shù)據(jù)部分長度笆制。
  2. lengthFieldLength表示長度字段的具體字節(jié)數(shù)绅这,如一個(gè)int占4字節(jié)。該解碼器支持的字節(jié)數(shù)有:1在辆,2证薇,3,4和8匆篓,其他則會(huì)拋出異常浑度。另外,還需要注意的是:長度字段的結(jié)果為無符號(hào)數(shù)奕删。
  3. lengthAdjustment是一個(gè)長度調(diào)節(jié)量俺泣,當(dāng)數(shù)據(jù)包的長度字段不是數(shù)據(jù)部分長度而是總長度時(shí),可將此值設(shè)定為頭部長度完残,便能正確解碼出包含整個(gè)數(shù)據(jù)包的結(jié)果消息幀。注意:某些情況下横漏,該值可設(shè)定為負(fù)數(shù)谨设。
  4. initialBytesToStrip表示需要略過的字節(jié)數(shù),如果我們只關(guān)心數(shù)據(jù)部分而不關(guān)心頭部缎浇,可將此值設(shè)定為頭部長度從而丟棄頭部扎拣。
    下面我們使用具體的例子來說明:
  • 需求1:如下待解碼數(shù)據(jù)包,正確解碼為消息幀,其中長度字段在最前面的2字節(jié)二蓝,數(shù)據(jù)部分為12字節(jié)的字符串"HELLO, WORLD"誉券,長度字段0x000C=12 表示數(shù)據(jù)部分長度,數(shù)據(jù)包總長度則為14字節(jié)刊愚。
      解碼前(14 bytes)                 解碼后(14 bytes)
     +--------+----------------+      +--------+----------------+
     | Length | Actual Content |----->| Length | Actual Content |
     | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
     +--------+----------------+      +--------+----------------+

正確配置(只列出四個(gè)值中不為0的值):

    lengthFieldLength = 2;
  • 需求2:需求1的數(shù)據(jù)包不變踊跟,消息幀中去除長度字段。
     解碼前(14 bytes)                 解碼后(12 bytes)
    +--------+----------------+      +----------------+
    | Length | Actual Content |----->| Actual Content |
    | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
    +--------+----------------+      +----------------+

正確配置:

    lengthFieldLength   = 2;
    initialBytesToStrip = 2;
  • 需求3:需求1數(shù)據(jù)包中長度字段表示數(shù)據(jù)包總長度鸥诽。
     解碼前(14 bytes)                 解碼后(14 bytes)
    +--------+----------------+      +--------+----------------+
    | Length | Actual Content |----->| Length | Actual Content |
    | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
    +--------+----------------+      +--------+----------------+

正確配置:

    lengthFieldLength =  2;
    lengthAdjustment  = -2;  // 調(diào)整長度字段的2字節(jié)
  • 需求4:綜合難度商玫,數(shù)據(jù)包有兩個(gè)頭部HDR1和HDR2,長度字段以及數(shù)據(jù)部分組成牡借,其中長度字段值表示數(shù)據(jù)包總長度拳昌。結(jié)果消息幀需要第二個(gè)頭部HDR2和數(shù)據(jù)部分。請(qǐng)先給出答案再與標(biāo)準(zhǔn)答案比較钠龙,結(jié)果正確說明你已完全掌握了該解碼器的使用炬藤。
解碼前 (16 bytes)                               解碼后 (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+

正確配置:

    lengthFieldOffset   =  1;
    lengthFieldLength   =  2;
    lengthAdjustment    = -3;
    initialBytesToStrip =  3;

本解碼器的解碼過程總體上較為復(fù)雜,由于解碼的代碼是在while循環(huán)里面碴里,decode方法return或者拋出異常時(shí)可看做一次循環(huán)結(jié)束刻像,直到in中數(shù)據(jù)被解析完或者in的readerIndex讀索引不再增加才會(huì)從while循環(huán)跳出。使用狀態(tài)的思路理解并闲,每個(gè)return或者拋出異诚杆看為一個(gè)狀態(tài):

  1. 狀態(tài)1:丟棄過長幀狀態(tài),可能是用戶設(shè)置了錯(cuò)誤的幀長度或者實(shí)際幀過長帝火。
    if (discardingTooLongFrame) {
        long bytesToDiscard = this.bytesToDiscard;
        int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
        in.skipBytes(localBytesToDiscard); // 丟棄實(shí)際的字節(jié)數(shù)
        
        bytesToDiscard -= localBytesToDiscard;
        this.bytesToDiscard = bytesToDiscard;
        failIfNecessary(false);
    }

變量localBytesToDiscard取得實(shí)際需要丟棄的字節(jié)數(shù)溜徙,由于過長幀有兩種情況:a.用戶設(shè)置了錯(cuò)誤的長度字段,此時(shí)in中并沒有如此多的字節(jié)犀填;b.in中確實(shí)有如此長度的幀蠢壹,這個(gè)幀確實(shí)超過了設(shè)定的最大長度。bytesToDiscard的計(jì)算是為了failIfNecessary()確定異常的拋出九巡,其值為0表示當(dāng)次丟棄狀態(tài)已經(jīng)丟棄了in中的所有數(shù)據(jù)图贸,可以對(duì)新讀入in的數(shù)據(jù)進(jìn)行處理;否則冕广,還處于異常狀態(tài)疏日。

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        if (bytesToDiscard == 0) {
            long tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            // 由于已經(jīng)丟棄所有數(shù)據(jù),關(guān)閉丟棄模式
            discardingTooLongFrame = false;
            // 已經(jīng)丟棄了所有字節(jié)撒汉,當(dāng)非快速失敗模式拋異常
            if (!failFast || firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        } else {
            if (failFast && firstDetectionOfTooLongFrame) {
                // 幀長度異常沟优,快速失敗模式檢測到即拋異常
                fail(tooLongFrameLength);
            }
        }
    }

可見,首次檢測到幀長度是一種特殊情況睬辐,在之后的一個(gè)狀態(tài)進(jìn)行分析挠阁。請(qǐng)注意該狀態(tài)并不是都拋異常宾肺,還有可能進(jìn)入狀態(tài)2。

  1. 狀態(tài)2:in中數(shù)據(jù)不足夠組成消息幀侵俗,此時(shí)直接返回null等待更多數(shù)據(jù)到達(dá)锨用。
    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }
  1. 狀態(tài)3:幀長度錯(cuò)誤檢測,檢測長度字段為負(fù)值得幀以及加入調(diào)整長度后總長小于長度字段的幀隘谣,均拋出異常增拥。
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    // 該方法取出長度字段的值,不再深入分析
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, 
                             lengthFieldLength, byteOrder);
    if (frameLength < 0) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException("...");
    }

    frameLength += lengthAdjustment + lengthFieldEndOffset;
    if (frameLength < lengthFieldEndOffset) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException("...");
    }
  1. 狀態(tài)4:幀過長洪橘,由前述可知:可能是用戶設(shè)置了錯(cuò)誤的幀長度或者實(shí)際幀過長跪者。
    if (frameLength > maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;

            if (discard < 0) {
                in.skipBytes((int) frameLength);
            } else {
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            return null;
        }

變量discard<0表示當(dāng)前收到的數(shù)據(jù)足以確定是實(shí)際的幀過長,所以直接丟棄過長的幀長度熄求;>0表示當(dāng)前in中的數(shù)據(jù)并不足以確定是用戶設(shè)置了錯(cuò)誤的幀長度渣玲,還是正確幀的后續(xù)數(shù)據(jù)字節(jié)還沒有到達(dá),但無論何種情況弟晚,將丟棄狀態(tài)discardingTooLongFrame標(biāo)記設(shè)置為true忘衍,之后后續(xù)數(shù)據(jù)字節(jié)進(jìn)入狀態(tài)1處理。==0時(shí)卿城,在failIfNecessary(true)無論如何都將拋出異常枚钓,><0時(shí),只有設(shè)置快速失敗才會(huì)拋出異常瑟押。還需注意一點(diǎn):failIfNecessary()的參數(shù)firstDetectionOfTooLongFrame的首次是指正確解析數(shù)據(jù)后發(fā)生的第一次發(fā)生的幀過長搀捷,可知會(huì)有很多首次。

  1. 狀態(tài)5:正確解碼出消息幀多望。
    int frameLengthInt = (int) frameLength;
    if (in.readableBytes() < frameLengthInt) {
        return null;    // 到達(dá)的數(shù)據(jù)還達(dá)不到幀長
    }

    if (initialBytesToStrip > frameLengthInt) {
        in.skipBytes(frameLengthInt);   // 跳過字節(jié)數(shù)錯(cuò)誤
        throw new CorruptedFrameException("...");
    }
    in.skipBytes(initialBytesToStrip);

    // 正確解碼出數(shù)據(jù)幀
    int readerIndex = in.readerIndex();
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    ByteBuf frame = in.slice(readerIndex, actualFrameLength).retain();
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;

代碼中混合了兩個(gè)簡單狀態(tài)嫩舟,到達(dá)的數(shù)據(jù)還達(dá)不到幀長和用戶設(shè)置的忽略字節(jié)數(shù)錯(cuò)誤。由于較為簡單怀偷,故合并到一起家厌。
至此解碼框架分析完畢∽倒ぃ可見饭于,要正確的寫出基于長度字段的解碼器還是較為復(fù)雜的,如果開發(fā)時(shí)確有需求维蒙,特別要注意狀態(tài)的轉(zhuǎn)移掰吕。下面介紹較為簡單的編碼框架。

8.2 MessageToByteEncoder

MessageToByteEncoder框架可見用戶使用POJO對(duì)象編碼為字節(jié)數(shù)據(jù)存儲(chǔ)到ByteBuf木西。用戶只需定義自己的編碼方法encode()即可畴栖。
首先看類簽名:

    public abstract class MessageToByteEncoder<I> extends 
                                ChannelOutboundHandlerAdapter

可知該類只處理出站事件,切確的說是write事件八千。
該類有兩個(gè)成員變量吗讶,preferDirect表示是否使用內(nèi)核的DirectedByteBuf,默認(rèn)為true恋捆。TypeParameterMatcher用于檢測泛型參數(shù)是否是期待的類型照皆,比如說,如果需要編碼String類的POJO對(duì)象沸停,Matcher會(huì)確保write()傳入的參數(shù)Object的實(shí)際切確類型為String膜毁。
直接分析write()的處理:

    public void write(ChannelHandlerContext ctx, Object msg, 
                          ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                I cast = (I) msg;
                // 分配一個(gè)輸出緩沖區(qū)
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf); // 用戶定義的編碼方法
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise); // 確實(shí)寫入了數(shù)據(jù)
                } else {
                    // 沒有需要寫的數(shù)據(jù),也有可能是用戶編碼錯(cuò)誤
                    buf.release();  
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

編碼框架簡單明了愤钾,再列出allocateBuffer()方法的代碼:

    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx,  I msg,
                               boolean preferDirect) throws Exception {
        if (preferDirect) {
            return ctx.alloc().ioBuffer();  // 內(nèi)核直接緩存
        } else {
            return ctx.alloc().heapBuffer(); // JAVA隊(duì)緩存
        }
    }

總的來說瘟滨,編碼的復(fù)雜度大大小于解碼的復(fù)雜度,這是因?yàn)榫幋a不需考慮TCP粘包能颁。編解碼的處理還有一個(gè)常用的類MessageToMessageCodec用于POJO對(duì)象之間的轉(zhuǎn)換杂瘸。如果有興趣,可下載源碼查看伙菊。至此败玉,編解碼框架已分析完畢。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末镜硕,一起剝皮案震驚了整個(gè)濱河市运翼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌兴枯,老刑警劉巖血淌,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異财剖,居然都是意外死亡悠夯,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門峰伙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來疗疟,“玉大人,你說我怎么就攤上這事瞳氓〔咄” “怎么了?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵匣摘,是天一觀的道長店诗。 經(jīng)常有香客問我,道長音榜,這世上最難降的妖魔是什么庞瘸? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮赠叼,結(jié)果婚禮上擦囊,老公的妹妹穿的比我還像新娘违霞。我一直安慰自己,他們只是感情好瞬场,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布买鸽。 她就那樣靜靜地躺著,像睡著了一般贯被。 火紅的嫁衣襯著肌膚如雪眼五。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天彤灶,我揣著相機(jī)與錄音看幼,去河邊找鬼。 笑死幌陕,一個(gè)胖子當(dāng)著我的面吹牛诵姜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播苞轿,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼茅诱,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了搬卒?” 一聲冷哼從身側(cè)響起瑟俭,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎契邀,沒想到半個(gè)月后摆寄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡坯门,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年微饥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片古戴。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡欠橘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出现恼,到底是詐尸還是另有隱情肃续,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布叉袍,位于F島的核電站始锚,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏喳逛。R本人自食惡果不足惜瞧捌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧姐呐,春花似錦殿怜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽赠法。三九已至麦轰,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間砖织,已是汗流浹背款侵。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留侧纯,地道東北人新锈。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像眶熬,于是被迫代替她去往敵國和親妹笆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容