Rxjs知識整理

最近在學習Rxjs,所以借此機會對rxjs知識點進行了一定的整理涉茧,用以加深自己對這部分知識的理解和記憶妆够。

簡介

Rxjs的內(nèi)容可以概括為一個核心三個重點,核心就是ObservableOperators永脓,三個重點分別是:

  • observer
  • Subject
  • schedulers

其中眾多的operator一直是我門學習Rxjs路上的攔路虎,文章主體內(nèi)容也將是圍繞這部分內(nèi)容講解鞋仍。

簡單的例子

下邊用一個簡單的例子來展示下Rxjs如何工作常摧。

var observable = Observable
    .create(function(observer) {
        observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Anna');
    })

// 訂閱 observable    
observable.subscribe(function(value) {
    console.log(value);
})

通過Observable身上的create方法可以創(chuàng)建一個Observable,參數(shù)中的回調(diào)函數(shù)設(shè)置這個Observable將會如何傳遞值威创,然后通過subscribe訂閱這個Observable落午。

這里值得一提的是rxjs的subscribe是同步執(zhí)行的,例如下邊這段代碼:

var observable = Observable
    .create(function(observer) {
        observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Anna');
    })

console.log('start');
observable.subscribe(function(value) {
    console.log(value);
});
console.log('end');

最終結(jié)果為:

start
Jerry
Anna
end

通過subscribe訂閱啟動的代碼在第二個log之后才在控制臺打印肚豺,由此可以看出subscribe是同步執(zhí)行的溃斋。

Rxjs的operators

學好Rxjs的operarors是學會Rxjs的關(guān)鍵,熟練使用Rxjs中各種各樣的operators可以大大提高我門工作效率

Operators的分類

Rxjs的operattors實在太多了吸申,于是我按照我自己的理解將Rxjs的operators進行了分類梗劫,這樣有理解記憶。

本人按照自己的理解將Operators分為8類截碴,如下圖所示:

image

下邊就按照分類分別對各個operator進行講解梳侨。

創(chuàng)造observabl類

create

const observable = Observable.create((observe) => {
    observe.next('value')
})
observable.subscribe({
    next:() => {
    },
    complete: () => {
    },
    error: () => {
    }
}

of

感覺of類似于一個迭代器,將參數(shù)迭代然后發(fā)出隐岛。

var source = of('Jerry', 'Anna');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

from

from的參數(shù)必須是一個類數(shù)組(set,iterator等)猫妙,其他和of一樣

var arr = ['Jerry', 'Anna', 2016, 2017, '30 days'] 
var source = from(arr);

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

// Jerry
// Anna
// 2016
// 2017
// 30 days
// complete!

fromPromise

遍歷promise,其他和前兩個一樣

var source = fromPromise(new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Hello RxJS!');
    },3000)
  }))

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
    console.log(error)
    }
});

fromEvent

var source = fromEvent(document.body, 'click');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

Empty,never和throw

empty 會給我們一個的 observable聚凹,如果我們訂閱這個 observable 割坠, 它會立即響應(yīng)complete 函數(shù)。

var source = empty();

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});
// complete!

throw妒牙,它也只做一件事就是拋出錯誤彼哼。

var source = throw('Oop!');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
    console.log('Throw Error: ' + error)
    }
});
// Throw Error: Oop!

數(shù)學上還有一個跟零(0)很像的數(shù),那就是 無窮(∞)湘今,在 Observable 的世界裡我們用 never 來建立無窮的 observablenever 會給我們一個無窮的 observable敢朱,如果我們訂閱它又會發(fā)生什麼事呢?...什麼事都不會發(fā)生,它就是一個一直存在但卻什麼都不做的 observable拴签。

Interval和timer

interval和setInterval一樣孝常,幾秒鐘發(fā)送一個值,如下邊代碼所示:

var source = interval(1000);

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
    console.log('Throw Error: ' + error)
    }
});
// 0
// 1
// 2
// ...

參數(shù)為設(shè)定多少毫秒鐘發(fā)送一個值蚓哩。

