ReactiveCocoa 中 RACSignal 是如何發(fā)送信號(hào)的

前言

ReactiveCocoa是一個(gè)(第一個(gè)虚汛?)將函數(shù)響應(yīng)式編程范例帶入Objective-C的開源庫匾浪。ReactiveCocoa是由Josh AbernathyJustin Spahr-Summers 兩位大神在對GitHub for Mac的開發(fā)過程中編寫的。Justin Spahr-Summers 大神在2011年11月13號(hào)下午12點(diǎn)35分進(jìn)行的第一次提交卷哩,直到2013年2月13日上午3點(diǎn)05分發(fā)布了其1.0 release蛋辈,達(dá)到了第一個(gè)重要里程碑。ReactiveCocoa社區(qū)也非辰辏活躍冷溶,目前最新版已經(jīng)完成了ReactiveCocoa 5.0.0-alpha.3,目前在5.0.0-alpha.4開發(fā)中尊浓。

ReactiveCocoa v2.5 是公認(rèn)的Objective-C最穩(wěn)定的版本逞频,因此被廣大的以O(shè)C為主要語言的客戶端選中使用。ReactiveCocoa v3.x主要是基于Swift 1.2的版本栋齿,而ReactiveCocoa v4.x 主要基于Swift 2.x苗胀,ReactiveCocoa 5.0就全面支持Swift 3.0襟诸,也許還有以后的Swift 4.0。接下來幾篇博客先以ReactiveCocoa v2.5版本為例子基协,分析一下OC版的RAC具體實(shí)現(xiàn)(也許分析完了RAC 5.0就到來了)歌亲。也算是寫在ReactiveCocoa 5.0正式版到來前夕的祝福吧。

目錄

  • 1.什么是ReactiveCocoa堡掏?
  • 2.RAC中的核心RACSignal發(fā)送與訂閱流程
  • 3.RACSignal操作的核心bind實(shí)現(xiàn)
  • 4.RACSignal基本操作concat和zipWith實(shí)現(xiàn)
  • 5.最后

一. 什么是ReactiveCocoa应结?

ReactiveCocoa(其簡稱為RAC)是由Github 開源的一個(gè)應(yīng)用于iOS和OS X開發(fā)的新框架刨疼。RAC具有函數(shù)式編程(FP)和響應(yīng)式編程(RP)的特性泉唁。它主要吸取了.Net的 Reactive Extensions的設(shè)計(jì)和實(shí)現(xiàn)。

ReactiveCocoa 的宗旨是Streams of values over time 揩慕,隨著時(shí)間變化而不斷流動(dòng)的數(shù)據(jù)流亭畜。

ReactiveCocoa 主要解決了以下這些問題:

  • UI數(shù)據(jù)綁定

UI控件通常需要綁定一個(gè)事件,RAC可以很方便的綁定任何數(shù)據(jù)流到控件上迎卤。

  • 用戶交互事件綁定

RAC為可交互的UI控件提供了一系列能發(fā)送Signal信號(hào)的方法拴鸵。這些數(shù)據(jù)流會(huì)在用戶交互中相互傳遞。

  • 解決狀態(tài)以及狀態(tài)之間依賴過多的問題

有了RAC的綁定之后蜗搔,可以不用在關(guān)心各種復(fù)雜的狀態(tài)劲藐,isSelect,isFinish……也解決了這些狀態(tài)在后期很難維護(hù)的問題樟凄。

  • 消息傳遞機(jī)制的大統(tǒng)一

OC中編程原來消息傳遞機(jī)制有以下幾種:Delegate聘芜,Block Callback,Target-Action缝龄,Timers汰现,KVO,objc上有一篇關(guān)于OC中這5種消息傳遞方式改如何選擇的文章Communication Patterns叔壤,推薦大家閱讀∠顾牵現(xiàn)在有了RAC之后,以上這5種方式都可以統(tǒng)一用RAC來處理炼绘。

二. RAC中的核心RACSignal

ReactiveCocoa 中最核心的概念之一就是信號(hào)RACStream嗅战。RACStream中有兩個(gè)子類——RACSignal 和 RACSequence。本文先來分析RACSignal俺亮。

我們會(huì)經(jīng)惩院矗看到以下的代碼:


