在netty中我們可以使用ChannelGroup方式進(jìn)行群發(fā)消息,ChannelGroup繼承Set接口
關(guān)鍵代碼
+初始化器
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
// 基于換行符號
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 解碼轉(zhuǎn)String,注意調(diào)整自己的編碼格式GBK废士、UTF-8
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 解碼轉(zhuǎn)String伍玖,注意調(diào)整自己的編碼格式GBK券膀、UTF-8
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
// 在管道中添加我們自己的接收數(shù)據(jù)實(shí)現(xiàn)方法
channel.pipeline().addLast(new MyServerHandler());
}
}
- handler類
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 當(dāng)客戶端主動(dòng)鏈接服務(wù)端的鏈接后钟鸵,這個(gè)通道就是活躍的了性昭。也就是客戶端與服務(wù)端建立了通信通道并且可以傳輸數(shù)據(jù)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//當(dāng)有客戶端鏈接后渣蜗,添加到channelGroup通信組
ChannelHandler.channelGroup.add(ctx.channel());
//日志信息
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("鏈接報(bào)告開始");
System.out.println("鏈接報(bào)告信息:有一客戶端鏈接到本服務(wù)端");
System.out.println("鏈接報(bào)告IP:" + channel.localAddress().getHostString());
System.out.println("鏈接報(bào)告Port:" + channel.localAddress().getPort());
System.out.println("鏈接報(bào)告完畢");
//通知客戶端鏈接建立成功
String str = "通知客戶端鏈接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
ctx.writeAndFlush(str);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
/**
* 當(dāng)客戶端主動(dòng)斷開服務(wù)端的鏈接后屠尊,這個(gè)通道就是不活躍的。也就是說客戶端與服務(wù)端的關(guān)閉了通信通道并且不可以傳輸數(shù)據(jù)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客戶端斷開鏈接" + ctx.channel().localAddress().toString());
// 當(dāng)有客戶端退出后耕拷,從channelGroup中移除讼昆。
ChannelHandler.channelGroup.remove(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收msg消息{與上一章節(jié)相比,此處已經(jīng)不需要自己進(jìn)行解碼}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
//收到消息后骚烧,群發(fā)給客戶端
String str = "服務(wù)端收到:" + new Date() + " " + msg + "\r\n";
ChannelHandler.channelGroup.writeAndFlush(str);
}
/**
* 抓住異常浸赫,當(dāng)發(fā)生異常的時(shí)候,可以做一些相應(yīng)的處理赃绊,比如打印日志既峡、關(guān)閉鏈接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("異常信息:\r\n" + cause.getMessage());
}
}
- ChannelHandler
public class ChannelHandler {
//用于存放用戶Channel信息,也可以建立map結(jié)構(gòu)模擬不同的消息群
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
- 服務(wù)端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
new NettyServer().bing(7397);
}
private void bing(int port) {
//配置服務(wù)端NIO線程組
EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();
}
}
}
啟動(dòng)我們的服務(wù)端碧查,啟動(dòng)成功之后运敢,使用windows命令行連接服務(wù)端
telnet localhost 7397(服務(wù)端的端口號)。
我們新建了四個(gè)客戶端窗口
客戶端窗口
channelGroup
我們在一個(gè)連接上的客戶端發(fā)送消息忠售,可以看到四個(gè)客戶端都收到了消息
客戶端
我們可以區(qū)分不同channel頻道發(fā)送的消息传惠,
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收msg消息{與上一章節(jié)相比,此處已經(jīng)不需要自己進(jìn)行解碼}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
//收到消息后稻扬,群發(fā)給客戶端
String str = "服務(wù)端收到:" + new Date() + " " + msg + "\r\n";
// ChannelHandler.channelGroup.writeAndFlush(str);
Channel channel = ctx.channel();
ChannelHandler.channelGroup.forEach(e -> {
if (channel != e) {
e.writeAndFlush(channel.remoteAddress() + "發(fā)送的消息:" + msg + "\n");
} else {
e.writeAndFlush("自己 :" + msg + "\n");
}
});
}
發(fā)送消息
可以看到自己收到的消息會(huì)和其他頻道略有卻別涉枫。