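Implementing it with WebSocket
Web chat rooms generally use a long-lived WebSocket connection for data exchange, with either side able to send data. I have put together a complete networking demo built on the spring-boot-starter-websocket package, with the following features: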
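- Multi-user group chat
- Point-to-point private chat
- Real-time message notifications
- Online user display
- Real-time monitoring of connect and disconnect events
- Other online communication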
WebSocket dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
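Simple integration of Netty with SpringBoot
WebSocket can also be integrated through Netty. The demo below implements only group chat; the other features can be added by extending the handler logic.
- NettyApplication (startup class)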
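import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;

@PropertySource(value = "classpath:/nettyserver.properties")
@SpringBootApplication
public class NettyApplication {
    // Values read from nettyserver.properties
    @Value("${tcp.port}")
    private int tcpPort;
    @Value("${boss.thread.count}")
    private int bossCount;
    @Value("${worker.thread.count}")
    private int workerCount;
    @Value("${so.keepalive}")
    private boolean keepAlive;
    @Value("${so.backlog}")
    private int backlog;

    @Autowired
    @Qualifier("somethingChannelInitializer")
    private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;

    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(nettyWebSocketChannelInitializer);
        // Apply every configured TCP option to the server channel
        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
        for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
            b.option(option, tcpChannelOptions.get(option));
        }
        return b;
    }

    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    // Accepts incoming connections
    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    // Handles I/O on accepted connections
    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(NettyApplication.class, args);
        // start() blocks until the Netty server channel is closed
        TCPServer tcpServer = context.getBean(TCPServer.class);
        tcpServer.start();
    }
}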
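- TCPServer (starts the Netty server)
import java.net.InetSocketAddress;

import javax.annotation.PreDestroy;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class TCPServer {
    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap serverBootstrap;
    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;
    private Channel serverChannel;

    public void start() throws Exception {
        // Bind first and keep the server channel, so that stop() has something to close
        serverChannel = serverBootstrap.bind(tcpPort).sync().channel();
        // Then block until the server channel is closed
        serverChannel.closeFuture().sync();
    }

    @PreDestroy
    public void stop() throws Exception {
        // A server channel has no parent, so only the channel itself is closed
        if (serverChannel != null) {
            serverChannel.close();
        }
    }

    public ServerBootstrap getServerBootstrap() {
        return serverBootstrap;
    }

    public void setServerBootstrap(ServerBootstrap serverBootstrap) {
        this.serverBootstrap = serverBootstrap;
    }

    public InetSocketAddress getTcpPort() {
        return tcpPort;
    }

    public void setTcpPort(InetSocketAddress tcpPort) {
        this.tcpPort = tcpPort;
    }
}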
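- NettyWebSocketChannelInitializer (adds the custom handler)
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@Qualifier("somethingChannelInitializer")
public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    private TextWebSocketFrameHandler textWebSocketFrameHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // HTTP codec and aggregator are needed for the WebSocket upgrade handshake
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new ChunkedWriteHandler());
        // Handles the handshake and control frames on the /ws path
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // Do not use `new` here; otherwise dependencies cannot be injected into the handler
        pipeline.addLast(textWebSocketFrameHandler);
    }
}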
- TextWebSocketFrameHandler (custom handler class)
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@Qualifier("textWebSocketFrameHandler")
@ChannelHandler.Sharable
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    // All connected client channels
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    @Autowired
    private RedisDao redisDao;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) throws Exception {
        Channel incoming = ctx.channel();
        String uName = redisDao.getString(incoming.id() + "");
        // Broadcast: other clients see the sender's name, the sender sees "[you]"
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text()));
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        // Picks a random user name; any other naming scheme can be substituted
        String uName = new RandomName().getRandomName();
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[New user] - " + uName + " joined"));
        }
        redisDao.saveString(incoming.id() + "", uName); // store the user
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        String uName = redisDao.getString(String.valueOf(incoming.id()));
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[User] - " + uName + " left"));
        }
        redisDao.deleteString(String.valueOf(incoming.id())); // delete the user
        // Release the name from the record of user names already in use;
        // replace() matches literally, and the null checks avoid an NPE on a missing key
        String cacheName = redisDao.getString("cacheName");
        if (cacheName != null && uName != null) {
            redisDao.saveString("cacheName", cacheName.replace(uName, ""));
        }
        channels.remove(ctx.channel());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("User: " + redisDao.getString(incoming.id() + "") + " online");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("User: " + redisDao.getString(incoming.id() + "") + " offline");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("User: " + redisDao.getString(incoming.id() + "") + " error");
        cause.printStackTrace();
        ctx.close();
    }
}
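Redis is used here to store each user name against its ChannelId, so that users logged in from different browsers can be told apart.
- channelRead0: handles an incoming message
- handlerAdded: handles a new user connecting
- handlerRemoved: handles a user leaving
- channelActive: handles a user coming online
- channelInactive: handles a user going offline
- exceptionCaught: handles an error on a user's connection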
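The broadcast rule applied in channelRead0 can be looked at in isolation from Netty. The following is only a minimal sketch under assumptions: the class name `BroadcastSketch`, the method `fanOut`, and the ids `c1`/`c2` are hypothetical and not part of the demo; a plain map stands in for the ChannelId-to-user-name entries kept in Redis.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone model of the broadcast rule in channelRead0 (no Netty needed). */
final class BroadcastSketch {

    /**
     * Builds the text each connected client should receive.
     *
     * @param senderId id of the channel that sent the message
     * @param names    channel id -> user name for every connected channel
     * @param text     the message body
     * @return channel id -> text to deliver to that channel
     */
    static Map<String, String> fanOut(String senderId, Map<String, String> names, String text) {
        String senderName = names.get(senderId);
        Map<String, String> out = new LinkedHashMap<>();
        for (String channelId : names.keySet()) {
            // The sender sees "[you]"; every other client sees the sender's user name.
            String prefix = channelId.equals(senderId) ? "[you]" : "[" + senderName + "]";
            out.put(channelId, prefix + text);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> names = new LinkedHashMap<>();
        names.put("c1", "alice"); // made-up ids and names, for illustration only
        names.put("c2", "bob");
        System.out.println(fanOut("c1", names, "hi"));
    }
}
```

Keeping the rule in a pure function like this would make it possible to unit test the message format without opening a socket.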
To send data to a client from a Controller through a Channel, inject TextWebSocketFrameHandler, take its ChannelGroup, and look up the target Channel using the ChannelId that your own logic stored earlier. You can then send messages to that client.
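The lookup-then-send idea can be sketched without Netty on the classpath. Everything below is an assumption for illustration: `ClientRegistrySketch` and its methods are hypothetical names, and a `Consumer<String>` stands in for the call that the demo would make as `channel.writeAndFlush(new TextWebSocketFrame(text))`, keyed by the same string form of the ChannelId that the handler stores.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical registry mapping a stored channel id to a way of writing to that client. */
final class ClientRegistrySketch {
    private final Map<String, Consumer<String>> senders = new ConcurrentHashMap<>();

    /** Would be called when a client connects (handlerAdded in the demo). */
    void register(String channelId, Consumer<String> sender) {
        senders.put(channelId, sender);
    }

    /** Would be called when a client disconnects (handlerRemoved in the demo). */
    void unregister(String channelId) {
        senders.remove(channelId);
    }

    /** Sends text to one client; returns false when that client is not connected. */
    boolean sendTo(String channelId, String text) {
        Consumer<String> sender = senders.get(channelId);
        if (sender == null) {
            return false;
        }
        sender.accept(text);
        return true;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        // In the Netty demo this lambda would wrap a write to the client's Channel.
        registry.register("c1", text -> System.out.println("to c1: " + text));
        System.out.println(registry.sendTo("c1", "hello"));
        System.out.println(registry.sendTo("c2", "hello"));
    }
}
```

Returning a boolean lets a Controller tell the caller that the target user has already disconnected, rather than failing silently.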
Netty dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>
- Front-end code
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
    var socket;
    // Fall back to the legacy Mozilla implementation if needed
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        // Port and path must match tcp.port and the "/ws" path configured on the server
        socket = new WebSocket("ws://localhost:8090/ws");
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Connection opened!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + "Connection closed";
        };
    } else {
        alert("Your browser does not support WebSocket!");
    }
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("The connection is not open.");
        }
    }
    // Ask for confirmation before the page is refreshed or closed
    window.onbeforeunload = function(event) {
        event.returnValue = "Refresh reminder";
    };
</script>
<form onsubmit="return false;">
    <h3>Netty chat room:</h3>
    <textarea id="responseText" style="width: 400px; height: 300px;"></textarea>
    <br>
    <input type="text" name="message" style="width: 300px" value="test data">
    <input type="button" value="Send message" onclick="send(this.form.message.value)">
</form>
<br>
<br>
</body>
</html>
- nettyserver.properties
tcp.port=8090
boss.thread.count=2
worker.thread.count=2
so.keepalive=true
so.backlog=100
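Because the startup class reads these keys with `@Value`, a missing or non-numeric entry would only surface when the application starts. As a rough sketch of checking the file ahead of time using only the JDK, something like the following could be used; `NettyServerPropertiesSketch`, `parse`, and `requireInt` are hypothetical names, not part of the demo.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical pre-flight check for the keys used in nettyserver.properties. */
final class NettyServerPropertiesSketch {

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the integer value of a key, failing loudly when the key is absent. */
    static int requireInt(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        // Same keys and values as the file shown above
        Properties props = parse(
                "tcp.port=8090\nboss.thread.count=2\nworker.thread.count=2\n"
                        + "so.keepalive=true\nso.backlog=100\n");
        System.out.println("port = " + requireInt(props, "tcp.port"));
        System.out.println("keepalive = " + Boolean.parseBoolean(props.getProperty("so.keepalive")));
    }
}
```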
Screenshots
Screenshot of the group chat