Netty多客戶端通信機制

上篇文章講解了客戶端與服務(wù)端通信示例裹驰,本篇來講解下多客戶端之間是如何通信的,我們以一個聊天室的程序為例灶似。
具體需求:
客戶端1、2瑞你、3(通過remoteAddress來標(biāo)識)酪惭,當(dāng)客戶端1上線后,發(fā)送一條消息給服務(wù)端者甲,當(dāng)客戶端2上線后春感,通知客戶端1:“客戶端2已經(jīng)上線”,當(dāng)客戶端3上線后,通知客戶端1和客戶端2:“客戶端3已經(jīng)上線”鲫懒。

按照Netty服務(wù)構(gòu)建步驟進行嫩实,可以參見Netty構(gòu)建服務(wù)的基本步驟文章來了解構(gòu)建過程以及具體說明。

  1. 首先構(gòu)建聊天室服務(wù)端入口窥岩,多個客戶端通信都是經(jīng)由服務(wù)端作為傳輸和通信載體甲献。
    示例代碼:
public class MyCatServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup =  new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChatServerInitializer());

            ChannelFuture channelFuture = bootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 創(chuàng)建MyChatClientInitializer,添加編解碼處理器ChannelPipeline.
    示例代碼:
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        // 添加基于\r \n界定符的解碼器
        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 添加字符串解碼器
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加字符串編碼器
        channelPipeline.addLast(new MyChatServerHandler()); // 自定義處理器
    }
}
  1. 創(chuàng)建自定義處理器MyChatServerHandler, 這里涉及到一個重要的組件ChannelGroup颂翼,它是線程安全的晃洒,ChannelGroup存儲了已連接的Channel,Channel關(guān)閉會自動從ChannelGroup中移除朦乏,無需擔(dān)心Channel生命周期球及。同時,可以對這些Channel做各種批量操作呻疹,可以以廣播的形式發(fā)送一條消息給所有的Channels吃引,調(diào)用它的writeAndFlush方法來實現(xiàn)。
    ChannelGroup可以進一步理解為設(shè)計模式中的發(fā)布-訂閱模型诲宇,其底層是通過ConcurrentHashMap進行存儲所有Channel的际歼。
    示例代碼:
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 這里要區(qū)分下是否是自己發(fā)的消息
        Channel channel = ctx.channel();
        // 這里使用了Java8的lambda表達式
        channelGroup.forEach(ch -> {
            if (ch == channel) { // 兩個channel對象地址相同
                System.out.println("服務(wù)器端轉(zhuǎn)發(fā)聊天消息:【自己】發(fā)送的消息, 內(nèi)容:" + msg + "\n");
                ch.writeAndFlush("【自己】發(fā)送的消息, 內(nèi)容:" + msg + "\n");
            } else {
                System.out.println("服務(wù)器端轉(zhuǎn)發(fā)聊天消息:"+ ch.remoteAddress() + "發(fā)送的消息,內(nèi)容:" + msg + "\n");
                ch.writeAndFlush(ch.remoteAddress() + "發(fā)送的消息姑蓝,內(nèi)容:" + msg + "\n");
            }
        });
    }

    // -----------以下覆寫的方法是ChannelInboundHandlerAdapter中的方法---------------
    @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.writeAndFlush("[服務(wù)器] - " + channel.remoteAddress() + " 加入了\n");

        // 先寫入到客戶端,最后再將自己添加到ChannelGroup中
        channelGroup.add(channel);
    }

    @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.writeAndFlush("[服務(wù)器] - " + channel.remoteAddress() + " 離開了\n");

        // 這里channelGroup會自動進行調(diào)用吕粗,所以這行代碼不寫也是可以的纺荧。
        channelGroup.remove(channel);
    }

    /**
     * 只要有客戶端連接就會執(zhí)行
     *
     * @param ctx
     * @throws Exception
     */
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上線了\n");
    }

    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 下線了\n");
    }

    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
  1. 構(gòu)建客戶端入口,連接服務(wù)端8899端口颅筋,示例中通過控制臺輸入形式給服務(wù)端發(fā)送消息宙暇。
    示例代碼:
public class MyCatClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyChatClientInitalizer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            // channelFuture.channel().closeFuture().sync();

            // 從控制臺不斷的讀取輸入
            boolean running = true;
            try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
                while (running) {
                    channelFuture.channel().writeAndFlush(br.readLine() + "\r\n");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
  1. 創(chuàng)建客戶端MyChatClientInitializer,跟服務(wù)端基本類似的處理器议泵。
    示例代碼:
public class MyChatClientInitalizer extends ChannelInitializer<SocketChannel> {

    @Override protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new MyChatClientHandler());
    }
}
  1. 創(chuàng)建自定義處理器MyChatClientHandler, 很簡單占贫,只是輸出一條消息。
    示例代碼:
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

以上幾個步驟就完成了聊天室程序的編碼工作先口,下面運行服務(wù)端和客戶端程序型奥,運行結(jié)果如下:

MyChatClient(1) MyChatClient(2) MyChatClient(3) MyChatServer
Run None None /127.0.0.1:51049 上線了
[服務(wù)器] - /127.0.0.1:51055 加入了 Run None /127.0.0.1:51055 上線了
[服務(wù)器] - /127.0.0.1:51114 加入了 [服務(wù)器] - /127.0.0.1:51114 加入了 Run /127.0.0.1:51114 上線了

None:表示未運行 Run:表示運行

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市碉京,隨后出現(xiàn)的幾起案子厢汹,更是在濱河造成了極大的恐慌,老刑警劉巖谐宙,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烫葬,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機搭综,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門垢箕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人兑巾,你說我怎么就攤上這事条获。” “怎么了闪朱?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵月匣,是天一觀的道長。 經(jīng)常有香客問我奋姿,道長锄开,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任称诗,我火速辦了婚禮萍悴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘寓免。我一直安慰自己癣诱,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布袜香。 她就那樣靜靜地躺著撕予,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蜈首。 梳的紋絲不亂的頭發(fā)上实抡,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機與錄音欢策,去河邊找鬼吆寨。 笑死,一個胖子當(dāng)著我的面吹牛踩寇,可吹牛的內(nèi)容都是我干的啄清。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼俺孙,長吁一口氣:“原來是場噩夢啊……” “哼辣卒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鼠冕,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤添寺,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后懈费,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體计露,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了票罐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叉趣。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖该押,靈堂內(nèi)的尸體忽然破棺而出疗杉,到底是詐尸還是另有隱情,我是刑警寧澤蚕礼,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布烟具,位于F島的核電站,受9級特大地震影響奠蹬,放射性物質(zhì)發(fā)生泄漏朝聋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一囤躁、第九天 我趴在偏房一處隱蔽的房頂上張望冀痕。 院中可真熱鬧,春花似錦狸演、人聲如沸言蛇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽腊尚。三九已至,卻和暖如春满哪,著一層夾襖步出監(jiān)牢的瞬間跟伏,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工翩瓜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人携龟。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓兔跌,卻偏偏與公主長得像,于是被迫代替她去往敵國和親峡蟋。 傳聞我的和親對象是個殘疾皇子坟桅,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)蕊蝗,斷路器仅乓,智...
    卡卡羅2017閱讀 134,659評論 18 139
  • netty常用API學(xué)習(xí) netty簡介 Netty是基于Java NIO的網(wǎng)絡(luò)應(yīng)用框架. Netty是一個NIO...
    花丶小偉閱讀 6,006評論 0 20
  • RPC框架遠程調(diào)用的實現(xiàn)方式在原理上是比較簡單的,即將調(diào)用的方法(接口名蓬戚、方法名夸楣、參數(shù)類型、參數(shù))序列化之后發(fā)送到...
    謎碌小孩閱讀 3,105評論 0 13
  • Netty心跳基本檢測機制 首先,了解下為什么需要心跳豫喧?假設(shè)客戶端(如手機石洗,PAD)與服務(wù)器端已經(jīng)建立了長連接,客...
    東升的思考閱讀 1,466評論 0 2
  • 早上沒敢請假去參加今天一天的落地會紧显,昨晚睡覺突然想起月底啦讲衫,考勤還沒做,計劃今天把考勤做好發(fā)給上海孵班。十點多看群里已...
    王莎莎2017閱讀 608評論 0 1