ReactiveCocoa 中 RACSignal 所有變換操作底層實(shí)現(xiàn)分析(上)

在ReactiveCocoa整個(gè)庫(kù)中紧显,RACSignal占據(jù)著比較重要的位置,而RACSignal的變換操作更是整個(gè)RACStream流操作核心之一层皱。

bind函數(shù)可以簡(jiǎn)單的縮寫(xiě)成下面這樣子性锭。

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block;
{
    return [RACSignal createSignal:^RACDisposable *(id subscriber) {

        RACStreamBindBlock bindBlock = block();
        [self subscribeNext:^(id x) {    //(1)
            BOOL stop = NO;
            RACSignal *signal = (RACSignal *)bindBlock(x, &stop); //(2)
            if (signal == nil || stop) {
                [subscriber sendCompleted];
            } else {
                [signal subscribeNext:^(id x) {
                    [subscriber sendNext:x];  //(3)
                } error:^(NSError *error) {
                    [subscriber sendError:error];
                } completed:^{

                }];
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }];
}

當(dāng)bind變換之后的信號(hào)被訂閱,就開(kāi)始執(zhí)行bind函數(shù)中return的block閉包叫胖。

在bind閉包中草冈,先訂閱原先的信號(hào)A。
在訂閱原信號(hào)A的didSubscribe閉包中進(jìn)行信號(hào)變換瓮增,變換中用到的block閉包是外部傳遞進(jìn)來(lái)的怎棱,也就是bind函數(shù)的入?yún)ⅰW儞Q結(jié)束钉赁,得到新的信號(hào)B
訂閱新的信號(hào)B蹄殃,拿到bind變化之后的信號(hào)的訂閱者subscriber,對(duì)其發(fā)送新的信號(hào)值你踩。

簡(jiǎn)要的過(guò)程如上圖诅岩,bind函數(shù)中進(jìn)行了2次訂閱的操作,第一次訂閱是為了拿到signalA的值带膜,第二次訂閱是為了把signalB的新值發(fā)給bind變換之后得到的signalB的訂閱者吩谦。

回顧完bind底層實(shí)現(xiàn)之后,就可以開(kāi)始繼續(xù)本篇文章的分析了膝藕。

目錄

1.變換操作
2.時(shí)間操作

一.變換操作

我們都知道RACSignal是繼承自RACStream的式廷,而在底層的RACStream上也定義了一些基本的信號(hào)變換的操作,所以這些操作在RACSignal上同樣適用芭挽。如果在RACsignal中沒(méi)有重寫(xiě)這些方法滑废,那么調(diào)用這些操作,實(shí)際是調(diào)用的父類RACStream的操作袜爪。下面分析的時(shí)候蠕趁,會(huì)把實(shí)際調(diào)用父類RACStream的操作的地方都標(biāo)注出來(lái)。

1.Map: (在父類RACStream中定義的)

map操作一般是用來(lái)信號(hào)變換的辛馆。

    RACSignal *signalB = [signalA map:^id(NSNumber *value) {
        return @([value intValue] * 10);
    }];

來(lái)看看底層是如何實(shí)現(xiàn)的俺陋。

- (instancetype)map:(id (^)(id value))block {
    NSCParameterAssert(block != nil);

    Class class = self.class;

    return [[self flattenMap:^(id value) {
        return [class return:block(value)];
    }] setNameWithFormat:@"[%@] -map:", self.name];
}

這里實(shí)現(xiàn)代碼比較嚴(yán)謹(jǐn),先判斷self的類型昙篙。因?yàn)镽ACStream的子類中會(huì)有一些子類會(huì)重寫(xiě)這些方法腊状,所以需要判斷self的類型,在回調(diào)中可以回調(diào)到原類型的方法中去苔可。

由于本篇文章中我們都分析RACSignal的操作缴挖,所以這里的self.class是RACDynamicSignal類型的。相應(yīng)的在return返回值中也返回class焚辅,即RACDynamicSignal類型的信號(hào)醇疼。

從map實(shí)現(xiàn)代碼上來(lái)看硕并,map實(shí)現(xiàn)是用了flattenMap函數(shù)來(lái)實(shí)現(xiàn)的。把map的入?yún)㈤]包秧荆,放到了flattenMap的返回值中倔毙。

在來(lái)看看flattenMap的實(shí)現(xiàn):

- (instancetype)flattenMap:(RACStream * (^)(id value))block {
    Class class = self.class;

    return [[self bind:^{
        return ^(id value, BOOL *stop) {
            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

            return stream;
        };
    }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

flattenMap算是對(duì)bind函數(shù)的一種封裝。bind函數(shù)的入?yún)⑹且粋€(gè)RACStreamBindBlock類型的閉包乙濒。而flattenMap函數(shù)的入?yún)⑹且粋€(gè)value陕赃,返回值RACStream類型的閉包。

在flattenMap中颁股,返回block(value)的信號(hào)么库,如果信號(hào)為nil,則返回[class empty]甘有。

先來(lái)看看為空的情況诉儒。當(dāng)block(value)為空,返回[RACEmptySignal empty]亏掀,empty就是創(chuàng)建了一個(gè)RACEmptySignal類型的信號(hào):

+ (RACSignal *)empty {
#ifdef DEBUG
    // Create multiple instances of this class in DEBUG so users can set custom
    // names on each.
    return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
    static id singleton;
    static dispatch_once_t pred;

    dispatch_once(&pred, ^{
        singleton = [[self alloc] init];
    });

    return singleton;
#endif
}

RACEmptySignal類型的信號(hào)又是什么呢蟹漓?

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    return [RACScheduler.subscriptionScheduler schedule:^{
        [subscriber sendCompleted];
    }];
}

RACEmptySignal是RACSignal的子類纳寂,一旦訂閱它您没,它就會(huì)同步的發(fā)送completed完成信號(hào)給訂閱者她肯。

所以flattenMap返回一個(gè)信號(hào),如果信號(hào)不存在间影,就返回一個(gè)completed完成信號(hào)給訂閱者注竿。

再來(lái)看看flattenMap返回的信號(hào)是怎么變換的。

block(value)會(huì)把原信號(hào)發(fā)送過(guò)來(lái)的value傳遞給flattenMap的入?yún)⒒瓯帷lattenMap的入?yún)⑹且粋€(gè)閉包巩割,閉包的參數(shù)也是value的:

^(id value) { return [class return:block(value)]; }

這個(gè)閉包返回一個(gè)信號(hào),信號(hào)類型和原信號(hào)的類型一樣付燥,即RACDynamicSignal類型的喂分,值是block(value)。這里的閉包是外面map傳進(jìn)來(lái)的:

^id(NSNumber *value) { return @([value intValue] * 10); }

在這個(gè)閉包中把原信號(hào)的value值傳進(jìn)去進(jìn)行變換机蔗,變換結(jié)束之后,包裝成和原信號(hào)相同類型的信號(hào)甘萧,返回萝嘁。返回的信號(hào)作為bind函數(shù)的閉包的返回值。這樣訂閱新的map之后的信號(hào)就會(huì)拿到變換之后的值扬卷。

2.MapReplace: (在父類RACStream中定義的)

一般用法如下:

