Netty多人與群組聊天

Netty多人與群組聊天

消息設(shè)計(jì)

圖示:

圖1.png
public abstract class Packet {
    /**
     * 協(xié)議版本
     */
    @JSONField(deserialize = false, serialize = false)
    private Byte version = 1;


    @JSONField(serialize = false)
    public abstract Byte getCommand();
}


以上是通信過(guò)程中 Java 對(duì)象的抽象類唉堪,定義了一個(gè)版本號(hào)(默認(rèn)值為 1 )以及一個(gè)獲取指令的抽象方法碎赢,所有的指令數(shù)據(jù)包都必須實(shí)現(xiàn)這個(gè)方法粟关,這樣就可以知道某種指令的含義

如客戶端請(qǐng)求登錄消息:

LoginRequestPacket:

public class LoginRequestPacket extends Packet {
    private String userName;

    private String password;

    @Override
    public Byte getCommand() {

        return LOGIN_REQUEST;
    }
}

登錄請(qǐng)求數(shù)據(jù)包繼承自Packet岩臣,然后定義了三個(gè)字段烈评,分別是用戶 ID伐蒋,用戶名工三,密碼,最為重要的就是覆蓋了父類的 getCommand() 方法先鱼,值為常量 LOGIN_REQUEST俭正。

其余消息圖示:

圖2.png

編碼以及解碼

圖示:

圖3.png

PacketCodec:

public class PacketCodec {

    public static final int MAGIC_NUMBER = 0x12345678;
    public static final PacketCodec INSTANCE = new PacketCodec();

    private final Map<Byte, Class<? extends Packet>> packetTypeMap;
    private final Map<Byte, Serializer> serializerMap;


    private PacketCodec() {
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
        packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
        packetTypeMap.put(LOGOUT_REQUEST, LogoutRequestPacket.class);
        packetTypeMap.put(LOGOUT_RESPONSE, LogoutResponsePacket.class);
        packetTypeMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestPacket.class);
        packetTypeMap.put(CREATE_GROUP_RESPONSE, CreateGroupResponsePacket.class);
        packetTypeMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestPacket.class);
        packetTypeMap.put(JOIN_GROUP_RESPONSE, JoinGroupResponsePacket.class);
        packetTypeMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestPacket.class);
        packetTypeMap.put(QUIT_GROUP_RESPONSE, QuitGroupResponsePacket.class);
        packetTypeMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestPacket.class);
        packetTypeMap.put(LIST_GROUP_MEMBERS_RESPONSE, ListGroupMembersResponsePacket.class);

        serializerMap = new HashMap<>();
        Serializer serializer = new JSONSerializer();
        serializerMap.put(serializer.getSerializerAlgorithm(), serializer);
    }

    public void encode(ByteBuf byteBuf, Packet packet) {
        // 1. 序列化 java 對(duì)象
        byte[] bytes = Serializer.DEFAULT.serialize(packet);

        // 2. 實(shí)際編碼過(guò)程
        byteBuf.writeInt(MAGIC_NUMBER);
        byteBuf.writeByte(packet.getVersion());
        byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
        byteBuf.writeByte(packet.getCommand());
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }


    public Packet decode(ByteBuf byteBuf) {
        // 跳過(guò) magic number
        byteBuf.skipBytes(4);

        // 跳過(guò)版本號(hào)
        byteBuf.skipBytes(1);

        // 序列化算法
        byte serializeAlgorithm = byteBuf.readByte();

        // 指令
        byte command = byteBuf.readByte();

        // 數(shù)據(jù)包長(zhǎng)度
        int length = byteBuf.readInt();

        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);

        Class<? extends Packet> requestType = getRequestType(command);
        Serializer serializer = getSerializer(serializeAlgorithm);

        if (requestType != null && serializer != null) {
            return serializer.deserialize(requestType, bytes);
        }

        return null;
    }

    private Serializer getSerializer(byte serializeAlgorithm) {

        return serializerMap.get(serializeAlgorithm);
    }

    private Class<? extends Packet> getRequestType(byte command) {

        return packetTypeMap.get(command);
    }
}

用戶狀態(tài)以及群組的處理

通過(guò)Session來(lái)對(duì)應(yīng)具體用戶

public class Session {
    // 用戶唯一性標(biāo)識(shí)
    private String userId;
    private String userName;

    public Session(String userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    @Override
    public String toString() {
        return userId + ":" + userName;
    }

}

SessionUtil:

public class SessionUtil {
    private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();

    private static final Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();

    public static void bindSession(Session session, Channel channel) {
        userIdChannelMap.put(session.getUserId(), channel);
        channel.attr(Attributes.SESSION).set(session);
    }

    public static void unBindSession(Channel channel) {
        if (hasLogin(channel)) {
            Session session = getSession(channel);
            userIdChannelMap.remove(session.getUserId());
            channel.attr(Attributes.SESSION).set(null);
            System.out.println(session + " 退出登錄!");
        }
    }

    public static boolean hasLogin(Channel channel) {

        return getSession(channel) != null;
    }

    public static Session getSession(Channel channel) {

        return channel.attr(Attributes.SESSION).get();
    }

    public static Channel getChannel(String userId) {

        return userIdChannelMap.get(userId);
    }

    public static void bindChannelGroup(String groupId, ChannelGroup channelGroup) {
        groupIdChannelGroupMap.put(groupId, channelGroup);
    }

