springboot+netty疮胖,webSocket與modbus同時(shí)使用

準(zhǔn)備工作

  springboot 2.4.2
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.59.Final</version>
 </dependency>

1.創(chuàng)建NettyServer.java,關(guān)鍵代碼如下

public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger("-----NettyServer-----");
    private RedisUtil redisUtil;
    private HandlerService handlerService;

    private static ChannelGroup deviceChannelGroup;
    private static Map<String, ChannelId> deviceMap = new ConcurrentHashMap<>();
    /**
     * WEB-SOCKET
     */
    private static ChannelGroup socketChannelGroup;
    private static Map<String, ChannelId> socketMap = new ConcurrentHashMap<>();

    /**
     * bossGroup就是parentGroup瞒渠,是負(fù)責(zé)處理TCP/IP連接的
     */
    private EventLoopGroup bossGroup = null;
    /**
     * workerGroup就是childGroup,是負(fù)責(zé)處理Channel(通道)的I/O事件
     */

    private EventLoopGroup workerGroup = null;

    public NettyServer(RedisUtil redisUtil, HandlerService handlerService) {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        this.redisUtil = redisUtil;
        this.handlerService = handlerService;
        deviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        socketChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }

    public void bind(int tcp,int socket) throws Exception {
        ServerBootstrap device = new ServerBootstrap();
        device.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                //初始化服務(wù)端可連接隊(duì)列,指定了隊(duì)列的大小128
                .option(ChannelOption.SO_BACKLOG, 1024)
                //保持長(zhǎng)連接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_REUSEADDR, true)
                // 綁定客戶端連接時(shí)候觸發(fā)操作
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sh) throws Exception {
                        InetSocketAddress address = sh.remoteAddress();

                        logger.debug("TCP 客戶端IP:" + address.getAddress() + ":" + address.getPort());
                        sh.pipeline()
                                //項(xiàng)目需要,定長(zhǎng)消息,可以替換為其他的
                                .addLast(new FixedLengthFrameDecoder(10))
                                //消息處理
                                .addLast("HeartBeat", new HeartBeatHandler(redisUtil, handlerService));
                    }
                });
        //綁定監(jiān)聽(tīng)端口襟齿,調(diào)用sync同步阻塞方法等待綁定操作完成,完成后返回ChannelFuture類(lèi)似于JDK中Future

        ServerBootstrap webSocket = new ServerBootstrap();
        webSocket.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                //初始化服務(wù)端可連接隊(duì)列,指定了隊(duì)列的大小128
                .option(ChannelOption.SO_BACKLOG, 1024)
                //保持長(zhǎng)連接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_REUSEADDR, true)
                // 綁定客戶端連接時(shí)候觸發(fā)操作
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sh) throws Exception {
                        InetSocketAddress address = sh.remoteAddress();

                        logger.debug("WEB SOCKET客戶端IP:" + address.getAddress() + ":" + address.getPort());
                        sh.pipeline()
                                .addLast(new HttpServerCodec())
                                .addLast(new ChunkedWriteHandler())
                                .addLast(new HttpObjectAggregator(65535))
                                .addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65535))
                                .addLast(new WebSocketHandler());
                    }
                });
        //綁定監(jiān)聽(tīng)端口枕赵,調(diào)用sync同步阻塞方法等待綁定操作完成猜欺,完成后返回ChannelFuture類(lèi)似于JDK中Future
        ChannelFuture futureDevice = device.bind(tcp).sync();
        ChannelFuture futureWebSocket = webSocket.bind(socket).sync();
        if (futureDevice.isSuccess()) {
            logger.debug("TCP 服務(wù)端啟動(dòng)成功");
        } else {
            logger.debug("TCP 服務(wù)端啟動(dòng)失敗");
            futureDevice.cause().printStackTrace();
            bossGroup.shutdownGracefully(); //關(guān)閉線程組
            workerGroup.shutdownGracefully();
        }
        if (futureWebSocket.isSuccess()) {
            logger.debug("WEB-SOCKET服務(wù)端啟動(dòng)成功");
        } else {
            logger.debug("WEB-SOCKET服務(wù)端啟動(dòng)失敗");
            futureWebSocket.cause().printStackTrace();
            bossGroup.shutdownGracefully(); //關(guān)閉線程組
            workerGroup.shutdownGracefully();
        }
        //成功綁定到端口之后,給channel增加一個(gè) 管道關(guān)閉的監(jiān)聽(tīng)器并同步阻塞,直到channel關(guān)閉,線程才會(huì)往下執(zhí)行,結(jié)束進(jìn)程。
        futureDevice.channel().closeFuture().sync();
        futureWebSocket.channel().closeFuture().sync();

    }

    public void unbind() {
        if (null != bossGroup && !bossGroup.isShutdown()) {
            bossGroup.shutdownGracefully();
            bossGroup = null;
        }
        if (null != workerGroup && !workerGroup.isShutdown()) {
            workerGroup.shutdownGracefully();
            workerGroup = null;
        }
    }

    /**
     * WEB-SOCKET 操作 開(kāi)始
     */
    public static void socketAdd(Channel channel) {
        socketChannelGroup.add(channel);
    }

    public static void socketRemove(Channel channel) {
        socketChannelGroup.remove(channel);
        removeSocketChannelId(channel.id());
    }

    public static ChannelGroup socketChannelGroup() {
        return socketChannelGroup;
    }

    public static void putSocketChannelId(String code, ChannelId channelId) {
        socketMap.put(code, channelId);
    }

    public static void removeSocketChannelId(ChannelId channelId) {
        socketMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
    }
    public static ChannelId socketChannelId(String code) {
        return socketMap.getOrDefault(code, null);
    }

    public static Channel socketChannel(ChannelId channelId){
        return socketChannelGroup.find(channelId);
    }
    public static Map<String,ChannelId> socketMap(){
        return socketMap;
    }

    /**
     * WEB-SOCKET 操作結(jié)束
     * DEVICE 操作 開(kāi)始
     */
    public static void deviceAdd(Channel channel) {
        deviceChannelGroup.add(channel);
    }

    public static void deviceRemove(Channel channel) {
        deviceChannelGroup.remove(channel);
        removeDeviceChannelId(channel.id());
    }

    public static ChannelGroup deviceChannelGroup() {
        return deviceChannelGroup;
    }

    public static void putDeviceChannelId(String code, ChannelId channelId) {
        deviceMap.put(code, channelId);
    }

    public static void removeDeviceChannelId(ChannelId channelId) {
        deviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
    }

    public static ChannelId deviceChannelId(String code) {
        return deviceMap.getOrDefault(code, null);
    }

    public static Channel deviceChannel(ChannelId channelId){
        return deviceChannelGroup.find(channelId);
    }
    public static Map<String,ChannelId> deviceMap(){
        return deviceMap;
    }
    /**
     * DEVICE 操作 結(jié)束
     */
}

