ReactiveCocoa 中 RACSignal 所有變換操作底層實(shí)現(xiàn)分析(中)

前言

緊接著上篇的源碼實(shí)現(xiàn)分析,繼續(xù)分析RACSignal的變換操作的底層實(shí)現(xiàn)勿决。

目錄

1.過濾操作
2.組合操作

一. 過濾操作

過濾操作也屬于一種變換乒躺,根據(jù)過濾條件,過濾出符合條件的值低缩。變換出來的新的信號是原信號的一個(gè)子集嘉冒。

  1. filter: (在父類RACStream中定義的)

這個(gè)filter:操作在any:的實(shí)現(xiàn)中用到過了。

- (instancetype)filter:(BOOL (^)(id value))block {
    NSCParameterAssert(block != nil);

    Class class = self.class;

    return [[self flattenMap:^ id (id value) {  
        if (block(value)) {
            return [class return:value];
        } else {
            return class.empty;
        }
    }] setNameWithFormat:@"[%@] -filter:", self.name];
}

filter:中傳入一個(gè)閉包咆繁,是用篩選的條件讳推。如果滿足篩選條件的即返回原信號的值,否則原信號的值被“吞”掉玩般,返回空的信號银觅。這個(gè)變換主要是用flattenMap的。

  1. ignoreValues
- (RACSignal *)ignoreValues {
    return [[self filter:^(id _) {
        return NO;
    }] setNameWithFormat:@"[%@] -ignoreValues", self.name];
}

由上面filter的實(shí)現(xiàn)坏为,這里把篩選判斷條件永遠(yuǎn)的傳入NO设拟,那么原信號的值都會被變換成empty信號,故變換之后的信號為空信號久脯。

  1. ignore: (在父類RACStream中定義的)
- (instancetype)ignore:(id)value {
    return [[self filter:^ BOOL (id innerValue) {
        return innerValue != value && ![innerValue isEqual:value];
    }] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
}

ignore:的實(shí)現(xiàn)還是由filter:實(shí)現(xiàn)的纳胧。傳入的篩選判斷條件是一個(gè)值,當(dāng)原信號發(fā)送的值中是這個(gè)值的時(shí)候帘撰,就替換成空信號跑慕。

  1. distinctUntilChanged (在父類RACStream中定義的)
- (instancetype)distinctUntilChanged {
    Class class = self.class;

    return [[self bind:^{
        __block id lastValue = nil;
        __block BOOL initial = YES;

        return ^(id x, BOOL *stop) {
            if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];

            initial = NO;
            lastValue = x;
            return [class return:x];
        };
    }] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
}

distinctUntilChanged的實(shí)現(xiàn)是用bind來完成的。每次變換中都記錄一下原信號上一次發(fā)送過來的值,并與這一次進(jìn)行比較核行,如果是相同的值牢硅,就“吞”掉,返回empty信號芝雪。只有和原信號上一次發(fā)送的值不同减余,變換后的新信號才把這個(gè)值發(fā)送出來。

關(guān)于distinctUntilChanged惩系,這里關(guān)注的是兩兩信號之間的值是否不同位岔,有時(shí)候我們可能需要一個(gè)類似于NSSet的信號集,distinctUntilChanged就無法滿足了堡牡。在ReactiveCocoa 2.5的這個(gè)版本也并沒有向我們提供distinct的變換函數(shù)抒抬。

我們可以自己實(shí)現(xiàn)類似的變換。實(shí)現(xiàn)思路也不難晤柄,可以把之前每次發(fā)送過來的信號都用數(shù)組存起來擦剑,新來的信號都去數(shù)組里面查找一遍,如果找不到芥颈,就把這個(gè)值發(fā)送出去惠勒,如果找到了,就返回empty信號爬坑。效果如上圖捉撮。

  1. take: (在父類RACStream中定義的)
- (instancetype)take:(NSUInteger)count {
    Class class = self.class;

    if (count == 0) return class.empty;

    return [[self bind:^{
        __block NSUInteger taken = 0;

        return ^ id (id value, BOOL *stop) {
            if (taken < count) {
                ++taken;
                if (taken == count) *stop = YES;
                return [class return:value];
            } else {
                return nil;
            }
        };
    }] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}

take:實(shí)現(xiàn)也非常簡單,借助bind函數(shù)來實(shí)現(xiàn)的妇垢。入?yún)⒌腸ount是原信號取值的個(gè)數(shù)。在bind的閉包中肉康,taken計(jì)數(shù)從0開始取原信號的值闯估,當(dāng)taken取到count個(gè)數(shù)的時(shí)候,就停止取值吼和。

在take:的基礎(chǔ)上我們還可以繼續(xù)改造出新的變換方式涨薪。比如說,想取原信號中執(zhí)行的第幾個(gè)值炫乓。類似于elementAt的操作刚夺。這個(gè)操作在ReactiveCocoa 2.5的這個(gè)版本也并沒有直接向我們提供出來。

其實(shí)實(shí)現(xiàn)很簡單末捣,只需要判斷taken是否等于我們要取的那個(gè)位置就可以了侠姑,等于的時(shí)候把原信號的值發(fā)送出來,并*stop = YES箩做。

// 我自己增加實(shí)現(xiàn)的方法

- (instancetype)elementAt:(NSUInteger)index {
    Class class = self.class;

    return [[self bind:^{
        __block NSUInteger taken = 0;

        return ^ id (id value, BOOL *stop) {
            if (index == 0) {
                *stop = YES;
                return [class return:value];
            }
            if (taken == index) {
                *stop = YES;
                return [class return:value];
            } else if (taken < index){
                taken ++;
                return [class empty];
            }else {
                return nil;
            }
        };
    }] setNameWithFormat:@"[%@] -elementAt: %lu", self.name, (unsigned long)index];
}
  1. takeLast:
- (RACSignal *)takeLast:(NSUInteger)count {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
        return [self subscribeNext:^(id x) {
            [valuesTaken addObject:x ? : RACTupleNil.tupleNil];

            while (valuesTaken.count > count) {
                [valuesTaken removeObjectAtIndex:0];
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            for (id value in valuesTaken) {
                [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
            }

            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
}

takeLast:的實(shí)現(xiàn)也是按照套路來莽红。先創(chuàng)建一個(gè)新信號,return的時(shí)候訂閱原信號。在函數(shù)內(nèi)部用一個(gè)valuesTaken來保存原信號發(fā)送過來的值安吁,原信號發(fā)多少醉蚁,就存多少,直到個(gè)數(shù)溢出入?yún)⒔o定的count鬼店,就溢出數(shù)組第0位网棍。這樣能保證數(shù)組里面始終都裝著最后count個(gè)原信號的值。

當(dāng)原信號發(fā)送completed信號的時(shí)候妇智,把數(shù)組里面存的值都sendNext出去滥玷。這里要注意的也是該變換發(fā)送信號的時(shí)機(jī)。如果原信號一直沒有completed俘陷,那么takeLast:就一直沒法發(fā)出任何信號來罗捎。

  1. takeUntilBlock: (在父類RACStream中定義的)
- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate {
    NSCParameterAssert(predicate != nil);

    Class class = self.class;

    return [[self bind:^{
        return ^ id (id value, BOOL *stop) {
            if (predicate(value)) return nil;

            return [class return:value];
        };
    }] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
}

takeUntilBlock:是根據(jù)傳入的predicate閉包作為篩選條件的。一旦predicate( )閉包滿足條件拉盾,那么新信號停止發(fā)送新信號桨菜,因?yàn)樗恢脼閚il了。和函數(shù)名的意思是一樣的捉偏,take原信號的值倒得,Until直到閉包滿足條件。

  1. takeWhileBlock: (在父類RACStream中定義的)
- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate {
    NSCParameterAssert(predicate != nil);

    return [[self takeUntilBlock:^ BOOL (id x) {
        return !predicate(x);
    }] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
}

takeWhileBlock:的信號集是takeUntilBlock:的信號集的補(bǔ)集夭禽。全集是原信號霞掺。takeWhileBlock:底層還是調(diào)用takeUntilBlock:,只不過判斷條件的是不滿足predicate( )閉包的集合讹躯。

  1. takeUntil:
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
        void (^triggerCompletion)(void) = ^{
            [disposable dispose];
            [subscriber sendCompleted];
        };

        RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
            triggerCompletion();
        } completed:^{
            triggerCompletion();
        }];

        [disposable addDisposable:triggerDisposable];

        if (!disposable.disposed) {
            RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                [disposable dispose];
                [subscriber sendCompleted];
            }];

            [disposable addDisposable:selfDisposable];
        }

        return disposable;
    }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}

