ByteToMessageDecoder
該方法提供了將 ByteBuf 轉(zhuǎn)化為對象的解碼器處理流程邪乍,具體的解碼規(guī)則交由子類去實現(xiàn)捅伤。
我們以讀操作 channelRead 為例來研究一下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// out 是一個鏈表,存放解碼成功的對象
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// cumulation 中存放的是上次未處理完的半包消息
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 本次處理难衰,需要把上次遺留的半包和本次數(shù)據(jù)拼接后钦无,一起處理
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 調(diào)用解碼器解碼消息
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 如果有解碼成功的數(shù)據(jù),需要向后傳遞盖袭,讓其他 ChannelHandler 繼續(xù)處理
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
// 如果有解碼成功的數(shù)據(jù)失暂,需要向后傳遞,讓其他 ChannelHandler 繼續(xù)處理
fireChannelRead(ctx, out, outSize);
out.clear();
// 如果當前 ChannelHandler 所屬 ctx 被剔除 pipeline 上下文鳄虱,就不需要繼續(xù)處理了
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 解碼
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// 設置解碼器狀態(tài)為正在解碼弟塞,避免解碼過程中另一個線程調(diào)用了 handlerRemoved 把數(shù)據(jù)銷毀,造成混亂
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
// STATE_HANDLER_REMOVED_PENDING 表示在解碼過程中拙已,ctx 被移除决记,需要由當前線程來調(diào)用 handlerRemoved
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
// 具體的消息解碼算法,交給子類實現(xiàn)
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;