Netty 入門


title: Netty 入門
date: 2021/04/06 09:37


一期奔、Netty 概述

1.1 Netty 是什么追城?

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一個異步的、基于事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架供炎,用于快速開發(fā)可維護、高性能的網(wǎng)絡(luò)服務(wù)器和客戶端。

1.2 Netty 的優(yōu)勢

Netty vs NIO度苔,工作量大,bug 多

  • 需要自己構(gòu)建協(xié)議
  • 解決 TCP 傳輸問題浑度,如粘包寇窑、半包
  • epoll 空輪詢導(dǎo)致 CPU 100%
  • 對 API 進行增強,使之更易用箩张,如 FastThreadLocal => ThreadLocal甩骏,ByteBuf => ByteBuffer

二、Hello World

2.1 小目標

開發(fā)一個簡單的服務(wù)器端和客戶端

  • 客戶端向服務(wù)器端發(fā)送 hello, world
  • 服務(wù)器僅接收先慷,不返回

加入依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

2.2 服務(wù)器端

new ServerBootstrap()
    .group(new NioEventLoopGroup()) // 1
    .channel(NioServerSocketChannel.class) // 2
    .childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringDecoder()); // 5
            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                    System.out.println(msg);
                }
            });
        }
    })
    .bind(8080); // 4

代碼解讀

  • 1 處横漏,創(chuàng)建 NioEventLoopGroup,可以簡單理解為 線程池 + Selector 后面會詳細展開

  • 2 處熟掂,選擇服務(wù) Scoket 實現(xiàn)類缎浇,其中 NioServerSocketChannel 表示基于 NIO 的服務(wù)器端實現(xiàn),其它實現(xiàn)還有

    image
  • 3 處赴肚,為啥方法叫 childHandler素跺,是接下來添加的處理器都是給 SocketChannel 用的,而不是給 ServerSocketChannel誉券。ChannelInitializer 處理器(僅在建立連接時執(zhí)行一次)指厌,它的作用是待客戶端 SocketChannel 建立連接后,執(zhí)行 initChannel 以便添加更多的處理器

  • 4 處踊跟,ServerSocketChannel 綁定的監(jiān)聽端口

  • 5 處踩验,SocketChannel 的處理器,解碼 ByteBuf => String

  • 6 處商玫,SocketChannel 的業(yè)務(wù)處理器箕憾,使用上一個處理器的處理結(jié)果

2.3 客戶端

new Bootstrap()
    .group(new NioEventLoopGroup()) // 1
    .channel(NioSocketChannel.class) // 2
    .handler(new ChannelInitializer<Channel>() { // 3
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder()); // 8
        }
    })
    .connect("127.0.0.1", 8080) // 4
    .sync() // 5
    .channel() // 6
    .writeAndFlush(new Date() + ": hello world!"); // 7

代碼解讀

  • 1 處,創(chuàng)建 NioEventLoopGroup拳昌,同 Server

  • 2 處袭异,選擇客戶 Socket 實現(xiàn)類,NioSocketChannel 表示基于 NIO 的客戶端實現(xiàn)炬藤,其它實現(xiàn)還有

    image
  • 3 處御铃,添加 SocketChannel 的處理器碴里,ChannelInitializer 處理器(僅在建立連接時執(zhí)行一次),它的作用是待客戶端 SocketChannel 建立連接后上真,執(zhí)行 initChannel 以便添加更多的處理器

  • 4 處咬腋,指定要連接的服務(wù)器和端口

  • 5 處,Netty 中很多方法都是異步的睡互,如 connect帝火,這時需要使用 sync 方法等待 connect 建立連接完畢

  • 6 處,獲取 channel 對象湃缎,它為通道抽象犀填,可以進行數(shù)據(jù)讀寫操作

  • 7 處,寫入消息并清空緩沖區(qū)

  • 8 處嗓违,消息會經(jīng)過通道 handler 處理九巡,這里是將 String => ByteBuf 發(fā)出

  • 數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)傳輸,到達服務(wù)器端蹂季,服務(wù)器端 5 和 6 處的 handler 先后被觸發(fā)冕广,走完一個流程

2.4 流程梳理

image

?? 提示

