Netty入門使用

要想了解Netty叽唱,我們還是需要看Netty的基礎(chǔ)使用

這里使用的netty

     <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>

1.客戶端

public class NettyClient {

    /*IP地址*/
    private static final String HOST = "127.0.0.1";
    /*端口號(hào)*/
    private static final int PORT1 = 9091;

    public static void main(String[] args) throws Exception {

        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            byte[] bab5BBS = BinaryTransferUtils.hex2Bytes("BAB5BB");
            Bootstrap b = new Bootstrap();//客戶端
            ByteBuf buf = Unpooled.copiedBuffer(bab5BBS);
            b.group(workGroup)
                    .channel(NioSocketChannel.class)//客戶端 -->NioSocketChannel
                    .option(ChannelOption.SO_KEEPALIVE, true)

                    .handler(new ChannelInitializer<SocketChannel>() {//handler
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline()
                              .addLast("delimiter",new DelimiterBasedFrameDecoder( 100000000,false,buf))     //解碼分隔符
                              .addLast("decoder", new ByteArrayDecoder()) //解碼器
                              .addLast("encoder", new StringEncoder())  //編碼器
                             .addLast(new ClientHandler());  //客戶端處理器伤疙,具體處理細(xì)節(jié)
                        }
                    });
            //創(chuàng)建異步連接 可添加多個(gè)端口
            ChannelFuture cf1 = b.connect(HOST, PORT1).sync();

            cf1.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
//處理器
public class ClientHandler extends SimpleChannelInboundHandler<Object> {

    private static final String id= "123";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String send = "{'type':'add','appid':'" + id+ "'}";
        byte[] data = send.getBytes();
        ByteBuf firstMessage = Unpooled.buffer();
        firstMessage.writeBytes(data);
        ctx.writeAndFlush(firstMessage);
        System.out.println("客戶端發(fā)送消息:" + send);
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception {
            System.out.println("接收到客戶端 發(fā)送消息:"+msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

2.服務(wù)端

public class NettyApplication {


    private static final Logger logger = LoggerFactory.getLogger(WmledApplication.class);

    private static final int PORT = 9091;

    public static void main(String[] args) {

        EventLoopGroup boosGroup = new NioEventLoopGroup(); //處理連接
        EventLoopGroup workerGroup = new NioEventLoopGroup(100); //處理網(wǎng)絡(luò)IO
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //option對(duì)應(yīng)nio中 ServerSocketChannel設(shè)置的參數(shù)
                    //childOption對(duì)應(yīng)nio中SocketChannel設(shè)置的參數(shù)
                    .option(ChannelOption.SO_BACKLOG, 2048)  //連接數(shù)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ServerInitializer())   //服務(wù)端初始化信息
                       
                        
            // 服務(wù)器綁定端口監(jiān)聽(tīng)
            ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
            logger.info("----netty服務(wù)已經(jīng)啟動(dòng),端口:" + PORT + "----------");
            // 監(jiān)聽(tīng)服務(wù)器關(guān)閉監(jiān)聽(tīng)
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("--- netty服務(wù)異常 ---", e);
        } finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

ServerInitializer 服務(wù)器初始化類

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel arg0){
        //ChannelPipeline 可以理解為消息傳送通道 通道一旦建立 持續(xù)存在
        ChannelPipeline channelPipeline = arg0.pipeline();
        //為通道添加功能
        ByteBuf buf = Unpooled.copiedBuffer("}".getBytes());//自定義拆包字符,用“}”做拆包
        channelPipeline.addLast("delimiter",
                new DelimiterBasedFrameDecoder(1024,false,buf));
        //字符串解碼  編碼
        channelPipeline.addLast("decoder", new StringDecoder());
        channelPipeline.addLast("encoder",new ByteArrayEncoder());

        //添加自主邏輯,這里我是用spring管理的這個(gè)serverhandler
        channelPipeline.addLast(SpringUtil.getBean(ServerHandler.class));
    }
}

IO事件處理之ServerHandler處理器

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class);

    @Autowired
    private MessageHandler messageHandler;

    @Override
    protected void channelRead0(ChannelHandlerContext arg0, String json) {
            messageHandler.handle(arg0,json); //通過(guò)自定義的handler來(lái)處理數(shù)據(jù)
    }


