RxJS官方教程(五) Subject

Subject 主題

什么是Subject(主題)京郑?RxJS Subject是一種特殊類型的Observable氧吐,允許將值多播到多個觀察者Observer。雖然普通的Observable是單播的(每個訂閱的Observer都擁有Observable的獨立執(zhí)行),但Subject是多播的照皆。

Subject類似于Observable栈顷,但可以多播到許多觀察者饥侵。Subject就像EventEmitters:它們維護著許多觀察者的注冊表痊银。

每個Subject都是一個Observable。給定一個主題航邢,您可以通過一個Observer subscribe他耕赘,它將開始正常接收值。從Observer的角度來看膳殷,它無法判斷Observable執(zhí)行是來自簡單的單播Observable還是Subject操骡。

在Subject的內(nèi)部,subscribe不會調(diào)用一個新的傳遞值的執(zhí)行赚窃。它只是將給定的Observer注冊到Observers列表中册招,類似于其他庫和語言中的addListener的工作方式。

每個Subject都是一個Observer勒极。它是一個含有next(v)是掰,error(e)complete()的對象。要向主題提供新值辱匿,只需調(diào)用next(theValue)键痛,它將被多播到已注冊接受該主題的觀察者。

在下面的示例中匾七,我們有兩個觀察者訂閱了主題絮短,我們向主題提供一些值:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
})

subject.next(1);
subject.next(2);

在控制臺上使用以下輸出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

由于Subject是Observer,這也意味著您可以提供Subject作為任何Observable subscribe的參數(shù)昨忆,如下例所示:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
})
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
})

var observable = Rx.Observable.from([1,2,3]);

observable.subscribe(subject); // You can subscribe providing a Subject

其執(zhí)行如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

通過上面的方法丁频,我們基本上只是通過Subject將單播Observable執(zhí)行轉(zhuǎn)換為多播。這演示了Subject是如何把Observable執(zhí)行共享給多個Observer的唯一方法。

還有的幾個Subject特例:BehaviorSubject席里,ReplaySubject夺颤,和AsyncSubject

Multicasted Observable 多播的Observable

“多播Observable”通過可能有許多訂閱者的Subject傳遞通知胁勺,而普通的“單播Observable”僅向單個Observer發(fā)送通知。

多播Observable使用一個Subject來使多個Observers看到相同的Observable執(zhí)行独旷。

這其實是multicast運算符的工作方式:觀察者訂閱基礎(chǔ)主題署穗,主題訂閱源Observable。以下示例類似于上一個使用的示例observable.subscribe(subject)

var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast返回一個看起來像普通Observable的Observable嵌洼,但在訂閱時就像Subject一樣工作案疲。multicast返回一個ConnectableObservable,它只是一個帶有connect()方法的Observable 麻养。

connect()方法對于確定何時開始共享Observable執(zhí)行非常重要褐啡。因為connect()執(zhí)行了source.subscribe(subject),并且connect()返回一個訂閱鳖昌,你可以執(zhí)行取消訂閱來取消Observable的執(zhí)行备畦。

參考計數(shù)

手動調(diào)用connect()和處理訂閱通常很麻煩。通常许昨,我們希望在第一個Observer到達時自動連接懂盐,并在最后一個Observer取消訂閱時自動取消共享執(zhí)行。

請考慮以下示例糕档,其中訂閱按此列表所述進行:

  1. First Observer訂閱了多播Observable
  2. 多播的Observable已連接
  3. next0將傳遞給第一個Observer
  4. Second Observer訂閱了多播Observable
  5. next1將傳遞給第一個Observer
  6. next1將傳遞給第二個Observer
  7. First Observer取消訂閱多播Observable
  8. next2將傳遞給第二個Observer
  9. Second Observer取消訂閱了多播Observable
  10. 與多播Observable的連接已取消訂閱

為了通過顯式調(diào)用connect()實現(xiàn)這一點莉恼,我們編寫以下代碼:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

如果我們希望避免顯式調(diào)用connect(),我們可以使用ConnectableObservable的refCount()方法(引用計數(shù))速那,該方法返回一個Observable俐银,用于跟蹤它擁有多少訂閱者。當訂閱者數(shù)量從0增加到時1端仰,它將調(diào)用connect()我們捶惜,這將啟動共享執(zhí)行。只有當訂閱者數(shù)量從減少10完全取消訂閱時荔烧,才會停止進一步執(zhí)行售躁。

refCount 使多播Observable在第一個訂閱者到達時自動開始執(zhí)行,并在最后一個訂閱者離開時停止執(zhí)行茴晋。

以下是一個例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

哪個執(zhí)行輸出:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount()方法僅存在于ConnectableObservable上陪捷,它返回一個Observable而不是另一個ConnectableObservable。

BehaviorSubject

Subject的變體之一是BehaviorSubject诺擅,具有“當前值”的概念市袖。它存儲發(fā)布給消費者的最新值,每當新的Observer訂閱時,它將立即從BehaviorSubject中獲得“當前值” 苍碟。

BehaviorSubject用于表示“隨時間變化的值”酒觅。例如,生日的事件流是Subject微峰,但是人的年齡的流將是BehaviorSubject舷丹。

