Netty學(xué)習(xí)-Echo服務(wù)器客戶端

Netty服務(wù)器構(gòu)成

  1. 至少一個(gè)ChannelHandler——該組件實(shí)現(xiàn)了服務(wù)器對(duì)從客戶端接受的數(shù)據(jù)的處理诬垂,即它的業(yè)務(wù)邏輯
  2. 引導(dǎo)——配置服務(wù)器的啟動(dòng)代碼凛澎。至少,它會(huì)將服務(wù)器綁定到它要監(jiān)聽(tīng)連接請(qǐng)求的端口上。

ChannelHandler和業(yè)務(wù)邏輯

?ChannelHandler是一個(gè)接口族的父接口燕刻,它的實(shí)現(xiàn)負(fù)責(zé)接受并響應(yīng)事件通知,在Netty應(yīng)用程序中剖笙,所有的數(shù)據(jù)處理邏輯都包含在這些核心抽象的實(shí)現(xiàn)中卵洗。

?Echo服務(wù)器會(huì)響應(yīng)傳入的消息,因此需要實(shí)現(xiàn)ChannelInboundHandler接口弥咪,用來(lái)定義響應(yīng)入站事件的方法过蹂。由于Echo服務(wù)器的應(yīng)用程序只需要用到少量的方法,所以只需要繼承ChannelInboundHandlerAdapter類(lèi)聚至,它提供了ChannelInboundHandler的默認(rèn)實(shí)現(xiàn)酷勺。

?在ChannelInboundHandler中,我們感興趣的方法有:

  1. channelRead()——對(duì)于每個(gè)傳入的消息都要調(diào)用
  2. channelReadComplete()——通知ChannelInboundHandler最后一次對(duì)channelRead()的調(diào)用是當(dāng)前批量讀取中的最后一條消息
  3. execeptionCaught()——在讀取操作期間扳躬,有異常拋出時(shí)會(huì)調(diào)用脆诉。

EchoServerHandler實(shí)現(xiàn)

package cn.sh.demo.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author sh
 * @ChannelHandler.Sharable 標(biāo)示一個(gè)ChannelHandler可以被多個(gè)Channel安全地共享
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        //將接受到的消息輸出到客戶端
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //將接收到的消息寫(xiě)給發(fā)送者,而不沖刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //將消息沖刷到客戶端贷币,并且關(guān)閉該Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //打印異常堆棧跟蹤
        cause.printStackTrace();
        //關(guān)閉該Channel
        ctx.close();
    }
}

備注

  1. ChannelInbounHandlerAdapter每個(gè)方法都可以被重寫(xiě)然后掛鉤到事件生命周期的恰當(dāng)點(diǎn)上击胜。
  2. 重寫(xiě)exceptionCaught()方法允許你對(duì)Throwable的任何子類(lèi)型作出反應(yīng)。
  3. 每個(gè)Channel都擁有一個(gè)與之關(guān)聯(lián)的ChannelPipeline片择,ChannelPipeline持有一個(gè)ChannelHandler的實(shí)例鏈潜的。在默認(rèn)情況下,ChannelHandler會(huì)把對(duì)方法的調(diào)用轉(zhuǎn)發(fā)給鏈中的下一個(gè)ChannelHandler字管。因此啰挪,如果exceptionCaught()方法沒(méi)有被該鏈中的某處實(shí)現(xiàn),那么異常將會(huì)被傳遞到ChannelPipeline的末端進(jìn)行記錄

引導(dǎo)服務(wù)器

主要涉及的內(nèi)容

  1. 綁定監(jiān)聽(tīng)并接受傳入連接請(qǐng)求的端口
  2. 配置Channel嘲叔,將有關(guān)的入站消息通知給EchoServerHandler實(shí)例

Echo服務(wù)引導(dǎo)示例代碼

package cn.sh.demo.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void startServer() throws InterruptedException {
        EchoServerHandler serverHandler = new EchoServerHandler();
        //創(chuàng)建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        //創(chuàng)建ServerBootstrap
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                //指定所使用的NIO傳輸Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口套接字
                .localAddress(new InetSocketAddress(port))
                //添加一個(gè)EchoServerHandler到子Channel的ChannelPipeline
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //此處由于EchoServerHandler被注解標(biāo)注為@Shareble亡呵,所以我們總是使用相同的實(shí)例
                        channel.pipeline().addLast(serverHandler);
                    }
                });
        try {
            //異步的綁定服務(wù)器,調(diào)用sync()方法阻塞等待直到綁定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            //獲取Channel的CloseFuture硫戈,并且阻塞當(dāng)前線程直到它完成
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //關(guān)閉EventLoopGroup锰什,釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            System.err.println("參數(shù)類(lèi)型或者個(gè)數(shù)不正確");
            return;
        }
        //設(shè)置端口值
        int port = Integer.parseInt(args[0]);
        //啟動(dòng)Echo服務(wù)器
        new EchoServer(port).startServer();
    }
}