    public static ChannelGroup getChannelGroup(String groupId) {
        return groupIdChannelGroupMap.get(groupId);
    }
}

處理用戶的登錄登出以及群組的管理

聊天界面的管理

通過(guò)命令行輸入具體的操作字符串來(lái)進(jìn)行操作,圖示:

圖4.png
整體處理流程:
圖5.png
public class NettyClient {
    private static final int MAX_RETRY = 5;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9977;


    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new Spliter());
                        ch.pipeline().addLast(new PacketDecoder());
                        // 登錄響應(yīng)處理器
                        ch.pipeline().addLast(new LoginResponseHandler());
                        // 收消息處理器
                        ch.pipeline().addLast(new MessageResponseHandler());
                        // 創(chuàng)建群響應(yīng)處理器
                        ch.pipeline().addLast(new CreateGroupResponseHandler());
                        // 加群響應(yīng)處理器
                        ch.pipeline().addLast(new JoinGroupResponseHandler());
                        // 退群響應(yīng)處理器
                        ch.pipeline().addLast(new QuitGroupResponseHandler());
                        // 獲取群成員響應(yīng)處理器
                        ch.pipeline().addLast(new ListGroupMembersResponseHandler());
                        // 登出響應(yīng)處理器
                        ch.pipeline().addLast(new LogoutResponseHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });

        connect(bootstrap, HOST, PORT, MAX_RETRY);
    }

    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 連接成功焙畔,啟動(dòng)控制臺(tái)線程……");
                Channel channel = ((ChannelFuture) future).channel();
                startConsoleThread(channel);
            } else if (retry == 0) {
                System.err.println("重試次數(shù)已用完掸读,放棄連接!");
            } else {
                // 第幾次重連
                int order = (MAX_RETRY - retry) + 1;
                // 本次重連的間隔
                int delay = 1 << order;
                System.err.println(new Date() + ": 連接失敗宏多,第" + order + "次重連……");
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                        .SECONDS);
            }
        });
    }

    private static void startConsoleThread(Channel channel) {
        ConsoleCommandManager consoleCommandManager = new ConsoleCommandManager();
        LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
        Scanner scanner = new Scanner(System.in);

        new Thread(() -> {
            while (!Thread.interrupted()) {
                if (!SessionUtil.hasLogin(channel)) {
                    loginConsoleCommand.exec(scanner, channel);
                } else {
                    consoleCommandManager.exec(scanner, channel);
                }
            }
        }).start();
    }
}



public class NettyServer {

    private static final int PORT = 9977;

    public static void main(String[] args) {
        NioEventLoopGroup boosGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(boosGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new Spliter());
                        ch.pipeline().addLast(new PacketDecoder());
                        // 登錄請(qǐng)求處理器
                        ch.pipeline().addLast(new LoginRequestHandler());
                        ch.pipeline().addLast(new AuthHandler());
                        // 單聊消息請(qǐng)求處理器
                        ch.pipeline().addLast(new MessageRequestHandler());
                        // 創(chuàng)建群請(qǐng)求處理器
                        ch.pipeline().addLast(new CreateGroupRequestHandler());
                        // 加群請(qǐng)求處理器
                        ch.pipeline().addLast(new JoinGroupRequestHandler());
                        // 退群請(qǐng)求處理器
                        ch.pipeline().addLast(new QuitGroupRequestHandler());
                        // 獲取群成員請(qǐng)求處理器
                        ch.pipeline().addLast(new ListGroupMembersRequestHandler());
                        // 登出請(qǐng)求處理器
                        ch.pipeline().addLast(new LogoutRequestHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });


        bind(serverBootstrap, PORT);
    }

    private static void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 端口[" + port + "]綁定成功!");
            } else {
                System.err.println("端口[" + port + "]綁定失敗!");
            }
        });
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末儿惫,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子伸但,更是在濱河造成了極大的恐慌肾请,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件更胖,死亡現(xiàn)場(chǎng)離奇詭異铛铁,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)却妨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門饵逐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人管呵,你說(shuō)我怎么就攤上這事梳毙。” “怎么了捐下?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵账锹,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我坷襟,道長(zhǎng)奸柬,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任婴程,我火速辦了婚禮廓奕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘档叔。我一直安慰自己桌粉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布衙四。 她就那樣靜靜地躺著铃肯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪传蹈。 梳的紋絲不亂的頭發(fā)上押逼,一...
    開(kāi)封第一講書(shū)人閱讀 51,165評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音惦界,去河邊找鬼挑格。 笑死,一個(gè)胖子當(dāng)著我的面吹牛沾歪,可吹牛的內(nèi)容都是我干的漂彤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼灾搏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼挫望!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起确镊,我...
    開(kāi)封第一講書(shū)人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤士骤,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后蕾域,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體拷肌,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年旨巷,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了巨缘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡采呐,死狀恐怖若锁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情斧吐,我是刑警寧澤又固,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布仲器,位于F島的核電站,受9級(jí)特大地震影響仰冠,放射性物質(zhì)發(fā)生泄漏乏冀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一洋只、第九天 我趴在偏房一處隱蔽的房頂上張望辆沦。 院中可真熱鬧,春花似錦识虚、人聲如沸肢扯。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蔚晨。三九已至,卻和暖如春妻献,著一層夾襖步出監(jiān)牢的瞬間蛛株,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工育拨, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谨履,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓熬丧,卻偏偏與公主長(zhǎng)得像笋粟,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子析蝴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353