How Spring Cloud Zuul forwards WebSocket requests
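There are plenty of articles online about using WebSocket with Spring Boot, but very few cover how Spring Cloud Zuul forwards WebSocket requests. According to what I found online, Zuul 1.x does not support WebSocket, while 2.x does. Because migrating the current project from Spring Boot 1.x to Spring Boot 2.x would be complex, I decided to look for a solution that works with the version we are already on (if all else fails, the service port can simply be exposed directly; there are always more solutions than problems).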
Code walkthrough
This article is built around a solution published on GitHub (https://github.com/mthizo247/spring-cloud-netflix-zuul-websocket). The author's demo (https://github.com/mthizo247/zuul-websocket-support-demo) runs successfully, but a sample that only covers broadcasting to topic subscribers is clearly not enough: point-to-point sending, that is, delivering a message to one specific client, is also a common business scenario. The rest of this article explains the implementation details for both broadcast and point-to-point WebSocket messages.
The implementation logic is as follows:
- The gateway registers the microservice's endpoint and broker;
- The client sends a WebSocket request to the gateway, which forwards it and subscribes to the microservice's WebSocket;
/**
 * The gateway receives a message from the WebSocket client
 * and forwards the WebSocket request to the microservice.
 */
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
        throws Exception {
    super.handleMessage(session, message);
    handleMessageFromClient(session, message);
}

private void handleMessageFromClient(WebSocketSession session,
        WebSocketMessage<?> message) throws Exception {
    boolean handled = false;
    WebSocketMessageAccessor accessor = WebSocketMessageAccessor.create(message);
    if (StompCommand.SEND.toString().equalsIgnoreCase(accessor.getCommand())) {
        handled = true;
        sendMessageToProxiedTarget(session, accessor);
    }
    if (StompCommand.SUBSCRIBE.toString().equalsIgnoreCase(accessor.getCommand())) {
        handled = true;
        subscribeToProxiedTarget(session, accessor);
    }
    if (StompCommand.UNSUBSCRIBE.toString().equalsIgnoreCase(accessor.getCommand())) {
        handled = true;
        unsubscribeFromProxiedTarget(session, accessor);
    }
    if (StompCommand.CONNECT.toString().equalsIgnoreCase(accessor.getCommand())) {
        handled = true;
        connectToProxiedTarget(session);
    }
    if (!handled) {
        if (logger.isDebugEnabled()) {
            logger.debug("STOMP COMMAND " + accessor.getCommand()
                    + " was not explicitly handled");
        }
    }
}
/**
 * Resolves the microservice address from the request.
 * The WebSocket connection is proxied by ProxyWebSocketConnectionManager.
 */
private void connectToProxiedTarget(WebSocketSession session) {
    URI sessionUri = session.getUri();
    ZuulWebSocketProperties.WsBrokerage wsBrokerage = getWebSocketBrokarage(
            sessionUri);
    Assert.notNull(wsBrokerage, "wsBrokerage must not be null");
    String path = getWebSocketServerPath(wsBrokerage, sessionUri);
    Assert.notNull(path, "Web socket uri path must not be null");
    URI routeTarget = proxyTargetResolver.resolveTarget(wsBrokerage);
    Assert.notNull(routeTarget, "routeTarget must not be null");
    // When the microservice is configured with a global path, the service name must be prepended
    path = "/" + wsBrokerage.getId() + path;
    String uri = ServletUriComponentsBuilder
            .fromUri(routeTarget)
            .path(path)
            .replaceQuery(sessionUri.getQuery())
            .toUriString();
    ProxyWebSocketConnectionManager connectionManager = new ProxyWebSocketConnectionManager(
            messagingTemplate, stompClient, session, headersCallback, uri);
    connectionManager.errorHandler(this.errorHandler);
    managers.put(session, connectionManager);
    connectionManager.start();
}
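To make the URI assembly in connectToProxiedTarget easier to follow, here is a minimal sketch that uses only the JDK. The class ProxyTargetUriSketch and its method buildTargetUri are hypothetical names invented for illustration and are not part of the library; the sketch only approximates what ServletUriComponentsBuilder does in the listing above, under the assumption that the endpoint path already starts with a slash.

```java
import java.net.URI;

/** Hypothetical helper for illustration only; not part of spring-cloud-netflix-zuul-websocket. */
final class ProxyTargetUriSketch {

    /**
     * Builds the URI the gateway would connect to:
     * route target + "/" + service id + endpoint path + optional query string.
     */
    static String buildTargetUri(URI routeTarget, String serviceId,
                                 String endpointPath, String query) {
        // Prefix the endpoint path with the service id, as the modified gateway code does.
        String path = "/" + serviceId + endpointPath;

        // Keep any base path of the route target, without a trailing slash.
        String basePath = routeTarget.getPath() == null ? "" : routeTarget.getPath();
        if (basePath.endsWith("/")) {
            basePath = basePath.substring(0, basePath.length() - 1);
        }

        StringBuilder uri = new StringBuilder()
                .append(routeTarget.getScheme())
                .append("://")
                .append(routeTarget.getAuthority())
                .append(basePath)
                .append(path);

        // Carry over the client's query string, if there is one.
        if (query != null && !query.isEmpty()) {
            uri.append('?').append(query);
        }
        return uri.toString();
    }
}
```

For example, with a route target of http://localhost:8080, the service id web-ui, the path /gs-guide-websocket/123/abc/websocket and the query t=1, the sketch yields http://localhost:8080/web-ui/gs-guide-websocket/123/abc/websocket?t=1.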
- The gateway receives the messages sent by the microservice and forwards them to the client.
/**
 * Called after a message is received from the microservice.
 */
@Override
public void handleFrame(StompHeaders headers, Object payload) {
    if (headers.getDestination() != null) {
        String destination = headers.getDestination();
        if (logger.isDebugEnabled()) {
            logger.debug("Received " + payload + ", To " + headers.getDestination());
        }
        Principal principal = userAgentSession.getPrincipal();
        String userDestinationPrefix = messagingTemplate.getUserDestinationPrefix();
        if (principal != null && destination.startsWith(userDestinationPrefix)) {
            // Point-to-point: strip the user prefix and send to this user only
            destination = destination.substring(userDestinationPrefix.length());
            destination = destination.startsWith("/") ? destination
                    : "/" + destination;
            messagingTemplate.convertAndSendToUser(principal.getName(), destination,
                    payload, copyHeaders(headers.toSingleValueMap()));
        } else {
            // Broadcast: forward to every subscriber of the destination
            messagingTemplate.convertAndSend(destination, payload,
                    copyHeaders(headers.toSingleValueMap()));
        }
    }
}
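The destination handling in handleFrame can be tried out in isolation. The following is a minimal sketch, assuming the default Spring user destination prefix "/user/"; the class DestinationRoutingSketch and its methods are hypothetical names used only for this illustration.

```java
/** Hypothetical helper for illustration only; mirrors the string handling in handleFrame. */
final class DestinationRoutingSketch {

    /** True when the frame should be delivered to a single user rather than broadcast. */
    static boolean isUserDestination(String destination, String userPrefix, boolean hasPrincipal) {
        return hasPrincipal && destination.startsWith(userPrefix);
    }

    /**
     * Removes the user prefix and guarantees a leading slash.
     * Destinations without the prefix are returned unchanged.
     */
    static String stripUserPrefix(String destination, String userPrefix) {
        if (!destination.startsWith(userPrefix)) {
            return destination;
        }
        String rest = destination.substring(userPrefix.length());
        return rest.startsWith("/") ? rest : "/" + rest;
    }
}
```

With the prefix "/user/", the destination /user/queue/customer becomes /queue/customer and would be passed to convertAndSendToUser, while /topic/greetings is left untouched and would be broadcast.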
Development example
Implement a STOMP-based WebSocket service on Spring Boot 1.x (Spring MVC) and have Spring Cloud Zuul route and forward it.
Developing the WebSocket service
1. Add the following dependencies to the pom:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <!-- Base dependency for a microservice project; the gateway does not need it -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Eureka dependency: registers the service with the registry -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <!-- WebSocket dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>
2. Configure the STOMP endpoint and the request and subscription prefixes:
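import java.security.Principal;
import java.util.Map;

import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

/**
 * Uses the STOMP protocol.
 * @author Golden
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Registers the server endpoint.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Add the gs-guide-websocket endpoint
        registry.addEndpoint("/gs-guide-websocket")
                // Handshake handler: wraps the session_id passed in by the client in a Principal,
                // so the server can locate a specific client through getName()
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                            Map<String, Object> attributes) {
                        // [Key point] session_id was stored by CustomHandshakeInterceptor
                        final String sessionid = (String) attributes.get("session_id");
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return sessionid;
                            }
                        };
                        return principal;
                    }
                })
                // Socket interceptor: reads session_id from the request
                .addInterceptors(new CustomHandshakeInterceptor())
                // Allow connections from any origin
                .setAllowedOrigins("*").withSockJS();
    }

    /**
     * Defines the request and subscription prefixes of the endpoint.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Prefixes that clients subscribe to
        config.enableSimpleBroker("/topic", "/queue");
        // Prefix for requests handled by the server
        config.setApplicationDestinationPrefixes("/app");
    }
}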
import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

/**
 * Socket interceptor.
 * @author Golden
 */
public class CustomHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Exception exception) {
    }

    /**
     * Called before the handler runs. The attributes end up in the WebSocketSession
     * and can be read with webSocketSession.getAttributes().get(key).
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            // [Key point] The session_id header is set by Zuul when it creates the WebSocket connection
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            String sessionId = servletRequest.getServletRequest().getHeader("session_id");
            // String sessionId = servletRequest.getServletRequest().getParameter("session_id");
            attributes.put("session_id", sessionId);
        }
        return true;
    }
}
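Zuul gateway configuration
To forward WebSocket traffic through the gateway (Zuul), Spring Cloud needs the spring-cloud-netflix-zuul-websocket code; importing the jar directly covers the basic case.
Source modifications
Modify the addStompEndpoint method of the ZuulWebSocketConfiguration class to add a handshake handler and an interceptor to the endpoint.
The interceptor extracts the SockJS session id from the requestURI of the WebSocket request and uses it as the user. The handshake handler wraps that session_id in a Principal, so that a specific client can be found through getName(). The code is as follows: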
package com.github.mthizo247.cloud.netflix.zuul.web.socket;

public class ZuulWebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer
        implements ApplicationListener<ContextRefreshedEvent> {

    private SockJsServiceRegistration addStompEndpoint(StompEndpointRegistry registry, String... endpoint) {
        return registry.addEndpoint(endpoint)
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                            Map<String, Object> attributes) {
                        // Use session_id as the client id for point-to-point sending
                        final String sessionId = (String) attributes.get("session_id");
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return sessionId;
                            }
                        };
                        return principal;
                    }
                })
                .addInterceptors(new HandshakeInterceptor() {
                    @Override
                    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                        if (request instanceof ServletServerHttpRequest) {
                            // Extract the SockJS session id from the requestURI and use it as the user
                            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                            String uri = servletRequest.getServletRequest().getRequestURI();
                            System.out.println("----------" + uri);
                            // Drop the last path segment, then keep the segment before it
                            int lastSlashIndex = uri.lastIndexOf("/");
                            uri = uri.substring(0, lastSlashIndex);
                            uri = uri.substring(uri.lastIndexOf("/") + 1);
                            attributes.put("session_id", uri);
                        }
                        return true;
                    }

                    @Override
                    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                            WebSocketHandler wsHandler, Exception exception) {
                    }
                })
                // Allow connections from any origin
                .setAllowedOrigins("*").withSockJS();
    }
}
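The substring logic in the interceptor relies on the SockJS URL layout, where a session URL has the form /{endpoint}/{server-id}/{session-id}/{transport}. The sketch below isolates that extraction so it can be checked on its own; SockJsSessionIdSketch and extractSessionId are hypothetical names, and the null return for paths without two segments is an added safeguard that the listing above does not have.

```java
/** Hypothetical helper for illustration only; mirrors the interceptor's substring logic. */
final class SockJsSessionIdSketch {

    /**
     * Returns the second-to-last path segment of a SockJS request URI,
     * which is the session id for URLs shaped like
     * /{endpoint}/{server-id}/{session-id}/{transport}.
     * Returns null when the URI has fewer than two segments.
     */
    static String extractSessionId(String requestUri) {
        int lastSlash = requestUri.lastIndexOf('/');
        if (lastSlash <= 0) {
            return null; // no transport segment to strip
        }
        // Drop the trailing transport segment, e.g. "/websocket"
        String withoutTransport = requestUri.substring(0, lastSlash);
        // Keep what follows the last remaining slash
        return withoutTransport.substring(withoutTransport.lastIndexOf('/') + 1);
    }
}
```

For /gs-guide-websocket/123/abcd1234/websocket this returns abcd1234. Note that a URI with a different shape, such as /gs-guide-websocket/websocket, would yield the endpoint name instead of a session id, so the approach assumes that clients always connect through a SockJS session URL.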
Next, session_id has to be passed on to the microservice. Modify the buildWebSocketHttpHeaders method of ProxyWebSocketConnectionManager so that session_id is added to the WebSocketHttpHeaders of the socket connection.
private WebSocketHttpHeaders buildWebSocketHttpHeaders() {
    WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders();
    if (httpHeadersCallback != null) {
        httpHeadersCallback.applyHeaders(userAgentSession, wsHeaders);
        // Forward the gateway-side session id so the microservice can use it as the user name
        List<String> list = new ArrayList<>();
        list.add(userAgentSession.getId());
        wsHeaders.put("session_id", list);
    }
    return wsHeaders;
}
After these changes, debugging showed that point-to-point messages still did not arrive, and the gateway reported a message conversion exception. Further debugging revealed that topic subscription and point-to-point sending return different content types.
- Content type returned for a topic subscription:
contentType=application/json;charset=UTF-8
- Content type returned for point-to-point sending:
contentType=text/plain;charset=UTF-8
The next step is to modify the getPayloadType method of ProxyWebSocketConnectionManager and add a type check, as follows:
@Override
public Type getPayloadType(StompHeaders headers) {
    // Guard against frames that carry no content-type header
    if (headers.getContentType() == null) {
        return Object.class;
    }
    String type = headers.getContentType().getType();
    // content-type=[text/plain;charset=UTF-8]
    if ("text".equals(type)) {
        return String.class;
    }
    // content-type=[application/json;charset=UTF-8]
    return Object.class;
}
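The type check can be illustrated without Spring by working on the raw content-type string. This is only a sketch of the same decision rule; PayloadTypeSketch and payloadTypeFor are hypothetical names, and the real method receives a parsed StompHeaders object rather than a string.

```java
import java.util.Locale;

/** Hypothetical helper for illustration only; same decision rule as getPayloadType. */
final class PayloadTypeSketch {

    /**
     * Maps a content-type string to the Java type the payload should be converted to:
     * String for any "text/*" type, Object for everything else (including a missing header).
     */
    static Class<?> payloadTypeFor(String contentType) {
        if (contentType == null) {
            return Object.class;
        }
        // Keep only the primary type, e.g. "text" from "text/plain;charset=UTF-8"
        String primary = contentType.trim().toLowerCase(Locale.ROOT);
        int slash = primary.indexOf('/');
        if (slash > 0) {
            primary = primary.substring(0, slash);
        }
        return "text".equals(primary) ? String.class : Object.class;
    }
}
```

Under this rule text/plain;charset=UTF-8 maps to String.class, which matches the point-to-point case above, and application/json;charset=UTF-8 maps to Object.class, which matches the topic case.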
The code changes are now complete.
Configuring Zuul
The pom dependencies are as follows:
<!-- Spring Boot 1.5.2 -->
<!-- Spring Cloud Camden.SR5 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zuul</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Download the source from GitHub, modify it locally, and package it yourself -->
<dependency>
    <groupId>com.github.mthizo247</groupId>
    <artifactId>spring-cloud-netflix-zuul-websocket</artifactId>
    <version>1.0.7.RELEASE</version>
</dependency>
The yml specifies the WebSocket endpoint, the subscription path prefixes, and the request prefix of the server endpoint:
zuul:
  routes:
    web-ui: # WebSocket service name
      path: /**
      # url: http://localhost:8080  (not needed when connected to Eureka)
      service-id: web-ui
      customSensitiveHeaders: true
  ws:
    brokerages:
      web-ui: # WebSocket service name
        end-points: /gs-guide-websocket
        brokers: /topic,/queue
        destination-prefixes: /app
Add the library's annotation (@EnableZuulWebSocket) to the startup class:
@SpringBootApplication
@EnableZuulProxy
@EnableAsync
@EnableEurekaClient
@EnableZuulWebSocket
@EnableWebSocketMessageBroker
public class ZuulApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZuulApplication.class, args);
    }

    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
The front-end subscription code is as follows:
function connect() {
    var socket = new SockJS('/gs-guide-websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/greetings', function (greeting) {
            showGreeting(JSON.parse(greeting.body).content);
        });
        // Subscribe to the time topic
        stompClient.subscribe('/topic/time', function (greeting) {
            showTime(JSON.parse(greeting.body).content);
        });
        // Subscribe to user notification messages; the /user/ prefix is required
        stompClient.subscribe('/user/queue/customer', function (message) {
            console.log("/queue/customer: " + message.body);
            showUserListening(message.body);
        });
    });
}
Demo result:
Finally, thanks to the following blog posts that I referred to:
https://blog.csdn.net/weixin_34389926/article/details/86262894
http://www.reibang.com/p/32fae52c61f6
https://my.oschina.net/u/3706162/blog/1935071