RxJava學(xué)習(xí)筆記

RxJava和Retrofit的依賴

? ??//rxjava依賴

? ??implementation'io.reactivex.rxjava2:rxjava:2.0.1'

? ??implementation'io.reactivex.rxjava2:rxandroid:2.0.1'

? ??//retrofit

? ??compile'com.squareup.retrofit2:retrofit:2.1.0'

? ??//Gson converter

? ??compile'com.squareup.retrofit2:converter-gson:2.1.0'

? ??/RxJava2 Adapter

? ??compile'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

? ??//okhttp

? ??compile'com.squareup.okhttp3:okhttp:3.4.1'

? ??compile'com.squareup.okhttp3:logging-interceptor:3.4.1'


1. RxJava鏈?zhǔn)綄懛?線程切換

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

Log.d(TAG, "emitter 1");

emitter.onNext("1");

Log.d(TAG, "emitter 2");

emitter.onNext("2");

Log.d(TAG, "emitter onComplete");

emitter.onComplete();

Log.d(TAG, "emitter 3");

emitter.onNext("3");

}

}).subscribeOn(Schedulers.newThread())?

.subscribeOn(Schedulers.io())??

.observeOn(AndroidSchedulers.mainThread())?

.observeOn(Schedulers.newThread())

.subscribe(new Observer() {

private int i;

@Override

public void onSubscribe(Disposable disposable) {

TestRxJavaActivity.this.disposable = disposable;

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(String value) {

Log.d(TAG, "onNext value:" + value);

i++;

if (i == 2) {

Log.d(TAG, "dispose");

disposable.dispose();

Log.d(TAG, "isDisposed:" + disposable.isDisposed());

}

}

@Override

public void onError(Throwable e) {

Log.d(TAG, "onError");

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}});

ObservableEmitter:發(fā)射器,用來發(fā)出事件,可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value)陪踩、onComplete()onError(Throwable error)就可以分別發(fā)出next事件潮瓶、complete事件和error事件

? ? 不可以隨意亂七八糟發(fā)射事件圣絮,需要滿足一定的規(guī)則:

? ? ? ?① 上游可以發(fā)送無限個(gè)onNext, 下游也可以接收無限個(gè)onNext.

????????②當(dāng)上游發(fā)送了一個(gè)onComplete后, 上游onComplete之后的事件將會(huì)繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.

????????③當(dāng)上游發(fā)送了一個(gè)onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.

? ? ? ? ④上游可以不發(fā)送onComplete或onError.

? ? ? ? ⑤最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError, 也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError, 反之亦然

注: 關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, **并不一定會(huì)導(dǎo)致程序崩潰. ** 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.??


2. RxJava中內(nèi)置的線程

????Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作

? ??Schedulers.computation() 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作

????Schedulers.newThread() 代表一個(gè)常規(guī)的新線程

? ??AndroidSchedulers.mainThread() 代表Android的主線程

這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng), 在RxJava內(nèi)部使用的是線程池來維護(hù)這些線程, 所以效率也比較高.

線程切換:

? ??.subscribeOn(Schedulers.newThread()) //指定的是上游發(fā)送事件的線程

????.observeOn(AndroidSchedulers.mainThread()) //指定的是下游接收事件的線程.

多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn()?只有第一次的有效, 其余的會(huì)被忽略.

多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn()?, 下游的線程就會(huì)切換一次.


3. 在Activity退出時(shí)要切斷水管(訂閱),調(diào)用Disposabledispose()方法切斷連接,如果有多個(gè)Disposable,使用RxJava中內(nèi)置的容器CompositeDisposable, 每得到一個(gè)Disposable就調(diào)用?CompositeDisposable.add(),將它添加到容器中,在Activity退出的時(shí)候, 調(diào)用CompositeDisposable.clear()?即可切斷所有的水管(訂閱)


4. 最簡(jiǎn)單的map轉(zhuǎn)換操作符:

? ? 通過map, 可以將上游發(fā)來的事件轉(zhuǎn)換為任意的類型, 可以是一個(gè)Object, 也可以是一個(gè)集合.

? ? 示例代碼:

? ??????.map(new Function<Integer, String>() {

? ? ? ? ? ? //將Integer轉(zhuǎn)成String類型

????????????@Override

????????????public??String apply(Integer integer) throws Exception{

????????????return"This is result "+ integer;?

?????????} })


5. flatMap操作符:

? ??上游每發(fā)送一個(gè)事件, flatMap都將創(chuàng)建一個(gè)新的水管, 然后發(fā)送轉(zhuǎn)換之后的新的事件, 下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù).這里需要注意的是, flatMap并不保證事件的順序,也就是圖中所看到的, 并不是事件1就在事件2的前面. 如果需要保證順序則需要使用concatMap,用法一模一樣

????示例代碼:

? ? ? ? .concatMap(newFunction<String, ObservableSource<Integer>>() {

????????????@Override

????????????public ObservableSource??apply(Integer integer)throws Exception{

????????????????final List list =new ArrayList<>();

????????????????for(int i =0; i <3; i++) {?

?????????????????????list.add("I am value "+ integer);?

?????????????????}

? ? ? ? ?????????return Observable.fromIterable(list);

? ? ? } })

????實(shí)踐示例:可以實(shí)現(xiàn)嵌套請(qǐng)求,如注冊(cè)完成去登錄就可以使用concatMap或者flatMap了


6. zip操作符通過一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起耸黑,然后發(fā)送這些組合到一起的事件. 它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)盈简。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。并且一個(gè)事件只能被使用一次,?組合的順序是嚴(yán)格按照事件發(fā)送的順序來進(jìn)行的.并且發(fā)送的事件都是在同一線程,需要切換線程,發(fā)送事件才可以在不同線程同步進(jìn)行發(fā)送.

? ? 示例代碼:

? ??????Observable.zip(observable1, observable2,new BiFunction<Integer,String,String>(){

????????????@Override

?????????????public String apply(Integer integer, String s)throws Exception{

????????????????????return? integer + s;

?????????} })

? ? 實(shí)踐示例:比如一個(gè)界面需要展示用戶的一些信息, 而這些信息分別要從兩個(gè)服務(wù)器接口中獲取, 而只有當(dāng)兩個(gè)都獲取到了之后才能進(jìn)行展示, 這個(gè)時(shí)候就可以用Zip了

7. filter:過濾操作符,過濾上游事件

? ? ? ? 示例代碼:

? ??????????.filter(new Predicate() {

????????????????@Override

? ????????????? public boolean test(Integer integer)throws Exception {

????????????????????return integer%10==0;

????????????????}

????????????})

8. sample:取樣操作符,每隔指定的時(shí)間就從上游中取出一個(gè)事件發(fā)送給下游

? ? 示例代碼:.sample(2, TimeUnit.SECONDS)//沒個(gè)兩秒sample取樣一次上游事件

9.Flowable的使用

? ? 上游是Flowable,下游是Subscriber,鏈接不變,和Observable用法一樣,只是多了一個(gè)?背壓策略參數(shù):BackpressureStrategy.ERROR;

Flowable在設(shè)計(jì)的時(shí)候采用了一種新的思路也就是響應(yīng)式拉取的方式來更好的解決上下游流速不均衡的問題;

FLowable相比Observable, 在性能方面有些不足, 畢竟FLowable內(nèi)部為了實(shí)現(xiàn)響應(yīng)式拉取做了更多的操作, 性能有所丟失也是在所難免;

Flowable默認(rèn)最多可以存放128個(gè)事件;

背壓策略:

? ?? BackpressureStrategy.ERROR:當(dāng)下游沒有處理事件能力是拋出 MissingBackpressureException異常

? ?? BackpressureStrategy.BUFFER:上游水缸沒有大小限制;

? ? ?BackpressureStrategy.DROP: 直接把存不下的事件丟棄;

? ?? BackpressureStrategy.LATEST:只保留最新的事件;

? ? 注:Latest和Drop的區(qū)別在于Latest總是能獲取到最后最新的事件;

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>(){

????????@Override?

????????public void subscribe(FlowableEmitter<Integer> emitter)throws Exception{? ? ? ? ? ? ? ? ? ?????????????Log.d(TAG,"emit 1");?

?????????????emitter.onNext(1);

?????????????Log.d(TAG,"emit 2");

?????????????emitter.onNext(2);?

?????????????Log.d(TAG,"emit 3");

?????????????emitter.onNext(3);

?????????????Log.d(TAG,"emit complete");

?????????????emitter.onComplete();

?????????}??}, BackpressureStrategy.ERROR);//增加了一個(gè)參數(shù),背壓策略

