架構(gòu)師方案-Websocket集群解決方案

前言

WebSocket是一種在網(wǎng)絡(luò)應(yīng)用程序中,使客戶度端和服務(wù)器之間可以進(jìn)行雙向通信的協(xié)議镜雨。它允許數(shù)據(jù)可以在建立連接后進(jìn)行實(shí)時交換,而不必依賴傳統(tǒng)的HTTP請求-響應(yīng)模式儿捧。WebSocket使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單荚坞,允許服務(wù)端主動向客戶端推送數(shù)據(jù)。在WebSocket API中菲盾,瀏覽器和服務(wù)器只需要完成一次握手颓影,兩者之間就直接可以創(chuàng)建持久性的連接,并進(jìn)行雙向數(shù)據(jù)傳輸懒鉴。

方案一:廣播方案

image.png
  • Step1: 客戶端連接到某個Websocket Server诡挂,在該websocket Server中建立userid和session的綁定關(guān)系
  • Step2: 其它服務(wù)或者客戶端通過MQ廣播消息所有Websocket Server(消息體中帶有userid)
  • Step3: 所有Websocket Server 根據(jù)客戶端userid找到對應(yīng)session, 只有存在userid和session的綁定關(guān)系的Websocket Server才發(fā)送消息到客戶端

代碼演示

1.Websocket Server 建立userid和session的綁定關(guān)系
@ServerEndpoint("/websocket/{businessType}/{userId}")
@Component
public class WebSocketServer {
    /**
     * 若要實(shí)現(xiàn)服務(wù)端與單一客戶端通信的話临谱,可以使用Map來存放璃俗,其中Key可以為用戶標(biāo)識
     * 注意:allSession 只記錄當(dāng)前機(jī)器的 客戶端連接,不是所有session連接
     */

    public static ConcurrentHashMap<String, Session> allSession = new ConcurrentHashMap<>();

    @Resource
    private RedisService redisService;

    /**
     * 連接建立成功調(diào)用的方法
     *
     * @param session 可選的參數(shù)悉默。session為與某個客戶端的連接會話城豁,需要通過它來給客戶端發(fā)送數(shù)據(jù)
     */
    @OnOpen
    public void onOpen(@PathParam(value = "businessType") String businessType, @PathParam(value = "userId") String userId, Session session, EndpointConfig config) {
        if (StringUtils.isEmpty(userId)) {
            return;
        }
        /**
         * 加入到本地map
         */
        allSession.put(userId, session);
    }

    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose(@PathParam(value = "userId") String userId, Session session) {
        if (StringUtils.isNotEmpty(userId)) {
            allSession.remove(userId);
        }
    }


    /**
     * 發(fā)生錯誤時調(diào)用
     *
     * @param
     * @param
     */
    @OnError
    public void onError(@PathParam(value = "userId") String userId, Session session, Throwable error) {
    }


    /**
     * 用戶id
     *
     * @param userId
     * @param message
     */
    public void sendMessageToOneUser(Integer userId, String message, String msgId) {
        if (userId == null) {
            return;
        }
        Session session = allSession.get(String.valueOf(userId));
        if (session != null) {
         //所有Websocket Server 根據(jù)客戶端userid找到對應(yīng)session, 只有存在userid和session的綁定關(guān)系的Websocket Server才發(fā)送消息到客戶端
          session.getAsyncRemote().sendText(message);
        } else {
            System.err.println("session為空");
            allSession.remove(userId + "");
        }
    }
}
2.所有Websocket Server 接收消息并處理
@Component
@RequiredArgsConstructor
public class CreateOrderConsumer implements BaseConsumer {


    private final WebSocketServer webSocketServer;


    @Override
    public Action handleMessage(Message message) {
        CreateOrderMessage createOrderMessage = JSON.parseObject(message.getBody(), LinkCreateOrderMessage.class);

        try {
           //業(yè)務(wù)校驗(yàn)省略...
           //調(diào)用WebSocketServer的sendMessageToOneUser方法抄课,里面根據(jù)客戶端userid找到對應(yīng)session唱星, 只有存在userid和session的綁定關(guān)系的Websocket Server才發(fā)送消息到客戶端
            webSocketServer.sendMessageToOneUser(createOrderMessage.getUserId(), JSON.toJSONString(linkActionRes),message.getMsgID());
        } catch (Exception e) {
            e.printStackTrace();
            return Action.ReconsumeLater;
        }
        return Action.CommitMessage;
    }
}    

方案二:目標(biāo)詢址方案(推薦)

image.png
Id標(biāo)識有兩種實(shí)現(xiàn)形式:
  • 為唯一的服務(wù)名:每一個WebSocketServer生成唯一的服務(wù)名(serviceName="XXX-" + IdUtil.oneId())并注冊到naocs服務(wù)組冊中心,uesrid與其綁定跟磨,服務(wù)適用方使用Feign 或其它RPC調(diào)用http://{serviceName}/xxx/xxx到指定WebSocketServer
  • 為唯一的IP+端口:每一個WebSocketServer 獲取自己IP+端口间聊,uesrid與其綁定,服務(wù)調(diào)用方使用該IP+端口

代碼演示(唯一Id為唯一的服務(wù)名的形式)

1.綁定userid和服務(wù)名唯一Id的關(guān)系(以ApplicationName形式為例)
@SpringBootApplication
public class WsApplication  {

    public static void main(String[] args) {
        //動態(tài)服務(wù)名
        System.setProperty("myApplicationName", "WS-" + IdUtil.oneId());
        SpringApplication.run(WsApplication.class, args);
    }
}
spring:
  application:
    #隨機(jī)名字抵拘,做ws集群使用
    name: ${myApplicationName}
