Netty 核心模塊組件

Netty說自己是異步事件驅(qū)動的框架弛针,并沒有說網(wǎng)絡(luò)模型用的是異步模型裳朋,異步事件驅(qū)動框架體現(xiàn)在所有的I/O操作是異步的匪蝙,所有的IO調(diào)用會立即返回敛摘,并不保證調(diào)用成功與否门烂,但是調(diào)用會返回ChannelFuture,netty會通過ChannelFuture通知你調(diào)用是成功了還是失敗了亦或是取消了兄淫。

第 6 章 Netty 核心模塊組件

6.1 Bootstrap屯远、ServerBootstrap

  1. Bootstrap 意思是引導(dǎo),一個 Netty 應(yīng)用通常由一個 Bootstrap 開始捕虽,主要作用是配置整個 Netty 程序慨丐,串聯(lián) 各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導(dǎo)類泄私,ServerBootstrap 是服務(wù)端啟動引導(dǎo)類
  2. 常見的方法有
    1.public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)房揭,該方法用于服務(wù)器端备闲, 用來設(shè)置兩個 EventLoop
    2.public B group(EventLoopGroup group) ,該方法用于客戶端捅暴,用來設(shè)置一個 EventLoop
    3.public B channel(Class<? extends C> channelClass)恬砂,該方法用來設(shè)置一個服務(wù)器端的通道實現(xiàn)
    4.public <T> B option(ChannelOption<T> option, T value),用來給 ServerChannel 添加配置
    5.public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)蓬痒,用來給接收到的通道添加配置
    6.public ServerBootstrap childHandler(ChannelHandler childHandler)觉既,該方法用來設(shè)置業(yè)務(wù)處理類(自定義的 handler)Handler是對應(yīng)bossGroup,chileHandler對應(yīng)workGroup
    7.public ChannelFuture bind(int inetPort) ,該方法用于服務(wù)器端乳幸,用來設(shè)置占用的端口號
    8.public ChannelFuture connect(String inetHost, int inetPort) 瞪讼,該方法用于客戶端,用來連接服務(wù)器端

6.2 Future粹断、ChannelFuture

Netty 中所有的 IO 操作都是異步的符欠,不能立刻得知消息是否被正確處理。但是可以過一會等它執(zhí)行完成或 者直接注冊一個監(jiān)聽瓶埋,具體的實現(xiàn)就是通過 Future 和 ChannelFutures希柿,他們可以注冊一個監(jiān)聽,當(dāng)操作執(zhí)行成功 或失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件
常見的方法有
Channel channel()养筒,返回當(dāng)前正在進行 IO 操作的通道
ChannelFuture sync()曾撤,等待異步操作執(zhí)行完畢

6.3 Channel

  1. Netty 網(wǎng)絡(luò)通信的組件,能夠用于執(zhí)行網(wǎng)絡(luò) I/O 操作晕粪。
  2. 通過 Channel 可獲得當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)
  3. 通過 Channel 可獲得 網(wǎng)絡(luò)連接的配置參數(shù) (例如接收緩沖區(qū)大屑废ぁ)
  4. Channel 提供異步的網(wǎng)絡(luò) I/O 操作(如建立連接,讀寫巫湘,綁定端口)装悲,異步調(diào)用意味著任何 I/O 調(diào)用都將立即返 回,并且不保證在調(diào)用結(jié)束時所請求的 I/O 操作已完成
  5. 調(diào)用立即返回一個 ChannelFuture 實例尚氛,通過注冊監(jiān)聽器到 ChannelFuture 上诀诊,可以 I/O 操作成功、失敗或取 消時回調(diào)通知調(diào)用方
  6. 支持關(guān)聯(lián) I/O 操作與對應(yīng)的處理程序
  7. 不同協(xié)議阅嘶、不同的阻塞類型的連接都有不同的 Channel 類型與之對應(yīng)属瓣,
    常用的 Channel 類型:
    NioSocketChannel,異步的客戶端 TCP Socket 連接讯柔。
    NioServerSocketChannel抡蛙,異步的服務(wù)器端 TCP Socket 連接。
    NioDatagramChannel磷杏,異步的 UDP 連接溜畅。
    NioSctpChannel捏卓,異步的客戶端 Sctp 連接极祸。
    NioSctpServerChannel慈格,異步的 Sctp 服務(wù)器端連接,這些通道涵蓋了 UDP 和 TCP 網(wǎng)絡(luò) IO 以及文件 IO遥金。