takeUntil:的實(shí)現(xiàn)也是“經(jīng)典套路”——return一個(gè)新信號菩彬,在新信號中訂閱原信號。入?yún)⑹且粋€(gè)信號signalTrigger潮梯,這個(gè)信號是一個(gè)Trigger骗灶。一旦signalTrigger發(fā)出第一個(gè)信號,就會觸發(fā)triggerCompletion( )閉包秉馏,在這個(gè)閉包中耙旦,會調(diào)用triggerCompletion( )閉包。

  void (^triggerCompletion)(void) = ^{
   [disposable dispose];
   [subscriber sendCompleted];
  };

一旦調(diào)用了triggerCompletion( )閉包萝究,就會把原信號取消訂閱免都,并給變換的新的信號訂閱者sendCompleted。

如果入?yún)ignalTrigger一直沒有sendNext帆竹,那么原信號就會一直sendNext:绕娘。

  1. takeUntilReplacement:
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];

        RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
            [selfDisposable dispose];
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [selfDisposable dispose];
            [subscriber sendError:error];
        } completed:^{
            [selfDisposable dispose];
            [subscriber sendCompleted];
        }];

        if (!selfDisposable.disposed) {
            selfDisposable.disposable = [[self
                                          concat:[RACSignal never]]
                                         subscribe:subscriber];
        }

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [replacementDisposable dispose];
        }];
    }];
}

原信號concat:了一個(gè)[RACSignal never]信號,這樣原信號就一直不會disposed栽连,會一直等待replacement信號的到來业舍。
控制selfDisposable是否被dispose,控制權(quán)來自于入?yún)⒌膔eplacement信號,一旦replacement信號sendNext舷暮,那么原信號就會取消訂閱态罪,接下來的事情就會交給replacement信號了。
變換后的新信號sendNext下面,sendError复颈,sendCompleted全部都由replacement信號來發(fā)送,最終新信號完成的時(shí)刻也是replacement信號完成的時(shí)刻沥割。

  1. skip: (在父類RACStream中定義的)