RACSignal *signalB = [signalA mapReplace:@"A"];

效果是不管A信號(hào)發(fā)送什么值牙言,都替換成@“A”。

- (instancetype)mapReplace:(id)object {
    return [[self map:^(id _) {
        return object;
    }] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];
}

看底層源碼就知道怪得,它并不去關(guān)心原信號(hào)發(fā)送的是什么值咱枉,原信號(hào)發(fā)送什么值卑硫,都返回入?yún)bject的值。

3.reduceEach: (在父類RACStream中定義的)

reduce是減少蚕断,聚合在一起的意思欢伏,reduceEach就是每個(gè)信號(hào)內(nèi)部都聚合在一起。

    RACSignal *signalB = [signalA reduceEach:^id(NSNumber *num1 , NSNumber *num2){
        return @([num1 intValue] + [num2 intValue]);
    }];

reduceEach后面比如傳入一個(gè)元組RACTuple類型亿乳,否則會(huì)報(bào)錯(cuò)硝拧。

- (instancetype)reduceEach:(id (^)())reduceBlock {
    NSCParameterAssert(reduceBlock != nil);

    __weak RACStream *stream __attribute__((unused)) = self;
    return [[self map:^(RACTuple *t) {
        NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
        return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
    }] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}

這里有兩個(gè)斷言,一個(gè)是判斷傳入的reduceBlock閉包是否為空葛假,另一個(gè)斷言是判斷閉包的入?yún)⑹欠袷荝ACTuple類型的障陶。

@interface RACBlockTrampoline : NSObject
@property (nonatomic, readonly, copy) id block;
+ (id)invokeBlock:(id)block withArguments:(RACTuple *)arguments;
@end

RACBlockTrampoline就是一個(gè)保存了一個(gè)block閉包的對(duì)象,它會(huì)根據(jù)傳進(jìn)來(lái)的參數(shù)聊训,動(dòng)態(tài)的構(gòu)造一個(gè)NSInvocation抱究,并執(zhí)行。

reduceEach把入?yún)educeBlock作為RACBlockTrampoline的入?yún)nvokeBlock傳進(jìn)去带斑,以及每個(gè)RACTuple也傳到RACBlockTrampoline中鼓寺。

- (id)invokeWithArguments:(RACTuple *)arguments {
    SEL selector = [self selectorForArgumentCount:arguments.count];
    NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
    invocation.selector = selector;
    invocation.target = self;

    for (NSUInteger i = 0; i < arguments.count; i++) {
        id arg = arguments[i];
        NSInteger argIndex = (NSInteger)(i + 2);
        [invocation setArgument:&arg atIndex:argIndex];
    }

    [invocation invoke];

    __unsafe_unretained id returnVal;
    [invocation getReturnValue:&returnVal];
    return returnVal;
}

第一步就是先計(jì)算入?yún)⒁粋€(gè)元組RACTuple里面元素的個(gè)數(shù)。

- (SEL)selectorForArgumentCount:(NSUInteger)count {
    NSCParameterAssert(count > 0);

    switch (count) {
        case 0: return NULL;
        case 1: return @selector(performWith:);
        case 2: return @selector(performWith::);
        case 3: return @selector(performWith:::);
        case 4: return @selector(performWith::::);
        case 5: return @selector(performWith:::::);
        case 6: return @selector(performWith::::::);
        case 7: return @selector(performWith:::::::);
        case 8: return @selector(performWith::::::::);
        case 9: return @selector(performWith:::::::::);
        case 10: return @selector(performWith::::::::::);
        case 11: return @selector(performWith:::::::::::);
        case 12: return @selector(performWith::::::::::::);
        case 13: return @selector(performWith:::::::::::::);
        case 14: return @selector(performWith::::::::::::::);
        case 15: return @selector(performWith:::::::::::::::);
    }

    NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
    return NULL;
}

可以看到最多支持元組內(nèi)元素的個(gè)數(shù)是15個(gè)遏暴。

這里我們假設(shè)以元組里面有2個(gè)元素為例侄刽。

- (id)performWith:(id)obj1 :(id)obj2 {
    id (^block)(id, id) = self.block;
    return block(obj1, obj2);
}

對(duì)應(yīng)的Type Encoding如下:

argument    return value    0   1   2   3
methodSignature @   @   :   @   @

argument0和argument1分別對(duì)應(yīng)著隱藏參數(shù)self和_cmd,所以對(duì)應(yīng)著的類型是@和:朋凉,從argument2開(kāi)始州丹,就是入?yún)⒌腡ype Encoding了。

所以在構(gòu)造invocation的參數(shù)的時(shí)候杂彭,argIndex是要偏移2個(gè)位置的墓毒。即從(i + 2)開(kāi)始設(shè)置參數(shù)。

當(dāng)動(dòng)態(tài)構(gòu)造了一個(gè)invocation方法之后亲怠,[invocation invoke]調(diào)用這個(gè)動(dòng)態(tài)方法所计,也就是執(zhí)行了外部的reduceBlock閉包,閉包里面是我們想要信號(hào)變換的規(guī)則团秽。

閉包執(zhí)行結(jié)束得到returnVal返回值主胧。這個(gè)返回值就是整個(gè)RACBlockTrampoline的返回值了。這個(gè)返回值也作為了map閉包里面的返回值习勤。

接下去的操作就完全轉(zhuǎn)換成了map的操作了踪栋。上面已經(jīng)分析過(guò)map操作了,這里就不贅述了图毕。

  1. reduceApply

舉個(gè)例子:

    RACSignal *signalA = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
                         {
                             id block = ^id(NSNumber *first,NSNumber *second,NSNumber *third) {
                                 return @(first.integerValue + second.integerValue * third.integerValue);
                             };

                             [subscriber sendNext:RACTuplePack(block,@2 , @3 , @8)];
                             [subscriber sendNext:RACTuplePack((id)(^id(NSNumber *x){return @(x.intValue * 10);}),@9,@10,@30)];

                             [subscriber sendCompleted];
                             return [RACDisposable disposableWithBlock:^{
                                 NSLog(@"signal dispose");
                             }];
                         }];

    RACSignal *signalB = [signalA reduceApply];

使用reduceApply的條件也是需要信號(hào)里面的值是元組RACTuple夷都。不過(guò)這里和reduceEach不同的是,原信號(hào)中每個(gè)元祖RACTuple的第0位必須要為一個(gè)閉包予颤,后面n位為這個(gè)閉包的入?yún)⒍诠伲?位的閉包有幾個(gè)參數(shù)冬阳,后面就需要跟幾個(gè)參數(shù)。

如上述例子中党饮,第一個(gè)元組第0位的閉包有3個(gè)參數(shù)肝陪,所以第一個(gè)元組后面還要跟3個(gè)參數(shù)。第二個(gè)元組的第0位的閉包只有一個(gè)參數(shù)劫谅,所以后面只需要跟一個(gè)參數(shù)见坑。

當(dāng)然后面可以跟更多的參數(shù),如第二個(gè)元組捏检,閉包后面跟了3個(gè)參數(shù)荞驴,但是只有第一個(gè)參數(shù)是有效值,后面那2個(gè)參數(shù)是無(wú)效不起作用的贯城。唯一需要注意的就是后面跟的參數(shù)個(gè)數(shù)一定不能少于第0位閉包入?yún)⒌膫€(gè)數(shù)熊楼,否則就會(huì)報(bào)錯(cuò)。

