filter: 過濾
過濾原始信號,如果滿足過濾條件轉(zhuǎn)發(fā)這個信號,否則忽略這個信號.
示例代碼:
RACSignal *signal = [[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@"zhao"];
[subscriber sendNext:@"wang"];
[subscriber sendNext:@"qian"];
[subscriber sendNext:@"wei"];
return [[RACDisposable alloc] init];
}]
filter:^BOOL(id _Nullable value) {
if ([value hasPrefix:@"w"]) {
return YES;
} else {
return NO;
}
}];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"x : %@", x);
}];
輸出:
2017-09-29 16:27:41.273049+0800 RAC[7343:336413] x : wang
2017-09-29 16:27:41.273326+0800 RAC[7343:336413] x : wei
實(shí)現(xiàn):
- (__kindof RACStream *)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
如果滿足過濾條件返回一個調(diào)用-[RACReturnSignal return:]
直接同步發(fā)送信號,否則返回一個RACEmptySignal
RACReturnSignal
+ (RACSignal *)return:(id)value {
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
#ifdef DEBUG
[signal setNameWithFormat:@"+return: %@", value];
#endif
return signal;
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}
上面是一個return信號的實(shí)現(xiàn),在創(chuàng)建過程中保存了vulue,當(dāng)此信號被訂閱時,直接發(fā)送value并調(diào)用完成信號.
RACEmptySignal
+ (RACSignal *)empty {
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});
return singleton;
#endif
}
#pragma mark Subscription
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
上面就是一個空信號的實(shí)現(xiàn),在這個信號被調(diào)用時什么也不做直接調(diào)用完成信號, 注意在這里區(qū)分了release版本和DEBUG版本, 在release版本使用一個單例實(shí)現(xiàn)RACEmptySignal.
所以通過RACReturnSignal
RACEmptySignal
兩個信號對過濾進(jìn)行轉(zhuǎn)發(fā)和忽略.
ignore: 忽略
忽略指定的值
示例代碼:
RACSignal *signal = [[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@"wang"];
[subscriber sendNext:@"li"];
[subscriber sendNext:@"fang"];
[subscriber sendNext:@"wang"];
return [[RACDisposable alloc] init];
}] ignore:@"wang"];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"x : %@", x);
}];
輸出:
2017-09-29 17:11:37.726000+0800 RAC[7881:371770] x : li
2017-09-29 17:11:37.726229+0800 RAC[7881:371770] x : fang
實(shí)現(xiàn):- (__kindof RACStream *)ignore:(id)value {
return [[self filter:^ BOOL (id innerValue) {
return innerValue != value && ![innerValue isEqual:value];
}] setNameWithFormat:@"[%@] -ignore: %@", self.name, RACDescription(value)];
}
內(nèi)部是對filter方法的封裝, 內(nèi)部是使用指針地址和 - isEqual:
方法判斷兩個值是否相等, 如果兩個值相等則忽略調(diào)這個值.
reduceEach:
block參數(shù)的個數(shù)是動態(tài)的,根據(jù)元組中的元素個數(shù)變化,block每個參數(shù)和元組中的每個元素一一對應(yīng). block的返回值是根據(jù)元組中的元素映射的一個值,其中的邏輯可以根據(jù)需求而定.
示例代碼:
RACSignal *signal = [[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:RACTuplePack(@1, @2)];
[subscriber sendNext:RACTuplePack(@3, @4)];
return [[RACDisposable alloc] init];
}] reduceEach:^id _Nullable (id value1, id value2){
return @([value1 integerValue] + [value2 integerValue]);
}];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"%ld", [x integerValue]);
}];
輸出:
2017-10-03 19:24:33.999636+0800 RAC[28155:1726872] 3
2017-10-03 19:24:33.999854+0800 RAC[28155:1726872] 7
源碼:
- (__kindof RACStream *)reduceEach:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}
在源碼中有兩個斷言,一個斷言是reduceBlock不能為空,另一個是信號的值必須為RACTuple類型.
reduceEach:內(nèi)部是對map方法的封裝,根據(jù)mapBlock的入?yún)⒎祷?code>[RACBlockTrampoline invokeBlock:reduceBlock withArguments:t].
RACBlockTrampoline根據(jù)入?yún)ACTuple的count調(diào)用block,返回調(diào)用block的返回值.
+ (id)invokeBlock:(id)block withArguments:(RACTuple *)arguments {
NSCParameterAssert(block != NULL);
//保存block
RACBlockTrampoline *trampoline = [[self alloc] initWithBlock:block];
return [trampoline invokeWithArguments:arguments];
}
- (id)invokeWithArguments:(RACTuple *)arguments {
// 根據(jù)arguments數(shù)量選中SEL
SEL selector = [self selectorForArgumentCount:arguments.count];
// 根據(jù)SEL創(chuàng)建NSInvocation
NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
invocation.selector = selector;
invocation.target = self;
for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
// 入?yún)①x值
[invocation setArgument:&arg atIndex:argIndex];
}
// 方法調(diào)用
[invocation invoke];
__unsafe_unretained id returnVal;
// 獲取調(diào)用返回值
[invocation getReturnValue:&returnVal];
return returnVal;
}
- (SEL)selectorForArgumentCount:(NSUInteger)count {
NSCParameterAssert(count > 0);
switch (count) {
case 0: return NULL;
case 1: return @selector(performWith:);
case 2: return @selector(performWith::);
case 3: return @selector(performWith:::);
case 4: return @selector(performWith::::);
case 5: return @selector(performWith:::::);
case 6: return @selector(performWith::::::);
case 7: return @selector(performWith:::::::);
case 8: return @selector(performWith::::::::);
case 9: return @selector(performWith:::::::::);
case 10: return @selector(performWith::::::::::);
case 11: return @selector(performWith:::::::::::);
case 12: return @selector(performWith::::::::::::);
case 13: return @selector(performWith:::::::::::::);
case 14: return @selector(performWith::::::::::::::);
case 15: return @selector(performWith:::::::::::::::);
}
NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
return NULL;
}
// 具體實(shí)現(xiàn)
- (id)performWith:(id)obj1 {
id (^block)(id) = self.block;
return block(obj1);
}
- (id)performWith:(id)obj1 :(id)obj2 {
id (^block)(id, id) = self.block;
return block(obj1, obj2);
}
以此類推...
首先根據(jù)元組中元素的數(shù)量決定調(diào)用的SEL,然后動態(tài)創(chuàng)建NSInvocation,并調(diào)用他.
在給NSInvocation入?yún)①x值是從i+2的位置開始給入?yún)①x值,是因為前兩個入?yún)⒎謩e為id self和SEL _cmd.
在具體實(shí)現(xiàn)中是調(diào)用block,block的入?yún)⑹窃M的元素,返回值就是reduceBlock的返回值,由開發(fā)者返回.
startWith:
在第一個信號前插入一個信號.
實(shí)例代碼:
RACSignal *signal = [[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
return [RACDisposable new];
}] startWith:@0];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"x: %@", [x stringValue]);
}];
輸出:
2017-10-03 20:53:04.191798+0800 RAC[29045:1780844] x: 0
2017-10-03 20:53:04.191962+0800 RAC[29045:1780844] x: 1
2017-10-03 20:53:04.192030+0800 RAC[29045:1780844] x: 2
2017-10-03 20:53:04.192129+0800 RAC[29045:1780844] x: 3
源碼:
- (__kindof RACStream *)startWith:(id)value {
return [[[self.class return:value]
concat:self]
setNameWithFormat:@"[%@] -startWith: %@", self.name, RACDescription(value)];
}
內(nèi)部由concat方法實(shí)現(xiàn),新建一個RACSignal直接返回value,然后concat原始的信號.前面說過concat的實(shí)現(xiàn),所以信號stream的順序是先發(fā)送value,然后發(fā)送原始信號.
skip:
跳過前n個信號.
示例代碼:
RACSignal *signal = [[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendNext:@6];
return [RACDisposable new];
}] skip:3];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"x: %@", [x stringValue]);
}];
輸出:
2017-10-03 21:00:43.366280+0800 RAC[29217:1787626] x: 4
2017-10-03 21:00:43.366462+0800 RAC[29217:1787626] x: 5
2017-10-03 21:00:43.366585+0800 RAC[29217:1787626] x: 6
源碼:
- (__kindof RACStream *)skip:(NSUInteger)skipCount {
Class class = self.class;
return [[self bind:^{
__block NSUInteger skipped = 0;
return ^(id value, BOOL *stop) {
if (skipped >= skipCount) return [class return:value];
skipped++;
return class.empty;
};
}] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}
使用skipped記錄跳過的數(shù)量,每忽略一次信號skipped+1,直到sikpped>=skipcount.
skipUntilBlock:
- (__kindof RACStream *)skipUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^{
__block BOOL skipping = YES;
return ^ id (id value, BOOL *stop) {
if (skipping) {
if (predicate(value)) {
skipping = NO;
} else {
return class.empty;
}
}
return [class return:value];
};
}] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
}
skipUntilBlock:以predicate閉包作為篩選條件,當(dāng)篩選條件為NO是跳過此信號,直到篩選條件為YES后面所有的信號都不跳過.
skipWhileBlock:
- (__kindof RACStream *)skipWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
return [[self skipUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
}
skipWhileBlock:的信號集是skipUntilBlock:的信號集的補(bǔ)集纲辽。全集是原信號绰筛。skipWhileBlock:底層還是調(diào)用skipUntilBlock:僧凤,只不過判斷條件的是不滿足predicate( )閉包的集合俭厚。
take:
接受前n個信號.
示例代碼:
RACSignal *signal = [[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendNext:@6];
return [RACDisposable new];
}] take:3];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"x: %@", [x stringValue]);
}];
輸出:
2017-10-03 21:12:43.774690+0800 RAC[29460:1801547] x: 1
2017-10-03 21:12:43.774919+0800 RAC[29460:1801547] x: 2
2017-10-03 21:12:43.775051+0800 RAC[29460:1801547] x: 3
源碼:
- (__kindof RACStream *)take:(NSUInteger)count {
Class class = self.class;
if (count == 0) return class.empty;
return [[self bind:^{
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop) {
if (taken < count) {
++taken;
if (taken == count) *stop = YES;
return [class return:value];
} else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}
takeUntilBlock:
- (__kindof RACStream *)takeUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^{
return ^ id (id value, BOOL *stop) {
if (predicate(value)) return nil;
return [class return:value];
};
}] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
}
takeUntilBlock:是根據(jù)傳入的predicate閉包作為篩選條件的乔询。一旦predicate( )閉包滿足條件舒岸,那么新信號停止發(fā)送新信號蕊退,因為它被置為nil了。和函數(shù)名的意思是一樣的烁挟,take原信號的值婴洼,Until直到閉包滿足條件。
takeWhileBlock:
- (__kindof RACStream *)takeWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
return [[self takeUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
}
takeWhileBlock:的信號集是takeUntilBlock:的信號集的補(bǔ)集撼嗓。全集是原信號柬采。takeWhileBlock:底層還是調(diào)用takeUntilBlock:,只不過判斷條件的是不滿足predicate( )閉包的集合且警。
takeUntil:
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
void (^triggerCompletion)(void) = ^{
[disposable dispose];
[subscriber sendCompleted];
};
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:selfDisposable];
}
return disposable;
}] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}
當(dāng)signalTrigger sendNext 或 sendCompleted時調(diào)用triggerCompletion閉包.阻斷原信號.
takeUntilReplacement:
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
[selfDisposable dispose];
[subscriber sendNext:x];
} error:^(NSError *error) {
[selfDisposable dispose];
[subscriber sendError:error];
} completed:^{
[selfDisposable dispose];
[subscriber sendCompleted];
}];
if (!selfDisposable.disposed) {
selfDisposable.disposable = [[self
concat:[RACSignal never]]
subscribe:subscriber];
}
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[replacementDisposable dispose];
}];
}];
}
原始信號concat一個[RACSignal never]信號,這樣可以保證原始信號完成不會調(diào)用新的信號的completed,可以一直等待
replacement信號.
當(dāng)接收到replacement信號時,取消原來信號的訂閱,由replacement信號代替原來的信號.
新的信號在沒有接受到replacement信號時,信號由原始信號發(fā)送(不會發(fā)送sendCompleted信號),直到接收到replacement信號后,新的信號由replacement信號發(fā)送
+zip:
壓縮多個信號,與-zip作業(yè)相同,不過-zip只能壓縮倆個信號,+zip可以壓縮多個信號.
示例代碼:
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@1];
return [[RACDisposable alloc] init];
}];
RACSignal *signal2 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@2];
[subscriber sendNext:@1];
return [[RACDisposable alloc] init];
}];
RACSignal *signal3 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@3];
[subscriber sendNext:@1];
return [[RACDisposable alloc] init];
}];
RACSignal *signal4 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@4];
[subscriber sendNext:@1];
return [[RACDisposable alloc] init];
}];
RACSignal *ziped = [RACSignal zip:RACTuplePack(signal1, signal2, signal3, signal4)];
[ziped subscribeNext:^(id _Nullable x) {
NSLog(@"x : %@", x);
}];
輸出:
2017-10-04 11:24:58.416203+0800 RAC[2168:55396] x : <RACTuple: 0x60c0000197a0> (
1,
2,
3,
4
)
2017-10-04 11:24:58.416827+0800 RAC[2168:55396] x : <RACTuple: 0x604000019d60> (
1,
1,
1,
1
)
源碼:
+ (__kindof RACStream *)zip:(id<NSFastEnumeration>)streams {
return [[self join:streams block:^(RACStream *left, RACStream *right) {
return [left zipWith:right];
}] setNameWithFormat:@"+zip: %@", streams];
}
+zip是對+join: block:方法的封裝.
+ (__kindof RACStream *)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
RACStream *current = nil;
// Creates streams of successively larger tuples by combining the input
// streams one-by-one.
for (RACStream *stream in streams) {
// For the first stream, just wrap its values in a RACTuple. That way,
// if only one stream is given, the result is still a stream of tuples.
if (current == nil) {
current = [stream map:^(id x) {
return RACTuplePack(x);
}];
continue;
}
// 調(diào)用 外部block 關(guān)聯(lián)兩個block的邏輯關(guān)系
current = block(current, stream);
}
if (current == nil) return [self empty];
return [current map:^(RACTuple *xs) {
// Right now, each value is contained in its own tuple, sorta like:
//
// (((1), 2), 3)
//
// We need to unwrap all the layers and create a tuple out of the result.
NSMutableArray *values = [[NSMutableArray alloc] init];
while (xs != nil) {
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
xs = (xs.count > 1 ? xs.first : nil);
}
return [RACTuple tupleWithObjectsFromArray:values];
}];
}
第一個信號的值用元組包裹,接著調(diào)用block依次zip后面的信號到current中.
此時信號中的每一個值由多層元組包裹,(((1), 2), 3)
像這樣.
最后將多層元組重新整理,變成單層元組.
+zip:reduce
+zip:方法和reduceEach:方法的結(jié)合.
+ (__kindof RACStream *)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
RACStream *result = [self zip:streams];
// Although we assert this condition above, older versions of this method
// supported this argument being nil. Avoid crashing Release builds of
// apps that depended on that.
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
}
如果理解+zip和reduceEach的實(shí)現(xiàn)這個就很好理解了,這里就不在詳細(xì)說明了.
scanWithStart: reduceWithIndex:
- (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
NSCParameterAssert(reduceBlock != nil);
Class class = self.class;
return [[self bind:^{
__block id running = startingValue;
__block NSUInteger index = 0;
return ^(id value, BOOL *stop) {
running = reduceBlock(running, value, index++);
return [class return:running];
};
}] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, RACDescription(startingValue)];
}
底層是由bind方法實(shí)現(xiàn)的,startingValue在block第一次調(diào)用的時候是running的值,running和next的邏輯關(guān)系由開發(fā)者自己實(shí)現(xiàn),running總是指向block的返回值.
distinctUntilChanged
- (__kindof RACStream *)distinctUntilChanged {
Class class = self.class;
return [[self bind:^{
__block id lastValue = nil;
__block BOOL initial = YES;
return ^(id x, BOOL *stop) {
if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
initial = NO;
lastValue = x;
return [class return:x];
};
}] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
}
distinctUntilChanged的實(shí)現(xiàn)是用bind來完成的粉捻。每次變換中都記錄一下原信號上一次發(fā)送過來的值,并與這一次進(jìn)行比較斑芜,如果是相同的值肩刃,就“吞”掉,返回empty信號。只有和原信號上一次發(fā)送的值不同树酪,變換后的新信號才把這個值發(fā)送出來浅碾。