備注

  1. 此處使用了一個(gè)特殊的類(lèi)——ChannelInitializer。當(dāng)一個(gè)新的連接被接受時(shí),一個(gè)新的子Channel會(huì)被創(chuàng)建汁胆,此時(shí)ChannelInitializer就會(huì)把一個(gè)EchoServerHandler的示例添加到該Channel的ChannelPipeline中梭姓,這個(gè)ChannelHandler將會(huì)收到有關(guān)入站消息的通知。

回顧引導(dǎo)服務(wù)

  1. 創(chuàng)建一個(gè)ServerBootStrap實(shí)例來(lái)引導(dǎo)和綁定服務(wù)器
  2. 創(chuàng)建并分配一個(gè)NioEventLoopGroup實(shí)例進(jìn)行事件的處理嫩码,如接受新連接以及讀/寫(xiě)數(shù)據(jù)
  3. 指定服務(wù)器綁定的本地InetSocketAddress
  4. 使用EchoServerHandler的實(shí)例初始化每一個(gè)新的Channel
  5. 調(diào)用ServerBootstrap.bind()方法來(lái)綁定服務(wù)器

Echo客戶端

客戶端主要包括的操作:

  1. 連接到服務(wù)器
  2. 發(fā)送一個(gè)或多個(gè)消息
  3. 對(duì)于每個(gè)消息誉尖,等待并接受從服務(wù)器發(fā)回相同的消息
  4. 關(guān)閉連接

編寫(xiě)客戶端主要包括業(yè)務(wù)邏輯和引導(dǎo)

ChannelHandler實(shí)現(xiàn)客戶端邏輯

在該示例中,我們使用SimpleChannelInboundHandler類(lèi)來(lái)處理所有的事件铸题,主要的方法有:

  1. channelActive()——在和服務(wù)器的連接已經(jīng)建立之后被調(diào)用
  2. channelRead0()——當(dāng)從服務(wù)器接收到一條消息時(shí)被調(diào)用
  3. exceptionCaught()——在處理過(guò)程中引發(fā)異常時(shí)被調(diào)用

示例代碼如下:

package cn.sh.demo.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author sh
 * @ChannelHandler.Sharable 標(biāo)記該類(lèi)的示例可以被多個(gè)Channel共享
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //當(dāng)一個(gè)連接被服務(wù)器接受并建立后铡恕,發(fā)送一條消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        //記錄客戶端接收到服務(wù)器的消息
        System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在發(fā)生異常時(shí),記錄錯(cuò)誤并關(guān)閉Channel
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

備注

?每次在接受數(shù)據(jù)時(shí)丢间,都會(huì)調(diào)用channelRead0()方法探熔。需要注意的是,由服務(wù)器發(fā)送的消息可能會(huì)被分塊接受烘挫。也就是說(shuō)诀艰,如果服務(wù)器發(fā)送了5字節(jié),那么不能保證這5字節(jié)會(huì)被一次性接受墙牌。即使是對(duì)于這么少量的數(shù)據(jù)涡驮,channelRead0()方法也可能會(huì)被調(diào)用兩次,第一次使用一個(gè)持有3字節(jié)的ByteBuf(Netty的字節(jié)容器)喜滨,第二次使用一個(gè)持有2字節(jié)的ByteBuf捉捅。作為一個(gè)面向流的協(xié)議,TCP保證了字節(jié)數(shù)組會(huì)按照服務(wù)器發(fā)送它們的順序被接受虽风。

為什么客戶端使用SimpleChannelInboundHandler而不是ChannelInboundHandlerAdapter棒口?

主要和業(yè)務(wù)邏輯如何處理消息以及Netty如何管理資源有關(guān)

客戶端中,當(dāng)channelRead0()方法完成時(shí)辜膝,已經(jīng)接受了消息并且處理完畢无牵,當(dāng)該方法返回時(shí),SimpleChannelInboundHandler負(fù)責(zé)釋放指向保存該消息的ByteBuf的內(nèi)存引用厂抖。

但是在服務(wù)器端茎毁,你需要將消息返回給客戶端,write()操作是異步的忱辅,直到channelRead()方法返回后有可能仍然沒(méi)有完成七蜘,ChannelInboundHandlerAdapter在這個(gè)時(shí)間點(diǎn)上不會(huì)釋放消息。

客戶端的消息是在channelComplete()方法中墙懂,通過(guò)writeAndFlush()方法調(diào)用時(shí)被釋放橡卤。

引導(dǎo)客戶端

