編解碼處理器作為Netty編程時(shí)必備的ChannelHandler,每個(gè)應(yīng)用都必不可少粟誓。Netty作為網(wǎng)絡(luò)應(yīng)用框架报咳,在網(wǎng)絡(luò)上的各個(gè)應(yīng)用之間不斷進(jìn)行數(shù)據(jù)交互。而網(wǎng)絡(luò)數(shù)據(jù)交換的基本單位是字節(jié)凌彬,所以需要將本應(yīng)用的POJO對(duì)象編碼為字節(jié)數(shù)據(jù)發(fā)送到其他應(yīng)用,或者將收到的其他應(yīng)用的字節(jié)數(shù)據(jù)解碼為本應(yīng)用可使用的POJO對(duì)象循衰。這一部分铲敛,又和JAVA中的序列化和反序列化對(duì)應(yīng)。幸運(yùn)的是会钝,有很多其他的開源工具(protobuf伐蒋,thrift,json迁酸,xml等等)可方便的處理POJO對(duì)象的序列化先鱼,可參見這個(gè)鏈接。
在互聯(lián)網(wǎng)中奸鬓,Netty使用TCP/UDP協(xié)議傳輸數(shù)據(jù)焙畔。由于Netty基于異步事件處理以及TCP的一些特性,使得TCP數(shù)據(jù)包會(huì)發(fā)生粘包現(xiàn)象串远。想象這樣的情況宏多,客戶端與服務(wù)端建立連接后,連接發(fā)送了兩條消息:
+------+ +------+
| MSG1 | | MSG2 |
+------+ +------+
在互聯(lián)網(wǎng)上傳輸數(shù)據(jù)時(shí)澡罚,連續(xù)發(fā)送的兩條消息伸但,在服務(wù)端極有可能被合并為一條:
+------------+
| MSG1 MSG2 |
+------------+
這還不是最壞的情況,由于路由器的拆包和重組留搔,可能收到這樣的兩個(gè)數(shù)據(jù)包:
+----+ +---------+ +-------+ +-----+
| MS | | G1MSG2 | 或者 | MSG1M | | SG2 |
+----+ +---------+ +-------+ +-----+
而服務(wù)端要正確的識(shí)別出這樣的兩條消息更胖,就需要編碼器的正確工作。為了正確的識(shí)別出消息隔显,業(yè)界有以下幾種做法:
- 使用定界符分割消息却妨,一個(gè)特例是使用換行符分隔每條消息。
- 使用定長的消息荣月。
- 在消息的某些字段指明消息長度管呵。
明白了這些,進(jìn)入正題哺窄,分析Netty的編碼框架ByteToMessageDecoder
捐下。
8.1 ByteToMessageDecoder
在分析之前,需要說明一點(diǎn):ByteToMessage
容易引起誤解萌业,解碼結(jié)果Message會(huì)被認(rèn)為是JAVA對(duì)象POJO坷襟,但實(shí)際解碼結(jié)果是消息幀。也就是說該解碼器處理TCP的粘包現(xiàn)象生年,將網(wǎng)絡(luò)發(fā)送的字節(jié)流解碼為具有確定含義的消息幀婴程,之后的解碼器再將消息幀解碼為實(shí)際的POJO對(duì)象。
明白了這點(diǎn)抱婉,再次回顧兩條消息發(fā)送的最壞情況档叔,可知要正確取得兩條消息桌粉,需要一個(gè)內(nèi)存區(qū)域存儲(chǔ)消息,當(dāng)收到MS
時(shí)繼續(xù)等待第二個(gè)包G1MSG2
到達(dá)再進(jìn)行解碼操作衙四。在ByteToMessageDecoder
中铃肯,這個(gè)內(nèi)存區(qū)域被抽象為Cumulator
,直譯累積器传蹈,可自動(dòng)擴(kuò)容累積字節(jié)數(shù)據(jù)押逼,Netty將其定義為一個(gè)接口:
public interface Cumulator {
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
其中,兩個(gè)ByteBuf參數(shù)cumulation
指已經(jīng)累積的字節(jié)數(shù)據(jù)惦界,in
表示該次channelRead()
讀取到的新數(shù)據(jù)挑格。返回ByteBuf為累積數(shù)據(jù)后的新累積區(qū)(必要時(shí)候自動(dòng)擴(kuò)容)。自動(dòng)擴(kuò)容的代碼如下:
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation,
int newReadBytes) {
ByteBuf oldCumulation = cumulation;
// 擴(kuò)容后新的緩沖區(qū)
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
// 舊的緩沖區(qū)釋放
oldCumulation.release();
return cumulation;
}
自動(dòng)擴(kuò)容的方法簡單粗暴沾歪,直接使用大容量的Bytebuf替換舊的ByteBuf漂彤。Netty定義了兩個(gè)累積器,一個(gè)為MERGE_CUMULATOR
:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
// 1.累積區(qū)容量不夠容納數(shù)據(jù)
// 2.用戶使用了slice().retain()或duplicate().retain()使refCnt增加
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
可知瞬逊,兩種情況下會(huì)擴(kuò)容:
- 累積區(qū)容量不夠容納新讀入的數(shù)據(jù)
- 用戶使用了
slice().retain()
或duplicate().retain()
使refCnt增加并且大于1显歧,此時(shí)擴(kuò)容返回一個(gè)新的累積區(qū)ByteBuf仪或,方便用戶對(duì)老的累積區(qū)ByteBuf進(jìn)行后續(xù)處理确镊。
另一個(gè)累積器為COMPOSITE_CUMULATOR
:
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
buffer.writeBytes(in);
in.release();
} else {
CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(true, cumulation);
}
composite.addComponent(true, in);
buffer = composite;
}
return buffer;
}
};
這個(gè)累積器只在第二種情況refCnt>1時(shí)擴(kuò)容,除此之外處理和MERGE_CUMULATOR
一致范删,不同的是當(dāng)cumulation不是CompositeByteBuf
時(shí)會(huì)創(chuàng)建新的同類CompositeByteBuf
蕾域,這樣最后返回的ByteBuf必定是CompositeByteBuf
。使用這個(gè)累積器后到旦,當(dāng)容量不夠時(shí)并不會(huì)進(jìn)行內(nèi)存復(fù)制旨巷,只會(huì)講新讀入的in
加到CompositeByteBuf
中。需要注意的是:此種情況下雖然不需內(nèi)存復(fù)制添忘,卻要求用戶維護(hù)復(fù)雜的索引采呐,在某些使用中可能慢于MERGE_CUMULATOR
。故Netty默認(rèn)使用MERGE_CUMULATOR
累積器搁骑。
累積器分析完畢斧吐,步入正題ByteToMessageDecoder
,首先看類簽名:
public abstract class ByteToMessageDecoder extends
ChannelInboundHandlerAdapter
該類是一個(gè)抽象類仲器,其中的抽象方法只有一個(gè)decode()
:
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception;
用戶使用了該解碼框架后煤率,只需實(shí)現(xiàn)該方法就可定義自己的解碼器。參數(shù)in
表示累積器已累積的數(shù)據(jù)乏冀,out
表示本次可從累積數(shù)據(jù)解碼出的結(jié)果列表蝶糯,結(jié)果可為POJO對(duì)象或者ByteBuf等等Object。
關(guān)注一下成員變量辆沦,以便更好的分析:
ByteBuf cumulation; // 累積區(qū)
private Cumulator cumulator = MERGE_CUMULATOR; // 累積器
// 設(shè)置為true后每個(gè)channelRead事件只解碼出一個(gè)結(jié)果
private boolean singleDecode; // 某些特殊協(xié)議使用
private boolean decodeWasNull; // 解碼結(jié)果為空
private boolean first; // 是否首個(gè)消息
// 累積區(qū)不丟棄字節(jié)的最大次數(shù)昼捍,16次后開始丟棄
private int discardAfterReads = 16;
private int numReads; // 累積區(qū)不丟棄字節(jié)的channelRead次數(shù)
下面识虚,直接進(jìn)入channelRead()
事件處理:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 只對(duì)ByteBuf處理即只對(duì)字節(jié)數(shù)據(jù)進(jìn)行處理
if (msg instanceof ByteBuf) {
// 解碼結(jié)果列表
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null; // 累積區(qū)為空表示首次解碼
if (first) {
// 首次解碼直接使用讀入的ByteBuf作為累積區(qū)
cumulation = data;
} else {
// 非首次需要進(jìn)行字節(jié)數(shù)據(jù)累積
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out); // 解碼操作
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
// 此時(shí)累積區(qū)不再有字節(jié)數(shù)據(jù),已被處理完畢
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// 連續(xù)discardAfterReads次后
// 累積區(qū)還有字節(jié)數(shù)據(jù)妒茬,此時(shí)丟棄一部分?jǐn)?shù)據(jù)
numReads = 0;
discardSomeReadBytes(); // 丟棄一些已讀字節(jié)
}
int size = out.size();
// 本次沒有解碼出數(shù)據(jù)舷礼,此時(shí)size=0
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size); // 觸發(fā)事件
out.recycle(); // 回收解碼結(jié)果
}
} else {
ctx.fireChannelRead(msg);
}
}
解碼結(jié)果列表CodecOutputList
是Netty定制的一個(gè)特殊列表,該列表在線程中被緩存郊闯,可循環(huán)使用來存儲(chǔ)解碼結(jié)果妻献,減少不必要的列表實(shí)例創(chuàng)建冕茅,從而提升性能捏浊。由于解碼結(jié)果需要頻繁存儲(chǔ)菇存,普通的ArrayList難以滿足該需求寝凌,故定制化了一個(gè)特殊列表与帆,由此可見Netty對(duì)優(yōu)化的極致追求国瓮。
注意finally
塊的第一個(gè)if
情況滿足時(shí)糕簿,即累積區(qū)的數(shù)據(jù)已被讀取完畢屈扎,請(qǐng)考慮釋放累積區(qū)的必要性怀挠。想象這樣的情況析蝴,當(dāng)一條消息被解碼完畢后,如果客戶端長時(shí)間不發(fā)送消息绿淋,那么闷畸,服務(wù)端保存該條消息的累積區(qū)將一直占據(jù)服務(wù)端內(nèi)存浪費(fèi)資源,所以必須釋放該累積區(qū)吞滞。
第二個(gè)if
情況滿足時(shí)佑菩,即累積區(qū)的數(shù)據(jù)一直在channelRead讀取數(shù)據(jù)進(jìn)行累積和解碼,直到達(dá)到了discardAfterReads
次(默認(rèn)16)裁赠,此時(shí)累積區(qū)依然還有數(shù)據(jù)殿漠。在這樣的情況下,Netty主動(dòng)丟棄一些字節(jié)佩捞,這是為了防止該累積區(qū)占用大量內(nèi)存甚至耗盡內(nèi)存引發(fā)OOM绞幌。
處理完這些情況后,最后統(tǒng)一觸發(fā)ChannelRead事件一忱,將解碼出的數(shù)據(jù)傳遞給下一個(gè)處理器莲蜘。注意:當(dāng)out=0時(shí),統(tǒng)一到一起被處理了掀潮。
再看細(xì)節(jié)的discardSomeReadBytes()
和fireChannelRead()
:
protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.refCnt() == 1) {
cumulation.discardSomeReadBytes();
}
}
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs,
int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
代碼比較簡單菇夸,只需注意discardSomeReadBytes
中,累積區(qū)的refCnt() == 1
時(shí)才丟棄數(shù)據(jù)是因?yàn)椋喝绻脩羰褂昧?code>slice().retain()和duplicate().retain()
使refCnt>1
仪吧,表明該累積區(qū)還在被用戶使用庄新,丟棄數(shù)據(jù)可能導(dǎo)致用戶的困惑,所以須確定用戶不再使用該累積區(qū)的已讀數(shù)據(jù),此時(shí)才丟棄择诈。
下面分析解碼核心方法callDecode()
:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
// 解碼出消息就立即處理械蹋,防止消息等待
fireChannelRead(ctx, out, outSize);
out.clear();
// 用戶主動(dòng)刪除該Handler,繼續(xù)操作in是不安全的
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decode(ctx, in, out); // 子類需要實(shí)現(xiàn)的具體解碼步驟
// 用戶主動(dòng)刪除該Handler羞芍,繼續(xù)操作in是不安全的
if (ctx.isRemoved()) {
break;
}
// 此時(shí)outSize都==0(這的代碼容易產(chǎn)生誤解 應(yīng)該直接使用0)
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
// 沒有解碼出消息哗戈,且沒讀取任何in數(shù)據(jù)
break;
} else {
// 讀取了一部份數(shù)據(jù)但沒有解碼出消息
// 說明需要更多的數(shù)據(jù),故繼續(xù)
continue;
}
}
// 運(yùn)行到這里outSize>0 說明已經(jīng)解碼出消息
if (oldInputLength == in.readableBytes()) {
// 解碼出消息但是in的讀索引不變荷科,用戶的decode方法有Bug
throw new DecoderException(
"did not read anything but decoded a message.");
}
// 用戶設(shè)定一個(gè)channelRead事件只解碼一次
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
循環(huán)中的第一個(gè)if
分支唯咬,檢查解碼結(jié)果,如果已經(jīng)解碼出消息則立即將消息傳播到下一個(gè)處理器進(jìn)行處理畏浆,這樣可使消息得到及時(shí)處理胆胰。在調(diào)用decode()
方法的前后,都檢查該Handler是否被用戶從ChannelPipeline
中刪除刻获,如果刪除則跳出解碼步驟不對(duì)輸入緩沖區(qū)in
進(jìn)行操作蜀涨,因?yàn)槔^續(xù)操作in
已經(jīng)不安全。解碼完成后蝎毡,對(duì)in
解碼前后的讀索引進(jìn)行了檢查厚柳,防止用戶的錯(cuò)誤使用,如果用戶錯(cuò)誤使用將拋出異常沐兵。
至此别垮,核心的解碼框架已經(jīng)分析完畢,再看最后的一些邊角處理痒筒。首先是channelReadComplete()
讀事件完成后的處理:
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0; // 連續(xù)讀次數(shù)置0
discardSomeReadBytes(); // 丟棄已讀數(shù)據(jù)宰闰,節(jié)約內(nèi)存
if (decodeWasNull) {
// 沒有解碼出結(jié)果茬贵,則期待更多數(shù)據(jù)讀入
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
如果channelRead()
中沒有解碼出消息簿透,極有可能是數(shù)據(jù)不夠,由此調(diào)用ctx.read()
期待讀入更多的數(shù)據(jù)解藻。如果設(shè)置了自動(dòng)讀取老充,將會(huì)在HeadHandler
中調(diào)用ctx.read()
;沒有設(shè)置自動(dòng)讀取螟左,則需要此處顯式調(diào)用啡浊。
最后再看Handler從ChannelPipelien中移除的處理handlerRemoved()
:
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = cumulation;
if (buf != null) {
cumulation = null; // 釋放累積區(qū),GC回收
int readable = buf.readableBytes();
if (readable > 0) {
ByteBuf bytes = buf.readBytes(readable);
buf.release();
// 解碼器已被刪除故不再解碼胶背,只將數(shù)據(jù)傳播到下一個(gè)Handler
ctx.fireChannelRead(bytes);
} else {
buf.release();
}
numReads = 0; // 置0巷嚣,有可能被再次添加
ctx.fireChannelReadComplete();
}
handlerRemoved0(ctx); // 用戶可進(jìn)行的自定義處理
}
當(dāng)解碼器被刪除時(shí),如果還有沒被解碼的數(shù)據(jù)钳吟,則將數(shù)據(jù)傳播到下一個(gè)處理器處理廷粒,防止丟失數(shù)據(jù)。此外,當(dāng)連接不再有效觸發(fā)channelInactive
事件或者觸發(fā)ChannelInputShutdownEvent
時(shí)坝茎,則會(huì)調(diào)用callDecode()
解碼涤姊,如果解碼出消息,傳播到下一個(gè)處理器嗤放。這部分的代碼不再列出思喊。
至此,ByteToMessageDecoder
解碼框架已分析完畢次酌,下面恨课,我們選用具體的實(shí)例進(jìn)行分析。
8.1.1 LineBasedFrameDecoder
基于行分隔的解碼器LineBasedFrameDecoder
是一個(gè)特殊的分隔符解碼器岳服,該解碼器使用的分隔符為:windows的\r\n
和類linux的\n
庄呈。
首先看該類定義的成員變量:
// 最大幀長度,超過此長度將拋出異常TooLongFrameException
private final int maxLength;
// 是否快速失敗派阱,true-檢測到幀長度過長立即拋出異常不在讀取整個(gè)幀
// false-檢測到幀長度過長依然讀完整個(gè)幀再拋出異常
private final boolean failFast;
// 是否略過分隔符诬留,true-解碼結(jié)果不含分隔符
private final boolean stripDelimiter;
// 超過最大幀長度是否丟棄字節(jié)
private boolean discarding;
private int discardedBytes; // 丟棄的字節(jié)數(shù)
其中,前三個(gè)變量可由用戶根據(jù)實(shí)際情況配置贫母,后兩個(gè)變量解碼時(shí)使用文兑。
該子類覆蓋的解碼方法如下:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
其中又定義了decode(ctx, in)
解碼出單個(gè)消息幀,事實(shí)上這也是其他編碼子類使用的方法腺劣。decode(ctx, in)
方法處理很繞彎绿贞,只給出偽代碼:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer);
if (!discarding) {
if (eol >= 0) {
// 此時(shí)已找到換行符
if(!checkMaxLength()) {
return getFrame().retain();
}
// 超過最大長度拋出異常
} else {
if (checkMaxLength()) {
// 設(shè)置true表示下一次解碼需要丟棄字節(jié)
discarding = true;
if (failFast) {
// 拋出異常
}
}
}
} else {
if (eol >= 0) {
// 丟棄換行符以及之前的字節(jié)
buffer.readerIndex(eol + delimLength);
} else {
// 丟棄收到的所有字節(jié)
buffer.readerIndex(buffer.writerIndex());
}
}
}
該方法需要結(jié)合解碼框架的while
循環(huán)反復(fù)理解,每個(gè)if情況都是一次while循環(huán)橘原,而變量discarding
就成為控制每次解碼流程的狀態(tài)量籍铁,注意其中的狀態(tài)轉(zhuǎn)移。(想法:使用狀態(tài)機(jī)實(shí)現(xiàn)趾断,則流程更清晰)
8.1.2 DelimiterBasedFrameDecoder
該解碼器是更通用的分隔符解碼器拒名,可支持多個(gè)分隔符,每個(gè)分隔符可為一個(gè)或多個(gè)字符芋酌。如果定義了多個(gè)分隔符增显,并且可解碼出多個(gè)消息幀,則選擇產(chǎn)生最小幀長的結(jié)果脐帝。例如同云,使用行分隔符\r\n
和\n
分隔:
+--------------+
| ABC\nDEF\r\n |
+--------------+
可有兩種結(jié)果:
+-----+-----+ +----------+
| ABC | DEF | (√) 和 | ABC\nDEF | (×)
+-----+-----+ +----------+
該編碼器可配置的變量與LineBasedFrameDecoder
類似,只是多了一個(gè)ByteBuf[] delimiters
用于配置具體的分隔符堵腹。
Netty在Delimiters
類中定義了兩種默認(rèn)的分隔符炸站,分別是NULL分隔符和行分隔符:
public static ByteBuf[] nulDelimiter() {
return new ByteBuf[] {
Unpooled.wrappedBuffer(new byte[] { 0 }) };
}
public static ByteBuf[] lineDelimiter() {
return new ByteBuf[] {
Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }),
Unpooled.wrappedBuffer(new byte[] { '\n' }),
};
}
8.1.3 FixedLengthFrameDecoder
該解碼器十分簡單,按照固定長度frameLength
解碼出消息幀疚顷。如下的數(shù)據(jù)幀解碼為固定長度3的消息幀示例如下:
+---+----+------+----+ +-----+-----+-----+
| A | BC | DEFG | HI | -> | ABC | DEF | GHI |
+---+----+------+----+ +-----+-----+-----+
其中的解碼方法也十分簡單:
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readSlice(frameLength).retain();
}
}
8.1.4 LengthFieldBasedFrameDecoder
基于長度字段的消息幀解碼器旱易,該解碼器可根據(jù)數(shù)據(jù)包中的長度字段動(dòng)態(tài)的解碼出消息幀。一個(gè)推薦的二進(jìn)制傳輸協(xié)議可設(shè)計(jì)為如下格式:
+----------+------+----------+------+
| 頭部長度 | 頭部 | 數(shù)據(jù)長度 | 數(shù)據(jù) |
+----------+------+----------+------+
這樣的協(xié)議可滿足大多數(shù)場景使用,但不幸的是:很多情況下并不可以設(shè)計(jì)新的協(xié)議咒唆,往往要在老舊的協(xié)議上傳輸數(shù)據(jù)届垫。由此,Netty將該解碼器設(shè)計(jì)的十分通用全释,只要有類似的長度字段便能正確解碼出消息幀装处。當(dāng)然前提是:正確使用解碼器。
沒有什么是完美的浸船,由于該解碼器十分通用妄迁,所以有大量的配置變量:
private final ByteOrder byteOrder;
private final int maxFrameLength;
private final boolean failFast;
private final int lengthFieldOffset;
private final int lengthFieldLength;
private final int lengthAdjustment;
private final int initialBytesToStrip;
變量byteOrder
表示長度字段的字節(jié)序:大端或小端,默認(rèn)為大端李命。如果對(duì)字節(jié)序有疑問登淘,請(qǐng)查閱其他資料,不再贅述封字。maxFrameLength
和failFast
與其他解碼器相同黔州,控制最大幀長度和快速失敗拋異常,注意:該解碼器failFast
默認(rèn)為true阔籽。
接下來將重點(diǎn)介紹其它四個(gè)變量:
-
lengthFieldOffset
表示長度字段偏移量即在一個(gè)數(shù)據(jù)包中長度字段的具體下標(biāo)位置流妻。標(biāo)準(zhǔn)情況,該長度字段為數(shù)據(jù)部分長度笆制。 -
lengthFieldLength
表示長度字段的具體字節(jié)數(shù)绅这,如一個(gè)int占4字節(jié)。該解碼器支持的字節(jié)數(shù)有:1在辆,2证薇,3,4和8匆篓,其他則會(huì)拋出異常浑度。另外,還需要注意的是:長度字段的結(jié)果為無符號(hào)數(shù)奕删。 -
lengthAdjustment
是一個(gè)長度調(diào)節(jié)量俺泣,當(dāng)數(shù)據(jù)包的長度字段不是數(shù)據(jù)部分長度而是總長度時(shí),可將此值設(shè)定為頭部長度完残,便能正確解碼出包含整個(gè)數(shù)據(jù)包的結(jié)果消息幀。注意:某些情況下横漏,該值可設(shè)定為負(fù)數(shù)谨设。 -
initialBytesToStrip
表示需要略過的字節(jié)數(shù),如果我們只關(guān)心數(shù)據(jù)部分而不關(guān)心頭部缎浇,可將此值設(shè)定為頭部長度從而丟棄頭部扎拣。
下面我們使用具體的例子來說明:
- 需求1:如下待解碼數(shù)據(jù)包,正確解碼為消息幀,其中長度字段在最前面的2字節(jié)二蓝,數(shù)據(jù)部分為12字節(jié)的字符串"HELLO, WORLD"誉券,長度字段0x000C=12 表示數(shù)據(jù)部分長度,數(shù)據(jù)包總長度則為14字節(jié)刊愚。
解碼前(14 bytes) 解碼后(14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
正確配置(只列出四個(gè)值中不為0的值):
lengthFieldLength = 2;
- 需求2:需求1的數(shù)據(jù)包不變踊跟,消息幀中去除長度字段。
解碼前(14 bytes) 解碼后(12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
正確配置:
lengthFieldLength = 2;
initialBytesToStrip = 2;
- 需求3:需求1數(shù)據(jù)包中長度字段表示數(shù)據(jù)包總長度鸥诽。
解碼前(14 bytes) 解碼后(14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
正確配置:
lengthFieldLength = 2;
lengthAdjustment = -2; // 調(diào)整長度字段的2字節(jié)
- 需求4:綜合難度商玫,數(shù)據(jù)包有兩個(gè)頭部HDR1和HDR2,長度字段以及數(shù)據(jù)部分組成牡借,其中長度字段值表示數(shù)據(jù)包總長度拳昌。結(jié)果消息幀需要第二個(gè)頭部HDR2和數(shù)據(jù)部分。請(qǐng)先給出答案再與標(biāo)準(zhǔn)答案比較钠龙,結(jié)果正確說明你已完全掌握了該解碼器的使用炬藤。
解碼前 (16 bytes) 解碼后 (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
正確配置:
lengthFieldOffset = 1;
lengthFieldLength = 2;
lengthAdjustment = -3;
initialBytesToStrip = 3;
本解碼器的解碼過程總體上較為復(fù)雜,由于解碼的代碼是在while
循環(huán)里面碴里,decode
方法return或者拋出異常時(shí)可看做一次循環(huán)結(jié)束刻像,直到in中數(shù)據(jù)被解析完或者in的readerIndex讀索引不再增加才會(huì)從while
循環(huán)跳出。使用狀態(tài)的思路理解并闲,每個(gè)return或者拋出異诚杆看為一個(gè)狀態(tài):
- 狀態(tài)1:丟棄過長幀狀態(tài),可能是用戶設(shè)置了錯(cuò)誤的幀長度或者實(shí)際幀過長帝火。
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard); // 丟棄實(shí)際的字節(jié)數(shù)
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
變量localBytesToDiscard
取得實(shí)際需要丟棄的字節(jié)數(shù)溜徙,由于過長幀有兩種情況:a.用戶設(shè)置了錯(cuò)誤的長度字段,此時(shí)in中并沒有如此多的字節(jié)犀填;b.in中確實(shí)有如此長度的幀蠢壹,這個(gè)幀確實(shí)超過了設(shè)定的最大長度。bytesToDiscard
的計(jì)算是為了failIfNecessary()
確定異常的拋出九巡,其值為0表示當(dāng)次丟棄狀態(tài)已經(jīng)丟棄了in中的所有數(shù)據(jù)图贸,可以對(duì)新讀入in的數(shù)據(jù)進(jìn)行處理;否則冕广,還處于異常狀態(tài)疏日。
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (bytesToDiscard == 0) {
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
// 由于已經(jīng)丟棄所有數(shù)據(jù),關(guān)閉丟棄模式
discardingTooLongFrame = false;
// 已經(jīng)丟棄了所有字節(jié)撒汉,當(dāng)非快速失敗模式拋異常
if (!failFast || firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
} else {
if (failFast && firstDetectionOfTooLongFrame) {
// 幀長度異常沟优,快速失敗模式檢測到即拋異常
fail(tooLongFrameLength);
}
}
}
可見,首次檢測到幀長度是一種特殊情況睬辐,在之后的一個(gè)狀態(tài)進(jìn)行分析挠阁。請(qǐng)注意該狀態(tài)并不是都拋異常宾肺,還有可能進(jìn)入狀態(tài)2。
- 狀態(tài)2:in中數(shù)據(jù)不足夠組成消息幀侵俗,此時(shí)直接返回null等待更多數(shù)據(jù)到達(dá)锨用。
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
- 狀態(tài)3:幀長度錯(cuò)誤檢測,檢測長度字段為負(fù)值得幀以及加入調(diào)整長度后總長小于長度字段的幀隘谣,均拋出異常增拥。
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 該方法取出長度字段的值,不再深入分析
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset,
lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException("...");
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException("...");
}
- 狀態(tài)4:幀過長洪橘,由前述可知:可能是用戶設(shè)置了錯(cuò)誤的幀長度或者實(shí)際幀過長跪者。
if (frameLength > maxFrameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
in.skipBytes((int) frameLength);
} else {
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
變量discard<0
表示當(dāng)前收到的數(shù)據(jù)足以確定是實(shí)際的幀過長,所以直接丟棄過長的幀長度熄求;>0
表示當(dāng)前in中的數(shù)據(jù)并不足以確定是用戶設(shè)置了錯(cuò)誤的幀長度渣玲,還是正確幀的后續(xù)數(shù)據(jù)字節(jié)還沒有到達(dá),但無論何種情況弟晚,將丟棄狀態(tài)discardingTooLongFrame
標(biāo)記設(shè)置為true忘衍,之后后續(xù)數(shù)據(jù)字節(jié)進(jìn)入狀態(tài)1處理。==0
時(shí)卿城,在failIfNecessary(true)
無論如何都將拋出異常枚钓,><0
時(shí),只有設(shè)置快速失敗才會(huì)拋出異常瑟押。還需注意一點(diǎn):failIfNecessary()
的參數(shù)firstDetectionOfTooLongFrame
的首次是指正確解析數(shù)據(jù)后發(fā)生的第一次發(fā)生的幀過長搀捷,可知會(huì)有很多首次。
- 狀態(tài)5:正確解碼出消息幀多望。
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null; // 到達(dá)的數(shù)據(jù)還達(dá)不到幀長
}
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt); // 跳過字節(jié)數(shù)錯(cuò)誤
throw new CorruptedFrameException("...");
}
in.skipBytes(initialBytesToStrip);
// 正確解碼出數(shù)據(jù)幀
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = in.slice(readerIndex, actualFrameLength).retain();
in.readerIndex(readerIndex + actualFrameLength);
return frame;
代碼中混合了兩個(gè)簡單狀態(tài)嫩舟,到達(dá)的數(shù)據(jù)還達(dá)不到幀長和用戶設(shè)置的忽略字節(jié)數(shù)錯(cuò)誤。由于較為簡單怀偷,故合并到一起家厌。
至此解碼框架分析完畢∽倒ぃ可見饭于,要正確的寫出基于長度字段的解碼器還是較為復(fù)雜的,如果開發(fā)時(shí)確有需求维蒙,特別要注意狀態(tài)的轉(zhuǎn)移掰吕。下面介紹較為簡單的編碼框架。
8.2 MessageToByteEncoder
MessageToByteEncoder框架可見用戶使用POJO對(duì)象編碼為字節(jié)數(shù)據(jù)存儲(chǔ)到ByteBuf木西。用戶只需定義自己的編碼方法encode()
即可畴栖。
首先看類簽名:
public abstract class MessageToByteEncoder<I> extends
ChannelOutboundHandlerAdapter
可知該類只處理出站事件,切確的說是write事件八千。
該類有兩個(gè)成員變量吗讶,preferDirect
表示是否使用內(nèi)核的DirectedByteBuf,默認(rèn)為true恋捆。TypeParameterMatcher
用于檢測泛型參數(shù)是否是期待的類型照皆,比如說,如果需要編碼String
類的POJO對(duì)象沸停,Matcher會(huì)確保write()
傳入的參數(shù)Object
的實(shí)際切確類型為String
膜毁。
直接分析write()
的處理:
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
I cast = (I) msg;
// 分配一個(gè)輸出緩沖區(qū)
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf); // 用戶定義的編碼方法
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise); // 確實(shí)寫入了數(shù)據(jù)
} else {
// 沒有需要寫的數(shù)據(jù),也有可能是用戶編碼錯(cuò)誤
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
編碼框架簡單明了愤钾,再列出allocateBuffer()
方法的代碼:
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg,
boolean preferDirect) throws Exception {
if (preferDirect) {
return ctx.alloc().ioBuffer(); // 內(nèi)核直接緩存
} else {
return ctx.alloc().heapBuffer(); // JAVA隊(duì)緩存
}
}
總的來說瘟滨,編碼的復(fù)雜度大大小于解碼的復(fù)雜度,這是因?yàn)榫幋a不需考慮TCP粘包能颁。編解碼的處理還有一個(gè)常用的類MessageToMessageCodec
用于POJO對(duì)象之間的轉(zhuǎn)換杂瘸。如果有興趣,可下載源碼查看伙菊。至此败玉,編解碼框架已分析完畢。