- (instancetype)skip:(NSUInteger)skipCount {
    Class class = self.class;

    return [[self bind:^{
        __block NSUInteger skipped = 0;

        return ^(id value, BOOL *stop) {
            if (skipped >= skipCount) return [class return:value];

            skipped++;
            return class.empty;
        };
    }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}

skip:信號集和take:信號集是補(bǔ)集關(guān)系耗啦,全集是原信號。take:是取原信號的前count個(gè)信號机杜,而skip:是從原信號第count + 1位開始取信號帜讲。

skipped是一個(gè)游標(biāo),每次原信號發(fā)送一個(gè)值椒拗,就比較它和入?yún)kipCount的大小似将。如果不比skipCount大,說明還需要跳過蚀苛,所以就返回empty信號在验,否則就把原信號的值發(fā)送出來。

通過類比take系列方法堵未,可以發(fā)現(xiàn)在ReactiveCocoa 2.5的這個(gè)版本也并沒有向我們提供skipLast:的變換函數(shù)腋舌。這個(gè)變換函數(shù)的實(shí)現(xiàn)過程也不難,我們可以類比takeLast:來實(shí)現(xiàn)渗蟹。

實(shí)現(xiàn)的思路也不難块饺,原信號每次發(fā)送過來的值,都用一個(gè)數(shù)組存儲起來雌芽。skipLast:是想去掉原信號最末尾的count個(gè)信號授艰。

我們先來分析一下:假設(shè)原信號有n個(gè)信號,從0 - (n-1)膘怕,去掉最后的count個(gè),前面還剩n - count個(gè)信號召庞。那么從 原信號的第 count + 1位的信號開始發(fā)送岛心,到原信號結(jié)束,這樣中間就正好是發(fā)送了 n - count 個(gè)信號篮灼。

分析清楚后忘古,代碼就很容易了:

// 我自己增加實(shí)現(xiàn)的方法
- (RACSignal *)skipLast:(NSUInteger)count {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
        return [self subscribeNext:^(id x) {
            [valuesTaken addObject:x ? : RACTupleNil.tupleNil];

            while (valuesTaken.count > count) {
                [subscriber sendNext:valuesTaken[0] == RACTupleNil.tupleNil ? nil : valuesTaken[0]];
                [valuesTaken removeObjectAtIndex:0];
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{            
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -skipLast: %lu", self.name, (unsigned long)count];
}

原信號每發(fā)送過來一個(gè)信號就存入數(shù)組,當(dāng)數(shù)組里面的個(gè)數(shù)大于count的時(shí)候诅诱,就是需要我們發(fā)送信號的時(shí)候髓堪,這個(gè)時(shí)候每次都把數(shù)組里面第0位發(fā)送出去即可,數(shù)組維護(hù)了一個(gè)FIFO的隊(duì)列。這樣就實(shí)現(xiàn)了skipLast:的效果了干旁。

  1. skipUntilBlock: (在父類RACStream中定義的)
- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate {
    NSCParameterAssert(predicate != nil);

    Class class = self.class;

    return [[self bind:^{
        __block BOOL skipping = YES;

        return ^ id (id value, BOOL *stop) {
            if (skipping) {
                if (predicate(value)) {
                    skipping = NO;
                } else {
                    return class.empty;
                }
            }

            return [class return:value];
        };
    }] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
}

skipUntilBlock: 的實(shí)現(xiàn)可以類比takeUntilBlock: 的實(shí)現(xiàn)驶沼。

skipUntilBlock: 是根據(jù)傳入的predicate閉包作為篩選條件的。一旦predicate( )閉包滿足條件争群,那么skipping = NO回怜。skipping為NO,以后原信號發(fā)送的每個(gè)值都原封不動的發(fā)送出去换薄。predicate( )閉包不滿足條件的時(shí)候玉雾,即會一直skip原信號的值。和函數(shù)名的意思是一樣的轻要,skip原信號的值复旬,Until直到閉包滿足條件,就不再skip了冲泥。

  1. skipWhileBlock: (在父類RACStream中定義的)
- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate {
    NSCParameterAssert(predicate != nil);

    return [[self skipUntilBlock:^ BOOL (id x) {
        return !predicate(x);
    }] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
}

skipWhileBlock:的信號集是skipUntilBlock:的信號集的補(bǔ)集驹碍。全集是原信號。skipWhileBlock:底層還是調(diào)用skipUntilBlock:柏蘑,只不過判斷條件的是不滿足predicate( )閉包的集合幸冻。

到這里skip系列方法就結(jié)束了,對比take系列的方法咳焚,少了2個(gè)方法洽损,在ReactiveCocoa 2.5的這個(gè)版本中 takeUntil: 和 takeUntilReplacement:這兩個(gè)方法沒有與之對應(yīng)的skip方法。

// 我自己增加實(shí)現(xiàn)的方法
- (RACSignal *)skipUntil:(RACSignal *)signalTrigger {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        __block BOOL sendTrigger = NO;

        void (^triggerCompletion)(void) = ^{
            sendTrigger = YES;
        };

        RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
            triggerCompletion();
        } completed:^{
            triggerCompletion();
        }];

        [disposable addDisposable:triggerDisposable];

        if (!disposable.disposed) {
            RACDisposable *selfDisposable = [self subscribeNext:^(id x) {

                if (sendTrigger) {
                    [subscriber sendNext:x];
                }

            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                [disposable dispose];
                [subscriber sendCompleted];
            }];

            [disposable addDisposable:selfDisposable];
        }

        return disposable;
    }] setNameWithFormat:@"[%@] -skipUntil: %@", self.name, signalTrigger];
}

skipUntil實(shí)現(xiàn)方法也很簡單革半,當(dāng)入?yún)⒌膕ignalTrigger開發(fā)發(fā)送信號的時(shí)候碑定,就讓原信號sendNext把值發(fā)送出來,否則就把原信號的值“吞”掉又官。

skipUntilReplacement:就沒什么意義了延刘,把原信號經(jīng)過skipUntilReplacement:變換之后得到的新的信號就是Replacement信號。所以說這個(gè)操作也就沒意義了六敬。

  1. groupBy:transform:
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
    NSCParameterAssert(keyBlock != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableDictionary *groups = [NSMutableDictionary dictionary];
        NSMutableArray *orderedGroups = [NSMutableArray array];

        return [self subscribeNext:^(id x) {
            id<NSCopying> key = keyBlock(x);
            RACGroupedSignal *groupSubject = nil;
            @synchronized(groups) {
                groupSubject = groups[key];
                if (groupSubject == nil) {
                    groupSubject = [RACGroupedSignal signalWithKey:key];
                    groups[key] = groupSubject;
                    [orderedGroups addObject:groupSubject];
                    [subscriber sendNext:groupSubject];
                }
            }

            [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
        } error:^(NSError *error) {
            [subscriber sendError:error];

            [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
        } completed:^{
            [subscriber sendCompleted];

            [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
        }];
    }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
}

看groupBy:transform:的實(shí)現(xiàn)碘赖,依舊是老“套路”。return 一個(gè)新的RACSignal外构,在新的信號里面訂閱原信號普泡。

groupBy:transform:的重點(diǎn)就在subscribeNext中了。

首先解釋一下兩個(gè)入?yún)⑸蟊唷蓚€(gè)入?yún)⒍际情]包撼班,keyBlock返回值是要作為字典的key,transformBlock的返回值是對原信號發(fā)出來的值x進(jìn)行變換垒酬。
先創(chuàng)建一個(gè)NSMutableDictionary字典groups砰嘁,和NSMutableArray數(shù)組orderedGroups件炉。
從字典里面取出key對應(yīng)的value,這里的key對應(yīng)著keyBlock返回值矮湘。value的值是一個(gè)RACGroupedSignal信號斟冕。如果找不到對應(yīng)的key值,就新建一個(gè)RACGroupedSignal信號板祝,并存入字典對應(yīng)的key值宫静,與之對應(yīng)。
新變換之后的信號券时,訂閱之后孤里,RACGroupedSignal進(jìn)行sendNext,這是一個(gè)信號橘洞,如果transformBlock不為空捌袜,就發(fā)送transformBlock變換之后的值。
sendError和sendCompleted都要分別對數(shù)組orderedGroups里面每個(gè)RACGroupedSignal都要進(jìn)行sendError或者sendCompleted炸枣。因?yàn)橐獙?shù)組里面每個(gè)信號都執(zhí)行一個(gè)操作虏等,所以需要調(diào)用makeObjectsPerformSelector:withObject:方法。
經(jīng)過groupBy:transform:變換之后适肠,原信號會根據(jù)keyBlock進(jìn)行分組霍衫。

寫出測試代碼,來看看平時(shí)應(yīng)該怎么用侯养。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
                         {
                             [subscriber sendNext:@1];
                             [subscriber sendNext:@2];
                             [subscriber sendNext:@3];
                             [subscriber sendNext:@4];
                             [subscriber sendNext:@5];
                             [subscriber sendCompleted];
                             return [RACDisposable disposableWithBlock:^{
                                 NSLog(@"signal dispose");
                             }];
                         }];

    RACSignal *signalGroup = [signalA groupBy:^id<NSCopying>(NSNumber *object) {
        return object.integerValue > 3 ? @"good" : @"bad";
    } transform:^id(NSNumber * object) {
        return @(object.integerValue * 10);
    }];

    [[[signalGroup filter:^BOOL(RACGroupedSignal *value) {
        return [(NSString *)value.key isEqualToString:@"good"];
    }] flatten]subscribeNext:^(id x) {
        NSLog(@"subscribeNext: %@", x);
    }];

假設(shè)原信號發(fā)送的1敦跌,2,3逛揩,4柠傍,5是代表的成績的5個(gè)等級。當(dāng)成績大于3的都算“good”辩稽,小于3的都算“bad”惧笛。

signalGroup是原信號signalA經(jīng)過groupBy:transform:得到的新的信號,這個(gè)信號是一個(gè)高階的信號逞泄,因?yàn)樗锩娌⒉皇侵苯友b的是值患整,signalGroup這個(gè)信號里面裝的還是信號。signalGroup里面有兩個(gè)分組喷众,分別是“good”分組和“bad”分組各谚。

想從中取出這兩個(gè)分組里面的值,需要進(jìn)行一次filter:篩選侮腹。篩選之后得到對應(yīng)分組的高階信號嘲碧。這時(shí)還要再進(jìn)行一個(gè)flatten操作稻励,把高階信號變成低階信號父阻,再次訂閱才能取到其中的值愈涩。

訂閱新信號的值,輸出如下:

subscribeNext: 40
subscribeNext: 50

關(guān)于flatten的實(shí)現(xiàn):

- (instancetype)flatten {
    __weak RACStream *stream __attribute__((unused)) = self;
    return [[self flattenMap:^(id value) {
        return value;
    }] setNameWithFormat:@"[%@] -flatten", self.name];
}

flatten操作就是調(diào)用了flattenMap:把值傳進(jìn)去了加矛。

