cim github地址: https://github.com/crossoverJie/cim
- 第一篇: CIM-client 功能和設計分析
- 第二篇:CIM-router功能和設計分析
- 第三篇:CIM-server功能和設計分析
分析完了CIM-client,CIM-router后都哭,最后分析下CIM-server就完整了秩伞。借用crossoverjie的架構(gòu)圖如下:
client與client通信都是通過router作為中介,相當于router作為中轉(zhuǎn)站欺矫。一個client只要知道另外一個client與那個server連接起來纱新,router就把消息發(fā)送該server。然后該server把消息寫到client的channel里面去穆趴。
server端稍微簡單點脸爱,直接進入主題。
1. 程序入口
public class CIMServerApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerApplication.class);
@Autowired
private AppConfiguration appConfiguration ;
@Value("${server.port}")
private int httpPort ;
// 正常啟動
public static void main(String[] args) {
SpringApplication.run(CIMServerApplication.class, args);
LOGGER.info("啟動 Server 成功");
}
// 把本地服務ip+prot 注冊到ZK上
@Override
public void run(String... args) throws Exception {
//獲得本機IP
String addr = InetAddress.getLocalHost().getHostAddress();
Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getCimServerPort(),httpPort));
thread.setName("registry-zk");
thread.start() ;
}
}
public class RegistryZK implements Runnable {
@Override
public void run() {
//創(chuàng)建父節(jié)點
zKit.createRootNode();
//是否要將自己注冊到 ZK
if (appConfiguration.isZkSwitch()){
String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + cimServerPort + ":" + httpPort;
zKit.createNode(path);
logger.info("注冊 zookeeper 成功未妹,msg=[{}]", path);
}
}
}
- 以上主要是將自己注冊到ZK中簿废,作為服務被發(fā)現(xiàn)。
2. Server發(fā)送消息
server收到發(fā)送消息的router的請求络它,將http請求過來的消息發(fā)送給指定的client
@ApiOperation("服務端發(fā)送消息")
@RequestMapping(value = "sendMsg",method = RequestMethod.POST)
@ResponseBody
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
cimServer.sendMsg(sendMsgReqVO) ;
counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
res.setDataBody(sendMsgResVO) ;
return res ;
}
public void sendMsg(SendMsgReqVO sendMsgReqVO){
//獲取到接受用戶的channel
NioSocketChannel socketChannel = SessionSocketHolder.get(sendMsgReqVO.getUserId());
if (null == socketChannel) {
throw new NullPointerException("客戶端[" + sendMsgReqVO.getUserId() + "]不在線族檬!");
}
CIMRequestProto.CIMReqProtocol protocol = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(sendMsgReqVO.getUserId())
.setReqMsg(sendMsgReqVO.getMsg())
.setType(Constants.CommandType.MSG)
.build();
ChannelFuture future = socketChannel.writeAndFlush(protocol);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("服務端手動發(fā)送 Google Protocol 成功={}", sendMsgReqVO.toString()));
}
- 找到接受用戶的channel,寫入protocol就行化戳。
3. channel和session的保存
public class SessionSocketHolder {
//userid ---> channel
private static final Map<Long, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>(16);
//userid --->username
private static final Map<Long, String> SESSION_MAP = new ConcurrentHashMap<>(16);
//保存用戶消息
public static CIMUserInfo getUserId(NioSocketChannel nioSocketChannel){
for (Map.Entry<Long, NioSocketChannel> entry : CHANNEL_MAP.entrySet()) {
NioSocketChannel value = entry.getValue();
if (nioSocketChannel == value){
Long key = entry.getKey();
String userName = SESSION_MAP.get(key);
CIMUserInfo info = new CIMUserInfo(key,userName) ;
return info ;
}
}
return null;
}
}
用兩個map保存userid ---> channel和userid --->username的對應单料。這樣方便快速查找。
4. CIMServerHandle
的處理
CIMServerHandle
主要處理client的登陸信息点楼。
public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto.CIMReqProtocol> {
/**
* 取消綁定
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//可能出現(xiàn)業(yè)務判斷離線后再次觸發(fā) channelInactive
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
if (userInfo != null){
LOGGER.warn("[{}]觸發(fā) channelInactive 掉線!",userInfo.getUserName());
//remove SessionSocketHolder 里面保存的信息
userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
LOGGER.info("定時檢測客戶端端是否存活");
HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ;
heartBeatHandler.process(ctx) ;
}
}
super.userEventTriggered(ctx, evt);
}
/**
* 用戶下線
* @param userInfo
* @param channel
* @throws IOException
*/
private void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
LOGGER.info("用戶[{}]下線", userInfo.getUserName());
SessionSocketHolder.remove(channel);
SessionSocketHolder.removeSession(userInfo.getUserId());
//清除路由關系扫尖,清除router中保存的userid --> server的對應關系
clearRouteInfo(userInfo);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception {
LOGGER.info("收到msg={}", msg.toString());
if (msg.getType() == Constants.CommandType.LOGIN) {
//保存客戶端與 Channel 之間的關系
SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
LOGGER.info("客戶端[{}]上線成功", msg.getReqMsg());
}
//心跳更新時間
if (msg.getType() == Constants.CommandType.PING){
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
//向客戶端響應 pong 消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.error("IO error,close Channel");
future.channel().close();
}
}) ;
}
}
}
- 主要接受client的注冊,保存client的channel,方便server寫入channe掠廓。
- 客戶端的心跳是判斷channel的當前時間-最后的讀的時間是否大于給定的time,如果大于藏斩,則說明超時。則需要關閉客戶端連接却盘,清除userid--->channel狰域,userid--->username的映射。然后通知router清楚userid--->server的映射黄橘。
總結(jié)
綜上所述兆览,server端的主要任務是完成注冊,即保存userid--->channel的通道塞关。待收到消息后抬探,取出channel,往channel寫入消息即可。在處理心跳的時候小压,當遇到讀空閑的時候线梗,判斷當前時間-上次讀時間是否大于預先設定的空閑時間,如果超了怠益,則清除userid--->channel的緩存仪搔,userid--->username的緩存。并告知router下線蜻牢。