title: Netty 入門
date: 2021/04/06 09:37
一期奔、Netty 概述
1.1 Netty 是什么追城?
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.Netty 是一個異步的、基于事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架供炎,用于快速開發(fā)可維護、高性能的網(wǎng)絡(luò)服務(wù)器和客戶端。
1.2 Netty 的優(yōu)勢
Netty vs NIO度苔,工作量大,bug 多
- 需要自己構(gòu)建協(xié)議
- 解決 TCP 傳輸問題浑度,如粘包寇窑、半包
- epoll 空輪詢導(dǎo)致 CPU 100%
- 對 API 進行增強,使之更易用箩张,如 FastThreadLocal => ThreadLocal甩骏,ByteBuf => ByteBuffer
二、Hello World
2.1 小目標
開發(fā)一個簡單的服務(wù)器端和客戶端
- 客戶端向服務(wù)器端發(fā)送 hello, world
- 服務(wù)器僅接收先慷,不返回
加入依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
2.2 服務(wù)器端
new ServerBootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioServerSocketChannel.class) // 2
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder()); // 5
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8080); // 4
代碼解讀
1 處横漏,創(chuàng)建 NioEventLoopGroup,可以簡單理解為
線程池 + Selector
后面會詳細展開-
2 處熟掂,選擇服務(wù) Scoket 實現(xiàn)類缎浇,其中 NioServerSocketChannel 表示基于 NIO 的服務(wù)器端實現(xiàn),其它實現(xiàn)還有
3 處赴肚,為啥方法叫 childHandler素跺,是接下來添加的處理器都是給 SocketChannel 用的,而不是給 ServerSocketChannel誉券。ChannelInitializer 處理器(僅在建立連接時執(zhí)行一次)指厌,它的作用是待客戶端 SocketChannel 建立連接后,執(zhí)行 initChannel 以便添加更多的處理器
4 處踊跟,ServerSocketChannel 綁定的監(jiān)聽端口
5 處踩验,SocketChannel 的處理器,解碼 ByteBuf => String
6 處商玫,SocketChannel 的業(yè)務(wù)處理器箕憾,使用上一個處理器的處理結(jié)果
2.3 客戶端
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<Channel>() { // 3
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7
代碼解讀
1 處,創(chuàng)建 NioEventLoopGroup拳昌,同 Server
-
2 處袭异,選擇客戶 Socket 實現(xiàn)類,NioSocketChannel 表示基于 NIO 的客戶端實現(xiàn)炬藤,其它實現(xiàn)還有
3 處御铃,添加 SocketChannel 的處理器碴里,ChannelInitializer 處理器(僅在建立連接時執(zhí)行一次),它的作用是待客戶端 SocketChannel 建立連接后上真,執(zhí)行 initChannel 以便添加更多的處理器
4 處咬腋,指定要連接的服務(wù)器和端口
5 處,Netty 中很多方法都是異步的睡互,如 connect帝火,這時需要使用 sync 方法等待 connect 建立連接完畢
6 處,獲取 channel 對象湃缎,它為通道抽象犀填,可以進行數(shù)據(jù)讀寫操作
7 處,寫入消息并清空緩沖區(qū)
8 處嗓违,消息會經(jīng)過通道 handler 處理九巡,這里是將 String => ByteBuf 發(fā)出
數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)傳輸,到達服務(wù)器端蹂季,服務(wù)器端 5 和 6 處的 handler 先后被觸發(fā)冕广,走完一個流程
2.4 流程梳理
?? 提示
一開始需要樹立正確的觀念
- 把 channel 理解為數(shù)據(jù)的通道
- 把 msg 理解為流動的數(shù)據(jù),最開始輸入是 ByteBuf偿洁,但經(jīng)過 pipeline 的加工撒汉,會變成其它類型對象,最后輸出又變成 ByteBuf
- 把 handler 理解為數(shù)據(jù)的處理工序
- 工序有多道涕滋,合在一起就是 pipeline睬辐,pipeline 負責(zé)發(fā)布事件(讀、讀取完成...)傳播給每個 handler宾肺, handler 對自己感興趣的事件進行處理(重寫了相應(yīng)事件處理方法)
- handler 分 Inbound 和 Outbound 兩類
- 把 eventLoop 理解為處理數(shù)據(jù)的工人
- 工人可以管理多個 channel 的 io 操作溯饵,并且一旦工人負責(zé)了某個 channel,就要負責(zé)到底(綁定)
- 工人既可以執(zhí)行 io 操作锨用,也可以進行普通任務(wù)處理丰刊,每位工人有任務(wù)隊列,隊列里可以堆放多個 channel 的待處理任務(wù)增拥,任務(wù)分為普通任務(wù)啄巧、定時任務(wù)
- 工人按照 pipeline 順序,依次按照 handler 的規(guī)劃(代碼)處理數(shù)據(jù)掌栅,可以為每道工序指定不同的工人
三秩仆、組件
3.1 EventLoop
事件循環(huán)對象,EventLoop = SingleThreadExecutor + Selector
EventLoop 本質(zhì)是一個單線程執(zhí)行器(同時維護了一個 Selector)渣玲,里面有 run 方法處理 Channel 上源源不斷的 io 事件逗概。
它的繼承關(guān)系比較復(fù)雜
- 一條線是繼承自 j.u.c.ScheduledExecutorService 因此包含了線程池中所有的方法
- 另一條線是繼承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判斷一個線程是否屬于此 EventLoop
- 提供了 parent 方法來看看自己屬于哪個 EventLoopGroup
EventLoopGroup 事件循環(huán)組
EventLoopGroup 是一組 EventLoop忘衍,Channel 一般會調(diào)用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop逾苫,后續(xù)這個 Channel 上的 io 事件都由此 EventLoop 來處理(保證了 io 事件處理時的線程安全)
繼承自 netty 自己的 EventExecutorGroup
- 實現(xiàn)了 Iterable 接口提供遍歷 EventLoop 的能力
- 另有 next 方法獲取集合中下一個 EventLoop
EventLoopGroup 主要有兩個重要實現(xiàn):
- DefaultEventLoopGroup 可以處理普通任務(wù)、定時任務(wù)
- NioEventLoopGroup 可以處理普通任務(wù)枚钓、定時任務(wù)铅搓、IO 事件
以一個簡單的實現(xiàn)為例:
// 內(nèi)部創(chuàng)建了兩個 EventLoop, 每個 EventLoop 維護一個線程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
輸出
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98
也可以使用 for 循環(huán)
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
System.out.println(eventLoop);
}
輸出
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
?? 優(yōu)雅關(guān)閉
優(yōu)雅關(guān)閉 shutdownGracefully
方法。該方法會首先切換 EventLoopGroup
到關(guān)閉狀態(tài)從而拒絕新的任務(wù)的加入搀捷,然后在任務(wù)隊列的任務(wù)都處理完成后星掰,停止線程的運行。從而確保整體應(yīng)用是在正常有序的狀態(tài)下退出的
演示 NioEventLoop 處理 io 事件
服務(wù)器端兩個 nio worker 工人
new ServerBootstrap()
// 細分1:boss 只負責(zé) ServerSocketChannel 上 accept 事件 worker 只負責(zé) socketChannel 上的讀寫
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
客戶端嫩舟,啟動三次氢烘,分別修改發(fā)送字符串為 zhangsan(第一次),lisi(第二次)家厌,wangwu(第三次)
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
最后輸出
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到兩個工人輪流處理 channel播玖,但工人與 channel 之間進行了綁定
再增加兩個非 nio 工人
// 細分2:創(chuàng)建一個獨立的 EventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup(2);
new ServerBootstrap()
// boss 和 worker
// 細分1:boss 只負責(zé) ServerSocketChannel 上 accept 事件 worker 只負責(zé) socketChannel 上的讀寫
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 讓消息傳遞給下一個handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
客戶端代碼不變,啟動三次饭于,分別修改發(fā)送字符串為 zhangsan(第一次)蜀踏,lisi(第二次),wangwu(第三次)掰吕,輸出:
10:58:25.047 [nioEventLoopGroup-4-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - zhangsan
10:58:25.048 [defaultEventLoopGroup-2-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - zhangsan
10:58:43.232 [nioEventLoopGroup-4-2] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - lisi
10:58:43.232 [defaultEventLoopGroup-2-2] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - lisi
10:58:53.032 [nioEventLoopGroup-4-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - wangwu
10:58:53.032 [defaultEventLoopGroup-2-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - wangwu
可以看到果覆,nio 工人和 非 nio 工人也分別綁定了 channel(LoggingHandler 由 nio 工人執(zhí)行,而我們自己的 handler 由非 nio 工人執(zhí)行)
?? handler 執(zhí)行中如何換人殖熟?
關(guān)鍵代碼 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一個 handler 的事件循環(huán)是否與當前的事件循環(huán)是同一個線程
EventExecutor executor = next.executor();
// 是局待,直接調(diào)用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,將要執(zhí)行的代碼作為任務(wù)提交給下一個事件循環(huán)處理(換人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果兩個 handler 綁定的是同一個線程菱属,那么就直接調(diào)用
- 否則燎猛,把要調(diào)用的代碼封裝為一個任務(wù)對象,由下一個 handler 的線程來調(diào)用
演示 NioEventLoop 處理普通任務(wù)
NioEventLoop 除了可以處理 io 事件照皆,同樣可以向它提交普通任務(wù)
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
log.debug("server start...");
Thread.sleep(2000);
nioWorkers.execute(()->{
log.debug("normal task...");
});
輸出
22:30:36 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:30:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - normal task...
可以用來執(zhí)行耗時較長的任務(wù)
演示 NioEventLoop 處理定時任務(wù)
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);
輸出
22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
...
可以用來執(zhí)行定時任務(wù)
3.2 Channel
channel 的主要方法:
- close() 可以用來關(guān)閉 channel
- closeFuture() 用來處理 channel 的關(guān)閉重绷,關(guān)閉之后做一些善后工作
- sync 方法作用是同步等待 channel 關(guān)閉
- 而 addListener 方法是異步等待 channel 關(guān)閉
- pipeline() 方法添加處理器
- write() 方法將數(shù)據(jù)寫入
- writeAndFlush() 方法將數(shù)據(jù)寫入并刷出
ChannelFuture
這時剛才的客戶端代碼
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel()
.writeAndFlush(new Date() + ": hello world!");
現(xiàn)在把它拆開來看
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080); // 1
// 感覺相當于 Future#get()
channelFuture.sync();
Channel channel = channelFuture.channel();
log.info("channel:「{}」", channel);
channel.writeAndFlush("wangwu");
- 1 處返回的是 ChannelFuture 對象,它的作用是利用 channel() 方法來獲取 Channel 對象
注意 :connect 方法是異步的膜毁,意味著不等連接建立昭卓,方法執(zhí)行就返回了。因此 channelFuture 對象中不能【立刻】獲得到正確的 Channel 對象
實驗如下:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3
- 執(zhí)行到 1 時瘟滨,連接未建立候醒,打印
[id: 0x2e1884dd]
- 執(zhí)行到 2 時,sync 方法是同步等待連接建立完成
- 執(zhí)行到 3 時杂瘸,連接肯定建立了倒淫,打印
[id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了用 sync 方法可以讓異步操作同步以外,還可以使用回調(diào)的方式:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
// 不過此時使用的線程則是建立連接的那個 EventLoop 線程
System.out.println(future.channel()); // 2
});
- 執(zhí)行到 1 時败玉,連接未建立敌土,打印
[id: 0x749124ba]
- ChannelFutureListener 會在連接建立時被調(diào)用(其中 operationComplete 方法)镜硕,因此執(zhí)行到 2 時,連接肯定建立了返干,打印
[id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]
CloseFuture
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在連接建立后被調(diào)用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close 異步操作 1s 之后
// log.debug("處理關(guān)閉之后的操作"); // 不能在這里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 獲取 CloseFuture 對象兴枯, 1) 同步處理關(guān)閉, 2) 異步處理關(guān)閉
ChannelFuture closeFuture = channel.closeFuture();
/*log.debug("waiting close...");
closeFuture.sync();
log.debug("處理關(guān)閉之后的操作");*/
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("處理關(guān)閉之后的操作");
group.shutdownGracefully();
}
});
}
}
?? 異步提升的是什么
- 單線程沒法異步提高效率矩欠,必須配合多線程财剖、多核 cpu 才能發(fā)揮異步的優(yōu)勢,從而提升吞吐量
- 異步并沒有縮短響應(yīng)時間癌淮,反而有所增加
- 合理進行任務(wù)拆分躺坟,也是利用異步的關(guān)鍵
3.3 Future & Promise
在異步處理時,經(jīng)常用到這兩個接口
首先要說明 netty 中的 Future 與 jdk 中的 Future 同名乳蓄,但是是兩個接口咪橙,netty 的 Future 繼承自 jdk 的 Future,而 Promise 又對 netty Future 進行了擴展
- jdk Future 只能同步等待任務(wù)結(jié)束(或成功栓袖、或失斚徽)才能得到結(jié)果
- netty Future 可以同步等待任務(wù)結(jié)束得到結(jié)果,也可以異步方式得到結(jié)果(Listener)裹刮,但都是要等任務(wù)結(jié)束
- netty Promise 不僅有 netty Future 的功能音榜,而且脫離了任務(wù)獨立存在(和任務(wù)無關(guān),不用等任務(wù)結(jié)束)捧弃,只作為兩個線程間傳遞結(jié)果的容器
功能/名稱 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任務(wù) | - | - |
isCanceled | 任務(wù)是否取消 | - | - |
isDone | 任務(wù)是否完成赠叼,不能區(qū)分成功失敗 | - | - |
get | 獲取任務(wù)結(jié)果,阻塞等待 | - | - |
getNow | - | 獲取任務(wù)結(jié)果违霞,非阻塞嘴办,還未產(chǎn)生結(jié)果時返回 null | - |
await | - | 等待任務(wù)結(jié)束,如果任務(wù)失敗买鸽,不會拋異常涧郊,而是通過 isSuccess 判斷 | - |
sync | - | 等待任務(wù)結(jié)束,如果任務(wù)失敗眼五,拋出異常 | - |
isSuccess | - | 判斷任務(wù)是否成功 | - |
cause | - | 獲取失敗信息妆艘,非阻塞,如果沒有失敗看幼,返回null | - |
addLinstener | - | 添加回調(diào)批旺,異步接收結(jié)果 | - |
setSuccess | - | - | 設(shè)置成功結(jié)果 |
setFailure | - | - | 設(shè)置失敗結(jié)果 |
3.4 Handler & Pipeline
ChannelHandler 用來處理 Channel 上的各種事件,分為入站诵姜、出站兩種汽煮。所有 ChannelHandler 被連成一串,就是 Pipeline
- 入站處理器通常是 ChannelInboundHandlerAdapter 的子類,主要用來讀取客戶端數(shù)據(jù)暇赤,寫回結(jié)果
- 出站處理器通常是 ChannelOutboundHandlerAdapter 的子類心例,主要對寫回結(jié)果進行加工
打個比喻,每個 Channel 是一個產(chǎn)品的加工車間翎卓,Pipeline 是車間中的流水線契邀,ChannelHandler 就是流水線上的各道工序摆寄,而后面要講的 ByteBuf 是原材料失暴,經(jīng)過很多工序的加工:先經(jīng)過一道道入站工序,再經(jīng)過一道道出站工序最終變成產(chǎn)品
先搞清楚順序微饥,服務(wù)端
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
// 將數(shù)據(jù)傳遞給下個 handler逗扒,如果不調(diào)用,調(diào)用鏈會斷開
// 或者調(diào)用 super.channelRead(ctx, msg);
ctx.fireChannelRead(msg); // 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(3);
// 這個地方就不需要調(diào)用 ctx.fireChannelRead(msg); 了欠橘,因為沒有入站處理器了
ctx.channel().write(msg); // 3
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(6);
ctx.write(msg, promise); // 6
}
});
}
})
.bind(8080);
客戶端
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.addListener((ChannelFutureListener) future -> {
future.channel().writeAndFlush("hello,world");
});
服務(wù)器端打泳丶纭:
1
2
3
6
5
4
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的順序執(zhí)行的肃续,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序執(zhí)行的黍檩。ChannelPipeline 的實現(xiàn)是一個 ChannelHandlerContext(包裝了 ChannelHandler) 組成的雙向鏈表
- 入站處理器中,ctx.fireChannelRead(msg) 是 調(diào)用下一個入站處理器
- 如果注釋掉 1 處代碼始锚,則僅會打印 1
- 如果注釋掉 2 處代碼刽酱,則僅會打印 1 2
- 3 處的 ctx.channel().write(msg) 會 從尾部開始觸發(fā) 后續(xù)出站處理器的執(zhí)行
- 如果注釋掉 3 處代碼,則僅會打印 1 2 3
- 類似的瞧捌,出站處理器中棵里,ctx.write(msg, promise) 的調(diào)用也會 觸發(fā)上一個出站處理器
- 如果注釋掉 6 處代碼,則僅會打印 1 2 3 6
-
ctx.channel().write(msg)
vsctx.write(msg)
- 都是觸發(fā)出站處理器的執(zhí)行
- ctx.channel().write(msg) 從尾部開始查找出站處理器
- ctx.write(msg) 是從當前節(jié)點找上一個出站處理器
- 3 處的 ctx.channel().write(msg) 如果改為 ctx.write(msg) 僅會打印 1 2 3姐呐,因為節(jié)點3 之前沒有其它出站處理器了
- 6 處的 ctx.write(msg, promise) 如果改為 ctx.channel().write(msg) 會打印 1 2 3 6 6 6... 因為 ctx.channel().write() 是從尾部開始查找殿怜,結(jié)果又是節(jié)點6 自己
圖1 - 服務(wù)端 pipeline 觸發(fā)的原始流程,圖中數(shù)字代表了處理步驟的先后次序
3.5 ByteBuf
是對字節(jié)數(shù)據(jù)的封裝
1)創(chuàng)建
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
log(buffer);
// 正常開發(fā)時建議使用 ctx.alloc() 創(chuàng)建 ByteBuf
ByteBuf response = ctx.alloc().buffer();
上面代碼創(chuàng)建了一個默認的 ByteBuf(池化基于直接內(nèi)存的 ByteBuf)曙砂,初始容量是 10
輸出
read index:0 write index:0 capacity:10
其中 log 方法參考如下
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
2)直接內(nèi)存 vs 堆內(nèi)存
可以使用下面的代碼來創(chuàng)建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代碼來創(chuàng)建池化基于直接內(nèi)存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接內(nèi)存創(chuàng)建和銷毀的代價昂貴头谜,但讀寫性能高(從磁盤\網(wǎng)卡讀取時,少一次內(nèi)存復(fù)制)鸠澈,適合配合池化功能一起用
- 直接內(nèi)存對 GC 壓力小柱告,因為這部分內(nèi)存不受 JVM 垃圾回收的管理,但也要注意及時主動釋放
3)池化 vs 非池化
池化(例如:數(shù)據(jù)庫連接池)的最大意義在于可以重用 ByteBuf款侵,優(yōu)點有
- 沒有池化末荐,則每次都得創(chuàng)建新的 ByteBuf 實例,這個操作對直接內(nèi)存代價昂貴新锈,就算是堆內(nèi)存甲脏,也會增加 GC 壓力
- 有了池化,則可以重用池中 ByteBuf 實例,并且采用了與 jemalloc 類似的內(nèi)存分配算法提升分配效率
- 高并發(fā)時块请,池化功能更節(jié)約內(nèi)存娜氏,減少內(nèi)存溢出的可能
池化功能是否開啟,可以通過下面的系統(tǒng)環(huán)境變量來設(shè)置
-Dio.netty.allocator.type={unpooled|pooled}
4)組成
ByteBuf 由四部分組成
最開始讀寫指針都在 0 位置
5)寫入
方法列表墩新,省略一些不重要的方法
方法簽名 | 含義 | 備注 |
---|---|---|
writeBoolean(boolean value) | 寫入 boolean 值 | 用一字節(jié) 01|00 代表 true|false |
writeByte(int value) | 寫入 byte 值 | |
writeShort(int value) | 寫入 short 值 | |
writeInt(int value) | 寫入 int 值 | Big Endian贸弥,即 0x250,寫入后 00 00 02 50 |
writeIntLE(int value) | 寫入 int 值 | Little Endian海渊,即 0x250绵疲,寫入后 50 02 00 00 |
writeLong(long value) | 寫入 long 值 | |
writeChar(int value) | 寫入 char 值 | |
writeFloat(float value) | 寫入 float 值 | |
writeDouble(double value) | 寫入 double 值 | |
writeBytes(ByteBuf src) | 寫入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 寫入 byte[] | |
writeBytes(ByteBuffer src) | 寫入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 寫入字符串 |
注意
- 這些方法的未指明返回值的,其返回值都是 ByteBuf臣疑,意味著可以鏈式調(diào)用
- 網(wǎng)絡(luò)傳輸盔憨,默認習(xí)慣是 Big Endian
先寫入 4 個字節(jié)
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
結(jié)果是
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
再寫入一個 int 整數(shù),也是 4 個字節(jié)
buffer.writeInt(5);
log(buffer);
結(jié)果是
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
還有一類方法是 set 開頭的一系列方法讯沈,也可以寫入數(shù)據(jù)郁岩,但不會改變寫指針位置
6)擴容
再寫入一個 int 整數(shù)時,容量不夠了(初始容量是 10)缺狠,這時會引發(fā)擴容
buffer.writeInt(6);
log(buffer);
擴容規(guī)則是
- 如何寫入后數(shù)據(jù)大小未超過 512问慎,則選擇下一個 16 的整數(shù)倍虾啦,例如寫入后大小為 12 梢夯,則擴容后 capacity 是 16
- 如果寫入后數(shù)據(jù)大小超過 512,則選擇下一個 2^n宁玫,例如寫入后大小為 513驮樊,則擴容后 capacity 是 210=1024(29=512 已經(jīng)不夠了)
- 擴容不能超過 max capacity 會報錯(Integer.MAX_VALUE)
結(jié)果是
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06 |............ |
+--------+-------------------------------------------------+----------------+
7)讀取
例如讀了 4 次薇正,每次一個字節(jié)
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);
讀過的內(nèi)容,就屬于廢棄部分了囚衔,再讀只能讀那些尚未讀取的部分
1
2
3
4
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
如果需要重復(fù)讀取 int 整數(shù) 5挖腰,怎么辦?
可以在 read 前先做個標記 mark
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
結(jié)果
5
read index:8 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06 |.... |
+--------+-------------------------------------------------+----------------+
這時要重復(fù)讀取的話练湿,重置到標記位置 reset
buffer.resetReaderIndex();
log(buffer);
這時
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
還有種辦法是采用 get 開頭的一系列方法猴仑,這些方法不會改變 read index
8)retain & release
由于 Netty 中有堆外內(nèi)存的 ByteBuf 實現(xiàn),堆外內(nèi)存最好是手動來釋放肥哎,而不是等 GC 垃圾回收辽俗。
- UnpooledHeapByteBuf 使用的是 JVM 內(nèi)存,只需等 GC 回收內(nèi)存即可
- UnpooledDirectByteBuf 使用的就是直接內(nèi)存了篡诽,需要特殊的方法來回收內(nèi)存
- PooledByteBuf 和它的子類使用了池化機制崖飘,需要更復(fù)雜的規(guī)則來回收內(nèi)存
回收內(nèi)存的源碼實現(xiàn),請關(guān)注下面方法的不同實現(xiàn)
AbstractReferenceCountedByteBuf#deallocate()
Netty 這里采用了引用計數(shù)法來控制回收內(nèi)存杈女,每個 ByteBuf 都實現(xiàn)了 ReferenceCounted 接口
- 每個 ByteBuf 對象的初始計數(shù)為 1
- 調(diào)用 release 方法計數(shù)減 1朱浴,如果計數(shù)為 0吊圾,ByteBuf 內(nèi)存被回收
- 調(diào)用 retain 方法計數(shù)加 1,表示調(diào)用者沒用完之前翰蠢,其它 handler 即使調(diào)用了 release 也不會造成回收
- 當計數(shù)為 0 時项乒,底層內(nèi)存會被回收,這時即使 ByteBuf 對象還在梁沧,其各個方法均無法正常使用
誰來負責(zé) release 呢檀何?
不是我們想象的(一般情況下)
ByteBuf buf = ...
try {
...
} finally {
buf.release();
}
請思考,因為 pipeline 的存在廷支,一般需要將 ByteBuf 傳遞給下一個 ChannelHandler频鉴,如果在 finally 中 release 了,就失去了傳遞性(當然酥泞,如果在這個 ChannelHandler 內(nèi)這個 ByteBuf 已完成了它的使命砚殿,那么便無須再傳遞)
基本規(guī)則是啃憎,誰是最后使用者芝囤,誰負責(zé) release,詳細分析如下
- 起點辛萍,對于 NIO 實現(xiàn)來講悯姊,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次創(chuàng)建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
- 入站 ByteBuf 處理原則
- 對原始 ByteBuf 不做處理,調(diào)用 ctx.fireChannelRead(msg) 向后傳遞贩毕,這時無須 release
- 將原始 ByteBuf 轉(zhuǎn)換為其它類型的 Java 對象悯许,這時 ByteBuf 就沒用了,必須 release
- 如果不調(diào)用 ctx.fireChannelRead(msg) 向后傳遞辉阶,那么也必須 release
- 注意各種異常先壕,如果 ByteBuf 沒有成功傳遞到下一個 ChannelHandler,必須 release
- 假設(shè)消息一直向后傳谆甜,那么 TailContext(handler 鏈的尾部) 會負責(zé)釋放未處理消息(原始的 ByteBuf)
- 出站 ByteBuf 處理原則
- 出站消息最終都會轉(zhuǎn)為 ByteBuf 輸出垃僚,一直向前傳,由 HeadContext(handler 鏈的頭部) flush 后 release
- 異常處理原則
- 有時候不清楚 ByteBuf 被引用了多少次规辱,但又必須徹底釋放谆棺,可以循環(huán)調(diào)用 release 直到返回 true
TailContext 釋放未處理消息邏輯
// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
具體代碼
// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
9)slice
【零拷貝】的體現(xiàn)之一,對原始 ByteBuf 進行切片成多個 ByteBuf罕袋,切片后的 ByteBuf 并沒有發(fā)生內(nèi)存復(fù)制改淑,還是使用原始 ByteBuf 的內(nèi)存,切片后的 ByteBuf 維護獨立的 read浴讯,write 指針
例朵夏,原始 ByteBuf 進行一些初始操作
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
這時調(diào)用 slice 進行切片,無參 slice 是從原始 ByteBuf 的 read index 到 write index 之間的內(nèi)容進行切片榆纽,切片后的 max capacity 被固定為這個區(qū)間的大小仰猖,因此不能追加 write
ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果執(zhí)行询吴,會報 IndexOutOfBoundsException 異常
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
如果原始 ByteBuf 再次讀操作(又讀了一個字節(jié))
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 04 |.. |
+--------+-------------------------------------------------+----------------+
這時的 slice 不受影響,因為它有獨立的讀寫指針
System.out.println(ByteBufUtil.prettyHexDump(slice));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
如果 slice 的內(nèi)容發(fā)生了更改
slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05 |... |
+--------+-------------------------------------------------+----------------+
這時亮元,原始 ByteBuf 也會受影響猛计,因為底層都是同一塊內(nèi)存
System.out.println(ByteBufUtil.prettyHexDump(origin));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05 |.. |
+--------+-------------------------------------------------+----------------+
10)duplicate
【零拷貝】的體現(xiàn)之一,就好比截取了原始 ByteBuf 所有內(nèi)容爆捞,并且沒有 max capacity 的限制奉瘤,也是與原始 ByteBuf 使用同一塊底層內(nèi)存,只是讀寫指針是獨立的
11)copy
會將底層內(nèi)存數(shù)據(jù)進行深拷貝煮甥,因此無論讀寫盗温,都與原始 ByteBuf 無關(guān)
12)CompositeByteBuf
【零拷貝】的體現(xiàn)之一,可以將多個 ByteBuf 合并為一個邏輯上的 ByteBuf成肘,避免拷貝
有兩個 ByteBuf 如下
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+
現(xiàn)在需要一個新的 ByteBuf卖局,內(nèi)容來自于剛才的 buf1 和 buf2,如何實現(xiàn)双霍?
方法1:
ByteBuf buf3 = ByteBufAllocator.DEFAULT
.buffer(buf1.readableBytes()+buf2.readableBytes());
buf3.writeBytes(buf1);
buf3.writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
結(jié)果
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
這種方法好不好砚偶?回答是不太好,因為進行了數(shù)據(jù)的內(nèi)存復(fù)制操作
方法2:
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自動遞增 write index, 否則 write index 會始終為 0
buf3.addComponents(true, buf1, buf2);
結(jié)果是一樣的
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
CompositeByteBuf 是一個組合的 ByteBuf洒闸,它內(nèi)部維護了一個 Component 數(shù)組染坯,每個 Component 管理一個 ByteBuf,記錄了這個 ByteBuf 相對于整體偏移量等信息丘逸,代表著整體中某一段的數(shù)據(jù)单鹿。
- 優(yōu)點,對外是一個虛擬視圖深纲,組合這些 ByteBuf 不會產(chǎn)生內(nèi)存復(fù)制
- 缺點仲锄,復(fù)雜了很多,多次操作會帶來性能的損耗
13)Unpooled
Unpooled 是一個工具類湃鹊,類如其名儒喊,提供了非池化的 ByteBuf 創(chuàng)建、組合涛舍、復(fù)制等操作
這里僅介紹其跟【零拷貝】相關(guān)的 wrappedBuffer 方法澄惊,可以用來包裝 ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 當包裝 ByteBuf 個數(shù)超過一個時, 底層使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
也可以用來包裝普通字節(jié)數(shù)組,底層也不會有拷貝操作
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
輸出
class io.netty.buffer.CompositeByteBuf
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 |...... |
+--------+-------------------------------------------------+----------------+
?? ByteBuf 優(yōu)勢
- 池化 - 可以重用池中 ByteBuf 實例富雅,更節(jié)約內(nèi)存掸驱,減少內(nèi)存溢出的可能
- 讀寫指針分離,不需要像 ByteBuffer 一樣切換讀寫模式
- 可以自動擴容
- 支持鏈式調(diào)用没佑,使用更流暢
- 很多地方體現(xiàn)零拷貝毕贼,例如 slice、duplicate蛤奢、CompositeByteBuf
??讀和寫的誤解
我最初在認識上有這樣的誤區(qū)鬼癣,認為只有在 netty陶贼,nio 這樣的多路復(fù)用 IO 模型時,讀寫才不會相互阻塞待秃,才可以實現(xiàn)高效的雙向通信拜秧,但實際上,Java Socket 是全雙工的:在任意時刻章郁,線路上存在A 到 B
和 B 到 A
的雙向信號傳輸枉氮。即使是阻塞 IO,讀和寫是可以同時進行的暖庄,只要分別采用讀線程和寫線程即可聊替,讀不會阻塞寫、寫也不會阻塞讀