6.4 Selector

  1. Netty 基于 Selector 對象實現(xiàn) I/O 多路復(fù)用浴捆,通過 Selector 一個線程可以監(jiān)聽多個連接的 Channel 事件。
  2. 當(dāng)向一個 Selector 中注冊 Channel 后稿械,Selector 內(nèi)部的機制就可以自動不斷地查詢(Select) 這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀选泻,可寫,網(wǎng)絡(luò)連接完成等)美莫,這樣程序就可以很簡單地使用一個 線程高效地管理多個 Channel

6.5 ChannelHandler 及其實現(xiàn)類

  1. ChannelHandler 是一個接口页眯,處理 I/O 事件或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè)務(wù)處理鏈) 中的下一個處理程序厢呵。
  2. ChannelHandler 本身并沒有提供很多方法窝撵,因為這個接口有許多的方法需要實現(xiàn),方便使用期間襟铭,可以繼承它 的子類
  3. ChannelHandler 及其實現(xiàn)類一覽圖(后)


    image.png
  4. 我們經(jīng)常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter碌奉,然后通過重寫相應(yīng)方法實現(xiàn)業(yè)務(wù) 邏輯,我們接下來看看一般都需要重寫哪些方法


    image.png

6.6 Pipeline 和 ChannelPipeline

ChannelPipeline 是一個重點:

  1. ChannelPipeline 是一個 Handler 的集合寒砖,它負責(zé)處理和攔截 inbound 或者 outbound 的事件和操作赐劣,相當(dāng)于 一個貫穿 Netty 的鏈。(也可以這樣理解:ChannelPipeline 是 保存 ChannelHandler 的 List哩都,用于處理或攔截 Channel 的入站事件和出站操作)
  2. ChannelPipeline 實現(xiàn)了一種高級形式的攔截過濾器模式魁兼,使用戶可以完全控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互
  3. 在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應(yīng)漠嵌,它們的組成關(guān)系如下


    image.png
  4. 常用方法
    ChannelPipeline addFirst(ChannelHandler... handlers)璃赡,把一個業(yè)務(wù)處理類(handler)添加到鏈中的第一個位置
    ChannelPipeline addLast(ChannelHandler... handlers),把一個業(yè)務(wù)處理類(handler)添加到鏈中的最后一個位置

6.7 ChannelHandlerContext

  1. 保存 Channel 相關(guān)的所有上下文信息献雅,同時關(guān)聯(lián)一個 ChannelHandler 對象
  2. 即 ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler 碉考, 同 時 ChannelHandlerContext 中也綁定了對應(yīng)的 pipeline 和 Channel 的信息,方便對 ChannelHandler 進行調(diào)用.
  3. 常用方法


    IMG_2239(20201128-152407).JPG
IMG_2240(20201128-152503).JPG
image.png

6.8 ChannelOption

  1. Netty 在創(chuàng)建 Channel 實例后,一般都需要設(shè)置 ChannelOption 參數(shù)挺身。
  2. ChannelOption 參數(shù)如下:


    image.png

6.9 EventLoopGroup 和其實現(xiàn)類 NioEventLoopGroup

  1. EventLoopGroup 是一組 EventLoop 的抽象侯谁,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop 同時工作章钾,每個 EventLoop 維護著一個 Selector 實例墙贱。
  2. EventLoopGroup 提供 next 接口,可以從組里面按照一定規(guī)則獲取其中一個 EventLoop 來處理任務(wù)贱傀。在 Netty 服 務(wù) 器 端 編 程 中 虽抄, 我 們 一 般 都 需 要 提 供 兩 個 EventLoopGroup , 例 如 : BossEventLoopGroup 和 WorkerEventLoopGroup碗短。
  3. 通常一個服務(wù)端口即一個 ServerSocketChannel 對應(yīng)一個 Selector 和一個 EventLoop 線程。BossEventLoop 負責(zé) 接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進行 IO 處理报腔,如下圖所示


    image.png
  4. 常用方法 public NioEventLoopGroup(),構(gòu)造方法
    public Future<?> shutdownGracefully()剖淀,斷開連接纯蛾,關(guān)閉線程

