1. 整體流程
客戶端發(fā)起http請求疯兼,請求Netty服務(wù)器進(jìn)行WebSocket連接教馆,服務(wù)器接收后請求后進(jìn)行注冊信道并登記客戶端IP地址馋缅,如此一來就建立了WebSocket通訊連接扒腕。
上面的論述可以得出,我們可以比較Http和WebSocket兩者之間的關(guān)系和區(qū)別
都是基于TCP協(xié)議的兩種通信協(xié)議萤悴;
兩者不相同但是WebSocket又依賴于Http瘾腰,畢竟建立WebSocket通訊首次請求需要使用Http請求;
Http請求只能是客戶端對服務(wù)器進(jìn)行發(fā)送消息覆履,服務(wù)器是被動的蹋盆;而WebSocket則允許雙方可以任由一方主動發(fā)送消息給另外一方,而且能夠長時間連接硝全,只要任何一方不斷開的話栖雾。
基于上述WebSocket的特點,可以實現(xiàn)在線設(shè)備的消息推送
2. Springboot整合WebSocket
- 2.1 依賴添加
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- hutool-all工具庫 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.6</version>
</dependency>
<!-- fastjson依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<!-- aop依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
- 2.2 WebSocket信道處理實現(xiàn)伟众,處理通訊的業(yè)務(wù)邏輯
package com.ryan.websocketpush.netty.handler;
import com.alibaba.fastjson.JSONObject;
import com.ryan.websocketpush.netty.global.ChannelSupervise;
import com.ryan.websocketpush.netty.service.TcpDispacher;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
@Component
@ChannelHandler.Sharable
public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private WebSocketServerHandshaker handshaker;
@Autowired
private TcpDispacher tcpDispacher;
/**
* 服務(wù)器接收消息
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("收到消息:" + msg);
if (msg instanceof FullHttpRequest) { //第一次通過Http請求進(jìn)行WebSocket通訊連接
//以http請求形式接入析藕,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
JSONObject jsonObject = new JSONObject();
String result = jsonObject.toJSONString();
TextWebSocketFrame tws = new TextWebSocketFrame(result);
ctx.channel().writeAndFlush(tws);
} else if (msg instanceof WebSocketFrame) {
//處理websocket客戶端的消息
logger.debug("收到消息:" + msg);
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
/**
* 客戶端連接并添加通道
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//添加連接
logger.debug("客戶端加入連接:" + ctx.channel());
ChannelSupervise.addChannel(ctx.channel());
// SocketActiveRequest socketActiveRequest = new SocketActiveRequest();
// socketActiveRequest.setIp(NettyUtils.getClientIP(ctx));
// socketActiveRequest.setStatus(Constants.SOCKET_STATUS.LOGIN);
// tcpDispacher.messageRecived(ctx, JsonUtil.toJson(socketActiveRequest));
}
/**
* 客戶端斷開并刪除通道
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//斷開連接
logger.debug("客戶端斷開連接:" + ctx.channel());
TextWebSocketFrame tws = new TextWebSocketFrame("break");
ctx.channel().writeAndFlush(tws);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
/**
* 判斷是否關(guān)閉鏈路的指令
*/
if (frame instanceof CloseWebSocketFrame) {
ChannelSupervise.removeChannel(ctx.channel());
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
/**
* 判斷是否ping消息
*/
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(
new PongWebSocketFrame(frame.content().retain()));
return;
}
/**
* 僅支持文本消息,不支持二進(jìn)制消息
*/
if (!(frame instanceof TextWebSocketFrame)) {
logger.debug("僅支持文本消息凳厢,不支持二進(jìn)制消息");
throw new UnsupportedOperationException(String.format(
"%s frame types not supported", frame.getClass().getName()));
}
/**
* 返回應(yīng)答消息
*/
String request = ((TextWebSocketFrame) frame).text();
tcpDispacher.messageRecived(ctx, request);
logger.debug("服務(wù)端收到:" + request);
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ ctx.channel().id() + ":" + request);
//群發(fā)
//ChannelSupervise.send2All(tws);
// /**
// * 返回【誰發(fā)的發(fā)給誰】
// */
// ctx.channel().writeAndFlush(tws);
}
/**
* 唯一的一次http請求账胧,用于創(chuàng)建websocket
*/
private void handleHttpRequest(ChannelHandlerContext ctx,
FullHttpRequest req) {
//要求Upgrade為websocket,過濾掉get/Post
if (!req.decoderResult().isSuccess()
|| (!"websocket".equals(req.headers().get("Upgrade")))) {
//若不是websocket方式数初,則創(chuàng)建BAD_REQUEST的req找爱,返回給客戶端
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8088/websocket", null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 拒絕不合法的請求,并返回錯誤信息
*/
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, DefaultFullHttpResponse res) {
/**
* 返回應(yīng)答給客戶端
*/
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
ChannelFuture f = ctx.channel().writeAndFlush(res);
/**
* 如果是非Keep-Alive泡孩,關(guān)閉連接
*/
if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
- 2.3 連接通信信道的初始化注冊
package com.ryan.websocketpush.netty.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private NioWebSocketHandler socketHandler ;
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast("logging",new LoggingHandler("DEBUG")); //設(shè)置log監(jiān)聽器车摄,并且日志級別為debug,方便觀察運行流程
ch.pipeline().addLast("http-codec",new HttpServerCodec()); //設(shè)置解碼器
ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536)); //聚合器仑鸥,使用websocket會用到
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); //用于大數(shù)據(jù)的分區(qū)傳輸
ch.pipeline().addLast("handler", socketHandler); //自定義的業(yè)務(wù)handler
}
}
- 2.4 注冊完信道Channel后吮播,要對信道進(jìn)行通過全局管理起來
package com.ryan.websocketpush.netty.global;
import cn.hutool.core.collection.CollUtil;
import com.ryan.websocketpush.utils.JsonUtils2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 頻道管理
*/
public class ChannelSupervise {
/**
* 頻道集群管理
*/
private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 頻道管理 【channel.id().asShortText():channel.id()】
*/
private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();
/**
* (動作碼:頻道)映射
*/
private static ConcurrentHashMap<Object, ChannelGroup> actionGlobalGroupMap = new ConcurrentHashMap<>();
/**
* 增加頻道
*
* @param channel
*/
public static void addChannel(Channel channel) {
GlobalGroup.add(channel);
ChannelMap.put(channel.id().asShortText(), channel.id());
}
/**
* 移除頻道
*
* @param channel
*/
public static void removeChannel(Channel channel) {
GlobalGroup.remove(channel);
ChannelMap.remove(channel.id().asShortText());
removeChannelByActionCode(channel);
}
/**
* 查找頻道
*
* @param id
* @return
*/
public static Channel findChannel(String id) {
return GlobalGroup.find(ChannelMap.get(id));
}
/**
* 推送消息
*
* @param tws
*/
public static void send2All(TextWebSocketFrame tws) {
GlobalGroup.writeAndFlush(tws);
}
public static void addChannelByActionCode(Channel channel, int actionCode) {
ChannelGroup channels = actionGlobalGroupMap.get(actionCode);
if (channels == null){
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
channels.add(channel);
actionGlobalGroupMap.put(actionCode, channels);
// addChannelByObject(channel, actionCode);
}
public static void addChannelByObject(Channel channel, Object object) {
ChannelGroup channels = actionGlobalGroupMap.get(object);
if (channels == null){
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
channels.add(channel);
actionGlobalGroupMap.put(object, channels);
}
public static void removeChannelByActionCode(Channel channel){
for (Object actionCode : actionGlobalGroupMap.keySet()){
ChannelGroup channels = actionGlobalGroupMap.get(actionCode);
removeChannel(channels, channel);
}
}
private static void removeChannel(ChannelGroup channels , Channel channel){
if (channels != null && channel != null){
channels.remove(channel);
channels.remove(channel.id().asShortText());
}
}
public static void removeByObjectChannel(Channel channel, Object object){
ChannelGroup channels = actionGlobalGroupMap.get(object);
removeChannel(channels, channel);
}
/**
* 發(fā)送到所有在線設(shè)備(通過動作碼去推送消息)
*
* @param action
* @param entity
*/
public static void send2AllAction(Object action, Object entity) {
ChannelGroup channels = actionGlobalGroupMap.get(action);
if (channels != null){
channels.writeAndFlush( new TextWebSocketFrame(JsonUtils2.writeValue(entity)));
}
}
public static boolean existActionChannelGroup(int action){
return CollUtil.isNotEmpty(actionGlobalGroupMap.get(action));
}
public static boolean existObjectChannelGroup(Object object){
return CollUtil.isNotEmpty(actionGlobalGroupMap.get(object));
}
}
- 2.5 配置好Netty服務(wù)器之后,然后創(chuàng)建一新的線程進(jìn)行Netty服務(wù)器的
package com.ryan.websocketpush.netty.service;
import com.ryan.websocketpush.netty.handler.NioWebSocketChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class NioWebSocketServer {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private NioWebSocketChannelInitializer channelInitializer;
public void init(Integer webSocketPort) {
logger.info("正在啟動websocket服務(wù)器");
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(channelInitializer);
Channel channel = bootstrap.bind(webSocketPort).sync().channel();
logger.info("webSocket服務(wù)器啟動成功:" + channel);
logger.info("webSocket端口號:" + webSocketPort);
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
logger.info("運行出錯:" + e);
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
logger.info("websocket服務(wù)器已關(guān)閉");
}
}
}
創(chuàng)建線程啟動Netty服務(wù)器
package com.ryan.websocketpush.netty.mainThread;
import com.ryan.websocketpush.netty.service.NioWebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Netty服務(wù)線程啟動
*
*/
@Component
@Slf4j
public class NettyServerThread implements CommandLineRunner {
@Autowired
private NioWebSocketServer nettyServer;
@Value("${webSokcet.port}")
private Integer webSocketPort;
@Override
public void run(String... args) throws Exception {
log.warn("websocket服務(wù)器準(zhǔn)備啟動眼俊! 端口為 : " + webSocketPort);
nettyServer.init(webSocketPort);
}
}
也許這里意狠,有些疑問為什么需要創(chuàng)建線程來啟動Netty服務(wù)器,這里嘗試一下如果沒有創(chuàng)建線程疮胖,直接在Main主線程啟動环戈,看看會有什么問題闷板?這個問題留著后面再去說明
在配置文件中進(jìn)行WebSocket的端口配置
webSokcet.port=8090
上面的步驟進(jìn)行完成了一個Netty服務(wù)器的初始化啟動和端口監(jiān)聽,這時候會有疑問了院塞,消息推送功能還沒有實現(xiàn)遮晚。所以下面就是我們?nèi)绾稳崿F(xiàn)消息處理邏輯
-
2.6 編寫自定義注解
-
WebSocket動作碼注解
package com.ryan.websocketpush.annotation; import java.lang.annotation.*; /** * * socket 動作碼 注解 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented public @interface ActionCode { int value(); }
-
WebSocket消息響應(yīng)體注解
package com.ryan.websocketpush.annotation; import java.lang.annotation.*; /** * socket 返回 數(shù)據(jù) 注解 * 自動解析為 json 并返回 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @Documented public @interface SocketResponseBody { }
-
-
2.7 消息動作碼定義類
package com.ryan.websocketpush.netty.constants; /** * socket消息狀態(tài)碼定義類 * ---------------------------------------------- */ public interface ActionCodeConstants { /** * 廣播消息 */ int PUSH_MSG = 1024; /** * 指定用戶消息 */ int SPECIAL_MSG = 1025; }
通過定義不同的消息狀態(tài)可以實現(xiàn)不同類型的消息推送,這里我們暫且只定義一個廣播的消息狀態(tài)碼拦止。何為廣播县遣,就是只要你客戶端請求接入Netty服務(wù)器,就給推送消息汹族,人人有份萧求。
-
2.8 下面是真正消息業(yè)務(wù)處理邏輯實現(xiàn)類了
-
創(chuàng)建消息處理接口,往后不同狀態(tài)碼的消息處理類都要實現(xiàn)這個接口顶瞒,通過doAction()方法實現(xiàn)不同消息業(yè)務(wù)邏輯實現(xiàn)夸政,這里簡單實現(xiàn)一個點對點訂閱推送
package com.ryan.websocketpush.netty.interf; import io.netty.channel.ChannelHandlerContext; public interface IActionSocketService { Object doAction(ChannelHandlerContext context, String message); }
-
點對點消息實現(xiàn)類
package com.ryan.websocketpush.service.impl; import com.alibaba.fastjson.JSONObject; import com.ryan.websocketpush.annotation.ActionCode; import com.ryan.websocketpush.netty.constants.ActionCodeConstants; import com.ryan.websocketpush.netty.global.ChannelSupervise; import com.ryan.websocketpush.netty.interf.IActionSocketService; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Service @Slf4j @ActionCode(ActionCodeConstants.SPECIAL_MSG) public class PointToPointMsgServiceImpl implements IActionSocketService { @Override public Object doAction(ChannelHandlerContext context, String message) { log.info("收到終端請求 === " + message); JSONObject parseJson = JSONObject.parseObject(message); Integer actionCode = parseJson.getInteger("action"); ChannelSupervise.removeChannel(context.channel()); //去除廣播推送 ChannelSupervise.addChannelByActionCode(context.channel(), actionCode); return "發(fā)送指定用戶信息"; } }
也許會有怎么識別出那個狀態(tài)碼對應(yīng)哪個業(yè)務(wù)邏輯代碼實現(xiàn)?這時候上面所定義的@ActionCode自定義注解就起到作用了
-
提取一個請求里面的狀態(tài)碼搁拙,并對應(yīng)到消息實現(xiàn)類秒梳,也就是WebSocket請求結(jié)構(gòu)體
package com.ryan.websocketpush.netty.bean; import cn.hutool.json.JSONUtil; import lombok.Data; @Data public class BaseSocketRequest { private Integer action; private String ip; /** * 獲取對應(yīng)請求的bean * * @param json * @return 對應(yīng)的bean */ public static BaseSocketRequest toBean(String json){ if(JSONUtil.isJson(json)){ BaseSocketRequest socketRequest = JSONUtil.toBean(json, BaseSocketRequest.class); if (socketRequest != null && socketRequest.getAction() != null){ return socketRequest; } } return null; } }
-
-
2.9 如何去返回最后的結(jié)果?這里通過切面編程的方式去統(tǒng)一的返回結(jié)果
-
自定義WebSocketResponseBody注解
package com.ryan.websocketpush.annotation; import java.lang.annotation.*; /** * socket 返回 數(shù)據(jù) 注解 * 自動解析為 json 并返回 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @Documented public @interface SocketResponseBody { }
-
切面編程
package com.ryan.websocketpush.netty.aspect; import com.ryan.websocketpush.utils.JsonUtils2; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.stereotype.Component; @Aspect @Component public class SocketResponseBodyAspect { @Pointcut("@annotation(com.ryan.websocketpush.annotation.SocketResponseBody)") public void pointCut() { } @Around("pointCut()") public Object around(ProceedingJoinPoint point) throws Throwable { /** * 獲取返回結(jié)果 */ Object proceed = point.proceed(); /** * 獲取方法參數(shù) */ Object[] args = point.getArgs(); if (args.length > 0) { if (args[0] instanceof ChannelHandlerContext){ TextWebSocketFrame tws = new TextWebSocketFrame( JsonUtils2.writeValue(proceed)); ChannelHandlerContext context = (ChannelHandlerContext) args[0]; context.channel().writeAndFlush(tws); //通過信道發(fā)送消息 } } return proceed; } }
-
-
2.10 定時任務(wù)模擬消息推送
package com.ryan.websocketpush.scheduler; import com.ryan.websocketpush.netty.constants.ActionCodeConstants; import com.ryan.websocketpush.netty.global.ChannelSupervise; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 消息推送 */ @Component public class PushTask { @Scheduled(cron = "0/5 * * * * ? ") public void pushAll() { TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame("服務(wù)器5秒廣播消息"); ChannelSupervise.send2All(textWebSocketFrame); } @Scheduled(cron = "0/10 * * * * ? ") public void pushAllByActionCode() { TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame("服務(wù)器10秒指定用戶消息"); ChannelSupervise.send2AllAction(ActionCodeConstants.SPECIAL_MSG, textWebSocketFrame); } }
-
2.11 測試結(jié)果
- 廣播消息箕速,只要客戶端接入就可以收到服務(wù)器端發(fā)送過來的消息
WebSocket廣播-
上面提到為什么要開啟線程進(jìn)行Netty服務(wù)器的啟動酪碘,這里就來驗證一次,如果不開啟線程看看會是怎樣的結(jié)果
- 注釋代碼
@Autowired private NioWebSocketServer nettyServer; @Value("${webSokcet.port}") private Integer webSocketPort; @Override public void run(String... args) throws Exception { log.warn("websocket服務(wù)器準(zhǔn)備啟動盐茎! 端口為 : " + webSocketPort); //nettyServer.init(webSocketPort); }
- 在Main主線程中開啟Netty
@Autowired private NioWebSocketChannelInitializer channelInitializer; @Value("${webSokcet.port}") private Integer webSocketPort; @Bean public void init() { logger.info("正在啟動websocket服務(wù)器"); NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(channelInitializer); Channel channel = bootstrap.bind(webSocketPort).sync().channel(); logger.info("webSocket服務(wù)器啟動成功:" + channel); logger.info("webSocket端口號:" + webSocketPort); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); logger.info("運行出錯:" + e); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); logger.info("websocket服務(wù)器已關(guān)閉"); } }
-
連接上后兴垦,并沒有接收到廣播信息;并且發(fā)送信息會出現(xiàn)如下這種情況
非線程啟動Netty -
分析項目啟動控制臺日志打印
線程啟動非線程啟動 -
總結(jié)
不用線程進(jìn)行啟動的時候字柠,定時器和WebSocket處理邏輯類沒有初始化探越,是因為Netty啟動后,主線程一直阻塞監(jiān)聽窑业,導(dǎo)致后面的定時器和MsgImplement沒有注入到Spring容器中钦幔。