Netty源碼八 Netty解碼

1. 概述

所謂解碼就是將一串二進制數(shù)據(jù)流解析成一個個自定義協(xié)議的數(shù)據(jù)包秩冈,也就是ByteBuf。后續(xù)業(yè)務(wù)就可以直接基于ByteBuf進行處理文留。

兩個問題

  • 解碼器抽象的解碼過程
  • netty里面有哪些拆箱即用的解碼器

主要內(nèi)容

  • 解碼器基類
  • netty中常見的解碼器

2. 抽象解碼器ByteToMessageDecoder

ByteToMessageDecoder解碼步驟

  • 累加字節(jié)流
  • 調(diào)用子類的decode方法進行解析
  • 將解析到的ByteBuf向下傳播

具體分析

  1. 累加字節(jié)流
    ByteToMessageDecoder是基于ByteBuf進行解碼的,ByteToMessageDecoder的channelRead方法:
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) { // 先判斷是不是ByteBuf,如果是ByteBuf進行解碼器的處理
            CodecOutputList out = CodecOutputList.newInstance(); // 存放解析完之后的對象
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null; // 如果cumulation == null 說明累加器中沒數(shù)據(jù)
                if (first) {
                    cumulation = data; // 累加器為null问麸, 將ByteBuf對象賦值給累加器
                } else { // 將累加器中的數(shù)據(jù)和讀進來的數(shù)據(jù)累加,然后賦值給累加器
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out); // 調(diào)用子類decode方法進行解析
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {// 如果不是ByteBuf類型钞翔,直接將其向下傳播
            ctx.fireChannelRead(msg);
        }
    }

我們來看cumulator是什么:

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            try {
                final ByteBuf buffer;
                if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                    // Expand cumulation (by replace it) when either there is not more room in the buffer
                    // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                    // duplicate().retain() or if its read-only.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
                } else {
                    buffer = cumulation;
                }
                buffer.writeBytes(in); // 把當(dāng)前數(shù)據(jù)寫入累加器 
                return buffer;
            } finally {
                // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
                // for whatever release (for example because of OutOfMemoryError)
                in.release(); // 讀進來的數(shù)據(jù)進行釋放
            }
        }
    };

cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() 判斷當(dāng)前累加器是否有足夠的空間严卖,如果空間不夠就進行擴容。

  1. 調(diào)用子類decode
    接下來看 allDecode(ctx, cumulation, out); 方法:
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) { // 判斷累加器中是否有可讀字節(jié)
                int outSize = out.size();

                if (outSize > 0) { // 看outList中是否有對象布轿,只要有對象就使用事件傳播機制向下傳播
                    fireChannelRead(ctx, out, outSize);
                    out.clear(); // 然后當(dāng)前的list進行清空

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes(); // 調(diào)用子類decode之前哮笆,將當(dāng)前的可讀字節(jié)記錄下來
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) { // 沒解析到數(shù)據(jù)
                    if (oldInputLength == in.readableBytes()) { // 如果沒新數(shù)據(jù)進到累加器就break
                        break;
                    } else { // 有新數(shù)據(jù)進入了累加器,但是還不足以解析汰扭,就繼續(xù)循環(huán)
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) { 
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) { // 讀一次數(shù)據(jù)稠肘,只解析一次
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }
  1. 將子類解析到的ByteBuf向下傳播
    fireChannelRead(ctx, out, size);
    /**
     * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
     */
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        for (int i = 0; i < numElements; i ++) { // 將解析到的每個對象向下傳播
            ctx.fireChannelRead(msgs.getUnsafe(i));  // 這就最終傳播到業(yè)務(wù)解碼器中,
        }
    }

3. 基于固定長度解碼器 FixedLengthFrameDecoder 的分析

/**
 * A decoder that splits the received {@link ByteBuf}s by the fixed number
 * of bytes. For example, if you received the following four fragmented packets:
 * <pre>
 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+
 * </pre>
 * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
 * following three packets with the fixed length:
 * <pre>
 * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+
 * </pre>
 */
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength; // 固定長度

    /**
     * Creates a new instance.
     *
     * @param frameLength the length of the frame
     */
    public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    /**
     * Create a frame out of the {@link ByteBuf} and return it.
     *
     * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param   in              the {@link ByteBuf} from which to read data
     * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
     *                          be created.
     */
    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) { // 小于固定長度东且,不解析
            return null;
        } else {
            return in.readRetainedSlice(frameLength); // 從當(dāng)前累加器中截取frmeLength的字節(jié)數(shù)據(jù)
        }
    }
}