上面例子輸出

26  // 26 = 2 + 3 * 8能犯;
90  // 90 = 9 * 10鲫骗;

看看底層實(shí)現(xiàn):

- (RACSignal *)reduceApply {
    return [[self map:^(RACTuple *tuple) {
        NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
        NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");

        // We can't use -array, because we need to preserve RACTupleNil
        NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
        for (id val in tuple) {
            [tupleArray addObject:val];
        }
        RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];

        return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
    }] setNameWithFormat:@"[%@] -reduceApply", self.name];
}

這里也有2個(gè)斷言,第一個(gè)是確保傳入的參數(shù)是RACTuple類型踩晶,第二個(gè)斷言是確保元組RACTuple里面的元素各種至少是2個(gè)执泰。因?yàn)橄旅嫒?shù)是直接從1號(hào)位開(kāi)始取的。

reduceApply做的事情和reduceEach基本是等效的渡蜻,只不過(guò)變換規(guī)則的block閉包一個(gè)是外部傳進(jìn)去的术吝,一個(gè)是直接打包在每個(gè)信號(hào)元組RACTuple中第0位中。

  1. materialize

這個(gè)方法會(huì)把信號(hào)包裝成RACEvent類型茸苇。

- (RACSignal *)materialize {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            [subscriber sendNext:[RACEvent eventWithValue:x]];
        } error:^(NSError *error) {
            [subscriber sendNext:[RACEvent eventWithError:error]];
            [subscriber sendCompleted];
        } completed:^{
            [subscriber sendNext:RACEvent.completedEvent];
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -materialize", self.name];
}

sendNext會(huì)包裝成[RACEvent eventWithValue:x]排苍,error會(huì)包裝成[RACEvent eventWithError:error],completed會(huì)被包裝成RACEvent.completedEvent学密。注意淘衙,當(dāng)原信號(hào)error和completed,新信號(hào)都會(huì)發(fā)送sendCompleted腻暮。

  1. dematerialize

這個(gè)操作是materialize的逆向操作彤守。它會(huì)把包裝成RACEvent信號(hào)重新還原為正常的值信號(hào)。

- (RACSignal *)dematerialize {
    return [[self bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            switch (event.eventType) {
                case RACEventTypeCompleted:
                    *stop = YES;
                    return [RACSignal empty];

                case RACEventTypeError:
                    *stop = YES;
                    return [RACSignal error:event.error];

                case RACEventTypeNext:
                    return [RACSignal return:event.value];
            }
        };
    }] setNameWithFormat:@"[%@] -dematerialize", self.name];
}

這里的實(shí)現(xiàn)也用到了bind函數(shù)哭靖,它會(huì)把原信號(hào)進(jìn)行一個(gè)變換具垫。新的信號(hào)會(huì)根據(jù)event.eventType進(jìn)行轉(zhuǎn)換。RACEventTypeCompleted被轉(zhuǎn)換成[RACSignal empty]款青,RACEventTypeError被轉(zhuǎn)換成[RACSignal error:event.error],RACEventTypeNext被轉(zhuǎn)換成[RACSignal return:event.value]霍狰。

  1. not
- (RACSignal *)not {
    return [[self map:^(NSNumber *value) {
        NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);

        return @(!value.boolValue);
    }] setNameWithFormat:@"[%@] -not", self.name];
}

not操作需要傳入的值都是NSNumber類型的抡草。不是NSNumber類型會(huì)報(bào)錯(cuò)饰及。not操作會(huì)把每個(gè)NSNumber按照BOOL的規(guī)則,取非康震,當(dāng)成新信號(hào)的值燎含。

  1. and
- (RACSignal *)and {
    return [[self map:^(RACTuple *tuple) {
        NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
        NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

        return @([tuple.rac_sequence all:^(NSNumber *number) {
            NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

            return number.boolValue;
        }]);
    }] setNameWithFormat:@"[%@] -and", self.name];
}

and操作需要原信號(hào)的每個(gè)信號(hào)都是元組RACTuple類型的,因?yàn)橹挥羞@樣腿短,RACTuple類型里面的每個(gè)元素的值才能進(jìn)行&運(yùn)算屏箍。

and操作里面有3處斷言。第一處橘忱,判斷入?yún)⑹遣皇窃MRACTuple類型的赴魁。第二處,判斷RACTuple類型里面至少包含一個(gè)NSNumber钝诚。第三處颖御,判斷RACTuple里面是否都是NSNumber類型,有一個(gè)不符合凝颇,都會(huì)報(bào)錯(cuò)潘拱。

- (RACSequence *)rac_sequence {
    return [RACTupleSequence sequenceWithTupleBackingArray:self.backingArray offset:0];
}

RACTuple類型先轉(zhuǎn)換成RACTupleSequence。

+ (instancetype)sequenceWithTupleBackingArray:(NSArray *)backingArray offset:(NSUInteger)offset {
    NSCParameterAssert(offset <= backingArray.count);

    if (offset == backingArray.count) return self.empty;

    RACTupleSequence *seq = [[self alloc] init];
    seq->_tupleBackingArray = backingArray;
    seq->_offset = offset;
    return seq;
}

backingArray是一個(gè)數(shù)組NSArry拧略。這里關(guān)于RACTupleSequence和RACTuple會(huì)在以后的文章中詳細(xì)分析芦岂,本篇以分析RACSignal為主。

RACTuple類型先轉(zhuǎn)換成RACTupleSequence垫蛆,即存成了一個(gè)數(shù)組禽最。

- (BOOL)all:(BOOL (^)(id))block {
    NSCParameterAssert(block != NULL);

    NSNumber *result = [self foldLeftWithStart:@YES reduce:^(NSNumber *accumulator, id value) {
        return @(accumulator.boolValue && block(value));
    }];

    return result.boolValue;
}

- (id)foldLeftWithStart:(id)start reduce:(id (^)(id, id))reduce {
    NSCParameterAssert(reduce != NULL);

    if (self.head == nil) return start;

    for (id value in self) {
        start = reduce(start, value);
    }

    return start;
}

for會(huì)遍歷RACSequence里面存的每一個(gè)值,分別都去調(diào)用reduce( )閉包月褥。start的初始值為YES弛随。reduce( )閉包是:

^(NSNumber *accumulator, id value) { return @(accumulator.boolValue && block(value)); }

這里又會(huì)去調(diào)用block( )閉包:

^(NSNumber *number) { return number.boolValue; }

number是原信號(hào)RACTuple的第一個(gè)值。第一次循環(huán)reduce( )閉包是拿YES和原信號(hào)RACTuple的第一個(gè)值進(jìn)行&計(jì)算宁赤。第二個(gè)循環(huán)reduce( )閉包是拿原信號(hào)RACTuple的第一個(gè)值和第二個(gè)值進(jìn)行&計(jì)算舀透,得到的值參與下一次循環(huán),與第三個(gè)值進(jìn)行&計(jì)算决左,如此下去愕够。這也是折疊函數(shù)的意思,foldLeft從左邊開(kāi)始折疊佛猛。fold函數(shù)會(huì)從左至右惑芭,把RACTuple轉(zhuǎn)換成的數(shù)組里面每個(gè)值都一個(gè)接著一個(gè)進(jìn)行&計(jì)算。