timer有兩個參數(shù)构灸,第一個參數(shù)表示到發(fā)送第一個值的間隔時間,第二個參數(shù)表示從發(fā)送第二個參數(shù)開始岸梨,沒發(fā)送一個值的間隔時間喜颁,如果第二個參數(shù)為空則發(fā)送第一個參數(shù)后,終止曹阔,執(zhí)行complete函數(shù)半开。

var source = Rx.Observable.timer(1000, 5000);

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
    console.log('Throw Error: ' + error)
    }
});
// 0
// 1
// 2 ...

選擇器類

take

有的時候我門希望獲取Observable前幾個數(shù)然后結(jié)束(執(zhí)行complete方法)

var source = interval(1000);
var example = source.pipe(take(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete

first

取第一個數(shù)然后結(jié)束,和take(1)效果一樣

var source = interval(1000);
var example = source.pipe(first());

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

// 0
// complete

takeLast赃份,last

takeLast和take用法一樣寂拆,區(qū)別是該方法是取后邊幾個值,例子如下:

var source = interval(1000).pipe(take(6), takeLast(2));

source.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// 4
// 5
// complete

last是take Last(1)的簡寫芥炭,目的是取最后一個值漓库。

var source = interval(1000).pipe(take(6), last());

source.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// 5
// complete

控制數(shù)據(jù)流額類

takeUntil

參數(shù)為一個Observable,當參數(shù)Observable訂閱發(fā)生恃慧,終止takeUntil綁定的observable园蝠。

下邊這個案例,當點擊body時就會終止訂閱痢士。

const click = fromEvent(document.body, "click");
const source = interval(1000).pipe(takeUntil(click));

source.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// 0
// 1
// 2
// 3
// complete 當點擊body

skip

使用方式類似take彪薛,take是取前幾個,skip的意思是跳過前幾個怠蹂,取后邊幾個善延。

const source = interval(1000).pipe(skip(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...

上邊的例子中就跳過了前三個值,但是要注意的是獲取前三個值的時間還是要等待的

startWith

塞一個初始值給Observable

const source = interval(1000).pipe(startWith('start'));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// start
// 0
// 1
// 2
// 3...

concat

concatconcatAll效果是一樣的城侧,區(qū)別在于 concat要傳遞參數(shù)易遣,參數(shù)必須是Observable類型。

concat 將多個observable串接起來前一個完成好了嫌佑,再執(zhí)行下一個豆茫。

const source1 = interval(1000).pipe(take(3));
const source2 = of(3);
const source3 = of (4,5);
const example = source1.pipe(concat(source2,source3))
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// 0
// 1
// 2
// 3
// 4
// 5
// complete

merge

merge使用方式和concat一樣,區(qū)別就是merge處理的Observable是異步執(zhí)行的屋摇,沒有先后順序揩魂。

const source1 = interval(1000).pipe(take(3));
const source2 = of(3);
const source3 = of (4,5);
const example = source1.pipe(merge(source2,source3))
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});

// 3
// 4
// 5
// 0
// 1
// 2
// complete

delay和delayWhen

delay會將observable第一次發(fā)出訂閱的時間延遲,如下:

const example = interval(300).pipe(take(5),delay(500));
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4

delayWhen和delay不同炮温,他的延遲時間由參數(shù)函數(shù)決定火脉,并且會將主訂閱對象發(fā)出的值作為 參數(shù):

var example = interval(300).pipe(
                                take(5),
                                delayWhen(
                                    x => Rx.Observable.empty().delay(100 * x *x));
                               );

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上邊的例子會將第一次source發(fā)出的值作為參數(shù)傳給delayWhen的函數(shù)作為參數(shù),只有在參數(shù)對象中的Observable發(fā)出訂閱的值,主訂閱對象才會繼續(xù)發(fā)出訂閱的值。

debounceTime

debounce 在每次收到元素倦挂,他會先把元素 cache 住并等待一段時間畸颅,如果這段時間內(nèi)已經(jīng)沒有收到任何元素,則把元素送出方援;如果這段時間內(nèi)又收到新的元素重斑,則會把原本 cache 住的元素釋放掉並重新計時,不斷反覆肯骇。

var example = interval(300).pipe(take(5),debounceTime(1000));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 4
// complete

throttleTime

跟 debounce 的不同是 throttle 會先開放送出元素窥浪,等到有元素被送出就會沈默一段時間,等到時間過了又會繼續(xù)發(fā)送元素,防止某個事件頻繁觸發(fā)笛丙,影響效率漾脂。

var example = interval(300).pipe(take(5),
                                throttleTime(1000);
                               );
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 4
// complete

distinct和distinctUntilChanged

distinct會和已經(jīng)拿到的數(shù)據(jù)比較過濾掉 重復(fù)的元素如下:

var example = from(['a', 'b', 'c', 'a', 'b']).pipe(
zip(interval(300), (x, y) => x),
    distinct()
)
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete

distinct第一個參數(shù)是個函數(shù),函數(shù)返回值就是distinct比較的值:

var source = from([{ value: 'a'}, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }]).pipe(
zip(Rx.Observable.interval(300), (x, y) => x)
)
            .
var example = source.pipe(
distinct((x) => {
    return x.value
})
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete

但是distinct底層是創(chuàng)建一個set來輔助去重胚鸯,如果數(shù)據(jù)很大骨稿,可能導致set過大,這個時候就需要設(shè)置distinct第二個參數(shù)來刷新set姜钳,第二個 參數(shù)是個observable到發(fā)起訂閱的時候就會清空set

var flushes = interval(1300);

var example = from(['a', 'b', 'c', 'a', 'c']).pipe(
zip(interval(300), (x, y) => x),
    distinct(
    null,flushes
    )
)
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

// a
// b
// c
// c
// complete

distinctUntilChanged與distinct不同之處就是坦冠,distinctUntilChanged只會比較相鄰兩次輸入,例子如下:

var example = from(['a', 'b', 'c', 'c', 'b']).pipe(
            .zip(interval(300), (x, y) => x),
    distinctUntilChanged()
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete     

協(xié)調(diào)多個Observable類

combineLatest

協(xié)調(diào)過個observable哥桥,參數(shù)Observable中有一個發(fā)生變化都會發(fā)起訂閱(前提是每個observable都有值)辙浑。

// RxJS v6+
import { timer, combineLatest } from 'rxjs';

// timerOne 在1秒時發(fā)出第一個值,然后每4秒發(fā)送一次
const timerOne = timer(1000, 4000);
// timerTwo 在2秒時發(fā)出第一個值拟糕,然后每4秒發(fā)送一次
const timerTwo = timer(2000, 4000);
// timerThree 在3秒時發(fā)出第一個值判呕,然后每4秒發(fā)送一次
const timerThree = timer(3000, 4000);

// 當一個 timer 發(fā)出值時,將每個 timer 的最新值作為一個數(shù)組發(fā)出
const combined = combineLatest(timerOne, timerTwo, timerThree);

const subscribe = combined.subscribe(latestValues => {
  // 從 timerValOne送滞、timerValTwo 和 timerValThree 中獲取最新發(fā)出的值
    const [timerValOne, timerValTwo, timerValThree] = latestValues;
  /*
      示例:
    timerOne first tick: 'Timer One Latest: 1, Timer Two Latest:0, Timer Three Latest: 0
    timerTwo first tick: 'Timer One Latest: 1, Timer Two Latest:1, Timer Three Latest: 0
    timerThree first tick: 'Timer One Latest: 1, Timer Two Latest:1, Timer Three Latest: 1
  */
    console.log(
      `Timer One Latest: ${timerValOne},
     Timer Two Latest: ${timerValTwo},
     Timer Three Latest: ${timerValThree}`
    );
  }
);

當conbineLatest沒有傳入第二個參數(shù)侠草,返回的訂閱值是個數(shù)組,但是conbineLatest可以傳入第二個參數(shù)犁嗅,在發(fā)給Observabler進行數(shù)據(jù)處理边涕。

const source1 = interval(1000).pipe();
const source2 = interval(3000);
const source3 = of(4, 5);
const example = source1.pipe(combineLatest(source2, (x, y) => {
  console.log(x,y)
  return  x + y
}));
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// 2
// 3
// 4
// 5
// ........

zip

和combineLatest用法基本一樣,主要作用也是協(xié)調(diào)幾個observable褂微,zip的特點是只會取幾個observable對應(yīng)的index的值進行計算功蜓,例子如下:

const source1 = interval(1000).pipe(take(3));
const source2 = interval(3000).pipe(takee(3));
const example = source1.pipe(zip(source2, (x, y) => {
  return  x + y
}));
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// 0
// 2
// 4
// complete

withLatestFrom

withLatestFrom和combineLatest用法很類似,withLatestFrom主要特點是只有在蕊梧,主Observable發(fā)起值的時候才會發(fā)動訂閱霞赫,不過如果副O(jiān)bservable沒有發(fā)送過值,也不會發(fā)起訂閱肥矢,例子如下:

var main = from('hello').pipe(
  zip(interval(500), (x, y) => x)
)
var some = from([0,1,0,0,0,1]).pipe(
  zip(interval(300), (x, y) => x)
)

var example = main.pipe(
withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
})
)

example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});