RACSignal *signal = [RACSignal createSignal:
                     ^RACDisposable *(id<RACSubscriber> subscriber)
{
    [subscriber sendNext:@1];
    [subscriber sendNext:@2];
    [subscriber sendNext:@3];
    [subscriber sendCompleted];
    return [RACDisposable disposableWithBlock:^{
        NSLog(@"signal dispose");
    }];
}];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
    NSLog(@"subscribe value = %@", x);
} error:^(NSError *error) {
    NSLog(@"error: %@", error);
} completed:^{
    NSLog(@"completed");
}];

[disposable dispose];


這是一個(gè)RACSignal被訂閱的完整過程。被訂閱的過程中铅辞,究竟發(fā)生了什么厌漂?


+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
 return [RACDynamicSignal createSignal:didSubscribe];
}

RACSignal調(diào)用createSignal的時(shí)候,會(huì)調(diào)用RACDynamicSignal的createSignal的方法斟珊。

RACDynamicSignal是RACSignal的子類苇倡。createSignal后面的參數(shù)是一個(gè)block富纸。


(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe

block的返回值是RACDisposable類型,block名叫didSubscribe旨椒。block的唯一一個(gè)參數(shù)是id<RACSubscriber>類型的subscriber晓褪,這個(gè)subscriber是必須遵循RACSubscriber協(xié)議的。

RACSubscriber是一個(gè)協(xié)議综慎,其下有以下4個(gè)協(xié)議方法:


@protocol RACSubscriber <NSObject>
@required

- (void)sendNext:(id)value;
- (void)sendError:(NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

@end

所以新建Signal的任務(wù)就全部落在了RACSignal的子類RACDynamicSignal上了涣仿。



@interface RACDynamicSignal ()
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);
@end

RACDynamicSignal這個(gè)類很簡單,里面就保存了一個(gè)名字叫didSubscribe的block示惊。



+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
     RACDynamicSignal *signal = [[self alloc] init];
     signal->_didSubscribe = [didSubscribe copy];
     return [signal setNameWithFormat:@"+createSignal:"];
}

這個(gè)方法中新建了一個(gè)RACDynamicSignal對象signal好港,并把傳進(jìn)來的didSubscribe這個(gè)block保存進(jìn)剛剛新建對象signal里面的didSubscribe屬性中。最后再給signal命名+createSignal:米罚。


- (instancetype)setNameWithFormat:(NSString *)format, ... {
 if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

   NSCParameterAssert(format != nil);

   va_list args;
   va_start(args, format);

   NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
   va_end(args);

   self.name = str;
   return self;
}


setNameWithFormat是RACStream里面的方法钧汹,由于RACDynamicSignal繼承自RACSignal,所以它也能調(diào)用這個(gè)方法录择。

RACSignal的block就這樣被保存起來了拔莱,那什么時(shí)候會(huì)被執(zhí)行呢?

block閉包在訂閱的時(shí)候才會(huì)被“釋放”出來隘竭。

RACSignal調(diào)用subscribeNext方法塘秦,返回一個(gè)RACDisposable。


- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
   NSCParameterAssert(nextBlock != NULL);
   NSCParameterAssert(errorBlock != NULL);
   NSCParameterAssert(completedBlock != NULL);
 
   RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
   return [self subscribe:o];
}


在這個(gè)方法中會(huì)新建一個(gè)RACSubscriber對象动看,并傳入nextBlock尊剔,errorBlock,completedBlock弧圆。


@interface RACSubscriber ()

// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end


RACSubscriber這個(gè)類很簡單赋兵,里面只有4個(gè)屬性,分別是nextBlock搔预,errorBlock霹期,completedBlock和一個(gè)RACCompoundDisposable信號(hào)。


+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
 RACSubscriber *subscriber = [[self alloc] init];

   subscriber->_next = [next copy];
   subscriber->_error = [error copy];
   subscriber->_completed = [completed copy];

   return subscriber;
}


subscriberWithNext方法把傳入的3個(gè)block都保存分別保存到自己對應(yīng)的block中拯田。

RACSignal調(diào)用subscribeNext方法历造,最后return的時(shí)候,會(huì)調(diào)用[self subscribe:o]船庇,這里實(shí)際是調(diào)用了RACDynamicSignal類里面的subscribe方法吭产。



- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
 NSCParameterAssert(subscriber != nil);

   RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
   subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

   if (self.didSubscribe != NULL) {
      RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
      RACDisposable *innerDisposable = self.didSubscribe(subscriber);
      [disposable addDisposable:innerDisposable];
  }];

    [disposable addDisposable:schedulingDisposable];
 }
 
 return disposable;
}

RACDisposable有3個(gè)子類,其中一個(gè)就是RACCompoundDisposable鸭轮。



