查看 rxjs 源碼,我們?nèi)∫粋€比較簡單的操作符 every
作為例子狱意。從 every 的實現(xiàn)可以看到湖苞,every 函數(shù)調(diào)用 source 的 lift 函數(shù),傳入自定義的兩個類 EveryOperator 和 EverySubscriber详囤。接下來看代碼注釋财骨。
// Observable.js
import { canReportError } from "./util/canReportError";
import { toSubscriber } from "./util/toSubscriber";
import { observable as Symbol_observable } from "./symbol/observable";
import { pipeFromArray } from "./util/pipe";
import { config } from "./config";
export class Observable {
constructor(subscribe) {
this._isScalar = false;
if (subscribe) {
this._subscribe = subscribe;
}
}
// 返回一個新的Observable,將source置為自身藏姐,operator置為輸入的operator
lift(operator) {
const observable = new Observable();
observable.source = this;
observable.operator = operator;
return observable;
}
subscribe(observerOrNext, error, complete) {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
// 下面設置的EveryOperator call方法就是在這里使用
// 這里傳入Subscriber 并且把自身的上一個Observable傳過去
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source ||
(config.useDeprecatedSynchronousErrorHandling &&
!sink.syncErrorThrowable)
? this._subscribe(sink)
: this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
...
}
// every.js
import { Subscriber } from "../Subscriber";
export function every(predicate, thisArg) {
// 返回一個生成新Observable(source為傳入?yún)?shù)隆箩,operator為every)的函數(shù)
return (source) => source.lift(new EveryOperator(predicate, thisArg, source));
}
class EveryOperator {
constructor(predicate, thisArg, source) {
this.predicate = predicate;
this.thisArg = thisArg;
this.source = source;
}
// 當擁有當前Operator的Observable被subscribe的時候 會調(diào)用call(subscriber, this.source)
// observer subscribe函數(shù)傳入?yún)?shù)構(gòu)建的
// 這里的source是當前Observable的上一個Observable
call(observer, source) {
// 這里訂閱了上一個Observable
return source.subscribe(
// 這里實際上就會生成一個標準的Observer/Subscriber 有next error complete
// 這里的this.source 跟source是同一個source(應該)
new EverySubscriber(observer, this.predicate, this.thisArg, this.source)
);
}
}
// 繼承Subscriber
class EverySubscriber extends Subscriber {
// destination 就是給我們調(diào)用next 傳入處理后的值
// 從source進行訂閱
constructor(destination, predicate, thisArg, source) {
super(destination);
this.predicate = predicate;
this.thisArg = thisArg;
this.source = source;
this.index = 0;
this.thisArg = thisArg || this;
}
notifyComplete(everyValueMatch) {
// every的性質(zhì)導致他只會調(diào)用一次next 就馬上到complete
this.destination.next(everyValueMatch);
this.destination.complete();
}
// 每次有值過來 就會調(diào)用到_next
_next(value) {
let result = false;
try {
result = this.predicate.call(
this.thisArg,
value,
this.index++,
this.source
);
} catch (err) {
this.destination.error(err);
return;
}
if (!result) {
this.notifyComplete(false);
}
}
// 上游complete 觸發(fā)_complete
_complete() {
this.notifyComplete(true);
}
}
//# sourceMappingURL=every.js.map
當然,我們自己去封裝操作符的時候不一定要按照它的邏輯羔杨。我們通過上面代碼的分析捌臊,我們可以一個操作符函數(shù)為
const myoperator = (myargs) => (source) => new Observable();
其中,你要返回一個 Observable兜材,在構(gòu)造 Observable 時通過 subscribe 上一個 Observable 即 source理澎,進行 next 向下游發(fā)出值。
這個 Observable 的構(gòu)造方式可以是通過 Observable.prototype.lift (source.lift)構(gòu)建曙寡,通過傳入一個帶有 call 方法的 Operator 類糠爬,而這個類的 call 方法返回一個 Subscription。
下面做一個簡單的 square 函數(shù)示例举庶。
// 使用Observable.create
const square = () => (source) =>
Observable.create((subscriber) => {
const subscription = source.subscribe(
(value) => {
subscriber.next(value * value);
},
(err) => {
subscriber.error(err);
},
() => {
subscriber.complete();
}
);
// 這里要返回subscription 參考TeardownLogic
return subscription;
});
// 使用lift
class SquareOperator {
constructor(thisArg, source) {
this.source = source;
}
call(subscriber, source) {
return source.subscribe({
next: (value) => {
subscriber.next(value * value);
},
error: (error) => {
subscriber.error(error);
},
complete: () => {
subscriber.complete;
},
});
}
}
const square = () => (source, thisArg) =>
source.lift(new SquareOperator(thisArg, source));