4. 基于行解碼器 LineBasedFrameDecoder 的分析

行解碼器启具,如果字節(jié)流是以\r\n結(jié)尾或者是以\n結(jié)尾的字節(jié)流,行解碼器就是以換行符為分隔珊泳,將字節(jié)流解析成一個個完整的數(shù)據(jù)包鲁冯。

public class LineBasedFrameDecoder extends ByteToMessageDecoder {

    /** Maximum length of a frame we're willing to decode.  */
    private final int maxLength;  // 行解碼器解析數(shù)據(jù)包的最大長度,超過最大長度色查,可能處理丟棄模式
    /** Whether or not to throw an exception as soon as we exceed maxLength. */
    private final boolean failFast; // 超過最大長度是否立即拋出異常薯演,true表示立即拋出
    private final boolean stripDelimiter; // 最終解析出的數(shù)據(jù)包帶不帶換行符,如果為true表示不帶

    /** True if we're discarding input because we're already over maxLength.  */
    private boolean discarding; // 數(shù)據(jù)流過長就會開啟丟棄模式
    private int discardedBytes; // 解碼到現(xiàn)在當(dāng)前已經(jīng)丟棄的字節(jié)

    /** Last scan position. */
    private int offset;

    /**
     * Creates a new decoder.
     * @param maxLength  the maximum length of the decoded frame.
     *                   A {@link TooLongFrameException} is thrown if
     *                   the length of the frame exceeds this value.
     */
    public LineBasedFrameDecoder(final int maxLength) {
        this(maxLength, true, false);
    }

    /**
     * Creates a new decoder.
     * @param maxLength  the maximum length of the decoded frame.
     *                   A {@link TooLongFrameException} is thrown if
     *                   the length of the frame exceeds this value.
     * @param stripDelimiter  whether the decoded frame should strip out the
     *                        delimiter or not
     * @param failFast  If <tt>true</tt>, a {@link TooLongFrameException} is
     *                  thrown as soon as the decoder notices the length of the
     *                  frame will exceed <tt>maxFrameLength</tt> regardless of
     *                  whether the entire frame has been read.
     *                  If <tt>false</tt>, a {@link TooLongFrameException} is
     *                  thrown after the entire frame that exceeds
     *                  <tt>maxFrameLength</tt> has been read.
     */
    public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in); // 調(diào)用重載的方法
        if (decoded != null) { // 如果解析到對象秧了,放入outList
            out.add(decoded);
        }
    }

    /**
     * Create a frame out of the {@link ByteBuf} and return it.
     *
     * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param   buffer          the {@link ByteBuf} from which to read data
     * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
     *                          be created.
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer); // 找行的結(jié)尾 
        if (!discarding) {
            if (eol >= 0) {
                final ByteBuf frame;
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                if (length > maxLength) { // 數(shù)據(jù)流長度> 最大解析長度
                    buffer.readerIndex(eol + delimL ength); // 如果超過跨扮,把readerIndex直接指向換行符之后的字節(jié),相當(dāng)于把超長的消息給丟棄了
                    fail(ctx, length); // 傳播異常
                    return null;
                }

                if (stripDelimiter) { // 不帶分隔符
                    frame = buffer.readRetainedSlice(length); // 從累加器讀取指定長度
                    buffer.skipBytes(delimLength);
                } else { // 解析出來的消息帶分隔符
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                return frame;
            } else { // 非丟棄模式,找不到換行符的時候
                final int length = buffer.readableBytes();
                if (length > maxLength) { // 大于最大長度了
                    discardedBytes = length; // 丟棄
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true; // 當(dāng)前進入丟棄模式
                    offset = 0;
                    if (failFast) { // 傳播異常
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else { // 丟棄模式
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();  // 計算丟棄模式下后半段消息的長度(前半段因為超長已經(jīng)丟棄了衡创,這里處理的是后半段垃圾信息)
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength); // 讀指針移到新行開頭帝嗡,把length那段整體丟棄
                discardedBytes = 0;
                discarding = false;  // 進入正常模式--非丟棄模式
                if (!failFast ) {
                    fail(ctx, length);
                }
            } else {// 丟棄模式下未找到換行符,繼續(xù)丟棄璃氢,然后計算長度
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
                // We skip everything in the buffer, we need to set the offset to 0 again.
                offset = 0;
            }
            return null;
        }
    }

    private void fail(final ChannelHandlerContext ctx, int length) {
        fail(ctx, String.valueOf(length));
    }

    private void fail(final ChannelHandlerContext ctx, String length) {
        ctx.fireExceptionCaught(
                new TooLongFrameException(
                        "frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));
    }

    /**
     * Returns the index in the buffer of the end of line found.
     * Returns -1 if no end of line was found in the buffer.
     */
    private int findEndOfLine(final ByteBuf buffer) {
        int totalLength = buffer.readableBytes();
        int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
        if (i >= 0) {
            offset = 0;
            if (i > 0 && buffer.getByte(i - 1) == '\r') {
                i--;
            }
        } else {
            offset = totalLength;
        }
        return i;
    }
}