concatMap

concatMap就是map加上concatAll

var source = fromEvent(document.body, 'click');

var example = source.pipe(
                .map(e => interval(1000).pipe(take(3))),
                .concatAll();
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

轉(zhuǎn)化成concatMap就是如下這樣:

var source = fromEvent(document.body, 'click');

var example = source.pipe(
    .concatMap(
        e => interval(100).pipe(take(3))
    )
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

mergeMap

mergeMap同樣是mergeAll加上map

var source = fromEvent(document.body, 'click');

var example = source.pipe(
                .map(e => interval(1000).pipe(take(3))),
                .mergeAll();
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

使用mergeMap的寫法就是如下這樣:

var source = fromEvent(document.body, 'click');

var example = source.pipe(
    mergeMap(
        e => interval(100).take(3)
    )
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

switchMap

switch在rxjs6中只有switchMap

switch對比merge和concat有個特點就是附屬observable發(fā)起訂閱后會立刻解綁主observable端衰。

var source = fromEvent(document.body, 'click');

var example = source.pipe(
                    .switchMap(
                    e => interval(100).pipe(take(3))
                )
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

三個map都有第二個參數(shù)叠洗,一個回調(diào)函數(shù),函數(shù)用來處理每個observable發(fā)起訂閱后的回調(diào)操作旅东,函數(shù)的參數(shù)有四個灭抑,分別是:

  • 外部 observable 送出的元素

  • 內(nèi)部 observable 送出的元素

  • 外部 observable 送出元素的 index

  • 內(nèi)部 observable 送出元素的 index

    拿concatMap舉例,在附屬observable發(fā)起訂閱后可以通過回調(diào)函數(shù)拿到observable的發(fā)送值進行操作抵代,類似的應(yīng)用場景在平常有很多腾节。

    function getPostData() {
        return fetch('https://jsonplaceholder.typicode.com/posts/1')
        .then(res => res.json())
    }
    var source = fromEvent(document.body, 'click');
    
    var example = source.pipe(
        concatMap(
                    e => from(getPostData()), 
                    (e, res, eIndex, resIndex) => res.title);
    )
    
    example.subscribe({
        next: (value) => { console.log(value); },
        error: (err) => { console.log('Error: ' + err); },
        complete: () => { console.log('complete'); }
    });
    
    

改變數(shù)據(jù)流結(jié)構(gòu)類

concatAll

將傳遞過來的Observable進行處理,一個個進行訂閱荤牍,前邊的處理完再處理后邊的Observable案腺,這樣原本類似為二維數(shù)組的結(jié)構(gòu)就變成一維數(shù)組了。

const {Observable, interval, of} = rxjs;
const { map, throttleTime, takeUntil, tap, take, concatAll, switchMap } = rxjs.operators

var obs1 = interval(1000).pipe(take(5))
var obs2 = interval(500).pipe(take(2));
var obs3 = interval(2000).pipe(take(1));

var source = of(obs1, obs2, obs3);

var example = source.pipe(concatAll());

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上邊的例子中會一個個按照順序執(zhí)行obs1康吵、obs2劈榨、obs3

concatAll沒有參數(shù),將多個observable串行處理晦嵌,前一個處理完再處理后邊的observable

var click = fromEvent(document.body, 'click');
var example = click.pipe(
    map(e => interval(1000)),
    concatAll()
)
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// (點擊後)
// 0
// 1
// 2
// 3
// 4
// 5 ...

mergeAll

mergeAll和concatAll用法基本一致同辣,區(qū)別在于mergeAll是并行處理Observable,實例如下:

var click = fromEvent(document.body, 'click');
var source = click.pipe(
    map(e => interval(1000))
);

var example = source.mergeAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

mergeAll使用特殊的一點就是mergeAll可以傳遞一個參數(shù)惭载,這個參數(shù)表示最大并行處理數(shù)量旱函,當處理的observable數(shù)量大于這個數(shù)字的時候,就需要等待在處理的observable有完成的才會分配資源處理描滔。mergeAll(1)的效果就和concatAll效果一樣棒妨。

數(shù)據(jù)操作類

map

和JavaScript中的map一樣

filter

執(zhí)行函數(shù)返回值為false就過濾掉

mapTo

將參數(shù)轉(zhuǎn)換為一個固定值

var source = interval(1000);
var newest = source.pipe(mapTo(2)); 

newest.subscribe(console.log);
// 2
// 2
// 2
// 2..

scan

數(shù)據(jù)累加計算

var main = from('hello').pipe(
  zip(interval(500), (x, y) => x)
)
const example = main.pipe(
  scan(
  (origin,next)=> origin + next
  )
)
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// h
// he
// hel
// hell
// hello
// complete

scan第二個參數(shù)為初始值

下邊綜合幾個operator實現(xiàn)一個例子

const addButton = document.getElementById('addButton');
const minusButton = document.getElementById('minusButton');
const state = document.getElementById('state');

const addClick = fromEvent(addButton, 'click').mapTo(1);
const minusClick = fromEvent(minusButton, 'click').mapTo(-1);

const numberState = empty().pipe(
  .startWith(0)
  .merge(addClick, minusClick)
  .scan((origin, next) => origin + next, 0)
)

numberState
  .subscribe({
    next: (value) => { state.innerHTML = value;},
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
  });

repeat

很多時候如果Observable沒有發(fā)生錯誤,我門也希望可以重復(fù)發(fā)起訂閱伴挚,這個時候就要用到repeat方法了靶衍,repeat用法和retry基本一樣

var example = from(['a','b','c']).pipe(
    zip(interval(500), (x,y) => x),
    repeat()
)
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

最后提供一個錯誤handle案例

const title = document.getElementById('title');

var example = from(['a','b','c','d',2]).pipe(
            .zip(Rx.Observable.interval(500), (x,y) => x)
            .map(x => x.toUpperCase()), 
            // 通常 source 會是建立即時同步的連線灾炭,像是 web socket
                catch(
                (error, obs) => empty().pipe(
                    .startWith('連線發(fā)生錯誤: 5秒後重連')
                    .concat(obs.pipe(delay(5000)))
                    )           
                 )
)
example.subscribe({
    next: (value) => { title.innerText = value },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

groupBy

groupBy類似數(shù)據(jù)庫中的group命令一樣

var people = [
  { name: "Anna", score: 100, subject: "English" },
  { name: "Anna", score: 90, subject: "Math" },
  { name: "Anna", score: 96, subject: "Chinese" },
  { name: "Jerry", score: 80, subject: "English" },
  { name: "Jerry", score: 100, subject: "Math" },
  { name: "Jerry", score: 90, subject: "Chinese" }
];

var example = from(people).pipe(
  groupBy(item => item.name),
  map(group =>
    group.pipe(
      reduce((acc, cur) => ({
        name: cur.name,
        score: acc.score + cur.score
      }))
    )
  ),
  mergeAll()
);

example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});

緩存類

buffer茎芋、bufferTime和bufferCount

buffer是將主observable發(fā)出的值先緩存起來,在依賴的observable發(fā)起訂閱的時候在將值發(fā)出蜈出。

var source = interval(300);
var source2 = interval(1000);
var example = source.pipe(
    buffer(source2)
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...

使用bufferTime更簡單,設(shè)定時間田弥,在規(guī)定時間內(nèi)緩存值,到時間發(fā)出去

var source = interval(300);
var example = source.pipe(
bufferTime(1000)
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...

用簡單的多的方式就可以表達和上邊例子一樣的效果

除了時間控制緩存以外我們還可以用個數(shù)控制铡原,這就用到了bufferCount

var source = Rx.Observable.interval(300);
var example = source.pipe(
    bufferCount(3)
);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...

上邊的例子就每次直到三個才會發(fā)送數(shù)值的效果偷厦。

我門可以利用buffer特性實現(xiàn)一些特殊效果,例如下邊這種:

const button = document.getElementById('demo');
const click = fromEvent(button, 'click');
const example = click.pipe(
    bufferTime(500),
    filter(arr => arr.length >= 2)
);

example.subscribe({
    next: (value) => { console.log('success'); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

我門通過buffer就實現(xiàn)了只有雙擊鼠標才會觸發(fā)的效果

window和windowToggle

window一開始就會觸發(fā)訂閱

和buffer一樣都是現(xiàn)緩存數(shù)據(jù)等到一定條件然后發(fā)送數(shù)據(jù)燕刻,不同的是window會緩存數(shù)據(jù)到observable中只泼,下邊來個例子:

var click = fromEvent(document.body, "click");
var source = interval(1000);

const example = source.pipe(
  rxjs.operators.window(click
                       ),
  map((o)=> o.pipe(take(3))),
  mergeAll()
)

example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});

每次點擊頁面就會將interval最近輸入兩個值打印出來, window初始會發(fā)送一個請求

windowToggle相對于widnow多了一個參數(shù)為回調(diào)函數(shù),用來標志結(jié)束條件卵洗,例子如下:

var source = Rx.Observable.interval(1000);
var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');

var example = source
  .windowToggle(mouseDown, () => mouseUp)
  .switch();

example.subscribe(console.log);

錯誤處理類

catch

catch當在訂閱過程中發(fā)現(xiàn)錯誤后就會調(diào)用请唱,然后結(jié)果就會發(fā)送給訂閱者的方法弥咪,例子如下:

var example = from(['a','b','c','d',2]).pipe(
    .zip(interval(500), (x,y) => x),
    map(x => x.toUpperCase()),
    catch(error => of('h'))
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
}); 

catch方法結(jié)果不一定只能回傳observable,還可以回傳Promise或者是類數(shù)組(迭代器)等

同時catch第二個參數(shù)可以傳入當前的主Observable十绑,我門可以直接用參數(shù)進行操作聚至,完成一些功能贷屎,例如重新發(fā)起訂閱:

var example = from(['a','b','c','d',2]).pipe(
    .zip(interval(500), (x,y) => x),
    map(x => x.toUpperCase()),
    catch((error,obs) => obs)
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
}); 

retry和retryWhen

retry控制Observable發(fā)生錯誤的時候可以重復(fù)發(fā)起訂閱经磅。

var example = from(['a','b','c','d',2]).pipe(
    .zip(interval(500), (x,y) => x),
    map(x => x.toUpperCase()),
    retry()
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
}); 

當retry傳入?yún)?shù)的時候就表示Observable最多重復(fù)發(fā)起幾次侣肄,如果還不成功就執(zhí)行Observable的error方法敷扫。

retryWhen會將發(fā)生的錯誤封裝成一個Observable發(fā)送給retryWhen的函數(shù)绽昼,可以在其中進行很多操作膊夹,例如發(fā)送錯誤信息給技術(shù)人員掠廓,判斷哪地方發(fā)生錯誤握联。下邊的例子中為發(fā)生錯誤后延遲一秒在重復(fù)訂閱

var example = from(['a','b','c','d',2]).pipe(
    .zip(interval(500), (x,y) => x),
    map(x => x.toUpperCase()),
  retryWhen(errorObs => errorObs.pipe(delay(1000)))
)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

subject

上邊經(jīng)過很長的篇幅介紹了Rxjs的operators亏狰,下邊將介紹Rxjs另一重要的部分內(nèi)容Subject片择,介紹Subject之前先介紹一個知識點:Observable是可以被多次訂閱的,了解這個知識點可以幫助我們理解Subject是用來解決哪些問題骚揍,以及Subject的一些特性字管。

Observable是可以被多次訂閱

例如下邊這個例子,sourcebe被訂閱了兩次。

var source = interval(1000).pipe(take(3));

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

source.subscribe(observerA);
setTimeout(() => {
    source.subscribe(observerB);
}, 1000);

// "A next: 0"
// "A next: 1"
// "B next: 0"
// "A next: 2"
// "A complete!"
// "B next: 1"
// "B next: 2"
// "B complete!"

但是這種重復(fù)訂閱又個問題就是信不,各個訂閱都是獨立的嘲叔,有些時候我門希望新的訂閱是接在上個訂閱之后的,這個時候這種方式就不能滿足需求了抽活,使用subject就可以完成這種需要:

var source = interval(1000).pipe(take(3));

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subject = new Subject()

subject.subscribe(observerA)

source.subscribe(subject);

setTimeout(() => {
    subject.subscribe(observerB);
}, 1000);

// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"

上邊這種效果就是利用了subject的組播特性硫戈,這也是在開發(fā)中經(jīng)常利用Subject解決的問題。

幾個特殊subject

Rxjs還提供了幾個特殊的Subject來滿足一些特殊需要

AsyncSubject

只有在訂閱complete時候調(diào)用下硕,在結(jié)束的時候會傳送一下最后的一個值

var subject = new AsyncSubject();
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
// "A next: 3"
// "A complete!"

setTimeout(() => {
    subject.subscribe(observerB);
    // "B next: 3"
    // "B complete!"
},3000)

ReplaySubject

在新訂閱的時候會發(fā)送最后幾個值

var subject = new ReplaySubject(2); // 重複發(fā)送最後 2 個元素
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"

setTimeout(() => {
    subject.subscribe(observerB);
    // "B next: 2"
    // "B next: 3"
},3000)

BehaviorSubject

每次有新訂閱的時候都會發(fā)送給它當前的最新值

var subject = new BehaviorSubject(0); // 0 為起始值
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
// "A next: 0"
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"

setTimeout(() => {
    subject.subscribe(observerB); 
    // "B next: 3"
},3000)

subject廣播簡化

var source = interval(1000).pipe(take(3));

var observerA = {
  next: value => console.log("A next: " + value),
  error: error => console.log("A error: " + error),
  complete: () => console.log("A complete!")
};

var observerB = {
  next: value => console.log("B next: " + value),
  error: error => console.log("B error: " + error),
  complete: () => console.log("B complete!")
};

var subject = new Subject();

subject.subscribe(observerA);

source.subscribe(subject);

setTimeout(() => {
  subject.subscribe(observerB);
}, 1000);

上邊這段代碼雖然可以實現(xiàn)suject的廣播丁逝,但是太過繁瑣,rxjs提供了簡化的方式梭姓。

multicast

利用multicast這個operator方法直接就可以利用subject的廣播特性霜幼,需要注意的是使用multicast,只有配合connect方法誉尖,才會發(fā)起訂閱

var source = interval(1000).pipe(take(3),multicast(new Subject()));

var observerA = {
  next: value => console.log("A next: " + value),
  error: error => console.log("A error: " + error),
  complete: () => console.log("A complete!")
};

var observerB = {
  next: value => console.log("B next: " + value),
  error: error => console.log("B error: " + error),
  complete: () => console.log("B complete!")
};

source.subscribe(observerA);

source.connect()
setTimeout(() => {
  source.subscribe(observerB);
}, 1000);

refount

使用multicast方法后**只有取消訂閱multicast產(chǎn)生的observable才會終止訂閱罪既。

var source = interval(1000).pipe(
             multicast(new Subject()); // 無限的 observable 
)

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA);

var realSubscription = source.connect();

var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);
}, 1000);

