- RxJS官方教程(一) 概覽
- RxJS官方教程(二) Observable
- RxJS官方教程(三) Observable剖析
- RxJS官方教程(四) Observer和Subscription
- RxJS官方教程(五) Subject
- RxJS官方教程(六) 算子
Subject 主題
什么是Subject(主題)京郑?RxJS Subject是一種特殊類型的Observable氧吐,允許將值多播到多個觀察者Observer。雖然普通的Observable是單播的(每個訂閱的Observer都擁有Observable的獨立執(zhí)行),但Subject是多播的照皆。
Subject類似于Observable栈顷,但可以多播到許多觀察者饥侵。Subject就像EventEmitters:它們維護著許多觀察者的注冊表痊银。
每個Subject都是一個Observable。給定一個主題航邢,您可以通過一個Observer subscribe
他耕赘,它將開始正常接收值。從Observer的角度來看膳殷,它無法判斷Observable執(zhí)行是來自簡單的單播Observable還是Subject操骡。
在Subject的內(nèi)部,subscribe
不會調(diào)用一個新的傳遞值的執(zhí)行赚窃。它只是將給定的Observer注冊到Observers列表中册招,類似于其他庫和語言中的addListener
的工作方式。
每個Subject都是一個Observer勒极。它是一個含有next(v)
是掰,error(e)
和complete()
的對象。要向主題提供新值辱匿,只需調(diào)用next(theValue)
键痛,它將被多播到已注冊接受該主題的觀察者。
在下面的示例中匾七,我們有兩個觀察者訂閱了主題絮短,我們向主題提供一些值:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
})
subject.next(1);
subject.next(2);
在控制臺上使用以下輸出:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
由于Subject是Observer,這也意味著您可以提供Subject作為任何Observable subscribe
的參數(shù)昨忆,如下例所示:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
})
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
})
var observable = Rx.Observable.from([1,2,3]);
observable.subscribe(subject); // You can subscribe providing a Subject
其執(zhí)行如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
通過上面的方法丁频,我們基本上只是通過Subject將單播Observable執(zhí)行轉(zhuǎn)換為多播。這演示了Subject是如何把Observable執(zhí)行共享給多個Observer的唯一方法。
還有的幾個Subject
特例:BehaviorSubject
席里,ReplaySubject
夺颤,和AsyncSubject
。
Multicasted Observable 多播的Observable
“多播Observable”通過可能有許多訂閱者的Subject傳遞通知胁勺,而普通的“單播Observable”僅向單個Observer發(fā)送通知。
多播Observable使用一個Subject來使多個Observers看到相同的Observable執(zhí)行独旷。
這其實是multicast
運算符的工作方式:觀察者訂閱基礎(chǔ)主題署穗,主題訂閱源Observable。以下示例類似于上一個使用的示例observable.subscribe(subject)
:
var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
multicast
返回一個看起來像普通Observable的Observable嵌洼,但在訂閱時就像Subject一樣工作案疲。multicast
返回一個ConnectableObservable
,它只是一個帶有connect()
方法的Observable 麻养。
該connect()
方法對于確定何時開始共享Observable執(zhí)行非常重要褐啡。因為connect()
執(zhí)行了source.subscribe(subject)
,并且connect()
返回一個訂閱鳖昌,你可以執(zhí)行取消訂閱來取消Observable的執(zhí)行备畦。
參考計數(shù)
手動調(diào)用connect()
和處理訂閱通常很麻煩。通常许昨,我們希望在第一個Observer到達時自動連接懂盐,并在最后一個Observer取消訂閱時自動取消共享執(zhí)行。
請考慮以下示例糕档,其中訂閱按此列表所述進行:
- First Observer訂閱了多播Observable
- 多播的Observable已連接
- 該
next
值0
將傳遞給第一個Observer - Second Observer訂閱了多播Observable
- 該
next
值1
將傳遞給第一個Observer - 該
next
值1
將傳遞給第二個Observer - First Observer取消訂閱多播Observable
- 該
next
值2
將傳遞給第二個Observer - Second Observer取消訂閱了多播Observable
- 與多播Observable的連接已取消訂閱
為了通過顯式調(diào)用connect()
實現(xiàn)這一點莉恼,我們編寫以下代碼:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
如果我們希望避免顯式調(diào)用connect()
,我們可以使用ConnectableObservable的refCount()
方法(引用計數(shù))速那,該方法返回一個Observable俐银,用于跟蹤它擁有多少訂閱者。當訂閱者數(shù)量從0
增加到時1
端仰,它將調(diào)用connect()
我們捶惜,這將啟動共享執(zhí)行。只有當訂閱者數(shù)量從減少1
到0
完全取消訂閱時荔烧,才會停止進一步執(zhí)行售躁。
refCount
使多播Observable在第一個訂閱者到達時自動開始執(zhí)行,并在最后一個訂閱者離開時停止執(zhí)行茴晋。
以下是一個例子:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
哪個執(zhí)行輸出:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
該refCount()
方法僅存在于ConnectableObservable上陪捷,它返回一個Observable
而不是另一個ConnectableObservable。
BehaviorSubject
Subject的變體之一是BehaviorSubject
诺擅,具有“當前值”的概念市袖。它存儲發(fā)布給消費者的最新值,每當新的Observer訂閱時,它將立即從BehaviorSubject
中獲得“當前值” 苍碟。
BehaviorSubject用于表示“隨時間變化的值”酒觅。例如,生日的事件流是Subject微峰,但是人的年齡的流將是BehaviorSubject舷丹。
在以下示例中,BehaviorSubject使用0
初始化蜓肆,第一個Observer在訂閱時將接收到0
颜凯。第二個Observer將接收到2
,即使它在2
發(fā)送后才訂閱的仗扬。
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
帶輸出:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
ReplaySubject
類似于BehaviorSubject
症概,它可以將舊值發(fā)送給新訂閱者,但它也可以記錄Observable執(zhí)行的一部分早芭。
ReplaySubject
記錄來自O(shè)bservable執(zhí)行的多個值彼城,并將它們重放給新訂閱者。
創(chuàng)建ReplaySubject
時退个,您可以指定要重播的值的數(shù)量:
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
輸出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
除了緩沖區(qū)大小之外募壕,您還可以指定以毫秒為單位的窗口時間,以確定記錄值的年齡语盈。在下面的示例中司抱,我們使用大緩沖區(qū)大小100
,但窗口時間參數(shù)僅為500
毫秒黎烈。
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
輸出如下习柠,其中第二個Observer將獲取發(fā)生在訂閱之前最后500
毫秒的事件3
,4
和5
:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
AsyncSubject只將Observable執(zhí)行的最后一個值發(fā)送給它的觀察者照棋,并且只有在執(zhí)行完成時才會發(fā)送资溃。
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
輸出:
observerA: 5
observerB: 5
AsyncSubject類似于last()
運算符,因為它等待complete
通知以便傳遞單個值烈炭。
小結(jié)
- BehaviorSubject 緩存一個值的Subject
- ReplaySubject 緩存多個值的Subject
- AsyncSubject 只返回最后一個值的Subject
官網(wǎng) http://reactivex.io/rxjs/manual/overview.html#subject