寫(xiě)在前面
本文介紹在 SpringBoot 項(xiàng)目中使用 WebSocket, 借助 STOMP 和 SocketJS
效果圖
客戶端向服務(wù)端發(fā)消息
服務(wù)端向客戶端發(fā)消息
廣播式
點(diǎn)對(duì)點(diǎn)
開(kāi)始搭建
新建 SpringBoot 項(xiàng)目, 依賴勾選 WebSocket
和 Thymeleaf
WebSocket
配置
/**
* WebSocket 配置
*
* @author niuyy
* @date 2018/3/23
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry.setErrorHandler(this.webSocketHandler())
.addEndpoint("/endpointNiu")
.setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/queue");
registry.setUserDestinationPrefix("/user");
}
/**
* WebSocket Error 處理
*
* @return WebSocket Error 處理器
*/
@Bean
public StompSubProtocolErrorHandler webSocketHandler() {
return new WebSocketErrorHandler();
}
}
其中:
-
@EnableWebSocketMessageBroker
開(kāi)啟使用STOMP
協(xié)議來(lái)傳輸基于代理的消息,Broker是代理 -
setErrorHandler
設(shè)置一個(gè)錯(cuò)誤處理的 Handler, 以便捕捉錯(cuò)誤信息, 文章末尾有貼出代碼 -
addEndpoint
切入點(diǎn), 客戶端在 new SockJs 的時(shí)候用到 -
setAllowedOrigins
設(shè)置為「*」表示接收http
和https
的請(qǐng)求 -
withSockJS
使用SockJS
-
enableSimpleBroker
參數(shù)是多個(gè)destinationPrefixes
, 服務(wù)端發(fā)送消息的destination
要有這些前綴 -
setUserDestinationPrefix
設(shè)置點(diǎn)對(duì)點(diǎn)時(shí), destination 的前綴, 如客戶端訂閱/user/{userId}/getPoint
, 服務(wù)端
發(fā)送消息時(shí), 調(diào)用messagingTemplate.convertAndSendToUser(userId, "/getPoint", msg)
控制層
/**
* @author niuyy
* @date 2018/3/23
*/
@Controller
@Slf4j
public class WebController {
/**
* 接收消息
* @param name 姓名
* @return welcome, [姓名] !
*/
@MessageMapping("/welcome")
@SendTo("/topic/getBro")
public String say(String name) {
log.info("name: " + name);
return "welcome, " + name + " !";
}
@Autowired
private SimpMessagingTemplate messagingTemplate;
/**
* 廣播式發(fā)送消息給訂閱了「/topic/getBro」的客戶端
*/
@RequestMapping("sendMsgBro")
@ResponseBody
public void sendMsg() {
messagingTemplate.convertAndSend("/topic/getBro", "服武器主動(dòng)推送的廣播消息");
}
/**
* 發(fā)送消息給指定 sessionId 的客戶端, 且該客戶端訂閱了「/topic/getBro」
*
* @param sessionId 客戶端的 sessionId
*/
@RequestMapping("sendMsgPoint")
@ResponseBody
public void sendMsgPoint(String sessionId) {
messagingTemplate.convertAndSendToUser(sessionId, "/queue/getPoint", "服武器主動(dòng)推送的點(diǎn)對(duì)點(diǎn)消息", createHeaders(sessionId));
}
private MessageHeaders createHeaders(String sessionId) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
return headerAccessor.getMessageHeaders();
}
}
其中:
-
@MessageMapping
類似于@RequestMapping
, 只不過(guò)映射的是 webSocket 的請(qǐng)求地址 -
@SendTo
指定該方法響應(yīng)給哪個(gè) topic, 客戶端訂閱了/topic/getBro
的都能收到方法響應(yīng) -
convertAndSend
和convertAndSendToUser
本質(zhì)是一樣的, 底層調(diào)用同一方法, 是服務(wù)端主動(dòng)發(fā)送消息
這里說(shuō)明一點(diǎn), 本文中使用的是客戶端的 sessionId
實(shí)現(xiàn)的點(diǎn)對(duì)點(diǎn)消息發(fā)送, 另外, 還有客戶端訂閱 /user/{userId}/topic
, 服務(wù)端
調(diào)用 messagingTemplate.convertAndSendToUser(userId, "/topic", msg)
的方法并未給出, 原因是筆者認(rèn)為該方法和廣播式類似
監(jiān)聽(tīng)器
新客戶端連接
/**
* @author niuyy
* @date 2018/3/26
*/
@Slf4j
@Component
public class WebSocketConnectListener implements ApplicationListener<SessionConnectEvent> {
@Override
public void onApplicationEvent(SessionConnectEvent event) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = sha.getSessionId();
log.info("sessionId: {} 已連接", sessionId);
}
}
其中
-
sessionId
用于點(diǎn)對(duì)點(diǎn)發(fā)送消息 -
@Component
自動(dòng)注入
斷開(kāi)連接監(jiān)聽(tīng)器
/**
* @author niuyy
* @date 2018/3/26
*/
@Slf4j
@Component
public class WebSocketDisconnectListener implements ApplicationListener<SessionDisconnectEvent> {
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = sha.getSessionId();
log.info("sessionId: {} 已斷開(kāi)", sessionId);
}
}
讀者可根據(jù)不同需求, 在斷開(kāi)連接時(shí)執(zhí)行不同操作
客戶端
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8"/>
<title>WebSocket</title>
<script th:src="@{js/sockjs.min.js}"></script>
<script th:src="@{js/stomp.js}"></script>
<script th:src="@{js/jquery-3.1.1.js}"></script>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: #e80b0a;">Sorry背苦,瀏覽器不支持WebSocket</h2></noscript>
<div>
<div>
<button id="connect" onclick="connect();">連接</button>
<button id="disconnect" disabled="disabled" onclick="disconnect();">斷開(kāi)連接</button>
</div>
<div id="conversationDiv">
<label>輸入你的名字</label><input type="text" id="name"/>
<button id="sendName" onclick="sendName();">發(fā)送</button>
<p id="response"></p>
</div>
</div>
<script type="text/javascript">
var stompClient = null;
function setConnected(connected) {
document.getElementById("connect").disabled = connected;
document.getElementById("disconnect").disabled = !connected;
document.getElementById("conversationDiv").style.visibility = connected ? 'visible' : 'hidden';
// $("#connect").disabled = connected;
// $("#disconnect").disabled = !connected;
$("#response").html();
}
function connect() {
var socket = new SockJS('/endpointNiu');
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
setConnected(true);
console.log('Connected:' + frame);
stompClient.subscribe('/user/queue/getPoint', function (response) {
showResponse("getPoint " + response.body);
});
stompClient.subscribe('/topic/getBro', function (response) {
showResponse("getBro " + response.body);
})
});
}
function disconnect() {
if (stompClient != null) {
stompClient.disconnect();
}
setConnected(false);
console.log('Disconnected');
}
function sendName() {
var name = $('#name').val();
console.log('name:' + name);
stompClient.send("/welcome", {}, name);
}
function showResponse(message) {
$("#response").html(message);
}
</script>
</body>
</html>
其中
- 引入的 js 文件在案例源碼中有
- 創(chuàng)建
SockJS
:var socket = new SockJS('/endpointNiu');
, 參數(shù)為在服務(wù)端設(shè)置的endpoint
- 訂閱了兩個(gè) topic, 「/topic/getBro」接受廣播消息, 「/user/queue/getPoint」接受點(diǎn)對(duì)點(diǎn)消息, 服務(wù)端在發(fā)送
點(diǎn)對(duì)點(diǎn)消息的時(shí)候,destination
是沒(méi)有「/user」的, 但是在WebSocket
中我們已經(jīng)配置過(guò), 再看源碼就懂了
@Override
public void convertAndSendToUser(String user, String destination, Object payload,
@Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor)
throws MessagingException {
Assert.notNull(user, "User must not be null");
user = StringUtils.replace(user, "/", "%2F");
destination = destination.startsWith("/") ? destination : "/" + destination;
super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
添加地址映射
/**
* WebMvc 配置類
*
* @author niuyy
* @date 2018/3/23
*/
@Configuration
public class WebMvcConfig extends WebMvcConfigurationSupport {
@Override
public void addViewControllers(ViewControllerRegistry registry) {
registry.addViewController("/ws").setViewName("/ws");
}
@Override
protected void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/js/**")
.addResourceLocations("classpath:/static/js/");
}
}
其中
-
addViewController
新增視圖 -
addResourceHandlers
新增資源(靜態(tài)文件等)
原理
- HTTP: 握手 -> 交換數(shù)據(jù), 握手 -> 交換數(shù)據(jù), 握手 -> 交換數(shù)據(jù) ...
- WebSocket: 握手 -> 交換數(shù)據(jù) -> 交換數(shù)據(jù) -> 交換數(shù)據(jù) ... , 建立連接后, 直接使用 tcp 交換數(shù)據(jù)
總結(jié)
使用情景
- 服務(wù)端需要主動(dòng)發(fā)送消息給客戶端
- 以前客戶端 ajax 輪詢的需求都可以用這個(gè)替換, 減少資源開(kāi)銷
參考
- 在Spring Boot框架下使用WebSocket實(shí)現(xiàn)消息推送
- WebSocket 是什么原理?為什么可以實(shí)現(xiàn)持久連接?
- SpringBoot學(xué)習(xí)-(十三)SpringBoot中建立WebSocket連接(STOMP)
- Socket 與 WebSocket
案例源碼
WebSocketErrorHandler.java
/**
* @author niuyy
* @date 2018/3/26
*/
@Slf4j
public class WebSocketErrorHandler extends StompSubProtocolErrorHandler {
public WebSocketErrorHandler() {
super();
}
@Override
public Message<byte[]> handleClientMessageProcessingError(Message<byte[]> clientMessage, Throwable ex) {
log.error("handleClientMessageProcessingError:clientMessage-" + clientMessage + ", error-"+ex.getMessage());
return super.handleClientMessageProcessingError(clientMessage, ex);
}
@Override
public Message<byte[]> handleErrorMessageToClient(Message<byte[]> errorMessage) {
log.error("handleErrorMessageToClient:errorMessage-" + errorMessage);
return super.handleErrorMessageToClient(errorMessage);
}
@Override
protected Message<byte[]> handleInternal(StompHeaderAccessor errorHeaderAccessor, byte[] errorPayload, Throwable cause, StompHeaderAccessor clientHeaderAccessor) {
log.error("handleInternal:errorHeaderAccessor-" + errorHeaderAccessor + ", errorPayload-" + errorPayload + ", error-" + cause.getMessage() + ", clientHeaderAccessor-"+clientHeaderAccessor);
return super.handleInternal(errorHeaderAccessor, errorPayload, cause, clientHeaderAccessor);
}
}
目錄截圖