- (instancetype)flattenMap:(RACStream * (^)(id value))block {
    Class class = self.class;

    return [[self bind:^{
        return ^(id value, BOOL *stop) {
            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

            return stream;
        };
    }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

flatten是把高階信號變換成低階信號的常用操作履婉。flattenMap:具體實(shí)現(xiàn)上篇文章分析過了,這里不再贅述斟览。

  1. groupBy:
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
    return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
}
groupBy:操作就是groupBy:transform:的縮減版毁腿,transform傳入的為nil。

關(guān)于groupBy:可以干的事情很多苛茂,可以進(jìn)行很高級的分組操作已烤。這里可以舉一個(gè)例子:

// 簡單算法題,分離數(shù)組中的相同的元素妓羊,如果元素個(gè)數(shù)大于2胯究,則組成一個(gè)新的數(shù)組,結(jié)果得到多個(gè)包含相同元素的數(shù)組躁绸,
// 例如[1,2,3,1,2,3]分離成[1,1],[2,2],[3,3]
RACSignal *signal = @[@1, @2, @3, @4,@2,@3,@3,@4,@4,@4].rac_sequence.signal;

  NSArray * array = [[[[signal groupBy:^NSString *(NSNumber *object) {
      return [NSString stringWithFormat:@"%@",object];
  }] map:^id(RACGroupedSignal *value) {
      return [value sequence];
  }] sequence] map:^id(RACSignalSequence * value) {
      return value.array;
  }].array;

for (NSNumber * num in array) {
    NSLog(@"最后的數(shù)組%@",num);
}

// 最后輸出 [1,2,3,4,2,3,3,4,4,4]變成[1],[2,2],[3,3,3],[4,4,4,4]

二. 組合操作


1. startWith: (在父類RACStream中定義的)

  • (instancetype)startWith:(id)value {

    return [[[self.class return:value]
    concat:self]
    setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];
    }

startWith:的實(shí)現(xiàn)很簡單裕循,就是先構(gòu)造一個(gè)只發(fā)送一個(gè)value的信號,然后這個(gè)信號發(fā)送完畢之后接上原信號净刮。得到的新的信號就是在原信號前面新加了一個(gè)值剥哑。

2. concat: (在父類RACStream中定義的)

這里說的concat:是在父類RACStream中定義的。

  • (instancetype)concat:(RACStream *)stream {
    return nil;
    }
父類中定義的這個(gè)方法就返回一個(gè)nil淹父,具體的實(shí)現(xiàn)還要子類去重寫株婴。

3. concat: (在父類RACStream中定義的)

  • (instancetype)concat:(id<NSFastEnumeration>)streams {
    RACStream *result = self.empty;
    for (RACStream *stream in streams) {
    result = [result concat:stream];
    }

    return [result setNameWithFormat:@"+concat: %@", streams];
    }

這個(gè)concat:后面跟著一個(gè)數(shù)組,數(shù)組里面包含這很多信號弹灭,concat:依次把這些信號concat:連接串起來督暂。

4. merge:


  • (RACSignal *)merge:(id<NSFastEnumeration>)signals {
    NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
    for (RACSignal *signal in signals) {
    [copiedSignals addObject:signal];
    }

    return [[[RACSignal
    createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
    for (RACSignal *signal in copiedSignals) {
    [subscriber sendNext:signal];
    }

                [subscriber sendCompleted];
                return nil;
            }]
           flatten]
          setNameWithFormat:@"+merge: %@", copiedSignals];
    

}

merge:后面跟一個(gè)數(shù)組。先會新建一個(gè)數(shù)組copiedSignals穷吮,把傳入的信號都裝到數(shù)組里逻翁。然后依次發(fā)送數(shù)組里面的信號。由于新信號也是一個(gè)高階信號捡鱼,因?yàn)閟endNext會把各個(gè)信號都依次發(fā)送出去八回,所以需要flatten操作把這個(gè)信號轉(zhuǎn)換成值發(fā)送出去。

從上圖上看驾诈,上下兩個(gè)信號就像被拍扁了一樣缠诅,就成了新信號的發(fā)送順序。

