SpringBoot基于WebSocket進(jìn)行推送

1. 整體流程

客戶端發(fā)起http請求疯兼,請求Netty服務(wù)器進(jìn)行WebSocket連接教馆,服務(wù)器接收后請求后進(jìn)行注冊信道并登記客戶端IP地址馋缅,如此一來就建立了WebSocket通訊連接扒腕。

上面的論述可以得出,我們可以比較Http和WebSocket兩者之間的關(guān)系和區(qū)別

  • 都是基于TCP協(xié)議的兩種通信協(xié)議萤悴;

  • 兩者不相同但是WebSocket又依賴于Http瘾腰,畢竟建立WebSocket通訊首次請求需要使用Http請求;

  • Http請求只能是客戶端對服務(wù)器進(jìn)行發(fā)送消息覆履,服務(wù)器是被動的蹋盆;而WebSocket則允許雙方可以任由一方主動發(fā)送消息給另外一方,而且能夠長時間連接硝全,只要任何一方不斷開的話栖雾。

基于上述WebSocket的特點,可以實現(xiàn)在線設(shè)備的消息推送

2. Springboot整合WebSocket

  • 2.1 依賴添加
<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

        <!-- hutool-all工具庫 -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.6</version>
        </dependency>

        <!-- fastjson依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.74</version>
        </dependency>

        <!-- aop依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
  • 2.2 WebSocket信道處理實現(xiàn)伟众,處理通訊的業(yè)務(wù)邏輯
package com.ryan.websocketpush.netty.handler;


import com.alibaba.fastjson.JSONObject;

import com.ryan.websocketpush.netty.global.ChannelSupervise;
import com.ryan.websocketpush.netty.service.TcpDispacher;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;

@Component
@ChannelHandler.Sharable
public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private WebSocketServerHandshaker handshaker;

    @Autowired
    private TcpDispacher tcpDispacher;


    /**
     * 服務(wù)器接收消息
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.debug("收到消息:" + msg);
        if (msg instanceof FullHttpRequest) {             //第一次通過Http請求進(jìn)行WebSocket通訊連接
            //以http請求形式接入析藕,但是走的是websocket
            handleHttpRequest(ctx, (FullHttpRequest) msg);
            JSONObject jsonObject = new JSONObject();
            String result = jsonObject.toJSONString();
            TextWebSocketFrame tws = new TextWebSocketFrame(result);
            ctx.channel().writeAndFlush(tws);
        } else if (msg instanceof WebSocketFrame) {
            //處理websocket客戶端的消息
            logger.debug("收到消息:" + msg);
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }


    /**
     * 客戶端連接并添加通道
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //添加連接
        logger.debug("客戶端加入連接:" + ctx.channel());
        ChannelSupervise.addChannel(ctx.channel());
//        SocketActiveRequest socketActiveRequest = new SocketActiveRequest();
//        socketActiveRequest.setIp(NettyUtils.getClientIP(ctx));
//        socketActiveRequest.setStatus(Constants.SOCKET_STATUS.LOGIN);
//        tcpDispacher.messageRecived(ctx, JsonUtil.toJson(socketActiveRequest));
    }

    /**
     * 客戶端斷開并刪除通道
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //斷開連接
        logger.debug("客戶端斷開連接:" + ctx.channel());

        TextWebSocketFrame tws = new TextWebSocketFrame("break");
        ctx.channel().writeAndFlush(tws);


    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {


        /**
         * 判斷是否關(guān)閉鏈路的指令
         */
        if (frame instanceof CloseWebSocketFrame) {
            ChannelSupervise.removeChannel(ctx.channel());
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        /**
         * 判斷是否ping消息
         */
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(
                    new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        /**
         * 僅支持文本消息,不支持二進(jìn)制消息
         */
        if (!(frame instanceof TextWebSocketFrame)) {
            logger.debug("僅支持文本消息凳厢,不支持二進(jìn)制消息");
            throw new UnsupportedOperationException(String.format(
                    "%s frame types not supported", frame.getClass().getName()));
        }


        /**
         * 返回應(yīng)答消息
         */
        String request = ((TextWebSocketFrame) frame).text();

        tcpDispacher.messageRecived(ctx, request);


        logger.debug("服務(wù)端收到:" + request);
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
                + ctx.channel().id() + ":" + request);


        //群發(fā)
        //ChannelSupervise.send2All(tws);


        //        /**
        //         * 返回【誰發(fā)的發(fā)給誰】
        //         */
//         ctx.channel().writeAndFlush(tws);
    }

    /**
     * 唯一的一次http請求账胧,用于創(chuàng)建websocket
     */
    private void handleHttpRequest(ChannelHandlerContext ctx,
                                   FullHttpRequest req) {
        //要求Upgrade為websocket,過濾掉get/Post
        if (!req.decoderResult().isSuccess()
                || (!"websocket".equals(req.headers().get("Upgrade")))) {
            //若不是websocket方式数初,則創(chuàng)建BAD_REQUEST的req找爱,返回給客戶端
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8088/websocket", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory
                    .sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * 拒絕不合法的請求,并返回錯誤信息
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx,
                                         FullHttpRequest req, DefaultFullHttpResponse res) {
        /**
         *  返回應(yīng)答給客戶端
         */
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
                    CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);

        /**
         * 如果是非Keep-Alive泡孩,關(guān)閉連接
         */
        if (!isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}

  • 2.3 連接通信信道的初始化注冊
package com.ryan.websocketpush.netty.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NioWebSocketHandler socketHandler ;

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));                        //設(shè)置log監(jiān)聽器车摄,并且日志級別為debug,方便觀察運行流程
        ch.pipeline().addLast("http-codec",new HttpServerCodec());                           //設(shè)置解碼器
        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));                 //聚合器仑鸥,使用websocket會用到
        ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());                     //用于大數(shù)據(jù)的分區(qū)傳輸
        ch.pipeline().addLast("handler", socketHandler);                                     //自定義的業(yè)務(wù)handler
    }
}
  • 2.4 注冊完信道Channel后吮播,要對信道進(jìn)行通過全局管理起來