5. 基于分隔符解碼器 DelimiterBasedFrameDecoder 分析

它最少要有兩個參數(shù)哟玷,一個是解析數(shù)據(jù)流最大長度,一個是分隔符一也,它可以傳多個分隔符進去巢寡。

基于分隔符解碼器分析解碼步驟

  • 行處理器
  • 解碼
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {

    private final ByteBuf[] delimiters;
    private final int maxFrameLength;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private int tooLongFrameLength;
    /** Set only when decoding with "\n" and "\r\n" as the delimiter.  */
    private final LineBasedFrameDecoder lineBasedDecoder;

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param delimiter  the delimiter
     */
    public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
        this(maxFrameLength, true, delimiter);
    }

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param stripDelimiter  whether the decoded frame should strip out the
     *                        delimiter or not
     * @param delimiter  the delimiter
     */
    public DelimiterBasedFrameDecoder(
            int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
        this(maxFrameLength, stripDelimiter, true, delimiter);
    }

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param stripDelimiter  whether the decoded frame should strip out the
     *                        delimiter or not
     * @param failFast  If <tt>true</tt>, a {@link TooLongFrameException} is
     *                  thrown as soon as the decoder notices the length of the
     *                  frame will exceed <tt>maxFrameLength</tt> regardless of
     *                  whether the entire frame has been read.
     *                  If <tt>false</tt>, a {@link TooLongFrameException} is
     *                  thrown after the entire frame that exceeds
     *                  <tt>maxFrameLength</tt> has been read.
     * @param delimiter  the delimiter
     */
    public DelimiterBasedFrameDecoder(
            int maxFrameLength, boolean stripDelimiter, boolean failFast,
            ByteBuf delimiter) {
        this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] {
                delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
    }

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param delimiters  the delimiters
     */
    public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
        this(maxFrameLength, true, delimiters);
    }

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param stripDelimiter  whether the decoded frame should strip out the
     *                        delimiter or not
     * @param delimiters  the delimiters
     */
    public DelimiterBasedFrameDecoder(
            int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
        this(maxFrameLength, stripDelimiter, true, delimiters);
    }

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength  the maximum length of the decoded frame.
     *                        A {@link TooLongFrameException} is thrown if
     *                        the length of the frame exceeds this value.
     * @param stripDelimiter  whether the decoded frame should strip out the
     *                        delimiter or not
     * @param failFast  If <tt>true</tt>, a {@link TooLongFrameException} is
     *                  thrown as soon as the decoder notices the length of the
     *                  frame will exceed <tt>maxFrameLength</tt> regardless of
     *                  whether the entire frame has been read.
     *                  If <tt>false</tt>, a {@link TooLongFrameException} is
     *                  thrown after the entire frame that exceeds
     *                  <tt>maxFrameLength</tt> has been read.
     * @param delimiters  the delimiters
     */
    public DelimiterBasedFrameDecoder(
            int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
        validateMaxFrameLength(maxFrameLength);
        if (delimiters == null) {
            throw new NullPointerException("delimiters");
        }
        if (delimiters.length == 0) {
            throw new IllegalArgumentException("empty delimiters");
        }

        if (isLineBased(delimiters) && !isSubclass()) {
            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
            this.delimiters = null;
        } else {
            this.delimiters = new ByteBuf[delimiters.length];
            for (int i = 0; i < delimiters.length; i ++) {
                ByteBuf d = delimiters[i];
                validateDelimiter(d);
                this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
            }
            lineBasedDecoder = null;
        }
        this.maxFrameLength = maxFrameLength;
        this.stripDelimiter = stripDelimiter;
        this.failFast = failFast;
    }

    /** Returns true if the delimiters are "\n" and "\r\n".  */
    private static boolean isLineBased(final ByteBuf[] delimiters) {
        if (delimiters.length != 2) {
            return false;
        }
        ByteBuf a = delimiters[0];
        ByteBuf b = delimiters[1];
        if (a.capacity() < b.capacity()) {
            a = delimiters[1];
            b = delimiters[0];
        }
        return a.capacity() == 2 && b.capacity() == 1
                && a.getByte(0) == '\r' && a.getByte(1) == '\n'
                && b.getByte(0) == '\n';
    }

    /**
     * Return {@code true} if the current instance is a subclass of DelimiterBasedFrameDecoder
     */
    private boolean isSubclass() {
        return getClass() != DelimiterBasedFrameDecoder.class;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in); // 重載一個decode方法
        if (decoded != null) {
            out.add(decoded); // 解析出的對象加入outList
        }
    }

    /**
     * Create a frame out of the {@link ByteBuf} and return it.
     *
     * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param   buffer          the {@link ByteBuf} from which to read data
     * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
     *                          be created.
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        if (lineBasedDecoder != null) { // 如果行處理器不為空,調(diào)用行處理器
            return lineBasedDecoder.decode(ctx, buffer);
        }
        // Try all delimiters and choose the delimiter which yields the shortest frame.
        int minFrameLength = Integer.MAX_VALUE; // 找到最小分隔符
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            if (frameLength >= 0 && frameLength < minFrameLength) { // 以某個分隔符分割的最小數(shù)據(jù)包的長度
                minFrameLength = frameLength;
                minDelim = delim;
            }
        }

        if (minDelim != null) { // 開始解碼椰苟,找到分隔符
            int minDelimLength = minDelim.capacity();
            ByteBuf frame;

            if (discardingTooLongFrame) { // 丟棄模式
                // We've just finished discarding a very large frame.
                // Go back to the initial state.
                discardingTooLongFrame = false; // 設(shè)置正常模式抑月,非丟棄模式
                buffer.skipBytes(minFrameLength + minDelimLength);

                int tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0;
                if (!failFast) {
                    fail(tooLongFrameLength);
                }
                return null;
            }

            if (minFrameLength > maxFrameLength) {
                // Discard read frame. 丟棄
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(minFrameLength); // 傳播異常
                return null;
            }

            if (stripDelimiter) { // 是否包含分隔符
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }

            return frame;
        } else {  // 沒有找到分隔符
            if (!discardingTooLongFrame) { // 非丟棄模式
                if (buffer.readableBytes() > maxFrameLength) { // 數(shù)據(jù)包過長,丟棄
                    // Discard the content of the buffer until a delimiter is found.
                    tooLongFrameLength = buffer.readableBytes(); // 記錄丟棄的字節(jié)數(shù)
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame = true; // 標(biāo)記為丟棄狀態(tài)
                    if (failFast) {
                        fail(tooLongFrameLength);
                    }
                }
            } else { // 當(dāng)前處于丟棄模式
                // Still discarding the buffer since a delimiter is not found.
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }

    private void fail(long frameLength) {
        if (frameLength > 0) {
            throw new TooLongFrameException(
                            "frame length exceeds " + maxFrameLength +
                            ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException(
                            "frame length exceeds " + maxFrameLength +
                            " - discarding");
        }
    }

    /**
     * Returns the number of bytes between the readerIndex of the haystack and
     * the first needle found in the haystack.  -1 is returned if no needle is
     * found in the haystack.
     */
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                    break;
                } else {
                    haystackIndex ++;
                    if (haystackIndex == haystack.writerIndex() &&
                        needleIndex != needle.capacity() - 1) {
                        return -1;
                    }
                }
            }

            if (needleIndex == needle.capacity()) {
                // Found the needle from the haystack!
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }

    private static void validateDelimiter(ByteBuf delimiter) {
        if (delimiter == null) {
            throw new NullPointerException("delimiter");
        }
        if (!delimiter.isReadable()) {
            throw new IllegalArgumentException("empty delimiter");
        }
    }

    private static void validateMaxFrameLength(int maxFrameLength) {
        checkPositive(maxFrameLength, "maxFrameLength");
    }
}