5. merge:


  • (RACSignal *)merge:(RACSignal *)signal {
    return [[RACSignal
    merge:@[ self, signal ]]
    setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
    }
merge:后面參數(shù)也可以跟一個(gè)信號乍迄,那么merge:就是合并這兩個(gè)信號管引。具體實(shí)現(xiàn)和merge:多個(gè)信號是一樣的原理。

6. zip: (在父類RACStream中定義的)


  • (instancetype)zip:(id<NSFastEnumeration>)streams {
    return [[self join:streams block:^(RACStream *left, RACStream *right) {
    return [left zipWith:right];
    }] setNameWithFormat:@"+zip: %@", streams];
    }
zip:后面可以跟一個(gè)數(shù)組闯两,數(shù)組里面裝的是各種信號流褥伴。

它的實(shí)現(xiàn)是調(diào)用了join: block: 實(shí)現(xiàn)的谅将。

  • (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
    RACStream *current = nil;
    // 第一步
    for (RACStream *stream in streams) {

      if (current == nil) {
          current = [stream map:^(id x) {
              return RACTuplePack(x);
          }];
    
          continue;
      }
    
      current = block(current, stream);
    

    }
    // 第二步
    if (current == nil) return [self empty];

    return [current map:^(RACTuple *xs) {

      NSMutableArray *values = [[NSMutableArray alloc] init];
      // 第三步
      while (xs != nil) {
          [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
          xs = (xs.count > 1 ? xs.first : nil);
      }
      // 第四步
      return [RACTuple tupleWithObjectsFromArray:values];
    

    }];
    }

join: block: 的實(shí)現(xiàn)可以分為4步:

依次打包各個(gè)信號流,把每個(gè)信號流都打包成元組RACTuple重慢。首先第一個(gè)信號流打包成一個(gè)元組饥臂,這個(gè)元組里面就一個(gè)信號。接著把第一個(gè)元組和第二個(gè)信號執(zhí)行block( )閉包里面的操作似踱。傳入的block( )閉包執(zhí)行的是zipWith:的操作隅熙。這個(gè)操作是把兩個(gè)信號“壓”在一起。具體實(shí)現(xiàn)分析請看第一篇文章里面分析過的核芽,這里就不再贅述了囚戚。得到第二個(gè)元組,里面裝著是第一個(gè)元組和第二個(gè)信號轧简。之后每次循環(huán)都執(zhí)行類似的操作弯淘,再把第二個(gè)元組和第三個(gè)信號進(jìn)行zipWith:操作,以此類推下去吉懊,直到所有的信號流都循環(huán)一遍庐橙。

經(jīng)過第一步的循環(huán)操作之后,還是nil借嗽,那么肯定就是空信號了态鳖,就返回empty信號。

這一步是把之前第一步打包出來的結(jié)果恶导,還原回原信號的過程浆竭。經(jīng)過第一步的循環(huán)之后,current會是類似這個(gè)樣子惨寿,(((1), 2), 3)邦泄,第三步就是為了把這種多重元組解出來,每個(gè)信號流都依次按照順序放在數(shù)組里裂垦。注意觀察current的特點(diǎn)顺囊,最外層的元組,是一個(gè)值和一個(gè)元組蕉拢,所以從最外層的元組開始特碳,一層一層往里“剝”。while循環(huán)每次都取最外層元組的last晕换,即那個(gè)單獨(dú)的值午乓,插入到數(shù)組的第0號位置,然后取出first即是里面一層的元組闸准。然后依次循環(huán)益愈。由于每次都插入到數(shù)組0號的位置,類似于鏈表的頭插法夷家,最終數(shù)組里面的順序肯定也保證是原信號的順序蒸其。

第四步就是把還原成原信號的順序的數(shù)組包裝成元組或辖,返回給map操作的閉包。

  • (instancetype)tupleWithObjectsFromArray:(NSArray *)array {
    return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
    }

  • (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {
    RACTuple *tuple = [[self alloc] init];

    if (convert) {
    NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
    for (id object in array) {
    [newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
    }
    tuple.backingArray = newArray;
    } else {
    tuple.backingArray = [array copy];
    }

    return tuple;
    }

在轉(zhuǎn)換過程中枣接,入?yún)onvertNullsToNils的含義是,是否把數(shù)組里面的NSNull轉(zhuǎn)換成RACTupleNil缺谴。

這里轉(zhuǎn)換傳入的是NO但惶,所以就是把數(shù)組原封不動的copy一份。

測試代碼:

RACSignal *signalD = [RACSignal interval:3 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalO = [RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalE = [RACSignal interval:4 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalB = [RACStream zip:@[signalD,signalO,signalE]];

[signalB subscribeNext:^(id x) {
    NSLog(@"最后接收到的值 = %@",x);
}];
打印輸出:

2016-11-29 13:07:57.349 最后接收到的值 = <RACTuple: 0x608000011440> (
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:07:54 +0000",
"2016-11-29 05:07:57 +0000"
)

2016-11-29 13:08:01.350 最后接收到的值 = <RACTuple: 0x608000010c60> (
"2016-11-29 05:07:59 +0000",
"2016-11-29 05:07:55 +0000",
"2016-11-29 05:08:01 +0000"
)

2016-11-29 13:08:05.352 最后接收到的值 = <RACTuple: 0x60000001a350> (
"2016-11-29 05:08:02 +0000",
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:08:05 +0000"
)

最后輸出的信號以時(shí)間最長的為主湿蛔,最后接到的信號是一個(gè)元組膀曾,里面依次包含zip:數(shù)組里每個(gè)信號在一次“壓”縮周期里面的值。

7. zip: reduce: (在父類RACStream中定義的)

  • (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
    NSCParameterAssert(reduceBlock != nil);
    RACStream *result = [self zip:streams];
    if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
    return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
    }
zip: reduce:是一個(gè)組合的方法阳啥。具體實(shí)現(xiàn)可以拆分成兩部分添谊,第一部分是先執(zhí)行zip:,把數(shù)組里面的信號流依次都進(jìn)行組合斩狱。這一過程的實(shí)現(xiàn)在上一個(gè)變換實(shí)現(xiàn)中分析過了。zip:完成之后所踊,緊接著進(jìn)行reduceEach:操作继薛。

這里有一個(gè)判斷reduceBlock是否為nil的判斷遏考,這個(gè)判斷是針對老版本的“歷史遺留問題”。在ReactiveCocoa 2.5之前的版本蓝谨,是允許reduceBlock傳入nil诈皿,這里為了防止崩潰,所以加上了這個(gè)reduceBlock是否為nil的判斷像棘。

  • (instancetype)reduceEach:(id (^)())reduceBlock {
    NSCParameterAssert(reduceBlock != nil);

    __weak RACStream *stream attribute((unused)) = self;
    return [[self map:^(RACTuple *t) {
    NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
    return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
    }] setNameWithFormat:@"[%@] -reduceEach:", self.name];
    }

reduceEach:操作在上篇已經(jīng)分析過了稽亏。它會動態(tài)的構(gòu)造閉包,對原信號每個(gè)元組缕题,執(zhí)行reduceBlock( )閉包里面的方法截歉。具體分析見上篇。一般用法如下:

[RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number) {
return [NSString stringWithFormat:@"%@: %@", string, number];
}];

8. zipWith: (在父類RACStream中定義的)

  • (instancetype)zipWith:(RACStream *)stream {
    return nil;
    }
這個(gè)方法就是在父類的RACStream中定義了烟零,具體實(shí)現(xiàn)還要看RACStream各個(gè)子類的實(shí)現(xiàn)瘪松。

它就可以類比concat:在父類中的實(shí)現(xiàn)咸作,也是直接返回一個(gè)nil。


- (instancetype)concat:(RACStream *)stream { return nil;}
在第一篇中分析了concat:和zipWith:在RACSignal子類中具體實(shí)現(xiàn)。忘記了具體實(shí)現(xiàn)的可以回去看看。

9. combineLatestWith:

  • (RACSignal *)combineLatestWith:(RACSignal *)signal {
    NSCParameterAssert(signal != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

      // 初始化第一個(gè)信號的一些標(biāo)志變量
      __block id lastSelfValue = nil;
      __block BOOL selfCompleted = NO;
    
      // 初始化第二個(gè)信號的一些標(biāo)志變量
      __block id lastOtherValue = nil;
      __block BOOL otherCompleted = NO;
    
      // 這里是一個(gè)判斷是否sendNext的閉包
      void (^sendNext)(void) = ^{ };
    
      // 訂閱第一個(gè)信號
      RACDisposable *selfDisposable = [self subscribeNext:^(id x) { }];
      [disposable addDisposable:selfDisposable];
    
      // 訂閱第二個(gè)信號
      RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { }];
      [disposable addDisposable:otherDisposable];
    
      return disposable;
    

    }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
    }

大體實(shí)現(xiàn)思路比較簡單锄蹂,在新信號里面分別訂閱原信號和入?yún)ignal信號虚青。


RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (disposable) {
lastSelfValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
selfCompleted = YES;
if (otherCompleted) [subscriber sendCompleted];
}
}];

先來看看原信號訂閱的具體實(shí)現(xiàn):

在subscribeNext閉包中,記錄下原信號最新發(fā)送的x值,并保存到lastSelfValue中。從此lastSelfValue變量每次都保存原信號發(fā)送過來的最新的值。然后再調(diào)用sendNext( )閉包说庭。

在completed閉包中,selfCompleted中記錄下原信號發(fā)送完成郑趁。這是還要判斷otherCompleted是否完成刊驴,即入?yún)⑿盘杝ignal是否發(fā)送完成,只有兩者都發(fā)送完成了寡润,組合的新信號才能算全部發(fā)送完成捆憎。

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (disposable) {
lastOtherValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
otherCompleted = YES;
if (selfCompleted) [subscriber sendCompleted];
}
}];

