原文鏈接: https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875
本文為 RxJS 中文社區(qū) 翻譯文章松邪,如需轉載痰洒,請注明出處出爹,謝謝合作!
如果你也想和我們一起核畴,翻譯更多優(yōu)質的 RxJS 文章以奉獻給大家,請點擊【這里】
不時地會有人問我關于如何與 RxJS 配合使用 async 函數(shù)或 promises冲九,還有更糟的谤草,我被告之“事實”的真相是 async-await 和 Observables 并不能“在一起使用”。RxJS 從一開始就具備與 Promises 的高度互操作性莺奸。希望這篇文章能對此有所啟發(fā)丑孩。
如果可以接收 Observable,就可以接收 Promise
例如灭贷,如果使用 switchMap温学,你可以返回 Promise 來替代,就像返回 Observable 那樣甚疟。以下這些都是有效的:
// Observable: 每1秒發(fā)出自增數(shù)值乘以100仗岖,共發(fā)出10次
const source$ = Observable.interval(1000)
.take(10)
.map(x => x * 100);
/**
* 返回 promise,它等待 `ms` 毫秒并發(fā)出 "done"
*/
function promiseDelay(ms) {
return new Promise(resolve => {
setTimeout(() => resolve('done'), ms);
});
}
// 在 switchMap 中使用 promiseDelay
source$.switchMap(x => promiseDelay(x)) // 正常運行
.subscribe(x => console.log(x));
source$.switchMap(promiseDelay) // 更簡潔了
.subscribe(x => console.log(x));
// 或者使用 takeUntil
source$.takeUntil(doAsyncThing('hi')) // 完全可以運行
.subscribe(x => console.log(x))
// 或者類似這樣的奇怪組合
Observable.of(promiseDelay(100), promiseDelay(10000)).mergeAll()
.subscribe(x => console.log(x))
使用 defer 使得返回 Promise 的函數(shù)可以重試
如果你可以訪問創(chuàng)建 promise 的函數(shù)览妖,你可以使用 Observable.defer()
來包裝它轧拄,以使 Observable 可以在報錯時進行重試。
function getErroringPromise() {
console.log('getErroringPromise called');
return Promise.reject(new Error('sad'));
}
Observable.defer(getErroringPromise)
.retry(3)
.subscribe(x => console.log);
// 輸出 "getErroringPromise called" 4次 (開始1次 + 3次重試), 然后報錯
使用 defer() 定義使用 async-await 的 Observable
事實證明讽膏, defer 是個非常強大的小工具紧帕。你可以使用它,基本上是直接使用 async 函數(shù)桅打,它會創(chuàng)建一個發(fā)出返回值及完成的 Observable 是嗜。
Observable.defer(async function() {
const a = await promiseDelay(1000).then(() => 1);
const b = a + await promiseDelay(1000).then(() => 2);
return a + b + await promiseDelay(1000).then(() => 3);
})
.subscribe(x => console.log(x)) // 輸出 7
使用 forEach 訂閱 Observable 以在 async-await 中創(chuàng)建并發(fā)任務
這是 RxJS 中較少使用的功能,它來自 TC39 Observable 提議挺尾。訂閱 Observable 可不止一種方式鹅搪! subscribe
是訂閱 Observable 的傳統(tǒng)方式,它返回用來取消數(shù)據(jù)流的 Subscription
對象遭铺。而 forEach
以一種不可取消的方式來訂閱 Observable 丽柿,它接收一個函數(shù)用于每個值恢准,并返回 Promise,該 Promise 體現(xiàn)了 Observable 的完成和錯誤路徑甫题。
const click$ = Observable.fromEvent(button, 'click');
/**
* 等待10次按鈕點擊馁筐,然后使用 fetch 將第10次點擊的時間戳發(fā)送給端點
*/
async function doWork() {
await click$.take(10)
.forEach((_, i) => console.log(`click ${i + 1}`));
return await fetch(
'notify/tenclicks',
{ method: 'POST', body: Date.now() }
);
}
使用 toPromise() 和 async/await 將 Observable 最后發(fā)出的值作為 Promise 發(fā)出
toPromise
函數(shù)實際上是有些巧妙的,因為它并不是真正的“操作符”坠非,而是以一種 RxJS 特定的方式來訂閱 Observable 并將其包裝成一個 Promise 敏沉。一旦 Observable 完成,Promise 便會 resolve Observable 最后發(fā)出的值炎码。這意味著如果 Observable 發(fā)出值 “hi” 然后等待10秒才完成盟迟,那么返回的 Promise 會等待10秒才 resolve “hi” 。如果 Observable 一直不完成潦闲,那么 Promise 便永遠不會 resolve 攒菠。
注意: 使用 toPromise() 是一種反模式,除非當你正在處理預期為 Promise 的 API歉闰, 比如 async-await
const source$ = Observable.interval(1000).take(3); // 0, 1, 2
// 等待3秒辖众,然后輸出 "2"
// 因為 Observable 需要3秒才能完成,而 interval 發(fā)出從0開始自增的數(shù)字
async function test() {
console.log(await source$.toPromise());
}
Observables 和 Promises 能很好地一起使用
不可否認地和敬,如果你的目標是響應式編程赵辕,那么大多數(shù)時間里你可能想要使用 Observable ,但是 RxJS 嘗試去盡可能地滿足大眾需求概龄,畢竟當下 Promises 還是很受歡迎的还惠。此外,在 async 函數(shù)中使用 RxJS Observables 和 forEach私杜,為管理并發(fā)性和在 async-await 中“只能正常運行”的任務開啟了大量有趣的可能性蚕键。
想學習更多 RxJS 知識, 我可以親自教學或選擇在線學習衰粹,盡在http://rxworkshop.com!