@interface RACCompoundDisposable : RACDisposable
+ (instancetype)compoundDisposable;
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;
- (void)addDisposable:(RACDisposable *)disposable;
- (void)removeDisposable:(RACDisposable *)disposable;
@end

RACCompoundDisposable雖然是RACDisposable的子類臣淤,但是它里面可以加入多個(gè)RACDisposable對象,在必要的時(shí)候可以一口氣都調(diào)用dispose方法來銷毀信號(hào)窃爷。當(dāng)RACCompoundDisposable對象被dispose的時(shí)候邑蒋,也會(huì)自動(dòng)dispose容器內(nèi)的所有RACDisposable對象姓蜂。

RACPassthroughSubscriber是一個(gè)私有的類。


@interface RACPassthroughSubscriber : NSObject <RACSubscriber>
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;
@end

RACPassthroughSubscriber類就只有這一個(gè)方法医吊。目的就是為了把所有的信號(hào)事件從一個(gè)訂閱者subscriber傳遞給另一個(gè)還沒有disposed的訂閱者subscriber浦箱。

RACPassthroughSubscriber類中保存了3個(gè)非常重要的對象诵棵,RACSubscriber翰舌,RACSignal髓废,RACCompoundDisposable。RACSubscriber是待轉(zhuǎn)發(fā)的信號(hào)的訂閱者subscriber草描。RACCompoundDisposable是訂閱者的銷毀對象览绿,一旦它被disposed了,innerSubscriber就再也接受不到事件流了陶珠。

這里需要注意的是內(nèi)部還保存了一個(gè)RACSignal挟裂,并且它的屬性是unsafe_unretained。這里和其他兩個(gè)屬性有區(qū)別揍诽, 其他兩個(gè)屬性都是strong的。這里之所以不是weak栗竖,是因?yàn)橐肦ACSignal僅僅只是一個(gè)DTrace probes動(dòng)態(tài)跟蹤技術(shù)的探針暑脆。如果設(shè)置成weak,會(huì)造成沒必要的性能損失狐肢。所以這里僅僅是unsafe_unretained就夠了添吗。


- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
   NSCParameterAssert(subscriber != nil);

   self = [super init];
   if (self == nil) return nil;

   _innerSubscriber = subscriber;
   _signal = signal;
   _disposable = disposable;

   [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
   return self;
}

回到RACDynamicSignal類里面的subscribe方法中,現(xiàn)在新建好了RACCompoundDisposable和RACPassthroughSubscriber對象了份名。


 if (self.didSubscribe != NULL) {
  RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
   RACDisposable *innerDisposable = self.didSubscribe(subscriber);
   [disposable addDisposable:innerDisposable];
  }];

  [disposable addDisposable:schedulingDisposable];
 }


RACScheduler.subscriptionScheduler是一個(gè)全局的單例碟联。



+ (instancetype)subscriptionScheduler {
   static dispatch_once_t onceToken;
   static RACScheduler *subscriptionScheduler;
   dispatch_once(&onceToken, ^{
    subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
   });

   return subscriptionScheduler;
}

RACScheduler再繼續(xù)調(diào)用schedule方法。



- (RACDisposable *)schedule:(void (^)(void))block {
   NSCParameterAssert(block != NULL);
   if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
   block();
   return nil;
}



+ (BOOL)isOnMainThread {
 return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

+ (instancetype)currentScheduler {
 RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
 if (scheduler != nil) return scheduler;
 if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

 return nil;
}

在取currentScheduler的過程中僵腺,會(huì)判斷currentScheduler是否存在鲤孵,和是否在主線程中。如果都沒有辰如,那么就會(huì)調(diào)用后臺(tái)backgroundScheduler去執(zhí)行schedule普监。

schedule的入?yún)⒕褪且粋€(gè)block,執(zhí)行schedule的時(shí)候會(huì)去執(zhí)行block琉兜。也就是會(huì)去執(zhí)行:


RACDisposable *innerDisposable = self.didSubscribe(subscriber);
   [disposable addDisposable:innerDisposable];

這兩句關(guān)鍵的語句凯正。之前信號(hào)里面保存的block就會(huì)在此處被“釋放”執(zhí)行。self.didSubscribe(subscriber)這一句就執(zhí)行了信號(hào)保存的didSubscribe閉包豌蟋。

在didSubscribe閉包中有sendNext廊散,sendError,sendCompleted梧疲,執(zhí)行這些語句會(huì)分別調(diào)用RACPassthroughSubscriber里面對應(yīng)的方法允睹。