這是對入?yún)⑿盘杝ignal的處理實(shí)現(xiàn)。和原信號的處理方式完全一致∷笪疲現(xiàn)在重點(diǎn)就要看看sendNext( )閉包中都做了些什么攻礼。

void (^sendNext)(void) = ^{
@synchronized (disposable) {
if (lastSelfValue == nil || lastOtherValue == nil) return;
[subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
}
};

在sendNext( )閉包中,如果lastSelfValue 或者 lastOtherValue 其中之一有一個(gè)為nil栗柒,就return礁扮,因?yàn)檫@個(gè)時(shí)候無法結(jié)合在一起。當(dāng)兩個(gè)信號都有值瞬沦,那么就把這兩個(gè)信號的最新的值打包成元組發(fā)送出來太伊。


可以看到,每個(gè)信號每發(fā)送出來一個(gè)新的值逛钻,都會去找另外一個(gè)信號上一個(gè)最新的值進(jìn)行結(jié)合僚焦。

這里可以對比一下類似的zip:操作


zip:操作是會把新來的信號的值存起來,放在數(shù)組里曙痘,然后另外一個(gè)信號發(fā)送一個(gè)值過來就和數(shù)組第0位的值相互結(jié)合成新的元組信號發(fā)送出去芳悲,并分別移除數(shù)組里面第0位的兩個(gè)值。zip:能保證每次結(jié)合的值都是唯一的边坤,不會一個(gè)原信號的值被多次結(jié)合到新的元組信號中名扛。但是combineLatestWith:是不能保證這一點(diǎn)的,在原信號或者另外一個(gè)信號新信號發(fā)送前茧痒,每次發(fā)送信號都會結(jié)合當(dāng)前最新的信號肮韧,這里就會有反復(fù)結(jié)合的情況。

10. combineLatest:

  • (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
    return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
    return [left combineLatestWith:right];
    }] setNameWithFormat:@"+combineLatest: %@", signals];
    }
combineLatest:的實(shí)現(xiàn)就是把入?yún)?shù)組里面的每個(gè)信號都調(diào)用一次join: block:方法。傳入的閉包是把兩個(gè)信號combineLatestWith:一下弄企。combineLatest:的實(shí)現(xiàn)就是2個(gè)操作的組合超燃。具體實(shí)現(xiàn)上面也都分析過,這里不再贅述拘领。

11. combineLatest: reduce:

  • (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(id (^)())reduceBlock {
    NSCParameterAssert(reduceBlock != nil);
    RACSignal *result = [self combineLatest:signals];
    if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
    return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
    }
combineLatest: reduce: 的實(shí)現(xiàn)可以類比zip: reduce:的實(shí)現(xiàn)意乓。

具體實(shí)現(xiàn)可以拆分成兩部分,第一部分是先執(zhí)行combineLatest:约素,把數(shù)組里面的信號流依次都進(jìn)行組合届良。這一過程的實(shí)現(xiàn)在上一個(gè)變換實(shí)現(xiàn)中分析過了。combineLatest:完成之后业汰,緊接著進(jìn)行reduceEach:操作。

這里有一個(gè)判斷reduceBlock是否為nil的判斷菩颖,這個(gè)判斷是針對老版本的“歷史遺留問題”样漆。在ReactiveCocoa 2.5之前的版本,是允許reduceBlock傳入nil晦闰,這里為了防止崩潰放祟,所以加上了這個(gè)reduceBlock是否為nil的判斷。

12. combinePreviousWithStart: reduce:(在父類RACStream中定義的)

這個(gè)方法的實(shí)現(xiàn)也是多個(gè)變換操作組合在一起的呻右。

  • (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
    NSCParameterAssert(reduceBlock != NULL);
    return [[[self
    scanWithStart:RACTuplePack(start)
    reduce:^(RACTuple *previousTuple, id next) {
    id value = reduceBlock(previousTuple[0], next);
    return RACTuplePack(next, value);
    }]
    map:^(RACTuple *tuple) {
    return tuple[1];
    }]
    setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, [start rac_description]];
    }
combinePreviousWithStart: reduce:的實(shí)現(xiàn)完全可以類比scanWithStart:reduce:的實(shí)現(xiàn)跪妥。舉個(gè)例子來說明他們倆的不同。

  RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;

  RACSignal *signalA = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
      return @(previous.integerValue + next.integerValue);
  }].signal;

RACSignal *signalB = [numbers scanWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
    return @(previous.integerValue + next.integerValue);
}].signal;
signalA輸出如下:

1
3
5
7

signalB輸出如下:

1
3
6
10

現(xiàn)在應(yīng)該不同點(diǎn)應(yīng)該很明顯了声滥。combinePreviousWithStart: reduce:實(shí)現(xiàn)的是兩兩之前的加和眉撵,而scanWithStart:reduce:實(shí)現(xiàn)的累加。

為什么會這樣呢落塑,具體看看combinePreviousWithStart: reduce:的實(shí)現(xiàn)纽疟。

雖然combinePreviousWithStart: reduce:也是調(diào)用了scanWithStart:reduce:,但是初始值是RACTuplePack(start)元組憾赁,聚合reduce的過程也有所不同:

id value = reduceBlock(previousTuple[0], next);
return RACTuplePack(next, value);

依次調(diào)用reduceBlock( )閉包污朽,傳入previousTuple[0], next,這里reduceBlock( )閉包是進(jìn)行累加的操作龙考,所以就是把前一個(gè)元組的第0位加上后面新來的信號的值蟆肆。得到的值拼成新的元組,新的元組由next和value值構(gòu)成晦款。

如果打印出上述例子中combinePreviousWithStart: reduce:的加合過程中每個(gè)信號的值炎功,如下:

<RACTuple: 0x608000200010> (
1,
1
)

<RACTuple: 0x60000001fe70> (
2,
3
)
<RACTuple: 0x60000001fe90> (
3,
5
)
<RACTuple: 0x60000001feb0> (
4,
7
)

由于這樣拆成元組之后,下次再進(jìn)行操作的時(shí)候缓溅,還可以拿到前一個(gè)信號的值亡问,這樣就不會形成累加的效果。

13. sample:

  • (RACSignal *)sample:(RACSignal *)sampler {
    NSCParameterAssert(sampler != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    NSLock *lock = [[NSLock alloc] init];
    __block id lastValue;
    __block BOOL hasValue = NO;

      RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
      RACDisposable *sourceDisposable = [self subscribeNext:^(id x) { // 暫時(shí)省略 }];
    
      samplerDisposable.disposable = [sampler subscribeNext:^(id _) { // 暫時(shí)省略 }];
    
      return [RACDisposable disposableWithBlock:^{
          [samplerDisposable dispose];
          [sourceDisposable dispose];
      }];
    

    }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
    }

sample:內(nèi)部實(shí)現(xiàn)也是對原信號和入?yún)⑿盘杝ampler分別進(jìn)行訂閱。具體實(shí)現(xiàn)就是這兩個(gè)信號訂閱內(nèi)部都干了些什么州藕。

RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[lock lock];
hasValue = YES;
lastValue = x;
[lock unlock];
} error:^(NSError *error) {
[samplerDisposable dispose];
[subscriber sendError:error];
} completed:^{
[samplerDisposable dispose];
[subscriber sendCompleted];
}];

這是對原信號的操作束世,原信號的操作在subscribeNext中就記錄了兩個(gè)變量的值,hasValue記錄原信號有值床玻,lastValue記錄了原信號的最新的值毁涉。這里加了一層NSLock鎖進(jìn)行保護(hù)。

在發(fā)生error的時(shí)候锈死,先把sampler信號取消訂閱贫堰,然后再sendError:。當(dāng)原信號完成的時(shí)候待牵,同樣是先把sampler信號取消訂閱其屏,然后再sendCompleted。

samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
BOOL shouldSend = NO;
id value;
[lock lock];
shouldSend = hasValue;
value = lastValue;
[lock unlock];

if (shouldSend) {
    [subscriber sendNext:value];
}

} error:^(NSError *error) {
[sourceDisposable dispose];
[subscriber sendError:error];
} completed:^{
[sourceDisposable dispose];
[subscriber sendCompleted];
}];

這是對入?yún)⑿盘杝ampler的操作缨该。shouldSend默認(rèn)值是NO偎行,這個(gè)變量控制著是否sendNext:值。只有當(dāng)原信號有值的時(shí)候贰拿,hasValue = YES蛤袒,所以shouldSend = YES,這個(gè)時(shí)候才能發(fā)送原信號的值膨更。這里我們并不關(guān)心入?yún)⑿盘杝ampler的值妙真,從subscribeNext:^(id _)這里可以看出, _代表并不需要它的值荚守。

在發(fā)生error的時(shí)候珍德,先把原信號取消訂閱,然后再sendError:矗漾。當(dāng)sampler信號完成的時(shí)候菱阵,同樣是先把原信號取消訂閱,然后再sendCompleted缩功。


經(jīng)過sample:變換就會變成這個(gè)樣子晴及。只是把原信號的值都移動到了sampler信號發(fā)送信號的時(shí)刻,值還是和原信號的值一樣嫡锌。

最后

關(guān)于RACSignal的變換操作還剩下 冷熱信號轉(zhuǎn)換操作虑稼,高階信號操作,下篇接著繼續(xù)分析势木。最后請大家多多指教蛛倦。

轉(zhuǎn)載自:https://gold.xitu.io/post/583e096461ff4b007edddbc8
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市啦桌,隨后出現(xiàn)的幾起案子溯壶,更是在濱河造成了極大的恐慌及皂,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件且改,死亡現(xiàn)場離奇詭異验烧,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)又跛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門碍拆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人慨蓝,你說我怎么就攤上這事感混。” “怎么了礼烈?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵弧满,是天一觀的道長。 經(jīng)常有香客問我此熬,道長庭呜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任摹迷,我火速辦了婚禮疟赊,結(jié)果婚禮上郊供,老公的妹妹穿的比我還像新娘峡碉。我一直安慰自己,他們只是感情好驮审,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布鲫寄。 她就那樣靜靜地躺著,像睡著了一般疯淫。 火紅的嫁衣襯著肌膚如雪地来。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天熙掺,我揣著相機(jī)與錄音未斑,去河邊找鬼。 笑死币绩,一個(gè)胖子當(dāng)著我的面吹牛蜡秽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播缆镣,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼芽突,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了董瞻?” 一聲冷哼從身側(cè)響起寞蚌,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后挟秤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體壹哺,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年煞聪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了斗躏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡昔脯,死狀恐怖啄糙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情云稚,我是刑警寧澤隧饼,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站静陈,受9級特大地震影響燕雁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鲸拥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一拐格、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧刑赶,春花似錦捏浊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至牵敷,卻和暖如春胡岔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背枷餐。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工靶瘸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人毛肋。 一個(gè)月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓怨咪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親村生。 傳聞我的和親對象是個(gè)殘疾皇子惊暴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容