CIM-server功能和設計分析

cim github地址: https://github.com/crossoverJie/cim

  • 第一篇: CIM-client 功能和設計分析
  • 第二篇:CIM-router功能和設計分析
  • 第三篇:CIM-server功能和設計分析
    分析完了CIM-client,CIM-router后都哭,最后分析下CIM-server就完整了秩伞。借用crossoverjie的架構(gòu)圖如下:
    image.png

    client與client通信都是通過router作為中介,相當于router作為中轉(zhuǎn)站欺矫。一個client只要知道另外一個client與那個server連接起來纱新,router就把消息發(fā)送該server。然后該server把消息寫到client的channel里面去穆趴。
    server端稍微簡單點脸爱,直接進入主題。
1. 程序入口
public class CIMServerApplication implements CommandLineRunner{

    private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerApplication.class);

    @Autowired
    private AppConfiguration appConfiguration ;

    @Value("${server.port}")
    private int httpPort ;
        // 正常啟動
    public static void main(String[] args) {
        SpringApplication.run(CIMServerApplication.class, args);
        LOGGER.info("啟動 Server 成功");
    }
        // 把本地服務ip+prot 注冊到ZK上
    @Override
    public void run(String... args) throws Exception {
        //獲得本機IP
        String addr = InetAddress.getLocalHost().getHostAddress();
        Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getCimServerPort(),httpPort));
        thread.setName("registry-zk");
        thread.start() ;
    }
}
public class RegistryZK implements Runnable {

    @Override
    public void run() {

        //創(chuàng)建父節(jié)點
        zKit.createRootNode();

        //是否要將自己注冊到 ZK
        if (appConfiguration.isZkSwitch()){
            String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + cimServerPort + ":" + httpPort;
            zKit.createNode(path);
            logger.info("注冊 zookeeper 成功未妹,msg=[{}]", path);
        }


    }
}
  • 以上主要是將自己注冊到ZK中簿废,作為服務被發(fā)現(xiàn)。
2. Server發(fā)送消息

server收到發(fā)送消息的router的請求络它,將http請求過來的消息發(fā)送給指定的client

@ApiOperation("服務端發(fā)送消息")
    @RequestMapping(value = "sendMsg",method = RequestMethod.POST)
    @ResponseBody
    public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
        BaseResponse<SendMsgResVO> res = new BaseResponse();
        cimServer.sendMsg(sendMsgReqVO) ;

        counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);

        SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
        sendMsgResVO.setMsg("OK") ;
        res.setCode(StatusEnum.SUCCESS.getCode()) ;
        res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
        res.setDataBody(sendMsgResVO) ;
        return res ;
    }

public void sendMsg(SendMsgReqVO sendMsgReqVO){
        //獲取到接受用戶的channel
        NioSocketChannel socketChannel = SessionSocketHolder.get(sendMsgReqVO.getUserId());

        if (null == socketChannel) {
            throw new NullPointerException("客戶端[" + sendMsgReqVO.getUserId() + "]不在線族檬!");
        }
        CIMRequestProto.CIMReqProtocol protocol = CIMRequestProto.CIMReqProtocol.newBuilder()
                .setRequestId(sendMsgReqVO.getUserId())
                .setReqMsg(sendMsgReqVO.getMsg())
                .setType(Constants.CommandType.MSG)
                .build();

        ChannelFuture future = socketChannel.writeAndFlush(protocol);
        future.addListener((ChannelFutureListener) channelFuture ->
                LOGGER.info("服務端手動發(fā)送 Google Protocol 成功={}", sendMsgReqVO.toString()));
    }
  • 找到接受用戶的channel,寫入protocol就行化戳。
3. channel和session的保存
public class SessionSocketHolder {
    //userid ---> channel
    private static final Map<Long, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>(16);
    //userid --->username
    private static final Map<Long, String> SESSION_MAP = new ConcurrentHashMap<>(16);
    
    //保存用戶消息
    public static CIMUserInfo getUserId(NioSocketChannel nioSocketChannel){
        for (Map.Entry<Long, NioSocketChannel> entry : CHANNEL_MAP.entrySet()) {
            NioSocketChannel value = entry.getValue();
            if (nioSocketChannel == value){
                Long key = entry.getKey();
                String userName = SESSION_MAP.get(key);
                CIMUserInfo info = new CIMUserInfo(key,userName) ;
                return info ;
            }
        }

        return null;
    }
}