- (void)sendNext:(id)value {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_NEXT_ENABLED()) {
  RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
 }
 [self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_ERROR_ENABLED()) {
  RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
 }
 [self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_COMPLETED_ENABLED()) {
  RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
 }
 [self.innerSubscriber sendCompleted];
}


這個(gè)時(shí)候的訂閱者是RACPassthroughSubscriber施符。RACPassthroughSubscriber里面的innerSubscriber才是最終的實(shí)際訂閱者,RACPassthroughSubscriber會(huì)把值再繼續(xù)傳遞給innerSubscriber擂找。


- (void)sendNext:(id)value {
 @synchronized (self) {
  void (^nextBlock)(id) = [self.next copy];
  if (nextBlock == nil) return;

  nextBlock(value);
 }
}

- (void)sendError:(NSError *)e {
 @synchronized (self) {
  void (^errorBlock)(NSError *) = [self.error copy];
  [self.disposable dispose];

  if (errorBlock == nil) return;
  errorBlock(e);
 }
}

- (void)sendCompleted {
 @synchronized (self) {
  void (^completedBlock)(void) = [self.completed copy];
  [self.disposable dispose];

  if (completedBlock == nil) return;
  completedBlock();
 }
}

innerSubscriber是RACSubscriber戳吝,調(diào)用sendNext的時(shí)候會(huì)先把自己的self.next閉包c(diǎn)opy一份,再調(diào)用贯涎,而且整個(gè)過程還是線程安全的听哭,用@synchronized保護(hù)著。最終訂閱者的閉包在這里被調(diào)用塘雳。

sendError和sendCompleted也都是同理陆盘。

總結(jié)一下:

  1. RACSignal調(diào)用subscribeNext方法,新建一個(gè)RACSubscriber败明。
  2. 新建的RACSubscriber會(huì)copy隘马,nextBlock,errorBlock妻顶,completedBlock存在自己的屬性變量中酸员。
  3. RACSignal的子類RACDynamicSignal調(diào)用subscribe方法。
  4. 新建RACCompoundDisposable和RACPassthroughSubscriber對象讳嘱。RACPassthroughSubscriber分別保存對RACSignal幔嗦,RACSubscriber,RACCompoundDisposable的引用沥潭,注意對RACSignal的引用是unsafe_unretained的邀泉。
  5. RACDynamicSignal調(diào)用didSubscribe閉包。先調(diào)用RACPassthroughSubscriber的相應(yīng)的sendNext钝鸽,sendError汇恤,sendCompleted方法。
  6. RACPassthroughSubscriber再去調(diào)用self.innerSubscriber拔恰,即RACSubscriber的nextBlock因谎,errorBlock,completedBlock仁连。注意這里調(diào)用同樣是先copy一份蓝角,再調(diào)用閉包執(zhí)行。

三. RACSignal操作的核心bind實(shí)現(xiàn)

在RACSignal的源碼里面包含了兩個(gè)基本操作饭冬,concat和zipWith使鹅。不過在分析這兩個(gè)操作之前,先來分析一下更加核心的一個(gè)函數(shù)昌抠,bind操作患朱。

先來說說bind函數(shù)的作用:

  1. 會(huì)訂閱原始的信號(hào)。
  2. 任何時(shí)刻原始信號(hào)發(fā)送一個(gè)值炊苫,都會(huì)綁定的block轉(zhuǎn)換一次裁厅。
  3. 一旦綁定的block轉(zhuǎn)換了值變成信號(hào)冰沙,就立即訂閱,并把值發(fā)給訂閱者subscriber执虹。
  4. 一旦綁定的block要終止綁定拓挥,原始的信號(hào)就complete。
  5. 當(dāng)所有的信號(hào)都complete袋励,發(fā)送completed信號(hào)給訂閱者subscriber侥啤。
  6. 如果中途信號(hào)出現(xiàn)了任何error,都要把這個(gè)錯(cuò)誤發(fā)送給subscriber

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
 NSCParameterAssert(block != NULL);

 return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  RACStreamBindBlock bindingBlock = block();

  NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

  RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

  void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) { /*這里暫時(shí)省略*/ };
  void (^addSignal)(RACSignal *) = ^(RACSignal *signal) { /*這里暫時(shí)省略*/ };

  @autoreleasepool {
   RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
   [compoundDisposable addDisposable:selfDisposable];

   RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
    // Manually check disposal to handle synchronous errors.
    if (compoundDisposable.disposed) return;

    BOOL stop = NO;
    id signal = bindingBlock(x, &stop);

    @autoreleasepool {
     if (signal != nil) addSignal(signal);
     if (signal == nil || stop) {
      [selfDisposable dispose];
      completeSignal(self, selfDisposable);
     }
    }
   } error:^(NSError *error) {
    [compoundDisposable dispose];
    [subscriber sendError:error];
   } completed:^{
    @autoreleasepool {
     completeSignal(self, selfDisposable);
    }
   }];

   selfDisposable.disposable = bindingDisposable;
  }

  return compoundDisposable;
 }] setNameWithFormat:@"[%@] -bind:", self.name];
}

