前言
記得前段時(shí)間我們生產(chǎn)上的一個(gè)網(wǎng)關(guān)出現(xiàn)了故障粹污。
這個(gè)網(wǎng)關(guān)邏輯非常簡(jiǎn)單段多,就是接收客戶端的請(qǐng)求然后解析報(bào)文最后發(fā)送短信。
但這個(gè)請(qǐng)求并不是常見的 HTTP 壮吩,而是利用 Netty 自定義的協(xié)議衩匣。
有個(gè)前提是:網(wǎng)關(guān)是需要讀取一段完整的報(bào)文才能進(jìn)行后面的邏輯。
問題是有天突然發(fā)現(xiàn)網(wǎng)關(guān)解析報(bào)文出錯(cuò)粥航,查看了客戶端的發(fā)送日志也沒發(fā)現(xiàn)問題琅捏,最后通過(guò)日志發(fā)現(xiàn)收到了許多不完整的報(bào)文,有些還多了递雀。
于是想會(huì)不會(huì)是 TCP 拆柄延、粘包帶來(lái)的問題,最后利用 Netty 自帶的拆包工具解決了該問題缀程。
這便有了此文搜吧。
TCP 協(xié)議
問題雖然解決了,但還是得想想原因杨凑,為啥會(huì)這樣滤奈?打破砂鍋問到底才是一個(gè)靠譜的程序員。
這就得從 TCP 這個(gè)協(xié)議說(shuō)起了撩满。
TCP 是一個(gè)面向字節(jié)流的協(xié)議蜒程,它是性質(zhì)是流式的,所以它并沒有分段伺帘。就像水流一樣昭躺,你沒法知道什么時(shí)候開始,什么時(shí)候結(jié)束。
所以他會(huì)根據(jù)當(dāng)前的套接字緩沖區(qū)的情況進(jìn)行拆包或是粘包。
下圖展示了一個(gè) TCP 協(xié)議傳輸?shù)倪^(guò)程:
發(fā)送端的字節(jié)流都會(huì)先傳入緩沖區(qū)串述,再通過(guò)網(wǎng)絡(luò)傳入到接收端的緩沖區(qū)中,最終由接收端獲取帝洪。
當(dāng)我們發(fā)送兩個(gè)完整包到接收端的時(shí)候:
正常情況會(huì)接收到兩個(gè)完整的報(bào)文。
但也有以下的情況:
接收到的是一個(gè)報(bào)文脚猾,它是由發(fā)送的兩個(gè)報(bào)文組成的葱峡,這樣對(duì)于應(yīng)用程序來(lái)說(shuō)就很難處理了(這樣稱為粘包)。
還有可能出現(xiàn)上面這樣的雖然收到了兩個(gè)包婚陪,但是里面的內(nèi)容卻是互相包含族沃,對(duì)于應(yīng)用來(lái)說(shuō)依然無(wú)法解析(拆包)。
對(duì)于這樣的問題只能通過(guò)上層的應(yīng)用來(lái)解決泌参,常見的方式有:
- 在報(bào)文末尾增加換行符表明一條完整的消息脆淹,這樣在接收端可以根據(jù)這個(gè)換行符來(lái)判斷消息是否完整。
- 將消息分為消息頭沽一、消息體盖溺。可以在消息頭中聲明消息的長(zhǎng)度铣缠,根據(jù)這個(gè)長(zhǎng)度來(lái)獲取報(bào)文(比如 808 協(xié)議)烘嘱。
- 規(guī)定好報(bào)文長(zhǎng)度,不足的空位補(bǔ)齊蝗蛙,取的時(shí)候按照長(zhǎng)度截取即可蝇庭。
以上的這些方式我們?cè)?Netty 的 pipline 中里加入對(duì)應(yīng)的解碼器都可以手動(dòng)實(shí)現(xiàn)。
但其實(shí) Netty 已經(jīng)幫我們做好了捡硅,完全可以開箱即用哮内。
比如:
-
LineBasedFrameDecoder
可以基于換行符解決。 -
DelimiterBasedFrameDecoder
可基于分隔符解決壮韭。 -
FixedLengthFrameDecoder
可指定長(zhǎng)度解決北发。
字符串拆、粘包
下面來(lái)模擬一下最簡(jiǎn)單的字符串傳輸喷屋。
還是在之前的
https://github.com/crossoverJie/netty-action
進(jìn)行演示琳拨。
在 Netty 客戶端中加了一個(gè)入口可以循環(huán)發(fā)送 100 條字符串報(bào)文到接收端:
/**
* 向服務(wù)端發(fā)消息 字符串
* @param stringReqVO
* @return
*/
@ApiOperation("客戶端發(fā)送消息,字符串")
@RequestMapping(value = "sendStringMsg", method = RequestMethod.POST)
@ResponseBody
public BaseResponse<NULLBody> sendStringMsg(@RequestBody StringReqVO stringReqVO){
BaseResponse<NULLBody> res = new BaseResponse();
for (int i = 0; i < 100; i++) {
heartbeatClient.sendStringMsg(stringReqVO.getMsg()) ;
}
// 利用 actuator 來(lái)自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
return res ;
}
/**
* 發(fā)送消息字符串
*
* @param msg
*/
public void sendStringMsg(String msg) {
ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
message.writeBytes(msg.getBytes()) ;
ChannelFuture future = channel.writeAndFlush(message);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客戶端手動(dòng)發(fā)消息成功={}", msg));
}
服務(wù)端直接打印即可:
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
LOGGER.info("收到msg={}", msg);
}
順便提一下屯曹,這里加的有一個(gè)字符串的解碼器:.addLast(new StringDecoder())
其實(shí)就是把消息解析為字符串狱庇。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
在 Swagger 中調(diào)用了客戶端的接口用于給服務(wù)端發(fā)送了 100 次消息:
正常情況下接收端應(yīng)該打印 100 次 hello
才對(duì),但是查看日志會(huì)發(fā)現(xiàn):
收到的內(nèi)容有完整的恶耽、多的僵井、少的、拼接的驳棱;這也就對(duì)應(yīng)了上面提到的拆包批什、粘包。
該怎么解決呢社搅?這便可采用之前提到的 LineBasedFrameDecoder
利用換行符解決驻债。
利用 LineBasedFrameDecoder 解決問題
LineBasedFrameDecoder
解碼器使用非常簡(jiǎn)單,只需要在 pipline 鏈條上添加即可形葬。
//字符串解析,換行防拆包
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
構(gòu)造函數(shù)中傳入了 1024 是指報(bào)的長(zhǎng)度最大不超過(guò)這個(gè)值合呐,具體可以看下文的源碼分析。
然后我們?cè)龠M(jìn)行一次測(cè)試看看結(jié)果:
注意笙以,由于 LineBasedFrameDecoder 解碼器是通過(guò)換行符來(lái)判斷的淌实,所以在發(fā)送時(shí),一條完整的消息需要加上
\n
。
最終的結(jié)果:
仔細(xì)觀察日志拆祈,發(fā)現(xiàn)確實(shí)沒有一條被拆恨闪、粘包。
LineBasedFrameDecoder 的原理
目的達(dá)到了放坏,來(lái)看看它的實(shí)現(xiàn)原理:
- 第一步主要就是
findEndOfLine
方法去找到當(dāng)前報(bào)文中是否存在分隔符咙咽,存在就會(huì)返回分隔符所在的位置。 - 判斷是否需要丟棄淤年,默認(rèn)為 false 钧敞,第一次走這個(gè)邏輯(下文會(huì)判斷是否需要改為 true)。
- 如果報(bào)文中存在換行符麸粮,就會(huì)將數(shù)據(jù)截取到那個(gè)位置溉苛。
- 如果不存在換行符(有可能是拆包、粘包)弄诲,就看當(dāng)前報(bào)文的長(zhǎng)度是否大于預(yù)設(shè)的長(zhǎng)度愚战。大于則需要緩存這個(gè)報(bào)文長(zhǎng)度,并將 discarding 設(shè)為 true威根。
- 如果是需要丟棄時(shí)凤巨,判斷是否找到了換行符,存在則需要丟棄掉之前記錄的長(zhǎng)度然后截取數(shù)據(jù)洛搀。
- 如果沒有找到換行符敢茁,則將之前緩存的報(bào)文長(zhǎng)度進(jìn)行累加,用于下次拋棄留美。
從這個(gè)邏輯中可以看出就是尋找報(bào)文中是否包含換行符彰檬,并進(jìn)行相應(yīng)的截取。
由于是通過(guò)緩沖區(qū)讀取的谎砾,所以即使這次沒有換行符的數(shù)據(jù)逢倍,只要下一次的報(bào)文存在換行符,上一輪的數(shù)據(jù)也不會(huì)丟景图。
高效的編碼方式 Google Protocol
上面提到的其實(shí)就是在解碼中進(jìn)行操作较雕,我們也可以自定義自己的拆、粘包工具挚币。
編解碼的主要目的就是為了可以編碼成字節(jié)流用于在網(wǎng)絡(luò)中傳輸亮蒋、持久化存儲(chǔ)。
Java 中也可以實(shí)現(xiàn) Serializable 接口來(lái)實(shí)現(xiàn)序列化妆毕,但由于它性能等原因在一些 RPC 調(diào)用中用的很少慎玖。
而 Google Protocol
則是一個(gè)高效的序列化框架,下面來(lái)演示在 Netty 中如何使用笛粘。
安裝
首先第一步自然是安裝:
在官網(wǎng)下載對(duì)應(yīng)的包趁怔。
本地配置環(huán)境變量:
當(dāng)執(zhí)行 protoc --version
出現(xiàn)以下結(jié)果表明安裝成功:
定義自己的協(xié)議格式
接著是需要按照官方要求的語(yǔ)法定義自己的協(xié)議格式湿硝。
比如我這里需要定義一個(gè)輸入輸出的報(bào)文格式:
BaseRequestProto.proto:
syntax = "proto2";
package protocol;
option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseRequestProto";
message RequestProtocol {
required int32 requestId = 2;
required string reqMsg = 1;
}
BaseResponseProto.proto:
syntax = "proto2";
package protocol;
option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseResponseProto";
message ResponseProtocol {
required int32 responseId = 2;
required string resMsg = 1;
}
再通過(guò)
protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto
protoc 命令將剛才定義的協(xié)議格式轉(zhuǎn)換為 Java 代碼,并生成在 /dev
目錄润努。
只需要將生成的代碼拷貝到我們的項(xiàng)目中关斜,同時(shí)引入依賴:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.4.0</version>
</dependency>
利用 Protocol 的編解碼也非常簡(jiǎn)單:
public class ProtocolUtil {
public static void main(String[] args) throws InvalidProtocolBufferException {
BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
.setRequestId(123)
.setReqMsg("你好啊")
.build();
byte[] encode = encode(protocol);
BaseRequestProto.RequestProtocol parseFrom = decode(encode);
System.out.println(protocol.toString());
System.out.println(protocol.toString().equals(parseFrom.toString()));
}
/**
* 編碼
* @param protocol
* @return
*/
public static byte[] encode(BaseRequestProto.RequestProtocol protocol){
return protocol.toByteArray() ;
}
/**
* 解碼
* @param bytes
* @return
* @throws InvalidProtocolBufferException
*/
public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
return BaseRequestProto.RequestProtocol.parseFrom(bytes);
}
}
利用 BaseRequestProto
來(lái)做一個(gè)演示,先編碼再解碼最后比較最終的結(jié)果是否相同任连。答案肯定是一致的蚤吹。
利用 protoc 命令生成的 Java 文件里已經(jīng)幫我們把編解碼全部都封裝好了例诀,只需要簡(jiǎn)單調(diào)用就行了随抠。
可以看出 Protocol 創(chuàng)建對(duì)象使用的是構(gòu)建者模式,對(duì)使用者來(lái)說(shuō)清晰易讀繁涂,更多關(guān)于構(gòu)建器的內(nèi)容可以參考這里拱她。
更多關(guān)于 Google Protocol
內(nèi)容請(qǐng)查看官方開發(fā)文檔。
結(jié)合 Netty
Netty 已經(jīng)自帶了對(duì) Google protobuf 的編解碼器扔罪,也是只需要在 pipline 中添加即可秉沼。
server 端:
// google Protobuf 編解碼
.addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())
客戶端:
// google Protobuf 編解碼
.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())
稍微注意的是,在構(gòu)建 ProtobufDecoder 時(shí)需要顯式指定解碼器需要解碼成什么類型矿酵。
我這里服務(wù)端接收的是 BaseRequestProto唬复,客戶端收到的是服務(wù)端響應(yīng)的 BaseResponseProto 所以就設(shè)置了對(duì)應(yīng)的實(shí)例。
同樣的提供了一個(gè)接口向服務(wù)端發(fā)送消息全肮,當(dāng)服務(wù)端收到了一個(gè)特殊指令時(shí)也會(huì)向客戶端返回內(nèi)容:
@Override
protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception {
LOGGER.info("收到msg={}", msg.getReqMsg());
if (999 == msg.getRequestId()){
BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder()
.setResponseId(1000)
.setResMsg("服務(wù)端響應(yīng)")
.build();
ctx.writeAndFlush(responseProtocol) ;
}
}
在 swagger 中調(diào)用相關(guān)接口:
在日志可以看到服務(wù)端收到了消息敞咧,同時(shí)客戶端也收到了返回:
雖說(shuō) Netty 封裝了 Google Protobuf 相關(guān)的編解碼工具,其實(shí)查看它的編碼工具就會(huì)發(fā)現(xiàn)也是利用上文提到的 api 實(shí)現(xiàn)的辜腺。
Protocol 拆休建、粘包
Google Protocol 的使用確實(shí)非常簡(jiǎn)單,但還是有值的注意的地方评疗,比如它依然會(huì)有拆测砂、粘包問題。
不妨模擬一下:
連續(xù)發(fā)送 100 次消息看服務(wù)端收到的怎么樣:
會(huì)發(fā)現(xiàn)服務(wù)端在解碼的時(shí)候報(bào)錯(cuò)百匆,其實(shí)就是被拆砌些、粘包了。
這點(diǎn) Netty 自然也考慮到了加匈,所以已經(jīng)提供了相關(guān)的工具存璃。
//拆包解碼
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufVarint32LengthFieldPrepender())
只需要在服務(wù)端和客戶端加上這兩個(gè)編解碼工具即可,再來(lái)發(fā)送一百次試試矩动。
查看日志發(fā)現(xiàn)沒有出現(xiàn)一次異常有巧,100 條信息全部都接收到了。
這個(gè)編解碼工具可以簡(jiǎn)單理解為是在消息體中加了一個(gè) 32 位長(zhǎng)度的整形字段悲没,用于表明當(dāng)前消息長(zhǎng)度篮迎。
總結(jié)
網(wǎng)絡(luò)這塊同樣是計(jì)算機(jī)的基礎(chǔ)男图,由于近期在做相關(guān)的工作所以接觸的比較多,也算是給大學(xué)補(bǔ)課了甜橱。
后面會(huì)接著更新 Netty 相關(guān)的內(nèi)容逊笆,最后會(huì)產(chǎn)出一個(gè)高性能的 HTTP 以及 RPC 框架,敬請(qǐng)期待岂傲。
上文相關(guān)的代碼:
https://github.com/crossoverJie/netty-action
號(hào)外
最近在總結(jié)一些 Java 相關(guān)的知識(shí)點(diǎn)难裆,感興趣的朋友可以一起維護(hù)。
歡迎關(guān)注公眾號(hào)一起交流: