感覺Crossoverjie的一個開源cim(即時通訊系統(tǒng)朴乖,源碼和設計個人覺得不錯吸耿,空閑的時候分析一下氓癌。
cim github地址: https://github.com/crossoverJie/cim
- 第一篇: CIM-client 功能和設計分析
- 第二篇:CIM-router功能和設計分析
- 第三篇:CIM-server功能和設計分析
協(xié)議設計
1. 請求協(xié)議類圖
-
BaseRequest
作為基類绒障,具有所有的請求都應該具備的兩個屬性
public class BaseRequest {
// 請求的序列號
private String reqNo;
//請求的時間戳,構造的時候就設置為System.currentTimeMillis() / 1000
private int timeStamp;
}
-
GoogleProtocolVO
增加了requestId
和msg
兩個字段酝碳,表示傳輸GoogleProtocol 消息矾踱。 -
GroupReqVO
增加了userId
,msg
兩個字段,表示傳輸?shù)氖侨毫南ⅰ?/li> -
LoginReqVO
增加了userId
,userName
兩個字段击敌,表示傳輸?shù)氖堑卿浵?/li> -
P2PReqVO
增加了userId
,receiveUserId
,msg
字段介返,表示傳輸?shù)氖且粚σ凰搅南?/li> -
SendMsgReqVO
增加了msg
,userId
字段拴事,表示通常的傳輸發(fā)送消息 -
StringReqVO
增加了msg
字段沃斤,表示用來傳輸String的消息
2. 相應協(xié)議類圖
-
CIMServerResVO
用來接收查詢路由選中的服務器的響應消息圣蝎,格式如下:
{
code : 9000
message : 成功
reqNo : null
dataBody : {"ip":"127.0.0.1","port":8081}
}
-
OnlineUsersResVO
用來接受查詢所有在線用戶的響應消息,格式如下:
{
code : 9000
message : 成功
reqNo : null
dataBody : [{"userId":1545574841528,"userName":"zhangsan"},{"userId":1545574871143,"userName":"crossoverJie"}]
}
-
SendMsgResVO
表示發(fā)送消息的響應
3. 程序運行流程
3.1 程序入口類
public class CIMClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
@Autowired
private ClientInfo clientInfo ;
public static void main(String[] args) {
SpringApplication.run(CIMClientApplication.class, args);
LOGGER.info("啟動 Client 服務成功");
}
@Override
public void run(String... args) throws Exception {
Scan scan = new Scan() ;
Thread thread = new Thread(scan);
thread.setName("scan-thread");
thread.start();
clientInfo.saveStartDate();
}
- 標準的Springboot啟動流程衡瓶,重寫run方法在Springboot應用啟動后就啟動一個線程去監(jiān)聽控制臺徘公,根據(jù)用戶的命令,做相應的操作哮针。
3.2 Scan
掃描用戶的輸入命令
public void run() {
Scanner sc = new Scanner(System.in);
while (true) {
String msg = sc.nextLine();
//檢查消息,保證輸入消息不能不為null
if (msgHandle.checkMsg(msg)) {
continue;
}
//系統(tǒng)內(nèi)置命令
if (msgHandle.innerCommand(msg)){
continue;
}
//真正的發(fā)送消息
msgHandle.sendMsg(msg) ;
//寫入聊天記錄
msgLogger.log(msg) ;
LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
}
}
- 經(jīng)過檢查消息是否為空字符串关面,是否是內(nèi)置命令,最后剩下的是用戶發(fā)送的消息十厢。
3.3 內(nèi)置命令的處理
如果是內(nèi)置命令等太,轉而通過反射實例化每個命令,這里用到命令模式蛮放。
public boolean innerCommand(String msg) {
if (msg.startsWith(":")) {
InnerCommand instance = innerCommandContext.getInstance(msg);
//調(diào)用里面的方法
instance.process(msg) ;
return true;
} else {
return false;
}
}
public InnerCommand getInstance(String command) {
//// 每個命令對應一個實現(xiàn)類
Map<String, String> allClazz = SystemCommandEnum.getAllClazz();
//兼容需要命令后接參數(shù)的數(shù)據(jù) :q cross
String[] trim = command.trim().split(" ");
String clazz = allClazz.get(trim[0]);
InnerCommand innerCommand = null;
try {
if (StringUtil.isEmpty(clazz)){
clazz = PrintAllCommand.class.getName() ;
}
//根據(jù)類名獲取到在容器里面的實例
innerCommand = (InnerCommand) SpringBeanFactory.getBean(Class.forName(clazz));
} catch (Exception e) {
LOGGER.error("Exception", e);
}
return innerCommand;
}
-
內(nèi)部完整命令缩抡,以及他們的實現(xiàn)類如下
image.png
完整命令類類圖如下:
image.png
看其中一個實現(xiàn)類
public class PrintOnlineUsersCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(PrintOnlineUsersCommand.class);
@Autowired
private RouteRequest routeRequest ;
@Override
public void process(String msg) {
try {
// 查詢所有的在線用戶,委托routeRequest 來查詢
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
LOGGER.info("userId={}=====userName={}", onlineUser.getUserId(), onlineUser.getUserName());
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
} catch (Exception e) {
LOGGER.error("Exception", e);
}
}
}
都是通過其中的process來處理邏輯
3.4 處理內(nèi)置命令后包颁,接著來看處理發(fā)送消息
public void sendMsg(String msg) {
if (aiModel) {
//ai 模式主要是調(diào)侃之前那個價值兩億的融資項目
aiChat(msg);
} else {
// 正常的聊天
normalChat(msg);
}
}
private void normalChat(String msg) {
String[] totalMsg = msg.split(";;");
// 私聊的格式是:12345;;hello
if (totalMsg.length > 1) {
//私聊
P2PReqVO p2PReqVO = new P2PReqVO();
p2PReqVO.setUserId(configuration.getUserId());
p2PReqVO.setReceiveUserId(Long.parseLong(totalMsg[0]));
p2PReqVO.setMsg(totalMsg[1]);
try {
p2pChat(p2PReqVO);
} catch (Exception e) {
LOGGER.error("Exception", e);
}
} else {
//群聊 直接發(fā)消息就行
GroupReqVO groupReqVO = new GroupReqVO(configuration.getUserId(), msg);
try {
groupChat(groupReqVO);
} catch (Exception e) {
LOGGER.error("Exception", e);
}
}
}
群聊和私聊也都委托 routeRequest來實現(xiàn)
@Override
public void groupChat(GroupReqVO groupReqVO) throws Exception {
routeRequest.sendGroupMsg(groupReqVO);
}
@Override
public void p2pChat(P2PReqVO p2PReqVO) throws Exception {
routeRequest.sendP2PMsg(p2PReqVO);
}
3.5 處理聊天記錄
接著最開始的時候看瞻想,聊天完成后,需要把聊天記錄寫入文件娩嚼,實現(xiàn)如下
public void log(String msg) {
//開始消費蘑险,異步完成
startMsgLogger();
try {
//往阻塞隊列里面添加
blockingQueue.put(msg);
} catch (InterruptedException e) {
LOGGER.error("InterruptedException", e);
}
}
啟動消息線程,往阻塞隊列里面添加消息
private class Worker extends Thread {
@Override
public void run() {
while (started) {
try {
//往阻塞隊列里面取
String msg = blockingQueue.take();
writeLog(msg);
} catch (InterruptedException e) {
break;
}
}
}
}
真正寫入文件的實現(xiàn)如下:
private void writeLog(String msg) {
LocalDate today = LocalDate.now();
int year = today.getYear();
int month = today.getMonthValue();
int day = today.getDayOfMonth();
String dir = appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/";
String fileName = dir + year + month + day + ".log";
Path file = Paths.get(fileName);
boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
try {
if (!exists) {
Files.createDirectories(Paths.get(dir));
}
List<String> lines = Arrays.asList(msg);
Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
LOGGER.info("IOException", e);
}
}
查找聊天記錄的實現(xiàn)如下岳悟,就是簡單的查找每個文件的每行佃迄,然后看是否包含,這樣的方式很暴力竿音,后期的話有很大改進:
@Override
public String query(String key) {
StringBuilder sb = new StringBuilder();
Path path = Paths.get(appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/");
try {
Stream<Path> list = Files.list(path);
List<Path> collect = list.collect(Collectors.toList());
for (Path file : collect) {
List<String> strings = Files.readAllLines(file);
for (String msg : strings) {
if (msg.trim().contains(key)) {
sb.append(msg).append("\n");
}
}
}
} catch (IOException e) {
LOGGER.info("IOException", e);
}
return sb.toString().replace(key, "\033[31;4m" + key + "\033[0m");
}
3.6 RouteRequestImpl
的實現(xiàn)
這個實現(xiàn)里面包含眾多的功能和屎,例如,群聊春瞬,私聊柴信,離線,獲取在線用戶宽气,獲取一個可用的服務ip随常。這些功能的實現(xiàn)都是依靠 RouteRequestImpl
來完成,而RouteRequestImpl里面的實現(xiàn)是通過okhttp遠程調(diào)用cim-router的http接口實現(xiàn)的萄涯⌒鞣眨看其中的群聊功能:
public void sendGroupMsg(GroupReqVO groupReqVO) throws Exception {
//序列化
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg",groupReqVO.getMsg());
jsonObject.put("userId",groupReqVO.getUserId());
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(groupRouteRequestUrl)
.post(requestBody)
.build();
//發(fā)送http請求cim-router
Response response = okHttpClient.newCall(request).execute() ;
try {
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
}finally {
response.body().close();
}
}
3.7 客戶端的啟動
上面所有的都是內(nèi)置命令的處理以及和cim-router的通信。但是涝影,client最終是要和server通信的枣察,所以在這個過程中,客戶端作為netty客戶端需要啟動。這個啟動過程可以在CIMClient
實例化的過程中啟動
@Component
public class CIMClient {
//構造函數(shù)完成后調(diào)用
@PostConstruct
public void start() throws Exception {
//登錄 + 獲取可以使用的服務器 ip+port
CIMServerResVO.ServerInfo cimServer = userLogin();
//啟動客戶端
startClient(cimServer);
//向服務端注冊
loginCIMServer();
}
}
向路由注冊并返回可用的服務器地址
private CIMServerResVO.ServerInfo userLogin() {
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
CIMServerResVO.ServerInfo cimServer = null;
try {
//獲取可用的服務器
cimServer = routeRequest.getCIMServer(loginReqVO);
//保存系統(tǒng)信息
clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
.saveUserInfo(userId, userName);
LOGGER.info("cimServer=[{}]", cimServer.toString());
} catch (Exception e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("重連次數(shù)達到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("登錄失敗", e);
}
return cimServer;
}
啟動客戶端到服務端(上一步獲取的)的channel
private void startClient(CIMServerResVO.ServerInfo cimServer) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CIMClientHandleInitializer())
;
ChannelFuture future = null;
try {
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
} catch (InterruptedException e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("鏈接失敗次數(shù)達到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("連接失敗", e);
}
if (future.isSuccess()) {
LOGGER.info("啟動 cim client 成功");
}
channel = (SocketChannel) future.channel();
}
向服務器注冊
private void loginCIMServer() {
CIMRequestProto.CIMReqProtocol login = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(userId)
.setReqMsg(userName)
.setType(Constants.CommandType.LOGIN)
.build();
ChannelFuture future = channel.writeAndFlush(login);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("注冊成功={}", login.toString()));
}
總結
到這里整cim-client的功能就完成了序目,客戶端就是通過命令模式通過okhttp遠程調(diào)用特定的服務地址來注冊臂痕,獲取服務器地址,完成運維猿涨。通過從cim-router拿到的服務器地址握童,建立客戶端-服務端的連接,即可完成消息私聊叛赚,群聊澡绩。