Netty多人與群組聊天
消息設計
圖示:
/**
 * Base class of every packet exchanged between client and server.
 *
 * <p>Holds the protocol version and obliges each concrete packet to report
 * the command byte that identifies its type.
 */
public abstract class Packet {
    /**
     * Protocol version. The annotation switches off both JSON serialization
     * and deserialization for this field.
     */
    @JSONField(deserialize = false, serialize = false)
    private Byte version = 1;

    /**
     * Returns the protocol version (1 unless changed).
     *
     * <p>Needed by the codec, which writes the version into the packet header.
     * Annotated so the accessor does not pull the version into the JSON body.
     *
     * @return the protocol version
     */
    @JSONField(serialize = false)
    public Byte getVersion() {
        return version;
    }

    /**
     * Returns the command byte identifying the concrete packet type.
     * Marked as not serialized, so it is not part of the JSON body.
     *
     * @return the command byte of this packet
     */
    @JSONField(serialize = false)
    public abstract Byte getCommand();
}
以上是通信過程中 Java 對象的抽象類，定義了一個版本號（默認值為 1）以及一個獲取指令的抽象方法，所有的指令數據包都必須實現這個方法，這樣就可以知道某種指令的含義。
如客戶端請求登錄消息:
LoginRequestPacket:
/**
 * Login request sent by the client: carries the user name and password.
 *
 * <p>Accessors are declared explicitly so that a getter/setter-based JSON
 * serializer and the request handlers can read and populate the fields.
 */
public class LoginRequestPacket extends Packet {
    private String userName;
    private String password;

    /**
     * @return the user name supplied for login
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the user name to log in with
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * @return the password supplied for login
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param password the password to log in with
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the {@code LOGIN_REQUEST} command constant
     */
    @Override
    public Byte getCommand() {
        return LOGIN_REQUEST;
    }
}
登錄請求數據包繼承自 Packet，然後定義了兩個字段，分別是用戶名、密碼，最為重要的就是覆蓋了父類的 getCommand() 方法，值為常量 LOGIN_REQUEST。
其余消息圖示:
編碼以及解碼
圖示:
PacketCodec:
/**
 * Encodes {@link Packet} objects into the wire format and decodes them back.
 *
 * <p>Wire layout written by {@link #encode}: magic number (4 bytes), version
 * (1 byte), serializer algorithm (1 byte), command (1 byte), body length
 * (4 bytes), body (length bytes).
 *
 * <p>Both lookup tables are filled once in the private constructor and only
 * read afterwards.
 */
public class PacketCodec {
    public static final int MAGIC_NUMBER = 0x12345678;
    public static final PacketCodec INSTANCE = new PacketCodec();

    /** Command byte -> concrete packet class used for deserialization. */
    private final Map<Byte, Class<? extends Packet>> packetTypeMap;
    /** Serializer algorithm byte -> serializer implementation. */
    private final Map<Byte, Serializer> serializerMap;

