Netty自定義TCP協(xié)議通訊實例
網(wǎng)絡(luò)編程的基本模型就是客戶機到服務(wù)器模型,簡單來說就是進程與進程之間的通訊
兩個進程之間必須要有一個提供固定的位置叁熔,讓另一個進程知道這個位置并建立聯(lián)系,這樣兩個進程就可以相互通訊
其中提供固定位置的叫服務(wù)端床牧,連接固定位置的叫客戶端
實際上因為是雙向通訊者疤,服務(wù)端客戶端并沒有太大的界限
今天寫一個實例,使用netty4來實現(xiàn)一個自定義的tcp報文解析
先說說自定義tcp協(xié)議的意義
文本協(xié)議:
對協(xié)議頭的 size 沒特別要求對解析速度沒特別要求需要協(xié)議輕松
支持變長 header 且保持較友好的擴展性(如像 HTTP 一樣可以隨意增加 header)
可升級協(xié)議且不增加協(xié)議本身復(fù)雜度
要求容易調(diào)試
二進制協(xié)議:
要求協(xié)議頭 size 要足夠小
要求解析速度要足夠快
協(xié)議頭相對較固定叠赦,變動的可能性較低,但仍然需要留一些擴展位
比如即時通訊就很適合用二進制協(xié)議革砸,因為即時通訊需要頻繁收發(fā)消息除秀,對于傳輸和解析的速度都有比較高的要求
另外一個就是二進制協(xié)議沒有協(xié)議文檔幾乎不可讀
進入代碼階段
本文代碼鏈接:后端代碼
一、服務(wù)端
1.新建工程
這次不需要boot了算利,新建一個空的maven項目引入依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.53.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- log -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
其中l(wèi)ogback是給lombok的@Slf4j的實現(xiàn)册踩,commons-codec用于獲取md5
2.實現(xiàn)一個服務(wù)端
netty的基本使用就是建立一個worker和一個boss EventLoopGroup,然后配置編碼解碼器以及自定義handler效拭,事件監(jiān)聽等暂吉,使得開發(fā)者的核心可以轉(zhuǎn)移到處理數(shù)據(jù)。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true).handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
// 自定義解碼器
new DecodeHandler(),
new StringEncoder(CharsetUtil.UTF_8),
// 自定義的文件處理handler
new TcpServerHandler());
}
});
try {
int port = 8888;
String host = "127.0.0.1";
ChannelFuture future = bootstrap.bind(host, port).sync();
log.info("啟動服務(wù)器 端口[{}]", port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("%s", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
除了常規(guī)配置之外缎患,我們要做的就是實現(xiàn)一個自定義的解碼器DecodeHandler以及消息處理器TcpServerHandler
3.協(xié)議制定
開始解碼前我們需要有一個協(xié)議慕的,才能知道該怎么解
業(yè)務(wù)的報文處理
解析
定義協(xié)議 PS:x表示占幾個字節(jié)
[[版本x2][報文總長度x2][消息類型x1][文件標(biāo)識x16][分片序號x2][文件總大小x8][分片文件長度x2][分片文件數(shù)據(jù)]]
消息類型 0x01 上傳文件 0x02 合并文件
示例報文(16進制):
0055041d015b886f17f511ed7908128e68520b8aa300000000000001159cf50400504b03041400000000003b0a3948000000000000000000000000180000007068616e746f6d6a732d322e312e312d77696e646f77732f504b0304140000000000670a39480000000000000000000000001c0000007068616e746f6d6a732d322e312e312d77696e646f77732f62696e2f504b0304140000000800bd0a394821ec01d2cbbf140100a01b01290000007068616e746f6d6a732d322e312e312d77696e646f77732f62696e2f7068616e746f6d6a732e657865ec9a8b3fd3ff1ec777373336b5106313b94d482a22e6325b85264285ad92cba2167329b731b799cbeae474fd955427bf2ebf748e44c45c6a2315e9942256a9c8a52531b9ec7cbf9ddbe35c1e8fd31f70be0f1eefafd79efb787edf9feff7cdc383ef4e31040e814010c0a74a0581d442fe7a5021fffba04221102dd25d2d48b5fa23e35aa8cf23e3c0e898043237fe6054fcee38f2dedd070e1ce491f7ec23c7271e20c71c207b6d0d20c71d8cd867a3a98931fddb1aebdb1ef4245caee4fffd73edd4267e3250c79107f987809a3193cc27ffedb5f81f5f6fe2b32e8195c15f0bd4df7646f27997c19af2e375ad7e063fe30a58937fbcff81b30f3ff5c7eb7bf9d13fd6f9951f7e05cc197cc48fd737f1e3ae8395f7b7f513800a726e4005f3a41f755bccde68d0e1df7bc0a441203e503844e07739e8ef991c02836a40b11088d8000a210341d504944bc3c021920928840954b0bb7820874120a81feff867857081d7f17f3d8543a85900f8e3e39ff5af85fa8b0216f20e02896561e062601d70dd58dc7feed1fc6e0c1c0bf9df47281d0367407efed8ce0cb103ca8f6bfbabd78f9be85f0e3ed00a805b0d9cfee8c58feb2f00eabf715480b3894f88df0b7e41feebb54008402d02d87fe52490ff1f3f7bacb1715a0dee93b1a63a1a1588db19d4fccaf6eb7d730c7c7f2ed0573b185c0702d963fc7a56150db579201e865a5ec7ee68ed24950f0e86c75771542b6c77b7d5c1be3db2860a069b7c1bcf52a56437eb7b29111780fdf157f52da82ec0be86f0d7b4318bbd4cd11117cd39c8ac61c8a275ace0ee8b86f136b4d00e518c92e2f7fda95aa5520d6c80af7511598bd0b9ed3c5dc1ac8a4712b4ab14b86634ae4cd27560d74c8e2451db890383310317ba8862eceb17b13138b65eac4fbf50f3c28e59b27334119f00756bb177350fb6db88c0464477b8cfb02ec2be94eb71ec8d535c89a38d9ff7ae264a911832cc6c8dc4199a3a6ac5ba8cbabed63472bb458c615f12c99a4371af7e106e0da3d80de2c2a6590e656be19b22e4b68eac59f1de52172a8731ec62ea9d93217b2f4678315c901c8ca7565dda3e071fe65d4ae34420c7d69d1ae2835a495d1d6253469adb175a8655e3e0b8ec8059b1674434c7923fd39b3c4644536259dc52a2ba71faf5b95e5e560a47
推薦使用NetAssist調(diào)試
4.解碼器
解碼器的意義在于先一步處理得到報文
TCP是一個“流”協(xié)議,所謂流挤渔,就是沒有界限的一長串二進制數(shù)據(jù)肮街。TCP作為傳輸層協(xié)議并不不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會根據(jù)TCP緩沖區(qū)的實際情況進行數(shù)據(jù)包的劃分判导,所以在業(yè)務(wù)上認為是一個完整的包嫉父,可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送眼刃,這就是所謂的TCP粘包和拆包問題绕辖。
所以我們需要在解碼器對每一包進行校驗,小于校驗和的包需要粘包擂红,大于校驗和的包需要拆包仪际,這樣才能正確解析
至于怎么解決這個可以專門分析,網(wǎng)上也有很多方案,這里不談
除了粘包拆包問題弟头,我們還可能有對原始報文進行解密的操作吩抓,為了保證下游的handler拿到的bytebuf是與協(xié)議文檔一樣的格式,我們需要先解密赴恨,并把業(yè)務(wù)部分報文傳達下去疹娶,甚至可以在定義了一定格式的報文后,直接在decoder中把二進制數(shù)據(jù)變成對象或者等類型伦连,下游再解析雨饺。
public class DecodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
// 取到tag與報文長度后再處理,各2字節(jié)
if (byteBuf.readableBytes() < 4) {
return;
}
// 記錄當(dāng)前ByteBuf的讀指針位置惑淳,以便下面取報文長度字節(jié)
// pos是一個完整報文的開始位置额港,報文整體會在ByteBuf中移動,類似內(nèi)存管理歧焦,所以基于字節(jié)的判斷報文長度等等移斩,都是基于pos,否則可以在byteBuf.readBytes()之后加绢馍,byteBuf.discardReadBytes();整理ByteBuf向瓷,使pos回到0開始位置
int pos = byteBuf.readerIndex();
//不同協(xié)議頭建議使用策略模式處理
short protocol = byteBuf.readShort();//前面兩個字節(jié)是協(xié)議版本 0x55
int msgLen = byteBuf.readShort();//第三第四個字節(jié)表示報文長度
// 收到的報文長度不足一個完整的報文,繼續(xù)接收
if (byteBuf.readableBytes() < msgLen) {
return;
}
// 提出完整報文(readBytes讀到msg中)舰涌,放到list給下一個Handler處理
if (msgLen > 0) {
out.add(byteBuf.readBytes(msgLen));
}
}
}
5.自定義handler
從上面的DecodeHandler可以看到猖任,前4個字節(jié)用來做協(xié)議版本和報文長度,已經(jīng)read了瓷耙,丟到out中給到下游的是這四個字節(jié)以外的數(shù)據(jù)
// 前面4個字節(jié)已經(jīng)在decode階段跳過了
byte msgType = byteBuf.readByte();// 讀取一個字節(jié)為消息類型
switch (msgType) {
case 0x01:
upload(byteBuf);
break;
case 0x02:
merge(byteBuf);
break;
}
所以在handler中可以直接讀取一個字節(jié)判斷當(dāng)前的報文類型再分發(fā)
然后根據(jù)已有的協(xié)議進行解析
// 業(yè)務(wù)的報文處理
// 解析
// 定義協(xié)議 PS:x表示占幾個字節(jié)
// [[版本x2][報文總長度x2][消息類型x1][文件標(biāo)識x16][分片序號x2][文件總大小x8][分片文件長度x2][分片文件數(shù)據(jù)]]
// 消息類型 0x01 上傳文件 0x02 合并文件
byte[] filenameBytes = new byte[16];
buf.readBytes(filenameBytes);// 讀取16字節(jié)是文件名
String filename = ByteUtils.bytesToHex(filenameBytes);// 我這里是md5不會有中文沒有亂碼的問題
short fileseq = buf.readShort();// 讀取2字節(jié)是分片序號
long filesize = buf.readLong();// 讀取8字節(jié)是文件總大小
short chunksize = buf.readShort();// 讀取2字節(jié)是分片文件長度
byte[] dataBytes = new byte[chunksize];
buf.readBytes(dataBytes);
已經(jīng)有大佬寫好框架朱躺,只需要做解析的同學(xué),做這個不算太難搁痛,稍微有些枯燥长搀,需要認真細心,不要解錯
二落追、客戶端
相對服務(wù)端而言盈滴,客戶端需要搜集本次通訊的報文數(shù)據(jù),無論是protobuf也好byte數(shù)據(jù)也好轿钠,都比服務(wù)端要復(fù)雜一些
由于我們的netty實現(xiàn)了一個tcp服務(wù)端巢钓,所以可以客戶端只要使用tcp協(xié)議就可以通訊,我們使用java提供的socketapi即可
核心代碼
/**
* TODO 本來發(fā)送是不需要返回值的疗垛,但是這里為了使用latch來計算發(fā)了多少症汹,通過返回true來表示發(fā)送成功一個
*
* @param msg
* @return
*/
synchronized public boolean sendMsgBySocket(ByteBuffer msg) {
try {
// Socket socket = getSocketClient();
//這段代碼僅僅調(diào)試用,盡量一個不要開多個socket跟服務(wù)端通信贷腕,應(yīng)該使用同一個鏈路背镇,多次發(fā)送消息咬展,服務(wù)端解決粘包和拆包的問題
// 要連接的服務(wù)端IP地址和端口-----------------------------------
String host = "127.0.0.1";
int port = 8888;
// 與服務(wù)端建立連接
this.socket = new Socket(host, port);
//---------------------------------------------------------
// 建立連接后獲得輸出流
OutputStream outputStream = socket.getOutputStream();
byte[] msgArray = msg.array();
// 報文頭start-----------------------------------------------
// 55表示協(xié)議版本
short protocol = 0x55;
ByteBuffer header = ByteBuffer.allocate(4);
header.putShort(protocol);// 版本x2
// 寫入本次報文總長度 固定是37 + 分片文件長度
short length = (short) (msgArray.length);
header.putShort(length);// 報文總長度x2
// 報文頭end-----------------------------------------------
outputStream.write(header.array());
outputStream.write(msgArray);
outputStream.flush();
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
int read = 0;
while (true) {
// 死循環(huán)直到服務(wù)器返回 這里可以添加超時機制,防止服務(wù)器出問題后無限循環(huán)
read = inputStream.read(bytes);
if (read > 0) {
String result = new String(bytes);
log.info("receive : {}", result);
if ("ok".equals(result)) {
// 假定回復(fù)ok就是成功了
return true;
}
break;
}
}
} catch (Exception e) {
log.error("[{}]", e);
}
return false;
}
這里的含義是我們先組裝好要發(fā)送的報文ByteBuffer瞒斩,然后調(diào)用這個發(fā)起socket通訊
其他分享的代碼
這次做的是通過tcp分片上傳大文件
思想還是和上篇一樣破婆,客戶端分片,通過md5獲得唯一標(biāo)識以免重復(fù)胸囱,每個報文包括分片序號祷舀,唯一標(biāo)識,總大小分片大小還有分片數(shù)據(jù)
因為我們要規(guī)定每個分片的最大大小才能計算分片數(shù)量
所以這里可以利用FileChannel烹笔,分段讀取裳扯,無需切片文件,也避免把整個文件讀取到內(nèi)存
FileChannel fileChannel = FileChannel.open(Paths.get(file.getAbsolutePath()),
EnumSet.of(StandardOpenOption.READ));
// 每個分片最大1024谤职,所以只有最后一片不一定是1024饰豺,但是也能算出來
int lastSize = (int) (fileChannel.size() - ((chunk - 1) * 1024));// 總長度減去前片的和
for (int chunkNum = 0; chunkNum < chunk; chunkNum++) {
int size = chunkNum == chunk ? lastSize : chunkSize;
ByteBuffer buffer = ByteBuffer.allocate(size);
fileChannel.read(buffer);
我規(guī)定每個分片最大1024字節(jié)也就是1kb,所以每次最多讀1k允蜈,最后一片可能小于等于1k冤吨,每循環(huán)一次就把報文放到線程池中等待發(fā)送
利用CountDownLatch設(shè)定一個屏障,要求發(fā)送的總數(shù)達到才關(guān)閉線程池饶套,在服務(wù)端返回ok后latch.countDown()達到計數(shù)的效果
// 設(shè)定一個計數(shù)器
CountDownLatch latch = new CountDownLatch(chunk);
fixedThreadPool.execute(() -> {
// 將組裝好的報文锅很,在線程池中創(chuàng)建線程運行
client.sendMsgBySocket(msgBuffer);
// 應(yīng)該要判斷是否成功的。這里不判斷認為都成功
latch.countDown();
});
// 所有線程沒發(fā)送完全之前 一直等待
latch.await();
分片大小1k確實有點小凤跑,不過這是為了看測試效果,實際上要根據(jù)通訊網(wǎng)絡(luò)來定義分片大小叛复,帶寬好的大一些