6.10 Unpooled 類

  1. Netty 提供一個專門用來操作緩沖區(qū)(即 Netty 的數(shù)據(jù)容器)的工具類
  2. 常用方法如下所示


    image.png
  3. 舉例說明 Unpooled 獲取 Netty 的數(shù)據(jù)容器 ByteBuf 的基本使用 【案例演示】


    image.png

案例 1

package com.atguigu.netty.buf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class NettyByteBuf01 {
    public static void main(String[] args) {


        //創(chuàng)建一個ByteBuf
        //說明
        //1. 創(chuàng)建 對象,該對象包含一個數(shù)組arr , 是一個byte[10]
        //2. 在netty 的buffer中纵隔,不需要使用flip 進行反轉(zhuǎn)
        //   底層維護了 readerindex 和 writerIndex
        //3. 通過 readerindex 和  writerIndex 和  capacity翻诉, 將buffer分成三個區(qū)域
        // 0---readerindex 已經(jīng)讀取的區(qū)域
        // readerindex---writerIndex , 可讀的區(qū)域
        // writerIndex -- capacity, 可寫的區(qū)域
        ByteBuf buffer = Unpooled.buffer(10);

        for(int i = 0; i < 10; i++) {
            buffer.writeByte(i);
        }

        System.out.println("capacity=" + buffer.capacity());//10
        //輸出
//        for(int i = 0; i<buffer.capacity(); i++) {
//            System.out.println(buffer.getByte(i));
//        }
        for(int i = 0; i < buffer.capacity(); i++) {
            System.out.println(buffer.readByte());
        }
        System.out.println("執(zhí)行完畢");
    }
}

案例 2

package com.atguigu.netty.buf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

public class NettyByteBuf02 {
    public static void main(String[] args) {

        //創(chuàng)建ByteBuf
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

        //使用相關(guān)的方法
        if(byteBuf.hasArray()) { // true

            byte[] content = byteBuf.array();

            //將 content 轉(zhuǎn)成字符串
            System.out.println(new String(content, Charset.forName("utf-8")));

            System.out.println("byteBuf=" + byteBuf);

            System.out.println(byteBuf.arrayOffset()); // 0
            System.out.println(byteBuf.readerIndex()); // 0
            System.out.println(byteBuf.writerIndex()); // 12
            System.out.println(byteBuf.capacity()); // 36

            //System.out.println(byteBuf.readByte()); //
            System.out.println(byteBuf.getByte(0)); // 104

            int len = byteBuf.readableBytes(); //可讀的字節(jié)數(shù)  12
            System.out.println("len=" + len);

            //使用for取出各個字節(jié)
            for(int i = 0; i < len; i++) {
                System.out.println((char) byteBuf.getByte(i));
            }

            //按照某個范圍讀取
            System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
            System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));


        }


    }
}

6.11 Netty 應(yīng)用實例-群聊系統(tǒng)

實例要求:

  1. 編寫一個 Netty 群聊系統(tǒng)捌刮,實現(xiàn)服務(wù)器端和客戶端之間的數(shù)據(jù)簡單通訊(非阻塞)
  2. 實現(xiàn)多人群聊
  3. 服務(wù)器端:可以監(jiān)測用戶上線碰煌,離線,并實現(xiàn)消息轉(zhuǎn)發(fā)功能
  4. 客戶端:通過 channel 可以無阻塞發(fā)送消息給其它所有用戶绅作,同時可以接受其它用戶發(fā)送的消息(有服務(wù)器轉(zhuǎn)發(fā) 得到)
  5. 目的:進一步理解 Netty 非阻塞網(wǎng)絡(luò)編程機制


    image.png

GroupChatServer