    private PacketCodec() {
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
        packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
        packetTypeMap.put(LOGOUT_REQUEST, LogoutRequestPacket.class);
        packetTypeMap.put(LOGOUT_RESPONSE, LogoutResponsePacket.class);
        packetTypeMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestPacket.class);
        packetTypeMap.put(CREATE_GROUP_RESPONSE, CreateGroupResponsePacket.class);
        packetTypeMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestPacket.class);
        packetTypeMap.put(JOIN_GROUP_RESPONSE, JoinGroupResponsePacket.class);
        packetTypeMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestPacket.class);
        packetTypeMap.put(QUIT_GROUP_RESPONSE, QuitGroupResponsePacket.class);
        packetTypeMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestPacket.class);
        packetTypeMap.put(LIST_GROUP_MEMBERS_RESPONSE, ListGroupMembersResponsePacket.class);

        serializerMap = new HashMap<>();
        Serializer serializer = new JSONSerializer();
        serializerMap.put(serializer.getSerializerAlgorithm(), serializer);
    }

    /**
     * Serializes {@code packet} with the default serializer and appends the
     * header followed by the body to {@code byteBuf}.
     *
     * @param byteBuf destination buffer
     * @param packet  packet to encode
     */
    public void encode(ByteBuf byteBuf, Packet packet) {
        // 1. Serialize the Java object.
        byte[] bytes = Serializer.DEFAULT.serialize(packet);

        // 2. Write header fields, then the body.
        byteBuf.writeInt(MAGIC_NUMBER);
        byteBuf.writeByte(packet.getVersion());
        byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
        byteBuf.writeByte(packet.getCommand());
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }

    /**
     * Reads one packet from {@code byteBuf}.
     *
     * <p>The magic number and version are skipped without being inspected.
     *
     * @param byteBuf source buffer positioned at the start of a packet
     * @return the decoded packet, or {@code null} when the command byte or
     *         the serializer algorithm byte is not registered
     * @throws IndexOutOfBoundsException if the declared body length exceeds
     *         the bytes remaining in the buffer
     */
    public Packet decode(ByteBuf byteBuf) {
        // Skip the magic number.
        byteBuf.skipBytes(4);
        // Skip the version.
        byteBuf.skipBytes(1);
        // Serializer algorithm.
        byte serializeAlgorithm = byteBuf.readByte();
        // Command.
        byte command = byteBuf.readByte();
        // Body length as declared by the sender.
        int length = byteBuf.readInt();

        // The length comes off the wire: reject it before allocating so a forged
        // value cannot force a huge array. Reading past the end would fail with
        // the same exception type anyway; a negative length still fails on the
        // allocation below.
        if (length > byteBuf.readableBytes()) {
            throw new IndexOutOfBoundsException("declared body length " + length
                    + " exceeds readable bytes " + byteBuf.readableBytes());
        }

        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);

        Class<? extends Packet> requestType = getRequestType(command);
        Serializer serializer = getSerializer(serializeAlgorithm);
        if (requestType != null && serializer != null) {
            return serializer.deserialize(requestType, bytes);
        }
        return null;
    }

    /** Looks up the serializer registered for the algorithm byte, or {@code null}. */
    private Serializer getSerializer(byte serializeAlgorithm) {
        return serializerMap.get(serializeAlgorithm);
    }

    /** Looks up the packet class registered for the command byte, or {@code null}. */
    private Class<? extends Packet> getRequestType(byte command) {
        return packetTypeMap.get(command);
    }
}
用戶狀態以及群組的處理
通過 Session 來對應具體用戶
/**
 * Identity of a logged-in user: a unique user id plus a display name.
 */
public class Session {
    // Unique identifier of the user.
    private String userId;
    private String userName;

    /**
     * @param userId   unique identifier of the user
     * @param userName name of the user
     */
    public Session(String userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    /**
     * @return the unique identifier of the user
     */
    public String getUserId() {
        return userId;
    }

    /**
     * @return the name of the user
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @return {@code userId + ":" + userName}
     */
    @Override
    public String toString() {
        return userId + ":" + userName;
    }
}
SessionUtil:
/**
 * Static registry that ties user ids to their channels and group ids to
 * their channel groups, and stores the {@link Session} on the channel itself.
 */
public class SessionUtil {
    /** userId -> channel of the logged-in user. */
    private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
    /** groupId -> channel group holding the member channels. */
    private static final Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();

    /**
     * Registers {@code channel} under the session's user id and stores the
     * session as a channel attribute.
     *
     * @param session session of the user that just logged in
     * @param channel channel the user is connected on
     */
    public static void bindSession(Session session, Channel channel) {
        userIdChannelMap.put(session.getUserId(), channel);
        channel.attr(Attributes.SESSION).set(session);
    }

    /**
     * Removes the session from {@code channel}; does nothing when the channel
     * carries no session.
     *
     * @param channel channel being logged out or closed
     */
    public static void unBindSession(Channel channel) {
        // Read the attribute once so the null check and the use see the same value.
        Session session = getSession(channel);
        if (session == null) {
            return;
        }
        // Conditional remove: only drop the mapping if it still points at THIS
        // channel, so unbinding a stale channel cannot evict a newer login of
        // the same user.
        userIdChannelMap.remove(session.getUserId(), channel);
        channel.attr(Attributes.SESSION).set(null);
        System.out.println(session + " 退出登錄!");
    }

    /**
     * @param channel channel to inspect
     * @return {@code true} when a session is stored on the channel
     */
    public static boolean hasLogin(Channel channel) {
        return getSession(channel) != null;
    }

    /**
     * @param channel channel to inspect
     * @return the session stored on the channel, or {@code null} if none
     */
    public static Session getSession(Channel channel) {
        return channel.attr(Attributes.SESSION).get();
    }

    /**
     * @param userId id of the user to look up
     * @return the channel registered for the user, or {@code null} if none
     */
    public static Channel getChannel(String userId) {
        return userIdChannelMap.get(userId);
    }

