1. 概述
所謂解碼就是將一串二進制數(shù)據(jù)流解析成一個個自定義協(xié)議的數(shù)據(jù)包秩冈,也就是ByteBuf。后續(xù)業(yè)務(wù)就可以直接基于ByteBuf進行處理文留。
兩個問題
- 解碼器抽象的解碼過程
- netty里面有哪些拆箱即用的解碼器
主要內(nèi)容
- 解碼器基類
- netty中常見的解碼器
2. 抽象解碼器ByteToMessageDecoder
ByteToMessageDecoder解碼步驟
- 累加字節(jié)流
- 調(diào)用子類的decode方法進行解析
- 將解析到的ByteBuf向下傳播
具體分析
- 累加字節(jié)流
ByteToMessageDecoder是基于ByteBuf進行解碼的,ByteToMessageDecoder的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) { // 先判斷是不是ByteBuf,如果是ByteBuf進行解碼器的處理
CodecOutputList out = CodecOutputList.newInstance(); // 存放解析完之后的對象
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null; // 如果cumulation == null 說明累加器中沒數(shù)據(jù)
if (first) {
cumulation = data; // 累加器為null问麸, 將ByteBuf對象賦值給累加器
} else { // 將累加器中的數(shù)據(jù)和讀進來的數(shù)據(jù)累加,然后賦值給累加器
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out); // 調(diào)用子類decode方法進行解析
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {// 如果不是ByteBuf類型钞翔,直接將其向下傳播
ctx.fireChannelRead(msg);
}
}
我們來看cumulator是什么:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in); // 把當(dāng)前數(shù)據(jù)寫入累加器
return buffer;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
in.release(); // 讀進來的數(shù)據(jù)進行釋放
}
}
};
cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
判斷當(dāng)前累加器是否有足夠的空間严卖,如果空間不夠就進行擴容。
- 調(diào)用子類decode
接下來看allDecode(ctx, cumulation, out);
方法:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) { // 判斷累加器中是否有可讀字節(jié)
int outSize = out.size();
if (outSize > 0) { // 看outList中是否有對象布轿,只要有對象就使用事件傳播機制向下傳播
fireChannelRead(ctx, out, outSize);
out.clear(); // 然后當(dāng)前的list進行清空
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes(); // 調(diào)用子類decode之前哮笆,將當(dāng)前的可讀字節(jié)記錄下來
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) { // 沒解析到數(shù)據(jù)
if (oldInputLength == in.readableBytes()) { // 如果沒新數(shù)據(jù)進到累加器就break
break;
} else { // 有新數(shù)據(jù)進入了累加器,但是還不足以解析汰扭,就繼續(xù)循環(huán)
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) { // 讀一次數(shù)據(jù)稠肘,只解析一次
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
- 將子類解析到的ByteBuf向下傳播
fireChannelRead(ctx, out, size);
/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) { // 將解析到的每個對象向下傳播
ctx.fireChannelRead(msgs.getUnsafe(i)); // 這就最終傳播到業(yè)務(wù)解碼器中,
}
}
3. 基于固定長度解碼器 FixedLengthFrameDecoder 的分析
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes. For example, if you received the following four fragmented packets:
* <pre>
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
* </pre>
* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
* following three packets with the fixed length:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength; // 固定長度
/**
* Creates a new instance.
*
* @param frameLength the length of the frame
*/
public FixedLengthFrameDecoder(int frameLength) {
checkPositive(frameLength, "frameLength");
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) { // 小于固定長度东且,不解析
return null;
} else {
return in.readRetainedSlice(frameLength); // 從當(dāng)前累加器中截取frmeLength的字節(jié)數(shù)據(jù)
}
}
}
4. 基于行解碼器 LineBasedFrameDecoder 的分析
行解碼器启具,如果字節(jié)流是以\r\n結(jié)尾或者是以\n結(jié)尾的字節(jié)流,行解碼器就是以換行符為分隔珊泳,將字節(jié)流解析成一個個完整的數(shù)據(jù)包鲁冯。
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
/** Maximum length of a frame we're willing to decode. */
private final int maxLength; // 行解碼器解析數(shù)據(jù)包的最大長度,超過最大長度色查,可能處理丟棄模式
/** Whether or not to throw an exception as soon as we exceed maxLength. */
private final boolean failFast; // 超過最大長度是否立即拋出異常薯演,true表示立即拋出
private final boolean stripDelimiter; // 最終解析出的數(shù)據(jù)包帶不帶換行符,如果為true表示不帶
/** True if we're discarding input because we're already over maxLength. */
private boolean discarding; // 數(shù)據(jù)流過長就會開啟丟棄模式
private int discardedBytes; // 解碼到現(xiàn)在當(dāng)前已經(jīng)丟棄的字節(jié)
/** Last scan position. */
private int offset;
/**
* Creates a new decoder.
* @param maxLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
*/
public LineBasedFrameDecoder(final int maxLength) {
this(maxLength, true, false);
}
/**
* Creates a new decoder.
* @param maxLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
*/
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in); // 調(diào)用重載的方法
if (decoded != null) { // 如果解析到對象秧了,放入outList
out.add(decoded);
}
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer); // 找行的結(jié)尾
if (!discarding) {
if (eol >= 0) {
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
if (length > maxLength) { // 數(shù)據(jù)流長度> 最大解析長度
buffer.readerIndex(eol + delimL ength); // 如果超過跨扮,把readerIndex直接指向換行符之后的字節(jié),相當(dāng)于把超長的消息給丟棄了
fail(ctx, length); // 傳播異常
return null;
}
if (stripDelimiter) { // 不帶分隔符
frame = buffer.readRetainedSlice(length); // 從累加器讀取指定長度
buffer.skipBytes(delimLength);
} else { // 解析出來的消息帶分隔符
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else { // 非丟棄模式,找不到換行符的時候
final int length = buffer.readableBytes();
if (length > maxLength) { // 大于最大長度了
discardedBytes = length; // 丟棄
buffer.readerIndex(buffer.writerIndex());
discarding = true; // 當(dāng)前進入丟棄模式
offset = 0;
if (failFast) { // 傳播異常
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else { // 丟棄模式
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex(); // 計算丟棄模式下后半段消息的長度(前半段因為超長已經(jīng)丟棄了衡创,這里處理的是后半段垃圾信息)
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength); // 讀指針移到新行開頭帝嗡,把length那段整體丟棄
discardedBytes = 0;
discarding = false; // 進入正常模式--非丟棄模式
if (!failFast ) {
fail(ctx, length);
}
} else {// 丟棄模式下未找到換行符,繼續(xù)丟棄璃氢,然后計算長度
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
// We skip everything in the buffer, we need to set the offset to 0 again.
offset = 0;
}
return null;
}
}
private void fail(final ChannelHandlerContext ctx, int length) {
fail(ctx, String.valueOf(length));
}
private void fail(final ChannelHandlerContext ctx, String length) {
ctx.fireExceptionCaught(
new TooLongFrameException(
"frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));
}
/**
* Returns the index in the buffer of the end of line found.
* Returns -1 if no end of line was found in the buffer.
*/
private int findEndOfLine(final ByteBuf buffer) {
int totalLength = buffer.readableBytes();
int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
if (i >= 0) {
offset = 0;
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
} else {
offset = totalLength;
}
return i;
}
}
5. 基于分隔符解碼器 DelimiterBasedFrameDecoder 分析
它最少要有兩個參數(shù)哟玷,一個是解析數(shù)據(jù)流最大長度,一個是分隔符一也,它可以傳多個分隔符進去巢寡。
基于分隔符解碼器分析解碼步驟
- 行處理器
- 解碼
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteBuf[] delimiters;
private final int maxFrameLength;
private final boolean stripDelimiter;
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
private final LineBasedFrameDecoder lineBasedDecoder;
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiter the delimiter
*/
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
this(maxFrameLength, true, delimiter);
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param delimiter the delimiter
*/
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
this(maxFrameLength, stripDelimiter, true, delimiter);
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
* @param delimiter the delimiter
*/
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast,
ByteBuf delimiter) {
this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] {
delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiters the delimiters
*/
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
this(maxFrameLength, true, delimiters);
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param delimiters the delimiters
*/
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
this(maxFrameLength, stripDelimiter, true, delimiters);
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
* @param delimiters the delimiters
*/
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength);
if (delimiters == null) {
throw new NullPointerException("delimiters");
}
if (delimiters.length == 0) {
throw new IllegalArgumentException("empty delimiters");
}
if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}
/** Returns true if the delimiters are "\n" and "\r\n". */
private static boolean isLineBased(final ByteBuf[] delimiters) {
if (delimiters.length != 2) {
return false;
}
ByteBuf a = delimiters[0];
ByteBuf b = delimiters[1];
if (a.capacity() < b.capacity()) {
a = delimiters[1];
b = delimiters[0];
}
return a.capacity() == 2 && b.capacity() == 1
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
}
/**
* Return {@code true} if the current instance is a subclass of DelimiterBasedFrameDecoder
*/
private boolean isSubclass() {
return getClass() != DelimiterBasedFrameDecoder.class;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in); // 重載一個decode方法
if (decoded != null) {
out.add(decoded); // 解析出的對象加入outList
}
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) { // 如果行處理器不為空,調(diào)用行處理器
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE; // 找到最小分隔符
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) { // 以某個分隔符分割的最小數(shù)據(jù)包的長度
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) { // 開始解碼椰苟,找到分隔符
int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) { // 丟棄模式
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false; // 設(shè)置正常模式抑月,非丟棄模式
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
// Discard read frame. 丟棄
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength); // 傳播異常
return null;
}
if (stripDelimiter) { // 是否包含分隔符
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else { // 沒有找到分隔符
if (!discardingTooLongFrame) { // 非丟棄模式
if (buffer.readableBytes() > maxFrameLength) { // 數(shù)據(jù)包過長,丟棄
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes(); // 記錄丟棄的字節(jié)數(shù)
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true; // 標(biāo)記為丟棄狀態(tài)
if (failFast) {
fail(tooLongFrameLength);
}
}
} else { // 當(dāng)前處于丟棄模式
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
private void fail(long frameLength) {
if (frameLength > 0) {
throw new TooLongFrameException(
"frame length exceeds " + maxFrameLength +
": " + frameLength + " - discarded");
} else {
throw new TooLongFrameException(
"frame length exceeds " + maxFrameLength +
" - discarding");
}
}
/**
* Returns the number of bytes between the readerIndex of the haystack and
* the first needle found in the haystack. -1 is returned if no needle is
* found in the haystack.
*/
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
break;
} else {
haystackIndex ++;
if (haystackIndex == haystack.writerIndex() &&
needleIndex != needle.capacity() - 1) {
return -1;
}
}
}
if (needleIndex == needle.capacity()) {
// Found the needle from the haystack!
return i - haystack.readerIndex();
}
}
return -1;
}
private static void validateDelimiter(ByteBuf delimiter) {
if (delimiter == null) {
throw new NullPointerException("delimiter");
}
if (!delimiter.isReadable()) {
throw new IllegalArgumentException("empty delimiter");
}
}
private static void validateMaxFrameLength(int maxFrameLength) {
checkPositive(maxFrameLength, "maxFrameLength");
}
}
6. 基于長度域的解碼器 LengthFieldBasedFrameDecoder 分析
重要參數(shù)
lengthFieldOffset: 長度域在二進制數(shù)據(jù)流中偏移量舆蝴。
lengthFieldLength:長度域的長度谦絮,有幾個取值1,2须误,3挨稿,4,8京痢。
lengthAdjustment:lengthFieldLength不代表一個消息的完整長度奶甘,lengthFieldLength指定長度+lengthAdjustment 才是。用于lengthFieldLength指定的長度包含長度域的情況祭椰,這時候lengthAdjustment設(shè)置為負(fù)值方便解碼臭家。
initialBytesToStrip:我在解碼的時候,需要跳過的字節(jié)數(shù)方淤,常用于跳過長度域钉赁。
解碼過程
LengthFieldBasedFrameDecoder 解碼過程如下:
- 計算需要抽取的數(shù)據(jù)包長度
- 跳過字節(jié)邏輯處理
- 丟棄模式下的處理
我們來看它的核心解碼代碼:
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
if (in.readableBytes() < lengthFieldEndOffset) { // 當(dāng)前字節(jié)數(shù)還不夠 解析出lengthFieldLength
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; // 因為是流,要加上讀當(dāng)前指針的位置你踩,計算實際偏移量
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); // lengthFieldLength中指定的數(shù)據(jù)包長度
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
frameLength += lengthAdjustment + lengthFieldEndOffset; // 計算出需要抽取的數(shù)據(jù)包長度
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) { // 這里面會設(shè)置進入丟棄模式
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
// 到這里已經(jīng)有一個完整的數(shù)據(jù)包了
in.skipBytes(initialBytesToStrip); // 跳過略過的長度
// extract frame
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip; // 真正需要拿的字節(jié)
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
frameLength > maxFrameLength
就會設(shè)置丟棄模式
丟棄模式處理
private void exceededFrameLength(ByteBuf in, long frameLength) {
long discard = frameLength - in.readableBytes(); // 計算下次還要丟棄的字節(jié)數(shù)
tooLongFrameLength = frameLength; // 記錄
if (discard < 0) { // 說明一次丟棄就能解決問題
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else { // 說明還需要一次丟棄才能解決問題
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true; // 設(shè)置為丟棄模式
bytesToDiscard = discard;
in.skipBytes(in.readableBytes()); // 跳過,即丟棄
}
failIfNecessary(true);
}
我們來看 failNecessary方法:
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (bytesToDiscard == 0) { // 說明不需要丟棄
// Reset to the initial state and tell the handlers that
// the frame was too large.
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
discardingTooLongFrame = false; // 設(shè)置為正常模式
if (!failFast || firstDetectionOfTooLongFrame) { // 是否需要快速失敗
fail(tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
if (failFast && firstDetectionOfTooLongFrame) { // 如果是快速失敗讳苦,且是第一次
fail(tooLongFrameLength); // 發(fā)出失敗通知
}
}
}
7. 解碼器總結(jié)
1. 解碼器抽象的解碼過程是怎樣的带膜?
答:抽象的解碼過程是通過ByteToMessageDecoder實現(xiàn)的,它的解碼過程分為以下三步驟:第一步累加字節(jié)流鸳谜,它會把當(dāng)前讀取的字節(jié)流累加到累加器cumulator中膝藕;第二步調(diào)用子類的decode方法進行解析,decode方法實際上是一個抽象方法咐扭,不同的子類解碼器的decode有不同的實現(xiàn)芭挽。調(diào)用decode時會傳入兩個比較重要的參數(shù)滑废,一個是當(dāng)前累加的字節(jié)流,一個是outList袜爪,子類在解碼時從累加器中讀取一段數(shù)據(jù)蠕趁,如果成功解析出一個數(shù)據(jù)包,就把這個數(shù)據(jù)包添加到outList中饿敲;第三步妻导,如果outList中有解析出的數(shù)據(jù)包,就通過pipeline的事件傳播機制往下傳播怀各。
2. Netty中有哪些拆箱即用的解碼器?
netty提供了許許多多開箱即用的解碼器术浪,99%的業(yè)務(wù)場景下使用netty提供的解碼器就能完成需求瓢对,使用netty提供的解碼器代碼的健壯性能夠得到保證。比如固定長度解碼器FixedLengthFrameDecoder胰苏,行解碼器(LineBasedFrameDecoder)硕蛹;固定分隔符解碼器(DelimiterBasedFrameDecoder),可以傳一些分隔符進去硕并,如果是行分隔符netty會做特殊處理法焰;基于長度域的解碼器(LengthFieldBasedFrameDecoder),比較通用的解碼器倔毙,解碼的過程比較復(fù)雜埃仪,主要有三個步驟,第一步計算需要抽取的數(shù)據(jù)包的長度陕赃,第二步對拿到的數(shù)據(jù)包進行處理比如跳過指定字節(jié)數(shù)卵蛉,處理完之后一個完整的數(shù)據(jù)包就可以放入outList中,接下來就是丟棄模式的處理么库。