使用SpringBoot構建一個簡單的WebSocket通信
一、需求分析
- 瀏覽器終端首次打開需要輸入唯一的用戶名作為標識
- 瀏覽器終端可以查看當前在線的用戶
- 瀏覽器終端用戶可以選擇用戶進行溝通
- 瀏覽器終端用戶可以給所有用戶群發消息
后臺使用springBoot,前端使用vue.js;相關js可以引用相關的cdn
二、項目結構一覽
webSocket-01.png
三、項目代碼
1). maven依賴 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Spring Boot parent: supplies dependency management, so Boot starters need no explicit version -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>b-springboot-websocket</artifactId>
    <dependencies>
        <!-- Web MVC; version is inherited from the parent instead of being repeated here -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- WebSocket support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- JSON (de)serialization used by the message encoder/decoder.
             Raised from 1.2.30 to the 1.2.83 maintenance release because older 1.2.x
             versions carry published deserialization advisories.
             NOTE(review): confirm against the project's dependency policy. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Lombok annotations used by the DTO; version managed by the parent -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
2). Java代碼
- 啟動類Application: 程序的入口,同時配置此環境下webSocketServer需要的bean
package com.lingting.websocketlearn;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Application entry point.
 *
 * @author Liucheng
 * @date 2019/5/1 21:53
 */
@SpringBootApplication
public class Application {

    /**
     * Registers the exporter bean used for the WebSocket server-side configuration.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
- WebSocketServer: 提供webSocket連接處理服務,多例模式!
package com.lingting.websocketlearn.websocketserver;
import com.lingting.websocketlearn.dto.UserMessage;
import com.lingting.websocketlearn.framework.decoder.UserMessageDecoder;
import com.lingting.websocketlearn.framework.encoder.UserMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * WebSocket server endpoint, used in multi-instance fashion: every instance holds the
 * session and username of a single client, while the registry of connected clients is
 * kept in static fields shared by all instances.
 *
 * @author Liucheng
 * @date 2019/5/1 22:02
 */
@ServerEndpoint(
        // handshake URL
        value = "/websocket/{username}",
        // message encoders
        encoders = {UserMessageEncoder.class},
        // message decoders
        decoders = {UserMessageDecoder.class}
)
@Component
public class WebSocketServer {

    /**
     * Logger.
     */
    static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * Number of currently connected clients. Atomic so concurrent connects and
     * disconnects cannot lose updates; in a distributed deployment something like
     * Redis would be needed instead.
     */
    private static final AtomicInteger onlineCount = new AtomicInteger();

    /**
     * Connected endpoints keyed by username. A username is owned by at most one endpoint.
     */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketServerMap = new ConcurrentHashMap<>();

    /**
     * Usernames in connection order. Copy-on-write because it is modified by connection
     * callbacks while it may be read by {@link #getUserList()} from other threads.
     */
    private static final List<String> userList = new CopyOnWriteArrayList<>();

    /**
     * Session of this endpoint's client; used to send data to that client.
     */
    private Session session;

    /**
     * Username of this endpoint's client; stays {@code null} when the connection was rejected.
     */
    private String username;

    /**
     * Called when a connection has been established. Rejects the connection (code 400
     * message, then close) when the username is blank or already in use; otherwise
     * registers this endpoint under the username.
     *
     * @param session  the new client session
     * @param username username taken from the URL path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        // Set the session before this instance can become visible to other threads via the map.
        this.session = session;

        boolean blank = username == null || "".equals(username.trim());
        // putIfAbsent claims the name atomically, so two simultaneous connects cannot both win.
        if (blank || webSocketServerMap.putIfAbsent(username, this) != null) {
            reject(session);
            return;
        }

        this.username = username;
        userList.add(username);
        onlineCount.incrementAndGet();

        // Test output: query parameters of the handshake request.
        for (Map.Entry<String, List<String>> entry : session.getRequestParameterMap().entrySet()) {
            LOGGER.debug("{}: {}", entry.getKey(), entry.getValue());
        }
        LOGGER.info("有新窗口開始監(jiān)聽:{},當(dāng)前在線人數(shù)為:{}", username, onlineCount.longValue());
    }

    /**
     * Tells the client its username was refused and closes the session. The close is
     * attempted even when sending the notice fails.
     */
    private static void reject(Session session) {
        try {
            UserMessage userMessage = new UserMessage();
            userMessage.setCode(400);
            session.getBasicRemote().sendObject(userMessage);
        } catch (IOException | EncodeException | RuntimeException e) {
            LOGGER.warn("Failed to send rejection notice", e);
        }
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "username is illegality"));
        } catch (IOException | RuntimeException e) {
            LOGGER.warn("Failed to close rejected session", e);
        }
    }

    /**
     * Called when the connection is closed. Unregisters this endpoint if it had been
     * registered by {@link #onOpen}.
     *
     * @param session the session being closed
     * @param reason  why the session is closing
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        LOGGER.info("關(guān)閉原因:{}", reason.getReasonPhrase());
        try {
            session.close();
        } catch (IOException e) {
            LOGGER.warn("Failed to close session", e);
        }
        // A null username means onOpen rejected the connection and never registered it.
        if (this.username != null) {
            // Only drop the list entry when the map entry was really ours; after closeAll()
            // the registry is already empty and the name may belong to a newer connection.
            if (webSocketServerMap.remove(this.username, this)) {
                userList.remove(this.username);
            }
            onlineCount.decrementAndGet();
        }
        LOGGER.info("有一連接關(guān)閉辆脸!當(dāng)前在線人數(shù)為:" + onlineCount.longValue());
    }

    /**
     * Called when a WebSocket error occurs.
     *
     * @param error the error that was raised
     */
    @OnError
    public void onError(Throwable error) {
        LOGGER.error("webSocket 發(fā)生錯誤", error);
    }

    /**
     * Called when a message arrives from the client. Stamps the sender, then either
     * broadcasts the message or forwards it to the addressed user. Messages without a
     * recipient, or addressed to an unknown user, are dropped.
     *
     * <p>NOTE(review): {@code @Validated} is presumably not enforced for this
     * container-invoked callback, so the fields are checked explicitly below — confirm.
     *
     * @param userMessage decoded client message
     * @param session     session the message arrived on
     * @throws IOException if forwarding to a single recipient fails
     */
    @OnMessage
    public void onMessage(@Validated UserMessage userMessage, Session session) throws InterruptedException, IOException {
        if (userMessage == null) {
            LOGGER.warn("Ignoring null message from {}", this.username);
            return;
        }
        userMessage.setFromUser(this.username);

        // Null-safe: a missing massTexting flag is treated as "not a broadcast".
        if (Boolean.TRUE.equals(userMessage.getMassTexting())) {
            sendMessageGroup(userMessage);
            return;
        }

        String toUser = userMessage.getToUser();
        if (toUser == null) {
            // ConcurrentHashMap does not accept null keys, so look-up must be skipped.
            LOGGER.warn("Ignoring direct message from {} without a recipient", this.username);
            return;
        }
        WebSocketServer recipient = webSocketServerMap.get(toUser);
        if (recipient == null) {
            return;
        }
        try {
            recipient.session.getBasicRemote().sendObject(userMessage);
        } catch (EncodeException e) {
            LOGGER.error("Failed to encode message for {}", toUser, e);
        }
    }

    /**
     * Broadcasts a message to every connected client. A failure on one client is logged
     * and does not prevent delivery to the remaining clients.
     *
     * @param userMessage message to broadcast
     * @throws IOException kept for signature compatibility; per-client failures are logged
     */
    public static void sendMessageGroup(UserMessage userMessage) throws IOException {
        for (WebSocketServer item : webSocketServerMap.values()) {
            try {
                item.session.getBasicRemote().sendObject(userMessage);
            } catch (IOException | EncodeException e) {
                LOGGER.error("Failed to deliver broadcast to {}", item.username, e);
            }
        }
    }

    /**
     * Closes every registered connection and empties the registry. All sessions are
     * attempted even if some fail to close.
     *
     * @throws IOException the first close failure, with later failures attached as suppressed
     */
    public static void closeAll() throws IOException {
        LOGGER.info("關(guān)閉服務(wù)端所有連接");
        IOException firstFailure = null;
        for (WebSocketServer item : webSocketServerMap.values()) {
            try {
                item.session.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        webSocketServerMap.clear();
        userList.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Returns the usernames of the connected clients.
     *
     * @return a snapshot copy, so callers never observe or modify the internal list
     */
    public static List<String> getUserList () {
        return new ArrayList<>(userList);
    }
}
- UserMessageDecoder: 自定義接收消息的解碼器
package com.lingting.websocketlearn.framework.decoder;
import com.alibaba.fastjson.JSON;
import com.lingting.websocketlearn.dto.UserMessage;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
/**
 * Text decoder that converts an incoming JSON string into a {@link UserMessage}.
 *
 * @author Liucheng
 * @date 2019/6/1 16:47
 */
public class UserMessageDecoder implements Decoder.Text<UserMessage> {

    /**
     * Decodes a JSON string into a {@link UserMessage}.
     *
     * @param s the raw text received from the client
     * @return the decoded message, never {@code null}
     * @throws DecodeException if the text is empty, is not parseable as a
     *                         {@link UserMessage}, or parses to {@code null}
     */
    @Override
    public UserMessage decode(String s) throws DecodeException {
        if (s == null || s.trim().isEmpty()) {
            throw new DecodeException(s, "Empty message cannot be decoded into a UserMessage");
        }
        final UserMessage decoded;
        try {
            decoded = JSON.parseObject(s, UserMessage.class);
        } catch (RuntimeException e) {
            // Report parser failures through the declared checked exception, keeping the cause.
            throw new DecodeException(s, "Message is not valid UserMessage JSON", e);
        }
        if (decoded == null) {
            throw new DecodeException(s, "Message decoded to null");
        }
        return decoded;
    }

    /**
     * Called before decoding; could be used to pre-check the string format.
     * Always accepts here — invalid input is rejected by {@link #decode(String)}.
     *
     * @param s the raw text received from the client
     * @return always {@code true}
     */
    @Override
    public boolean willDecode(String s) {
        return true;
    }

    /** No initialization required. */
    @Override
    public void init(EndpointConfig endpointConfig) {
    }

    /** No resources to release. */
    @Override
    public void destroy() {
    }
}
- UserMessageEncoder: 自定義發送消息的編碼器
package com.lingting.websocketlearn.framework.encoder;
import com.alibaba.fastjson.JSON;
import com.lingting.websocketlearn.dto.UserMessage;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
/**
 * Text encoder that serializes an outgoing {@link UserMessage} as a JSON string.
 *
 * @author Liucheng
 * @date 2019/6/1 16:47
 */
public class UserMessageEncoder implements Encoder.Text<UserMessage> {

    /** Nothing to set up. */
    @Override
    public void init(EndpointConfig endpointConfig) {
    }

    /**
     * Encodes the message as JSON.
     *
     * @param object the message to send
     * @return the JSON text for {@code object}
     * @throws EncodeException declared by the interface
     */
    @Override
    public String encode(UserMessage object) throws EncodeException {
        final String json = JSON.toJSONString(object);
        return json;
    }

    /** Nothing to tear down. */
    @Override
    public void destroy() {
    }
}
- UserMessage : 消息傳遞 dto (需要自行配置lombok插件)
package com.lingting.websocketlearn.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import javax.validation.constraints.NotNull;
/**
 * Message DTO exchanged between the browser clients and the WebSocket server.
 *
 * @author Liucheng
 * @date 2019/6/1 13:18
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class UserMessage {

    /**
     * Custom status code.
     * 200: success (default).
     * 400: username already taken.
     */
    private Integer code = 200;

    /**
     * Sender of the message.
     */
    private String fromUser;

    /**
     * Recipient of the message. Annotated for bean validation.
     */
    @NotNull
    private String toUser;

    /**
     * Message text.
     */
    @NotNull
    private String messageContent;

    /**
     * Whether the message is a broadcast to all users.
     */
    @NotNull
    private Boolean massTexting;
}
- 控制器:用于獲取用戶列表以及關閉所有連接
package com.lingting.websocketlearn.controller;
import com.lingting.websocketlearn.websocketserver.WebSocketServer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * REST controller exposing the connected-user list and a "close all connections" action.
 *
 * @author Liucheng
 * @date 2019/5/2 10:59
 */
@RestController
@RequestMapping("/controller")
public class NotifyController {

    /**
     * Returns the list of users as reported by {@link WebSocketServer#getUserList()}.
     *
     * @return the usernames
     */
    @GetMapping("/getalluser")
    public List<String> getAllUser () {
        return WebSocketServer.getUserList();
    }

    /**
     * Closes all WebSocket connections.
     *
     * @return a map containing {@code "success" -> true}
     * @throws IOException if {@link WebSocketServer#closeAll()} fails
     */
    @GetMapping("/closeall")
    public Map<String, Boolean> closeAll () throws IOException {
        // Static call: closes every connection held by the endpoint class.
        WebSocketServer.closeAll();
        Map<String, Boolean> result = new HashMap<>();
        result.put("success", true);
        return result;
    }
}
3). 前端
js依賴: 見index.html描述
index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>webSocket</title>
    <style>
        *{margin: 0; padding: 0;}
    </style>
    <script src="js/vue.min.js"></script>
    <script src="js/axios.min.js"></script>
    <!-- CDN alternatives for vue / axios: -->
    <!--<script src="https://cdn.jsdelivr.net/npm/vue@2.5.16/dist/vue.js"></script>-->
    <!--<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.min.js"></script>-->
