原文博客:Doi技術團隊
鏈接地址:https://blog.doiduoyi.com
初心:記錄優(yōu)秀的Doi技術團隊學習經(jīng)歷
本篇 主要講的是自定義協(xié)議是如何實現(xiàn)的恕酸,以及自定義協(xié)議中會出現(xiàn)的問題和Netty是如何支持的紊馏。
分為4個部分
|-- 粘包 拆包 和解決方案
|-- 代碼實現(xiàn)
|-- ByteToMessageDecoder的源碼分析
|-- 過程流程圖
粘包
在傳輸數(shù)據(jù)太多的時候鹅龄,TCP是會將數(shù)據(jù)塊分包發(fā)送的殊鞭,也就是在網(wǎng)絡延遲的問題疗韵,本來一個完整數(shù)據(jù)塊分成兩個包权薯,在開始讀取數(shù)據(jù)的時候嗓化,只收到了一個數(shù)據(jù)包帽馋,這個時候搅方,怎么辦?如果先處理一個數(shù)據(jù)包绽族,這樣數(shù)據(jù)不完整姨涡。等待?那如何知道數(shù)據(jù)完整了呢吧慢?
拆包
TCP是以字節(jié)流流的方式來傳輸?shù)奶纹瑪?shù)據(jù)是存儲在緩沖區(qū)。雖然發(fā)送數(shù)據(jù)是以每個包發(fā)送的检诗,但如果網(wǎng)絡出現(xiàn)延遲匈仗,在第一個包的數(shù)據(jù)還存儲在緩沖區(qū)的時候,第二個包就發(fā)送過來了逢慌。此時第二個包的數(shù)據(jù)也存儲到緩沖區(qū)悠轩,這時候,就不知道第一個包在哪兒結束攻泼,第二個包的數(shù)據(jù)是從哪里開始讀取了哗蜈。這就是TCP粘包。
解決方案
方案一:
解決方案其實就是坠韩,如何去自定義定義這個協(xié)議包,去解決粘包的問題和數(shù)據(jù)包不全的問題炼列。
自定義協(xié)議包括如下:
- 一個開始標志:比如定義一個Int類型只搁,4個字節(jié)的標志。那么在讀到這個開始標志的時候就判斷為是一個數(shù)據(jù)包的開始俭尖。
- 數(shù)據(jù)的長度:也是Int4個字節(jié),表明這個數(shù)據(jù)塊的大小是多小個字節(jié)氢惋,這樣根據(jù)這個長度就可以知道數(shù)據(jù)包是否已經(jīng)接受完畢,如果還沒有稽犁,那么就等待焰望。
- 數(shù)據(jù):真正的傳輸數(shù)據(jù)
代碼實現(xiàn)
協(xié)議包對象類
/**
*
* 自己定義的協(xié)議
* 數(shù)據(jù)包格式
* +——----——+——-----——+——----——+
* |協(xié)議開始標志| 長度 | 數(shù)據(jù) |
* +——----——+——-----——+——----——+
* 1.協(xié)議開始標志head_data,為int類型的數(shù)據(jù)已亥,16進制表示為0X76
* 2.傳輸數(shù)據(jù)的長度contentLength熊赖,int類型
* 3.要傳輸?shù)臄?shù)據(jù)
*
*/
public class CustomDate {
/**
* 消息開頭的信息標志
* 是一個常量 X077
*/
private final int head_Date = Costom.HEAD_DATA.getVaule();
/**
* 消息的長度
*/
private int contentLength;
/**
* 消息的內(nèi)容
*/
private byte[] conctent;
public CustomDate() {
super();
}
public CustomDate(int contentLength, byte[] conctent) {
this.contentLength = contentLength;
this.conctent = conctent;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}
public byte[] getConctent() {
return conctent;
}
public void setConctent(byte[] conctent) {
this.conctent = conctent;
}
public int getHead_Date() {
return head_Date;
}
}
解碼類
核心思想:
- 1 在開始讀取數(shù)據(jù)的時候先判斷字節(jié)大小是否基本數(shù)據(jù)長度 (標志+數(shù)據(jù)長度)
- 2 如果緩沖區(qū)數(shù)據(jù)太大,這種情況不正常虑椎,應該移動2048個字節(jié)震鹉,直接處理后面的字節(jié)俱笛。因為,可能是網(wǎng)絡延遲導致传趾,或者是惡意發(fā)送大量數(shù)據(jù)迎膜。
- 3 開始讀取緩沖區(qū)了,對緩沖區(qū)的操作浆兰。首先標記一下閱讀標記點磕仅,然后開始尋找開始標記,如果不是開始標記簸呈,那么就跳過一個標記節(jié)點榕订。
- 4 如果找到了開始標記,那么就繼續(xù)獲取長度蝶棋。如果長度大小大于緩沖區(qū)的可讀長度卸亮,那么就證明還有數(shù)據(jù)還沒到。就回滾到閱讀標記點玩裙。繼續(xù)等待數(shù)據(jù)兼贸。
- 5 如果數(shù)據(jù)已經(jīng)到達了,那么就開始讀取數(shù)據(jù)區(qū)吃溅。
繼承 ByteToMessageDecoder 類溶诞。該類主要作用是將從網(wǎng)絡緩沖區(qū)讀取的字節(jié)轉換成有意義的消息對象的
/**
*
* 自己定義的協(xié)議
* 數(shù)據(jù)包格式
* +——----——+——-----——+——----——+
* |協(xié)議開始標志| 長度 | 數(shù)據(jù) |
* +——----——+——-----——+——----——+
* 1.協(xié)議開始標志head_data,為int類型的數(shù)據(jù)决侈,16進制表示為0X76
* 2.傳輸數(shù)據(jù)的長度contentLength螺垢,int類型
* 3.要傳輸?shù)臄?shù)據(jù),長度不應該超過2048,防止socket流的攻擊
*
*/
public class CustomDecoder extends ByteToMessageDecoder {
/**
* 協(xié)議開始的標準head_data赖歌,int類型枉圃,占據(jù)4個字節(jié).
* 表示數(shù)據(jù)的長度contentLength,int類型庐冯,占據(jù)4個字節(jié).
*/
private final int BASE_LENGTH = 4 + 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
//1. 首先確認可讀長度大于基本長度
if (buffer.readableBytes() > BASE_LENGTH) {
//2.
// 防止socket字節(jié)流攻擊
// 防止孽亲,客戶端傳來的數(shù)據(jù)過大
// 因為,太大的數(shù)據(jù)展父,是不合理的
if (buffer.readableBytes() > 2048) {
//將readerIndex移動
buffer = buffer.skipBytes(buffer.readableBytes());
}
//3. 記錄閱讀開始
int beginRead;
while (true) {
//獲取包頭開始的index;
beginRead = buffer.readerIndex();
// 標記包頭開始的index
buffer.markReaderIndex();
//如果讀到了數(shù)據(jù)包的協(xié)議開頭返劲,那么就結束循環(huán)
if (buffer.readInt() == Costom.HEAD_DATA.getVaule()) {
break;
}
//沒讀到協(xié)議開頭,退回到標記
buffer.resetReaderIndex();
//跳過一個字節(jié)
buffer.readByte();
//如果可讀長度小于基本長度
//
if (buffer.readableBytes() < BASE_LENGTH) {
return;
}
}
//獲取消息的長度
int length = buffer.readInt();
//判斷請求數(shù)據(jù)包是否到齊
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return;
}
byte[] date = new byte[length];
buffer.readBytes(date);
CustomDate customDate = new CustomDate(length, date);
out.add(customDate);
}
}
}
編碼類
- 其實就是往緩沖區(qū)里面寫數(shù)據(jù)栖茉。
- 繼承 MessageToByteEncoder
public class CustomEncoder extends MessageToByteEncoder<CustomDate> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomDate msg, ByteBuf out) throws Exception {
out.writeInt(msg.getHead_Date());
out.writeInt(msg.getContentLength());
out.writeBytes(msg.getConctent());
}
}
添加到管道
public class ServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CustomDecoder());
pipeline.addLast(new CustomEncoder());
pipeline.addLast(new ServerHandler());
}
}
輸出協(xié)議數(shù)據(jù)
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof CustomDate) {
CustomDate customDate= (CustomDate) msg;
byte[] conctent = customDate.getConctent();
System.out.println("獲取到的內(nèi)容"+new String(conctent));
ReferenceCountUtil.release(msg);
}
}
}
過程分析
我們研究一下解碼的過程篮绿。
1 自定義的解碼類是繼承ByteToMessageDecoder類。先看下ByteToMessageDecoder類
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{}
可以看到ByteToMessageDecoder 是繼承ChannelInboundHandlerAdapter吕漂,那也就是說亲配,數(shù)據(jù)處理應該是通過重寫channelRead()類了。
2 那就繼續(xù)看ByteToMessageDecoder 的channelRead() 方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
// 獲取到緩沖區(qū)
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 2 開始解碼
callDecode(ctx, cumulation, out);
} finally {
// 資源釋放代碼
}
} else {
ctx.fireChannelRead(msg);
}
}
重點是2 callDecode()方法。該方法是開始解碼弃榨。繼續(xù)往下看該方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 1
while (in.isReadable()) {
int outSize = out.size();
// 2
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//3
int oldInputLength = in.readableBytes();
//4
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//5
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException( );
}
if (isSingleDecode()) {
break;
}
}
}
}
1 用while循環(huán)不斷處理緩沖區(qū)菩收,判斷條件是如果緩沖區(qū)還有可讀數(shù)據(jù),就繼續(xù)執(zhí)行鲸睛。
2 這個是非常好的設計娜饵,Out變量存儲的是解碼生成的對象。如果out里面已經(jīng)有對象官辈,那么就把該對象通過fireChannelRead()方法傳到下一個handler(也就是本程序中的輸出handler)箱舞。
出現(xiàn)這種情況是因為:粘包!H凇G绻伞!肺魁! 當處理完一個數(shù)據(jù)包的數(shù)據(jù)后电湘,緩沖區(qū)還有下一個數(shù)據(jù)包的數(shù)據(jù),所以先把處理完的數(shù)據(jù)包交給下一個handler處理后鹅经,再進行緩沖區(qū)的讀取寂呛。
3 做一個標記。記錄這次解碼對緩沖區(qū)數(shù)據(jù)有沒有被讀锐巍(也就是有沒有讀取數(shù)據(jù))贷痪。如果沒有,下面就會結束while循環(huán)蹦误。
為什么會要做這個標記呢劫拢?為什么要結束循環(huán)呢?
因為:緩沖區(qū)的數(shù)據(jù)沒有讀取强胰,也就是說數(shù)據(jù)還沒全部到齊舱沧,需要等待數(shù)據(jù)完整再處理。所以就需要結束while循環(huán)偶洋。等待下一次的處理熟吏。
4 decodeRemovalReentryProtection() 就是調用自己重寫的decode()方法了。
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 自己重寫的decode
decode(ctx, in, out);
} finally {
//省略
}
}
5 這里就是判斷3 中的標記涡真,是否退出循環(huán)。
過程流程圖
看完代碼分析還是一頭霧水肾筐? 那就再看一下流程圖吧