以流的方式處理IM通訊問(wèn)題好處是無(wú)需多次建立連接旋奢,另外消息的網(wǎng)絡(luò)開(kāi)銷少然痊,而xmpp會(huì)有很多冗余的信息剧浸;
使用開(kāi)源項(xiàng)目:CocoaAsyncSocket ,源碼訪問(wèn) git 地址:https://github.com/robbiehanson/CocoaAsyncSocket
然后引入下面這四個(gè)類:
asyncSocket : 是基于GCD構(gòu)建的TCP/IP 套接字辛蚊,支持TLS / SSL袋马,是線程安全的
asyncUdpSocket : 對(duì)于GCD構(gòu)建的UDP套接字秸应,是線程安全的
框架會(huì)自動(dòng)處理排隊(duì)、緩沖软啼、等待祸挪、檢查等...
MTU:最大傳輸單元(Maximum Transmission Unit,MTU)是指一種通信協(xié)議的某一層上面所能通過(guò)的最大數(shù)據(jù)包大谢咛酢(以字節(jié)為單位);
TCP的MTU通常是1500bytes胧辽,去掉頭部信息公黑,大概剩下1460字節(jié)摄咆;
下面用
CocoaAsyncSocket 來(lái)處理IM通訊吭从,但是會(huì)有一些問(wèn)題含鳞,如粘包,分包問(wèn)題蝉绷;
通常每個(gè) TCP 包頭兩個(gè)字節(jié)要指定數(shù)據(jù)的長(zhǎng)度熔吗;
**粘包
:
**如果包頭兩個(gè)字節(jié)指定長(zhǎng)度小于真實(shí)返回?cái)?shù)據(jù)的長(zhǎng)度,稱為粘包桅狠;
分包:
因?yàn)橐淮沃荒軅鬏敶蠹s1400字節(jié),如果要傳輸2000字節(jié)咨堤,就需要分成2個(gè)包來(lái)處理漩符;
首先要實(shí)現(xiàn)代理:AsyncSocketDelegate
@interface SocketManager : NSObject <UIApplicationDelegate,AsyncSocketDelegate>
{
BOOL allowSelfSignedCertificates;
BOOL allowSSLHostNameMismatch;
}
//主要代理,可在回調(diào)中實(shí)現(xiàn):接受數(shù)據(jù)凸克,鏈接闷沥,寫(xiě)入數(shù)據(jù),斷開(kāi)鏈接等等....
//主要代理蚂维,可在回調(diào)中實(shí)現(xiàn):接受數(shù)據(jù)颖侄,鏈接,寫(xiě)入數(shù)據(jù)孝鹊,斷開(kāi)鏈接等等....
@protocol AsyncSocketDelegate
@optional
/**
- 即將斷開(kāi)鏈接展蒂, 在斷開(kāi)鏈接前苔咪,可使用 unreadData 來(lái)接收最后的數(shù)據(jù)
**/
- (void)onSocket:(AsyncSocket *)sock willDisconnectWithError:(NSError *)err;
/**
- 已經(jīng)斷開(kāi)鏈接:可在此方法內(nèi)釋放 socket柳骄;
**/
- (void)onSocketDidDisconnect:(AsyncSocket *)sock;
/**
- 鏈接到新的socket時(shí)候會(huì)被調(diào)用
**/
- (void)onSocket:(AsyncSocket *)sock didAcceptNewSocket:(AsyncSocket *)newSocket;
/**
- 這個(gè)方法應(yīng)該返回新的 socket 的runloop 耐薯,調(diào)用 [NSRunLoop currentRunLoop]
**/
- (NSRunLoop *)onSocket:(AsyncSocket *)sock wantsRunLoopForNewSocket:(AsyncSocket *)newSocket;
/**
- 即將建立鏈接調(diào)用,返回yes 繼續(xù)体谒,返回no 取消鏈接
**/
- (BOOL)onSocketWillConnect:(AsyncSocket *)sock;
/**
- 準(zhǔn)備讀寫(xiě)操作
**/
- (void)onSocket:(AsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port;
/**
- 讀取新數(shù)據(jù)
**/
- (void)onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag;
/**
- 讀取進(jìn)度
**/
- (void)onSocket:(AsyncSocket *)sock didReadPartialDataOfLength:(NSUInteger)partialLength tag:(long)tag;
/**
- 完成寫(xiě)數(shù)據(jù)
**/
- (void)onSocket:(AsyncSocket *)sock didWriteDataWithTag:(long)tag;
/**
- 寫(xiě)數(shù)據(jù)的過(guò)程中調(diào)用
**/
- (void)onSocket:(AsyncSocket *)sock didWritePartialDataOfLength:(NSUInteger)partialLength tag:(long)tag;
/**
- 讀取數(shù)據(jù)超時(shí)臼婆,通常設(shè)置>0的值,如果設(shè)置<0的值故响,則會(huì)按照默認(rèn)值計(jì)算颁独,會(huì)被調(diào)用多次
**/
- (NSTimeInterval)onSocket:(AsyncSocket *)sock
shouldTimeoutReadWithTag:(long)tag
elapsed:(NSTimeInterval)elapsed
bytesDone:(NSUInteger)length;
/**
- Note that this method may be called multiple times for a single write if you return positive numbers.
- 寫(xiě)操作超時(shí),通常設(shè)置>0
**/
- (NSTimeInterval)onSocket:(AsyncSocket *)sock
shouldTimeoutWriteWithTag:(long)tag
elapsed:(NSTimeInterval)elapsed
bytesDone:(NSUInteger)length;
/**
- 成功建立SSL/TLS鏈接
**/
- (void)onSocketDidSecure:(AsyncSocket *)sock;
@end
define MAX_DATALENGTH 2000000
define HeartBeat_Byte 1
define DataLength_Byte 2
define HEARTBEAT_INTERVAL 60
typedef NS_ENUM(SInt32, HeartBeatType) {
HeartBeatPong = 0xFE,
HeartBeatPing = 0xFF
};
/**
- 創(chuàng)建鏈接:
**/
- (BOOL)connect
{
//創(chuàng)建 socket 并設(shè)置socket攜帶信息:SocketOfflineByServer
self.socket.userData = SocketOfflineByServer;
self.socket = [[AsyncSocket alloc] initWithDelegate:self];
[self.socket setRunLoopModes:[NSArray arrayWithObject:NSRunLoopCommonModes]];
//
if (![self.socket isConnected])
{
NSError *error = nil;
BOOL flag = [self.socket connectToHost:kSOCKET_HOST onPort:SOCKET_PORT withTimeout:TIME_OUT error:&error];
if (!flag) {
self.socket.userData = SocketOfflineByWifiCut;
//可以在這里執(zhí)行連接失敗回調(diào)
// if(self.loginBlock){
// self.loginBlock(NO,@"socket連接服務(wù)器失敗丰捷!");
// }
}else{
//可以在這里執(zhí)行連接成功回調(diào)
}
}
return YES;
}
/**
- 連接成功后寂汇,會(huì)調(diào)用這個(gè)方法;
**/
- (void)onSocket:(AsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port
{
NSLog(@"------socket didConnectToHost---------");
//登錄操作
[self login:nil source:nil];
//連接成功后停巷,設(shè)置心跳榕栏,確保和服務(wù)器的連接
_timer = [NSTimer scheduledTimerWithTimeInterval:HEARTBEAT_INTERVAL target:self selector:@selector(keepLongConnect) userInfo:nil repeats:YES];
[_timer fire];
}
pragma mark 心跳連接
-(void)keepLongConnect{
//循環(huán)向服務(wù)器發(fā)送ping心跳
[self.socket writeData:[self getDataWithInt:HeartBeatPing] withTimeout:TIME_OUT tag:MsgTypePing];
}
pragma mark 登陸
/**
- 登錄之前要執(zhí)行注冊(cè):向服務(wù)器獲取token
- 獲取完token保存到本地
- 然后執(zhí)行登錄操作扒磁,把注冊(cè)獲得的token發(fā)送給服務(wù)器
/
-(void)login:(NSString)sn source:(NSString)source
{
if(sn){
self.sn = sn;
self.source = source;
}
//token:是服務(wù)器分配的唯一碼,類似userID,主要用來(lái)區(qū)分用戶身份;
NSString *token = [[NSUserDefaults standardUserDefaults] stringForKey:kMY_USER_TOKEN];
if (token==nil) {//未注冊(cè)過(guò)
[self regist:sn source:source];
return;
}
if (self.socket!=nil && [self.socket isConnected]) {
//用ProtocolBuffers創(chuàng)建登錄
MsgBuilder builder = [Msg defaultInstance].builder;
builder.msgType = MsgTypeLogin;
Login oneLogin = [[[Login builder] setClientToken:token] build];
builder.login = oneLogin;
// 向socket中寫(xiě)入登錄數(shù)據(jù)
[self.socket writeData:builder.build.data withTimeout:20 tag:MsgTypeLogin];
}else{
[self connect];
}
}
pragma mark 心跳連接
-(void)keepLongConnect{
//循環(huán)向服務(wù)器發(fā)送ping心跳
[self.socket writeData:[self getDataWithInt:HeartBeatPing] withTimeout:TIME_OUT tag:MsgTypePing];
}
發(fā)送普通消息
pragma mark ------收發(fā)消息-------
- (void)sendMessage:(NSData *)msgData
{
[self.socket writeData:msgData withTimeout:20 tag:MsgTypeChat];
}
發(fā)送消息回調(diào):
pragma mark 發(fā)送消息回調(diào)
- (void)onSocket:(AsyncSocket *)sock didWriteDataWithTag:(long)tag
{
switch (tag) {
//成功發(fā)送ping消息
case MsgTypePing:
NSLog(@"保持心跳連接...");
break;
// 成功發(fā)送登錄消息
case MsgTypeLogin:
//繼續(xù)從socket流里讀取數(shù)據(jù)缸榛,讀取完會(huì)調(diào)用:onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
[self.socket readDataWithTimeout:20 buffer:nil bufferOffset:0 maxLength:MAX_BUFFER tag:0];
break;
//成功發(fā)送普通消息
case MsgTypeChat:
break;
default:
break;
}
}
pragma mark 接收服務(wù)器返回消息回調(diào)
- (void)onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
{
if(!data)return;
//是否繼續(xù)等待,如果上一個(gè)data還沒(méi)有滿包钧排,就會(huì)繼續(xù)等待
if(continueWaitData){
//處理接收的data數(shù)據(jù)均澳,不斷累加data
[self handleMsgData:data isContinueData:YES];
}else{
//處理接收的data數(shù)據(jù)
[self handleReceiveData:data];
}
//繼續(xù)從socket流里讀取數(shù)據(jù),讀取完會(huì)調(diào)用:onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
[self.socket readDataWithTimeout:TIME_OUT buffer:nil bufferOffset:0 maxLength:MAX_BUFFER tag:0];
}
/*******處理MsgData******/
-(void) handleMsgData:(NSData *)msgData isContinueData:(BOOL)isContinueData
{
// msgContentLength:是data 頭兩個(gè)字節(jié)指明的數(shù)據(jù)長(zhǎng)度糟袁,但實(shí)際不一定是這個(gè)長(zhǎng)度
NSInteger msgContentLength = 0;
NSInteger length;
/**
- 每個(gè)完整的返回?cái)?shù)據(jù)是這樣的: 77—————————— —————————— ——————————
- 前2位77代表數(shù)據(jù)的長(zhǎng)度纸厉,過(guò)2位之后為數(shù)據(jù)信息,數(shù)據(jù)可能一次返回不完肯尺,分成多段返回躯枢,但余下的分段數(shù)據(jù)頭部不會(huì)包含長(zhǎng)度信息
- 先判斷 是否是繼續(xù)等待的data如果是 那么此data前面無(wú)需去判斷前兩位字節(jié)
- length:表示數(shù)據(jù)頭部長(zhǎng)度信息的長(zhǎng)度
- 如果 isContinueData==true,length==0
**/
if(isContinueData){
//remainDataLength是指上次沒(méi)接收完氓仲,這次還剩下的數(shù)據(jù)長(zhǎng)度
msgContentLength = remainDataLength;
length = 0;
}else{
length = DataLength_Byte;//DataLength_Byte為2得糜,即用2個(gè)字節(jié)來(lái)指明數(shù)據(jù)段的長(zhǎng)度
//取前2個(gè)字節(jié),獲取data前兩個(gè)字節(jié)指定的data長(zhǎng)度
NSData * lengthData = [msgData subdataWithRange:NSMakeRange(0, length)];
msgContentLength = [self getIntWithData:lengthData];
}
//用 msgContentLength 和獲取到的真實(shí)長(zhǎng)度作對(duì)比
//消息的長(zhǎng)度和得到的長(zhǎng)度正好啥箭,直接處理
if(msgContentLength == msgData.length-length){
//要去掉前兩個(gè)字節(jié)(長(zhǎng)度信息)治宣,余下的字節(jié)才是所需要的數(shù)據(jù)
[receiveData appendData:[msgData subdataWithRange:NSMakeRange(length, msgContentLength)]];
[self getCompleteMsgData:receiveData];
//處理粘包問(wèn)題:有冗余字節(jié),所以需要去掉多余的字節(jié)再處理
}else if(msgContentLength < msgData.length-length){
[receiveData appendData:[msgData subdataWithRange:NSMakeRange(length, msgContentLength)]];
[self getCompleteMsgData:receiveData];
//繼續(xù)處理冗余的字節(jié)
[self handleReceiveData:[msgData subdataWithRange:NSMakeRange(length+msgContentLength, msgData.length-length-msgContentLength)]];
//處理分包問(wèn)題:數(shù)據(jù)沒(méi)有一次性返回坏怪,所以需要多次回調(diào)并累加data
}else if(msgContentLength > msgData.length-length){
//remainDataLength:代表還有多長(zhǎng)的數(shù)據(jù)沒(méi)有返回;需要多次回調(diào)
remainDataLength = msgContentLength - (msgData.length - length);
[receiveData appendData:[msgData subdataWithRange:NSMakeRange(length, msgData.length - length)]];
continueWaitData = YES;
}
}
/*******先處理收到的NSData 處理各種情況******/
-(void) handleReceiveData:(NSData *)handleData
{
//數(shù)據(jù)異常,直接丟棄
if( handleData.length > MAX_DATALENGTH){
//斷開(kāi)socke然后重連
[self cutOffSocketConnect];
//只有1字節(jié)翎冲,可能是心跳
}else if(handleData.length == HeartBeat_Byte){
[self handleHeartBeatPingPongWith:handleData];
//>2字節(jié)捉超,是正常信息
}else if(handleData.length > DataLength_Byte){
//先判斷第一個(gè)字節(jié)是否是心跳
if([self handleHeartBeatPingPongWith:handleData] == YES){
[self handleReceiveData:[handleData subdataWithRange:NSMakeRange(HeartBeat_Byte, handleData.length-HeartBeat_Byte)]];
}else{
[self handleMsgData:handleData isContinueData:NO];
}
}
}
/*******處理心跳的ping pong問(wèn)題******/
-(BOOL) handleHeartBeatPingPongWith:(NSData*)data
{
Byte *intByte = (Byte *)[data bytes];
NSInteger heartBeat =intByte[0];
if ( heartBeat == HeartBeatPing) {
//收到ping 給服務(wù)器回一個(gè)Pong
[self.socket writeData:[self getDataWithInt:HeartBeatPong] withTimeout:TIME_OUT tag:MsgTypePong];
return YES;
}else if(heartBeat == HeartBeatPong){
//收到pong 什么都不用處理
return YES;
}else{
return NO;
}
}
/*******把int類型轉(zhuǎn)成一個(gè)字節(jié)的二進(jìn)制 然后轉(zhuǎn)成NSData******/
- (NSData *) getDataWithInt:(NSInteger)value
{
//把整數(shù)存儲(chǔ)到byte數(shù)組,再用byte數(shù)組創(chuàng)建NSData
Byte intByte[1];
intByte[0] = value;
NSData * intData = [NSData dataWithBytes:intByte length:1];
return intData;
}
/*********把NSData 前兩位字節(jié)取出 轉(zhuǎn)成NSData********/
-(NSInteger) getIntWithData:(NSData *)data
{
//把data轉(zhuǎn)成bytes數(shù)組
Byte *intByte = (Byte *)[data bytes];
NSInteger intValue =0;
//先取出第一個(gè)字節(jié)(第一個(gè)字節(jié)存放的是整數(shù))
intValue = intByte[0];
//把 intValue 左移八位如:FF00 再和第二個(gè)字節(jié)相加枝誊,這樣可以保證兩個(gè)字節(jié)同時(shí)存儲(chǔ)
intValue = (intValue << 8) + intByte[1];
return intValue;
}
pragma mark socket連接失敗
- (void)onSocket:(AsyncSocket *)sock willDisconnectWithError:(NSError *)err
{
NSData * unreadData = [sock unreadData]; //讀取未處理的消息
if(unreadData.length > 0) {
[self onSocket:sock didReadData:unreadData withTag:0];
} else {
NSLog(@" DisconnectWithError %ld err = %@",sock.userData,[err description]);
if (err.code == 57) {
self.socket.userData = SocketOfflineByWifiCut;
}
}
}
pragma mark 處理收到的 NSData
/*******處理完整的MsgData******/
-(void) getCompleteMsgData:(NSData *)completeMsgData
{
Msg * msg = [Msg parseFromData:completeMsgData];
[self cleanAllDataMark];
switch (msg.msgType) {
case MsgTypeLoginAck:{//登陸成功
//存儲(chǔ)用戶id
[[NSUserDefaults standardUserDefaults] setObject:msg.loginAck.clientId forKey:kMY_USER_ID];
[[NSUserDefaults standardUserDefaults] synchronize];
[SocketEngine shareInstance].uid = msg.loginAck.clientId;
if (msg.loginAck.status) {//clientToken 失效
self.socket.userData = SocketOfflineByWifiCut;
NSLog(@"-------token失效叶撒,重新注冊(cè)token-------");
[self regist:self.sn source:self.source];
}else{
NSLog(@"--------登陸成功-------");
if (self.loginBlock) {
self.loginBlock(YES,@"登陸成功");
}
}
}
break;
case MsgTypeChat:{//接收普通消息
NSString *clientId = [[NSUserDefaults standardUserDefaults] stringForKey:kMY_USER_ID];
//給服務(wù)器回執(zhí)耐版,確認(rèn)收到消息,否則斷開(kāi)
MsgBuilder builder = [Msg defaultInstance].builder;
builder.msgType = MsgTypeAck;
Ack ack = [[[[Ack builder] setMsgId:msg.chat.msgId] setClientId:clientId] build];
builder.ack = ack;
[self.socket writeData:builder.build.data withTimeout:20 tag:MsgTypeAck];
//去重
BOOL isRepeat = [self checkRepeatMsg:msg.chat.pb_from time:msg.chat.createTime];
if (isRepeat) {
return;
}
if (_timer) {
[_timer setFireDate: [[NSDate date] dateByAddingTimeInterval:HEARTBEAT_INTERVAL]];//timer延后20秒心跳
}
//存儲(chǔ)消息
LSMsgItemInfo item = [[LSMsgItemInfo alloc]init];
item.senderUserId = clientId;
item.peerUserId = msg.chat.pb_from;
item.msgBody = msg.chat.body;
item.createTime = msg.chat.createTime;
item.msgType = [self convertType:msg.chat.bodyType];
item.msgId = [[MsgEngine shareInstance] getMsgId];//[[NSDate date] timeIntervalSince1970]1000;
item.showTime = NO;
item.isGroupMsg = NO;
item.isSender = NO;
item.msgStatus = STATUS_TYPE_SUCCESS;
item.message_id = [NSString stringWithFormat:@"%d",(int)msg.chat.msgId] ;
item.hasReaded = FALSE;
[self saveMessage:item];
//保存未讀消息
if (!_isChatMode) {
[self setUnreadMsg:msg.chat.body userId:clientId peerId:msg.chat.pb_from];
}
//通知視圖更新ui
if (item.msgType == LS_MSG_TYPE_IMAGE ) {
//解析壓縮圖
NSString *url = msg.chat.body;
NSString *suffix = [[msg.chat.body componentsSeparatedByString:@"."] lastObject];
if (suffix) {
NSString *prefix = [msg.chat.body substringToIndex:(msg.chat.body.length-suffix.length-1)];
url = [[prefix stringByAppendingString:@"_s100X100."] stringByAppendingString:suffix];
}
[items setObject:item forKey:[NSString stringWithFormat:@"%llu",item.msgId]];
[self downLoadFile:url fileName:item.msgId tag:-1];
}else if (item.msgType == LS_MSG_TYPE_AUDIO || item.msgType == LS_MSG_TYPE_VIDEO){
if (msg.chat.ext) {
NSDictionary *dic = [msg.chat.ext JSONValue];
item.mediaSecond = [[dic objectForKey:@"voice_length"] longValue];
}
[items setObject:item forKey:[NSString stringWithFormat:@"%llu",item.msgId]];
[self downLoadFile:msg.chat.body fileName:item.msgId tag:-1];
}else{
if (self.updateNewMsg) {
self.updateNewMsg(item);
}
}
SAFELY_RELEASE(item);
break;
}
case MsgTypeAck://消息回執(zhí)
if (!dataBase) {
dataBase = [[MsgDataBase alloc]initWithUserId:[[SocketEngine shareInstance] uid]];
}
//更新存儲(chǔ)記錄,所有msg.ack.msgId 的記錄并且發(fā)送中的 最后一條腺阳,態(tài)置為成功狀態(tài),其余發(fā)送中的msg.ack.msgId為失敗
[self updateMessageStatus:[NSString stringWithFormat:@"%d",(int)msg.ack.msgId] status:STATUS_TYPE_SUCCESS];
//更新ui狀態(tài)
if(self.statusBlock){
self.statusBlock(true, [NSString stringWithFormat:@"%d",(int)msg.ack.msgId]);
}
break;
case MsgTypeOfflineChat://接收離線消息
break;
default:
break;
}
NSLog(@"---------msg:%@",msg);
}