歡迎來借鑒分布式WebSocket解決方案

單體Webscoket

  • springboot版本: 2.1.1.RELEASE
  • jdk: 1.8

示例代碼

  • WebsocketServer
@ServerEndpoint("/client/{userName}")
@Component
@Slf4j
public class WebSocketServer {

    /**
     * 靜態(tài)變量,用來記錄當前在線連接數(shù)琢锋。應該把它設(shè)計成線程安全的室抽。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userName = "";

    /**
     * @Description: 連接建立成功調(diào)用的方法,成功建立之后,將用戶的userName 存儲到redis
     * @params: [session, userId]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        this.session = session;
        this.userName = userName;
        webSocketMap.put(userName, this);
        addOnlineCount();
        log.info("用戶連接:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
    }

    /**
     * @Description: 連接關(guān)閉調(diào)用的方法
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userName)) {
            webSocketMap.remove(userName);
            //從set中刪除
            subOnlineCount();
        }
        log.info("用戶退出:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
    }


    /**
     * @Description: 收到客戶端消息后調(diào)用的方法, 調(diào)用API接口 發(fā)送消息到
     * @params: [message, session]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnMessage
    public void onMessage(String message, @PathParam("userName") String userName) {
        log.info("用戶消息:" + userName + ",報文:" + message);
        if (StringUtils.isNotBlank(message)) {
            try {
                //解析發(fā)送的報文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加發(fā)送人(防止串改)
                jsonObject.put("sender", this.userName);
                String receiver = jsonObject.getString("receiver");
                //傳送給對應toUserId用戶的websocket
                if (StringUtils.isNotBlank(receiver) && webSocketMap.containsKey(receiver)) {
                    webSocketMap.get(receiver).session.getBasicRemote().sendText(jsonObject.toJSONString());
                } else {
                    log.error("用戶:" + receiver + "不在該服務器上");
                    //否則不在這個服務器上吧秕,發(fā)送到mysql或者redis
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 發(fā)布websocket消息
     * 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
     *
     * @param dto
     * @return
     */
    public static void sendWebsocketMessage(ChatMsg dto) {
        if (dto != null) {
            if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
                String json = JSON.toJSONString(dto);
                try {
                    webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
                } catch (IOException e) {
                    log.error("消息發(fā)送異常:{}", e.toString());
                }
            } else {
                log.error("用戶:" + dto.getReceiver() + ",不在線!");
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用戶錯誤:" + this.userName + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * @Description: 獲取在線人數(shù)
     * @params: []
     * @return: int
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * @Description: 在線人數(shù)+1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * @Description: 在線人數(shù)-1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}
  • WebSocketConfig
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
  • 前端代碼
var socket;
    var userName;
    establishConnection()
    /***建立連接*/
    function establishConnection() {
        userName = $("#sender").val();
        if (userName == '' || userName == null) {
            alert("請輸入發(fā)送者");
            return;
        }
        //實現(xiàn)化WebSocket對象迹炼,指定要連接的服務器地址與端口  建立連接
        var socketUrl = "" + window.location.protocol + "http://" + window.location.host + "/client/" + userName;
        socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
        if (socket != null) {
            socket.close();
            socket = null;
        }
        socket = new WebSocket(socketUrl);
        //打開事件
        socket.onopen = function () {
            console.log("開始建立鏈接....")
        };
        //關(guān)閉事件
        socket.onclose = function () {
            console.log("websocket已關(guān)閉");
        };
        //發(fā)生了錯誤事件
        socket.onerror = function () {
            console.log("websocket發(fā)生了錯誤");
        };
        /**
         * 接收消息
         * @param msg
         */
        socket.onmessage = function (msg) {
            msg = JSON.parse(msg.data);
            console.log(msg);
            if (msg.msg != '連接成功') {
                $("#msgDiv").append('<p class="other">用戶名:' + msg.sender + '</p><p class="chat">' + msg.msg + '</p>');
            }
        };
    }
    /**
     * 發(fā)送消息
     */
    function sendMessage() {
        var msg = $("#msg").val();
        if (msg == '' || msg == null) {
            alert("消息內(nèi)容不能為空");
            return;
        }
        var receiver = $("#receiver").val();
        if (receiver == '' || receiver == null) {
            alert("接收人不能為空");
            return;
        }
        var msgObj = {
            "receiver": receiver,
            "msg": msg
        };
        $("#msgDiv").append('<p class="user">用戶名:' + userName + '</p><p class="chat">' + msg + '</p>');
        try{
            socket.send(JSON.stringify(msgObj));
            $("#msg").val('');
        }catch (e) {
            alert("服務器內(nèi)部錯誤");
        }
    }
  • 測試效果


    image.png
  • 問題
    如果兩個客戶端連接不在同一個服務器上砸彬,會出現(xiàn)什么問題?
    結(jié)果就是如下所示:
    image.png

如何解決多臺客戶端連接在不同服務器斯入,互相發(fā)送消息問題砂碉!

分布式WebSocket 解決

方案一 Redis消息訂閱與發(fā)布

image.png

描述:
客戶端A 和客戶端B 都訂閱同一個Topic ,后臺Websocket收到消息后刻两,將消息發(fā)送至Redis中增蹭,同時服務端會監(jiān)聽該渠道內(nèi)的消息,監(jiān)聽到消息后磅摹,會將消息推送至對應的客戶端滋迈。

示例代碼

  • application.yml
    主要是Redis配置
server:
  port: 8082

spring:
  thymeleaf:
    #模板的模式,支持 HTML, XML TEXT JAVASCRIPT
    mode: HTML5
    #編碼 可不用配置
    encoding: UTF-8
    #內(nèi)容類別,可不用配置
    content-type: text/html
    #開發(fā)配置為false,避免修改模板還要重啟服務器
    cache: false
#    #配置模板路徑户誓,默認是templates饼灿,可以不用配置
    prefix: classpath:/templates
    suffix: .html

  #Redis配置
  redis:
    host: localhost
    port: 6379
    password: 123456
    timeout: 5000
  • RedisSubscriberConfig.java
/**
 * @Description 消息訂閱配置類
 * @Author wxl
 * @Date 2020/3/31 13:54
 */
@Configuration
public class RedisSubscriberConfig {
    /**
     * 消息監(jiān)聽適配器,注入接受消息方法
     *
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(ChatMessageListener receiver) {
        return new MessageListenerAdapter(receiver);
    }
    /**
     * 創(chuàng)建消息監(jiān)聽容器
     *
     * @param redisConnectionFactory
     * @param messageListenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(TOPIC_CUSTOMER));
        return redisMessageListenerContainer;
    }
}
  • RedisUtil.java
@Component
public class RedisUtil {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 發(fā)布
     *
     * @param key
     */
    public void publish(String key, String value) {
        stringRedisTemplate.convertAndSend(key, value);
    }
}
  • ChatMessageListener.java
/**
 * @Description 集群聊天消息監(jiān)聽器
 * @Author wxl
 * @Date 2020/3/29 15:07
 */
@Slf4j
@Component
public class ChatMessageListener implements MessageListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
        String value = valueSerializer.deserialize(message.getBody());
        ChatMsg dto = null;
        if (StringUtils.isNotBlank(value)) {
            try {
                dto = JacksonUtil.json2pojo(value, ChatMsg.class);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("消息格式轉(zhuǎn)換異常:{}", e.toString());
            }
            log.info("監(jiān)聽集群websocket消息--- {}", value);
            WebSocketServer.sendWebsocketMessage(dto);
        }
    }
}
  • WebSocketServer
