ZeroMQ在使用模式上支持多種柠衅,有req-reply挡毅,publish-subscribe集歇。訂閱模式是zmq的重頭戲,以鄙人的使用和理解來淺談下訂閱模式想暗,本人也是邊學(xué)邊用,如有問題請(qǐng)大神指出帘不。關(guān)于應(yīng)答模式請(qǐng)閱讀 這篇文章 说莫。
zmq的訂閱模式實(shí)現(xiàn)示例代碼如下
ZMQContext *ctx = [[ZMQContext alloc] initWithIOThreads:1U];
// Socket to talk to server
ZMQSocket *subscriber = [ctx socketWithType:ZMQ_SUB];
if (![subscriber connectToEndpoint:@"tcp://localhost:5556"]) {
return EXIT_FAILURE;
}
const char *kNYCZipCode = "10001";
const char *filter = (argc > 1)? argv[1] : kNYCZipCode;
NSData *filterData = [NSData dataWithBytes:filter length:strlen(filter)];
[subscriber setData:filterData forOption:ZMQ_SUBSCRIBE];
(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
const int kMaxUpdate = 100;
long total_temp = 0;
for (int update_nbr = 0; update_nbr < kMaxUpdate; ++update_nbr) {
NSData *msg = [subscriber receiveDataWithFlags:0];
const char *string = [msg bytes];
int zipcode = 0, temperature = 0, relhumidity = 0;
(void)sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);
printf("%d ", temperature);
total_temp += temperature;
}
訂閱模式的重點(diǎn)在于數(shù)據(jù)分發(fā),數(shù)據(jù)由服務(wù)器主動(dòng)推回來給客戶端寞焙,訂閱的數(shù)據(jù)統(tǒng)一接收储狭,然后在分發(fā)到需要數(shù)據(jù)處理的對(duì)象中。示例代碼只是簡(jiǎn)單的接收數(shù)據(jù)捣郊,它的做法是一個(gè)線程接收一個(gè)訂閱消息辽狈,而且數(shù)據(jù)接收是阻塞當(dāng)前線程的,這個(gè)在實(shí)際的項(xiàng)目應(yīng)用也是不符合呛牲,所以封裝和優(yōu)化數(shù)據(jù)分發(fā)是必須的刮萌。
NSData *msg = [subscriber receiveDataWithFlags:0]; // 阻塞線程直到接收到數(shù)據(jù)
數(shù)據(jù)的處理分發(fā)采用代理來回調(diào),當(dāng)然也可以使用block娘扩,個(gè)人建議使用代理着茸,畢竟數(shù)據(jù)分發(fā)到不同的處理對(duì)象中,使用代理可以很好的避免循環(huán)引用琐旁,避免內(nèi)存泄漏涮阔。我這里是使用一個(gè)兩層Dictionary來存儲(chǔ)代理,dictionary的遍歷速度要比數(shù)組好一點(diǎn)旋膳。第一層dictionary以訂閱碼來做key澎语,第二層dictionary以代理對(duì)象的hash值為key,這樣可以快速找到代理對(duì)象進(jìn)行分發(fā),也可以準(zhǔn)確找到某個(gè)代理對(duì)象并實(shí)現(xiàn)移除操作擅羞。
添加代理對(duì)象的方法
- (void)setCode:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
NSAssert(delegate != nil, @"代理為空");
NSAssert(code != nil, @"訂閱碼為空");
// 取出當(dāng)前訂閱碼的代理字典
NSMutableDictionary *delegateDict = self.delegates[code];
if (delegateDict == nil) { // 不存在當(dāng)前的訂閱碼代理對(duì)象
// 創(chuàng)建當(dāng)前訂閱碼的代理對(duì)象的字典
delegateDict = [NSMutableDictionary dictionary];
self.delegates[code] = delegateDict;
}
// 添加代理對(duì)象
NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
delegateDict[key] = delegate;
}
移除代理對(duì)象的方法
- (void)removeDelegate:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
NSAssert(delegate != nil, @"代理為空");
NSAssert(code != nil, @"訂閱碼為空");
// 取出當(dāng)前訂閱碼的代理字典
NSMutableDictionary *delegateDict = self.delegates[code];
if (delegateDict == nil) return; // 不存在當(dāng)前的訂閱碼代理對(duì)象
NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
[delegateDict removeObjectForKey:key];
}
注意尸变,當(dāng)對(duì)象不需要處理訂閱消息時(shí),一定要調(diào)用removeDelegate方法减俏,因?yàn)榇韺?duì)象是存儲(chǔ)到字典中的召烂,持有代理對(duì)象的強(qiáng)引用。
zmq要訂閱什么消息就發(fā)對(duì)應(yīng)的訂閱碼娃承,如果要全部訂閱奏夫,就發(fā)一個(gè)空字符串,在子線程使用一個(gè)while循環(huán)來接收數(shù)據(jù)历筝,定義一個(gè)標(biāo)志位酗昼,讓while循環(huán)可以控制,更改后的代碼如下
_context = [[ZMQContext alloc] initWithIOThreads:1];
_socket = [_context socketWithType:ZMQ_SUB];
_socket.loadingtime = 3000; // 超時(shí)時(shí)間梳猪,單位毫秒
NSString *endpoint = @"tcp://:41204"; // 服務(wù)器IP地址
if (![_socket connectToEndpoint:endpoint]) {
NSLog(@"訂閱失敗");
return;
}
NSData *filterData = [@"" dataUsingEncoding:NSUTF8StringEncoding];
[_socket setData:filterData forOption:ZMQ_SUBSCRIBE];
(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
closeSocket = NO;
while (!closeSocket) {
@autoreleasepool {
NSData *recieveData = [_socket receiveDataWithFlags:0];
if (recieveData == nil) continue; // 訂閱消息超時(shí)返回nil
// 數(shù)據(jù)處理
NSString *dataStr = [[NSString alloc] initWithData:recieveData encoding:NSUTF8StringEncoding];
NSRange range = [dataStr rangeOfString:@"{" options:NSCaseInsensitiveSearch];
NSString *subStr = [dataStr substringFromIndex:range.location];
NSString *codeKey = [dataStr substringToIndex:range.location];
/*** 數(shù)據(jù)分發(fā) ***/
// 取出當(dāng)前訂閱碼的代理字典
NSMutableDictionary *delegateDict = self.delegates[codeKey];
if (delegateDict == nil) continue; // 沒有對(duì)當(dāng)前訂閱碼的代理對(duì)象
[delegateDict enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) {
[self handleData:subStr delegate:obj code:codeKey];
}];
}
}
zmq訂閱返回的數(shù)據(jù)格式是訂閱碼拼接上推送的數(shù)據(jù)麻削,當(dāng)接收的數(shù)據(jù)為nil,那就是接收超時(shí)了春弥,zmq訂閱只要接收到數(shù)據(jù)呛哟,肯定不為空,必然有訂閱碼匿沛。這里的數(shù)據(jù)處理方式是因?yàn)槲翼?xiàng)目服務(wù)器返回的是訂閱碼拼接json的數(shù)據(jù)扫责,要根據(jù)服務(wù)器定義的數(shù)據(jù)格式來處理數(shù)據(jù)。注意逃呼,這里是有一個(gè)坑鳖孤,要在循環(huán)內(nèi)使用@ autoreleasepool {}。一個(gè)對(duì)象只要出了作用的使用區(qū)域抡笼,就會(huì)自動(dòng)釋放了淌铐,可是當(dāng)使用區(qū)域是一個(gè)while循環(huán)時(shí),系統(tǒng)會(huì)認(rèn)為你的對(duì)象還要使用就不會(huì)釋放對(duì)象蔫缸。所以,在循環(huán)里加上autoreleasepool標(biāo)記际起,不管數(shù)據(jù)傳遞到哪個(gè)對(duì)象中使用拾碌,只有沒有對(duì)象牽引著數(shù)據(jù)對(duì)象,就會(huì)釋放了街望。
我在使用zmq訂閱模式的時(shí)候發(fā)現(xiàn)兩個(gè)問題校翔。zmq訂閱模式有自動(dòng)重新連接的機(jī)制,比如斷網(wǎng)重連灾前,服務(wù)器斷開重連防症,但是超過一分鐘好像就重連不了,我測(cè)試了很多次,可是安卓組的同事使用zmq訂閱不會(huì)遇到重連不了的問題蔫敲。因?yàn)橹剡B不了饲嗽,我要關(guān)閉訂閱socket,重新開啟訂閱socket奈嘿,發(fā)現(xiàn)一關(guān)閉socket就馬上開啟socket貌虾,zmq底層庫就報(bào)錯(cuò),然后程序直接崩潰了裙犹,所以我的啟動(dòng)訂閱的方法要判斷是否socket的關(guān)閉尽狠。
- (void)start {
if (closeSocket) {
[self performSelector:@selector(startConnect) withObject:nil afterDelay:1.0];
return;
}
[self startConnect];
}
zmq的訂閱模式使用起來還是很穩(wěn)定的,相對(duì)來說zmq的應(yīng)答模式會(huì)出現(xiàn)請(qǐng)求丟棄的問題叶圃。因?yàn)楸救说捻?xiàng)目使用zmq通訊庫袄膏,后期還會(huì)繼續(xù)研究。
示例代碼都放在 github 掺冠。
參考
zeroMQ使用指導(dǎo) http://zguide.zeromq.org/page:all
zeroMQ的示例程序 https://github.com/imatix/zguide.git