單體Webscoket
- springboot版本: 2.1.1.RELEASE
- jdk: 1.8
示例代碼
- WebsocketServer
@ServerEndpoint("/client/{userName}")
@Component
@Slf4j
public class WebSocketServer {
/**
* 靜態(tài)變量,用來記錄當前在線連接數(shù)琢锋。應該把它設(shè)計成線程安全的室抽。
*/
private static int onlineCount = 0;
/**
* concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
*/
private Session session;
/**
* 接收userId
*/
private String userName = "";
/**
* @Description: 連接建立成功調(diào)用的方法,成功建立之后,將用戶的userName 存儲到redis
* @params: [session, userId]
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:13 PM
*/
@OnOpen
public void onOpen(Session session, @PathParam("userName") String userName) {
this.session = session;
this.userName = userName;
webSocketMap.put(userName, this);
addOnlineCount();
log.info("用戶連接:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
}
/**
* @Description: 連接關(guān)閉調(diào)用的方法
* @params: []
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:13 PM
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userName)) {
webSocketMap.remove(userName);
//從set中刪除
subOnlineCount();
}
log.info("用戶退出:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
}
/**
* @Description: 收到客戶端消息后調(diào)用的方法, 調(diào)用API接口 發(fā)送消息到
* @params: [message, session]
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:13 PM
*/
@OnMessage
public void onMessage(String message, @PathParam("userName") String userName) {
log.info("用戶消息:" + userName + ",報文:" + message);
if (StringUtils.isNotBlank(message)) {
try {
//解析發(fā)送的報文
JSONObject jsonObject = JSON.parseObject(message);
//追加發(fā)送人(防止串改)
jsonObject.put("sender", this.userName);
String receiver = jsonObject.getString("receiver");
//傳送給對應toUserId用戶的websocket
if (StringUtils.isNotBlank(receiver) && webSocketMap.containsKey(receiver)) {
webSocketMap.get(receiver).session.getBasicRemote().sendText(jsonObject.toJSONString());
} else {
log.error("用戶:" + receiver + "不在該服務器上");
//否則不在這個服務器上吧秕,發(fā)送到mysql或者redis
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 發(fā)布websocket消息
* 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
*
* @param dto
* @return
*/
public static void sendWebsocketMessage(ChatMsg dto) {
if (dto != null) {
if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
String json = JSON.toJSONString(dto);
try {
webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
} catch (IOException e) {
log.error("消息發(fā)送異常:{}", e.toString());
}
} else {
log.error("用戶:" + dto.getReceiver() + ",不在線!");
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用戶錯誤:" + this.userName + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* @Description: 獲取在線人數(shù)
* @params: []
* @return: int
* @Author: wangxianlin
* @Date: 2020/5/9 9:09 PM
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* @Description: 在線人數(shù)+1
* @params: []
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:09 PM
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* @Description: 在線人數(shù)-1
* @params: []
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:09 PM
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
- WebSocketConfig
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
- 前端代碼
var socket;
var userName;
establishConnection()
/***建立連接*/
function establishConnection() {
userName = $("#sender").val();
if (userName == '' || userName == null) {
alert("請輸入發(fā)送者");
return;
}
//實現(xiàn)化WebSocket對象迹炼,指定要連接的服務器地址與端口 建立連接
var socketUrl = "" + window.location.protocol + "http://" + window.location.host + "/client/" + userName;
socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
if (socket != null) {
socket.close();
socket = null;
}
socket = new WebSocket(socketUrl);
//打開事件
socket.onopen = function () {
console.log("開始建立鏈接....")
};
//關(guān)閉事件
socket.onclose = function () {
console.log("websocket已關(guān)閉");
};
//發(fā)生了錯誤事件
socket.onerror = function () {
console.log("websocket發(fā)生了錯誤");
};
/**
* 接收消息
* @param msg
*/
socket.onmessage = function (msg) {
msg = JSON.parse(msg.data);
console.log(msg);
if (msg.msg != '連接成功') {
$("#msgDiv").append('<p class="other">用戶名:' + msg.sender + '</p><p class="chat">' + msg.msg + '</p>');
}
};
}
/**
* 發(fā)送消息
*/
function sendMessage() {
var msg = $("#msg").val();
if (msg == '' || msg == null) {
alert("消息內(nèi)容不能為空");
return;
}
var receiver = $("#receiver").val();
if (receiver == '' || receiver == null) {
alert("接收人不能為空");
return;
}
var msgObj = {
"receiver": receiver,
"msg": msg
};
$("#msgDiv").append('<p class="user">用戶名:' + userName + '</p><p class="chat">' + msg + '</p>');
try{
socket.send(JSON.stringify(msgObj));
$("#msg").val('');
}catch (e) {
alert("服務器內(nèi)部錯誤");
}
}
-
測試效果
-
問題
如果兩個客戶端連接不在同一個服務器上砸彬,會出現(xiàn)什么問題?
結(jié)果就是如下所示:
如何解決多臺客戶端連接在不同服務器斯入,互相發(fā)送消息問題砂碉!
分布式WebSocket 解決
方案一 Redis消息訂閱與發(fā)布
描述:
客戶端A 和客戶端B 都訂閱同一個Topic ,后臺Websocket收到消息后刻两,將消息發(fā)送至Redis中增蹭,同時服務端會監(jiān)聽該渠道內(nèi)的消息,監(jiān)聽到消息后磅摹,會將消息推送至對應的客戶端滋迈。
示例代碼
- application.yml
主要是Redis配置
server:
port: 8082
spring:
thymeleaf:
#模板的模式,支持 HTML, XML TEXT JAVASCRIPT
mode: HTML5
#編碼 可不用配置
encoding: UTF-8
#內(nèi)容類別,可不用配置
content-type: text/html
#開發(fā)配置為false,避免修改模板還要重啟服務器
cache: false
# #配置模板路徑户誓,默認是templates饼灿,可以不用配置
prefix: classpath:/templates
suffix: .html
#Redis配置
redis:
host: localhost
port: 6379
password: 123456
timeout: 5000
- RedisSubscriberConfig.java
/**
* @Description 消息訂閱配置類
* @Author wxl
* @Date 2020/3/31 13:54
*/
@Configuration
public class RedisSubscriberConfig {
/**
* 消息監(jiān)聽適配器,注入接受消息方法
*
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(ChatMessageListener receiver) {
return new MessageListenerAdapter(receiver);
}
/**
* 創(chuàng)建消息監(jiān)聽容器
*
* @param redisConnectionFactory
* @param messageListenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(TOPIC_CUSTOMER));
return redisMessageListenerContainer;
}
}
- RedisUtil.java
@Component
public class RedisUtil {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 發(fā)布
*
* @param key
*/
public void publish(String key, String value) {
stringRedisTemplate.convertAndSend(key, value);
}
}
- ChatMessageListener.java
/**
* @Description 集群聊天消息監(jiān)聽器
* @Author wxl
* @Date 2020/3/29 15:07
*/
@Slf4j
@Component
public class ChatMessageListener implements MessageListener {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
String value = valueSerializer.deserialize(message.getBody());
ChatMsg dto = null;
if (StringUtils.isNotBlank(value)) {
try {
dto = JacksonUtil.json2pojo(value, ChatMsg.class);
} catch (Exception e) {
e.printStackTrace();
log.error("消息格式轉(zhuǎn)換異常:{}", e.toString());
}
log.info("監(jiān)聽集群websocket消息--- {}", value);
WebSocketServer.sendWebsocketMessage(dto);
}
}
}
- WebSocketServer
@ServerEndpoint("/client/{userName}")
@Component
@Slf4j
public class WebSocketServer {
/**
* 靜態(tài)變量帝美,用來記錄當前在線連接數(shù)碍彭。應該把它設(shè)計成線程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的線程安全Set悼潭,用來存放每個客戶端對應的MyWebSocket對象庇忌。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
*/
private Session session;
/**
* 不能使用@AutoWire原因:發(fā)現(xiàn)注入不了redis女责,redis注入失敗 可能是因為實例化的先后順序吧漆枚,WebSocket先實例化了创译, 但是@Autowire是會觸發(fā)getBean操作
* 因為@ServerEndpoint不支持注入抵知,所以使用SpringUtils獲取IOC實例
*/
private RedisUtil redisUtil = SpringUtils.getBean(RedisUtil.class);
/**
* 接收userId
*/
private String userName = "";
/**
* @Description: 連接建立成功調(diào)用的方法,成功建立之后软族,將用戶的userName 存儲到redis
* @params: [session, userId]
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:13 PM
*/
@OnOpen
public void onOpen(Session session, @PathParam("userName") String userName) {
this.session = session;
this.userName = userName;
webSocketMap.put(userName, this);
addOnlineCount();
log.info("用戶連接:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
}
/**
* @Description: 連接關(guān)閉調(diào)用的方法
* @params: []
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:13 PM
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userName)) {
webSocketMap.remove(userName);
//從set中刪除
subOnlineCount();
}
log.info("用戶退出:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
}
/**
* @Description: 收到客戶端消息后調(diào)用的方法, 調(diào)用API接口 發(fā)送消息到
* @params: [message, session]
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:13 PM
*/
@OnMessage
public void onMessage(String message, @PathParam("userName") String userName) {
log.info("用戶消息:" + userName + ",報文:" + message);
if (StringUtils.isNotBlank(message)) {
try {
//解析發(fā)送的報文
JSONObject jsonObject = JSON.parseObject(message);
//追加發(fā)送人(防止串改)
jsonObject.put("sender", this.userName);
//傳送給對應toUserId用戶的websocket
redisUtil.publish(TOPIC_CUSTOMER,jsonObject.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 發(fā)布websocket消息
* 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
*
* @param dto
* @return
*/
public static void sendWebsocketMessage(ChatMsg dto) {
if (dto != null) {
if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
String json = JSON.toJSONString(dto);
try {
webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
} catch (IOException e) {
log.error("消息發(fā)送異常:{}", e.toString());
}
} else {
log.error("用戶:" + dto.getReceiver() + ",不在次服務器上刷喜!");
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用戶錯誤:" + this.userName + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* @Description: 獲取在線人數(shù)
* @params: []
* @return: int
* @Author: wangxianlin
* @Date: 2020/5/9 9:09 PM
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* @Description: 在線人數(shù)+1
* @params: []
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:09 PM
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* @Description: 在線人數(shù)-1
* @params: []
* @return: void
* @Author: wangxianlin
* @Date: 2020/5/9 9:09 PM
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
-
測試效果
方案二 RabbitMq
采用的是基于rabbitmq的扇形分發(fā)器,消息生產(chǎn)者發(fā)送到指定的隊列立砸,消息消費者監(jiān)聽此隊列的消息掖疮,
將消息推送客戶端。
交換機颗祝、隊列
@Configuration
public class FanoutRabbitConfig {
/**
* 創(chuàng)建三個隊列 :fanout.msg
* 將三個隊列都綁定在交換機 fanoutExchange 上
* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用
*/
@Bean
public Queue queueMsg() {
return new Queue(ConstantUtils.FANOUT_QUEUE_MSG);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(ConstantUtils.FANOUT_EXCHANGE);
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueMsg()).to(fanoutExchange());
}
}
消息監(jiān)聽
@Component
@RabbitListener(queues = ConstantUtils.FANOUT_QUEUE_MSG)
public class FanoutReceiverMsg {
@RabbitHandler
public void process(Map msg) throws IOException {
if (msg !=null){
WebSocketServer.sendMessage((String)msg.get("receiver"),msg.toString());
}
}
}