響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式。
面向變化傳播的編程就是看最初的數(shù)據(jù)是否會隨著后續(xù)對應(yīng)變量的變化而變化巢音。比如當(dāng)變量B的數(shù)值改變后浸卦,變量C的數(shù)值也隨之變動。
面向數(shù)據(jù)流的編程是當(dāng)監(jiān)聽一系列事件流并對這一系列事件流進(jìn)行映射住练、過濾和合并等處理后,再響應(yīng)整個事件流的回調(diào)的過程骆捧。例如在ReactiveX編程范式中澎羞,數(shù)據(jù)流被封裝在一個叫Observable的對象實例中,通過觀察者模式敛苇,對數(shù)據(jù)流進(jìn)行統(tǒng)一的訂閱(Subscribe),并在中間插入像filter()這樣的操作函數(shù),從而對Observable所封裝的數(shù)據(jù)流進(jìn)行過濾處理枫攀。
myObservable.filter(fn).subscribe(callback);
響應(yīng)式編程清楚地表達(dá)了動態(tài)的異步數(shù)據(jù)流括饶,而相關(guān)的計算模型也自動地將變化的值通過數(shù)據(jù)流的方式進(jìn)行了傳播。
ReactiveX
ReactiveX一般簡寫為Rx来涨,它是微軟開發(fā)并維護(hù)的一套工具庫集合图焰,用于提供一系列接口規(guī)范來處理異步數(shù)據(jù)流。
Observable
應(yīng)用中產(chǎn)生的異步數(shù)據(jù)都需要先包裝成Observable對象蹦掐,Observable對象的作用是把這些異步的數(shù)據(jù)變換為數(shù)據(jù)流形式技羔。所以生成的這些Observable對象相當(dāng)于數(shù)據(jù)流的源頭,后續(xù)操作都是圍繞著這些被轉(zhuǎn)換的流動數(shù)據(jù)展開卧抗。
Operator
Rx在結(jié)合了觀察者模式的同時藤滥,還結(jié)合了函數(shù)式編程和迭代器模式的思想。其中社裆,Rx的Operator便是對這兩種模式的具體體現(xiàn)拙绊。
Operator是Rx中Observable的操作符。在Rx中泳秀,每一個Observable對象标沪,或者說數(shù)據(jù)流,都可以通過某個operator對該Observable對象進(jìn)行變換嗜傅、過濾金句、合并和監(jiān)聽等操作。同時吕嘀,大多數(shù)operator在對該Observable對象處理后返回一個新的Observable對象供下一個operator進(jìn)行處理违寞。這樣方便在各個operator之間通過鏈?zhǔn)秸{(diào)用的方式編寫代碼。
//生成一個新的Observable對象
let newObservable=observable.debounceTime(500).take(2);
debunceTime()和take()都是一個Operator币他,這些Operators通過鏈?zhǔn)椒椒ǖ慕M合坞靶,對原有的Observable對象進(jìn)行操作,最終返回了一個新的Observable對象蝴悉。
在Rx中彰阴,Observable作為觀察者模式中的被觀察者,需要一個方法來訂閱它拍冠,而subscribe()便是這樣一種方法尿这,訂閱Observable對象發(fā)出的所有事件。
observable.subscribe(observer);
subscribe()方法會接收一個observer作為參數(shù)庆杜,來對observable發(fā)出的事件進(jìn)行訂閱射众。每當(dāng)observable完成并發(fā)送(Emit)一個事件時,該事件就會被observer所捕獲晃财,進(jìn)入到observer對應(yīng)的回調(diào)函數(shù)中叨橱。被subscribe()訂閱過的Observable對象并不會返回一個新的Observable對象,因為subscribe()不是一個可以改變原始數(shù)據(jù)流的函數(shù)。相反罗洗,subscribe()會返回一個Subscription實例愉舔,這個Subscription實例又提供很多操作API,例如具有取消訂閱事件功能的unsubscribe()伙菜。
其他核心概念
- Observer:對Observable對象發(fā)出的每個事件進(jìn)行響應(yīng)
- Subscription:Observable對象被訂閱后返回的Subscription實例
- Subject:EventEmitter的等價數(shù)據(jù)結(jié)構(gòu)轩缤,可以當(dāng)作Observable被監(jiān)聽,也可以作為Observer發(fā)送新的事件
RxJS
RxJS是Rx在JS層面上的實現(xiàn)贩绕,除此之外還有RxJava火的、Rx.Net等。
創(chuàng)建Observable對象
首先把數(shù)據(jù)流封裝為統(tǒng)一的Observable對象淑倾,并對它進(jìn)行相應(yīng)的處理馏鹤。
let button=document.querySelector('button');
Rx.Observable.fromEvent(button,'click') //返回一個Observable對象
.subscribe(()=>console.log('Clicked'));
通過Observable中的fromEvent靜態(tài)方法把<button>
標(biāo)簽的所有點擊事件封裝到一個Observable對象中,并轉(zhuǎn)化為數(shù)據(jù)流的形式踊淳,最后通過subscribe()方法對整個點擊事件流進(jìn)行監(jiān)聽假瞬。也就是說當(dāng)按鈕被點擊時,對應(yīng)的Observable對象便發(fā)出一條相應(yīng)的消息迂尝,這條消息會被subscribe()中的observer回調(diào)函數(shù)所捕獲脱茉,從而執(zhí)行console.log()語句,這樣便實現(xiàn)了一個最簡單的數(shù)據(jù)流監(jiān)聽垄开。
使用RxJS處理復(fù)雜場景
在實際開發(fā)中琴许,會遇到這樣的場景:當(dāng)用戶在一個文本輸入框進(jìn)行輸入時,需要對用戶的輸入進(jìn)行實時的監(jiān)聽溉躲,每當(dāng)用戶輸入一些新的字符時榜田,會發(fā)一個請求到服務(wù)器端,來獲取一些輸入推薦信息展示給用戶锻梳。但在實現(xiàn)這個功能的時候箭券,需求可能還會有如下的限制條件:
- 不必在每次用戶輸入的時候都發(fā)請求。用戶在文本框輸入文字時疑枯,可能會輸入得很快辩块,這時是不需要給用戶推薦任何信息的,不讓頻繁發(fā)送網(wǎng)絡(luò)請求可能會影響性能荆永,因此可當(dāng)用戶輸入停頓500ms沒有再輸入時废亭,才返回推薦信息給用戶。
- 要保證請求返回的順序具钥。在異步請求的情況下豆村,由于服務(wù)器返回推薦數(shù)據(jù)的響應(yīng)時間會受網(wǎng)絡(luò)環(huán)境等因素影響,有時前端拿到的推薦數(shù)據(jù)不是最后一次請求的骂删,所以需要保證這些推薦信息的渲染順序與請求順序一致掌动。
在RxJS中四啰,這樣的數(shù)據(jù)流操作可以很優(yōu)雅的實現(xiàn)。
let inputSelector=document.querySelector('input');
Rx.Observable.fromEvent(inputSelector,'keyup')
.debounceTime(500)
.switchMap(event=>getRecommend(event.target.value))
.subscribe(callback);
RxJS和Promise的對比
RxJS的Observable可以通過toPromise()方法把原有弟弟Observable對象轉(zhuǎn)為Promise對象坏匪。能用Promise的場景RxJS都實用拟逮,RxJS是作為Promise的超集存在撬统。
//Promise實例的創(chuàng)建
let promise=new Promise((resolve,reject)=>{
//...
if(/*異步操作成功*/){
resolve(value);
}else{
reject(error);
}
});
//Observable實例的創(chuàng)建
let Observable=new Observable(observer=>{
observer.next(value1);
observe.next(value2);
observer.error(err);
});
Promise只能針對單一的異步事件進(jìn)行resolve()操作适滓,而在Observable中,不僅僅能處理一個單一的異步事件(即調(diào)用boserver的next()方法)恋追,而且能以流的形式響應(yīng)多個異步事件凭迹。
“冷”模式下的Observable
Observable通常可以實現(xiàn)成“熱”模式或“冷”模式苦囱。在“熱”模式下嗅绸,Observable對象一旦創(chuàng)建,便會開始發(fā)送數(shù)據(jù)撕彤。而在“冷”模式下鱼鸠,Observable對象會一直等到自己被訂閱,才會開始數(shù)據(jù)流的發(fā)送羹铅。在RxJS中蚀狰,Observable實現(xiàn)的是“冷”模式。
console.log('Observable 的數(shù)據(jù)發(fā)送順序為:');
let obs=new Observable(observer=>{
console.log('Observable start');
observer.next();
});
console.log('start');
obs.subscribe();
結(jié)果:
Observable的發(fā)送順序為:
start
Observable start
在RxJS中职员,obsrver.next()在Observable對象被訂閱后才執(zhí)行麻蹋。也就是說在RxJS中,Observable對象直到被subscribe()之后焊切,才會進(jìn)入數(shù)據(jù)發(fā)送的流程中扮授。
除了“冷”模式和“熱”模式外,RxJS中還存在另外一種被稱作Connectable的模式专肪。這種模式下的Observable對象不管有沒有訂閱刹勃,都不會發(fā)送數(shù)據(jù),除非ConnectableObservable實例的connect()方法被調(diào)用嚎尤。
console.log('Connectable Observable的數(shù)據(jù)發(fā)送順序為:')荔仁;
let obs=new Observable(observer=>{
console.log('Observable start');
observer.complete();
}).publish();
console.log('start');
obs.subscribe();
console.log('after Observable has been subscribed');
obs.connect();
運行結(jié)果:
Connectable Observable 的數(shù)據(jù)發(fā)送順序為:
start
after Observable has been subscribed
Observable start
原來的Observable對象被publish()方法轉(zhuǎn)換為Connectable模式,在Connectable模式下诺苹,Observable對象并沒有在被subscribe()訂閱之后發(fā)送數(shù)據(jù)咕晋,而是在被connect()方法調(diào)用后才發(fā)送,這就是Connectable Observable收奔。
RxJS中的Operator
Operator操作符分類:創(chuàng)建操作符掌呜、過濾操作符、組合操作符坪哄、工具操作符等质蕉。
創(chuàng)建操作符
Observable.fromEvent()势篡、new Observable()和Observable.create()都是創(chuàng)建操作符。
let observable=Rx.Observable.create(observer=>{
getData(data=>{
observer.next(data);
observer.complete();
})
});
observable.subscribe(data=>{
//doSomething(data);
});
Observable.create()接受一個工廠函數(shù)并返回一個新的Observable對象模暗。這個對象最終被subscribe()方法監(jiān)聽禁悠,每當(dāng)observer.next()方法被調(diào)用時,subscribe()中的callback函數(shù)便捕獲到observer傳來的數(shù)據(jù)并進(jìn)行相應(yīng)地處理兑宇。這樣便實現(xiàn)了對數(shù)據(jù)流的訂閱和監(jiān)聽功能碍侦。
變換操作符
有些時候,通過Observable對象獲取到的數(shù)據(jù)需要做一些批量的小調(diào)整隶糕。比如瓷产,數(shù)據(jù)獲取的接口經(jīng)常會有自己的一套規(guī)范去包裹數(shù)據(jù)。
{
"err_code":0,
"data":{"name":"Operators"}
}
只有data字段是實際想要處理的枚驻,所以需要對每一個請求做一次變換操作濒旦,把原本的數(shù)據(jù)流變換成需要的數(shù)據(jù)流,這就需要用到變換操作符再登。RxJS中最常用的變換操作符是Observable.prototype.map()尔邓。
observable.map(response=>{
return response.data;
}).subscribe(data=>{
//doSomething(data);
});
當(dāng)observable拿到響應(yīng)數(shù)據(jù)response并傳給observer之前,可以通過map操作锉矢,來預(yù)先對response進(jìn)行處理梯嗽,從而讓observer得到被加工后的數(shù)據(jù)格式。
過濾操作符
過濾操作符可用于過濾掉數(shù)據(jù)流中一些不需要處理的數(shù)據(jù)沈撞。有時候在前端獲取數(shù)據(jù)時慷荔,接口會因為各種原因無法返回最終需要的數(shù)據(jù),可能由于異常導(dǎo)致返回的數(shù)據(jù)為空缠俺,或只返回一個錯誤代碼以及錯誤描述告知前端显晶,但并不需要處理這些錯誤信息,所以就需要過濾這些數(shù)據(jù)壹士。這時候就需要用到Observable.prototype.filter()來對數(shù)據(jù)進(jìn)行過濾磷雇。
observable.filter(response=>{
return !!response.data&&response.status===200;
}).map(response=>{
return response.data;
}).subscribe(data=>{
//doSomething(data);
});
結(jié)果為false的數(shù)據(jù)將不會再流向下一個operator。
組合操作符
有時候需要依賴兩個甚至更多的接口數(shù)據(jù)躏救,并在這些接口數(shù)據(jù)都成功獲取后唯笙,再進(jìn)行關(guān)聯(lián)合并。這時候就需要用到組合操作符:Observable.forkJoin()盒使。
let getFirstDataObs=Rx.Observable.create(observer=>{
observer.next(getFirstData());
observer.compete();
});
let getSecondDataObs=Rx.Observable.create(observer=>{
getSecondData(data=>{
observer.next(data);
observer.complete();
});
});
let observable=Rx.Observable.forkJoin(
getFirstDataObs,getSecondDataObs
);
observable.subscribe(datas=>{
//data[0]是getFirstDataObs的數(shù)據(jù)
//data[1]是getSecondDataObs的數(shù)據(jù)
//doSomething(data);
});
Observable.forkJoin()把原本兩個互相獨立的Observable對象合并為一個新的Observable對象崩掘,它會在兩個Observable對象的數(shù)據(jù)都抵達(dá)后才開始合并處理。所以odSomething()只會執(zhí)行一次少办,此時拿到的datas是包含兩個數(shù)據(jù)流數(shù)據(jù)的數(shù)組苞慢。
如果某次數(shù)據(jù)請求需要依賴前一次請求的結(jié)果,也就是說兩次請求必須有先后順序的英妓,這是可以用Observable.prototype.contactMap()挽放。
let getFirstDataObs=Rx.Observable.create(ovserver=>{
observer.next(getFirstData());
observer.complete();
});
let createSecondDataObs=function(firstData){
return Rx.Observable.create(observer=>{
getSecondData(firstData,secondData=>{
observer.next(secondData);
observer.complete();
});
});
}
let observable=getFirstDataObs.contactMap(firstData=>{
return createSecondDataObs(firstData);
}).subscribe(secondData=>{
doSomethingWithSecondData(secondData);
});
通過Observable.prototype.contactMap()方法绍赛,getSecondDataObs()的數(shù)據(jù)流被緊接在getFirstDataObs()的數(shù)據(jù)流后,并且最終數(shù)據(jù)流被subscribe()所捕獲辑畦。