針對異步數(shù)據(jù)流的編程刹淌,簡單來說官还,它將一切數(shù)據(jù)芹橡,包括HTTP請求,DOM事件或者普通數(shù)據(jù)等包裝成流的形式望伦,然后用強大豐富的操作符對流進行處理林说,使你得以同步方式處理異步數(shù)據(jù),并組合不同的操作符來輕松優(yōu)雅的實現(xiàn)你所需要的功能
學(xué)習
https://rxjs.dev/guide/overview
可通過以上官網(wǎng)進行學(xué)習屯伞,還可F12進行聯(lián)系測試
const { fromEvent } = rxjs;
const { mapTo } = rxjs.operators;
const clicks = fromEvent(document, 'click');
const greetings = clicks.pipe(mapTo('Hi'));
greetings.subscribe(x => console.log(x));
概念
Observable
observable 被稱為可觀察的序列腿箩,簡單來說數(shù)據(jù)在observable中流動,可以使用各種operator對流進行處理
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
Observer
觀察者劣摇,一個回調(diào)函數(shù)集合珠移,他知道如何去監(jiān)聽由observable提供的值,通過三個可選參數(shù)末融,監(jiān)聽Observable的行為:
- 成功處理函數(shù) next剑梳,每次流發(fā)出值時調(diào)用該函數(shù)
- 錯誤處理函數(shù) error,只有在發(fā)生錯誤時才調(diào)用該函數(shù)滑潘,此處理函數(shù)本身接收一個錯誤
- 完成處理函數(shù) complete,僅當流完成時才調(diào)用該函數(shù)
如下所示:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
Operator
采用函數(shù)式編程風格的函數(shù)锨咙,使用如map, filter, concat, flatmap來處理集合
常用操作符
創(chuàng)建序列: of, from, fromEvent, fromPromise, ajax, throw
https://rxjs.dev/operator-decision-tree
如果不知如何創(chuàng)建序列语卤,可通過以上網(wǎng)址根據(jù)自己的需求得出參考
合并序列: forkJoin, merge, concat
const ob1 = Observable.ajax('api/1')
const ob2 = Observable.ajax('api/2')
const obs = [ob1, ob2]
// 串行
Observable.concat(...obs).subscribe(detail => console.log('每個請求都會觸發(fā)回調(diào)'))
// 并行
Observable.merge(...obs).subscribe(detail => console.log('每個請求都會觸發(fā)回調(diào)'))
// 并行,并且合并結(jié)果
Observable.forkJoin(...obs).subscribe(detailArray => console.log('觸發(fā)一次回調(diào)'))
操作符: map, filter, swirthMap, to Promise, catch, take Until, timeout, debounceTime, distinctUtilChanged, pluck
https://reactive.how/rxjs/, 可通過此網(wǎng)址學(xué)習操作符
應(yīng)用
- 重試機制,遞增延時重試
import { timer, from } from "rxjs";
import { retryWhen, delayWhen, tap, scan } from 'rxjs/operators';
const MAX_RETRY = 10
const ATTEMPT_DELAY_FACTOR = 3000
const promise = Promise.reject('bad');
// const promise = Promise.resolve('success');
promise.pipe(
retryWhen(errors =>
{
return errors.pipe(
scan((errCount, err) => {
if(errCount >= MAX_RETRY) {
throw new Error(err)
}
return errCount + 1
}, 0),
delayWhen((errCount) => timer(ATTEMPT_DELAY_FACTOR * errCount)),
tap((errCount) => console.log(`運行失敗,將進行第${errCount}次重試 ` ))
)
}
)
)
.subscribe(
// 監(jiān)聽數(shù)據(jù)流
res => {},
// 監(jiān)聽錯誤
err => {
console.log('error:', err)酪刀;
},
// 監(jiān)聽結(jié)束
() => {
console.log('運行成功');
}
)
}