setTimeout(() => {
    subscriptionA.unsubscribe();
    subscriptionB.unsubscribe(); 
    // 這裡雖然 A 跟 B 都退訂了,但 source 還會繼續(xù)送元素
}, 5000);

setTimeout(() => {
    realSubscription.unsubscribe();
    // 這裡 source 才會真正停止送元素
}, 7000);

但是這樣太繁瑣了铡恕,rxjs提供了refcount方法琢感。

var source = interval(1000).pipe(
    multicast(new Rx.Subject()),
             refCount();
)

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA);
// 訂閱數(shù) 0 => 1

var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);
    // 訂閱數(shù) 0 => 2
}, 1000);

使用refcount后的observable當上邊有訂閱后會自動打開廣播功能,當沒有訂閱后探熔,會自動關(guān)閉驹针。這樣就不需要特意關(guān)閉廣播Observable,也不需要刻意使用connect诀艰。

publish

multicast(new Rx.Subject())在rxjs中有個方法punish柬甥。

var source = interval(1000).pipe(
    publish(),
    refCount();
)
// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.Subject()) 
//             .refCount();

publish也可以配合subject三種變形墙牌,rxjs分別封裝了對應(yīng)的方法:publishReplay、publishBehavior暗甥、publishLast

share

另外 publish + refCount 可以在簡寫成 share

