ReactiveX是一個(gè)用觀察者模式
開發(fā)異步
和基于事件編程
的庫。
它擴(kuò)展了觀察者模式础拨,支持流式的數(shù)據(jù)和事件,然后增加一些操作符可以用于靈活的處理流式數(shù)據(jù)與事件。其內(nèi)置了很多不同類型的操作符映企,并且聲明式的寫法,讓我們可以專注開發(fā)業(yè)務(wù)而無需考慮很多底層的線程怎抛,并發(fā)卑吭,非阻塞操作。
概念
什么是流式事件?
從事件維度上看马绝,就是在不同時(shí)間依次發(fā)生的事件豆赏。比如說依次點(diǎn)擊屏幕,如下可以看做流式的事件富稻。stream of event
或者event flow
為什么使用觀察者模式(Observable)掷邦?
觀察者模型,讓我們?cè)谔幚?code>異步事件流時(shí)更加簡單椭赋,就是提高效率抚岗。可以減少寫很多的回調(diào)操作
單條數(shù)據(jù)(single items) | 多條數(shù)據(jù)(multi items) | |
---|---|---|
同步/synchronous | T getData() |
Iterable getData() |
異步/asynchronous | Future getData() |
Observable getData() |
如上圖可以看出來哪怔,JDK本身沒有提供對(duì)多條數(shù)據(jù)的異步支持宣蔚。比如異步的返回,A认境,B胚委,C三個(gè)東西,順序不做要求叉信,那么一般就要寫三個(gè)Future亩冬。如果三個(gè)數(shù)據(jù)在同一個(gè)線程內(nèi)處理,就是同步的硼身。
Observable提供了一種理想化的方式來訪問異步的流式數(shù)據(jù)(asynchronous sequences of multiple items)
Observables可以進(jìn)行組合
Java的Future是一種直接的方式來處理單個(gè)異步的操作硅急,但是如果嵌套使用Future的時(shí)候覆享,在異步里面繼續(xù)異步Future,代碼就容易變得復(fù)雜不易維護(hù)营袜。
組合多個(gè)Future進(jìn)行異步化的條件執(zhí)行下一步操作時(shí)撒顿,寫法比較難優(yōu)化。如果使用了Future.get可以更容易預(yù)測到結(jié)果连茧,但是又過早的進(jìn)入了阻塞核蘸。
ReactiveX Observables意在組合流式的異步數(shù)據(jù)。
Observables更加靈活
ReactiveX Observables不僅僅支持單個(gè)的數(shù)值啸驯,同時(shí)支持無限的數(shù)據(jù)流客扎,無限數(shù)據(jù)流即永遠(yuǎn)都一直有新的數(shù)據(jù)出現(xiàn),無法處理到最后罚斗。
上面提到了Observables是一種理想的方式處理異步的多個(gè)數(shù)據(jù)徙鱼。其余同步的集合迭代器對(duì)比如下:
數(shù)據(jù)(event) | Iterable (pull) | Observable (push) |
---|---|---|
獲取數(shù)據(jù) | T next() |
onNext(T) |
發(fā)現(xiàn)錯(cuò)誤 | throws Exception
|
onError(Exception) |
完成處理 | !hasNext() |
onCompleted() |
相比于同步集合的pull數(shù)據(jù),Observable是異步的push數(shù)據(jù)针姿。
- 使用Iterable袱吆,當(dāng)沒有數(shù)據(jù)到達(dá)的時(shí)候會(huì)進(jìn)入阻塞狀態(tài)。
- 使用Observable距淫,只要有數(shù)據(jù)可用的時(shí)候绞绒,就將數(shù)據(jù)推送給consumer,不管值是有還是沒有榕暇,是同步還是異步蓬衡。
Observable在經(jīng)典的觀察者模式中添加了兩個(gè)新的語法,功能更強(qiáng)大靈活彤枢,并且也使得其寫法與Iterable保持統(tǒng)一狰晚,讓我們寫Observable的時(shí)候就像在寫Iterable:
- 當(dāng)生產(chǎn)者數(shù)據(jù)被消耗完畢的時(shí)候,Observable調(diào)用Observer的
onCompleted
方法 - 當(dāng)消費(fèi)者在遇到錯(cuò)誤或異常的時(shí)候缴啡,Observable調(diào)用Observer的
onError
方法
再看下Observable與Iterable的寫法對(duì)比: 寫法上沒太大的變化壁晒,這里的Iterable可以看做Java 8中的Stream。
Observable的靈活還有其不關(guān)心其數(shù)據(jù)源的如何實(shí)現(xiàn)业栅,不管是使用線程池秒咐,actors,event loop或者其它內(nèi)容碘裕,在Observable看來都是異步的反镇。
使用ReactiveX時(shí),可以修改底層的Observable實(shí)現(xiàn)而不需要修改Observable的Consumer代碼娘汞。
使用
ReactiveX更像是一種理念,常見的編程語言都有關(guān)于ReactiveX的實(shí)現(xiàn)夕玩。以Java的實(shí)現(xiàn)RxJava為例你弦。
寫一個(gè)簡單的Hello World案例:
1 引入Maven惊豺。
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.12</version>
</dependency>
2 編寫Rxjava相關(guān)代碼
public class RxHello {
public static void main(String[] args) {
// 1. 創(chuàng)建Observable
Flowable<String> observable = Flowable
.just("hello", "world")
// 2. 使用Operator
.map(String::toUpperCase)
;
//3. 創(chuàng)建Observer
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("收到數(shù)據(jù):" + s);
}
};
// 4. 建立訂閱關(guān)系
observable.subscribe(consumer);
}
}
上面代碼看起來比較簡單,實(shí)際上寫的時(shí)候也確實(shí)比較簡單禽作,主要步驟就三步:加上操作符可以說是4步
1 創(chuàng)建Observable尸昧,ReactiveX提供了一些不同的Operator來創(chuàng)建,具體有沒有實(shí)現(xiàn)要到對(duì)應(yīng)的編程語言中看:
// 以下operator是可以創(chuàng)建
`Create`, `Defer`, `Empty`/`Never`/`Throw`, `From`, `Interval`, `Just`, `Range`, `Repeat`, `Start`, and `Timer
// 比如:
Observable.create();
Observable.defer();
Observable.just();
Observable.fromArray()
2 進(jìn)行Observable的數(shù)據(jù)處理旷偿,內(nèi)置了非常多的操作符烹俗。轉(zhuǎn)換,過濾萍程,統(tǒng)計(jì)幢妄,錯(cuò)誤處理等等
常見的Map,F(xiàn)latMap茫负,F(xiàn)ilter蕉鸳,Take,Skip等
3 創(chuàng)建Observer忍法,即觀察者或者消費(fèi)者潮尝,用于接收Observable的數(shù)據(jù)
常見的類:FlowableSubscriber,Subscriber饿序,Comsumer勉失,主要是我們的代碼邏輯。
4 建立Observable和Observer的關(guān)系原探。
最后
這里只是介紹了ReactiveX的概念乱凿,介紹關(guān)鍵異步流式數(shù)據(jù),與觀察者模式部分踢匣,提供了案例代碼告匠,了解ReactiveX的基本寫法。
想要提要到ReactiveX的強(qiáng)大离唬,還需要掌握其豐富的Operator操作符功能后专。
參考:
http://reactivex.io/intro.html
http://reactivex.io/documentation/observable.html
https://medium.com/@andrestaltz/2-minute-introduction-to-rx-24c8ca793877