@ServerEndpoint("/client/{userName}")
@Component
@Slf4j
public class WebSocketServer {

    /**
     * 靜態(tài)變量帝美,用來記錄當前在線連接數(shù)碍彭。應該把它設(shè)計成線程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的線程安全Set悼潭,用來存放每個客戶端對應的MyWebSocket對象庇忌。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
     */
    private Session session;

    /**
     * 不能使用@AutoWire原因:發(fā)現(xiàn)注入不了redis女责,redis注入失敗 可能是因為實例化的先后順序吧漆枚,WebSocket先實例化了创译,  但是@Autowire是會觸發(fā)getBean操作
     * 因為@ServerEndpoint不支持注入抵知,所以使用SpringUtils獲取IOC實例
     */
    private RedisUtil redisUtil = SpringUtils.getBean(RedisUtil.class);

    /**
     * 接收userId
     */
    private String userName = "";

 

    /**
     * @Description: 連接建立成功調(diào)用的方法,成功建立之后软族,將用戶的userName 存儲到redis
     * @params: [session, userId]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        this.session = session;
        this.userName = userName;
        webSocketMap.put(userName, this);
        addOnlineCount();
        log.info("用戶連接:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
    }

    /**
     * @Description: 連接關(guān)閉調(diào)用的方法
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userName)) {
            webSocketMap.remove(userName);
            //從set中刪除
            subOnlineCount();
        }
        log.info("用戶退出:" + userName + ",當前在線人數(shù)為:" + getOnlineCount());
    }


    /**
     * @Description: 收到客戶端消息后調(diào)用的方法, 調(diào)用API接口 發(fā)送消息到
     * @params: [message, session]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnMessage
    public void onMessage(String message, @PathParam("userName") String userName) {
        log.info("用戶消息:" + userName + ",報文:" + message);
        if (StringUtils.isNotBlank(message)) {
            try {
                //解析發(fā)送的報文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加發(fā)送人(防止串改)
                jsonObject.put("sender", this.userName);
                //傳送給對應toUserId用戶的websocket
                redisUtil.publish(TOPIC_CUSTOMER,jsonObject.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 發(fā)布websocket消息
     * 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
     *
     * @param dto
     * @return
     */
    public static void sendWebsocketMessage(ChatMsg dto) {
        if (dto != null) {
            if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
                String json = JSON.toJSONString(dto);
                try {
                    webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
                } catch (IOException e) {
                    log.error("消息發(fā)送異常:{}", e.toString());
                }
            } else {
                log.error("用戶:" + dto.getReceiver() + ",不在次服務器上刷喜!");
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用戶錯誤:" + this.userName + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * @Description: 獲取在線人數(shù)
     * @params: []
     * @return: int
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * @Description: 在線人數(shù)+1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * @Description: 在線人數(shù)-1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}
  • 測試效果


    image.png

方案二 RabbitMq

采用的是基于rabbitmq的扇形分發(fā)器,消息生產(chǎn)者發(fā)送到指定的隊列立砸,消息消費者監(jiān)聽此隊列的消息掖疮,
將消息推送客戶端。

交換機颗祝、隊列

@Configuration
public class FanoutRabbitConfig {

    /**
     *  創(chuàng)建三個隊列 :fanout.msg
     *  將三個隊列都綁定在交換機 fanoutExchange 上
     *  因為是扇型交換機, 路由鍵無需配置,配置也不起作用
     */
    @Bean
    public Queue queueMsg() {
        return new Queue(ConstantUtils.FANOUT_QUEUE_MSG);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(ConstantUtils.FANOUT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueMsg()).to(fanoutExchange());
    }
}

消息監(jiān)聽

@Component
@RabbitListener(queues = ConstantUtils.FANOUT_QUEUE_MSG)
public class FanoutReceiverMsg {
    @RabbitHandler
    public void process(Map msg) throws IOException {
        if (msg !=null){
            WebSocketServer.sendMessage((String)msg.get("receiver"),msg.toString());
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末浊闪,一起剝皮案震驚了整個濱河市恼布,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌搁宾,老刑警劉巖折汞,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盖腿,死亡現(xiàn)場離奇詭異爽待,居然都是意外死亡,警方通過查閱死者的電腦和手機翩腐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來何什,“玉大人而咆,你說我怎么就攤上這事”┍福” “怎么了涯捻?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵浅妆,是天一觀的道長。 經(jīng)常有香客問我障癌,道長凌外,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任涛浙,我火速辦了婚禮康辑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘轿亮。我一直安慰自己疮薇,他們只是感情好,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布我注。 她就那樣靜靜地躺著按咒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪但骨。 梳的紋絲不亂的頭發(fā)上励七,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天智袭,我揣著相機與錄音,去河邊找鬼掠抬。 笑死补履,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的剿另。 我是一名探鬼主播箫锤,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼雨女!你這毒婦竟也來了谚攒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤氛堕,失蹤者是張志新(化名)和其女友劉穎馏臭,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體讼稚,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡括儒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了锐想。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帮寻。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖赠摇,靈堂內(nèi)的尸體忽然破棺而出固逗,到底是詐尸還是另有隱情,我是刑警寧澤藕帜,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布烫罩,位于F島的核電站,受9級特大地震影響洽故,放射性物質(zhì)發(fā)生泄漏贝攒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一时甚、第九天 我趴在偏房一處隱蔽的房頂上張望隘弊。 院中可真熱鬧,春花似錦撞秋、人聲如沸长捧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至哑子,卻和暖如春舅列,著一層夾襖步出監(jiān)牢的瞬間肌割,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工帐要, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留把敞,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓榨惠,卻偏偏與公主長得像奋早,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子赠橙,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容