</head>
<body>
<div id="app">
    <button @click="getUserList()">獲取用戶列表</button>
    <ul>
        <li v-for="other in otherUsers">{{ other }}</li>
    </ul>
    <form action="">
        <label for="toUser">輸入需要發(fā)送的用戶</label>
        <input type="text" v-model="messageSend.toUser" id="toUser"> <br>
        <label>是否群發(fā)</label>
        <!-- :value binds real booleans, so the model (and the JSON sent) stays boolean -->
        <input type="radio" :value="true" v-model="messageSend.massTexting">是
        <input type="radio" :value="false" v-model="messageSend.massTexting">否 <br>
        <label for="messageContent">輸入需要發(fā)送的信息</label>
        <textarea v-model="messageSend.messageContent" id="messageContent"></textarea>
    </form>
    <br>
    <button @click="sendMessage()">發(fā)送</button>
    <br>
    <form action="">
        <label for="messageReceivedContent">接收到的消息</label>
        <textarea v-model="messageReceived.messageContent" id="messageReceivedContent"></textarea>
    </form>
    <br>
    <button @click="close()">關(guān)閉連接</button>
</div>
<!-- Script placed inside <body>, after #app, so the mount point exists when Vue starts -->
<script type="text/javascript">
    var VM = new Vue({
        el: "#app",
        data: {
            webSocketServer: null, // WebSocket object
            username: null,        // username of this client
            messageSend: {         // outgoing message object
                code: 200,
                fromUser: null,
                toUser: null,
                messageContent: null,
                massTexting: false
            },
            messageReceived: {     // last received message object
                code: 200,
                fromUser: null,
                toUser: null,
                messageContent: null,
                massTexting: false
            },
            otherUsers: [],
        },
        created() {
            this.inputUsername();
        },
        methods: {
            // Keep prompting until a non-blank username is entered, then connect.
            inputUsername () {
                while (this.username == null || this.username.trim() === "") {
                    this.username = prompt("請輸入用戶名!")
                }
                this.connection();
            },
            // Open the WebSocket against the host that served this page.
            connection () {
                if ("WebSocket" in window) {
                    console.log("您的瀏覽器支持 WebSocket!");
                    // Derive scheme/host from the page instead of hard-coding localhost,
                    // and encode the username so characters like "/" or "?" cannot break the URL.
                    var scheme = window.location.protocol === "https:" ? "wss://" : "ws://";
                    // The query parameter is optional; it is only here for testing.
                    var url = scheme + window.location.host + "/websocket/"
                        + encodeURIComponent(this.username) + "?param=var";
                    this.webSocketServer = new WebSocket(url);
                    this.webSocketServer.onopen = this.onOpen;
                    this.webSocketServer.onmessage = this.onMessage;
                    this.webSocketServer.onclose = this.onClose;
                    this.webSocketServer.onerror = this.onError;
                } else {
                    // Browser has no WebSocket support
                    console.log("您的瀏覽器不支持 WebSocket!");
                }
            },
            onOpen(evt) {
                alert("已經(jīng)連上了服務(wù)器")
            },
            onMessage(evt) {
                // evt.data arrives as a string
                this.messageReceived = JSON.parse(evt.data);
                if (this.messageReceived.code !== 200) {
                    // Server refused the username: ask for another one and reconnect.
                    console.log(this.messageReceived.code)
                    this.username = null;
                    this.inputUsername();
                }
                console.log(this.messageReceived)
            },
            onClose() {
                alert("客戶端關(guān)閉了連接")
            },
            onError(evt) {
                console.log("WebSocket error", evt)
            },
            sendMessage() {
                // send() throws unless the socket is open, so check first.
                if (!this.webSocketServer || this.webSocketServer.readyState !== WebSocket.OPEN) {
                    console.log("WebSocket is not open; message not sent")
                    return;
                }
                var payload = JSON.stringify(this.messageSend);
                console.log("messageSend: ", this.messageSend)
                console.log("messageSendStr: " + payload)
                this.webSocketServer.send(payload)
            },
            close() {
                // Close the connection if one was ever created.
                if (this.webSocketServer) {
                    this.webSocketServer.close();
                }
            },
            getUserList () {
                // Fetch the user list from the REST controller.
                let _this = this;
                axios.get("/controller/getalluser").then(function (response) {
                    // response.data is already parsed JSON
                    _this.otherUsers = response.data;
                    console.log(_this.otherUsers)
                }).catch(function (error) {
                    alert(error)
                })
            }
        }
    })