用兩個map保存userid ---> channel和userid --->username的對應单料。這樣方便快速查找。

4. CIMServerHandle的處理

CIMServerHandle 主要處理client的登陸信息点楼。

public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto.CIMReqProtocol> {

    /**
     * 取消綁定
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //可能出現(xiàn)業(yè)務判斷離線后再次觸發(fā) channelInactive
        CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
        if (userInfo != null){
            LOGGER.warn("[{}]觸發(fā) channelInactive 掉線!",userInfo.getUserName());
           //remove SessionSocketHolder  里面保存的信息
            userOffLine(userInfo, (NioSocketChannel) ctx.channel());
            ctx.channel().close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {

                LOGGER.info("定時檢測客戶端端是否存活");

                HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ;
                heartBeatHandler.process(ctx) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * 用戶下線
     * @param userInfo
     * @param channel
     * @throws IOException
     */
    private void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
        LOGGER.info("用戶[{}]下線", userInfo.getUserName());
        SessionSocketHolder.remove(channel);
        SessionSocketHolder.removeSession(userInfo.getUserId());

        //清除路由關系扫尖,清除router中保存的userid --> server的對應關系
        clearRouteInfo(userInfo);
    }

   


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception {
        LOGGER.info("收到msg={}", msg.toString());

        if (msg.getType() == Constants.CommandType.LOGIN) {
            //保存客戶端與 Channel 之間的關系
            SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
            SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
            LOGGER.info("客戶端[{}]上線成功", msg.getReqMsg());
        }

        //心跳更新時間
        if (msg.getType() == Constants.CommandType.PING){
            NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
            //向客戶端響應 pong 消息
            CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
                    CIMRequestProto.CIMReqProtocol.class);
            ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    LOGGER.error("IO error,close Channel");
                    future.channel().close();
                }
            }) ;
        }

    }
}

  • 主要接受client的注冊,保存client的channel,方便server寫入channe掠廓。
  • 客戶端的心跳是判斷channel的當前時間-最后的讀的時間是否大于給定的time,如果大于藏斩,則說明超時。則需要關閉客戶端連接却盘,清除userid--->channel狰域,userid--->username的映射。然后通知router清楚userid--->server的映射黄橘。
總結(jié)

綜上所述兆览,server端的主要任務是完成注冊,即保存userid--->channel的通道塞关。待收到消息后抬探,取出channel,往channel寫入消息即可。在處理心跳的時候小压,當遇到讀空閑的時候线梗,判斷當前時間-上次讀時間是否大于預先設定的空閑時間,如果超了怠益,則清除userid--->channel的緩存仪搔,userid--->username的緩存。并告知router下線蜻牢。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末烤咧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子抢呆,更是在濱河造成了極大的恐慌煮嫌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抱虐,死亡現(xiàn)場離奇詭異昌阿,居然都是意外死亡,警方通過查閱死者的電腦和手機恳邀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門宝泵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人轩娶,你說我怎么就攤上這事儿奶。” “怎么了鳄抒?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵闯捎,是天一觀的道長。 經(jīng)常有香客問我许溅,道長瓤鼻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任贤重,我火速辦了婚禮茬祷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘并蝗。我一直安慰自己祭犯,他們只是感情好,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布滚停。 她就那樣靜靜地躺著沃粗,像睡著了一般。 火紅的嫁衣襯著肌膚如雪键畴。 梳的紋絲不亂的頭發(fā)上最盅,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天,我揣著相機與錄音,去河邊找鬼涡贱。 笑死咏删,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的问词。 我是一名探鬼主播督函,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼戏售!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起草穆,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤灌灾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后悲柱,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體锋喜,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年豌鸡,在試婚紗的時候發(fā)現(xiàn)自己被綠了嘿般。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡涯冠,死狀恐怖炉奴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蛇更,我是刑警寧澤瞻赶,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站派任,受9級特大地震影響砸逊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜掌逛,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一师逸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧豆混,春花似錦篓像、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至心傀,卻和暖如春屈暗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工养叛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留种呐,地道東北人。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓弃甥,卻偏偏與公主長得像爽室,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子淆攻,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345