package com.atguigu.netty.groupchat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class GroupChatServer {

    private int port; //監(jiān)聽端口


    public GroupChatServer(int port) {
        this.port = port;
    }

    //編寫run方法拄查,處理客戶端的請求
    public void run() throws  Exception{

        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //獲取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解碼器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入編碼器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的業(yè)務(wù)處理handler
                            pipeline.addLast(new GroupChatServerHandler());

                        }
                    });

            System.out.println("netty 服務(wù)器啟動");
            ChannelFuture channelFuture = b.bind(port).sync();

            //監(jiān)聽關(guān)閉
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {

        new GroupChatServer(7000).run();
    }
}

GroupChatServerHandler

package com.atguigu.netty.groupchat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    //public static List<Channel> channels = new ArrayList<Channel>();

    //使用一個hashmap 管理
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();

    //定義一個channle 組,管理所有的channel
    //GlobalEventExecutor.INSTANCE) 是全局的事件執(zhí)行器棚蓄,是一個單例
    private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //handlerAdded 表示連接建立堕扶,一旦連接,第一個被執(zhí)行
    //將當(dāng)前channel 加入到  channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //將該客戶加入聊天的信息推送給其它在線的客戶端
        /*
        該方法會將 channelGroup 中所有的channel 遍歷梭依,并發(fā)送 消息稍算,
        我們不需要自己遍歷
         */
        channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);




    }

    //斷開連接, 將xx客戶離開信息推送給當(dāng)前在線的客戶
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n");
        System.out.println("channelGroup size" + channelGroup.size());

    }

    //表示channel 處于活動狀態(tài), 提示 xx上線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 上線了~");
    }

    //表示channel 處于不活動狀態(tài), 提示 xx離線了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 離線了~");
    }

    //讀取數(shù)據(jù)
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

        //獲取到當(dāng)前channel
        Channel channel = ctx.channel();
        //這時我們遍歷channelGroup, 根據(jù)不同的情況,回送不同的消息

        channelGroup.forEach(ch -> {
            if(channel != ch) { //不是當(dāng)前的channel,轉(zhuǎn)發(fā)消息
                ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 發(fā)送了消息" + msg + "\n");
            }else {//回顯自己發(fā)送的消息給自己
                ch.writeAndFlush("[自己]發(fā)送了消息" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關(guān)閉通道
        ctx.close();
    }
}

GroupChatClient

package com.atguigu.netty.groupchat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


public class GroupChatClient {

    //屬性
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();

        try {


        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                        //得到pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入相關(guān)handler
                        pipeline.addLast("decoder", new StringDecoder());
                        pipeline.addLast("encoder", new StringEncoder());
                        //加入自定義的handler
                        pipeline.addLast(new GroupChatClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress()+ "--------");
            //客戶端需要輸入信息役拴,創(chuàng)建一個掃描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通過channel 發(fā)送到服務(wù)器端
                channel.writeAndFlush(msg + "\r\n");
            }
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

GroupChatClientHandler

package com.atguigu.netty.groupchat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

6.12 Netty 心跳檢測機制案例

實例要求:

  1. 編寫一個 Netty 心跳檢測機制案例, 當(dāng)服務(wù)器超過 3 秒沒有讀時糊探,就提示讀空閑
  2. 當(dāng)服務(wù)器超過 5 秒沒有寫操作時,就提示寫空閑
  3. 實現(xiàn)當(dāng)服務(wù)器超過 7 秒沒有讀或者寫操作時河闰,就提示讀寫空閑

MyServer

package com.atguigu.netty.heartbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MyServer {
    public static void main(String[] args) throws Exception{


        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //加入一個netty 提供 IdleStateHandler
                    /*
                    說明
                    1. IdleStateHandler 是netty 提供的處理空閑狀態(tài)的處理器
                    2. long readerIdleTime : 表示多長時間沒有讀, 就會發(fā)送一個心跳檢測包檢測是否連接
                    3. long writerIdleTime : 表示多長時間沒有寫, 就會發(fā)送一個心跳檢測包檢測是否連接
                    4. long allIdleTime : 表示多長時間沒有讀寫, 就會發(fā)送一個心跳檢測包檢測是否連接

                    5. 文檔說明
                    triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
 * read, write, or both operation for a while.
 *                  6. 當(dāng) IdleStateEvent 觸發(fā)后 , 就會傳遞給管道 的下一個handler去處理
 *                  通過調(diào)用(觸發(fā))下一個handler 的 userEventTiggered , 在該方法中去處理 IdleStateEvent(讀空閑科平,寫空閑,讀寫空閑)
                     */
                    pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
                    //加入一個對空閑檢測進一步處理的handler(自定義)
                    pipeline.addLast(new MyServerHandler());
                }
            });

            //啟動服務(wù)器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerHandler

package com.atguigu.netty.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     *
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if(evt instanceof IdleStateEvent) {

            //將  evt 向下轉(zhuǎn)型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                  eventType = "讀空閑";
                  break;
                case WRITER_IDLE:
                    eventType = "寫空閑";
                    break;
                case ALL_IDLE:
                    eventType = "讀寫空閑";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType);
            System.out.println("服務(wù)器做相應(yīng)處理..");

            //如果發(fā)生空閑姜性,我們關(guān)閉通道
           // ctx.channel().close();
        }
    }
}

