本文主要講解SpringBoot 如何基于WebSocket 實現(xiàn)主動推送消息給用戶
消息推送的業(yè)務(wù)邏輯為服務(wù)端開啟WebSocket 服務(wù),客戶端通過建立長連接進入等待狀態(tài)溶其,服務(wù)器在合適的時候推送消息給客戶端各吨,最后客戶端接受消息自行處理槽地。話不多說,上關(guān)鍵代碼。
服務(wù)端
- Maven 項目在pom.xml 里引入websocket 依賴怎静。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- Boot 啟動類
//開啟WebSocket
@EnableWebSocket
@SpringBootApplication
public class Application implements WebSocketConfigurer {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).bannerMode(Banner.Mode.OFF).run(args);
}
//用請求的方式模擬推送消息的時候
@GetMapping("notice")
public String notice(String count) {
counterHandler.sendMessageToUser(count, "當(dāng)前時間是:" + new Date());
return "已發(fā)送";
}
/**
* 注冊WebSocket處理類
*
* @param webSocketHandlerRegistry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
//支持websocket 的 connection,指定counterHandler處理路徑為/counter 的長連接請求
webSocketHandlerRegistry.addHandler(counterHandler(), "/counter")
//添加WebSocket握手請求的攔截器.
.addInterceptors(new CounterHandler.CountHandshakeInterceptor());
//不支持websocket的connenction,采用sockjs
webSocketHandlerRegistry.addHandler(counterHandler(), "/sockjs/counter")
//添加WebSocket握手請求的攔截器.
.addInterceptors(new CounterHandler.CountHandshakeInterceptor()).withSockJS();
}
@Bean
public CounterHandler counterHandler() {
return new CounterHandler();
}
}
- Socket處理類
public class CounterHandler extends TextWebSocketHandler {
public static final String COLLECTOR = "collector";
private static final List<WebSocketSession> COUNTS = new ArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
System.out.println("Connection established");
COUNTS.add(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
System.out.println("Received:" + message.getPayload());
}
/**
* 給某個用戶發(fā)送消息
*
* @param count
* @param message
*/
public void sendMessageToUser(String count, String message) {
//遍歷記錄的session黔衡,取出符合條件的session發(fā)送消息
for (WebSocketSession socketSession : COUNTS) {
if (socketSession.getAttributes().get(COLLECTOR).equals(count)) {
try {
if (socketSession.isOpen()) {
//最關(guān)鍵的一句蚓聘,給客戶端推送消息
socketSession.sendMessage(new TextMessage(message));
}
} catch (IOException e) {
e.printStackTrace();
}
COUNTS.remove(socketSession);
break;
}
}
}
/**
* 檢查握手請求和響應(yīng), 對WebSocketHandler傳遞屬性
*/
public static class CountHandshakeInterceptor implements HandshakeInterceptor {
/**
* 在握手之前執(zhí)行該方法, 繼續(xù)握手返回true, 中斷握手返回false.
* 通過attributes參數(shù)設(shè)置WebSocketSession的屬性
*
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String collector = ((ServletServerHttpRequest) request).getServletRequest().getParameter(COLLECTOR);
if (Strings.isNullOrEmpty(collector)) {
return false;
} else {
attributes.put(COLLECTOR, collector);
return true;
}
}
/**
* 在握手之后執(zhí)行該方法. 無論是否握手成功都指明了響應(yīng)狀態(tài)碼和相應(yīng)頭.
*
* @param request
* @param response
* @param wsHandler
* @param exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
}
}
客戶端
public class WebSocketTest {
//服務(wù)器WebSocket 連接地址
private static final String WS_URI = "ws://localhost:8080/counter?collector=1";
public static void main(String[] args) throws IOException, InterruptedException {
StandardWebSocketClient client = new StandardWebSocketClient();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, new MyHandler(), WS_URI);
manager.start();
Thread.sleep(100000);
}
private static class MyHandler extends TextWebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("connected...........");
session.sendMessage(new TextMessage("hello, web socket"));
super.afterConnectionEstablished(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
System.out.println("receive: " + message.getPayload());
super.handleTextMessage(session, message);
}
}
}
至此便完成了主要代碼邏輯。先啟動服務(wù)端盟劫,然后運行客戶端建立WebSocket連接夜牡,接著在瀏覽器地址欄輸入localhost:8080/notice?count=1
,服務(wù)器便會找到對應(yīng)的socketSession
對其進行推送消息侣签。