zeroMQ通訊庫--iOS <一>

zeroMQ是號稱史上最好的通訊庫钠右,基于C語言開發(fā)的俐巴,實時流處理sorm的task之間的通信就是用的zeroMQ力崇。zeroMQ在使用模式上支持多種,有req-reply古瓤,publish-subscribe止剖,pipe。我在項目中使用的是req-replay, publish-subscribe兩種方式落君,pipe方式暫時還沒用過滴须,或許后期研究下。zmq框架封裝好了底層實現(xiàn)叽奥,只拋給我們一個socket使用扔水,所以我對socket進行了二次封裝,以滿足我們的業(yè)務(wù)需求朝氓。

先上一個小菜魔市,我個人對req-reply模式的使用和理解。req-replay是請求響應(yīng)機制赵哲,類是http的響應(yīng)請求待德,每請求一次服務(wù)器就獲取一次數(shù)據(jù),這種是應(yīng)答請求模式枫夺。

下面是zmq的應(yīng)答請求創(chuàng)建

ZMQContext *context = [[ZMQContext alloc] initWithIOThreads:1];
ZMQSocket *socket = [ctx socketWithType:ZMQ_REP]; // ZMQ_REP是請求響應(yīng)模式 
NSString *endpoint = @"tcp://localhost:5555"; // 需要綁定的IP地址
BOOL bind = [socket connectToEndpoint:endpoint];
if (!bind) {
    NSLog(@"*** Failed to bind to endpoint [%@].", endpoint);
}

if (![socket sendData:json withFlags:1]) {
    NSLog(@"發(fā)送失敗");
} else {
    NSData *reply = [socket receiveDataWithFlags:0]; // 阻塞當前線程将宪,直到有數(shù)據(jù)返回
}

請求完成銷毀socket和上下文

[context terminate];

這是zmq示例的用法,這個用法是不能滿足在實際項目的使用。這里發(fā)送數(shù)據(jù)使用的是非阻塞式较坛,接收數(shù)據(jù)是使用阻塞式印蔗,當然你也可以使用非阻塞式的,就是withflags:這個來選擇的丑勤,使用非阻塞式就要輪詢socket华嘹,獲取服務(wù)器的返回的數(shù)據(jù)。我用的是阻塞式法竞,這個方式比較符合我們的業(yè)務(wù)需求耙厚。

使用阻塞式首先要解決一個超時的問題,zmq提供給我們兩個oc的操作文件ZMQContextZMQSocket岔霸,里面是沒有提供超時設(shè)置的接口薛躬。zmq在iOS端應(yīng)用比較少,網(wǎng)上可以查閱的資料也很少呆细,最后看了PHP的示例代碼型宝,發(fā)現(xiàn)可以在socket層設(shè)置,然后我就在socket層設(shè)置了侦鹏,并封裝到ZMQSocket诡曙,對外提供了接口loadingtime

阻塞式是肯定不能使用在主線程的略水,我們要另開一個線程來處理這種應(yīng)答請求价卤,為了考慮性能,我們就要面對兩個問題了渊涝。第一個是socket的重用慎璧,不能每次請求都創(chuàng)建socket,請求完成就銷毀socket跨释,第二個是線程的重用胸私。

zmq的應(yīng)答請求,原則上一請求鳖谈,一響應(yīng)岁疼。可是真實的情況是有時候網(wǎng)絡(luò)不好缆娃,請求的響應(yīng)速度變慢了捷绒,然后你重用socket進行請求并發(fā),socket就會出現(xiàn)發(fā)送失敗贯要,還有超時的socket重用也是發(fā)送失敗的暖侨。超時的socket要銷毀,重新開啟socket來處理請求崇渗。

線程的重用是使用RunLoop來實現(xiàn)的字逗,每個子線程都有RunLoop京郑,只是默認不激活,我們要激活RunLoop葫掉,讓線程在發(fā)送請求時工作些举,沒有請求時進入休眠狀態(tài)。

直接上代碼挖息,多說無益了-

// 創(chuàng)建子線程
- (NSThread *)thread {
if (_thread == nil) {
    _thread = [[NSThread alloc] initWithTarget:self selector:@selector(backgroundThread) object:nil];
    _thread.name = @"zmqREQ";
    [_thread start];
}
    return _thread;
}

// 啟動子線程金拒,并激活RunLoop
- (void)backgroundThread {
@autoreleasepool {
    NSThread *currentThread = [NSThread currentThread];
    BOOL isCancelled = [currentThread isCancelled];
    NSRunLoop *currentRunLoop = [NSRunLoop currentRunLoop];
    
    // 開啟runloop
    [currentRunLoop addPort:[NSMachPort port] forMode:NSDefaultRunLoopMode];
    
        while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]]) {
            isCancelled = [currentThread isCancelled];
        }
    } 
}

// 空任務(wù)兽肤,喚醒線程
- (void)closeThread {}

// 提供外部的接口套腹,把參數(shù)傳進來,使用block回調(diào)
- (void)startRequest:(NSDictionary *)params success:(void (^)(id))success failure:(void (^)(NSError *))failure {
    // 先保存block
    NSString *key = [params description];
    if (success != nil) {
        [self.successDict setObject:success forKey:key];
    }
    if (failure != nil) {
        [self.failureDict setObject:failure forKey:key];
    }
    // 異步請求
    [self performSelector:@selector(requestInThread:) onThread:self.thread withObject:params waitUntilDone:NO];
}