每個(gè)RACTuple都map成這樣的一個(gè)BOOL值继找。接下去信號(hào)就map成了一個(gè)新的信號(hào)遂跟。

  1. or
- (RACSignal *)or {
    return [[self map:^(RACTuple *tuple) {
        NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
        NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

        return @([tuple.rac_sequence any:^(NSNumber *number) {
            NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

            return number.boolValue;
        }]);
    }] setNameWithFormat:@"[%@] -or", self.name];
}

or操作的實(shí)現(xiàn)和and操作的實(shí)現(xiàn)大體類似。3處斷言的作用和and操作完全一致,這里就不再贅述了幻锁。or操作的重點(diǎn)在any函數(shù)的實(shí)現(xiàn)上凯亮。or操作的入?yún)⒁脖仨毷荝ACTuple類型的。

- (BOOL)any:(BOOL (^)(id))block {
    NSCParameterAssert(block != NULL);

    return [self objectPassingTest:block] != nil;
}


- (id)objectPassingTest:(BOOL (^)(id))block {
    NSCParameterAssert(block != NULL);

    return [self filter:block].head;
}


- (instancetype)filter:(BOOL (^)(id value))block {
    NSCParameterAssert(block != nil);

    Class class = self.class;

    return [[self flattenMap:^ id (id value) {
        if (block(value)) {
            return [class return:value];
        } else {
            return class.empty;
        }
    }] setNameWithFormat:@"[%@] -filter:", self.name];
}

any會(huì)依次判斷RACTupleSequence數(shù)組里面的值哄尔,依次每個(gè)進(jìn)行filter假消。如果value對(duì)應(yīng)的BOOL值是YES,就轉(zhuǎn)換成一個(gè)RACTupleSequence信號(hào)岭接。如果對(duì)應(yīng)的是NO富拗,則轉(zhuǎn)換成一個(gè)empty信號(hào)。

只要RACTuple為NO鸣戴,就一直返回empty信號(hào)啃沪,直到BOOL值為YES,就返回1葵擎。map變換信號(hào)后變成成1谅阿。找到了YES之后的值就不會(huì)再判斷了。如果沒(méi)有找到Y(jié)ES酬滤,中間都是NO的話签餐,一直遍歷到數(shù)組最后一個(gè),信號(hào)只能返回0盯串。

  1. any:
- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
    NSCParameterAssert(predicateBlock != NULL);

    return [[[self materialize] bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            if (event.finished) {
                *stop = YES;
                return [RACSignal return:@NO];
            }

            if (predicateBlock(event.value)) {
                *stop = YES;
                return [RACSignal return:@YES];
            }

            return [RACSignal empty];
        };
    }] setNameWithFormat:@"[%@] -any:", self.name];
}

原信號(hào)會(huì)先經(jīng)過(guò)materialize轉(zhuǎn)換包裝成RACEvent事件氯檐。依次判斷predicateBlock(event.value)值的BOOL值,如果返回YES体捏,就包裝成RACSignal的新信號(hào)冠摄,發(fā)送YES出去,并且stop接下來(lái)的信號(hào)几缭。如果返回MO河泳,就返回[RACSignal empty]空信號(hào)。直到event.finished年栓,返回[RACSignal return:@NO]拆挥。

所以any:操作的目的是找到第一個(gè)滿足predicateBlock條件的值。找到了就返回YES的RACSignal的信號(hào)某抓,如果沒(méi)有找到纸兔,返回NO的RACSignal。

  1. any
- (RACSignal *)any {
    return [[self any:^(id x) {
        return YES;
    }] setNameWithFormat:@"[%@] -any", self.name];
}

any操作是any:操作中的一種情況否副。即predicateBlock閉包永遠(yuǎn)都返回YES汉矿,所以any操作之后永遠(yuǎn)都只能得到一個(gè)只發(fā)送一個(gè)YES的新信號(hào)。

  1. all:
- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
    NSCParameterAssert(predicateBlock != NULL);

    return [[[self materialize] bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            if (event.eventType == RACEventTypeCompleted) {
                *stop = YES;
                return [RACSignal return:@YES];
            }

            if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
                *stop = YES;
                return [RACSignal return:@NO];
            }

            return [RACSignal empty];
        };
    }] setNameWithFormat:@"[%@] -all:", self.name];
}

all:操作和any:有點(diǎn)類似备禀。原信號(hào)會(huì)先經(jīng)過(guò)materialize轉(zhuǎn)換包裝成RACEvent事件洲拇。對(duì)原信號(hào)發(fā)送的每個(gè)信號(hào)都依次判斷predicateBlock(event.value)是否是NO 或者event.eventType == RACEventTypeError奈揍。如果predicateBlock(event.value)返回NO或者出現(xiàn)了錯(cuò)誤,新的信號(hào)都返回NO赋续。如果一直都沒(méi)出現(xiàn)問(wèn)題打月,在RACEventTypeCompleted的時(shí)候發(fā)送YES。

all:可以用來(lái)判斷整個(gè)原信號(hào)發(fā)送過(guò)程中是否有錯(cuò)誤事件RACEventTypeError蚕捉,或者是否存在predicateBlock為NO的情況〔裉裕可以把predicateBlock設(shè)置成一個(gè)正確條件迫淹。如果原信號(hào)出現(xiàn)錯(cuò)誤事件,或者不滿足設(shè)置的錯(cuò)誤條件为严,都會(huì)發(fā)送新信號(hào)返回NO敛熬。如果全過(guò)程都沒(méi)有出錯(cuò),或者都滿足predicateBlock設(shè)置的條件第股,則一直到RACEventTypeCompleted应民,發(fā)送YES的新信號(hào)。

  1. repeat
- (RACSignal *)repeat {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return subscribeForever(self,
                                ^(id x) {
                                    [subscriber sendNext:x];
                                },
                                ^(NSError *error, RACDisposable *disposable) {
                                    [disposable dispose];
                                    [subscriber sendError:error];
                                },
                                ^(RACDisposable *disposable) {
                                    // Resubscribe.
                                });
    }] setNameWithFormat:@"[%@] -repeat", self.name];
}

repeat操作返回一個(gè)subscribeForever閉包夕吻,閉包里面要傳入4個(gè)參數(shù)诲锹。

static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
    next = [next copy];
    error = [error copy];
    completed = [completed copy];

    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
        RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
        [compoundDisposable addDisposable:selfDisposable];

        __weak RACDisposable *weakSelfDisposable = selfDisposable;

        RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
            @autoreleasepool {
                error(e, compoundDisposable);
                [compoundDisposable removeDisposable:weakSelfDisposable];
            }

            recurse();
        } completed:^{
            @autoreleasepool {
                completed(compoundDisposable);
                [compoundDisposable removeDisposable:weakSelfDisposable];
            }

            recurse();
        }];

        [selfDisposable addDisposable:subscriptionDisposable];
    };

    // Subscribe once immediately, and then use recursive scheduling for any
    // further resubscriptions.
    recursiveBlock(^{
        RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];

        RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
        [compoundDisposable addDisposable:schedulingDisposable];
    });

    return compoundDisposable;
}
subscribeForever有4個(gè)參數(shù),第一個(gè)參數(shù)是原信號(hào)涉馅,第二個(gè)是傳入的next閉包归园,第三個(gè)是error閉包,最后一個(gè)是completed閉包稚矿。