一開始需要樹立正確的觀念

  • channel 理解為數(shù)據(jù)的通道
  • msg 理解為流動的數(shù)據(jù),最開始輸入是 ByteBuf偿洁,但經(jīng)過 pipeline 的加工撒汉,會變成其它類型對象,最后輸出又變成 ByteBuf
  • handler 理解為數(shù)據(jù)的處理工序
    • 工序有多道涕滋,合在一起就是 pipeline睬辐,pipeline 負責(zé)發(fā)布事件(讀、讀取完成...)傳播給每個 handler宾肺, handler 對自己感興趣的事件進行處理(重寫了相應(yīng)事件處理方法)
    • handler 分 Inbound 和 Outbound 兩類
  • eventLoop 理解為處理數(shù)據(jù)的工人
    • 工人可以管理多個 channel 的 io 操作溯饵,并且一旦工人負責(zé)了某個 channel,就要負責(zé)到底(綁定)
    • 工人既可以執(zhí)行 io 操作锨用,也可以進行普通任務(wù)處理丰刊,每位工人有任務(wù)隊列,隊列里可以堆放多個 channel 的待處理任務(wù)增拥,任務(wù)分為普通任務(wù)啄巧、定時任務(wù)
    • 工人按照 pipeline 順序,依次按照 handler 的規(guī)劃(代碼)處理數(shù)據(jù)掌栅,可以為每道工序指定不同的工人

三秩仆、組件

3.1 EventLoop

事件循環(huán)對象,EventLoop = SingleThreadExecutor + Selector

EventLoop 本質(zhì)是一個單線程執(zhí)行器(同時維護了一個 Selector)渣玲,里面有 run 方法處理 Channel 上源源不斷的 io 事件逗概。

它的繼承關(guān)系比較復(fù)雜

  • 一條線是繼承自 j.u.c.ScheduledExecutorService 因此包含了線程池中所有的方法
  • 另一條線是繼承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判斷一個線程是否屬于此 EventLoop
    • 提供了 parent 方法來看看自己屬于哪個 EventLoopGroup
image

EventLoopGroup 事件循環(huán)組

EventLoopGroup 是一組 EventLoop忘衍,Channel 一般會調(diào)用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop逾苫,后續(xù)這個 Channel 上的 io 事件都由此 EventLoop 來處理(保證了 io 事件處理時的線程安全)

繼承自 netty 自己的 EventExecutorGroup

  • 實現(xiàn)了 Iterable 接口提供遍歷 EventLoop 的能力
  • 另有 next 方法獲取集合中下一個 EventLoop

EventLoopGroup 主要有兩個重要實現(xiàn):

  1. DefaultEventLoopGroup 可以處理普通任務(wù)、定時任務(wù)
  2. NioEventLoopGroup 可以處理普通任務(wù)枚钓、定時任務(wù)铅搓、IO 事件

以一個簡單的實現(xiàn)為例:

// 內(nèi)部創(chuàng)建了兩個 EventLoop, 每個 EventLoop 維護一個線程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

輸出

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98

也可以使用 for 循環(huán)

DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
    System.out.println(eventLoop);
}

輸出

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6

?? 優(yōu)雅關(guān)閉

優(yōu)雅關(guān)閉 shutdownGracefully 方法。該方法會首先切換 EventLoopGroup 到關(guān)閉狀態(tài)從而拒絕新的任務(wù)的加入搀捷,然后在任務(wù)隊列的任務(wù)都處理完成后星掰,停止線程的運行。從而確保整體應(yīng)用是在正常有序的狀態(tài)下退出的

演示 NioEventLoop 處理 io 事件

服務(wù)器端兩個 nio worker 工人

new ServerBootstrap()
    // 細分1:boss 只負責(zé) ServerSocketChannel 上 accept 事件  worker 只負責(zé) socketChannel 上的讀寫
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();

客戶端嫩舟,啟動三次氢烘,分別修改發(fā)送字符串為 zhangsan(第一次),lisi(第二次)家厌,wangwu(第三次)

public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("init...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .channel(NioSocketChannel.class).connect("localhost", 8080)
            .sync()
            .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
    Thread.sleep(2000);
    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

最后輸出

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu        
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu         

可以看到兩個工人輪流處理 channel播玖,但工人與 channel 之間進行了綁定

image

再增加兩個非 nio 工人

// 細分2:創(chuàng)建一個獨立的 EventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup(2);
new ServerBootstrap()
        // boss 和 worker
        // 細分1:boss 只負責(zé) ServerSocketChannel 上 accept 事件  worker 只負責(zé) socketChannel 上的讀寫
        .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                    @Override                                         // ByteBuf
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        log.debug(buf.toString(Charset.defaultCharset()));
                        ctx.fireChannelRead(msg); // 讓消息傳遞給下一個handler
                    }
                }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                    @Override                                         // ByteBuf
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        log.debug(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        })
        .bind(8080);

