上篇文章講解了客戶端與服務(wù)端通信示例裹驰,本篇來講解下多客戶端之間是如何通信的,我們以一個聊天室的程序為例灶似。
具體需求:
客戶端1、2瑞你、3(通過remoteAddress來標(biāo)識)酪惭,當(dāng)客戶端1上線后,發(fā)送一條消息給服務(wù)端者甲,當(dāng)客戶端2上線后春感,通知客戶端1:“客戶端2已經(jīng)上線”,當(dāng)客戶端3上線后,通知客戶端1和客戶端2:“客戶端3已經(jīng)上線”鲫懒。
按照Netty服務(wù)構(gòu)建步驟進行嫩实,可以參見Netty構(gòu)建服務(wù)的基本步驟文章來了解構(gòu)建過程以及具體說明。
- 首先構(gòu)建聊天室服務(wù)端入口窥岩,多個客戶端通信都是經(jīng)由服務(wù)端作為傳輸和通信載體甲献。
示例代碼:
public class MyCatServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChatServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 創(chuàng)建MyChatClientInitializer,添加編解碼處理器ChannelPipeline.
示例代碼:
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
// 添加基于\r \n界定符的解碼器
channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 添加字符串解碼器
channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加字符串編碼器
channelPipeline.addLast(new MyChatServerHandler()); // 自定義處理器
}
}
- 創(chuàng)建自定義處理器MyChatServerHandler, 這里涉及到一個重要的組件ChannelGroup颂翼,它是線程安全的晃洒,ChannelGroup存儲了已連接的Channel,Channel關(guān)閉會自動從ChannelGroup中移除朦乏,無需擔(dān)心Channel生命周期球及。同時,可以對這些Channel做各種批量操作呻疹,可以以廣播的形式發(fā)送一條消息給所有的Channels吃引,調(diào)用它的writeAndFlush方法來實現(xiàn)。
ChannelGroup可以進一步理解為設(shè)計模式中的發(fā)布-訂閱模型诲宇,其底層是通過ConcurrentHashMap進行存儲所有Channel的际歼。
示例代碼:
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 這里要區(qū)分下是否是自己發(fā)的消息
Channel channel = ctx.channel();
// 這里使用了Java8的lambda表達式
channelGroup.forEach(ch -> {
if (ch == channel) { // 兩個channel對象地址相同
System.out.println("服務(wù)器端轉(zhuǎn)發(fā)聊天消息:【自己】發(fā)送的消息, 內(nèi)容:" + msg + "\n");
ch.writeAndFlush("【自己】發(fā)送的消息, 內(nèi)容:" + msg + "\n");
} else {
System.out.println("服務(wù)器端轉(zhuǎn)發(fā)聊天消息:"+ ch.remoteAddress() + "發(fā)送的消息,內(nèi)容:" + msg + "\n");
ch.writeAndFlush(ch.remoteAddress() + "發(fā)送的消息姑蓝,內(nèi)容:" + msg + "\n");
}
});
}
// -----------以下覆寫的方法是ChannelInboundHandlerAdapter中的方法---------------
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服務(wù)器] - " + channel.remoteAddress() + " 加入了\n");
// 先寫入到客戶端,最后再將自己添加到ChannelGroup中
channelGroup.add(channel);
}
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服務(wù)器] - " + channel.remoteAddress() + " 離開了\n");
// 這里channelGroup會自動進行調(diào)用吕粗,所以這行代碼不寫也是可以的纺荧。
channelGroup.remove(channel);
}
/**
* 只要有客戶端連接就會執(zhí)行
*
* @param ctx
* @throws Exception
*/
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上線了\n");
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 下線了\n");
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- 構(gòu)建客戶端入口,連接服務(wù)端8899端口颅筋,示例中通過控制臺輸入形式給服務(wù)端發(fā)送消息宙暇。
示例代碼:
public class MyCatClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyChatClientInitalizer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
// channelFuture.channel().closeFuture().sync();
// 從控制臺不斷的讀取輸入
boolean running = true;
try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
while (running) {
channelFuture.channel().writeAndFlush(br.readLine() + "\r\n");
}
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
- 創(chuàng)建客戶端MyChatClientInitializer,跟服務(wù)端基本類似的處理器议泵。
示例代碼:
public class MyChatClientInitalizer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
channelPipeline.addLast(new MyChatClientHandler());
}
}
- 創(chuàng)建自定義處理器MyChatClientHandler, 很簡單占贫,只是輸出一條消息。
示例代碼:
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
以上幾個步驟就完成了聊天室程序的編碼工作先口,下面運行服務(wù)端和客戶端程序型奥,運行結(jié)果如下:
MyChatClient(1) | MyChatClient(2) | MyChatClient(3) | MyChatServer |
---|---|---|---|
Run | None | None | /127.0.0.1:51049 上線了 |
[服務(wù)器] - /127.0.0.1:51055 加入了 | Run | None | /127.0.0.1:51055 上線了 |
[服務(wù)器] - /127.0.0.1:51114 加入了 | [服務(wù)器] - /127.0.0.1:51114 加入了 | Run | /127.0.0.1:51114 上線了 |
None:表示未運行 Run:表示運行