package com.ryan.websocketpush.netty.global;

import cn.hutool.core.collection.CollUtil;
import com.ryan.websocketpush.utils.JsonUtils2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 頻道管理
 */
public class ChannelSupervise {

    /**
     * 頻道集群管理
     */
    private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 頻道管理 【channel.id().asShortText():channel.id()】
     */
    private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();

    /**
     * (動作碼:頻道)映射
     */
    private static ConcurrentHashMap<Object, ChannelGroup> actionGlobalGroupMap = new ConcurrentHashMap<>();


    /**
     * 增加頻道
     *
     * @param channel
     */
    public static void addChannel(Channel channel) {
        GlobalGroup.add(channel);
        ChannelMap.put(channel.id().asShortText(), channel.id());
    }

    /**
     * 移除頻道
     *
     * @param channel
     */
    public static void removeChannel(Channel channel) {
        GlobalGroup.remove(channel);
        ChannelMap.remove(channel.id().asShortText());
        removeChannelByActionCode(channel);
    }

    /**
     * 查找頻道
     *
     * @param id
     * @return
     */
    public static Channel findChannel(String id) {
        return GlobalGroup.find(ChannelMap.get(id));
    }

    /**
     * 推送消息
     *
     * @param tws
     */
    public static void send2All(TextWebSocketFrame tws) {
        GlobalGroup.writeAndFlush(tws);
    }


    public static void addChannelByActionCode(Channel channel, int actionCode) {
        ChannelGroup channels = actionGlobalGroupMap.get(actionCode);
        if (channels == null){
            channels =  new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
        channels.add(channel);
        actionGlobalGroupMap.put(actionCode, channels);
       // addChannelByObject(channel, actionCode);
    }


    public static void addChannelByObject(Channel channel, Object object) {
        ChannelGroup channels = actionGlobalGroupMap.get(object);
        if (channels == null){
            channels =  new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
        channels.add(channel);
        actionGlobalGroupMap.put(object, channels);
    }



    public static void removeChannelByActionCode(Channel channel){
        for (Object actionCode : actionGlobalGroupMap.keySet()){
            ChannelGroup channels = actionGlobalGroupMap.get(actionCode);
            removeChannel(channels, channel);
        }
    }

    private static void removeChannel(ChannelGroup channels , Channel channel){
        if (channels != null && channel != null){
            channels.remove(channel);
            channels.remove(channel.id().asShortText());
        }
    }


    public static void removeByObjectChannel(Channel channel, Object object){
        ChannelGroup channels = actionGlobalGroupMap.get(object);
        removeChannel(channels, channel);
    }


    /**
     * 發(fā)送到所有在線設(shè)備(通過動作碼去推送消息)
     *
     * @param action
     * @param entity
     */
    public static void send2AllAction(Object action, Object entity) {
        ChannelGroup channels = actionGlobalGroupMap.get(action);
        if (channels != null){
            channels.writeAndFlush( new TextWebSocketFrame(JsonUtils2.writeValue(entity)));
        }
    }


    public static boolean existActionChannelGroup(int action){
        return CollUtil.isNotEmpty(actionGlobalGroupMap.get(action));
    }


    public static boolean existObjectChannelGroup(Object object){
        return CollUtil.isNotEmpty(actionGlobalGroupMap.get(object));
    }


}

  • 2.5 配置好Netty服務(wù)器之后,然后創(chuàng)建一新的線程進(jìn)行Netty服務(wù)器的
package com.ryan.websocketpush.netty.service;

import com.ryan.websocketpush.netty.handler.NioWebSocketChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class NioWebSocketServer {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NioWebSocketChannelInitializer channelInitializer;

    public void init(Integer webSocketPort) {
        logger.info("正在啟動websocket服務(wù)器");
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(channelInitializer);
            Channel channel = bootstrap.bind(webSocketPort).sync().channel();
            logger.info("webSocket服務(wù)器啟動成功:" + channel);
            logger.info("webSocket端口號:" + webSocketPort);
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            logger.info("運行出錯:" + e);
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
            logger.info("websocket服務(wù)器已關(guān)閉");
        }
    }

}

創(chuàng)建線程啟動Netty服務(wù)器

package com.ryan.websocketpush.netty.mainThread;

import com.ryan.websocketpush.netty.service.NioWebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Netty服務(wù)線程啟動
 *
 */
@Component
@Slf4j
public class NettyServerThread implements CommandLineRunner {

