?WebSocket是一種在單個TCP連接上進(jìn)行全雙工通信的協(xié)議。WebSocket 協(xié)議在2008年誕生,2011年成為國際標(biāo)準(zhǔn)。現(xiàn)在所有瀏覽器都已經(jīng)支持了。它的最大特點(diǎn)就是锨推,服務(wù)器可以主動向客戶端推送信息,客戶端也可以主動向服務(wù)器發(fā)送信息公壤,是真正的雙向平等對話换可,屬于服務(wù)器推送技術(shù)的一種。
?最近有需求使用WebSocket作為中間件去轉(zhuǎn)發(fā)數(shù)據(jù)厦幅,于是就使用SpringBoot2.x與其進(jìn)行整合沾鳄,在此期間遇到了一些問題,總算是都解決了确憨。話不多說译荞,直接上代碼。
1休弃、添加jar支持吞歼,build.gradle如下:
plugins {
id 'org.springframework.boot' version '2.1.3.RELEASE'
id 'java'
id 'idea'
// id 'war'
}
apply plugin: 'io.spring.dependency-management'
group = 'com.tiantian'
version = '1.0.0'
sourceCompatibility = '1.8'
targetCompatibility = '1.8'
repositories {
mavenCentral()
}
dependencies {
testImplementation (
'org.springframework.boot:spring-boot-starter-test'
)
implementation (
'org.springframework.boot:spring-boot-starter-web',
'org.springframework.boot:spring-boot-starter-websocket',
//'org.yeauty:netty-websocket-spring-boot-starter:0.7.4',
'org.springframework.boot:spring-boot-starter-thymeleaf',
'org.apache.commons:commons-lang3:3.8.1',
'com.alibaba:fastjson:1.2.56'
)
// lombok支持,打包時處理
compileOnly ('org.projectlombok:lombok:1.18.6')
annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
// 使用bootJar打jar包
jar {
baseName = 'webserver'
version = '1.0.0'
manifest {
attributes "Manifest-Version": 1.0,
'Main-Class': 'com.tiantian.webserver.WebserverApplication'
}
}
// 解決代碼中的中文編譯報錯
tasks.withType(JavaCompile) {
options.encoding = "UTF-8"
}
說明:此項目是打成jar運(yùn)行的塔猾,當(dāng)然也可以打成war放在Tomcat上運(yùn)行篙骡。
2、配置開啟WebSocket功能
/**
* WebSocket配置
*
* @author yueli.liao
* @date 2019-03-12 11:25
*/
@Configuration
public class WebSocketConfig {
/**
* 開啟WebSocket功能
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3丈甸、WebSocket核心服務(wù)
/**
* WebSocket服務(wù)入口
*
* @author yueli.liao
* @date 2019-03-12 15:16
*/
@Slf4j
@Component
@ServerEndpoint("/websocket/{id}")
public class WebSocketServer {
// 客戶端ID
private String id = "";
// 與某個客戶端的連接會話糯俗,需要通過它來給客戶端發(fā)送數(shù)據(jù)
private Session session;
// 記錄當(dāng)前在線連接數(shù)(為保證線程安全,須對使用此變量的方法加lock或synchronized)
private static int onlineCount = 0;
// 用來存儲當(dāng)前在線的客戶端(此map線程安全)
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 連接建立成功后調(diào)用
*/
@OnOpen
public void onOpen(@PathParam(value = "id") String id, Session session) {
this.session = session;
this.id = id; // 接收到發(fā)送消息的客戶端編號
webSocketMap.put(id, this); // 加入map中
addOnlineCount(); // 在線數(shù)加1
log.info("客戶端" + id + "加入睦擂,當(dāng)前在線數(shù)為:" + getOnlineCount());
try {
sendMessage("WebSocket連接成功");
} catch (IOException e) {
log.error("WebSocket IO異常");
}
}
/**
* 連接關(guān)閉時調(diào)用
*/
@OnClose
public void onClose() {
webSocketMap.remove(this); // 從map中刪除
subOnlineCount(); // 在線數(shù)減1
log.info("有一連接關(guān)閉叶骨,當(dāng)前在線數(shù)為:" + getOnlineCount());
}
/**
* 收到客戶端消息后調(diào)用
*
* @param message 客戶端發(fā)送過來的消息<br/>
* 消息格式:內(nèi)容 - 表示群發(fā),內(nèi)容|X - 表示發(fā)給id為X的客戶端
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("來自客戶端的消息:" + message);
String[] messages = message.split("[|]");
try {
if (messages.length > 1) {
sendToUser(messages[0], messages[1]);
} else {
sendToAll(messages[0]);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
/**
* 發(fā)生錯誤時回調(diào)
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket發(fā)生錯誤");
error.printStackTrace();
}
/**
* 推送信息給指定ID客戶端祈匙,如客戶端不在線,則返回不在線信息給自己
*
* @param message 客戶端發(fā)來的消息
* @param sendClientId 客戶端ID
* @throws IOException
*/
public void sendToUser(String message, String sendClientId) throws IOException {
if (webSocketMap.get(sendClientId) != null) {
if (!id.equals(sendClientId)) {
webSocketMap.get(sendClientId).sendMessage("客戶端" + id + "發(fā)來消息:" + " <br/> " + message);
} else {
webSocketMap.get(sendClientId).sendMessage(message);
}
} else {
// 如客戶端不在線天揖,則返回不在線信息給自己
sendToUser("當(dāng)前客戶端不在線", id);
}
}
/**
* 推送發(fā)送信息給所有人
*
* @param message 要推送的消息
* @throws IOException
*/
public void sendToAll(String message) throws IOException {
for (String key : webSocketMap.keySet()) {
webSocketMap.get(key).sendMessage(message);
}
}
/**
* 推送消息
*
* @param message 要推送的消息
* @throws IOException
*/
private void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
private static synchronized int getOnlineCount() {
return onlineCount;
}
private static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
private static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
4夺欲、為解決大文本(超過WebSocket協(xié)議限制的長度)不能發(fā)送的問題,有兩種解決辦法今膊,一種是在Tomcat \conf\web.xml文件添加如下代碼:
<!-- 注:單位為byte -->
<context-param>
<param-name>org.apache.tomcat.websocket.textBufferSize</param-name>
<param-value>1024000</param-value>
</context-param>
<context-param>
<param-name>org.apache.tomcat.websocket.binaryBufferSize</param-name>
<param-value>1024000</param-value>
</context-param>
另一種是在服務(wù)啟動時動態(tài)進(jìn)行設(shè)置些阅,在項目中添加如下代碼:
/**
* 解決WebSocket傳輸內(nèi)容過長的問題
* 注:若發(fā)送內(nèi)容過長時,服務(wù)端無內(nèi)容輸出斑唬,連接會自動關(guān)閉
*
* @author yueli.liao
* @date 2019-03-13 13:35
*/
@Slf4j
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class WebAppRootContext implements ServletContextInitializer {
@Value("${websocket.bufferSize}")
private String bufferSize;
@Override
public void onStartup(ServletContext servletContext) throws ServletException {
log.info("WebSocket最大文本長度:" + bufferSize);
servletContext.addListener(WebAppRootListener.class);
servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize", bufferSize);
}
}
代碼中的websocket.bufferSize市埋,在application.properties進(jìn)行配置黎泣,如下:
#websocket textSize(單位為byte)
websocket.bufferSize=1024000
說明:推薦使用第二種方式,這樣可移植性強(qiáng)缤谎,對部署和開發(fā)都很方便抒倚。
5、測試坷澡,也有兩種方式托呕,采用哪種或兩種都可以。
- 一是在Google或Firefox瀏覽器中安裝WebSocket Client插件進(jìn)行測試频敛;
- 二是在項目中寫相關(guān)的JS進(jìn)行測試项郊,代碼如下:
<!DOCTYPE HTML>
<html>
<head>
<title>WebSocket測試</title>
</head>
<body>
Welcome<br/>
<input id="text" type="text" />
<button onclick="send()">發(fā)送消息</button>
<button onclick="closeWebSocket()">關(guān)閉連接</button>
<div id="message">
</div>
</body>
<script type="text/javascript">
var websocket = null;
// 判斷當(dāng)前瀏覽器是否支持WebSocket
if('WebSocket' in window){
// 為了方便測試,故將鏈接寫死
websocket = new WebSocket("ws://localhost:8088/websocket/1");
} else{
alert('Not support websocket')
}
// 連接發(fā)生錯誤的回調(diào)方法
websocket.onerror = function(){
setMessageInnerHTML("發(fā)生錯誤");
};
// 連接成功建立的回調(diào)方法
websocket.onopen = function(event){
setMessageInnerHTML("打開連接");
}
// 接收到消息的回調(diào)方法
websocket.onmessage = function(event){
setMessageInnerHTML(event.data);
}
// 連接關(guān)閉的回調(diào)方法
websocket.onclose = function(){
setMessageInnerHTML("關(guān)閉連接");
}
// 監(jiān)聽窗口關(guān)閉事件斟赚,當(dāng)窗口關(guān)閉時着降,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口拗军,server端會拋異常
window.onbeforeunload = function(){
websocket.close();
}
// 將消息顯示在網(wǎng)頁上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
// 關(guān)閉連接
function closeWebSocket(){
websocket.close();
}
// 發(fā)送消息
function send(){
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</html>
說明:此webserver可以進(jìn)行點(diǎn)對點(diǎn)推送消息任洞,也可以進(jìn)行廣播。源代碼已上傳到GitHub食绿,可自行下載侈咕。