準(zhǔn)備工作
springboot 2.4.2
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.59.Final</version>
</dependency>
1.創(chuàng)建NettyServer.java,關(guān)鍵代碼如下
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger("-----NettyServer-----");
private RedisUtil redisUtil;
private HandlerService handlerService;
private static ChannelGroup deviceChannelGroup;
private static Map<String, ChannelId> deviceMap = new ConcurrentHashMap<>();
/**
* WEB-SOCKET
*/
private static ChannelGroup socketChannelGroup;
private static Map<String, ChannelId> socketMap = new ConcurrentHashMap<>();
/**
* bossGroup就是parentGroup瞒渠,是負(fù)責(zé)處理TCP/IP連接的
*/
private EventLoopGroup bossGroup = null;
/**
* workerGroup就是childGroup,是負(fù)責(zé)處理Channel(通道)的I/O事件
*/
private EventLoopGroup workerGroup = null;
public NettyServer(RedisUtil redisUtil, HandlerService handlerService) {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
this.redisUtil = redisUtil;
this.handlerService = handlerService;
deviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
socketChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
public void bind(int tcp,int socket) throws Exception {
ServerBootstrap device = new ServerBootstrap();
device.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//初始化服務(wù)端可連接隊(duì)列,指定了隊(duì)列的大小128
.option(ChannelOption.SO_BACKLOG, 1024)
//保持長(zhǎng)連接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
// 綁定客戶端連接時(shí)候觸發(fā)操作
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sh) throws Exception {
InetSocketAddress address = sh.remoteAddress();
logger.debug("TCP 客戶端IP:" + address.getAddress() + ":" + address.getPort());
sh.pipeline()
//項(xiàng)目需要,定長(zhǎng)消息,可以替換為其他的
.addLast(new FixedLengthFrameDecoder(10))
//消息處理
.addLast("HeartBeat", new HeartBeatHandler(redisUtil, handlerService));
}
});
//綁定監(jiān)聽(tīng)端口襟齿,調(diào)用sync同步阻塞方法等待綁定操作完成,完成后返回ChannelFuture類(lèi)似于JDK中Future
ServerBootstrap webSocket = new ServerBootstrap();
webSocket.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//初始化服務(wù)端可連接隊(duì)列,指定了隊(duì)列的大小128
.option(ChannelOption.SO_BACKLOG, 1024)
//保持長(zhǎng)連接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
// 綁定客戶端連接時(shí)候觸發(fā)操作
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sh) throws Exception {
InetSocketAddress address = sh.remoteAddress();
logger.debug("WEB SOCKET客戶端IP:" + address.getAddress() + ":" + address.getPort());
sh.pipeline()
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(65535))
.addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65535))
.addLast(new WebSocketHandler());
}
});
//綁定監(jiān)聽(tīng)端口枕赵,調(diào)用sync同步阻塞方法等待綁定操作完成猜欺,完成后返回ChannelFuture類(lèi)似于JDK中Future
ChannelFuture futureDevice = device.bind(tcp).sync();
ChannelFuture futureWebSocket = webSocket.bind(socket).sync();
if (futureDevice.isSuccess()) {
logger.debug("TCP 服務(wù)端啟動(dòng)成功");
} else {
logger.debug("TCP 服務(wù)端啟動(dòng)失敗");
futureDevice.cause().printStackTrace();
bossGroup.shutdownGracefully(); //關(guān)閉線程組
workerGroup.shutdownGracefully();
}
if (futureWebSocket.isSuccess()) {
logger.debug("WEB-SOCKET服務(wù)端啟動(dòng)成功");
} else {
logger.debug("WEB-SOCKET服務(wù)端啟動(dòng)失敗");
futureWebSocket.cause().printStackTrace();
bossGroup.shutdownGracefully(); //關(guān)閉線程組
workerGroup.shutdownGracefully();
}
//成功綁定到端口之后,給channel增加一個(gè) 管道關(guān)閉的監(jiān)聽(tīng)器并同步阻塞,直到channel關(guān)閉,線程才會(huì)往下執(zhí)行,結(jié)束進(jìn)程。
futureDevice.channel().closeFuture().sync();
futureWebSocket.channel().closeFuture().sync();
}
public void unbind() {
if (null != bossGroup && !bossGroup.isShutdown()) {
bossGroup.shutdownGracefully();
bossGroup = null;
}
if (null != workerGroup && !workerGroup.isShutdown()) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
}
/**
* WEB-SOCKET 操作 開(kāi)始
*/
public static void socketAdd(Channel channel) {
socketChannelGroup.add(channel);
}
public static void socketRemove(Channel channel) {
socketChannelGroup.remove(channel);
removeSocketChannelId(channel.id());
}
public static ChannelGroup socketChannelGroup() {
return socketChannelGroup;
}
public static void putSocketChannelId(String code, ChannelId channelId) {
socketMap.put(code, channelId);
}
public static void removeSocketChannelId(ChannelId channelId) {
socketMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
}
public static ChannelId socketChannelId(String code) {
return socketMap.getOrDefault(code, null);
}
public static Channel socketChannel(ChannelId channelId){
return socketChannelGroup.find(channelId);
}
public static Map<String,ChannelId> socketMap(){
return socketMap;
}
/**
* WEB-SOCKET 操作結(jié)束
* DEVICE 操作 開(kāi)始
*/
public static void deviceAdd(Channel channel) {
deviceChannelGroup.add(channel);
}
public static void deviceRemove(Channel channel) {
deviceChannelGroup.remove(channel);
removeDeviceChannelId(channel.id());
}
public static ChannelGroup deviceChannelGroup() {
return deviceChannelGroup;
}
public static void putDeviceChannelId(String code, ChannelId channelId) {
deviceMap.put(code, channelId);
}
public static void removeDeviceChannelId(ChannelId channelId) {
deviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
}
public static ChannelId deviceChannelId(String code) {
return deviceMap.getOrDefault(code, null);
}
public static Channel deviceChannel(ChannelId channelId){
return deviceChannelGroup.find(channelId);
}
public static Map<String,ChannelId> deviceMap(){
return deviceMap;
}
/**
* DEVICE 操作 結(jié)束
*/
}
2.創(chuàng)建對(duì)應(yīng)消息處理類(lèi)
2.1(ModBus)消息處理類(lèi) HeartBeatHandler.java
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger("-----HeartBeatHandler-----");
private RedisUtil redisUtil;
private HandlerService handlerService;
public HeartBeatHandler(RedisUtil redisUtil, HandlerService handlerService) {
this.redisUtil = redisUtil;
this.handlerService = handlerService;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//以下為示例代碼拷窜,具體按實(shí)際功能需求來(lái)开皿;
String code = "具體獲取code操作";
sendMessageToWebSocket(code,"發(fā)送消息");
}
public void sendMessageToWebSocket(String code, String message) {
ChannelId channelId = NettyServer.socketChannelId(code);
if (channelId != null) {
Channel socketChannel = NettyServer.socketChannel(channelId);
if (socketChannel != null) {
socketChannel.writeAndFlush(new TextWebSocketFrame(message)).addListener((ChannelFutureListener) future -> {
logger.info("WEB SOCKET {},{}", code, message);
logger.info("WEB SOCKET DONE:{}", future.isDone());
logger.info("WEB SOCKET SUCCESS:{}", future.isSuccess());
});
} else {
logger.info("channels is null");
}
} else {
logger.info("channelsId is null");
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.info("接收到客戶端信息完成");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Exception) {
logger.info("異常捕獲");
cause.printStackTrace();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("CLIENT" + getRemoteAddress(ctx) + " 接入連接");
NettyServer.deviceAdd(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("CLIENT" + getRemoteAddress(ctx) + " 斷開(kāi)連接");
NettyServer.deviceRemove(ctx.channel());
ctx.close();
}
public static String getRemoteAddress(ChannelHandlerContext ctx) {
return ctx.channel().remoteAddress().toString();
}
2.2(WebSocket)消息處理類(lèi) WebSocketHandler.java
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channel.id();
logger.info("與客戶端建立連接,通道開(kāi)啟装黑!channelId:{}",channel.id());
// 添加到channelGroup通道組
NettyServer.socketAdd(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("與客戶端建立連接副瀑,通道關(guān)閉!");
NettyServer.socketRemove(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
logger.info("服務(wù)器收到的數(shù)據(jù):" + msg.text());
NettyServer.putSocketChannelId(msg.text(),ctx.channel().id());
//簡(jiǎn)易的保持心跳
sendMessage(ctx);
}
private void sendMessage(ChannelHandlerContext ctx) {
logger.info("服務(wù)器回復(fù):0");
ctx.channel().writeAndFlush(new TextWebSocketFrame("0")).addListener((ChannelFutureListener) future -> {
logger.info("WEB-SOCKET 心跳回復(fù):0");
logger.info("WEB SOCKET DONE:{}",future.isDone());
logger.info("WEB SOCKET SUCCESS:{}",future.isSuccess());
});;
}
private void sendAllMessage() {
String message = "發(fā)送群消息";
NettyServer.socketChannelGroup().writeAndFlush(new TextWebSocketFrame(message));
}
3.使用方法
在對(duì)應(yīng)的SpringBoot 啟動(dòng)類(lèi)中使用
@Component
public static class StartApplication implements ApplicationRunner {
private NettyServer nettyServer;
@Resource
private HandlerService handlerService;
@Resource
private RedisUtil redis;
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("進(jìn)程開(kāi)啟恋谭!");
nettyServer = new NettyServer(redis, handlerService);
nettyServer.bind(port1,port2);
}
@PreDestroy
public void destroy() throws Exception {
logger.info("進(jìn)程關(guān)閉糠睡!");
nettyServer.unbind();
}
}
4.至此,功能完成
感謝您的關(guān)注疚颊,有使用不當(dāng)?shù)牡胤奖房祝?qǐng)指正信认,愿共勉··· ···
不接收辱罵,如有不適均抽,請(qǐng)移步··· ···