6. 基于長度域的解碼器 LengthFieldBasedFrameDecoder 分析

重要參數(shù)

lengthFieldOffset: 長度域在二進制數(shù)據(jù)流中偏移量舆蝴。
lengthFieldLength:長度域的長度谦絮,有幾個取值1,2须误,3挨稿,4,8京痢。
lengthAdjustment:lengthFieldLength不代表一個消息的完整長度奶甘,lengthFieldLength指定長度+lengthAdjustment 才是。用于lengthFieldLength指定的長度包含長度域的情況祭椰,這時候lengthAdjustment設(shè)置為負(fù)值方便解碼臭家。
initialBytesToStrip:我在解碼的時候,需要跳過的字節(jié)數(shù)方淤,常用于跳過長度域钉赁。

解碼過程

LengthFieldBasedFrameDecoder 解碼過程如下:

  • 計算需要抽取的數(shù)據(jù)包長度
  • 跳過字節(jié)邏輯處理
  • 丟棄模式下的處理

我們來看它的核心解碼代碼:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            discardingTooLongFrame(in);
        }

        if (in.readableBytes() < lengthFieldEndOffset) { // 當(dāng)前字節(jié)數(shù)還不夠 解析出lengthFieldLength
            return null;
        }

        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;  // 因為是流,要加上讀當(dāng)前指針的位置你踩,計算實際偏移量
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); // lengthFieldLength中指定的數(shù)據(jù)包長度

        if (frameLength < 0) {
            failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
        }

        frameLength += lengthAdjustment + lengthFieldEndOffset; // 計算出需要抽取的數(shù)據(jù)包長度

        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) { // 這里面會設(shè)置進入丟棄模式
            exceededFrameLength(in, frameLength);
            return null;
        }

        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
        }
        // 到這里已經(jīng)有一個完整的數(shù)據(jù)包了
        in.skipBytes(initialBytesToStrip); // 跳過略過的長度

        // extract frame
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;  // 真正需要拿的字節(jié)
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

