zeroMQ是號稱史上最好的通訊庫钠右,基于C語言開發(fā)的俐巴,實時流處理sorm的task之間的通信就是用的zeroMQ力崇。zeroMQ在使用模式上支持多種,有req-reply古瓤,publish-subscribe止剖,pipe。我在項目中使用的是req-replay, publish-subscribe兩種方式落君,pipe方式暫時還沒用過滴须,或許后期研究下。zmq框架封裝好了底層實現(xiàn)叽奥,只拋給我們一個socket使用扔水,所以我對socket進行了二次封裝,以滿足我們的業(yè)務(wù)需求朝氓。
先上一個小菜魔市,我個人對req-reply模式的使用和理解。req-replay是請求響應(yīng)機制赵哲,類是http的響應(yīng)請求待德,每請求一次服務(wù)器就獲取一次數(shù)據(jù),這種是應(yīng)答請求模式枫夺。
下面是zmq的應(yīng)答請求創(chuàng)建
ZMQContext *context = [[ZMQContext alloc] initWithIOThreads:1];
ZMQSocket *socket = [ctx socketWithType:ZMQ_REP]; // ZMQ_REP是請求響應(yīng)模式
NSString *endpoint = @"tcp://localhost:5555"; // 需要綁定的IP地址
BOOL bind = [socket connectToEndpoint:endpoint];
if (!bind) {
NSLog(@"*** Failed to bind to endpoint [%@].", endpoint);
}
if (![socket sendData:json withFlags:1]) {
NSLog(@"發(fā)送失敗");
} else {
NSData *reply = [socket receiveDataWithFlags:0]; // 阻塞當前線程将宪,直到有數(shù)據(jù)返回
}
請求完成銷毀socket和上下文
[context terminate];
這是zmq示例的用法,這個用法是不能滿足在實際項目的使用。這里發(fā)送數(shù)據(jù)使用的是非阻塞式较坛,接收數(shù)據(jù)是使用阻塞式印蔗,當然你也可以使用非阻塞式的,就是withflags:這個來選擇的丑勤,使用非阻塞式就要輪詢socket华嘹,獲取服務(wù)器的返回的數(shù)據(jù)。我用的是阻塞式法竞,這個方式比較符合我們的業(yè)務(wù)需求耙厚。
使用阻塞式首先要解決一個超時的問題,zmq提供給我們兩個oc的操作文件ZMQContext和ZMQSocket岔霸,里面是沒有提供超時設(shè)置的接口薛躬。zmq在iOS端應(yīng)用比較少,網(wǎng)上可以查閱的資料也很少呆细,最后看了PHP的示例代碼型宝,發(fā)現(xiàn)可以在socket層設(shè)置,然后我就在socket層設(shè)置了侦鹏,并封裝到ZMQSocket诡曙,對外提供了接口loadingtime。
阻塞式是肯定不能使用在主線程的略水,我們要另開一個線程來處理這種應(yīng)答請求价卤,為了考慮性能,我們就要面對兩個問題了渊涝。第一個是socket的重用慎璧,不能每次請求都創(chuàng)建socket,請求完成就銷毀socket跨释,第二個是線程的重用胸私。
zmq的應(yīng)答請求,原則上一請求鳖谈,一響應(yīng)岁疼。可是真實的情況是有時候網(wǎng)絡(luò)不好缆娃,請求的響應(yīng)速度變慢了捷绒,然后你重用socket進行請求并發(fā),socket就會出現(xiàn)發(fā)送失敗贯要,還有超時的socket重用也是發(fā)送失敗的暖侨。超時的socket要銷毀,重新開啟socket來處理請求崇渗。
線程的重用是使用RunLoop來實現(xiàn)的字逗,每個子線程都有RunLoop京郑,只是默認不激活,我們要激活RunLoop葫掉,讓線程在發(fā)送請求時工作些举,沒有請求時進入休眠狀態(tài)。
直接上代碼挖息,多說無益了-
// 創(chuàng)建子線程
- (NSThread *)thread {
if (_thread == nil) {
_thread = [[NSThread alloc] initWithTarget:self selector:@selector(backgroundThread) object:nil];
_thread.name = @"zmqREQ";
[_thread start];
}
return _thread;
}
// 啟動子線程金拒,并激活RunLoop
- (void)backgroundThread {
@autoreleasepool {
NSThread *currentThread = [NSThread currentThread];
BOOL isCancelled = [currentThread isCancelled];
NSRunLoop *currentRunLoop = [NSRunLoop currentRunLoop];
// 開啟runloop
[currentRunLoop addPort:[NSMachPort port] forMode:NSDefaultRunLoopMode];
while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]]) {
isCancelled = [currentThread isCancelled];
}
}
}
// 空任務(wù)兽肤,喚醒線程
- (void)closeThread {}
// 提供外部的接口套腹,把參數(shù)傳進來,使用block回調(diào)
- (void)startRequest:(NSDictionary *)params success:(void (^)(id))success failure:(void (^)(NSError *))failure {
// 先保存block
NSString *key = [params description];
if (success != nil) {
[self.successDict setObject:success forKey:key];
}
if (failure != nil) {
[self.failureDict setObject:failure forKey:key];
}
// 異步請求
[self performSelector:@selector(requestInThread:) onThread:self.thread withObject:params waitUntilDone:NO];
}
// 讓子線程執(zhí)行發(fā)送的請求
- (void)requestInThread:(NSDictionary *)params {
NSThread *currentThread = [NSThread currentThread];
// 判斷線程是否已經(jīng)取消
if (currentThread.isCancelled) { return; }
// 獲取緩存數(shù)組中的socket
ZMQSocket *socket = self.sockets.lastObject;
[self.sockets removeLastObject];
// 獲取block
NSString *key = [params description];
successType success = self.successDict[key];
[self.successDict removeObjectForKey:key];
failureType failure = self.failureDict[key];
[self.failureDict removeObjectForKey:key];
if (socket == nil) {
socket = [self.context socketWithType:ZMQ_REQ];
socket.loadingtime = 5000;
NSString *endpoint = @"tcp://:41204"; // 服務(wù)器IP地址
if (![socket connectToEndpoint:endpoint]) {
NSLog(@"監(jiān)聽失敗");
[socket close];
socket = nil;
}
NSLog(@"創(chuàng)建socket");
}
NSData *json = [NSJSONSerialization dataWithJSONObject:params options:0 error:nil];
if (![socket sendData:json withFlags:1]) {
NSLog(@"發(fā)送失敗");
dispatch_async(dispatch_get_main_queue(), ^{
if (failure) {
failure(nil);
}
});
}else{
if (socket == nil) return ;
NSData *reply = [socket receiveDataWithFlags:0]; // 阻塞當前線程资铡,直到有數(shù)據(jù)返回
// 判斷線程是否已經(jīng)取消
if (currentThread.isCancelled) { return; }
id data = nil;
if (reply) {
data = [NSJSONSerialization JSONObjectWithData:reply options:0 error:nil];
[self.sockets addObject:socket];
} else {
[socket close];
socket = nil;
}
dispatch_async(dispatch_get_main_queue(), ^{
if (success) {
success(data);
}
});
}
}
要激活RunLoop必須要有事件源和時鐘电禀,我這里用的是事件源,設(shè)置端口笤休,讓子線程接收其他線程的事件信號尖飞。這里要注意,使用 [currentRunLoop run] 方法店雅,RunLoop就停不下來了政基,使用runMode:beforeDate:可以控制RunLoop的生命周期。子線程的代碼運行到while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])就會進入循環(huán)并休眠闹啦,當子線程接收到任務(wù)信號時就會被喚醒并執(zhí)行任務(wù)沮明,執(zhí)行完任務(wù)就執(zhí)行while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])這行代碼,并判斷是否達到運行的期限窍奋,如果沒有則繼續(xù)休眠荐健,反之就退出RunLoop結(jié)束子線程。這里是用了線程的取消標記來控制琳袄,如果線程已經(jīng)取消了江场,就讓RunLoop不執(zhí)行[currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture],讓RunLoop退出循環(huán)窖逗,不設(shè)置時間期限了址否。因而,我創(chuàng)建一個空任務(wù)碎紊,當要關(guān)閉子線程時佑附,給子線程一個空任務(wù)達到喚醒子線程的目的,然后子線程進入判斷矮慕,并退出RunLoop帮匾。
我這里使用NSMutableArray來存儲socket和NSMutableDictionary存儲回調(diào)的block,這兩個是線程不安全的痴鳄。sockets只在子線程操作瘟斜,這不會產(chǎn)生數(shù)據(jù)爭奪。回調(diào)block是在兩個線程操作螺句,但是利用dictionary的特性虽惭,我這樣操作是沒有影響的,我做過大量的測試蛇尚,如果你們在使用中出現(xiàn)線程問題芽唇,可以加鎖。
這篇博客是講述zmq的應(yīng)答模式取劫,我封裝的代碼和改過的zeroMQ文件都放在 github 匆笤。
下一篇寫對zmq訂閱模式的使用和理解,歡迎關(guān)注谱邪。
參考
zeroMQ使用指導(dǎo) http://zguide.zeromq.org/page:all
zeroMQ的示例程序 https://github.com/imatix/zguide.git