從一個簡單的rxjs demo來看rxjs部分源碼

demo 地址 https://github.com/YardWill/beginner-reactive-programming-with-rxjs

我們通過四個例子來看rxjs.

1. example-1

const promise = new Promise((resolve) => {
  setTimeout(() => {
    resolve('Hello from a Promise!');
  }, 2000);
});

promise.then(value => console.log(value));

這是一個簡單的promise實現(xiàn)2000ms之后的輸出。我們接下來使用rxjs來試試如何實現(xiàn)這個功能拳恋。

2. example-2

import { Observable } from 'rxjs/Observable';

const observable = new Observable((observer) => {
  setTimeout(() => {
    observer.next('Hello from a Observable!');
  }, 2000);
});

observable.subscribe(value => console.log(value));

我們先通過Observable的接口來注冊一個可觀測對象瞪讼,(如果對觀察者模式不太熟悉的話可以先看這篇文章觀察者簡單實現(xiàn)

之后我們使用subscribe來訂閱一個事件(subscribe函數(shù)內(nèi)部執(zhí)行在observer內(nèi)注冊的函數(shù))榄棵,當observer觸發(fā)next的時候就執(zhí)行subscribe內(nèi)注冊的函數(shù)。

我們來看看源碼忿偷。
以下是Observable的構(gòu)造函數(shù)补履,可以看出,除了綁定subscribe函數(shù)之外朱躺,什么都沒有做。

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

下面是Observable類上的subscribe方法哺徊。簡單來講就是去執(zhí)行了在constructor內(nèi)注冊的subscribe函數(shù)室琢,并將observerOrNext push到subscriptions內(nèi)乾闰。然后return的對象是一個subscriptions落追。

  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {

    const { operator } = this;
    const sink = toSubscriber(observerOrNext, error, complete);

    if (operator) {
      operator.call(sink, this.source);
    } else {
      sink.add(
        this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
        this._subscribe(sink) :
        this._trySubscribe(sink)
      );
    }

    if (config.useDeprecatedSynchronousErrorHandling) {
      if (sink.syncErrorThrowable) {
        sink.syncErrorThrowable = false;
        if (sink.syncErrorThrown) {
          throw sink.syncErrorValue;
        }
      }
    }

    return sink;
  }

你一定覺得很疑惑Observable、subscribe涯肩、subscriptions這些名字都代表什么意思轿钠。我們下面來講一個通俗的故事巢钓。

從前有一家鮮奶公司,提供不同種類的鮮奶疗垛,每天都會送貨一次症汹。有一天,小明花了20/天的錢去訂了這家公司的進口鮮奶贷腕,小花花了10元/天的錢訂了國產(chǎn)鮮奶背镇。那么公司收到的訂單列表就是這兩個,公司會在固定時間去把鮮奶送到用戶的手上泽裳。

我們看看這個故事里面的observable瞒斩、subscribe、subscriptions分別是什么涮总?

  1. observable: 毫無疑問胸囱,鮮奶公司是一個可訂閱對象,我們可以向鮮奶公司訂閱我們需要的鮮奶瀑梗。
  2. subscribe: subscribe這是一個動作烹笔,小明和小花去定了這家鮮奶公司的牛奶。
  3. subscriptions: 訂完牛奶后抛丽,小明和小花的訂單就已經(jīng)在鮮奶公司的訂單列表上了谤职,這個訂單列表就是subscriptions。
  4. 另外亿鲜,我們將setTimeout改成setInterval柬帕,這時我們就可以想象鮮奶公司每天都會觸發(fā)發(fā)貨的工作,也就是執(zhí)行next方法狡门。next方法可以當做是鮮奶公司對照著訂單列表對小明和小花進行發(fā)貨陷寝。

這樣看起來,理解這幾個對象應該不難了吧其馏。我們把這個故事改編成代碼凤跑。

import { Observable } from 'rxjs/Observable';

// 鮮奶公司
const interval$ = new Observable((observer) => {
  let count = 0;
  const interval = setInterval(() => {
    console.log('鮮奶公司準時發(fā)貨');
    observer.next(count += 1);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
});

// 小明訂奶
const littleMing = count => console.log('小明收到', count, '瓶奶');
// 小花訂奶
const littleHua = count => console.log('小花收到', count, '瓶奶');
const subscription1 = interval$.subscribe(littleMing);
const subscription2 = interval$.subscribe(littleHua);

subscriptions包含[subscription1, subscription2]。
那么接下來我們再來思考一個問題叛复,如果小明不想繼續(xù)訂牛奶了仔引,他應該怎么通知鮮奶公司不再發(fā)貨?我們來看 example-3

3. example-3

我們通過上面的故事來修改一下我們的代碼褐奥。

import { Observable } from 'rxjs/Observable';

// 鮮奶公司
const interval$ = new Observable((observer) => {
  let count = 0;
  const interval = setInterval(() => {
    console.log('鮮奶公司準時發(fā)貨');
    observer.next(count += 1);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
});

// 小明訂奶
const littleMing = count => console.log('小明收到', count, '瓶奶');
// 小花訂奶
const littleHua = count => console.log('小花收到', count, '瓶奶');
const subscription1 = interval$.subscribe(littleMing);
const subscription2 = interval$.subscribe(littleHua);
setTimeout(() => subscription1.unsubscribe(), 3000);

我們可以看到在最后我們把subscription1給unsubscribe了咖耘,我們看看unsubscribe函數(shù)內(nèi)做了什么?

  /**
   * Disposes the resources held by the subscription. May, for instance, cancel
   * an ongoing Observable execution or cancel any other type of work that
   * started when the Subscription was created.
   * @return {void}
   */
  unsubscribe(): void {
    let hasErrors = false;
    let errors: any[];

    if (this.closed) {
      return;
    }

    let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);

    this.closed = true;
    this._parent = null;
    this._parents = null;
    // null out _subscriptions first so any child subscriptions that attempt
    // to remove themselves from this subscription will noop
    this._subscriptions = null;

    let index = -1;
    let len = _parents ? _parents.length : 0;

    // if this._parent is null, then so is this._parents, and we
    // don't have to remove ourselves from any parent subscriptions.
    // 移除subscription
    while (_parent) {
      _parent.remove(this);
      // if this._parents is null or index >= len,
      // then _parent is set to null, and the loop exits
      _parent = ++index < len && _parents[index] || null;
    }

    if (isFunction(_unsubscribe)) {
      let trial = tryCatch(_unsubscribe).call(this);
      if (trial === errorObject) {
        hasErrors = true;
        errors = errors || (
          errorObject.e instanceof UnsubscriptionError ?
            flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e]
        );
      }
    }
    
    if (isArray(_subscriptions)) {

      index = -1;
      len = _subscriptions.length;

      while (++index < len) {
        const sub = _subscriptions[index];
        if (isObject(sub)) {
          let trial = tryCatch(sub.unsubscribe).call(sub);
          if (trial === errorObject) {
            hasErrors = true;
            errors = errors || [];
            let err = errorObject.e;
            if (err instanceof UnsubscriptionError) {
              errors = errors.concat(flattenUnsubscriptionErrors(err.errors));
            } else {
              errors.push(err);
            }
          }
        }
      }
    }

    if (hasErrors) {
      throw new UnsubscriptionError(errors);
    }
  }

這里的代碼理解起來也不難撬码,最后去執(zhí)行了clearInterval(interval)將定時器去掉儿倒,并把當前的訂單(subscription)移除出訂單列表(subscriptions)。

4. example-4

接下來小明和小花都有各自的需求改變,比如小明想要每天兩瓶奶夫否,而小花需要隔天收到一瓶奶彻犁,那么我們應該怎么做呢?看下面代碼凰慈。

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/filter';

// 鮮奶公司
const interval$ = new Observable<number>((observer) => {
  let count = 0;
  const interval = setInterval(() => {
    console.log('鮮奶公司準時發(fā)貨');
    observer.next(count += 1);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
});

// 小明訂奶
const littleMing = count => console.log('小明收到', count, '瓶奶');
// 小花訂奶
const littleHua = count => console.log('小花收到', count, '瓶奶');

// 小明打算每天多訂一份的鮮奶
const subscription1 = interval$
  .map(value => value * 2)
  .subscribe(littleMing);

// ----1----2----3----4--->
//      map => x * 2
// ----2----4----6----8--->

// 小花打算讓鮮奶公司隔天送一瓶
const subscription2 = interval$
  .filter(value => value % 2 === 0)
  .map(value => value / 2)
  .subscribe(littleHua);

// ----1----2----3----4--->
//      filter & map
// ---------1---------2--->

在這里我們引入了map和filter這兩個rx的操作符汞幢,來實現(xiàn)我們需要變更的需求,看起來是不是很簡單微谓。當然還有更多的操作符(我們就不一一介紹了)森篷,操作符也是rxjs內(nèi)的很大一部分組成,可以把它比作是lodash內(nèi)的工具類豺型。

5. example-5

我們最后再來看看rxjs在前端事件監(jiān)聽上的用法疾宏。

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEvent';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/map';

const incrementClicks$ = Observable.fromEvent(document.getElementById('increment'), 'click');
const decrementClicks$ = Observable.fromEvent(document.getElementById('decrement'), 'click');

Observable
  .merge(incrementClicks$, decrementClicks$)
  .map((event: any) => parseInt(event.target.value, 10))
  .scan((total, value) => total + value, 0)
  .subscribe((total) => {
    document.getElementById('counter').innerText = total.toString();
  });

merge將兩個Observable對象合成一個Observable對象,然后map對數(shù)據(jù)進行操作触创。
scan可以比作一個reducer函數(shù)坎藐,每一次click之后會對數(shù)據(jù)進行處理,并保留之前的數(shù)據(jù)哼绑。
最后我們?nèi)ubscribe這個事件岩馍,然后做出相應修改。

個人博客 https://www.yardwill.com/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末抖韩,一起剝皮案震驚了整個濱河市蛀恩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌茂浮,老刑警劉巖双谆,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異席揽,居然都是意外死亡顽馋,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門幌羞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來寸谜,“玉大人,你說我怎么就攤上這事属桦⌒艹眨” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵聂宾,是天一觀的道長果善。 經(jīng)常有香客問我,道長系谐,這世上最難降的妖魔是什么巾陕? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上惜论,老公的妹妹穿的比我還像新娘。我一直安慰自己止喷,他們只是感情好馆类,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著弹谁,像睡著了一般乾巧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上预愤,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天沟于,我揣著相機與錄音,去河邊找鬼植康。 笑死旷太,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的销睁。 我是一名探鬼主播供璧,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼冻记!你這毒婦竟也來了睡毒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤冗栗,失蹤者是張志新(化名)和其女友劉穎演顾,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體隅居,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡钠至,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了胎源。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棕洋。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖乒融,靈堂內(nèi)的尸體忽然破棺而出掰盘,到底是詐尸還是另有隱情,我是刑警寧澤赞季,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布愧捕,位于F島的核電站,受9級特大地震影響申钩,放射性物質(zhì)發(fā)生泄漏次绘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望邮偎。 院中可真熱鬧管跺,春花似錦、人聲如沸禾进。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽泻云。三九已至艇拍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間宠纯,已是汗流浹背卸夕。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留婆瓜,地道東北人快集。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像廉白,于是被迫代替她去往敵國和親碍讨。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容