subscribeForever一進(jìn)入這個(gè)函數(shù)就會(huì)調(diào)用recursiveBlock( )閉包庸诱,閉包中有一個(gè)recurse( )的入?yún)⒌膮?shù)。在recursiveBlock( )閉包中對(duì)原信號(hào)RACSignal進(jìn)行訂閱晤揣。next桥爽,error,completed分別會(huì)先調(diào)用傳進(jìn)來(lái)的閉包昧识。然后error钠四,completed執(zhí)行完error( )和completed( )閉包之后,還會(huì)繼續(xù)再執(zhí)行recurse( )滞诺,recurse( )是recursiveBlock的入?yún)ⅰ?

- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

    [self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
    return disposable;
}

- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
    @autoreleasepool {
        RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
        [disposable addDisposable:selfDisposable];

        __weak RACDisposable *weakSelfDisposable = selfDisposable;

        RACDisposable *schedulingDisposable = [self schedule:^{ // 此處省略 }];

        [selfDisposable addDisposable:schedulingDisposable];
    }
}

先取到當(dāng)前的currentScheduler乳蛾,即recursiveScheduler代咸,執(zhí)行scheduleRecursiveBlock,在這個(gè)函數(shù)中,會(huì)調(diào)用schedule函數(shù)乙嘀。這里的recursiveScheduler是RACQueueScheduler類型的。

- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    RACDisposable *disposable = [[RACDisposable alloc] init];

    dispatch_async(self.queue, ^{
        if (disposable.disposed) return;
        [self performAsCurrentScheduler:block];
    });

    return disposable;
}

如果原信號(hào)沒(méi)有disposed咐刨,dispatch_async會(huì)繼續(xù)執(zhí)行block雕拼,在這個(gè)block中還會(huì)繼續(xù)原信號(hào)的發(fā)送。所以原信號(hào)只要沒(méi)有error信號(hào),disposable.disposed就不會(huì)返回YES处嫌,就會(huì)一直調(diào)用block栅贴。所以在subscribeForever的error和completed的最后都會(huì)調(diào)用recurse( )閉包。error調(diào)用recurse( )閉包是為了結(jié)束調(diào)用block熏迹,結(jié)束所有的信號(hào)檐薯。completed調(diào)用recurse( )閉包是為了繼續(xù)調(diào)用block( )閉包,也就是repeat的本質(zhì)注暗。原信號(hào)會(huì)繼續(xù)發(fā)送信號(hào)坛缕,如此無(wú)限循環(huán)下去。

  1. retry:
- (RACSignal *)retry:(NSInteger)retryCount {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block NSInteger currentRetryCount = 0;
        return subscribeForever(self,
                                ^(id x) {
                                    [subscriber sendNext:x];
                                },
                                ^(NSError *error, RACDisposable *disposable) {
                                    if (retryCount == 0 || currentRetryCount < retryCount) {
                                        // Resubscribe.
                                        currentRetryCount++;
                                        return;
                                    }

                                    [disposable dispose];
                                    [subscriber sendError:error];
                                },
                                ^(RACDisposable *disposable) {
                                    [disposable dispose];
                                    [subscriber sendCompleted];
                                });
    }] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
}

在retry:的實(shí)現(xiàn)中捆昏,和repeat實(shí)現(xiàn)的區(qū)別是中間加入了一個(gè)currentRetryCount值赚楚。如果currentRetryCount > retryCount的話,就會(huì)在error中調(diào)用[disposable dispose]骗卜,這樣subscribeForever就不會(huì)再無(wú)限循環(huán)下去了宠页。

所以retry:操作的用途就是在原信號(hào)在出現(xiàn)error的時(shí)候,重試retryCount的次數(shù)寇仓,如果依舊error举户,那么就會(huì)停止重試。

如果原信號(hào)沒(méi)有發(fā)生錯(cuò)誤遍烦,那么原信號(hào)在發(fā)送結(jié)束敛摘,subscribeForever也就結(jié)束了。retry:操作對(duì)于沒(méi)有任何error的信號(hào)相當(dāng)于什么都沒(méi)有發(fā)生乳愉。

  1. retry
- (RACSignal *)retry {
    return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
}

這里的retry操作就是一個(gè)無(wú)限重試的操作兄淫。因?yàn)閞etryCount設(shè)置成0之后,在error的閉包中中蔓姚,retryCount 永遠(yuǎn)等于 0捕虽,原信號(hào)永遠(yuǎn)都不會(huì)被dispose,所以subscribeForever會(huì)一直無(wú)限重試下去坡脐。

同樣的泄私,如果對(duì)一個(gè)沒(méi)有error的信號(hào)調(diào)用retry操作,也是不起任何作用的备闲。

  1. scanWithStart: reduceWithIndex: (在父類RACStream中定義的)

先寫(xiě)出測(cè)試代碼:

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
                         {
                             [subscriber sendNext:@1];
                             [subscriber sendNext:@1];
                             [subscriber sendNext:@4];
                             return [RACDisposable disposableWithBlock:^{
                             }];
                         }];

    RACSignal *signalB = [signalA scanWithStart:@(2) reduceWithIndex:^id(NSNumber * running, NSNumber * next, NSUInteger index) {
        return @(running.intValue * next.intValue + index);
    }];
2    // 2 * 1 + 0 = 2
3    // 2 * 1 + 1 = 3
14   // 3 * 4 + 2 = 14
- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    NSCParameterAssert(reduceBlock != nil);

    Class class = self.class;

    return [[self bind:^{
        __block id running = startingValue;
        __block NSUInteger index = 0;

        return ^(id value, BOOL *stop) {
            running = reduceBlock(running, value, index++);
            return [class return:running];
        };
    }] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, [startingValue rac_description]];
}

scanWithStart這個(gè)變換由初始值晌端,變換函數(shù)reduceBlock( ),和index步進(jìn)的變量組成恬砂。原信號(hào)的每個(gè)信號(hào)都會(huì)由變換函數(shù)reduceBlock( )進(jìn)行變換咧纠。index每次都是自增。變換的初始值是由入?yún)tartingValue傳入的泻骤。

  1. scanWithStart: reduce: (在父類RACStream中定義的)
- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock {
    NSCParameterAssert(reduceBlock != nil);

    return [[self
             scanWithStart:startingValue
             reduceWithIndex:^(id running, id next, NSUInteger index) {
                 return reduceBlock(running, next);
             }]
            setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, [startingValue rac_description]];
}

scanWithStart: reduce:就是scanWithStart: reduceWithIndex: 的縮略版漆羔。變換函數(shù)也是外面閉包reduceBlock( )傳進(jìn)來(lái)的梧奢。只不過(guò)變換過(guò)程中不會(huì)使用index自增的這個(gè)變量。

  1. aggregateWithStart: reduceWithIndex:
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    return [[[[self
               scanWithStart:start reduceWithIndex:reduceBlock]
              startWith:start]
             takeLast:1]
            setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, [start rac_description]];
}