客戶端代碼不變,啟動三次饭于,分別修改發(fā)送字符串為 zhangsan(第一次)蜀踏,lisi(第二次),wangwu(第三次)掰吕,輸出:

10:58:25.047 [nioEventLoopGroup-4-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - zhangsan
10:58:25.048 [defaultEventLoopGroup-2-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - zhangsan
10:58:43.232 [nioEventLoopGroup-4-2] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - lisi
10:58:43.232 [defaultEventLoopGroup-2-2] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - lisi
10:58:53.032 [nioEventLoopGroup-4-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - wangwu
10:58:53.032 [defaultEventLoopGroup-2-1] DEBUG cn.x5456.netty.rumen.study.EventLoopServer - wangwu

可以看到果覆,nio 工人和 非 nio 工人也分別綁定了 channel(LoggingHandler 由 nio 工人執(zhí)行,而我們自己的 handler 由非 nio 工人執(zhí)行)

image

?? handler 執(zhí)行中如何換人殖熟?

關(guān)鍵代碼 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一個 handler 的事件循環(huán)是否與當前的事件循環(huán)是同一個線程
    EventExecutor executor = next.executor();
    
    // 是局待,直接調(diào)用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,將要執(zhí)行的代碼作為任務(wù)提交給下一個事件循環(huán)處理(換人)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 如果兩個 handler 綁定的是同一個線程菱属,那么就直接調(diào)用
  • 否則燎猛,把要調(diào)用的代碼封裝為一個任務(wù)對象,由下一個 handler 的線程來調(diào)用

演示 NioEventLoop 處理普通任務(wù)

NioEventLoop 除了可以處理 io 事件照皆,同樣可以向它提交普通任務(wù)

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.execute(()->{
    log.debug("normal task...");
});

輸出

22:30:36 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:30:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - normal task...

可以用來執(zhí)行耗時較長的任務(wù)

演示 NioEventLoop 處理定時任務(wù)

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);

輸出

22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
...

可以用來執(zhí)行定時任務(wù)

3.2 Channel

channel 的主要方法:

  • close() 可以用來關(guān)閉 channel
  • closeFuture() 用來處理 channel 的關(guān)閉重绷,關(guān)閉之后做一些善后工作
    • sync 方法作用是同步等待 channel 關(guān)閉
    • 而 addListener 方法是異步等待 channel 關(guān)閉
  • pipeline() 方法添加處理器
  • write() 方法將數(shù)據(jù)寫入
  • writeAndFlush() 方法將數(shù)據(jù)寫入并刷出

ChannelFuture

這時剛才的客戶端代碼

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .sync()
    .channel()
    .writeAndFlush(new Date() + ": hello world!");

現(xiàn)在把它拆開來看

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); // 1

// 感覺相當于 Future#get()
channelFuture.sync();
Channel channel = channelFuture.channel();
log.info("channel:「{}」", channel);
channel.writeAndFlush("wangwu");
  • 1 處返回的是 ChannelFuture 對象,它的作用是利用 channel() 方法來獲取 Channel 對象

注意 :connect 方法是異步的膜毁,意味著不等連接建立昭卓,方法執(zhí)行就返回了。因此 channelFuture 對象中不能【立刻】獲得到正確的 Channel 對象

實驗如下:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);

System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3
  • 執(zhí)行到 1 時瘟滨,連接未建立候醒,打印 [id: 0x2e1884dd]
  • 執(zhí)行到 2 時,sync 方法是同步等待連接建立完成
  • 執(zhí)行到 3 時杂瘸,連接肯定建立了倒淫,打印 [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]

除了用 sync 方法可以讓異步操作同步以外,還可以使用回調(diào)的方式:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
    // 不過此時使用的線程則是建立連接的那個 EventLoop 線程
    System.out.println(future.channel()); // 2
});
  • 執(zhí)行到 1 時败玉,連接未建立敌土,打印 [id: 0x749124ba]
  • ChannelFutureListener 會在連接建立時被調(diào)用(其中 operationComplete 方法)镜硕,因此執(zhí)行到 2 時,連接肯定建立了返干,打印 [id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]

CloseFuture

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在連接建立后被調(diào)用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        log.debug("{}", channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close(); // close 異步操作 1s 之后
//                    log.debug("處理關(guān)閉之后的操作"); // 不能在這里善后
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        // 獲取 CloseFuture 對象兴枯, 1) 同步處理關(guān)閉, 2) 異步處理關(guān)閉
        ChannelFuture closeFuture = channel.closeFuture();
        /*log.debug("waiting close...");
        closeFuture.sync();
        log.debug("處理關(guān)閉之后的操作");*/
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("處理關(guān)閉之后的操作");
                group.shutdownGracefully();
            }
        });
    }
}