客戶端使用主機(jī)和端口參數(shù)來(lái)連接遠(yuǎn)程地址,也就是Echo服務(wù)器的地址损搬,而不是綁定到一個(gè)一直被監(jiān)聽(tīng)的端口碧库。

示例代碼如下:

package cn.sh.demo.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {

    private final String host;

    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        //創(chuàng)建客戶端引導(dǎo)器
        Bootstrap bootstrap = new Bootstrap();
        //指定使用NioEventLoopGroup來(lái)處理客戶端事件
        bootstrap.group(group)
                //指定使用NIO傳輸?shù)腃hannel類(lèi)型
                .channel(NioSocketChannel.class)
                //設(shè)置服務(wù)器的InetSocketAddress
                .remoteAddress(new InetSocketAddress(host, port))
                //在創(chuàng)建Channel時(shí)柜与,向ChannelPipeline中添加一個(gè)EchoHandler實(shí)例
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new EchoClientHandler());
                    }
                });
        try {
            //連接到遠(yuǎn)程節(jié)點(diǎn),阻塞等待直到連接完成
            ChannelFuture future = bootstrap.connect().sync();
            //阻塞直到Channel關(guān)閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //關(guān)閉線程池并且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 2) {
            System.err.println("參數(shù)個(gè)數(shù)不正確");
            return;
        }
        int port = Integer.parseInt(args[1]);
        new EchoClient(args[0], port).start();
    }
}

備注

服務(wù)器和客戶端均使用了NIO傳輸嵌灰,但是弄匕,客戶端和服務(wù)端可以使用不同的傳輸,例如伞鲫,在服務(wù)器使用NIO傳輸粘茄,客戶端可以使用OIO傳輸

回顧引導(dǎo)服務(wù)器

  1. 創(chuàng)建一個(gè)Bootstrap實(shí)例,引導(dǎo)并創(chuàng)建客戶端
  2. 創(chuàng)建一個(gè)NioEventLoopGroup實(shí)例來(lái)進(jìn)行事件處理秕脓,其中事件處理包括創(chuàng)建新的連接以及處理入站和出站數(shù)據(jù)
  3. 為服務(wù)器連接創(chuàng)建了一個(gè)InetSocketAddress實(shí)例
  4. 當(dāng)連接建立時(shí),一個(gè)EchoClientHandler實(shí)例會(huì)被添加到(該Channel)的ChannelPipeline中
  5. 設(shè)置完成后儒搭,調(diào)用Bootstrap.connetc()連接到遠(yuǎn)程節(jié)點(diǎn)

運(yùn)行程序

  1. 啟動(dòng)服務(wù)端
  2. 再啟動(dòng)客戶端

服務(wù)端的輸出如下:

服務(wù)端輸出

客戶端的輸出如下:

客戶端輸出

代碼地址

該文章的示例代碼位于cn.sh.demo.echo包下吠架。

示例代碼,點(diǎn)擊查看

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末搂鲫,一起剝皮案震驚了整個(gè)濱河市傍药,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌魂仍,老刑警劉巖拐辽,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異擦酌,居然都是意外死亡俱诸,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)赊舶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)睁搭,“玉大人,你說(shuō)我怎么就攤上這事笼平≡奥妫” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵寓调,是天一觀的道長(zhǎng)锌唾。 經(jīng)常有香客問(wèn)我,道長(zhǎng)夺英,這世上最難降的妖魔是什么晌涕? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮秋麸,結(jié)果婚禮上渐排,老公的妹妹穿的比我還像新娘。我一直安慰自己灸蟆,他們只是感情好驯耻,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布亲族。 她就那樣靜靜地躺著,像睡著了一般可缚。 火紅的嫁衣襯著肌膚如雪霎迫。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,598評(píng)論 1 305
  • 那天帘靡,我揣著相機(jī)與錄音知给,去河邊找鬼。 笑死描姚,一個(gè)胖子當(dāng)著我的面吹牛涩赢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播轩勘,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼筒扒,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了绊寻?” 一聲冷哼從身側(cè)響起花墩,我...
    開(kāi)封第一講書(shū)人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎澄步,沒(méi)想到半個(gè)月后冰蘑,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡村缸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年祠肥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片王凑。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡搪柑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出索烹,到底是詐尸還是另有隱情工碾,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布百姓,位于F島的核電站渊额,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏垒拢。R本人自食惡果不足惜旬迹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望求类。 院中可真熱鬧奔垦,春花似錦、人聲如沸尸疆。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至犯眠,卻和暖如春按灶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背筐咧。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工鸯旁, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人量蕊。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓铺罢,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親残炮。 傳聞我的和親對(duì)象是個(gè)殘疾皇子畏铆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容