    @Autowired
    private NioWebSocketServer nettyServer;

    @Value("${webSokcet.port}")
    private Integer webSocketPort;

    @Override
    public void run(String... args) throws Exception {
        log.warn("websocket服務(wù)器準(zhǔn)備啟動眼俊! 端口為 :  " + webSocketPort);
        nettyServer.init(webSocketPort);

    }
}

也許這里意狠,有些疑問為什么需要創(chuàng)建線程來啟動Netty服務(wù)器,這里嘗試一下如果沒有創(chuàng)建線程疮胖,直接在Main主線程啟動环戈,看看會有什么問題闷板?這個問題留著后面再去說明

在配置文件中進(jìn)行WebSocket的端口配置

 webSokcet.port=8090

上面的步驟進(jìn)行完成了一個Netty服務(wù)器的初始化啟動和端口監(jiān)聽,這時候會有疑問了院塞,消息推送功能還沒有實現(xiàn)遮晚。所以下面就是我們?nèi)绾稳崿F(xiàn)消息處理邏輯

  • 2.6 編寫自定義注解

    • WebSocket動作碼注解

      package com.ryan.websocketpush.annotation;
      
      import java.lang.annotation.*;
      
      /**
       *
       * socket 動作碼 注解
       */
      @Retention(RetentionPolicy.RUNTIME)
      @Target(ElementType.TYPE)
      @Documented
      public @interface ActionCode {
      
          int value();
      }
      
    • WebSocket消息響應(yīng)體注解

      package com.ryan.websocketpush.annotation;
      
      import java.lang.annotation.*;
      
      /**
       * socket 返回 數(shù)據(jù) 注解
       * 自動解析為 json 并返回
       */
      @Retention(RetentionPolicy.RUNTIME)
      @Target(ElementType.METHOD)
      @Documented
      public @interface SocketResponseBody {
      
      }
      
  • 2.7 消息動作碼定義類

    package com.ryan.websocketpush.netty.constants;
    
    /**
     * socket消息狀態(tài)碼定義類
     * ----------------------------------------------
     */
    public interface ActionCodeConstants {
    
        /**
         * 廣播消息
         */
        int PUSH_MSG = 1024;
    
        /**
         * 指定用戶消息
         */
        int SPECIAL_MSG = 1025;
    
    }
    

    通過定義不同的消息狀態(tài)可以實現(xiàn)不同類型的消息推送,這里我們暫且只定義一個廣播的消息狀態(tài)碼拦止。何為廣播县遣,就是只要你客戶端請求接入Netty服務(wù)器,就給推送消息汹族,人人有份萧求。

  • 2.8 下面是真正消息業(yè)務(wù)處理邏輯實現(xiàn)類了

    • 創(chuàng)建消息處理接口,往后不同狀態(tài)碼的消息處理類都要實現(xiàn)這個接口顶瞒,通過doAction()方法實現(xiàn)不同消息業(yè)務(wù)邏輯實現(xiàn)夸政,這里簡單實現(xiàn)一個點對點訂閱推送

      package com.ryan.websocketpush.netty.interf;
        
      import io.netty.channel.ChannelHandlerContext;
        
      public interface IActionSocketService {
        
          Object doAction(ChannelHandlerContext context, String message);
       }
      
      
    • 點對點消息實現(xiàn)類

      package com.ryan.websocketpush.service.impl;
      
      import com.alibaba.fastjson.JSONObject;
      import com.ryan.websocketpush.annotation.ActionCode;
      import com.ryan.websocketpush.netty.constants.ActionCodeConstants;
      import com.ryan.websocketpush.netty.global.ChannelSupervise;
      import com.ryan.websocketpush.netty.interf.IActionSocketService;
      import io.netty.channel.ChannelHandlerContext;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Service;
      
      
      @Service
      @Slf4j
      @ActionCode(ActionCodeConstants.SPECIAL_MSG)
      public class PointToPointMsgServiceImpl implements IActionSocketService {
          @Override
          public Object doAction(ChannelHandlerContext context, String message) {
              log.info("收到終端請求 === " + message);
              JSONObject parseJson = JSONObject.parseObject(message);
              Integer actionCode = parseJson.getInteger("action");
              ChannelSupervise.removeChannel(context.channel());             //去除廣播推送
              ChannelSupervise.addChannelByActionCode(context.channel(), actionCode);
              return "發(fā)送指定用戶信息";
          }
      }
      

    也許會有怎么識別出那個狀態(tài)碼對應(yīng)哪個業(yè)務(wù)邏輯代碼實現(xiàn)?這時候上面所定義的@ActionCode自定義注解就起到作用了

    • 提取一個請求里面的狀態(tài)碼搁拙,并對應(yīng)到消息實現(xiàn)類秒梳,也就是WebSocket請求結(jié)構(gòu)體

      package com.ryan.websocketpush.netty.bean;
        
      import cn.hutool.json.JSONUtil;
      import lombok.Data;
        
      
      @Data
       public class BaseSocketRequest {
        
          private Integer action;
          private String ip;
        
        
        
          /**
            * 獲取對應(yīng)請求的bean
            *
            * @param json
            * @return 對應(yīng)的bean
            */
          public static BaseSocketRequest toBean(String json){
              if(JSONUtil.isJson(json)){
                  BaseSocketRequest socketRequest = JSONUtil.toBean(json, BaseSocketRequest.class);
                  if (socketRequest != null && socketRequest.getAction() != null){
                      return socketRequest;
                  }
              }
              return null;
          }
      }
      
  • 2.9 如何去返回最后的結(jié)果?這里通過切面編程的方式去統(tǒng)一的返回結(jié)果

    • 自定義WebSocketResponseBody注解

      package com.ryan.websocketpush.annotation;
      
      import java.lang.annotation.*;
      
      /**
       * socket 返回 數(shù)據(jù) 注解
       * 自動解析為 json 并返回
       */
      @Retention(RetentionPolicy.RUNTIME)
      @Target(ElementType.METHOD)
      @Documented
      public @interface SocketResponseBody {
      
      }
      
    • 切面編程

      package com.ryan.websocketpush.netty.aspect;
      
      import com.ryan.websocketpush.utils.JsonUtils2;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
      import org.aspectj.lang.ProceedingJoinPoint;
      import org.aspectj.lang.annotation.Around;
      import org.aspectj.lang.annotation.Aspect;
      import org.aspectj.lang.annotation.Pointcut;
      import org.springframework.stereotype.Component;
      
      @Aspect
      @Component
      public class SocketResponseBodyAspect {
      
          @Pointcut("@annotation(com.ryan.websocketpush.annotation.SocketResponseBody)")
          public void pointCut() {
      
          }
      
          @Around("pointCut()")
          public Object around(ProceedingJoinPoint point) throws Throwable {
      
              /**
               * 獲取返回結(jié)果
               */
              Object proceed = point.proceed();
              /**
               * 獲取方法參數(shù)
               */
              Object[] args = point.getArgs();
      
              if (args.length > 0) {
                  if (args[0] instanceof ChannelHandlerContext){
                      TextWebSocketFrame tws = new TextWebSocketFrame( JsonUtils2.writeValue(proceed));
                      ChannelHandlerContext context = (ChannelHandlerContext) args[0];
                      context.channel().writeAndFlush(tws);        //通過信道發(fā)送消息
                  }
              }
              return proceed;
          }
      
      }
      
  • 2.10 定時任務(wù)模擬消息推送

    package com.ryan.websocketpush.scheduler;
    
    import com.ryan.websocketpush.netty.constants.ActionCodeConstants;
    import com.ryan.websocketpush.netty.global.ChannelSupervise;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息推送
     */
    @Component
    public class PushTask {
    
        @Scheduled(cron = "0/5 * * * * ? ")
        public void pushAll() {
            TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame("服務(wù)器5秒廣播消息");
            ChannelSupervise.send2All(textWebSocketFrame);
        }
    
        @Scheduled(cron = "0/10 * * * * ? ")
        public void pushAllByActionCode() {
            TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame("服務(wù)器10秒指定用戶消息");
            ChannelSupervise.send2AllAction(ActionCodeConstants.SPECIAL_MSG, textWebSocketFrame);
        }
    }
    
  • 2.11 測試結(jié)果

    • 廣播消息箕速,只要客戶端接入就可以收到服務(wù)器端發(fā)送過來的消息
    WebSocket廣播
    • 上面提到為什么要開啟線程進(jìn)行Netty服務(wù)器的啟動酪碘,這里就來驗證一次,如果不開啟線程看看會是怎樣的結(jié)果

      • 注釋代碼
      @Autowired
      private NioWebSocketServer nettyServer;
      
      @Value("${webSokcet.port}")
      private Integer webSocketPort;
      
      @Override
      public void run(String... args) throws Exception {
          log.warn("websocket服務(wù)器準(zhǔn)備啟動盐茎! 端口為 :  " + webSocketPort);
              //nettyServer.init(webSocketPort);
      
      }
      
      • 在Main主線程中開啟Netty
          @Autowired
          private NioWebSocketChannelInitializer channelInitializer;
      
          @Value("${webSokcet.port}")
          private Integer webSocketPort;
          
          @Bean
          public void init() {
              logger.info("正在啟動websocket服務(wù)器");
              NioEventLoopGroup boss = new NioEventLoopGroup();
              NioEventLoopGroup work = new NioEventLoopGroup();
              try {
                  ServerBootstrap bootstrap = new ServerBootstrap();
                  bootstrap.group(boss, work);
                  bootstrap.channel(NioServerSocketChannel.class);
                  bootstrap.childHandler(channelInitializer);
                  Channel channel = bootstrap.bind(webSocketPort).sync().channel();
                  logger.info("webSocket服務(wù)器啟動成功:" + channel);
                  logger.info("webSocket端口號:" + webSocketPort);
                  channel.closeFuture().sync();
              } catch (InterruptedException e) {
                  e.printStackTrace();
                  logger.info("運行出錯:" + e);
              } finally {
                  boss.shutdownGracefully();
                  work.shutdownGracefully();
                  logger.info("websocket服務(wù)器已關(guān)閉");
              }
          }
      
      • 連接上后兴垦,并沒有接收到廣播信息;并且發(fā)送信息會出現(xiàn)如下這種情況

        非線程啟動Netty
      • 分析項目啟動控制臺日志打印

        線程啟動
        非線程啟動
      • 總結(jié)

        不用線程進(jìn)行啟動的時候字柠,定時器和WebSocket處理邏輯類沒有初始化探越,是因為Netty啟動后,主線程一直阻塞監(jiān)聽窑业,導(dǎo)致后面的定時器和MsgImplement沒有注入到Spring容器中钦幔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市常柄,隨后出現(xiàn)的幾起案子鲤氢,更是在濱河造成了極大的恐慌,老刑警劉巖西潘,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件卷玉,死亡現(xiàn)場離奇詭異,居然都是意外死亡喷市,警方通過查閱死者的電腦和手機(jī)相种,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來品姓,“玉大人寝并,你說我怎么就攤上這事箫措。” “怎么了衬潦?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵蒂破,是天一觀的道長。 經(jīng)常有香客問我别渔,道長,這世上最難降的妖魔是什么惧互? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任哎媚,我火速辦了婚禮,結(jié)果婚禮上喊儡,老公的妹妹穿的比我還像新娘拨与。我一直安慰自己,他們只是感情好艾猜,可當(dāng)我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布买喧。 她就那樣靜靜地躺著,像睡著了一般匆赃。 火紅的嫁衣襯著肌膚如雪淤毛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天算柳,我揣著相機(jī)與錄音低淡,去河邊找鬼。 笑死瞬项,一個胖子當(dāng)著我的面吹牛蔗蹋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播囱淋,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼猪杭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了妥衣?” 一聲冷哼從身側(cè)響起皂吮,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎称鳞,沒想到半個月后涮较,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體丛塌,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡端三,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蚊锹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片熙暴。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡闺属,死狀恐怖慌盯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情掂器,我是刑警寧澤亚皂,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站国瓮,受9級特大地震影響灭必,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乃摹,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一禁漓、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧孵睬,春花似錦播歼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蹈集,卻和暖如春烁试,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拢肆。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工廓潜, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人善榛。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓辩蛋,卻偏偏與公主長得像,于是被迫代替她去往敵國和親移盆。 傳聞我的和親對象是個殘疾皇子悼院,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內(nèi)容