如需下載源碼棋恼,請訪問
https://github.com/fengchuanfang/Rxjava2Tutorial
文章原創(chuàng)返弹,轉(zhuǎn)載請注明出處:
Rxjava2入門教程二:Observable與Observer響應(yīng)式編程在Rxjava2中的典型實現(xiàn)
在RxJava中,函數(shù)響應(yīng)式編程具體表現(xiàn)為一個觀察者(Observer)訂閱一個可觀察對象(Observable)爪飘,通過創(chuàng)建可觀察對象發(fā)射數(shù)據(jù)流义起,經(jīng)過一系列操作符(Operators)加工處理和線程調(diào)度器(Scheduler)在不同線程間的轉(zhuǎn)發(fā),最后由觀察者接受并做出響應(yīng)的一個過程
ObservableSource與Observer是RxJava2中最典型的一組觀察者與可觀察對象的組合师崎,其他四組可以看做是這一組的改進(jìn)版或者簡化版默终。
Observable
抽象類Observable是接口ObservableSource下的一個抽象實現(xiàn),我們可以通過Observable創(chuàng)建一個可觀察對象發(fā)射數(shù)據(jù)流犁罩。
demo1_1.jpg
上例中齐蔽,調(diào)用Observable.create創(chuàng)建一個可觀察對象,并發(fā)送“Hello World”昼汗,然后通知發(fā)送完成
Observer
創(chuàng)建一個觀察者Observer來接受并響應(yīng)可觀察對象發(fā)射的數(shù)據(jù)流
demo1_2.jpg
在onNext方法中接收到可觀察對象發(fā)射的數(shù)據(jù)"Hello World",并做出響應(yīng)——打印到控制臺肴熏。
Observer訂閱Observable
demo1_3.jpg
一旦Observer與Observable建立了訂閱關(guān)系鬼雀,Observer與Observable便成為了一個整體顷窒,Observer便可對Observable中的行為作出響應(yīng)。
Emitter/Observer
通過Observable.create創(chuàng)建可觀察對象時,我們可以發(fā)現(xiàn)具體執(zhí)行發(fā)射動作的是接口ObservableEmitter的實例化對象鞋吉,而ObservableEmitter 繼承自 接口Emitter鸦做,查看源碼接口Emitter的具體代碼如下:
publicinterfaceEmitter{//用來發(fā)送數(shù)據(jù),可多次調(diào)用谓着,每調(diào)用一次發(fā)送一條數(shù)據(jù)voidonNext(@NonNull T value);//用來發(fā)送異常通知泼诱,只發(fā)送一次,若多次調(diào)用只發(fā)送第一條voidonError(@NonNull Throwable error);//用來發(fā)送完成通知赊锚,只發(fā)送一次治筒,若多次調(diào)用只發(fā)送第一條voidonComplete();}
onNext:用來發(fā)送數(shù)據(jù),可多次調(diào)用舷蒲,每調(diào)用一次發(fā)送一條數(shù)據(jù)
onError:用來發(fā)送異常通知耸袜,只發(fā)送一次,若多次調(diào)用只發(fā)送第一條
onComplete:用來發(fā)送完成通知牲平,只發(fā)送一次堤框,若多次調(diào)用只發(fā)送第一條
onError與onComplete互斥,兩個方法只能調(diào)用一個不能同時調(diào)用纵柿,數(shù)據(jù)在發(fā)送時蜈抓,出現(xiàn)異常可以調(diào)用onError發(fā)送異常通知也可以不調(diào)用昂儒,因為其所在的方法subscribe會拋出異常沟使,若數(shù)據(jù)在全部發(fā)送完之后均正常可以調(diào)用onComplete發(fā)送完成通知荆忍;其中格带,onError與onComplete不做強(qiáng)制性調(diào)用。
接口Observer中的三個方法(onNext,onError,onComplete)正好與Emitter中的三個方法相對應(yīng)刹枉,分別對Emitter中對應(yīng)方法的行為作出響應(yīng)叽唱。
Emitter調(diào)用onNext發(fā)送數(shù)據(jù)時,Observer會通過onNext接收數(shù)據(jù)微宝。
Emitter調(diào)用onError發(fā)送異常通知時棺亭,Observer會通過onError接收異常通知。
Emitter調(diào)用onComplete發(fā)送完成通知時蟋软,Observer會通過onComplete接收完成通知镶摘。
步驟簡化
去掉中間變量可以對之前的代碼簡化為以下形式:
demo2.jpg
在響應(yīng)式編程的基礎(chǔ)上,加上函數(shù)式編程岳守,真正的函數(shù)響應(yīng)式編程可以將代碼簡化成以下形式:
demo3.jpg
其中凄敢,just操作符是經(jīng)過封裝后,專門用來發(fā)射單條數(shù)據(jù)的湿痢,可以是一個數(shù)據(jù)涝缝,一條字符扑庞,一個對象,一整個數(shù)組拒逮,一整個集合罐氨。
Consumer可以看做是對觀察者Observer功能單一化之后的產(chǎn)物——消費(fèi)者,上例中的Consumer通過其函數(shù)accept只接收可觀察對象發(fā)射的數(shù)據(jù)滩援,不接收異常信息或完成信息栅隐。
如果想接收異常信息或完成信息可以用下面的代碼:
demo4.jpg
第二個參數(shù)Consumer規(guī)定泛型通過函數(shù)accept接收異常信息。
第三個參數(shù)Action也是對觀察者Observer功能單一化之后的產(chǎn)物--行動玩徊,通過函數(shù)run接收完成信息租悄,作出響應(yīng)行動。
發(fā)送數(shù)據(jù)序列
Observable可以發(fā)送單條數(shù)據(jù)或者數(shù)據(jù)序列恩袱,上面的事例都是發(fā)送單條數(shù)據(jù)'Hello World"的情形恰矩,那么怎樣發(fā)送數(shù)據(jù)序列呢?
可以通過最基礎(chǔ)的方法:
demo5.jpg
通過在方法subscribe中循環(huán)遍歷String類型的集合list中的元素憎蛤,然后通過emitter.onNext(str)將他們逐一發(fā)送外傅;如果發(fā)送過程中捕獲到異常,通過emitter.onError(e)發(fā)送異常信息俩檬;最后如果數(shù)據(jù)正常發(fā)送完畢調(diào)用 emitter.onComplete()發(fā)送完成通知萎胰,Observer中通過onNext接收emitter發(fā)送的每一條信息并打印到控制臺(emitter發(fā)送幾次,Observer便接收幾次)棚辽,通過onError(Throwable e)接收異常信息技竟,onComplete()接收完成信息。
同樣可以通過操作符對其進(jìn)行簡化屈藐,如下;
demo6.jpg
其中fromIterable(list)也是一個封裝好的操作符榔组,可以將一個可迭代對象中的每一個元素逐一發(fā)送
Disposable
在之前的例子中,可以看到Observer接口中還有一個方法沒有說
publicvoidonSubscribe(Disposable d){}
這個方法中有個Disposable類型的參數(shù)联逻,
onSubscribe表示在訂閱時搓扯,當(dāng)觀察者Observer訂閱可觀察對象Observable,建立訂閱關(guān)系后包归,會觸發(fā)這個方法锨推,并且會生成一個Disposable對象,其實無論觀察者Observer以何種方式訂閱可觀察對象Observable公壤,都會生成一個Disposable换可,不管有沒有onSubscribe(Disposable d)方法,如下:
demo7.jpg
查看Disposable接口的源碼厦幅,如下:
publicinterfaceDisposable{voiddispose();booleanisDisposed();}
Disposable是觀察者Observer與可觀察對象Observable建立訂閱關(guān)系后生成的用來取消訂閱關(guān)系和判斷訂閱關(guān)系是否存在的一個接口沾鳄。
只有當(dāng)觀察者Observer與可觀察對象Observable之間存在訂閱關(guān)系時,Observer才能接收Observable發(fā)送的數(shù)據(jù)或信息确憨。如果Observer在接收Observable的信息的過程中译荞,取消了訂閱關(guān)系套媚,則Observer只能接收訂閱關(guān)系取消之前Observable發(fā)送的數(shù)據(jù),對于訂閱關(guān)系取消之后Observable發(fā)送的數(shù)據(jù)磁椒,Observer將不會再接收。
運(yùn)行下面的代碼玫芦,當(dāng)Observable接收到第5條數(shù)據(jù)時浆熔,取消訂閱關(guān)系。
demo8.jpg
控制臺日志如下:
I/System.out: 發(fā)送0
I/System.out: 接收0
I/System.out: 發(fā)送1
I/System.out: 接收1
I/System.out: 發(fā)送2
I/System.out: 接收2
I/System.out: 發(fā)送3
I/System.out: 接收3
I/System.out: 發(fā)送4
I/System.out: 接收4
I/System.out: 發(fā)送5
I/System.out: 接收5
I/System.out: 發(fā)送6
I/System.out: 發(fā)送7
I/System.out: 發(fā)送8
I/System.out: 發(fā)送9
可以發(fā)現(xiàn)取消訂閱關(guān)系之前桥帆,Observable發(fā)送一條數(shù)據(jù)医增,Observe接收一條,取消訂閱關(guān)系之后老虫,Observe將不再接收Observable發(fā)送的數(shù)據(jù)叶骨。