frameLength > maxFrameLength就會設(shè)置丟棄模式
丟棄模式處理

    private void exceededFrameLength(ByteBuf in, long frameLength) {
        long discard = frameLength - in.readableBytes(); // 計算下次還要丟棄的字節(jié)數(shù)
        tooLongFrameLength = frameLength;  // 記錄

        if (discard < 0) { // 說明一次丟棄就能解決問題
            // buffer contains more bytes then the frameLength so we can discard all now
            in.skipBytes((int) frameLength);
        } else { // 說明還需要一次丟棄才能解決問題
            // Enter the discard mode and discard everything received so far.
            discardingTooLongFrame = true; // 設(shè)置為丟棄模式
            bytesToDiscard = discard;
            in.skipBytes(in.readableBytes()); // 跳過,即丟棄
        }
        failIfNecessary(true);
    }

我們來看 failNecessary方法:

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        if (bytesToDiscard == 0) { // 說明不需要丟棄
            // Reset to the initial state and tell the handlers that
            // the frame was too large.
            long tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            discardingTooLongFrame = false; // 設(shè)置為正常模式
            if (!failFast || firstDetectionOfTooLongFrame) { // 是否需要快速失敗
                fail(tooLongFrameLength);
            }
        } else {
            // Keep discarding and notify handlers if necessary.
            if (failFast && firstDetectionOfTooLongFrame) { // 如果是快速失敗讳苦,且是第一次
                fail(tooLongFrameLength); // 發(fā)出失敗通知
            }
        }
    }