為了弄清楚bind函數(shù)究竟做了什么茬故,寫出測試代碼:


    RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];
    
    RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
        return ^RACSignal *(NSNumber *value, BOOL *stop){
            value = @(value.integerValue * 2);
            return [RACSignal return:value];
        };
    }];
    
    [bindSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];


由于前面第一章節(jié)詳細(xì)講解了RACSignal的創(chuàng)建和訂閱的全過程盖灸,這個(gè)也為了方法講解,創(chuàng)建RACDynamicSignal磺芭,RACCompoundDisposable赁炎,RACPassthroughSubscriber這些都略過,這里著重分析一下bind的各個(gè)閉包傳遞創(chuàng)建和訂閱的過程钾腺。

為了防止接下來的分析會(huì)讓讀者看暈徙垫,這里先把要用到的block進(jìn)行編號(hào)。


    RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        // block 1
    }

    RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
        // block 2
        return ^RACSignal *(NSNumber *value, BOOL *stop){
            // block 3
        };
    }];

    [bindSignal subscribeNext:^(id x) {
        // block 4
    }];

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
        // block 5
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        // block 6
        RACStreamBindBlock bindingBlock = block();
        NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
        
        void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
        // block 7
        };
        
        void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
        // block 8
            RACDisposable *disposable = [signal subscribeNext:^(id x) {
            // block 9
            }];
        };
        
        @autoreleasepool {
            RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                // block 10
                id signal = bindingBlock(x, &stop);
                
                @autoreleasepool {
                    if (signal != nil) addSignal(signal);
                    if (signal == nil || stop) {
                        [selfDisposable dispose];
                        completeSignal(self, selfDisposable);
                    }
                }
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(self, selfDisposable);
                }
            }];
        }
        return compoundDisposable;
    }] ;
}

先創(chuàng)建信號(hào)signal垮庐,didSubscribe把block1 copy保存起來松邪。

當(dāng)信號(hào)調(diào)用bind進(jìn)行綁定,會(huì)調(diào)用block5哨查,didSubscribe把block6 copy保存起來。

當(dāng)訂閱者開始訂閱bindSignal的時(shí)候剧辐,流程如下:

  1. bindSignal執(zhí)行didSubscribe的block寒亥,即執(zhí)行block6。
  2. 在block6 的第一句代碼荧关,就是調(diào)用RACStreamBindBlock bindingBlock = block()溉奕,這里的block是外面?zhèn)鬟M(jìn)來的block2,于是開始調(diào)用block2忍啤。執(zhí)行完block2加勤,會(huì)返回一個(gè)RACStreamBindBlock的對象。
  3. 由于是signal調(diào)用的bind函數(shù)同波,所以bind函數(shù)里面的self就是signal鳄梅,在bind內(nèi)部訂閱了signal的信號(hào)。subscribeNext所以會(huì)執(zhí)行block1未檩。
  4. 執(zhí)行block1戴尸,sendNext調(diào)用訂閱者subscriber的nextBlock,于是開始執(zhí)行block10冤狡。
  5. block10中會(huì)先調(diào)用bindingBlock孙蒙,這個(gè)是之前調(diào)用block2的返回值项棠,這個(gè)RACStreamBindBlock對象里面保存的是block3。所以開始調(diào)用block3挎峦。
  6. 在block3中入?yún)⑹且粋€(gè)value香追,這個(gè)value是signal中sendNext中發(fā)出來的value的值,在block3中可以對value進(jìn)行變換坦胶,變換完成后透典,返回一個(gè)新的信號(hào)signal'。
  7. 如果返回的signal'為空迁央,則會(huì)調(diào)用completeSignal掷匠,即調(diào)用block7。block7中會(huì)發(fā)送sendCompleted岖圈。如果返回的signal'不為空讹语,則會(huì)調(diào)用addSignal,即調(diào)用block8蜂科。block8中會(huì)繼續(xù)訂閱signal'顽决。由于signal'是外面bind函數(shù)的返回值,返回值的信號(hào)是RACReturnSignal類型的导匣,所以一訂閱就會(huì)sendNext才菠,就會(huì)執(zhí)行block9。
  8. block9 中會(huì)sendNext贡定,這里的subscriber是block6的入?yún)⒏撤茫谑菍ubscriber調(diào)用sendNext,會(huì)調(diào)用到bindSignal的訂閱者的block4中缓待。
  9. block9 中執(zhí)行完sendNext蚓耽,還會(huì)調(diào)用sendCompleted。這里的是在執(zhí)行block9里面的completed閉包旋炒。completeSignal(signal, selfDisposable);然后又會(huì)調(diào)用completeSignal步悠,即block7。
  10. 執(zhí)行完block7瘫镇,就完成了一次從signal 發(fā)送信號(hào)sendNext的全過程鼎兽。