@ServerEndpoint("/websocket/{businessType}/{userId}")
@Component
public class WebSocketServer {
    /**
     * 若要實(shí)現(xiàn)服務(wù)端與單一客戶端通信的話哎榴,可以使用Map來存放,其中Key可以為用戶標(biāo)識
     * 注意:allSession 只記錄當(dāng)前機(jī)器的 客戶端連接,不是所有session連接
     */

    public static ConcurrentHashMap<String, Session> allSession = new ConcurrentHashMap<>();
    /**
     *
     */
    private String myApplicationName = System.getProperty("myApplicationName");
    @Resource
    private RedisService redisService;

    /**
     * 連接建立成功調(diào)用的方法
     * 關(guān)鍵代碼
     * @param session 可選的參數(shù)叹话。session為與某個客戶端的連接會話偷遗,需要通過它來給客戶端發(fā)送數(shù)據(jù)
     */
    @OnOpen
    public void onOpen(@PathParam(value = "businessType") String businessType, @PathParam(value = "userId") String userId, Session session, EndpointConfig config) {
        if (StringUtils.isEmpty(userId)) {
            return;
        }
        /**
         * 加入到本地map
         */
        allSession.put(userId, session);
        //綁定userid和服務(wù)名唯一Id的關(guān)系
        redisService.hset(WS_MAPPING, userId + "", myApplicationName);
    }

    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose(@PathParam(value = "userId") String userId, Session session) {
        if (StringUtils.isNotEmpty(userId)) {
            allSession.remove(userId);
        }
    }

    /**
     * 發(fā)生錯誤時調(diào)用
     *
     * @param
     * @param
     */
    @OnError
    public void onError(@PathParam(value = "userId") String userId, Session session, Throwable error) {
    }

    /**
     * 用戶id
     *
     * @param userId
     * @param message
     */
    public void sendMessageToOneUser(Integer userId, String message) {
        if (userId == null) {
            return;
        }
        Session session = allSession.get(String.valueOf(userId));
        if (session != null) {
         //所有Websocket Server 根據(jù)客戶端userid找到對應(yīng)session, 只有存在userid和session的綁定關(guān)系的Websocket Server才發(fā)送消息到客戶端
          session.getAsyncRemote().sendText(message);
        } else {
            System.err.println("session為空");
            allSession.remove(userId + "");
        }
    }
}
2.Websocket Server提供的調(diào)用接口
@RestController
@RequestMapping("push")
public class  WebSocketPushController {


    @PostMapping("{userId}")
    public void pushMessage(@PathVariable Long userId, @RequestBody Object message) {
            webSocketServer.sendMessageToOneUser(userId, message);
    }

}
3.調(diào)用方通過nacos調(diào)用目標(biāo)Websocket Server
//業(yè)省略
MyApplicationName myApplicationName =  redisService.hget(WS_MAPPING, userId + "");

Feign:
http://${myApplicationName}/push/{userId}
方案 優(yōu)點(diǎn) 缺點(diǎn)
廣播方案 實(shí)現(xiàn)簡單 1.增加MQ驼壶,可能造成消息擠壓氏豌、消息順序的問題
2.每個Websocket Server服務(wù)都需要去消費(fèi)消息,增加每個服務(wù)的壓力(做無用功)
3.要保證消息冪等性,使用分布式鎖會降低服務(wù)的并發(fā)量
目標(biāo)詢址方案 精確調(diào)用Websocket Server服務(wù)热凹,性能與并發(fā)量更高 1.Id標(biāo)識為為唯一的服務(wù)名:Websocket Server服務(wù)名不一致可能導(dǎo)致一些監(jiān)控系統(tǒng)不便于管控
2.Id標(biāo)識為唯一的IP+端口 :要保證在容器下獲取IP的正確性
參考
WebSocket集群解決方案泵喘,不用MQ
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市般妙,隨后出現(xiàn)的幾起案子纪铺,更是在濱河造成了極大的恐慌,老刑警劉巖碟渺,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鲜锚,死亡現(xiàn)場離奇詭異,居然都是意外死亡苫拍,警方通過查閱死者的電腦和手機(jī)芜繁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绒极,“玉大人骏令,你說我怎么就攤上這事÷⑻幔” “怎么了榔袋?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長铡俐。 經(jīng)常有香客問我凰兑,道長,這世上最難降的妖魔是什么审丘? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任聪黎,我火速辦了婚禮,結(jié)果婚禮上备恤,老公的妹妹穿的比我還像新娘稿饰。我一直安慰自己,他們只是感情好露泊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布喉镰。 她就那樣靜靜地躺著,像睡著了一般惭笑。 火紅的嫁衣襯著肌膚如雪侣姆。 梳的紋絲不亂的頭發(fā)上生真,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機(jī)與錄音捺宗,去河邊找鬼柱蟀。 笑死,一個胖子當(dāng)著我的面吹牛蚜厉,可吹牛的內(nèi)容都是我干的长已。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼昼牛,長吁一口氣:“原來是場噩夢啊……” “哼术瓮!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起贰健,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤胞四,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后伶椿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體辜伟,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年脊另,在試婚紗的時候發(fā)現(xiàn)自己被綠了导狡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡尝蠕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出载庭,到底是詐尸還是另有隱情看彼,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布囚聚,位于F島的核電站靖榕,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏顽铸。R本人自食惡果不足惜茁计,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望谓松。 院中可真熱鬧星压,春花似錦、人聲如沸鬼譬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽优质。三九已至竣贪,卻和暖如春军洼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背演怎。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工匕争, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人爷耀。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓甘桑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親畏纲。 傳聞我的和親對象是個殘疾皇子扇住,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354