7. 解碼器總結(jié)

1. 解碼器抽象的解碼過程是怎樣的带膜?
答:抽象的解碼過程是通過ByteToMessageDecoder實現(xiàn)的,它的解碼過程分為以下三步驟:第一步累加字節(jié)流鸳谜,它會把當(dāng)前讀取的字節(jié)流累加到累加器cumulator中膝藕;第二步調(diào)用子類的decode方法進行解析,decode方法實際上是一個抽象方法咐扭,不同的子類解碼器的decode有不同的實現(xiàn)芭挽。調(diào)用decode時會傳入兩個比較重要的參數(shù)滑废,一個是當(dāng)前累加的字節(jié)流,一個是outList袜爪,子類在解碼時從累加器中讀取一段數(shù)據(jù)蠕趁,如果成功解析出一個數(shù)據(jù)包,就把這個數(shù)據(jù)包添加到outList中饿敲;第三步妻导,如果outList中有解析出的數(shù)據(jù)包,就通過pipeline的事件傳播機制往下傳播怀各。

2. Netty中有哪些拆箱即用的解碼器?
netty提供了許許多多開箱即用的解碼器术浪,99%的業(yè)務(wù)場景下使用netty提供的解碼器就能完成需求瓢对,使用netty提供的解碼器代碼的健壯性能夠得到保證。比如固定長度解碼器FixedLengthFrameDecoder胰苏,行解碼器(LineBasedFrameDecoder)硕蛹;固定分隔符解碼器(DelimiterBasedFrameDecoder),可以傳一些分隔符進去硕并,如果是行分隔符netty會做特殊處理法焰;基于長度域的解碼器(LengthFieldBasedFrameDecoder),比較通用的解碼器倔毙,解碼的過程比較復(fù)雜埃仪,主要有三個步驟,第一步計算需要抽取的數(shù)據(jù)包的長度陕赃,第二步對拿到的數(shù)據(jù)包進行處理比如跳過指定字節(jié)數(shù)卵蛉,處理完之后一個完整的數(shù)據(jù)包就可以放入outList中,接下來就是丟棄模式的處理么库。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末傻丝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子诉儒,更是在濱河造成了極大的恐慌葡缰,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件忱反,死亡現(xiàn)場離奇詭異泛释,居然都是意外死亡,警方通過查閱死者的電腦和手機缭受,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進店門胁澳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人米者,你說我怎么就攤上這事韭畸∮钪牵” “怎么了?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵胰丁,是天一觀的道長随橘。 經(jīng)常有香客問我,道長锦庸,這世上最難降的妖魔是什么机蔗? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮甘萧,結(jié)果婚禮上萝嘁,老公的妹妹穿的比我還像新娘。我一直安慰自己扬卷,他們只是感情好牙言,可當(dāng)我...
    茶點故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著怪得,像睡著了一般咱枉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上徒恋,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天蚕断,我揣著相機與錄音,去河邊找鬼入挣。 笑死亿乳,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的财岔。 我是一名探鬼主播风皿,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼匠璧!你這毒婦竟也來了桐款?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤夷恍,失蹤者是張志新(化名)和其女友劉穎魔眨,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酿雪,經(jīng)...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡遏暴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了指黎。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片朋凉。...
    茶點故事閱讀 39,764評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖醋安,靈堂內(nèi)的尸體忽然破棺而出杂彭,到底是詐尸還是另有隱情墓毒,我是刑警寧澤,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布亲怠,位于F島的核電站所计,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏团秽。R本人自食惡果不足惜主胧,卻給世界環(huán)境...
    茶點故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望习勤。 院中可真熱鬧踪栋,春花似錦、人聲如沸图毕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吴旋。三九已至,卻和暖如春厢破,著一層夾襖步出監(jiān)牢的瞬間荣瑟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工摩泪, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留笆焰,地道東北人。 一個月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓见坑,卻偏偏與公主長得像嚷掠,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子荞驴,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,665評論 2 354

推薦閱讀更多精彩內(nèi)容