    /**
     * Registers {@code channelGroup} under {@code groupId}, replacing any
     * previous entry.
     *
     * @param groupId      id of the group
     * @param channelGroup channel group of the members
     */
    public static void bindChannelGroup(String groupId, ChannelGroup channelGroup) {
        groupIdChannelGroupMap.put(groupId, channelGroup);
    }

    /**
     * @param groupId id of the group to look up
     * @return the channel group registered for the id, or {@code null} if none
     */
    public static ChannelGroup getChannelGroup(String groupId) {
        return groupIdChannelGroupMap.get(groupId);
    }
}
處理用戶的登錄登出以及群組的管理
聊天界面的管理
通過命令行輸入具體的操作字符串來進行操作，圖示:
整體處理流程:
/**
 * Chat client entry point: configures the Netty bootstrap, connects with
 * retry and back-off, and starts a console thread once connected.
 */
public class NettyClient {
    private static final int MAX_RETRY = 5;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9977;

    /**
     * Builds the client pipeline and starts the first connection attempt.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new Spliter());
                        ch.pipeline().addLast(new PacketDecoder());
                        // Login response handler
                        ch.pipeline().addLast(new LoginResponseHandler());
                        // Incoming message handler
                        ch.pipeline().addLast(new MessageResponseHandler());
                        // Create-group response handler
                        ch.pipeline().addLast(new CreateGroupResponseHandler());
                        // Join-group response handler
                        ch.pipeline().addLast(new JoinGroupResponseHandler());
                        // Quit-group response handler
                        ch.pipeline().addLast(new QuitGroupResponseHandler());
                        // List-group-members response handler
                        ch.pipeline().addLast(new ListGroupMembersResponseHandler());
                        // Logout response handler
                        ch.pipeline().addLast(new LogoutResponseHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });
        connect(bootstrap, HOST, PORT, MAX_RETRY);
    }

    /**
     * Connects to {@code host:port}. On failure, schedules another attempt
     * after {@code 2^order} seconds until {@code retry} reaches 0.
     *
     * @param bootstrap configured client bootstrap
     * @param host      server host
     * @param port      server port
     * @param retry     number of attempts still allowed after this one
     */
    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 連接成功，啟動控制臺線程……");
                Channel channel = ((ChannelFuture) future).channel();
                startConsoleThread(channel);
            } else if (retry == 0) {
                System.err.println("重試次數已用完，放棄連接!");
                // Nothing will use the event loop any more; release its threads
                // so the JVM can exit instead of hanging.
                bootstrap.config().group().shutdownGracefully();
            } else {
                // Ordinal of this reconnect attempt (1-based).
                int order = (MAX_RETRY - retry) + 1;
                // Back-off before this attempt, in seconds: 2, 4, 8, ...
                int delay = 1 << order;
                System.err.println(new Date() + ": 連接失敗，第" + order + "次重連……");
                bootstrap.config().group().schedule(
                        () -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
    }

    /**
     * Starts a thread that reads console input: runs the login command while
     * the channel has no session, and the general command manager afterwards.
     *
     * @param channel connected channel the commands act on
     */
    private static void startConsoleThread(Channel channel) {
        ConsoleCommandManager consoleCommandManager = new ConsoleCommandManager();
        LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
        Scanner scanner = new Scanner(System.in);

        new Thread(() -> {
            while (!Thread.interrupted()) {
                if (!SessionUtil.hasLogin(channel)) {
                    loginConsoleCommand.exec(scanner, channel);
                } else {
                    consoleCommandManager.exec(scanner, channel);
                }
            }
        }).start();
    }
}
public class NettyServer {
private static final int PORT = 9977;
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new Spliter());
ch.pipeline().addLast(new PacketDecoder());
// 登錄請(qǐng)求處理器
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new AuthHandler());
// 單聊消息請(qǐng)求處理器
ch.pipeline().addLast(new MessageRequestHandler());
// 創(chuàng)建群請(qǐng)求處理器
ch.pipeline().addLast(new CreateGroupRequestHandler());
// 加群請(qǐng)求處理器
ch.pipeline().addLast(new JoinGroupRequestHandler());
// 退群請(qǐng)求處理器
ch.pipeline().addLast(new QuitGroupRequestHandler());
// 獲取群成員請(qǐng)求處理器
ch.pipeline().addLast(new ListGroupMembersRequestHandler());
// 登出請(qǐng)求處理器
ch.pipeline().addLast(new LogoutRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
bind(serverBootstrap, PORT);
}
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]綁定成功!");
} else {
System.err.println("端口[" + port + "]綁定失敗!");
}
});
}
}