aggregate是合計(jì)的意思演痒。所以最后變換出來(lái)的信號(hào)只有最后一個(gè)值亲轨。
aggregateWithStart: reduceWithIndex:操作調(diào)用了scanWithStart: reduceWithIndex:,原理和它完全一致鸟顺。不同的是多了兩步額外的操作惦蚊,一個(gè)是startWith:,一個(gè)是takeLast:1讯嫂。startWith:是在scanWithStart: reduceWithIndex:變換之后的信號(hào)之前加上start信號(hào)养筒。takeLast:1是取最后一個(gè)信號(hào)。takeLast:和startWith:的詳細(xì)分析文章下面會(huì)詳述端姚。

值得注意的一點(diǎn)是,原信號(hào)如果沒(méi)有發(fā)送complete信號(hào)挤悉,那么該函數(shù)就不會(huì)輸出新的信號(hào)值渐裸。因?yàn)樵谝恢钡却Y(jié)束。

  1. aggregateWithStart: reduce:
- (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock {
    return [[self
             aggregateWithStart:start
             reduceWithIndex:^(id running, id next, NSUInteger index) {
                 return reduceBlock(running, next);
             }]
            setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduce:", self.name, [start rac_description]];
}

aggregateWithStart: reduce:調(diào)用aggregateWithStart: reduceWithIndex:函數(shù)装悲,只不過(guò)沒(méi)有只用index值昏鹃。同樣,如果原信號(hào)沒(méi)有發(fā)送complete信號(hào)诀诊,也不會(huì)輸出任何信號(hào)洞渤。

  1. aggregateWithStartFactory: reduce:
- (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {
    NSCParameterAssert(startFactory != NULL);
    NSCParameterAssert(reduceBlock != NULL);

    return [[RACSignal defer:^{
        return [self aggregateWithStart:startFactory() reduce:reduceBlock];
    }] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];
}

aggregateWithStartFactory: reduce:內(nèi)部實(shí)現(xiàn)就是調(diào)用aggregateWithStart: reduce:,只不過(guò)入?yún)⒍嗔艘粋€(gè)產(chǎn)生start的startFactory( )閉包罷了属瓣。

  1. collect
- (RACSignal *)collect {
    return [[self aggregateWithStartFactory:^{
        return [[NSMutableArray alloc] init];
    } reduce:^(NSMutableArray *collectedValues, id x) {
        [collectedValues addObject:(x ?: NSNull.null)];
        return collectedValues;
    }] setNameWithFormat:@"[%@] -collect", self.name];
}

collect函數(shù)會(huì)調(diào)用aggregateWithStartFactory: reduce:方法载迄。把所有原信號(hào)的值收集起來(lái),保存在NSMutableArray中抡蛙。

二. 時(shí)間操作

  1. throttle:valuesPassingTest:

這個(gè)操作傳入一個(gè)時(shí)間間隔NSTimeInterval护昧,和一個(gè)判斷條件的閉包predicate。原信號(hào)在一個(gè)時(shí)間間隔NSTimeInterval之間發(fā)送的信號(hào)粗截,如果還能滿足predicate惋耙,則原信號(hào)都被“吞”了,直到一個(gè)時(shí)間間隔NSTimeInterval結(jié)束熊昌,會(huì)再次判斷predicate绽榛,如果不滿足了,原信號(hào)就會(huì)被發(fā)送出來(lái)婿屹。

如上圖灭美,原信號(hào)發(fā)送1以后,間隔NSTimeInterval的時(shí)間內(nèi)昂利,沒(méi)有信號(hào)發(fā)出冲粤,并且predicate也為YES美莫,就把1變換成新的信號(hào)發(fā)出去。接下去由于原信號(hào)發(fā)送2梯捕,3厢呵,4的過(guò)程中,都在間隔NSTimeInterval的時(shí)間內(nèi)傀顾,所以都被“吞”了襟铭。直到原信號(hào)發(fā)送5之后,間隔NSTimeInterval的時(shí)間內(nèi)沒(méi)有新的信號(hào)發(fā)出短曾,所以把原信號(hào)的5發(fā)送出來(lái)寒砖。原信號(hào)的6也是如此。

再來(lái)看看具體實(shí)現(xiàn):

- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
    NSCParameterAssert(interval >= 0);
    NSCParameterAssert(predicate != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

        RACScheduler *scheduler = [RACScheduler scheduler];

        __block id nextValue = nil;
        __block BOOL hasNextValue = NO;
        RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];

        void (^flushNext)(BOOL send) = ^(BOOL send) { // 暫時(shí)省略 };

        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            // 暫時(shí)省略
        } error:^(NSError *error) {
            [compoundDisposable dispose];
            [subscriber sendError:error];
        } completed:^{
            flushNext(YES);
            [subscriber sendCompleted];
        }];

        [compoundDisposable addDisposable:subscriptionDisposable];
        return compoundDisposable;
    }] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
}

看這段實(shí)現(xiàn)嫉拐,里面有2處斷言哩都。會(huì)先判斷傳入的interval是否大于0,小于0當(dāng)然是不行的婉徘。還有一個(gè)就是傳入的predicate閉包不能為空漠嵌,這個(gè)是接下來(lái)用來(lái)控制流程的。

接下來(lái)的實(shí)現(xiàn)還是按照套路來(lái)盖呼,返回值是一個(gè)信號(hào)儒鹿,新信號(hào)的閉包里面再訂閱原信號(hào)進(jìn)行變換。

那么整個(gè)變換的重點(diǎn)就落在了flushNext閉包和訂閱原信號(hào)subscribeNext閉包中了几晤。

當(dāng)新的信號(hào)一旦被訂閱约炎,閉包執(zhí)行到此處,就會(huì)對(duì)原信號(hào)進(jìn)行訂閱蟹瘾。

[self subscribeNext:^(id x) {
    RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
    BOOL shouldThrottle = predicate(x);

    @synchronized (compoundDisposable) {   
        flushNext(NO);
        if (!shouldThrottle) {
            [subscriber sendNext:x];
            return;
        }
        nextValue = x;
        hasNextValue = YES;
        nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
            flushNext(YES);
        }];
    }
}

首先先創(chuàng)建一個(gè)delayScheduler圾浅。先判斷當(dāng)前的currentScheduler是否存在,不存在就取之前創(chuàng)建的[RACScheduler scheduler]憾朴。這里雖然兩處都是RACTargetQueueScheduler類型的贱傀,但是currentScheduler是com.ReactiveCocoa.RACScheduler.mainThreadScheduler,而[RACScheduler scheduler]創(chuàng)建的是com.ReactiveCocoa.RACScheduler.backgroundScheduler伊脓。

調(diào)用predicate( )閉包府寒,傳入原信號(hào)發(fā)來(lái)的信號(hào)值x,經(jīng)過(guò)predicate判斷以后报腔,得到是否打開(kāi)節(jié)流開(kāi)關(guān)的BOOL變量shouldThrottle株搔。

