一 : Stomp
HTTP處在應(yīng)用層,而WebSocket處在TCP上,并且內(nèi)容不多,是一個消息架構(gòu),不包含特定的解釋協(xié)議,所以還得有專門的協(xié)議來解釋消息,有很多,Stomp是其中之一.
stomp以幀來封裝消息,一個幀由一個命令,加上header(可以是多個),再加上body(文本或二進(jìn)制),組裝出來的是一段字符串.
命令的類型:
CONNECT簇抵、SEND吉殃、SUBSCRIBE偎血、UNSUBSCRIBE哪亿、BEGIN、COMMIT、ABORT、ACK、NACK确买、DISCONNECT
例如發(fā)送一個消息
SEND
destination:/queue/a
content-type:text/plain
hello queue a
^@
訂閱消息
SUBSCRIBE
id:0
destination:/queue/foo
ack:client
^@
二 : SocketRocket
SocketRocket是Facebook維護(hù)的iOS和mac os 上的webSocket庫,是OC實現(xiàn)的,是比較推薦的一個.
1.建立連接
Springboot基于STOMP實現(xiàn)的webSocket可以將http模擬成Socket,所以建立連接的url可能是一個"http://"
在iOS端SocketRocket庫也可以支持STOMP.
pod 'SocketRocket'
var request = URLRequest.init(url: .init(string: "")!, cachePolicy: .useProtocolCachePolicy, timeoutInterval: 10)
//給request header添加一些key-value
request.setValue("", forHTTPHeaderField: "")
socket = SRWebSocket.init(urlRequest: request)
url可以是http(或者ws;wss)://域名(或者IP):端口/路徑(/websocket)
最后可以加上"/websocket"強制使用websocket協(xié)議
例如http://test.com:9090/test/websocket
2.但是SocketRocket并沒有實現(xiàn)Stomp協(xié)議的相關(guān)API,所以如果需要發(fā)送幀,就需要手動拼寫frame;
就是命令 + 換行 + headerkey + : + headerValue + ... + 換行 + body + 終止字符
同樣接受到的消息也是一個Frame,也需要解析
private func sendFrame(command: String?, header: [String: String]?, body: AnyObject?) {
var frameString = ""
if command != nil {
frameString = command! + "\n"
}
if let header = header {
for (key, value) in header {
frameString += key
frameString += ":"
frameString += value
frameString += "\n"
}
}
if let body = body as? String {
frameString += "\n"
frameString += body
} else if let _ = body as? NSData {
}
if body == nil {
frameString += "\n"
}
frameString += String(format: "%C", arguments: [0x00])
if socket?.readyState == .OPEN {
do{
try self.socket?.send(string: frameString)
}catch{}
}
}
三:StompKit
StompKit提供了封裝和解析Frame的方法;以及CONNECT SUBSCRIBE ACK等命令的方法;
StompKit本身是基于CocoaAsyncSocket的,是純OC的
1.預(yù)定義
#define kCommandAbort @"ABORT"
#define kCommandAck @"ACK"
#define kCommandBegin @"BEGIN"
#define kCommandCommit @"COMMIT"
#define kCommandConnect @"CONNECT"
#define kCommandConnected @"CONNECTED"
#define kCommandDisconnect @"DISCONNECT"
#define kCommandError @"ERROR"
#define kCommandMessage @"MESSAGE"
#define kCommandNack @"NACK"
#define kCommandReceipt @"RECEIPT"
#define kCommandSend @"SEND"
#define kCommandSubscribe @"SUBSCRIBE"
#define kCommandUnsubscribe @"UNSUBSCRIBE"
#pragma mark Control characters
#define kLineFeed @"\x0A"
#define kNullChar @"\x00"
#define kHeaderSeparator @":"
2.構(gòu)造Frame
- (NSString *)toString {
NSMutableString *frame = [NSMutableString stringWithString: [self.command stringByAppendingString:kLineFeed]];
for (id key in self.headers) {
[frame appendString:[NSString stringWithFormat:@"%@%@%@%@", key, kHeaderSeparator, self.headers[key], kLineFeed]];
}
[frame appendString:kLineFeed];
if (self.body) {
[frame appendString:self.body];
}
[frame appendString:kNullChar];
return frame;
}
解析Frame
+ (STOMPFrame *) STOMPFrameFromData:(NSData *)data {
NSData *strData = [data subdataWithRange:NSMakeRange(0, [data length])];
NSString *msg = [[NSString alloc] initWithData:strData encoding:NSUTF8StringEncoding];
LogDebug(@"<<< %@", msg);
NSMutableArray *contents = (NSMutableArray *)[[msg componentsSeparatedByString:kLineFeed] mutableCopy];
while ([contents count] > 0 && [contents[0] isEqual:@""]) {
[contents removeObjectAtIndex:0];
}
NSString *command = [[contents objectAtIndex:0] copy];
NSMutableDictionary *headers = [[NSMutableDictionary alloc] init];
NSMutableString *body = [[NSMutableString alloc] init];
BOOL hasHeaders = NO;
[contents removeObjectAtIndex:0];
for(NSString *line in contents) {
if(hasHeaders) {
for (int i=0; i < [line length]; i++) {
unichar c = [line characterAtIndex:i];
if (c != '\x00') {
[body appendString:[NSString stringWithFormat:@"%c", c]];
}
}
} else {
if ([line isEqual:@""]) {
hasHeaders = YES;
} else {
NSMutableArray *parts = [NSMutableArray arrayWithArray:[line componentsSeparatedByString:kHeaderSeparator]];
// key ist the first part
NSString *key = parts[0];
[parts removeObjectAtIndex:0];
headers[key] = [parts componentsJoinedByString:kHeaderSeparator];
}
}
}
return [[STOMPFrame alloc] initWithCommand:command headers:headers body:body];
}
3.訂閱的同時維護(hù)一個字典(subscriptions)來存儲頻道和收到消息的回調(diào)block
- (STOMPSubscription *)subscribeTo:(NSString *)destination
headers:(NSDictionary *)headers
messageHandler:(STOMPMessageHandler)handler {
NSMutableDictionary *subHeaders = [[NSMutableDictionary alloc] initWithDictionary:headers];
subHeaders[kHeaderDestination] = destination;
NSString *identifier = subHeaders[kHeaderID];
if (!identifier) {
identifier = [NSString stringWithFormat:@"sub-%d", idGenerator++];
subHeaders[kHeaderID] = identifier;
}
self.subscriptions[identifier] = handler;
[self sendFrameWithCommand:kCommandSubscribe
headers:subHeaders
body:nil];
return [[STOMPSubscription alloc] initWithClient:self identifier:identifier];
}
四 : WebsocketStompKit
WebsocketStompKit是用Jetfire為基礎(chǔ),然后再結(jié)合StompKit的思路來封裝Frame,和connect,subscribe等操作
Jetfire還有一個swift版本叫starscream,不過Jetfire用的比較少
五 : StompClientLib
基于socketRocket然后封裝了stomp協(xié)議相關(guān)操作的庫,并且stomp部分是用swift實現(xiàn)的.
不過代碼比較舊,而且個人認(rèn)為有很多不太好的邏輯;
不過實質(zhì)就是對STOMP協(xié)議的封裝和解析,沒有很多代碼,這個庫就一個文件;
所以建議直接放到項目里,根據(jù)實際業(yè)務(wù)直接魔改.
連接
var socketClient = StompClientLib()
let url = NSURL(string: "your-socket-url-is-here")!
socketClient.openSocketWithURLRequest(request: NSURLRequest(url: url as URL) , delegate: self)
訂閱
let destination = "/topic/your_topic"
let ack = destination
let id = destination
let header = ["destination": destination, "ack": ack, "id": id]
// subscribe
socketClient?.subscribeWithHeader(destination: destination, withHeader: header)
// unsubscribe
socketClient?.unsubscribe(destination: subsId)
在實際使用的時候發(fā)現(xiàn)幾個問題:
- func openSocket()方法判斷了socketRocket在非.CLOSED的狀態(tài)進(jìn)入open();但是webSocket還有一個.CLOSING狀態(tài),如果做了多服務(wù)器支持,其中一臺掛掉的時候,可能會長期處理這個狀態(tài),所以我也加上了;
另外現(xiàn)在iOS廢棄了SSL免認(rèn)證的API ,所以certificateCheckEnabled我也刪了
private func openSocket() {
if socket == nil || socket?.readyState == .CLOSED || socket?.readyState == .CLOSING{
self.socket = SRWebSocket(urlRequest: urlRequest! as URLRequest)
socket!.delegate = self
socket!.open()
}
}
2.我覺著connection屬性沒有很好的發(fā)揮作用,我修改了下,在webSocketDidOpen()時設(shè)置為true;
在webSocket(_ webSocket: SRWebSocket, didCloseWithCode code: Int, reason: String?, wasClean: Bool) 設(shè)置為false;其他地方都不需要賦值;
3.在func closeSocket()中,self.socket!.close()之后,設(shè)置delegate=nil和socket=nil;
這個也是有有問題的,這樣在主動斷開連接之后,收不到func webSocket(_ webSocket: SRWebSocket, didCloseWithCode code: Int, reason: String?, wasClean: Bool)代理方法的回調(diào);
所以我也修改了,結(jié)合上面的第2點.
private func closeSocket(){
if let delegate = delegate {
DispatchQueue.main.async(execute: {
delegate.stompClientDidDisconnect(client: self)
if self.socket != nil {
// Close the socket
self.socket!.close()
}
})
}
}
4.reconnectTimer在調(diào)用stopReconnect()之后還是在執(zhí)行,我直接換成了DispatchSourceTimer
public func reconnect(request: NSURLRequest, delegate: StompClientLibDelegate, connectionHeaders: [String: String] = [String: String](), time: Double = 1.0, exponentialBackoff: Bool = true){
reconnectTimer = DispatchSource.makeTimerSource(flags: .strict, queue: .main)
reconnectTimer?.schedule(deadline: .now(), repeating: time)
reconnectTimer?.setEventHandler(handler: {
self.reconnectLogic(request: request, delegate: delegate
, connectionHeaders: connectionHeaders)
})
reconnectTimer?.resume()
}