</script>
</body>
</html>
4). 配置文件
- application.yml
# HTTP port of the embedded server. "port" must be nested under "server";
# at column 0 it would be an unrelated top-level key.
server:
  port: 80
- logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Directory where log files are stored (absolute path) -->
    <property name="LOG_HOME" value="d:/logs"/>

    <!-- Console output -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- %d date, %thread thread name, %-5level level left-aligned to 5 chars, %msg message, %n newline -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- One log file per day -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- File name pattern of the rolled log files -->
            <fileNamePattern>${LOG_HOME}/xc.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <!-- Same charset as the console encoder, so non-ASCII log messages are written consistently -->
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- Asynchronous output (defined but currently not referenced by the root logger) -->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <!-- 0 = never discard events; by default TRACE/DEBUG/INFO are dropped when the queue is 80% full -->
        <discardingThreshold>0</discardingThreshold>
        <!-- Queue depth; affects performance. Default is 256 -->
        <queueSize>512</queueSize>
        <!-- Delegate appender; at most one may be attached -->
        <appender-ref ref="FILE"/>
    </appender>

    <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
    <!-- Logback's level name is WARN; "WARNING" is not a recognized level -->
    <logger name="org.springframework.boot" level="WARN"/>

    <root level="info">
        <!--<appender-ref ref="ASYNC"/>-->
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
四、測試說明
運行Application, 打開多個本機瀏覽器頁面,在頁面地址欄輸入
localhost
,按照提示操作即可