?? 異步提升的是什么

  • 單線程沒法異步提高效率矩欠,必須配合多線程财剖、多核 cpu 才能發(fā)揮異步的優(yōu)勢,從而提升吞吐量
  • 異步并沒有縮短響應(yīng)時間癌淮,反而有所增加
  • 合理進行任務(wù)拆分躺坟,也是利用異步的關(guān)鍵

3.3 Future & Promise

在異步處理時,經(jīng)常用到這兩個接口

首先要說明 netty 中的 Future 與 jdk 中的 Future 同名乳蓄,但是是兩個接口咪橙,netty 的 Future 繼承自 jdk 的 Future,而 Promise 又對 netty Future 進行了擴展

  • jdk Future 只能同步等待任務(wù)結(jié)束(或成功栓袖、或失斚徽)才能得到結(jié)果
  • netty Future 可以同步等待任務(wù)結(jié)束得到結(jié)果,也可以異步方式得到結(jié)果(Listener)裹刮,但都是要等任務(wù)結(jié)束
  • netty Promise 不僅有 netty Future 的功能音榜,而且脫離了任務(wù)獨立存在(和任務(wù)無關(guān),不用等任務(wù)結(jié)束)捧弃,只作為兩個線程間傳遞結(jié)果的容器
功能/名稱 jdk Future netty Future Promise
cancel 取消任務(wù) - -
isCanceled 任務(wù)是否取消 - -
isDone 任務(wù)是否完成赠叼,不能區(qū)分成功失敗 - -
get 獲取任務(wù)結(jié)果,阻塞等待 - -
getNow - 獲取任務(wù)結(jié)果违霞,非阻塞嘴办,還未產(chǎn)生結(jié)果時返回 null -
await - 等待任務(wù)結(jié)束,如果任務(wù)失敗买鸽,不會拋異常涧郊,而是通過 isSuccess 判斷 -
sync - 等待任務(wù)結(jié)束,如果任務(wù)失敗眼五,拋出異常 -
isSuccess - 判斷任務(wù)是否成功 -
cause - 獲取失敗信息妆艘,非阻塞,如果沒有失敗看幼,返回null -
addLinstener - 添加回調(diào)批旺,異步接收結(jié)果 -
setSuccess - - 設(shè)置成功結(jié)果
setFailure - - 設(shè)置失敗結(jié)果

3.4 Handler & Pipeline

ChannelHandler 用來處理 Channel 上的各種事件,分為入站诵姜、出站兩種汽煮。所有 ChannelHandler 被連成一串,就是 Pipeline

  • 入站處理器通常是 ChannelInboundHandlerAdapter 的子類,主要用來讀取客戶端數(shù)據(jù)暇赤,寫回結(jié)果
  • 出站處理器通常是 ChannelOutboundHandlerAdapter 的子類心例,主要對寫回結(jié)果進行加工

打個比喻,每個 Channel 是一個產(chǎn)品的加工車間翎卓,Pipeline 是車間中的流水線契邀,ChannelHandler 就是流水線上的各道工序摆寄,而后面要講的 ByteBuf 是原材料失暴,經(jīng)過很多工序的加工:先經(jīng)過一道道入站工序,再經(jīng)過一道道出站工序最終變成產(chǎn)品

先搞清楚順序微饥,服務(wù)端

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(1);
                   // 將數(shù)據(jù)傳遞給下個 handler逗扒,如果不調(diào)用,調(diào)用鏈會斷開
                  // 或者調(diào)用 super.channelRead(ctx, msg);
                    ctx.fireChannelRead(msg); // 1
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(2);
                    ctx.fireChannelRead(msg); // 2
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(3);
                  // 這個地方就不需要調(diào)用 ctx.fireChannelRead(msg); 了欠橘,因為沒有入站處理器了
                    ctx.channel().write(msg); // 3      
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(4);
                    ctx.write(msg, promise); // 4
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(5);
                    ctx.write(msg, promise); // 5
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(6);
                    ctx.write(msg, promise); // 6
                }
            });
        }
    })
    .bind(8080);

客戶端

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .addListener((ChannelFutureListener) future -> {
        future.channel().writeAndFlush("hello,world");
    });

服務(wù)器端打泳丶纭:

1
2
3
6
5
4

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的順序執(zhí)行的肃续,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序執(zhí)行的黍檩。ChannelPipeline 的實現(xiàn)是一個 ChannelHandlerContext(包裝了 ChannelHandler) 組成的雙向鏈表

