CIM-client 功能和設計分析

感覺Crossoverjie的一個開源cim(即時通訊系統(tǒng)朴乖,源碼和設計個人覺得不錯吸耿,空閑的時候分析一下氓癌。
cim github地址: https://github.com/crossoverJie/cim

協(xié)議設計

1. 請求協(xié)議類圖
image.png
  • BaseRequest 作為基類绒障,具有所有的請求都應該具備的兩個屬性
public class BaseRequest {
    // 請求的序列號
    private String reqNo;
   //請求的時間戳,構造的時候就設置為System.currentTimeMillis() / 1000
    private int timeStamp;
}
  • GoogleProtocolVO 增加了requestIdmsg兩個字段酝碳,表示傳輸GoogleProtocol 消息矾踱。
  • GroupReqVO 增加了userId,msg 兩個字段,表示傳輸?shù)氖侨毫南ⅰ?/li>
  • LoginReqVO 增加了userId,userName兩個字段击敌,表示傳輸?shù)氖堑卿浵?/li>
  • P2PReqVO 增加了userId,receiveUserId,msg 字段介返,表示傳輸?shù)氖且粚σ凰搅南?/li>
  • SendMsgReqVO 增加了msg,userId字段拴事,表示通常的傳輸發(fā)送消息
  • StringReqVO 增加了msg字段沃斤,表示用來傳輸String的消息
2. 相應協(xié)議類圖
image.png
  • CIMServerResVO 用來接收查詢路由選中的服務器的響應消息圣蝎,格式如下:
{
      code : 9000
      message : 成功
      reqNo : null
      dataBody : {"ip":"127.0.0.1","port":8081} 
}    
  • OnlineUsersResVO用來接受查詢所有在線用戶的響應消息,格式如下:
{
     code : 9000
     message : 成功
     reqNo : null
     dataBody : [{"userId":1545574841528,"userName":"zhangsan"},{"userId":1545574871143,"userName":"crossoverJie"}]
}
  • SendMsgResVO 表示發(fā)送消息的響應
3. 程序運行流程
3.1 程序入口類
public class CIMClientApplication implements CommandLineRunner{

    private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);

    @Autowired
    private ClientInfo clientInfo ;
    public static void main(String[] args) {
        SpringApplication.run(CIMClientApplication.class, args);
        LOGGER.info("啟動 Client 服務成功");
    }

    @Override
    public void run(String... args) throws Exception { 
        Scan scan = new Scan() ;
        Thread thread = new Thread(scan);
        thread.setName("scan-thread");
        thread.start();
        clientInfo.saveStartDate();
    }
  • 標準的Springboot啟動流程衡瓶,重寫run方法在Springboot應用啟動后就啟動一個線程去監(jiān)聽控制臺徘公,根據(jù)用戶的命令,做相應的操作哮针。
3.2 Scan掃描用戶的輸入命令
 public void run() {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String msg = sc.nextLine();
            //檢查消息,保證輸入消息不能不為null
            if (msgHandle.checkMsg(msg)) {
                continue;
            }
            //系統(tǒng)內(nèi)置命令
            if (msgHandle.innerCommand(msg)){
                continue;
            }
            //真正的發(fā)送消息
            msgHandle.sendMsg(msg) ;
            //寫入聊天記錄
            msgLogger.log(msg) ;
            LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
        }
    }
  • 經(jīng)過檢查消息是否為空字符串关面,是否是內(nèi)置命令,最后剩下的是用戶發(fā)送的消息十厢。
3.3 內(nèi)置命令的處理

如果是內(nèi)置命令等太,轉而通過反射實例化每個命令,這里用到命令模式蛮放。

public boolean innerCommand(String msg) {

        if (msg.startsWith(":")) {
            
            InnerCommand instance = innerCommandContext.getInstance(msg);
            //調(diào)用里面的方法
            instance.process(msg) ;
            return true;
        } else {
            return false;
        }
    }

 public InnerCommand getInstance(String command) {
        //// 每個命令對應一個實現(xiàn)類
        Map<String, String> allClazz = SystemCommandEnum.getAllClazz();

        //兼容需要命令后接參數(shù)的數(shù)據(jù) :q cross
        String[] trim = command.trim().split(" ");
        String clazz = allClazz.get(trim[0]);
        InnerCommand innerCommand = null;
        try {
            if (StringUtil.isEmpty(clazz)){
                clazz = PrintAllCommand.class.getName() ;
            }
            //根據(jù)類名獲取到在容器里面的實例
            innerCommand = (InnerCommand) SpringBeanFactory.getBean(Class.forName(clazz));
        } catch (Exception e) {
            LOGGER.error("Exception", e);
        }

        return innerCommand;
    }
  • 內(nèi)部完整命令缩抡,以及他們的實現(xiàn)類如下


    image.png

    完整命令類類圖如下:


    image.png

    看其中一個實現(xiàn)類
public class PrintOnlineUsersCommand implements InnerCommand {
    private final static Logger LOGGER = LoggerFactory.getLogger(PrintOnlineUsersCommand.class);

    @Autowired
    private RouteRequest routeRequest ;

    @Override
    public void process(String msg) {
        try {
            // 查詢所有的在線用戶,委托routeRequest 來查詢
            List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();

            LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
            for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
                LOGGER.info("userId={}=====userName={}", onlineUser.getUserId(), onlineUser.getUserName());
            }
            LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");

        } catch (Exception e) {
            LOGGER.error("Exception", e);
        }
    }
}

都是通過其中的process來處理邏輯

3.4 處理內(nèi)置命令后包颁,接著來看處理發(fā)送消息
public void sendMsg(String msg) {
 
        if (aiModel) {
           //ai 模式主要是調(diào)侃之前那個價值兩億的融資項目
            aiChat(msg);
        } else {
           // 正常的聊天
            normalChat(msg);
        }
    }

private void normalChat(String msg) {
        String[] totalMsg = msg.split(";;");
        // 私聊的格式是:12345;;hello
        if (totalMsg.length > 1) {
            //私聊
            P2PReqVO p2PReqVO = new P2PReqVO();
            p2PReqVO.setUserId(configuration.getUserId());
            p2PReqVO.setReceiveUserId(Long.parseLong(totalMsg[0]));
            p2PReqVO.setMsg(totalMsg[1]);
            try {
                p2pChat(p2PReqVO);
            } catch (Exception e) {
                LOGGER.error("Exception", e);
            }

        } else {
            //群聊 直接發(fā)消息就行
            GroupReqVO groupReqVO = new GroupReqVO(configuration.getUserId(), msg);
            try {
                groupChat(groupReqVO);
            } catch (Exception e) {
                LOGGER.error("Exception", e);
            }
        }
    }

群聊和私聊也都委托 routeRequest來實現(xiàn)

 @Override
    public void groupChat(GroupReqVO groupReqVO) throws Exception {
        routeRequest.sendGroupMsg(groupReqVO);
    }

    @Override
    public void p2pChat(P2PReqVO p2PReqVO) throws Exception {

        routeRequest.sendP2PMsg(p2PReqVO);

    }
3.5 處理聊天記錄

接著最開始的時候看瞻想,聊天完成后,需要把聊天記錄寫入文件娩嚼,實現(xiàn)如下

 public void log(String msg) {
        //開始消費蘑险,異步完成
        startMsgLogger();
        try {
          //往阻塞隊列里面添加
            blockingQueue.put(msg);
        } catch (InterruptedException e) {
            LOGGER.error("InterruptedException", e);
        }
    }

啟動消息線程,往阻塞隊列里面添加消息

private class Worker extends Thread {


        @Override
        public void run() {
            while (started) {
                try {
                    //往阻塞隊列里面取
                    String msg = blockingQueue.take();
                    writeLog(msg);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }

    }

真正寫入文件的實現(xiàn)如下:

private void writeLog(String msg) {

        LocalDate today = LocalDate.now();
        int year = today.getYear();
        int month = today.getMonthValue();
        int day = today.getDayOfMonth();

        String dir = appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/";
        String fileName = dir + year + month + day + ".log";

        Path file = Paths.get(fileName);
        boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
        try {
            if (!exists) {
                Files.createDirectories(Paths.get(dir));
            }

            List<String> lines = Arrays.asList(msg);

            Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOGGER.info("IOException", e);
        }

    }

查找聊天記錄的實現(xiàn)如下岳悟,就是簡單的查找每個文件的每行佃迄,然后看是否包含,這樣的方式很暴力竿音,后期的話有很大改進:

@Override
    public String query(String key) {
        StringBuilder sb = new StringBuilder();

        Path path = Paths.get(appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/");

        try {
            Stream<Path> list = Files.list(path);
            List<Path> collect = list.collect(Collectors.toList());
            for (Path file : collect) {
                List<String> strings = Files.readAllLines(file);
                for (String msg : strings) {
                    if (msg.trim().contains(key)) {
                        sb.append(msg).append("\n");
                    }
                }

            }
        } catch (IOException e) {
            LOGGER.info("IOException", e);
        }

        return sb.toString().replace(key, "\033[31;4m" + key + "\033[0m");
    }
3.6 RouteRequestImpl的實現(xiàn)

這個實現(xiàn)里面包含眾多的功能和屎,例如,群聊春瞬,私聊柴信,離線,獲取在線用戶宽气,獲取一個可用的服務ip随常。這些功能的實現(xiàn)都是依靠 RouteRequestImpl來完成,而RouteRequestImpl里面的實現(xiàn)是通過okhttp遠程調(diào)用cim-router的http接口實現(xiàn)的萄涯⌒鞣眨看其中的群聊功能:

 public void sendGroupMsg(GroupReqVO groupReqVO) throws Exception {
        //序列化
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg",groupReqVO.getMsg());
        jsonObject.put("userId",groupReqVO.getUserId());
        RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
        
        Request request = new Request.Builder()
                .url(groupRouteRequestUrl)
                .post(requestBody)
                .build();
        //發(fā)送http請求cim-router
        Response response = okHttpClient.newCall(request).execute() ;
        try {
            if (!response.isSuccessful()){
                throw new IOException("Unexpected code " + response);
            }
        }finally {
            response.body().close();
        }
    }
3.7 客戶端的啟動

上面所有的都是內(nèi)置命令的處理以及和cim-router的通信。但是涝影,client最終是要和server通信的枣察,所以在這個過程中,客戶端作為netty客戶端需要啟動。這個啟動過程可以在CIMClient實例化的過程中啟動

@Component
public class CIMClient {
   //構造函數(shù)完成后調(diào)用
  @PostConstruct
    public void start() throws Exception {

        //登錄 + 獲取可以使用的服務器 ip+port
        CIMServerResVO.ServerInfo cimServer = userLogin();

        //啟動客戶端
        startClient(cimServer);

        //向服務端注冊
        loginCIMServer();


    }
}

向路由注冊并返回可用的服務器地址

private CIMServerResVO.ServerInfo userLogin() {
        LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
        CIMServerResVO.ServerInfo cimServer = null;
        try {
          //獲取可用的服務器
            cimServer = routeRequest.getCIMServer(loginReqVO);

            //保存系統(tǒng)信息
            clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
                    .saveUserInfo(userId, userName);

            LOGGER.info("cimServer=[{}]", cimServer.toString());
        } catch (Exception e) {
            errorCount++;

            if (errorCount >= configuration.getErrorCount()) {
                LOGGER.error("重連次數(shù)達到上限[{}]次", errorCount);
                msgHandle.shutdown();
            }
            LOGGER.error("登錄失敗", e);
        }
        return cimServer;
    }

啟動客戶端到服務端(上一步獲取的)的channel

private void startClient(CIMServerResVO.ServerInfo cimServer) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new CIMClientHandleInitializer())
        ;

        ChannelFuture future = null;
        try {
            future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
        } catch (InterruptedException e) {
            errorCount++;

            if (errorCount >= configuration.getErrorCount()) {
                LOGGER.error("鏈接失敗次數(shù)達到上限[{}]次", errorCount);
                msgHandle.shutdown();
            }
            LOGGER.error("連接失敗", e);
        }
        if (future.isSuccess()) {
            LOGGER.info("啟動 cim client 成功");
        }
        channel = (SocketChannel) future.channel();
    }

向服務器注冊

 private void loginCIMServer() {
        CIMRequestProto.CIMReqProtocol login = CIMRequestProto.CIMReqProtocol.newBuilder()
                .setRequestId(userId)
                .setReqMsg(userName)
                .setType(Constants.CommandType.LOGIN)
                .build();
        ChannelFuture future = channel.writeAndFlush(login);
        future.addListener((ChannelFutureListener) channelFuture ->
                LOGGER.info("注冊成功={}", login.toString()));
    }

總結

到這里整cim-client的功能就完成了序目,客戶端就是通過命令模式通過okhttp遠程調(diào)用特定的服務地址來注冊臂痕,獲取服務器地址,完成運維猿涨。通過從cim-router拿到的服務器地址握童,建立客戶端-服務端的連接,即可完成消息私聊叛赚,群聊澡绩。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市俺附,隨后出現(xiàn)的幾起案子肥卡,更是在濱河造成了極大的恐慌,老刑警劉巖事镣,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件召调,死亡現(xiàn)場離奇詭異,居然都是意外死亡蛮浑,警方通過查閱死者的電腦和手機唠叛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來沮稚,“玉大人艺沼,你說我怎么就攤上這事≡烫停” “怎么了障般?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長盛杰。 經(jīng)常有香客問我挽荡,道長,這世上最難降的妖魔是什么即供? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任定拟,我火速辦了婚禮,結果婚禮上逗嫡,老公的妹妹穿的比我還像新娘青自。我一直安慰自己,他們只是感情好驱证,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布延窜。 她就那樣靜靜地躺著,像睡著了一般抹锄。 火紅的嫁衣襯著肌膚如雪逆瑞。 梳的紋絲不亂的頭發(fā)上荠藤,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音获高,去河邊找鬼商源。 笑死,一個胖子當著我的面吹牛谋减,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播扫沼,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼出爹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了缎除?” 一聲冷哼從身側響起严就,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎器罐,沒想到半個月后梢为,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡轰坊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年铸董,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肴沫。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡粟害,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出颤芬,到底是詐尸還是另有隱情悲幅,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布站蝠,位于F島的核電站汰具,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏菱魔。R本人自食惡果不足惜留荔,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望澜倦。 院中可真熱鬧存谎,春花似錦、人聲如沸肥隆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽栋艳。三九已至恰聘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背晴叨。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工凿宾, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人兼蕊。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓初厚,卻偏偏與公主長得像,于是被迫代替她去往敵國和親孙技。 傳聞我的和親對象是個殘疾皇子产禾,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容