6.13 Netty 通過 WebSocket 編程實現(xiàn)服務(wù)器和客戶端長連接 實例要求:

  1. Http 協(xié)議是無狀態(tài)的, 瀏覽器和服務(wù)器間的請求響應(yīng)一次瞪慧,下一次會重新創(chuàng)建連接.
  2. 要求:實現(xiàn)基于 webSocket 的長連接的全雙工的交互
  3. 改變 Http 協(xié)議多次請求的約束,實現(xiàn)長連接了部念, 服務(wù)器可以發(fā)送消息給瀏覽器
  4. 客戶端瀏覽器和服務(wù)器端會相互感知弃酌,比如服務(wù)器關(guān)閉了,瀏覽器會感知儡炼,同樣瀏覽器關(guān)閉了妓湘,服務(wù)器會感知
  5. 運行界面


    image.png

MyServer

package com.atguigu.netty.websocket;

import com.atguigu.netty.heartbeat.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MyServer {
    public static void main(String[] args) throws Exception{


        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    //因為基于http協(xié)議,使用http的編碼和解碼器
                    pipeline.addLast(new HttpServerCodec());
                    //是以塊方式寫乌询,添加ChunkedWriteHandler處理器
                   //一般情況下在header部分會有一個欄目:content-length榜贴,用于指明body部分的數(shù)據(jù)大小,那么我們在讀取數(shù)據(jù)的時候只要讀取相應(yīng)大小的字節(jié)數(shù)據(jù)就表示body已經(jīng)讀完了妹田。唬党。鹃共。
                   //但是有的情況下如果沒有content-length,那么就需要那么就需要chunked來傳輸body的數(shù)據(jù)了初嘹。。沮趣。
                    pipeline.addLast(new ChunkedWriteHandler());

                    /*
                    說明
                    1. http數(shù)據(jù)在傳輸過程中是分段, HttpObjectAggregator 屯烦,就是可以將多個段聚合
                    2. 這就就是為什么,當(dāng)瀏覽器發(fā)送大量數(shù)據(jù)時房铭,就會發(fā)出多次http請求
                     */
                    pipeline.addLast(new HttpObjectAggregator(8192));
                    /*
                    說明
                    1. 對應(yīng)websocket 驻龟,它的數(shù)據(jù)是以 幀(frame) 形式傳遞
                    2. 可以看到WebSocketFrame 下面有六個子類
                    3. 瀏覽器請求時 ws://localhost:7000/hello 表示請求的uri
                    4. WebSocketServerProtocolHandler 核心功能是將 http協(xié)議升級為 ws協(xié)議 , 保持長連接
                    5. 是通過一個 狀態(tài)碼 101
                     */
                    pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));

                    //自定義的handler ,處理業(yè)務(wù)邏輯
                    pipeline.addLast(new MyTextWebSocketFrameHandler());
                }
            });

            //啟動服務(wù)器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyTextWebSocketFrameHandler

package com.atguigu.netty.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