bind整個(gè)流程就完成了。

四. RACSignal基本操作concat和zipWith實(shí)現(xiàn)

接下來再來分析RACSignal中另外2個(gè)基本操作铣除。

1. concat

寫出測試代碼:



    RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@1];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];


    RACSignal *signals = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendNext:@6];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];

    RACSignal *concatSignal = [signal concat:signals];
    
    [concatSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];

concat操作就是把兩個(gè)信號(hào)合并起來谚咬。注意合并有先后順序。


- (RACSignal *)concat:(RACSignal *)signal {
   return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

    RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
     // 發(fā)送第一個(gè)信號(hào)的值
     [subscriber sendNext:x];
    } error:^(NSError *error) {
     [subscriber sendError:error];
    } completed:^{
     // 訂閱第二個(gè)信號(hào)
     RACDisposable *concattedDisposable = [signal subscribe:subscriber];
     serialDisposable.disposable = concattedDisposable;
  }];

    serialDisposable.disposable = sourceDisposable;
    return serialDisposable;
 }] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

合并前通孽,signal和signals分別都把各自的didSubscribe保存copy起來序宦。
合并之后,合并之后新的信號(hào)的didSubscribe會(huì)把block保存copy起來。

當(dāng)合并之后的信號(hào)被訂閱的時(shí)候:

  1. 調(diào)用新的合并信號(hào)的didSubscribe互捌。
  2. 由于是第一個(gè)信號(hào)調(diào)用的concat方法潘明,所以block中的self是前一個(gè)信號(hào)signal。合并信號(hào)的didSubscribe會(huì)先訂閱signal秕噪。
  3. 由于訂閱了signal钳降,于是開始執(zhí)行signal的didSubscribe,sendNext腌巾,sendError遂填。
  4. 當(dāng)前一個(gè)信號(hào)signal發(fā)送sendCompleted之后,就會(huì)開始訂閱后一個(gè)信號(hào)signals澈蝙,調(diào)用signals的didSubscribe吓坚。
  5. 由于訂閱了后一個(gè)信號(hào),于是后一個(gè)信號(hào)signals開始發(fā)送sendNext灯荧,sendError礁击,sendCompleted。

這樣兩個(gè)信號(hào)就前后有序的拼接到了一起逗载。

這里有二點(diǎn)需要注意的是:

  1. 只有當(dāng)?shù)谝粋€(gè)信號(hào)完成之后才能收到第二個(gè)信號(hào)的值哆窿,因?yàn)榈诙€(gè)信號(hào)是在第一個(gè)信號(hào)completed的閉包里面訂閱的,所以第一個(gè)信號(hào)不結(jié)束厉斟,第二個(gè)信號(hào)也不會(huì)被訂閱挚躯。
  2. 兩個(gè)信號(hào)concat在一起之后,新的信號(hào)的結(jié)束信號(hào)在第二個(gè)信號(hào)結(jié)束的時(shí)候才結(jié)束擦秽÷肜螅看上圖描述,新的信號(hào)的發(fā)送長度等于前面兩個(gè)信號(hào)長度之和感挥,concat之后的新信號(hào)的結(jié)束信號(hào)也就是第二個(gè)信號(hào)的結(jié)束信號(hào)目胡。

concat是有序的組合,第一個(gè)信號(hào)完成之后才發(fā)送第二個(gè)信號(hào)链快。

2. zipWith

寫出測試代碼:



    RACSignal *concatSignal = [signal zipWith:signals];
    
    [concatSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];


