ByteToMessageDecoder是解碼器的基類(lèi), 具有最基本的能力, 將字節(jié)解碼成消息, 以便在pipeline上進(jìn)行傳遞.
關(guān)鍵屬性
// 對(duì)入站數(shù)據(jù)進(jìn)行臨時(shí)緩沖, 直到它準(zhǔn)備好處理
ByteBuf cumulation;
// 緩沖的策略
private Cumulator cumulator = MERGE_CUMULATOR;
// 是否只解碼一次
private boolean singleDecode;
// 意思是解碼有沒(méi)有結(jié)果, true為沒(méi)有
private boolean decodeWasNull;
// 這批數(shù)據(jù)是不是第一次處理
private boolean first;
private int discardAfterReads = 16;
private int numReads;
channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果消息是ByteBuf類(lèi)型
if (msg instanceof ByteBuf) {
// 從對(duì)象池取出一個(gè)CodecOutputList, 用來(lái)收集解碼后的消息
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// 看累積器是不是為空來(lái)決定是不是首次處理
first = cumulation == null;
// 看是不是第一次處理,如果是,那么直接賦予累積器
if (first) {
cumulation = data;
} else {
// 否則累加到累積器, 見(jiàn)cumulator部分
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 對(duì)消息進(jìn)行解碼
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
// 如果累積器的內(nèi)容已經(jīng)讀取完畢,那么回收掉
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
// 當(dāng)這批數(shù)據(jù)已經(jīng)讀了有16次之多后,需要整理下內(nèi)存
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
// 這里主要是對(duì)累積器進(jìn)行整理,清理discard區(qū)域?yàn)樽x寫(xiě)空間騰地方
discardSomeReadBytes();
}
// 將解碼后的結(jié)果通知下游, 且回收out容器
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
// 如果不是ByteBuf類(lèi)型,直接傳遞給下游
ctx.fireChannelRead(msg);
}
}
callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 如果緩沖器有內(nèi)容可讀
while (in.isReadable()) {
int outSize = out.size();
// 如果out容器有內(nèi)容,那么說(shuō)明解碼有結(jié)果了,那么馬上需要通知下游handle
// 通知完,清理容器out
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// 如果此時(shí)hanler被移除,那么不用繼續(xù)處理,直接退出
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 子類(lèi)來(lái)處理具體解碼的工作,最終將解碼后的消息放在out里面就好
decode(ctx, in, out);
// 如果此時(shí)hanler被移除,那么不用繼續(xù)處理,直接退出
if (ctx.isRemoved()) {
break;
}
// 如果這時(shí),out容器的大小沒(méi)有變化,說(shuō)明子類(lèi)那邊解碼還沒(méi)有結(jié)果
if (outSize == out.size()) {
// 如果沒(méi)有讀取任何數(shù)據(jù), 那么結(jié)束循環(huán)
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 如果有讀取,但不足與產(chǎn)生結(jié)果,那么需要繼續(xù)讀取
continue;
}
}
// 如果有結(jié)果,但是根本就沒(méi)有讀取數(shù)據(jù),那么不是很詭異么?
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
// 是否只執(zhí)行一次
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
Cumulator
MERGE_CUMULATOR
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
// 簡(jiǎn)單的情況是當(dāng)前的累積器空間不足,需要擴(kuò)容
// 因?yàn)閞ead的時(shí)候,每隔一段時(shí)間都需要對(duì)累積器的內(nèi)存空間進(jìn)行整理,那么整理的過(guò)程會(huì)導(dǎo)致
// 讀寫(xiě)index變更, 進(jìn)而導(dǎo)致淺拷貝后的ByteBuf不可用.
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
子緩沖區(qū)
Netty中調(diào)用ByteBuf.duplicate(),ByteBuf.slice()和ByteBuf.order(ByteOrder)三個(gè)方法缆瓣, 會(huì)創(chuàng)建一個(gè)子緩沖區(qū)闪盔,子緩沖區(qū)共享父緩沖區(qū)的內(nèi)存區(qū)域抚官。子緩沖區(qū)沒(méi)有自己的引用計(jì)數(shù),而是 共享父緩沖區(qū)的引用計(jì)數(shù)科雳。
當(dāng)父緩沖區(qū)release的時(shí)候, 會(huì)引用計(jì)數(shù)清零, 導(dǎo)致該內(nèi)存區(qū)域被回收, 進(jìn)而影響子緩沖區(qū), 導(dǎo)致讀寫(xiě)失敗. 那么需要注意的事, 子緩沖區(qū)需要顯示調(diào)用retain來(lái)提示Netty有其他人在使用, 防止被錯(cuò)誤回收. 這里帶來(lái)額外的壞處是, 所有子緩沖區(qū)在使用完后, 要及時(shí)release, 防止內(nèi)存泄漏.
在前面的場(chǎng)景中可以看到, 解碼器在解碼完后視情況來(lái)決定要不要做內(nèi)存整理, 而整理的過(guò)程會(huì)進(jìn)行數(shù)據(jù)移動(dòng), 且按照整理后的結(jié)果重置read和write索引, 這樣會(huì)影響到子緩沖區(qū)的讀寫(xiě). 下面是個(gè)簡(jiǎn)單的例子, 自己體會(huì). 另外一個(gè)問(wèn)題是, 如果源Buffer提前release, 那么子緩沖區(qū)也會(huì)讀寫(xiě)異常.
ByteBuf source = ByteBufAllocator.DEFAULT.buffer(20, 20);
source.writeInt(1);
source.readInt();
source.writeInt(2);
ByteBuf duplicate = source.duplicate();
System.out.println("source" + source.toString());
System.out.println("duplicate" + duplicate.toString());
source.discardReadBytes();
System.out.println("source" + source.toString());
System.out.println("duplicate" + duplicate.toString());
System.out.println("duplicate" + duplicate.readInt());
duplicate.readerIndex(0);
System.out.println("duplicate" + duplicate.toString());
System.out.println("duplicate" + duplicate.readInt());
source:PooledUnsafeDirectByteBuf(ridx: 4, widx: 16, cap: 20/20)
duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
source:PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 20/20)
duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
duplicate:0
duplicate:UnpooledDuplicatedByteBuf(ridx: 0, widx: 16, cap: 20/20)
duplicate:2
expandCumulation
可以看到解決上面問(wèn)題的方案也是簡(jiǎn)單粗暴, 直接重建一個(gè)ByteBuf, 將數(shù)據(jù)拷貝過(guò)來(lái). 這樣, 之前的子緩沖區(qū)也不會(huì)被之后可能的內(nèi)存整理給影響.
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}