//這里 TextWebSocketFrame 類型缸匪,表示一個文本幀(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        System.out.println("服務(wù)器收到消息 " + msg.text());

        //回復(fù)消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器時間" + LocalDateTime.now() + " " + msg.text()));
    }

    //當(dāng)web客戶端連接后翁狐, 觸發(fā)方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
        System.out.println("handlerAdded 被調(diào)用" + ctx.channel().id().asLongText());
        System.out.println("handlerAdded 被調(diào)用" + ctx.channel().id().asShortText());
    }


    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        System.out.println("handlerRemoved 被調(diào)用" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常發(fā)生 " + cause.getMessage());
        ctx.close(); //關(guān)閉連接
    }
}

hello.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    //判斷當(dāng)前瀏覽器是否支持websocket
    if(window.WebSocket) {
        //go on
        socket = new WebSocket("ws://localhost:7000/hello2");
        //相當(dāng)于channelReado, ev 收到服務(wù)器端回送的消息
        //當(dāng)發(fā)生onmessage事件時凌蔬,將接收到的數(shù)據(jù)放入id =“responseText”的元素中
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + ev.data;
        }

        //相當(dāng)于連接開啟(感知到連接開啟)
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "連接開啟了.."
        }

        //相當(dāng)于連接關(guān)閉(感知到連接關(guān)閉)
        socket.onclose = function (ev) {

            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + "連接關(guān)閉了.."
        }
    } else {
        alert("當(dāng)前瀏覽器不支持websocket")
    }

    //發(fā)送消息到服務(wù)器
    function send(message) {
        if(!window.socket) { //先判斷socket是否創(chuàng)建好
            return;
        }
        if(socket.readyState == WebSocket.OPEN) {
            //通過socket 發(fā)送消息
            socket.send(message)
        } else {
            alert("連接沒有開啟");
        }
    }
</script>
<!--onsubmit:當(dāng)提交表單時執(zhí)行一段 JavaScript,則return false表示禁止表單提交.-->
    <form onsubmit="return false">
        <textarea name="message" style="height: 300px; width: 300px"></textarea>
        <input type="button" value="發(fā)生消息" onclick="send(this.form.message.value)">
        <textarea id="responseText" style="height: 300px; width: 300px"></textarea>
        <input type="button" value="清空內(nèi)容" onclick="document.getElementById('responseText').value=''">
    </form>
</body>
</html>
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末露懒,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子砂心,更是在濱河造成了極大的恐慌懈词,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件辩诞,死亡現(xiàn)場離奇詭異坎弯,居然都是意外死亡,警方通過查閱死者的電腦和手機译暂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進店門抠忘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人外永,你說我怎么就攤上這事崎脉。” “怎么了伯顶?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵荧嵌,是天一觀的道長。 經(jīng)常有香客問我砾淌,道長啦撮,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任汪厨,我火速辦了婚禮赃春,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘劫乱。我一直安慰自己织中,他們只是感情好锥涕,可當(dāng)我...
    茶點故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著狭吼,像睡著了一般层坠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上刁笙,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天破花,我揣著相機與錄音,去河邊找鬼疲吸。 笑死座每,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的摘悴。 我是一名探鬼主播峭梳,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蹂喻!你這毒婦竟也來了葱椭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤口四,失蹤者是張志新(化名)和其女友劉穎挫以,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窃祝,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡掐松,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了粪小。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片大磺。...
    茶點故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖探膊,靈堂內(nèi)的尸體忽然破棺而出杠愧,到底是詐尸還是另有隱情,我是刑警寧澤逞壁,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布流济,位于F島的核電站,受9級特大地震影響腌闯,放射性物質(zhì)發(fā)生泄漏绳瘟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一姿骏、第九天 我趴在偏房一處隱蔽的房頂上張望糖声。 院中可真熱鬧,春花似錦、人聲如沸蘸泻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悦施。三九已至并扇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間抡诞,已是汗流浹背穷蛹。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留沐绒,地道東北人俩莽。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓旺坠,卻偏偏與公主長得像乔遮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子取刃,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,974評論 2 355

推薦閱讀更多精彩內(nèi)容