image
  • 入站處理器中,ctx.fireChannelRead(msg) 是 調(diào)用下一個入站處理器
    • 如果注釋掉 1 處代碼始锚,則僅會打印 1
    • 如果注釋掉 2 處代碼刽酱,則僅會打印 1 2
  • 3 處的 ctx.channel().write(msg) 會 從尾部開始觸發(fā) 后續(xù)出站處理器的執(zhí)行
    • 如果注釋掉 3 處代碼,則僅會打印 1 2 3
  • 類似的瞧捌,出站處理器中棵里,ctx.write(msg, promise) 的調(diào)用也會 觸發(fā)上一個出站處理器
    • 如果注釋掉 6 處代碼,則僅會打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)
    • 都是觸發(fā)出站處理器的執(zhí)行
    • ctx.channel().write(msg) 從尾部開始查找出站處理器
    • ctx.write(msg) 是從當前節(jié)點找上一個出站處理器
    • 3 處的 ctx.channel().write(msg) 如果改為 ctx.write(msg) 僅會打印 1 2 3姐呐,因為節(jié)點3 之前沒有其它出站處理器了
    • 6 處的 ctx.write(msg, promise) 如果改為 ctx.channel().write(msg) 會打印 1 2 3 6 6 6... 因為 ctx.channel().write() 是從尾部開始查找殿怜,結(jié)果又是節(jié)點6 自己

圖1 - 服務(wù)端 pipeline 觸發(fā)的原始流程,圖中數(shù)字代表了處理步驟的先后次序

image

3.5 ByteBuf

是對字節(jié)數(shù)據(jù)的封裝

1)創(chuàng)建

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
log(buffer);

// 正常開發(fā)時建議使用 ctx.alloc() 創(chuàng)建 ByteBuf
ByteBuf response = ctx.alloc().buffer();

上面代碼創(chuàng)建了一個默認的 ByteBuf(池化基于直接內(nèi)存的 ByteBuf)曙砂,初始容量是 10

輸出

read index:0 write index:0 capacity:10

其中 log 方法參考如下

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

2)直接內(nèi)存 vs 堆內(nèi)存

可以使用下面的代碼來創(chuàng)建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代碼來創(chuàng)建池化基于直接內(nèi)存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 直接內(nèi)存創(chuàng)建和銷毀的代價昂貴头谜,但讀寫性能高(從磁盤\網(wǎng)卡讀取時,少一次內(nèi)存復(fù)制)鸠澈,適合配合池化功能一起用
  • 直接內(nèi)存對 GC 壓力小柱告,因為這部分內(nèi)存不受 JVM 垃圾回收的管理,但也要注意及時主動釋放

3)池化 vs 非池化

池化(例如:數(shù)據(jù)庫連接池)的最大意義在于可以重用 ByteBuf款侵,優(yōu)點有

  • 沒有池化末荐,則每次都得創(chuàng)建新的 ByteBuf 實例,這個操作對直接內(nèi)存代價昂貴新锈,就算是堆內(nèi)存甲脏,也會增加 GC 壓力
  • 有了池化,則可以重用池中 ByteBuf 實例,并且采用了與 jemalloc 類似的內(nèi)存分配算法提升分配效率
  • 高并發(fā)時块请,池化功能更節(jié)約內(nèi)存娜氏,減少內(nèi)存溢出的可能

池化功能是否開啟,可以通過下面的系統(tǒng)環(huán)境變量來設(shè)置

-Dio.netty.allocator.type={unpooled|pooled}

4)組成

ByteBuf 由四部分組成

image

最開始讀寫指針都在 0 位置

5)寫入

方法列表墩新,省略一些不重要的方法

方法簽名 含義 備注
writeBoolean(boolean value) 寫入 boolean 值 用一字節(jié) 01|00 代表 true|false
writeByte(int value) 寫入 byte 值
writeShort(int value) 寫入 short 值
writeInt(int value) 寫入 int 值 Big Endian贸弥,即 0x250,寫入后 00 00 02 50
writeIntLE(int value) 寫入 int 值 Little Endian海渊,即 0x250绵疲,寫入后 50 02 00 00
writeLong(long value) 寫入 long 值
writeChar(int value) 寫入 char 值
writeFloat(float value) 寫入 float 值
writeDouble(double value) 寫入 double 值
writeBytes(ByteBuf src) 寫入 netty 的 ByteBuf
writeBytes(byte[] src) 寫入 byte[]
writeBytes(ByteBuffer src) 寫入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 寫入字符串