? ??????Subscriber<Integer> downstream =new Subscriber<Integer>() {

? ? ? ? ? ? ? ?@Override

????????????????publicvoid onSubscribe(Subscription s){?

?????????????????????Log.d(TAG,"onSubscribe");?

?????????????????????s.request(Long.MAX_VALUE);//注意這句代碼

????????????????}

????????????@Override?

????????????public void onNext(Integer integer){

?????????????????Log.d(TAG,"onNext: "+ integer); }

????????????@Override

????????????public void onError(Throwable t){?

?????????????????Log.w(TAG,"onError: ", t); }

????????????@Override

? ? ? ? ? ? ?public void onComplete(){?

?????????????????Log.d(TAG,"onComplete");

? ? ? ? ? } };?

?????????upstream.subscribe(downstream);

10.下游Subscriber的onSubcribe方法中傳遞的是一個(gè)Subscription,它也是上下游的一個(gè)開關(guān),調(diào)用Subscription.cancel()也可以切斷水管,同時(shí)也增加一個(gè)Subscription.requset(3)方法,該方法代表下游處理事件的能力,參數(shù)是幾就是代表能處理幾個(gè)事件

11.同一線程中在上游使用FlowableEmitter.requested()獲取下游處理事件能力的多少;

? ??當(dāng)上下游工作在不同的線程里時(shí)陶因,每一個(gè)線程里都有一個(gè)requested方法堰汉,而我們調(diào)用request(1000)時(shí),實(shí)際上改變的是下游線程中的requested费什,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的钾恢,這個(gè)調(diào)用會(huì)在合適的時(shí)候自動(dòng)觸發(fā)。

注:不同線程中下游每消費(fèi)96個(gè)事件便會(huì)自動(dòng)觸發(fā)內(nèi)部的request()去設(shè)置上游的requested的值, 發(fā)送事件前先判斷當(dāng)前的requested的值是否大于0鸳址,若等于0則說明下游處理不過來了瘩蚪,則需要等待

12.只有onNext()事件才會(huì)消耗事件,complete和error事件不會(huì)消耗requested;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市稿黍,隨后出現(xiàn)的幾起案子疹瘦,更是在濱河造成了極大的恐慌,老刑警劉巖巡球,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件言沐,死亡現(xiàn)場(chǎng)離奇詭異邓嘹,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)险胰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門汹押,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人鸯乃,你說我怎么就攤上這事鲸阻。” “怎么了缨睡?”我有些...
    開封第一講書人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵鸟悴,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我奖年,道長(zhǎng)细诸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任陋守,我火速辦了婚禮震贵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘水评。我一直安慰自己猩系,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開白布中燥。 她就那樣靜靜地躺著寇甸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪疗涉。 梳的紋絲不亂的頭發(fā)上炮赦,一...
    開封第一講書人閱讀 52,246評(píng)論 1 308
  • 那天软能,我揣著相機(jī)與錄音糖权,去河邊找鬼企量。 笑死,一個(gè)胖子當(dāng)著我的面吹牛闹伪,可吹牛的內(nèi)容都是我干的沪铭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼祭往,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼伦意!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起硼补,我...
    開封第一講書人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤驮肉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后已骇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體离钝,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡票编,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了卵渴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片慧域。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖浪读,靈堂內(nèi)的尸體忽然破棺而出昔榴,到底是詐尸還是另有隱情,我是刑警寧澤碘橘,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布互订,位于F島的核電站,受9級(jí)特大地震影響痘拆,放射性物質(zhì)發(fā)生泄漏仰禽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一纺蛆、第九天 我趴在偏房一處隱蔽的房頂上張望吐葵。 院中可真熱鬧,春花似錦桥氏、人聲如沸温峭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)诚镰。三九已至,卻和暖如春祥款,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背月杉。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工刃跛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人苛萎。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓桨昙,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親腌歉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蛙酪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容