源碼如下:


- (RACSignal *)zipWith:(RACSignal *)signal {
    NSCParameterAssert(signal != nil);
    
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block BOOL selfCompleted = NO;
        NSMutableArray *selfValues = [NSMutableArray array];
        
        __block BOOL otherCompleted = NO;
        NSMutableArray *otherValues = [NSMutableArray array];
        
        void (^sendCompletedIfNecessary)(void) = ^{
            @synchronized (selfValues) {
                BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
                BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
                
                // 如果任意一個(gè)信號(hào)完成并且數(shù)組里面空了,就整個(gè)信號(hào)算完成
                if (selfEmpty || otherEmpty) [subscriber sendCompleted];
            }
        };
        
        void (^sendNext)(void) = ^{
            @synchronized (selfValues) {
                
                // 數(shù)組里面的空了就返回眉尸。
                if (selfValues.count == 0) return;
                if (otherValues.count == 0) return;
                
                // 每次都取出兩個(gè)數(shù)組里面的第0位的值域蜗,打包成元組
                RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
                [selfValues removeObjectAtIndex:0];
                [otherValues removeObjectAtIndex:0];
                
                // 把元組發(fā)送出去
                [subscriber sendNext:tuple];
                sendCompletedIfNecessary();
            }
        };
        
        // 訂閱第一個(gè)信號(hào)
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (selfValues) {
                
                // 把第一個(gè)信號(hào)的值加入到數(shù)組中
                [selfValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                
                // 訂閱完成時(shí)判斷是否要發(fā)送完成信號(hào)
                selfCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];
        
        // 訂閱第二個(gè)信號(hào)
        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (selfValues) {
                
                // 把第二個(gè)信號(hào)加入到數(shù)組中
                [otherValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                
                // 訂閱完成時(shí)判斷是否要發(fā)送完成信號(hào)
                otherCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];
        
        return [RACDisposable disposableWithBlock:^{
            
            // 銷毀兩個(gè)信號(hào)
            [selfDisposable dispose];
            [otherDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}

當(dāng)把兩個(gè)信號(hào)通過zipWith之后,就像上面的那張圖一樣噪猾,拉鏈的兩邊被中間的拉索拉到了一起霉祸。既然是拉鏈,那么一一的位置是有對應(yīng)的袱蜡,上面的拉鏈第一個(gè)位置只能對著下面拉鏈第一個(gè)位置丝蹭,這樣拉鏈才能拉到一起去。

具體實(shí)現(xiàn):

zipWith里面有兩個(gè)數(shù)組坪蚁,分別會(huì)存儲(chǔ)兩個(gè)信號(hào)的值奔穿。

  1. 一旦訂閱了zipWith之后的信號(hào)镜沽,就開始執(zhí)行didSubscribe閉包。
  2. 在閉包中會(huì)先訂閱第一個(gè)信號(hào)贱田。這里假設(shè)第一個(gè)信號(hào)比第二個(gè)信號(hào)先發(fā)出一個(gè)值缅茉。第一個(gè)信號(hào)發(fā)出來的每一個(gè)值都會(huì)被加入到第一個(gè)數(shù)組中保存起來,然后調(diào)用sendNext( )閉包男摧。在sendNext( )閉包中蔬墩,會(huì)先判斷兩個(gè)數(shù)組里面是否都為空,如果有一個(gè)數(shù)組里面是空的耗拓,就return拇颅。由于第二個(gè)信號(hào)還沒有發(fā)送值,即第二個(gè)信號(hào)的數(shù)組里面是空的乔询,所以這里第一個(gè)值發(fā)送不出來樟插。于是第一個(gè)信號(hào)被訂閱之后,發(fā)送的值存儲(chǔ)到了第一個(gè)數(shù)組里面了哥谷,沒有發(fā)出去岸夯。
  3. 第二個(gè)信號(hào)的值緊接著發(fā)出來了,第二個(gè)信號(hào)每發(fā)送一次值们妥,也會(huì)存儲(chǔ)到第二個(gè)數(shù)組中猜扮,但是這個(gè)時(shí)候再調(diào)用sendNext( )閉包的時(shí)候,不會(huì)再return了监婶,因?yàn)閮蓚€(gè)數(shù)組里面都有值了旅赢,兩個(gè)數(shù)組的第0號(hào)位置都有一個(gè)值了。有值以后就打包成元組RACTuple發(fā)送出去惑惶。并清空兩個(gè)數(shù)組0號(hào)位置存儲(chǔ)的值煮盼。
  4. 以后兩個(gè)信號(hào)每次發(fā)送一個(gè),就先存儲(chǔ)在數(shù)組中带污,只要有“配對”的另一個(gè)信號(hào)僵控,就一起打包成元組RACTuple發(fā)送出去。從圖中也可以看出鱼冀,zipWith之后的新信號(hào)报破,每個(gè)信號(hào)的發(fā)送時(shí)刻是等于兩個(gè)信號(hào)最晚發(fā)出信號(hào)的時(shí)刻。
  5. 新信號(hào)的完成時(shí)間千绪,是當(dāng)兩者任意一個(gè)信號(hào)完成并且數(shù)組里面為空充易,就算完成了。所以最后第一個(gè)信號(hào)發(fā)送的5的那個(gè)值就被丟棄了荸型。

第一個(gè)信號(hào)依次發(fā)送的1盹靴,2,3,4的值和第二個(gè)信號(hào)依次發(fā)送的A稿静,B梭冠,C,D的值自赔,一一的合在了一起妈嘹,就像拉鏈把他們拉在一起。由于5沒法配對绍妨,所以拉鏈也拉不上了润脸。

五. 最后

本來這篇文章想把Map,combineLatest他去,flattenMap毙驯,flatten這些也一起分析了,但是后來看到RACSingnal的操作實(shí)在有點(diǎn)多灾测,于是按照源碼的文件分開了爆价,這里先把RACSignal文件里面的操作都分析完了。RACSignal文件里面的操作主要就bind媳搪,concat和zipWith三個(gè)操作铭段。下一篇再分析分析RACSignal+Operations文件里面的所有操作。

請大家多多指教秦爆。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末序愚,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子等限,更是在濱河造成了極大的恐慌爸吮,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件望门,死亡現(xiàn)場離奇詭異形娇,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)筹误,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進(jìn)店門桐早,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人厨剪,你說我怎么就攤上這事勘畔。” “怎么了丽惶?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長爬立。 經(jīng)常有香客問我钾唬,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任抡秆,我火速辦了婚禮奕巍,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘儒士。我一直安慰自己的止,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布着撩。 她就那樣靜靜地躺著诅福,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拖叙。 梳的紋絲不亂的頭發(fā)上氓润,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天,我揣著相機(jī)與錄音薯鳍,去河邊找鬼咖气。 笑死,一個(gè)胖子當(dāng)著我的面吹牛挖滤,可吹牛的內(nèi)容都是我干的崩溪。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼斩松,長吁一口氣:“原來是場噩夢啊……” “哼伶唯!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起砸民,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤抵怎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后岭参,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體反惕,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年演侯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了姿染。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡秒际,死狀恐怖悬赏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情娄徊,我是刑警寧澤闽颇,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站寄锐,受9級特大地震影響兵多,放射性物質(zhì)發(fā)生泄漏尖啡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一剩膘、第九天 我趴在偏房一處隱蔽的房頂上張望衅斩。 院中可真熱鬧,春花似錦怠褐、人聲如沸畏梆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奠涌。三九已至,卻和暖如春筐赔,著一層夾襖步出監(jiān)牢的瞬間铣猩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工茴丰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留达皿,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓贿肩,卻偏偏與公主長得像峦椰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子汰规,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 前言由于時(shí)間的問題汤功,暫且只更新這么多了,后續(xù)還會(huì)持續(xù)更新本文《最快讓你上手ReactiveCocoa之進(jìn)階篇》溜哮,目...
    Karos_凱閱讀 1,739評論 0 6
  • 1.ReactiveCocoa常見操作方法介紹1.1 ReactiveCocoa操作須知所有的信號(hào)(RACSign...
    IIronMan閱讀 2,597評論 2 17
  • 1.ReactiveCocoa常見操作方法介紹滔金。 1.1 ReactiveCocoa操作須知 所有的信號(hào)(RACS...
    萌芽的冬天閱讀 1,023評論 0 5
  • RAC使用測試Demo下載:github.com/FuWees/WPRACTestDemo 1.ReactiveC...
    FuWees閱讀 6,372評論 3 10
  • 七月,這個(gè)煩躁的季節(jié)茂嗓,我將心門冰封餐茵,不讓一絲火熱,灼傷僅存的夢述吸。 杯里的茶忿族,已淡如水,卻仍未釋手蝌矛,努力的品嘗道批,最初...
    無疾道人閱讀 253評論 0 1