一羹令、現(xiàn)象分析
1.1 粘包
通過代碼的方式演示下粘包的現(xiàn)象:
服務(wù)端:
public class HalfPackageServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//設(shè)置服務(wù)器接收端緩沖區(qū)大小為10
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//此處打印輸出闷尿,看看收到的內(nèi)容是10次16字節(jié)线椰,還是一次160字節(jié)
printBuf((ByteBuf) msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
static void printBuf(ByteBuf byteBuf){
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i< byteBuf.writerIndex();i++) {
stringBuilder.append(byteBuf.getByte(i));
stringBuilder.append(" ");
}
stringBuilder.append("| 長度:");
stringBuilder.append(byteBuf.writerIndex());
stringBuilder.append("字節(jié)");
System.out.println(stringBuilder);
}
}
客戶端:
public class HalfPackageClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立連接成功后岗钩,會(huì)觸發(fā)active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//循環(huán)發(fā)送纽窟,每次16字節(jié),共10次
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//釋放EventLoopGroup
worker.shutdownGracefully();
}
}
}
結(jié)果:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | 長度:160字節(jié)
如上所示兼吓,發(fā)送了10次的16個(gè)字節(jié)臂港,接收到了一個(gè)160字節(jié),而不是10個(gè)16字節(jié)视搏。這就是粘包現(xiàn)象趋艘。
1.2 半包
半包現(xiàn)象我們?nèi)匀皇褂们懊娴拇a,但是服務(wù)端需要多設(shè)置一個(gè)屬性凶朗,即修改服務(wù)端接收緩沖區(qū)的buffer大小,我們這里修改為10個(gè)字節(jié)显拳。
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
這行代碼起初運(yùn)行完我有點(diǎn)不理解棚愤,明明設(shè)置是10,但是接收到的內(nèi)容確是40杂数,可見這個(gè)設(shè)置的值宛畦,不是10個(gè)字節(jié),通過源碼跟蹤揍移,我發(fā)現(xiàn)這個(gè)數(shù)值ChannelOption的泛型是個(gè)Ingteger次和,估計(jì)是進(jìn)行了類型的計(jì)算,一個(gè)Integer是4個(gè)字節(jié)赶熟,所以有了設(shè)置是10奴烙,最終卻接收40個(gè)字節(jié)。
如果同學(xué)們覺得這個(gè)問題說的不對(duì)的話乏苦,可以幫我指正一下畅形。感謝Q唷!
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
服務(wù)端代碼:
public class HalfPackageServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//設(shè)置服務(wù)器接收端緩沖區(qū)大小為10
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//此處打印輸出日熬,看看收到的內(nèi)容是10次16字節(jié)棍厌,還是一次160字節(jié)
printBuf((ByteBuf) msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
static void printBuf(ByteBuf byteBuf){
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i< byteBuf.writerIndex();i++) {
stringBuilder.append(byteBuf.getByte(i));
stringBuilder.append(" ");
}
stringBuilder.append("| 長度:");
stringBuilder.append(byteBuf.writerIndex());
stringBuilder.append("字節(jié)");
System.out.println(stringBuilder);
}
}
客戶端與前面的粘包相同。
結(jié)果:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 | 長度:36字節(jié)
4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 | 長度:40字節(jié)
12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 | 長度:40字節(jié)
4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 | 長度:40字節(jié)
12 13 14 15 | 長度:4字節(jié)
如上所示竖席,總共160字節(jié)耘纱,總共發(fā)送了五次,每一次都含有不完整16字節(jié)的數(shù)據(jù)毕荐。這就是半包束析,其實(shí)里面也包含的粘包。
至于為什么第一個(gè)只有36东跪,這里沒有具體分析畸陡,但是我們可以猜想下虽填,每次連接的首次應(yīng)該是有表示占用了4個(gè)字節(jié)丁恭。
二、粘包斋日、半包分析
產(chǎn)生粘包和半包的本質(zhì):TCP是流式協(xié)議牲览,消息是無邊界的。
2.1 滑動(dòng)窗口
TCP是一種可靠地傳出協(xié)議恶守,每發(fā)送一個(gè)段就需要進(jìn)行一次確認(rèn)應(yīng)答(ack)處理第献。如何沒收到ack,則會(huì)再次發(fā)送兔港。
但是如果對(duì)于一個(gè)客戶端來說庸毫,每次發(fā)送一個(gè)請(qǐng)求,都要等到另一個(gè)客戶端應(yīng)該的話衫樊,才能發(fā)送下一個(gè)請(qǐng)求飒赃,那么整個(gè)構(gòu)成就成了串行化的過程,大大降低了連接間的傳輸效率科侈。
TCP如何解決效率地下的問題载佳?
引入滑動(dòng)窗口。窗口大小即決定了無需等待應(yīng)答而可以繼續(xù)發(fā)送的數(shù)據(jù)最大值臀栈。
簡(jiǎn)易滑動(dòng)過程如下所示:
窗口實(shí)際就起到一個(gè)緩沖區(qū)的作用蔫慧,同時(shí)也能起到流量控制的作用。
- 只有窗口內(nèi)的數(shù)據(jù)才允許被發(fā)送(綠色)权薯,當(dāng)應(yīng)答(藍(lán)色)未到達(dá)前姑躲,窗口必須停止滑動(dòng)
- 如果 0-100 這個(gè)段的數(shù)據(jù) ack 回來了睡扬,窗口就可以向下滑動(dòng),新的數(shù)據(jù)段會(huì)被加入進(jìn)來進(jìn)行發(fā)送肋联。
- 接收方也會(huì)維護(hù)一個(gè)窗口威蕉,只有落在窗口內(nèi)的數(shù)據(jù)才能允許接收
2.2 粘包現(xiàn)象分析
現(xiàn)象,發(fā)送了10次的16個(gè)字節(jié)橄仍,接收到了一個(gè)160字節(jié)韧涨,而不是10個(gè)16字節(jié)。
-
產(chǎn)生原因:
- 應(yīng)用層:接收方 ByteBuf 設(shè)置太大(Netty 默認(rèn) 1024)
- 滑動(dòng)窗口(TCP):假設(shè)發(fā)送方 256 bytes 表示一個(gè)完整報(bào)文侮繁,但由于接收方處理不及時(shí)且窗口大小足夠大虑粥,這 256 bytes 字節(jié)就會(huì)緩沖在接收方的滑動(dòng)窗口中,當(dāng)接收方滑動(dòng)窗口中緩沖了多個(gè)報(bào)文就會(huì)粘包宪哩。
- Nagle 算法(TCP):會(huì)造成粘包
2.3 半包現(xiàn)象分析
現(xiàn)象娩贷,前面的半包示例代碼發(fā)送了10次的16個(gè)字節(jié),接收到的數(shù)據(jù)有部分是被階段的锁孟,不是完整的16字節(jié)彬祖。
-
產(chǎn)生原因
- 應(yīng)用層:接收方 ByteBuf 小于實(shí)際發(fā)送數(shù)據(jù)量
- 滑動(dòng)窗口(TCP):假設(shè)接收方的窗口只剩了 128 bytes,發(fā)送方的報(bào)文大小是 256 bytes品抽,這時(shí)放不下了储笑,只能先發(fā)送前 128 bytes,等待 ack 后才能發(fā)送剩余部分圆恤,這就造成了半包
- MSS 限制:當(dāng)發(fā)送的數(shù)據(jù)超過 MSS 限制后突倍,會(huì)將數(shù)據(jù)切分發(fā)送,就會(huì)造成半包
2.4 引申:Nagle 算法 和 MSS限制
2.4.1 Nagle算法
TCP中為了提高網(wǎng)絡(luò)的利用率盆昙,經(jīng)常使用一個(gè)叫做Nagle的算法羽历。
該算法是指發(fā)送端即使還有應(yīng)該發(fā)送的數(shù)據(jù),但如果這部分?jǐn)?shù)據(jù)很少的話淡喜,則進(jìn)行延遲發(fā)送的一種處理機(jī)制秕磷。
如果以下兩個(gè)條件都不滿足時(shí),則進(jìn)行一段時(shí)間延遲后炼团,才進(jìn)行發(fā)送:
- 已發(fā)送的數(shù)據(jù)都已經(jīng)收到確認(rèn)應(yīng)答時(shí)
- 可以發(fā)送最大段長度(MSS)的數(shù)據(jù)時(shí)
根據(jù)這個(gè)算法雖然網(wǎng)絡(luò)利用率可以提高澎嚣,但是可能會(huì)發(fā)生某種程度的延遲。對(duì)于時(shí)間要求準(zhǔn)確的系統(tǒng)们镜,往往需要關(guān)閉該算法。
在Netty中如下的條件:
- 如果 SO_SNDBUF 的數(shù)據(jù)達(dá)到 MSS(maximum segment size)润歉,則需要發(fā)送模狭。
- 如果 SO_SNDBUF 中含有 FIN(表示需要連接關(guān)閉)這時(shí)將剩余數(shù)據(jù)發(fā)送,再關(guān)閉踩衩。
- 如果 TCP_NODELAY = true嚼鹉,則直接發(fā)送贩汉。
- 已發(fā)送的數(shù)據(jù)都收到 ack 時(shí),則需要發(fā)送锚赤。
- 上述條件不滿足匹舞,但發(fā)生超時(shí)(一般為 200ms)則需要發(fā)送。
- 除上述情況线脚,延遲發(fā)送
2.4.2 MSS限制
MSS 限制
數(shù)據(jù)鏈路層對(duì)一次能夠發(fā)送的最大數(shù)據(jù)有限制赐稽,每種數(shù)據(jù)鏈路的最大傳輸單元(MTU)都不盡相同。
- 以太網(wǎng)的 MTU 是 1500
- FDDI(光纖分布式數(shù)據(jù)接口)的 MTU 是 4352
- 本地回環(huán)地址的 MTU 是 65535 - 本地測(cè)試不走網(wǎng)卡
MSS 是最大段長度(maximum segment size)浑侥,它是 MTU 刨去 tcp 頭和 ip 頭后剩余能夠作為數(shù)據(jù)傳輸?shù)淖止?jié)數(shù)
- ipv4 tcp 頭占用 20 bytes姊舵,ip 頭占用 20 bytes,因此以太網(wǎng) MSS 的值為 1500 - 40 = 1460
- TCP 在傳遞大量數(shù)據(jù)時(shí)寓落,會(huì)按照 MSS 大小將數(shù)據(jù)進(jìn)行分割發(fā)送
- MSS 的值在三次握手時(shí)通知對(duì)方自己 MSS 的值括丁,然后在兩者之間選擇一個(gè)小值作為 MSS
三、粘包和半包的解決方案
3.1 短連接
每當(dāng)客戶端伶选。發(fā)送一條消息后史飞,就與服務(wù)器斷開連接。
我們需要對(duì)客戶端的代碼進(jìn)行改造仰税,其實(shí)就是构资,將內(nèi)部發(fā)送10次消息的循環(huán)抽出來,一次連接只發(fā)送一個(gè)消息肖卧,需要建立10次連接蚯窥,這個(gè)我就不演示了,不用想都能得到結(jié)果:
- 客戶端發(fā)送一定不會(huì)產(chǎn)生粘包問題塞帐。
- 服務(wù)端只要接收緩沖區(qū)足夠大拦赠,一定不會(huì)產(chǎn)生半包的問題,但是不能完全保證葵姥。
此方案的缺點(diǎn):
1)建立大量的鏈接荷鼠,效率低。
2)不能解決半包問題榔幸。
3.2 固定長度消息
Netty針對(duì)固定長度消息提供了一個(gè)入站處理器FixedLengthFrameDecoder允乐,能夠制定接收消息的固定長度。
服務(wù)端:
public class FixedLengthServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//此處打印輸出削咆,看看收到的內(nèi)容是10次16字節(jié)牍疏,還是一次160字節(jié)
printBuf((ByteBuf) msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
static void printBuf(ByteBuf byteBuf) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < byteBuf.writerIndex(); i++) {
stringBuilder.append(byteBuf.getByte(i));
stringBuilder.append(" ");
}
stringBuilder.append("| 長度:");
stringBuilder.append(byteBuf.writerIndex());
stringBuilder.append("字節(jié)");
System.out.println(stringBuilder);
}
}
客戶端:
public class FixedLengthClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立連接成功后,會(huì)觸發(fā)active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 發(fā)送內(nèi)容隨機(jī)的數(shù)據(jù)包
Random r = new Random();
char c = 1;
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < r.nextInt(8); j++) {
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//釋放EventLoopGroup
worker.shutdownGracefully();
}
}
}
結(jié)果:
1 1 0 0 0 0 0 0 | 長度:8字節(jié)
2 2 2 2 0 0 0 0 | 長度:8字節(jié)
3 0 0 0 0 0 0 0 | 長度:8字節(jié)
4 4 4 0 0 0 0 0 | 長度:8字節(jié)
5 5 5 5 0 0 0 0 | 長度:8字節(jié)
6 0 0 0 0 0 0 0 | 長度:8字節(jié)
7 0 0 0 0 0 0 0 | 長度:8字節(jié)
8 8 0 0 0 0 0 0 | 長度:8字節(jié)
9 9 9 0 0 0 0 0 | 長度:8字節(jié)
10 10 10 10 10 0 0 0 | 長度:8字節(jié)
使用固定長度也有一定的缺點(diǎn)拨齐,消息長度不好確定:
1)過大鳞陨,造成空間浪費(fèi)。
2)過小瞻惋,對(duì)于某些數(shù)據(jù)包可能不夠厦滤。
3.3 分隔符
1)Netty提供了LineBasedFrameDecoder(換行符幀解碼器)援岩。
默認(rèn)以 \n 或 \r\n 作為分隔符,需要指定最大長度掏导,如果超出指定長度仍未出現(xiàn)分隔符享怀,則拋出異常。
2)Netty提供了DelimiterBasedFrameDecoder(自定義分隔符幀解碼器)趟咆。
需要自己指定一個(gè)ByteBuf類型的分隔符添瓷,且需要指定最大長度。
LineBasedFrameDecoder示例代碼:
服務(wù)端
public class LineBasedServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客戶端:
public class LineBasedClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立連接成功后忍啸,會(huì)觸發(fā)active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 發(fā)送帶有分隔符的數(shù)據(jù)包
ByteBuf buffer = ctx.alloc().buffer();
String str = "hello world\nhello world\n\rhello world\nhello world";
buffer.writeBytes(str.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//釋放EventLoopGroup
worker.shutdownGracefully();
}
}
}
結(jié)果:
hello world
hello world
hello world
DelimiterBasedFrameDecoder代碼示例仰坦,大體與前一種相同,下面只給出不同位置的代碼:
服務(wù)端:
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf buffer = ch.alloc().buffer();
buffer.writeBytes("||".getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buffer));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
客戶端:
String str = "hello world||hello world||hello world||hello world";
使用分隔符的這種方式计雌,也有其缺點(diǎn):處理字符數(shù)據(jù)比較合適悄晃,但如果內(nèi)容本身包含了分隔符,那么就會(huì)解析錯(cuò)誤凿滤。逐個(gè)字節(jié)去比較妈橄,相率也不是很好。
3.4 預(yù)設(shè)長度
LengthFieldBasedFrameDecoder長度字段解碼器翁脆,允許我們?cè)诎l(fā)送的消息當(dāng)中眷蚓,指定消息長度,然后會(huì)根據(jù)這個(gè)長度取讀取響應(yīng)的字節(jié)數(shù)反番。
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip)
主要看這個(gè)有5個(gè)字段的構(gòu)造器沙热,下面我們分別看每個(gè)字段的含義是什么。
字段含義我先列在這:
maxFrameLength:最大長度罢缸。
lengthFieldOffset:長度字段偏移量篙贸。
lengthFieldLength:長度字段長度。
lengthAdjustment:以長度字段為基準(zhǔn)枫疆,還有幾個(gè)字節(jié)是內(nèi)容爵川。
initialBytesToStrip:從頭剝離幾個(gè)字節(jié)。
通過Netty提供的幾個(gè)例子息楔,解釋這幾個(gè)字段的意思寝贡。
例子1:
- lengthFieldOffset = 0
- lengthFieldLength = 2
- lengthAdjustment = 0
- initialBytesToStrip = 0
則發(fā)送前,總共14個(gè)字節(jié)值依,其中l(wèi)engthFieldLength占據(jù)兩個(gè)字節(jié)圃泡,0x000C表示消息長度是12:
Length | Actual Content |
---|---|
0x000C | "HELLO, WORLD" |
接收解析后,仍然是14個(gè)字節(jié):
Length | Actual Content |
---|---|
0x000C | "HELLO, WORLD" |
例子2:
- lengthFieldOffset = 0
- lengthFieldLength = 2
- lengthAdjustment = 0
- initialBytesToStrip = 2
則發(fā)送前愿险,總共14個(gè)字節(jié)颇蜡,其中l(wèi)engthFieldLength占據(jù)兩個(gè)字節(jié),0x000C表示消息長度是12,initialBytesToStrip表示從頭開始剝離2個(gè)字節(jié)澡匪,則解析后,將長度的字段剝離掉了:
Length | Actual Content |
---|---|
0x000C | "HELLO, WORLD" |
接收解析后褒链,只有12個(gè)字節(jié):
Actual Content |
---|
"HELLO, WORLD" |
例子3:
- lengthFieldOffset = 2
- lengthFieldLength = 3
- lengthAdjustment = 0
- initialBytesToStrip = 0
則發(fā)送前唁情,總共17個(gè)字節(jié),其中l(wèi)engthFieldLength占據(jù)3個(gè)字節(jié)甫匹,長度字段偏移量是2甸鸟,偏移量用來存放魔數(shù)Header 1:
Header 1 | Length | Actual Content |
---|---|---|
0xCAFE | 0x00000C | "HELLO, WORLD" |
接收解析后,只有17個(gè)字節(jié):
Header 1 | Length | Actual Content |
---|---|---|
0xCAFE | 0x00000C | "HELLO, WORLD" |
例子4:
- lengthFieldOffset = 0
- lengthFieldLength = 3
- lengthAdjustment = 2
- initialBytesToStrip = 0
則發(fā)送前兵迅,總共17個(gè)字節(jié)抢韭,其中l(wèi)engthFieldLength占據(jù)3個(gè)字節(jié),lengthAdjustment 表示從長度字段開始恍箭,還有2個(gè)字節(jié)是內(nèi)容:
Length | Header 1 | Actual Content |
---|---|---|
0x00000C | 0xCAFE | "HELLO, WORLD" |
接收解析后刻恭,只有17個(gè)字節(jié):
Length | Header 1 | Actual Content |
---|---|---|
0x00000C | 0xCAFE | "HELLO, WORLD" |
例子5:
- lengthFieldOffset = 1
- lengthFieldLength = 2
- lengthAdjustment = 1
- initialBytesToStrip = 3
則發(fā)送前,總共17個(gè)字節(jié)扯夭,長度字段便宜1個(gè)字節(jié)鳍贾,長度字段為2個(gè)字節(jié),從長度字段開始交洗,還有一個(gè)字節(jié)后是內(nèi)容骑科,忽略前三個(gè)字節(jié)。
Header 1 | Length | Header 2 | Actual Content |
---|---|---|---|
0xCA | 0x000C | 0xFE | "HELLO, WORLD" |
接收解析后构拳,只有13個(gè)字節(jié):
Header 2 | Actual Content |
---|---|
0xFE | "HELLO, WORLD" |
模擬例子5咆爽,寫一段實(shí)例代碼,其中內(nèi)容略有不同:
示例代碼:
服務(wù)端
/**
* @description: TODO
* @author:weirx
* @date:2021/11/12 15:34
* @version:3.0
*/
public class LineBasedServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,1,4,1,5));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.readByte() + "|" + byteBuf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客戶端:
public class LineBasedClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立連接成功后置森,會(huì)觸發(fā)active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
send(ctx,"hello, world");
send(ctx,"HI!");
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//釋放EventLoopGroup
worker.shutdownGracefully();
}
}
static void send(ChannelHandlerContext ctx,String msg){
ByteBuf buffer = ctx.alloc().buffer();
byte[] bytes = msg.getBytes();
int length = bytes.length;
//先寫Header 1
buffer.writeByte(1);
//再寫長度
buffer.writeInt(length);
//再寫Header 2
buffer.writeByte(2);
//最后寫內(nèi)容
buffer.writeBytes(bytes);
ctx.writeAndFlush(buffer);
}
}
結(jié)果:
2|hello, world
2|HI!
關(guān)于粘包和半包的介紹就這么多了斗埂,有用的話,幫忙點(diǎn)個(gè)贊吧~~