2.創(chuàng)建對(duì)應(yīng)消息處理類(lèi)

2.1(ModBus)消息處理類(lèi) HeartBeatHandler.java
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger("-----HeartBeatHandler-----");
    private RedisUtil redisUtil;
    private HandlerService handlerService;

    public HeartBeatHandler(RedisUtil redisUtil, HandlerService handlerService) {
        this.redisUtil = redisUtil;
        this.handlerService = handlerService;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //以下為示例代碼拷窜,具體按實(shí)際功能需求來(lái)开皿;
        String code = "具體獲取code操作";
        sendMessageToWebSocket(code,"發(fā)送消息");
    }

    public void sendMessageToWebSocket(String code, String message) {
        ChannelId channelId = NettyServer.socketChannelId(code);
        if (channelId != null) {
            Channel socketChannel = NettyServer.socketChannel(channelId);
            if (socketChannel != null) {
                socketChannel.writeAndFlush(new TextWebSocketFrame(message)).addListener((ChannelFutureListener) future -> {
                    logger.info("WEB SOCKET {},{}", code, message);
                    logger.info("WEB SOCKET DONE:{}", future.isDone());
                    logger.info("WEB SOCKET SUCCESS:{}", future.isSuccess());
                });
            } else {
                logger.info("channels is null");
            }
        } else {
            logger.info("channelsId is null");
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("接收到客戶端信息完成");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof Exception) {
            logger.info("異常捕獲");
            cause.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("CLIENT" + getRemoteAddress(ctx) + " 接入連接");
        NettyServer.deviceAdd(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("CLIENT" + getRemoteAddress(ctx) + " 斷開(kāi)連接");
        NettyServer.deviceRemove(ctx.channel());
        ctx.close();
    }

    public static String getRemoteAddress(ChannelHandlerContext ctx) {
        return ctx.channel().remoteAddress().toString();
    }
2.2(WebSocket)消息處理類(lèi) WebSocketHandler.java
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channel.id();
        logger.info("與客戶端建立連接,通道開(kāi)啟装黑!channelId:{}",channel.id());
        // 添加到channelGroup通道組
        NettyServer.socketAdd(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("與客戶端建立連接副瀑,通道關(guān)閉!");
        NettyServer.socketRemove(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        logger.info("服務(wù)器收到的數(shù)據(jù):" + msg.text());
        NettyServer.putSocketChannelId(msg.text(),ctx.channel().id());
        //簡(jiǎn)易的保持心跳
        sendMessage(ctx);
    }

    private void sendMessage(ChannelHandlerContext ctx) {
        logger.info("服務(wù)器回復(fù):0");
        ctx.channel().writeAndFlush(new TextWebSocketFrame("0")).addListener((ChannelFutureListener) future -> {
            logger.info("WEB-SOCKET 心跳回復(fù):0");
            logger.info("WEB SOCKET DONE:{}",future.isDone());
            logger.info("WEB SOCKET SUCCESS:{}",future.isSuccess());
        });;
    }

    private void sendAllMessage() {
        String message = "發(fā)送群消息";
        NettyServer.socketChannelGroup().writeAndFlush(new TextWebSocketFrame(message));
    }

3.使用方法

在對(duì)應(yīng)的SpringBoot 啟動(dòng)類(lèi)中使用
@Component
public static class StartApplication implements ApplicationRunner {
  private NettyServer nettyServer;
      @Resource
      private HandlerService handlerService;
      @Resource
      private RedisUtil redis;
      @Override
      public void run(ApplicationArguments args) throws Exception {
          logger.info("進(jìn)程開(kāi)啟恋谭!");
          nettyServer = new NettyServer(redis, handlerService);
          nettyServer.bind(port1,port2);
      }
      @PreDestroy
      public void destroy() throws Exception {
          logger.info("進(jìn)程關(guān)閉糠睡!");
          nettyServer.unbind();
        }
}

4.至此,功能完成

感謝您的關(guān)注疚颊,有使用不當(dāng)?shù)牡胤奖房祝?qǐng)指正信认,愿共勉··· ···
不接收辱罵,如有不適均抽,請(qǐng)移步··· ···
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嫁赏,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子油挥,更是在濱河造成了極大的恐慌潦蝇,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件深寥,死亡現(xiàn)場(chǎng)離奇詭異攘乒,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)惋鹅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)则酝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人闰集,你說(shuō)我怎么就攤上這事沽讹。” “怎么了武鲁?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵爽雄,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我洞坑,道長(zhǎng)盲链,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任迟杂,我火速辦了婚禮刽沾,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘排拷。我一直安慰自己侧漓,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布监氢。 她就那樣靜靜地躺著布蔗,像睡著了一般。 火紅的嫁衣襯著肌膚如雪浪腐。 梳的紋絲不亂的頭發(fā)上纵揍,一...
    開(kāi)封第一講書(shū)人閱讀 51,688評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音议街,去河邊找鬼泽谨。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的吧雹。 我是一名探鬼主播骨杂,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼雄卷!你這毒婦竟也來(lái)了搓蚪?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤丁鹉,失蹤者是張志新(化名)和其女友劉穎妒潭,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體鳄炉,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡杜耙,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年搜骡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拂盯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡记靡,死狀恐怖谈竿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情摸吠,我是刑警寧澤空凸,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站寸痢,受9級(jí)特大地震影響呀洲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜啼止,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一道逗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧献烦,春花似錦滓窍、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至即横,卻和暖如春噪生,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背东囚。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工跺嗽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓抛蚁,卻偏偏與公主長(zhǎng)得像陈醒,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瞧甩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容