注意

  • 這些方法的未指明返回值的,其返回值都是 ByteBuf臣疑,意味著可以鏈式調(diào)用
  • 網(wǎng)絡(luò)傳輸盔憨,默認習(xí)慣是 Big Endian

先寫入 4 個字節(jié)

buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

結(jié)果是

read index:0 write index:4 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

再寫入一個 int 整數(shù),也是 4 個字節(jié)

buffer.writeInt(5);
log(buffer);

結(jié)果是

read index:0 write index:8 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

還有一類方法是 set 開頭的一系列方法讯沈,也可以寫入數(shù)據(jù)郁岩,但不會改變寫指針位置

6)擴容

再寫入一個 int 整數(shù)時,容量不夠了(初始容量是 10)缺狠,這時會引發(fā)擴容

buffer.writeInt(6);
log(buffer);

擴容規(guī)則是

  • 如何寫入后數(shù)據(jù)大小未超過 512问慎,則選擇下一個 16 的整數(shù)倍虾啦,例如寫入后大小為 12 梢夯,則擴容后 capacity 是 16
  • 如果寫入后數(shù)據(jù)大小超過 512,則選擇下一個 2^n宁玫,例如寫入后大小為 513驮樊,則擴容后 capacity 是 210=1024(29=512 已經(jīng)不夠了)
  • 擴容不能超過 max capacity 會報錯(Integer.MAX_VALUE)

結(jié)果是

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06             |............    |
+--------+-------------------------------------------------+----------------+

7)讀取

例如讀了 4 次薇正,每次一個字節(jié)

System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

讀過的內(nèi)容,就屬于廢棄部分了囚衔,再讀只能讀那些尚未讀取的部分

1
2
3
4
read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

如果需要重復(fù)讀取 int 整數(shù) 5挖腰,怎么辦?

可以在 read 前先做個標記 mark

buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);

結(jié)果

5
read index:8 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06                                     |....            |
+--------+-------------------------------------------------+----------------+

這時要重復(fù)讀取的話练湿,重置到標記位置 reset

buffer.resetReaderIndex();
log(buffer);

這時

read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

還有種辦法是采用 get 開頭的一系列方法猴仑,這些方法不會改變 read index

8)retain & release

由于 Netty 中有堆外內(nèi)存的 ByteBuf 實現(xiàn),堆外內(nèi)存最好是手動來釋放肥哎,而不是等 GC 垃圾回收辽俗。

  • UnpooledHeapByteBuf 使用的是 JVM 內(nèi)存,只需等 GC 回收內(nèi)存即可
  • UnpooledDirectByteBuf 使用的就是直接內(nèi)存了篡诽,需要特殊的方法來回收內(nèi)存
  • PooledByteBuf 和它的子類使用了池化機制崖飘,需要更復(fù)雜的規(guī)則來回收內(nèi)存

回收內(nèi)存的源碼實現(xiàn),請關(guān)注下面方法的不同實現(xiàn)

AbstractReferenceCountedByteBuf#deallocate()

Netty 這里采用了引用計數(shù)法來控制回收內(nèi)存杈女,每個 ByteBuf 都實現(xiàn)了 ReferenceCounted 接口

  • 每個 ByteBuf 對象的初始計數(shù)為 1
  • 調(diào)用 release 方法計數(shù)減 1朱浴,如果計數(shù)為 0吊圾,ByteBuf 內(nèi)存被回收
  • 調(diào)用 retain 方法計數(shù)加 1,表示調(diào)用者沒用完之前翰蠢,其它 handler 即使調(diào)用了 release 也不會造成回收
  • 當計數(shù)為 0 時项乒,底層內(nèi)存會被回收,這時即使 ByteBuf 對象還在梁沧,其各個方法均無法正常使用

誰來負責(zé) release 呢檀何?

不是我們想象的(一般情況下)

ByteBuf buf = ...
try {
    ...
} finally {
    buf.release();
}

請思考,因為 pipeline 的存在廷支,一般需要將 ByteBuf 傳遞給下一個 ChannelHandler频鉴,如果在 finally 中 release 了,就失去了傳遞性(當然酥泞,如果在這個 ChannelHandler 內(nèi)這個 ByteBuf 已完成了它的使命砚殿,那么便無須再傳遞)

