Netty入門——拆粘包與編解碼(三)
回顧一下上一篇文章中MyClientInitializer類中的代碼裸燎。
//(1)加入拆包器
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//(2)加入粘包器
pipeline.addLast(new LengthFieldPrepender(4));
//字符串解碼 (3)
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
//字符串編碼 (4)
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
上面代碼中用到了Netty的拆包器、粘包器、編解碼器管行。本文會對Netty 是如何拆包進行分析本冲。
TCP的粘包與拆包
在TCP/IP協(xié)議中准脂,數(shù)據(jù)是以二進制流的方式傳播的,數(shù)據(jù)包映射到數(shù)據(jù)鏈路層檬洞、IP 層和 TCP 層分別叫Frame狸膏、Packet、Segment添怔,我們這邊不死磕如何翻譯湾戳,下面都統(tǒng)一用英文表示。
粘包
粘包是TCP在傳輸過程中广料,為了提高有效負載砾脑,把多個數(shù)據(jù)包合并成一個數(shù)據(jù)包發(fā)送的現(xiàn)象。如何理解艾杏?比如10字節(jié)的數(shù)據(jù)韧衣,每次發(fā)送1個字節(jié),需要10次TCP傳輸,10 ACK 確認畅铭。如果合并成一個數(shù)據(jù)包一起發(fā)送氏淑,可以提高有效負載,節(jié)省帶寬(前提是對數(shù)據(jù)實時性要求不高的場景)顶瞒。
但是粘包會引發(fā)語義級別的message識別問題夸政。比如下面這張圖:
ABC+DEF+GHI分3個message, 也就是3個Frame 發(fā)送出去,接收端收到4個Frame榴徐,不在是原來的3個message 對應的3 個Frame守问。這就是TCP的粘包與半包現(xiàn)象。AB坑资、H耗帕、I的情況是半包,CDEFG的情況是粘包袱贮。雖然順序是和原來一樣仿便,但是分組不再是原來的3個分組,這個時候就需要語義上message識別攒巍,即拆包嗽仪。
拆包
發(fā)送端把4個數(shù)據(jù)包粘成2個就需要接收端把這2個數(shù)據(jù)包拆分成4個。按照如下步驟進行拆包:
1柒莉、讀取數(shù)據(jù)闻坚,根據(jù)協(xié)議判斷是否可以構成一個完整的包
2、如果能夠構成一個完整的數(shù)據(jù)包兢孝,那么和之前接收到的數(shù)據(jù)一起拼接成一個完整的數(shù)據(jù)包給業(yè)務邏輯層窿凤,多余的數(shù)據(jù)等待下一次的拼接。
3跨蟹、如果不能雳殊,那么繼續(xù)從緩存中讀取數(shù)據(jù)。
那么如何判斷是否是一個完整的包窗轩?
有兩種方式:
方式 1:分隔符夯秃。為人熟知的SMTP、POP3品姓、IMAP寝并、Telnet等等。下圖顯示的是使用“\r\n”分隔符的處理過程腹备。
圖中的數(shù)字說明:1衬潦、字節(jié)流。2植酥、第一幀镀岛。3弦牡、第二幀
方式 2:固定長度。大家最熟悉的HTTP協(xié)議就是這種方式:Header+Content漂羊。
- Header : 協(xié)議頭部驾锰,放置一些Meta信息。
-
Content : 應用之間交互的信息主體走越。
在HTTP header中 通過Content-Length告知message有多長椭豫,應用層才能識別到這條message。比如下圖的HTTP1.1協(xié)議旨指。
Netty拆包流程
首先在Netty的拆包流程中有兩個重要的變量cumulation
和cumulator
赏酥。cumulation 是Netty中自定義的ByteBuf,與Java原生的ByteBuf還不一樣谆构,這個我們之后再講裸扶,我們就直接理解成一個字節(jié)容器,cumulator 是一個累加器搬素。
ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
累加器的代碼如下呵晨。簡單講就是通過調(diào)用API buffer.writeBytes(in); 把in數(shù)據(jù)通過內(nèi)存拷貝的方式合并到cumulation中,在合并前判斷是否要對cumulation 進行擴容熬尺。
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
理解了這兩個變量后我們在看ByteToMessageDecoder 中的channelRead方法摸屠,該方法是每次從緩沖區(qū)讀到數(shù)據(jù)時自動調(diào)用。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
//1粱哼、合并數(shù)據(jù)到字節(jié)容器中
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
//2餐塘、把字節(jié)容器中的數(shù)據(jù)拆包并添加到業(yè)務數(shù)據(jù)容器out中
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
//3、清理字節(jié)容器
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
//4皂吮、把拆包后的數(shù)據(jù)交給后面的Handler解碼
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
主要步驟已經(jīng)在代碼中注釋。下面我們來具體分析一下税手。
1蜂筹、合并數(shù)據(jù)到字節(jié)容器中。先判斷字節(jié)容器cumulation中是否有數(shù)據(jù)芦倒,沒有就直接賦值艺挪,有的話,有的話就調(diào)用累加器進行累加兵扬。
2麻裳、把字節(jié)容器中的數(shù)據(jù)拆包并添加到業(yè)務數(shù)據(jù)容器out中。
我們來看一下callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
............
//1器钟、讀取字節(jié)累加容器中的可讀字節(jié)數(shù)
int oldInputLength = in.readableBytes();
//2津坑、交給業(yè)務拆包器進行拆包
decode(ctx, in, out);
.............
//3、還沒有解出數(shù)據(jù)包
if (outSize == out.size()) {
//4傲霸、如果字節(jié)累加容器中的可讀字節(jié)數(shù)沒變跳出循環(huán)接著讀數(shù)據(jù)
if (oldInputLength == in.readableBytes()) {
break;
} else {
//5疆瑰、還沒有解出數(shù)據(jù)包眉反,繼續(xù)解包
continue;
}
}
//6、解出來了穆役,但是累加容器實際沒有讀到數(shù)據(jù)拋異常
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
............
}
3寸五、清理字節(jié)容器。如果累加容器不為空并且沒有可讀數(shù)據(jù)耿币,那么直接釋放掉梳杏。否則如果還有未讀數(shù)據(jù),并且次數(shù)大于16淹接,discardAfterReads的默認值為16十性。調(diào)用discardSomeReadBytes,discardSomeReadBytes源碼如下表示當讀索引超過容量的一半時蹈集,進行數(shù)據(jù)前移烁试。
@Override
public ByteBuf discardSomeReadBytes() {
......
if (readerIndex >= capacity() >>> 1) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
}
......
return this;
}
丟棄前
丟棄后
4、把拆包后的數(shù)據(jù)交給后面的Handler解碼拢肆。通過fireChannelRead實現(xiàn)减响,同時會設置變量decodeWasNull,用來標識是否解出數(shù)據(jù)包郭怪。該變量用在 channelReadComplete 函數(shù)中支示,該函數(shù)的源碼如下。
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
decodeWasNull 為true鄙才,表示沒有解出數(shù)據(jù)包颂鸿,這個時候如果channel 設置成了非自動讀取,調(diào)用ctx.read()
攒庵。這個方法有什么作用呢晨仑?
在AbstractChannelHandlerContext
類中找到了read 方法胖喳。read()函數(shù)會調(diào)用invokeRead(),這個方法的源代碼如下:
private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}
最終傳播到HeadContext的read()方法,最后調(diào)用unsafe.beginRead()設置關心底層read事件艘款,從而實現(xiàn)激活后自動讀取數(shù)據(jù)煌妈。
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
unsafe是Netty內(nèi)部實現(xiàn)底層IO細節(jié)的類坤塞,beginRead()方法設定底層Selector關心read事件焕济,如果read事件就緒,則會調(diào)用unsafe.read()方法讀取數(shù)據(jù)场梆,然后調(diào)用channelPipe.fireChannelRead()方法通知用戶已讀取到數(shù)據(jù)墅冷,可進行業(yè)務處理。
自定義協(xié)議的拆包
在上一篇文章中采用的是通用拆包器LengthFieldBasedFrameDecoder或油,基本上所有的基于長度的二進制協(xié)議都可以用他進行拆包寞忿。關于LengthFieldBasedFrameDecoder如何使用,我這里依然推薦閃電俠同學的這篇文章装哆,LengthFieldBasedFrameDecoder罐脊,這里不在贅述定嗓。
那么如何對自定義的協(xié)議進行拆包?
我曾在簡書上看到過關于這方面的文章萍桌,比如這篇文章宵溅,但是這篇文章中的例子卻沒有考慮半包問題,所以我這邊以另外一個例子作為講解內(nèi)容上炎。
我在學習RPC開源框架的時候恃逻,偶然發(fā)現(xiàn)了張旭大神一個人寫的RPC框架——Navi-pbrpc,這個框架基于Netty網(wǎng)絡通信和Protobuf的序列化藕施,非常適合學習RPC的同學入門(推薦學習完Navi-pbrpc源碼寇损,再去看Dubbo的源碼會更好)。在這個框架中裳食,應用層他自定義了一個協(xié)議矛市,該協(xié)議基于header+body方式,header內(nèi)含的body length屬性來表明二進制數(shù)據(jù)長度诲祸,body采用經(jīng)過protobuf壓縮后的二進制數(shù)據(jù)浊吏。
NsHead是Navi-pbrpc 內(nèi)部的header。NsHead + protobuf序列化body包結構示意如下
NsHead結構如下:
NsHead的固定長度36個字節(jié)救氯,Header各字段中可以看到body-length字段找田,用來標識消息體長度。
了解了協(xié)議結構之后着憨,下面我們看一下Navi-pbrpc是如何進行拆包的墩衙。
拆包的源碼如下:
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 解決半包問題,此時Nshead還沒有接收全甲抖,channel中留存的字節(jié)流不做處理
if (in.readableBytes() < NsHead.NSHEAD_LEN) {
return;
}
in.markReaderIndex();
byte[] bytes = new byte[NsHead.NSHEAD_LEN];
in.readBytes(bytes, 0, NsHead.NSHEAD_LEN);
NsHead nsHead = new NsHead();
nsHead.wrap(bytes);
// 解決半包問題漆改,此時body還沒有接收全,channel中留存的字節(jié)流不做處理准谚,重置readerIndex
if (in.readableBytes() < (int) nsHead.getBodyLen()) {
in.resetReaderIndex();
return;
}
// 此時接受到了足夠的一個包籽懦,開始處理
in.markReaderIndex();
byte[] totalBytes = new byte[(int) nsHead.getBodyLen()];
in.readBytes(totalBytes, 0, (int) nsHead.getBodyLen());
PbrpcMsg decoded = PbrpcMsg.of(nsHead).setData(totalBytes);
ContextHolder.putContext("_logid", nsHead.getLogId()); // TODO
if (decoded != null) {
out.add(decoded);
}
}
1、可以看到的是先從ByteBuf 中讀取可讀的字節(jié)數(shù)氛魁,如果沒有達到NsHead的大小,就不做處理厅篓,如果達到了就讀取36個字節(jié)到字節(jié)數(shù)組中秀存,并且封裝成一個nsHead,里面包含了上圖NsHead結構里的所有字段羽氮。
2或链、拿到NsHead中body-length,,可以知道body的長度档押,接著讀取ByteBuf中可讀字段大小澳盐,如果小于body長度就返回祈纯,否則讀取ByteBuf到字節(jié)數(shù)組中,解決半包問題叼耙。
3腕窥、最后利用Protobuf客戶端SDK反序列化方法拿到消息體。
這里涉及到Netty的ByteBuf筛婉,留在以后的文章中再講簇爆。
總結
本文主要通過源碼分析介紹了Netty的拆包流程,主要分為了四個步驟:
1爽撒、合并數(shù)據(jù)到字節(jié)容器中入蛆。
2、把字節(jié)容器中的數(shù)據(jù)拆包并添加到業(yè)務數(shù)據(jù)容器out中
3硕勿、清理字節(jié)容器
4哨毁、把拆包后的數(shù)據(jù)交給后面的Handler解碼
并且通過一個例子介紹了如何自定義協(xié)議拆包。其實不管是netty還是自己去拆包源武,流程無非是文章開頭介紹的流程扼褪,萬變不離其宗吧。