本文講解通過WebSocket的高級封裝Stomp協(xié)議完成消息的互動
上篇文章SpringBoot WebSocket 服務(wù)器主動推送(一)是基于純WebSocket
的服務(wù)器端推送儒士,它只完成了客戶端建立連接->阻塞等待服務(wù)器消息的功能喧笔。對于客戶端推送消息、訂閱消息仇参,服務(wù)器推送消息的內(nèi)容在本文概述。話不多說,上代碼。
服務(wù)端
1.Maven 項目在pom.xml 里引入websocket 依賴双藕。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- Boot 啟動類
@SpringBootApplication
public class Application implements WebSocketConfigurer {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).bannerMode(Banner.Mode.OFF).run(args);
}
//Socket消息模版類,用來向客戶端推送消息
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
//用請求的方式模擬主動推送消息
@GetMapping("notice")
public String notice(String name) {
//這里定義了訂閱消息的路徑是"/queue/notice"阳仔,客戶端請求的路徑則為:"/user/queue/notice"
simpMessagingTemplate.convertAndSendToUser(name, "/queue/notice", "當(dāng)前時間是:" + new Date());
return "已發(fā)送";
}
}
3.WebSocket配置類忧陪,定義socket連接、推送近范、訂閱路徑嘶摊,以及關(guān)聯(lián)用戶鑒權(quán)帶業(yè)務(wù)
//開啟WebSocket,并啟用 STOMP
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageBrokerConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//將“/register”注冊為STOMP端點,客戶端在訂閱或發(fā)布消息到目的地路徑前评矩,要連接該端點
registry.addEndpoint("/register")
//自定義每個客戶端對應(yīng)的標(biāo)識叶堆,用于服務(wù)端精準(zhǔn)消息推送
.setHandshakeHandler(new DefaultHandshakeHandler(){
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
//將客戶端標(biāo)識封裝為Principal對象,從而讓服務(wù)端能通過getName()方法找到指定客戶端
Object o = attributes.get("name");
return new FastPrincipal(o.toString());
}
})
//添加socket攔截器斥杜,用于從請求中獲取客戶端標(biāo)識參數(shù)
.addInterceptors(NameHandshakeInterceptor()).withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
//客戶端發(fā)送消息的請求前綴
config.setApplicationDestinationPrefixes("/app");
//客戶端訂閱消息的請求前綴虱颗,topic一般用于廣播推送,queue用于點對點推送
config.enableSimpleBroker("/topic", "/queue");
//服務(wù)端通知客戶端的前綴蔗喂,可以不設(shè)置忘渔,默認(rèn)為user
config.setUserDestinationPrefix("/user");
}
class FastPrincipal implements Principal {
private final String name;
public FastPrincipal(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}
}
3.Socket攔截器,將客戶端請求的參數(shù)存儲至SocketSession中
/**
* 檢查握手請求和響應(yīng), 對WebSocketHandler傳遞屬性
*/
public static class CountHandshakeInterceptor implements HandshakeInterceptor {
/**
* 在握手之前執(zhí)行該方法, 繼續(xù)握手返回true, 中斷握手返回false.
* 通過attributes參數(shù)設(shè)置WebSocketSession的屬性
*
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String name= ((ServletServerHttpRequest) request).getServletRequest().getParameter("name");
System.out.println("======================Interceptor" + name);
//保存客戶端標(biāo)識
attributes.put("name", name);
return true;
}
/**
* 在握手之后執(zhí)行該方法. 無論是否握手成功都指明了響應(yīng)狀態(tài)碼和相應(yīng)頭.
*
* @param request
* @param response
* @param wsHandler
* @param exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
}
客戶端
public class StompSocketTest {
private static Logger logger = LoggerFactory.getLogger(StompSocketTest.class);
private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
@Test
public void testStompSubscribe() throws ExecutionException, InterruptedException {
StompSocketTest helloClient = new StompSocketTest();
ListenableFuture<StompSession> f = helloClient.connect("ws://localhost:8080/register?name=1");
StompSession stompSession = f.get();
logger.info("Subscribing to greeting topic using session " + stompSession);
helloClient.subscribeGreetings("/user/queue/notice", stompSession);
Thread.sleep(600000);
}
public ListenableFuture<StompSession> connect(String url) {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
return stompClient.connect(url, headers, new MyHandler(), "localhost", 8080);
}
public void subscribeGreetings(String url, StompSession stompSession) throws ExecutionException, InterruptedException {
stompSession.subscribe(url, new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
public void handleFrame(StompHeaders stompHeaders, Object o) {
logger.info("Received greeting " + new String((byte[]) o));
}
});
}
private class MyHandler extends StompSessionHandlerAdapter {
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
logger.info("Now connected");
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
super.handleFrame(headers, payload);
logger.debug("=========================handleFrame");
}
}
}
至此便完成了主要代碼邏輯缰儿。先啟動服務(wù)端畦粮,然后運(yùn)行客戶端建立WebSocket連接,接著在瀏覽器地址欄輸入localhost:8080/notice?name=1
,服務(wù)器便會找到對應(yīng)的socketSession
對其進(jìn)行推送消息锈玉。
參考資料
springmvc(18)使用WebSocket 和 STOMP 實現(xiàn)消息功能
Spring Websocket/STOMP 和SpringSession整合 初步