基本規(guī)則是啃憎,誰是最后使用者芝囤,誰負責(zé) release,詳細分析如下

  • 起點辛萍,對于 NIO 實現(xiàn)來講悯姊,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次創(chuàng)建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 處理原則
    • 對原始 ByteBuf 不做處理,調(diào)用 ctx.fireChannelRead(msg) 向后傳遞贩毕,這時無須 release
    • 將原始 ByteBuf 轉(zhuǎn)換為其它類型的 Java 對象悯许,這時 ByteBuf 就沒用了,必須 release
    • 如果不調(diào)用 ctx.fireChannelRead(msg) 向后傳遞辉阶,那么也必須 release
    • 注意各種異常先壕,如果 ByteBuf 沒有成功傳遞到下一個 ChannelHandler,必須 release
    • 假設(shè)消息一直向后傳谆甜,那么 TailContext(handler 鏈的尾部) 會負責(zé)釋放未處理消息(原始的 ByteBuf)
  • 出站 ByteBuf 處理原則
    • 出站消息最終都會轉(zhuǎn)為 ByteBuf 輸出垃僚,一直向前傳,由 HeadContext(handler 鏈的頭部) flush 后 release
  • 異常處理原則
    • 有時候不清楚 ByteBuf 被引用了多少次规辱,但又必須徹底釋放谆棺,可以循環(huán)調(diào)用 release 直到返回 true

TailContext 釋放未處理消息邏輯

// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

具體代碼

// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

9)slice

【零拷貝】的體現(xiàn)之一,對原始 ByteBuf 進行切片成多個 ByteBuf罕袋,切片后的 ByteBuf 并沒有發(fā)生內(nèi)存復(fù)制改淑,還是使用原始 ByteBuf 的內(nèi)存,切片后的 ByteBuf 維護獨立的 read浴讯,write 指針

image

例朵夏,原始 ByteBuf 進行一些初始操作

ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

這時調(diào)用 slice 進行切片,無參 slice 是從原始 ByteBuf 的 read index 到 write index 之間的內(nèi)容進行切片榆纽,切片后的 max capacity 被固定為這個區(qū)間的大小仰猖,因此不能追加 write

ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果執(zhí)行询吴,會報 IndexOutOfBoundsException 異常

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

如果原始 ByteBuf 再次讀操作(又讀了一個字節(jié))

origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 04                                           |..              |
+--------+-------------------------------------------------+----------------+

這時的 slice 不受影響,因為它有獨立的讀寫指針

System.out.println(ByteBufUtil.prettyHexDump(slice));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

如果 slice 的內(nèi)容發(fā)生了更改

slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05                                        |...             |
+--------+-------------------------------------------------+----------------+

這時亮元,原始 ByteBuf 也會受影響猛计,因為底層都是同一塊內(nèi)存

System.out.println(ByteBufUtil.prettyHexDump(origin));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05                                           |..              |
+--------+-------------------------------------------------+----------------+

10)duplicate

【零拷貝】的體現(xiàn)之一,就好比截取了原始 ByteBuf 所有內(nèi)容爆捞,并且沒有 max capacity 的限制奉瘤,也是與原始 ByteBuf 使用同一塊底層內(nèi)存,只是讀寫指針是獨立的

image

11)copy

會將底層內(nèi)存數(shù)據(jù)進行深拷貝煮甥,因此無論讀寫盗温,都與原始 ByteBuf 無關(guān)

12)CompositeByteBuf

【零拷貝】的體現(xiàn)之一,可以將多個 ByteBuf 合并為一個邏輯上的 ByteBuf成肘,避免拷貝

有兩個 ByteBuf 如下

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+

現(xiàn)在需要一個新的 ByteBuf卖局,內(nèi)容來自于剛才的 buf1 和 buf2,如何實現(xiàn)双霍?

方法1:

ByteBuf buf3 = ByteBufAllocator.DEFAULT
    .buffer(buf1.readableBytes()+buf2.readableBytes());
buf3.writeBytes(buf1);
buf3.writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

結(jié)果

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

這種方法好不好砚偶?回答是不太好,因為進行了數(shù)據(jù)的內(nèi)存復(fù)制操作

方法2:

CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自動遞增 write index, 否則 write index 會始終為 0
buf3.addComponents(true, buf1, buf2);

結(jié)果是一樣的

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

CompositeByteBuf 是一個組合的 ByteBuf洒闸,它內(nèi)部維護了一個 Component 數(shù)組染坯,每個 Component 管理一個 ByteBuf,記錄了這個 ByteBuf 相對于整體偏移量等信息丘逸,代表著整體中某一段的數(shù)據(jù)单鹿。

  • 優(yōu)點,對外是一個虛擬視圖深纲,組合這些 ByteBuf 不會產(chǎn)生內(nèi)存復(fù)制
  • 缺點仲锄,復(fù)雜了很多,多次操作會帶來性能的損耗