var source = interval(1000).pipe(
     share();
)

作者:大喵愛讀書
鏈接:http://www.reibang.com/p/16be96d69143
來源:簡書
著作權(quán)歸作者所有喜滨。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處撤防。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末虽风,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子寄月,更是在濱河造成了極大的恐慌辜膝,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件漾肮,死亡現(xiàn)場離奇詭異厂抖,居然都是意外死亡,警方通過查閱死者的電腦和手機克懊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進店門忱辅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谭溉,你說我怎么就攤上這事墙懂。” “怎么了扮念?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵损搬,是天一觀的道長。 經(jīng)常有香客問我柜与,道長巧勤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任弄匕,我火速辦了婚禮颅悉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘粘茄。我一直安慰自己签舞,他們只是感情好,可當我...
    茶點故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布柒瓣。 她就那樣靜靜地躺著,像睡著了一般吠架。 火紅的嫁衣襯著肌膚如雪芙贫。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天傍药,我揣著相機與錄音磺平,去河邊找鬼魂仍。 笑死,一個胖子當著我的面吹牛拣挪,可吹牛的內(nèi)容都是我干的擦酌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼菠劝,長吁一口氣:“原來是場噩夢啊……” “哼赊舶!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起赶诊,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤笼平,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后舔痪,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寓调,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年锄码,在試婚紗的時候發(fā)現(xiàn)自己被綠了夺英。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡滋捶,死狀恐怖秋麸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情炬太,我是刑警寧澤灸蟆,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站亲族,受9級特大地震影響炒考,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜霎迫,卻給世界環(huán)境...
    茶點故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一斋枢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧知给,春花似錦瓤帚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至筒扒,卻和暖如春怯邪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背花墩。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工悬秉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留澄步,地道東北人。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓和泌,卻偏偏與公主長得像村缸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子武氓,可洞房花燭夜當晚...
    茶點故事閱讀 44,974評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 最近在學習Rxjs梯皿,所以借此機會對rxjs知識點進行了一定的整理,用以加深自己對這部分知識的理解和記憶聋丝。 簡介 R...
    大喵愛讀書閱讀 60,217評論 5 70
  • 一.背景介紹 Rx(Reactive Extension -- 響應(yīng)式擴展 http://reactivex.io...
    愛上Shu的小刺猬閱讀 2,045評論 1 3
  • 介紹 RxJS是一個異步編程的庫索烹,同時它通過observable序列來實現(xiàn)基于事件的編程。它提供了一個核心的類型:...
    泓滎閱讀 16,602評論 0 12
  • 創(chuàng)建Observable: Rx.Observable.create 是 Observable 構(gòu)造函數(shù)的別名弱睦,它...
    柳源居士閱讀 4,286評論 0 2
  • 本文結(jié)構(gòu): 什么是RxJS RxJS有什么特點 RxJS核心概念 什么是RxJS 在javaScript中,我們可...
    stevemoon閱讀 11,263評論 0 8