    /**
     * channel被激活時(shí)調(diào)用
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("檢測(cè)到上線 待加入 【{}】", ctx.channel().remoteAddress());
    }


    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("【{}】機(jī)器下線 " +ctx.channel().remoteAddress());
        ctx.close();
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        logger.info(cause.getMessage() + "---------" + ctx.toString());
    }

上面客戶端和服務(wù)端分別啟動(dòng)起來(lái)就可以相互通信了。

3.拆包粘包問(wèn)題

熟悉tcp就知道我們的業(yè)務(wù)數(shù)據(jù)不一定會(huì)按照我們想象的作為某一個(gè)整體發(fā)送,一個(gè)完整的包可能會(huì)被拆分為幾個(gè)包發(fā)送动猬,接受的時(shí)候可能就會(huì)出現(xiàn)獲取部分包的情況,所以就會(huì)出現(xiàn)半包表箭,粘包的情況赁咙。

Netty提供多種處理拆包和粘包的的解碼器:

  • 消息固定長(zhǎng)度,消息達(dá)到某一個(gè)長(zhǎng)度就表示讀取到了一個(gè)完整的長(zhǎng)度
    -- netty提供方案:FixedLengthFrameDecoder
  • 將回車換行符("\n","\t\n")作為消息結(jié)束符免钻,比如ftp協(xié)議
    -- netty提供方案:LineBasedFrameDecoder
  • 某種特殊的分隔符作為結(jié)束標(biāo)志
    -- netty提供方案:DelimiterBasedFrameDecoder
  • 通過(guò)在消息頭定義長(zhǎng)度字段表示消息的總長(zhǎng)度

其他一些編碼解碼:
比如StringDecoder,StringEncoder,ByteArrayEncoder等彼水,在通道處理過(guò)程中會(huì)自動(dòng)做編碼處理。

4.序列化

序列化:是把對(duì)象的狀態(tài)信息轉(zhuǎn)化為可存儲(chǔ)或傳輸?shù)男问竭^(guò)程伯襟,也就是把對(duì)象轉(zhuǎn)化為字節(jié)序列的過(guò)程稱為對(duì)象的序列化

反序列化:是序列化的逆向過(guò)程猿涨,把字節(jié)數(shù)組反序列化為對(duì)象,把字節(jié)序
列恢復(fù)為對(duì)象的過(guò)程成為對(duì)象的反序列化

目前存在多種序列化的方式姆怪,比如:

  • Java自帶序列化:netty提供ObjectEncoder和ObjectDecoder對(duì)java做序列化處理
  • xml
  • json
  • hession序列化框架
  • Protobuf序列化
    netty提供:ProtobufDecoder叛赚,ProtobufEncoder,ProtobufVarint32LengthFieldPrepender(某種字節(jié)處理稽揭,便于分割每一條俺附,后面這個(gè)處理這個(gè)半包),ProtobufVarint32FrameDecoder(半包處理)溪掀,
    -........

5.相關(guān)協(xié)議開(kāi)發(fā)

Netty支持Http事镣,Websocket,UDP揪胃,自定義協(xié)議開(kāi)發(fā)璃哟,具體怎么使用可以從網(wǎng)絡(luò)上獲取

6.Netty行業(yè)應(yīng)用

Netty基于高性能的異步通信氛琢,常常會(huì)用在 rpc通信,大數(shù)據(jù)随闪,游戲端等場(chǎng)景中阳似;我們常見(jiàn)的比如Dubbo,RocketMQ铐伴,大數(shù)據(jù)中Avro等都有使用Netty做
節(jié)點(diǎn)點(diǎn)的通信撮奏。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市当宴,隨后出現(xiàn)的幾起案子畜吊,更是在濱河造成了極大的恐慌,老刑警劉巖户矢,帶你破解...
    沈念sama閱讀 222,378評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件玲献,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡逗嫡,警方通過(guò)查閱死者的電腦和手機(jī)青自,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門株依,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)驱证,“玉大人,你說(shuō)我怎么就攤上這事恋腕∧ǔ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,983評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵荠藤,是天一觀的道長(zhǎng)伙单。 經(jīng)常有香客問(wèn)我,道長(zhǎng)哈肖,這世上最難降的妖魔是什么吻育? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,938評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮淤井,結(jié)果婚禮上布疼,老公的妹妹穿的比我還像新娘。我一直安慰自己币狠,他們只是感情好游两,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,955評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著漩绵,像睡著了一般贱案。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上止吐,一...
    開(kāi)封第一講書(shū)人閱讀 52,549評(píng)論 1 312
  • 那天宝踪,我揣著相機(jī)與錄音侨糟,去河邊找鬼。 笑死瘩燥,一個(gè)胖子當(dāng)著我的面吹牛粟害,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播颤芬,決...
    沈念sama閱讀 41,063評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼悲幅,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了站蝠?” 一聲冷哼從身側(cè)響起汰具,我...
    開(kāi)封第一講書(shū)人閱讀 39,991評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎菱魔,沒(méi)想到半個(gè)月后留荔,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,522評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡澜倦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,604評(píng)論 3 342
  • 正文 我和宋清朗相戀三年聚蝶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片藻治。...
    茶點(diǎn)故事閱讀 40,742評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡碘勉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出桩卵,到底是詐尸還是另有隱情验靡,我是刑警寧澤,帶...
    沈念sama閱讀 36,413評(píng)論 5 351
  • 正文 年R本政府宣布雏节,位于F島的核電站胜嗓,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏钩乍。R本人自食惡果不足惜辞州,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,094評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望寥粹。 院中可真熱鬧变过,春花似錦、人聲如沸排作。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,572評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)妄痪。三九已至哈雏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背裳瘪。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,671評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工土浸, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人彭羹。 一個(gè)月前我還...
    沈念sama閱讀 49,159評(píng)論 3 378
  • 正文 我出身青樓黄伊,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親派殷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子还最,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,747評(píng)論 2 361