13)Unpooled

Unpooled 是一個工具類湃鹊,類如其名儒喊,提供了非池化的 ByteBuf 創(chuàng)建、組合涛舍、復(fù)制等操作

這里僅介紹其跟【零拷貝】相關(guān)的 wrappedBuffer 方法澄惊,可以用來包裝 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 當包裝 ByteBuf 個數(shù)超過一個時, 底層使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

也可以用來包裝普通字節(jié)數(shù)組,底層也不會有拷貝操作

ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));

輸出

class io.netty.buffer.CompositeByteBuf
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06                               |......          |
+--------+-------------------------------------------------+----------------+

?? ByteBuf 優(yōu)勢

  • 池化 - 可以重用池中 ByteBuf 實例富雅,更節(jié)約內(nèi)存掸驱,減少內(nèi)存溢出的可能
  • 讀寫指針分離,不需要像 ByteBuffer 一樣切換讀寫模式
  • 可以自動擴容
  • 支持鏈式調(diào)用没佑,使用更流暢
  • 很多地方體現(xiàn)零拷貝毕贼,例如 slice、duplicate蛤奢、CompositeByteBuf

??讀和寫的誤解

我最初在認識上有這樣的誤區(qū)鬼癣,認為只有在 netty陶贼,nio 這樣的多路復(fù)用 IO 模型時,讀寫才不會相互阻塞待秃,才可以實現(xiàn)高效的雙向通信拜秧,但實際上,Java Socket 是全雙工的:在任意時刻章郁,線路上存在A 到 BB 到 A 的雙向信號傳輸枉氮。即使是阻塞 IO,讀和寫是可以同時進行的暖庄,只要分別采用讀線程和寫線程即可聊替,讀不會阻塞寫、寫也不會阻塞讀

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末培廓,一起剝皮案震驚了整個濱河市惹悄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌肩钠,老刑警劉巖泣港,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蔬将,居然都是意外死亡爷速,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門霞怀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人莉给,你說我怎么就攤上這事毙石。” “怎么了颓遏?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵徐矩,是天一觀的道長。 經(jīng)常有香客問我叁幢,道長滤灯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任曼玩,我火速辦了婚禮鳞骤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘黍判。我一直安慰自己豫尽,他們只是感情好,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布顷帖。 她就那樣靜靜地躺著美旧,像睡著了一般渤滞。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上榴嗅,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天妄呕,我揣著相機與錄音,去河邊找鬼嗽测。 笑死趴腋,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的论咏。 我是一名探鬼主播优炬,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼厅贪!你這毒婦竟也來了蠢护?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤养涮,失蹤者是張志新(化名)和其女友劉穎葵硕,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贯吓,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡懈凹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了悄谐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片介评。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖爬舰,靈堂內(nèi)的尸體忽然破棺而出们陆,到底是詐尸還是另有隱情,我是刑警寧澤情屹,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布坪仇,位于F島的核電站,受9級特大地震影響垃你,放射性物質(zhì)發(fā)生泄漏椅文。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一惜颇、第九天 我趴在偏房一處隱蔽的房頂上張望皆刺。 院中可真熱鬧,春花似錦官还、人聲如沸芹橡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽林说。三九已至煎殷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間腿箩,已是汗流浹背豪直。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留珠移,地道東北人弓乙。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像钧惧,于是被迫代替她去往敵國和親暇韧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容

  • Netty是互聯(lián)網(wǎng)中間件領(lǐng)域使用最廣泛最核心的網(wǎng)絡(luò)通信框架,幾乎所有互聯(lián)網(wǎng)中間件或者大數(shù)據(jù)領(lǐng)域均離不開Netty,...
    鋒Nic閱讀 25,601評論 6 51
  • 一浓瞪、概念 Netty是Jboss提供的一個Java開源框架懈玻,它是基于NIO的網(wǎng)絡(luò)框架,封裝了NIO底層復(fù)雜的實...
    Vic_is_new_Here閱讀 362評論 0 0
  • 在本節(jié)中乾颁,我們將前面講解NIO編程時的時間服務(wù)案例涂乌,改成用Netty來實現(xiàn)。TimeClient發(fā)送“QUERY ...
    沉淪2014閱讀 518評論 0 0
  • 本文為《Netty 入門與實戰(zhàn):仿寫微信 IM 即時通訊系統(tǒng)》 的讀書筆記 是什么 本質(zhì):JBoss做的一個Jar...
    tf2jaguar閱讀 54,041評論 0 1
  • 夜鶯2517閱讀 127,717評論 1 9