// 讓子線程執(zhí)行發(fā)送的請求
- (void)requestInThread:(NSDictionary *)params {
    NSThread *currentThread = [NSThread currentThread];
    // 判斷線程是否已經(jīng)取消
    if (currentThread.isCancelled) { return; }
    
    // 獲取緩存數(shù)組中的socket
    ZMQSocket *socket = self.sockets.lastObject;
    [self.sockets removeLastObject];
    
    // 獲取block
    NSString *key = [params description];
    successType success = self.successDict[key];
    [self.successDict removeObjectForKey:key];
    failureType failure = self.failureDict[key];
    [self.failureDict removeObjectForKey:key];
    
    if (socket == nil) {
        socket = [self.context socketWithType:ZMQ_REQ];
        socket.loadingtime = 5000;
        NSString *endpoint = @"tcp://:41204"; // 服務(wù)器IP地址 
        if (![socket connectToEndpoint:endpoint]) {
            NSLog(@"監(jiān)聽失敗");
            [socket close];
            socket = nil;
        }
        NSLog(@"創(chuàng)建socket");
    }
    
    NSData *json = [NSJSONSerialization dataWithJSONObject:params options:0 error:nil];
    if (![socket sendData:json withFlags:1]) {
        NSLog(@"發(fā)送失敗");
        dispatch_async(dispatch_get_main_queue(), ^{
            if (failure) {
                failure(nil);
            }
        });
    }else{
        if (socket == nil) return ;
        NSData *reply = [socket receiveDataWithFlags:0]; // 阻塞當前線程资铡,直到有數(shù)據(jù)返回
        // 判斷線程是否已經(jīng)取消
        if (currentThread.isCancelled) { return; }
        id data = nil;
        if (reply) {
            data = [NSJSONSerialization JSONObjectWithData:reply options:0 error:nil];
            [self.sockets addObject:socket];
        } else {
            [socket close];
            socket = nil;
        }
        dispatch_async(dispatch_get_main_queue(), ^{
          if (success) {
              success(data);
            }
        });
    }  
}

要激活RunLoop必須要有事件源和時鐘电禀,我這里用的是事件源,設(shè)置端口笤休,讓子線程接收其他線程的事件信號尖飞。這里要注意,使用 [currentRunLoop run] 方法店雅,RunLoop就停不下來了政基,使用runMode:beforeDate:可以控制RunLoop的生命周期。子線程的代碼運行到while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])就會進入循環(huán)并休眠闹啦,當子線程接收到任務(wù)信號時就會被喚醒并執(zhí)行任務(wù)沮明,執(zhí)行完任務(wù)就執(zhí)行while (!isCancelled && [currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])這行代碼,并判斷是否達到運行的期限窍奋,如果沒有則繼續(xù)休眠荐健,反之就退出RunLoop結(jié)束子線程。這里是用了線程的取消標記來控制琳袄,如果線程已經(jīng)取消了江场,就讓RunLoop不執(zhí)行[currentRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture],讓RunLoop退出循環(huán)窖逗,不設(shè)置時間期限了址否。因而,我創(chuàng)建一個空任務(wù)碎紊,當要關(guān)閉子線程時佑附,給子線程一個空任務(wù)達到喚醒子線程的目的,然后子線程進入判斷矮慕,并退出RunLoop帮匾。

我這里使用NSMutableArray來存儲socket和NSMutableDictionary存儲回調(diào)的block,這兩個是線程不安全的痴鳄。sockets只在子線程操作瘟斜,這不會產(chǎn)生數(shù)據(jù)爭奪。回調(diào)block是在兩個線程操作螺句,但是利用dictionary的特性虽惭,我這樣操作是沒有影響的,我做過大量的測試蛇尚,如果你們在使用中出現(xiàn)線程問題芽唇,可以加鎖。

這篇博客是講述zmq的應(yīng)答模式取劫,我封裝的代碼和改過的zeroMQ文件都放在 github 匆笤。

下一篇寫對zmq訂閱模式的使用和理解,歡迎關(guān)注谱邪。

參考

zeroMQ使用指導(dǎo) http://zguide.zeromq.org/page:all

zeroMQ的示例程序 https://github.com/imatix/zguide.git

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末炮捧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子惦银,更是在濱河造成了極大的恐慌咆课,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扯俱,死亡現(xiàn)場離奇詭異书蚪,居然都是意外死亡,警方通過查閱死者的電腦和手機迅栅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門殊校,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人库继,你說我怎么就攤上這事箩艺。” “怎么了宪萄?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵艺谆,是天一觀的道長。 經(jīng)常有香客問我拜英,道長静汤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任居凶,我火速辦了婚禮虫给,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘侠碧。我一直安慰自己抹估,他們只是感情好,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布弄兜。 她就那樣靜靜地躺著药蜻,像睡著了一般瓷式。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上语泽,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天贸典,我揣著相機與錄音,去河邊找鬼踱卵。 笑死廊驼,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的惋砂。 我是一名探鬼主播妒挎,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼班利!你這毒婦竟也來了饥漫?” 一聲冷哼從身側(cè)響起榨呆,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤罗标,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后积蜻,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體闯割,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年竿拆,在試婚紗的時候發(fā)現(xiàn)自己被綠了宙拉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡丙笋,死狀恐怖谢澈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情御板,我是刑警寧澤锥忿,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站怠肋,受9級特大地震影響敬鬓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜笙各,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一钉答、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧杈抢,春花似錦数尿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽虏缸。三九已至,卻和暖如春嫩实,著一層夾襖步出監(jiān)牢的瞬間刽辙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工甲献, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留宰缤,地道東北人。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓晃洒,卻偏偏與公主長得像慨灭,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子球及,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容