之所以把RACCompoundDisposable作為線程間互斥信號(hào)量,因?yàn)镽ACCompoundDisposable里面會(huì)加入所有的RACDisposable信號(hào)纯蛾。接著下面的操作用@synchronized給線程間加鎖纤房。

flushNext( )這個(gè)閉包是為了hook住原信號(hào)的發(fā)送。

void (^flushNext)(BOOL send) = ^(BOOL send) {
    @synchronized (compoundDisposable) {
        [nextDisposable.disposable dispose];

        if (!hasNextValue) return;
        if (send) [subscriber sendNext:nextValue];

        nextValue = nil;
        hasNextValue = NO;
    }
};

這個(gè)閉包中如果傳入的是NO翻诉,那么原信號(hào)就無(wú)法立即sendNext炮姨。如果傳入的是YES捌刮,并且hasNextValue = YES,原信號(hào)待發(fā)送的還有值舒岸,那么就發(fā)送原信號(hào)绅作。

shouldThrottle是一個(gè)閥門,隨時(shí)控制原信號(hào)是否可以被發(fā)送蛾派。

小結(jié)一下俄认,每個(gè)原信號(hào)發(fā)送過(guò)來(lái),通過(guò)在throttle:valuesPassingTest:里面的did subscriber閉包中進(jìn)行訂閱洪乍。這個(gè)閉包中主要干了4件事情:

調(diào)用flushNext(NO)閉包判斷能否發(fā)送原信號(hào)的值眯杏。入?yún)镹O,不發(fā)送原信號(hào)的值壳澳。
判斷閥門條件predicate(x)能否發(fā)送原信號(hào)的值岂贩。
如果以上兩個(gè)條件都滿足,nextValue中進(jìn)行賦值為原信號(hào)發(fā)來(lái)的值巷波,hasNextValue = YES代表當(dāng)前有要發(fā)送的值萎津。
開(kāi)啟一個(gè)delayScheduler,延遲interval的時(shí)間褥紫,發(fā)送原信號(hào)的這個(gè)值,即調(diào)用flushNext(YES)瞪慧。
現(xiàn)在再來(lái)分析一下整個(gè)throttle:valuesPassingTest:的全過(guò)程

原信號(hào)發(fā)出第一個(gè)值髓考,如果在interval的時(shí)間間隔內(nèi),沒(méi)有新的信號(hào)發(fā)送弃酌,那么delayScheduler延遲interval的時(shí)間氨菇,執(zhí)行flushNext(YES),發(fā)送原信號(hào)的這個(gè)第一個(gè)值妓湘。

- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {
    return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
    NSCParameterAssert(date != nil);
    NSCParameterAssert(block != NULL);

    RACDisposable *disposable = [[RACDisposable alloc] init];

    dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
        if (disposable.disposed) return;
        [self performAsCurrentScheduler:block];
    });

    return disposable;
}

注意查蓉,在dispatch_after閉包里面之前[self performAsCurrentScheduler:block]之前,有一個(gè)關(guān)鍵的判斷:

if (disposable.disposed) return;
這個(gè)判斷就是用來(lái)判斷從第一個(gè)信號(hào)發(fā)出榜贴,在間隔interval的時(shí)間之內(nèi)豌研,還有沒(méi)有其他信號(hào)存在。如果有唬党,第一個(gè)信號(hào)肯定會(huì)disposed鹃共,這里會(huì)執(zhí)行return,所以也就不會(huì)把第一個(gè)信號(hào)發(fā)送出來(lái)了驶拱。

這樣也就達(dá)到了節(jié)流的目的:原來(lái)每個(gè)信號(hào)都會(huì)創(chuàng)建一個(gè)delayScheduler霜浴,都會(huì)延遲interval的時(shí)間,在這個(gè)時(shí)間內(nèi)蓝纲,如果原信號(hào)再?zèng)]有發(fā)送新值阴孟,即原信號(hào)沒(méi)有disposed晌纫,就把原信號(hào)的值發(fā)出來(lái);如果在這個(gè)時(shí)間內(nèi)永丝,原信號(hào)還發(fā)送了一個(gè)新值锹漱,那么第一個(gè)值就被丟棄。在發(fā)送過(guò)程中类溢,每個(gè)信號(hào)都要判斷一次predicate( )凌蔬,這個(gè)是閥門的開(kāi)關(guān),如果隨時(shí)都不節(jié)流了闯冷,原信號(hào)發(fā)的值就需要立即被發(fā)送出來(lái)砂心。

還有二點(diǎn)需要注意的是,第一點(diǎn)蛇耀,正好在interval那一時(shí)刻辩诞,有新信號(hào)發(fā)送出來(lái),原信號(hào)也會(huì)被丟棄纺涤,即只有在>=interval的時(shí)間之內(nèi)译暂,原信號(hào)沒(méi)有發(fā)送新值,原來(lái)的這個(gè)值才能發(fā)送出來(lái)撩炊。第二點(diǎn)外永,原信號(hào)發(fā)送completed時(shí),會(huì)立即執(zhí)行flushNext(YES)拧咳,把原信號(hào)的最后一個(gè)值發(fā)送出來(lái)伯顶。

  1. throttle:
- (RACSignal *)throttle:(NSTimeInterval)interval {
    return [[self throttle:interval valuesPassingTest:^(id _) {
        return YES;
    }] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];
}

這個(gè)操作其實(shí)就是調(diào)用了throttle:valuesPassingTest:方法,傳入時(shí)間間隔interval骆膝,predicate( )閉包則永遠(yuǎn)返回YES祭衩,原信號(hào)的每個(gè)信號(hào)都執(zhí)行節(jié)流操作。

  1. bufferWithTime:onScheduler:

這個(gè)操作的實(shí)現(xiàn)是類似于throttle:valuesPassingTest:的實(shí)現(xiàn)阅签。

- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
    NSCParameterAssert(scheduler != nil);
    NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
        NSMutableArray *values = [NSMutableArray array];

        void (^flushValues)() = ^{
            // 暫時(shí)省略
        };

        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            // 暫時(shí)省略
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            flushValues();
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [timerDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

bufferWithTime:onScheduler:的實(shí)現(xiàn)和throttle:valuesPassingTest:的實(shí)現(xiàn)給出類似掐暮。開(kāi)始有2個(gè)斷言,2個(gè)都是判斷scheduler的政钟,第一個(gè)斷言是判斷scheduler是否為nil路克。第二個(gè)斷言是判斷scheduler的類型的,scheduler類型不能是immediateScheduler類型的养交,因?yàn)檫@個(gè)方法是要緩存一些信號(hào)的衷戈,所以不能是immediateScheduler類型的。

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
    @synchronized (values) {
        if (values.count == 0) {
            timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
        }
        [values addObject:x ?: RACTupleNil.tupleNil];
    }
}

在subscribeNext中层坠,當(dāng)數(shù)組里面是沒(méi)有存任何原信號(hào)的值殖妇,就會(huì)開(kāi)啟一個(gè)scheduler,延遲interval時(shí)間破花,執(zhí)行flushValues閉包谦趣。如果里面有值了疲吸,就繼續(xù)加到values的數(shù)組中。關(guān)鍵的也是閉包里面的內(nèi)容前鹅,代碼如下:

void (^flushValues)() = ^{
    @synchronized (values) {
        [timerDisposable.disposable dispose];

        if (values.count == 0) return;

        RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
        [values removeAllObjects];
        [subscriber sendNext:tuple];
    }
};

flushValues( )閉包里面主要是把數(shù)組包裝成一個(gè)元組摘悴,并且全部發(fā)送出來(lái)舰绘,原數(shù)組里面就全部清空了蹂喻。這也是bufferWithTime:onScheduler:的作用,在interval時(shí)間內(nèi)捂寿,把這個(gè)時(shí)間間隔內(nèi)的原信號(hào)都緩存起來(lái)口四,并且在interval的那一刻,把這些緩存的信號(hào)打包成一個(gè)元組秦陋,發(fā)送出來(lái)蔓彩。

和throttle:valuesPassingTest:方法一樣,在原信號(hào)completed的時(shí)候驳概,立即執(zhí)行flushValues( )閉包赤嚼,把里面存的值都發(fā)送出來(lái)。

  1. delay:

delay:函數(shù)的操作和上面幾個(gè)套路都是一樣的顺又,實(shí)現(xiàn)方式也都是模板式的更卒,唯一的不同都在subscribeNext中,和一個(gè)判斷是否發(fā)送的閉包中稚照。

- (RACSignal *)delay:(NSTimeInterval)interval {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        // We may never use this scheduler, but we need to set it up ahead of
        // time so that our scheduled blocks are run serially if we do.
        RACScheduler *scheduler = [RACScheduler scheduler];

        void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
            // 暫時(shí)省略
        };

        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            // 暫時(shí)省略
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            schedule(^{
                [subscriber sendCompleted];
            });
        }];

        [disposable addDisposable:subscriptionDisposable];
        return disposable;
    }] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
}

在delay:的subscribeNext中蹂空,就單純的執(zhí)行了schedule的閉包。

        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            schedule(^{
                [subscriber sendNext:x];
            });
        }

  void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
          RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
          RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
          [disposable addDisposable:schedulerDisposable];
      };

在schedule閉包中做的時(shí)間就是延遲interval的時(shí)間發(fā)送原信號(hào)的值锐锣。

  1. interval:onScheduler:withLeeway:
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {
    NSCParameterAssert(scheduler != nil);
    NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{
            [subscriber sendNext:[NSDate date]];
        }];
    }] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];
}

在這個(gè)操作中腌闯,實(shí)現(xiàn)代碼不難绳瘟。先來(lái)看看2個(gè)斷言雕憔,都是保護(hù)入?yún)㈩愋偷模瑂cheduler不能為空糖声,且不能是immediateScheduler的類型斤彼,原因和上面是一樣的,這里是延遲操作蘸泻。

主要的實(shí)現(xiàn)就在after:repeatingEvery:withLeeway:schedule:上了琉苇。

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
    NSCParameterAssert(date != nil);
    NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
    NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSEC_PER_SEC);
    NSCParameterAssert(block != NULL);

    uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
    uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);

    dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue);
    dispatch_source_set_timer(timer, [self.class wallTimeWithDate:date], intervalInNanoSecs, leewayInNanoSecs);
    dispatch_source_set_event_handler(timer, block);
    dispatch_resume(timer);

    return [RACDisposable disposableWithBlock:^{
        dispatch_source_cancel(timer);
    }];
}

這里的實(shí)現(xiàn)就是用GCD在self.queue上創(chuàng)建了一個(gè)Timer,時(shí)間間隔是interval悦施,修正時(shí)間是leeway并扇。

leeway這個(gè)參數(shù)是為dispatch source指定一個(gè)期望的定時(shí)器事件精度,讓系統(tǒng)能夠靈活地管理并喚醒內(nèi)核抡诞。例如系統(tǒng)可以使用leeway值來(lái)提前或延遲觸發(fā)定時(shí)器穷蛹,使其更好地與其它系統(tǒng)事件結(jié)合土陪。創(chuàng)建自己的定時(shí)器時(shí),應(yīng)該盡量指定一個(gè)leeway值肴熏。不過(guò)就算指定leeway值為0鬼雀,也不能完完全全期望定時(shí)器能夠按照精確的納秒來(lái)觸發(fā)事件。

這個(gè)定時(shí)器在interval執(zhí)行sendNext操作蛙吏,也就是發(fā)送原信號(hào)的值源哩。

  1. interval:onScheduler:
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
    return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];
}

這個(gè)操作就是調(diào)用上一個(gè)方法interval:onScheduler:withLeeway:衩辟,只不過(guò)leeway = 0.0倒得。具體實(shí)現(xiàn)上面已經(jīng)分析過(guò)了,這里不再贅述吼过。

最后

本來(lái)想窮盡分析每一個(gè)RACSignal的操作的實(shí)現(xiàn)馁龟,但是發(fā)現(xiàn)所有操作加起來(lái)實(shí)在太多崩侠,用一篇文章全部寫(xiě)完篇幅太長(zhǎng)了,還是拆成幾篇坷檩,RACSignal還剩下過(guò)濾操作却音,多信號(hào)組合操作,冷熱信號(hào)轉(zhuǎn)換操作矢炼,高階信號(hào)操作系瓢,下篇接著繼續(xù)分析。最后請(qǐng)大家多多指教句灌。

轉(zhuǎn)載自:https://gold.xitu.io/post/583907a961ff4b006cafce82

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末夷陋,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子胰锌,更是在濱河造成了極大的恐慌骗绕,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件资昧,死亡現(xiàn)場(chǎng)離奇詭異酬土,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)格带,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門撤缴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人叽唱,你說(shuō)我怎么就攤上這事屈呕。” “怎么了棺亭?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵虎眨,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我,道長(zhǎng)嗽桩,這世上最難降的妖魔是什么钟鸵? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮涤躲,結(jié)果婚禮上棺耍,老公的妹妹穿的比我還像新娘。我一直安慰自己种樱,他們只是感情好蒙袍,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著嫩挤,像睡著了一般害幅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上岂昭,一...
    開(kāi)封第一講書(shū)人閱讀 49,046評(píng)論 1 285
  • 那天以现,我揣著相機(jī)與錄音,去河邊找鬼约啊。 笑死邑遏,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的恰矩。 我是一名探鬼主播记盒,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼外傅!你這毒婦竟也來(lái)了纪吮?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤萎胰,失蹤者是張志新(化名)和其女友劉穎碾盟,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體技竟,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡冰肴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了灵奖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嚼沿。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡估盘,死狀恐怖瓷患,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情遣妥,我是刑警寧澤擅编,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響爱态,放射性物質(zhì)發(fā)生泄漏谭贪。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一锦担、第九天 我趴在偏房一處隱蔽的房頂上張望俭识。 院中可真熱鬧,春花似錦洞渔、人聲如沸套媚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)堤瘤。三九已至,卻和暖如春浆熔,著一層夾襖步出監(jiān)牢的瞬間本辐,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工医增, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留慎皱,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓叶骨,卻偏偏與公主長(zhǎng)得像宝冕,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子邓萨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容