在以下示例中,BehaviorSubject使用0初始化蜓肆,第一個Observer在訂閱時將接收到0颜凯。第二個Observer將接收到2,即使它在2發(fā)送后才訂閱的仗扬。

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

帶輸出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject類似于BehaviorSubject症概,它可以將舊值發(fā)送給新訂閱者,但它也可以記錄Observable執(zhí)行的一部分早芭。

ReplaySubject記錄來自O(shè)bservable執(zhí)行的多個值彼城,并將它們重放給新訂閱者。

創(chuàng)建ReplaySubject時退个,您可以指定要重播的值的數(shù)量:

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

輸出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

除了緩沖區(qū)大小之外募壕,您還可以指定以毫秒為單位的窗口時間,以確定記錄值的年齡语盈。在下面的示例中司抱,我們使用大緩沖區(qū)大小100,但窗口時間參數(shù)僅為500毫秒黎烈。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

輸出如下习柠,其中第二個Observer將獲取發(fā)生在訂閱之前最后500毫秒的事件345

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject只將Observable執(zhí)行的最后一個值發(fā)送給它的觀察者照棋,并且只有在執(zhí)行完成時才會發(fā)送资溃。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

輸出:

observerA: 5
observerB: 5

AsyncSubject類似于last()運算符,因為它等待complete通知以便傳遞單個值烈炭。

小結(jié)

  • BehaviorSubject 緩存一個值的Subject
  • ReplaySubject 緩存多個值的Subject
  • AsyncSubject 只返回最后一個值的Subject

官網(wǎng) http://reactivex.io/rxjs/manual/overview.html#subject

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溶锭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子符隙,更是在濱河造成了極大的恐慌趴捅,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,348評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件霹疫,死亡現(xiàn)場離奇詭異拱绑,居然都是意外死亡,警方通過查閱死者的電腦和手機丽蝎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評論 2 385
  • 文/潘曉璐 我一進店門猎拨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事红省《罡鳎” “怎么了?”我有些...
    開封第一講書人閱讀 156,936評論 0 347
  • 文/不壞的土叔 我叫張陵吧恃,是天一觀的道長虾啦。 經(jīng)常有香客問我,道長痕寓,這世上最難降的妖魔是什么傲醉? 我笑而不...
    開封第一講書人閱讀 56,427評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮厂抽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丁眼。我一直安慰自己筷凤,他們只是感情好,可當我...
    茶點故事閱讀 65,467評論 6 385
  • 文/花漫 我一把揭開白布苞七。 她就那樣靜靜地躺著藐守,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹂风。 梳的紋絲不亂的頭發(fā)上卢厂,一...
    開封第一講書人閱讀 49,785評論 1 290
  • 那天,我揣著相機與錄音惠啄,去河邊找鬼慎恒。 笑死,一個胖子當著我的面吹牛撵渡,可吹牛的內(nèi)容都是我干的融柬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,931評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼趋距,長吁一口氣:“原來是場噩夢啊……” “哼粒氧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起节腐,我...
    開封第一講書人閱讀 37,696評論 0 266
  • 序言:老撾萬榮一對情侶失蹤外盯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后翼雀,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饱苟,經(jīng)...
    沈念sama閱讀 44,141評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,483評論 2 327
  • 正文 我和宋清朗相戀三年狼渊,在試婚紗的時候發(fā)現(xiàn)自己被綠了掷空。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,625評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖坦弟,靈堂內(nèi)的尸體忽然破棺而出护锤,到底是詐尸還是另有隱情,我是刑警寧澤酿傍,帶...
    沈念sama閱讀 34,291評論 4 329
  • 正文 年R本政府宣布烙懦,位于F島的核電站,受9級特大地震影響赤炒,放射性物質(zhì)發(fā)生泄漏氯析。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,892評論 3 312
  • 文/蒙蒙 一莺褒、第九天 我趴在偏房一處隱蔽的房頂上張望掩缓。 院中可真熱鬧,春花似錦遵岩、人聲如沸你辣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舍哄。三九已至,卻和暖如春誊锭,著一層夾襖步出監(jiān)牢的瞬間表悬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工丧靡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蟆沫,地道東北人。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓温治,卻偏偏與公主長得像饥追,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子罐盔,可洞房花燭夜當晚...
    茶點故事閱讀 43,492評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 介紹 RxJS是一個異步編程的庫但绕,同時它通過observable序列來實現(xiàn)基于事件的編程。它提供了一個核心的類型:...
    泓滎閱讀 16,590評論 0 12
  • 發(fā)現(xiàn) 關(guān)注 消息 RxSwift入坑解讀-你所需要知道的各種概念 沸沸騰關(guān)注 2016.11.27 19:11*字...
    楓葉1234閱讀 2,788評論 0 2
  • 本文章內(nèi)部分圖片資源來自RayWenderlich.com 本文結(jié)合自己的理解來總結(jié)介紹一下RxSwift最基本的...
    FKSky閱讀 2,863評論 4 14
  • 創(chuàng)建Observable: Rx.Observable.create 是 Observable 構(gòu)造函數(shù)的別名惶看,它...
    柳源居士閱讀 4,274評論 0 2
  • 我從去年開始使用 RxJava 捏顺,到現(xiàn)在一年多了。今年加入了 Flipboard 后纬黎,看到 Flipboard 的...
    huqj閱讀 1,840評論 0 21