RxJava2 中常用操作符和Subject常用子類代碼練習(xí)及說(shuō)明前弯。
ReactiveX 系列所有操作符以及RxJava2與RxJava1的操作符變化可查看 ReactiveX 操作符
示例代碼下載地址 GitHub下載
(一)項(xiàng)目中主要包含的操作符展示
創(chuàng)建操作符
- create():通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)Observable
- just():將對(duì)象或者對(duì)象集合轉(zhuǎn)換為一個(gè)會(huì)發(fā)射這些對(duì)象的Observable
- fromXxx()系列:
fromArray()
/fromIterable()
/fromFuture()
等磺樱;將其它的對(duì)象辨泳、數(shù)字或列表數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable - empty()/never()/error():創(chuàng)建行為受限的特殊Observable
- range():創(chuàng)建發(fā)射指定范圍的 整數(shù) 序列的Observable,
range()
操作符,發(fā)射從 start 開(kāi)始的 count 個(gè)數(shù) - defer():在觀察者訂閱之前不創(chuàng)建這個(gè)Observable,當(dāng)被訂閱時(shí)為每一個(gè)觀察者創(chuàng)建一個(gè)新的Observable
- repat():作用在Observable上,會(huì)對(duì)其重復(fù)發(fā)射count次唉铜,當(dāng)沒(méi)指定次數(shù)時(shí)台舱,將一直不停的發(fā)射
- timer():在指定時(shí)間后發(fā)射一個(gè)數(shù)字0,注意其默認(rèn)運(yùn)行在RxJava的 computation 線程打毛,可以指定運(yùn)行的線程
- interval():間隔一段時(shí)間發(fā)射一個(gè)整數(shù)柿赊,整數(shù)從0開(kāi)始每發(fā)射俩功,后面的一個(gè)數(shù)就在原來(lái)的基礎(chǔ)上加1幻枉。
注意其默認(rèn)運(yùn)行在RxJava的 computation 線程,可以指定運(yùn)行的線程诡蜓,同時(shí)又多個(gè)重載方法熬甫,還可以指定發(fā)射第一個(gè)數(shù)之前的延遲時(shí)間 - intervalRange():和
interval()
操作符類似,不同的是可以指定數(shù)字的開(kāi)始大小和一共發(fā)射的個(gè)數(shù)蔓罚,可以指定線程椿肩,默認(rèn)在 RxJava 的 computation 線程
變換操作符
- map():通過(guò)對(duì)序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對(duì)序列中的每一項(xiàng)執(zhí)行一個(gè)函數(shù)豺谈,函數(shù)的參數(shù)就是這個(gè)數(shù)據(jù)項(xiàng)
- flatMap():對(duì)Observable發(fā)射的數(shù)據(jù)都應(yīng)用(apply)一個(gè)函數(shù)郑象,這個(gè)函數(shù)返回一個(gè)Observable,然后合并這些Observables茬末,并且發(fā)送合并的結(jié)果厂榛,異步時(shí)不能保證順序不變
- concatMap():對(duì)Observable發(fā)射的數(shù)據(jù)都應(yīng)用(apply)一個(gè)函數(shù)盖矫,這個(gè)函數(shù)返回一個(gè)Observable,然后合并這些Observables击奶,并且發(fā)送合并的結(jié)果辈双,異步時(shí)能保證順序不變
- reduce():對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),但是只發(fā)射最終的值柜砾,與
scan()
操作符做比較 - scan():對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)湃望,然后按順序依次發(fā)射這些值,與
reduce()
操作符做比較 - buffer():可以簡(jiǎn)單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個(gè)集合痰驱,然后把這些數(shù)據(jù)集合打包發(fā)射证芭,而不是一次發(fā)射一個(gè),有多個(gè)重載方法担映,可指定延遲時(shí)間檩帐、間隔時(shí)間、線程等
- groupBy():將原來(lái)的Observable分拆為Observable集合另萤,將原始Observable發(fā)射的數(shù)據(jù)按Key分組湃密,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù),有多個(gè)重載方法
- window():定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口四敞,然后發(fā)射這些窗口泛源,而不是每次發(fā)射一項(xiàng),有多個(gè)重載方法
過(guò)濾操作符
- filter():過(guò)濾掉沒(méi)有通過(guò)謂詞測(cè)試的數(shù)據(jù)項(xiàng)忿危,只發(fā)射通過(guò)測(cè)試的
- distinct():過(guò)濾掉重復(fù)數(shù)據(jù)項(xiàng)
- elementAt():取特定位置的數(shù)據(jù)項(xiàng)
- debounce():只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù)达箍,通俗的說(shuō),就是如果一段時(shí)間沒(méi)有操作铺厨,就執(zhí)行一次操作.還可以根據(jù)一個(gè)函數(shù)來(lái)進(jìn)行限流缎玫。這個(gè)函數(shù)的返回值是一個(gè)臨時(shí)Observable, 如果源Observable在發(fā)射一個(gè)新的數(shù)據(jù)的時(shí)候解滓,上一個(gè)數(shù)據(jù)根據(jù)函數(shù)所生成的臨時(shí)Observable還沒(méi)有結(jié)束,沒(méi)有調(diào)用onComplete赃磨,那么上一個(gè)數(shù)據(jù)就會(huì)被過(guò)濾掉。如果是最后一個(gè),還是會(huì)發(fā)射
- ignoreElements():忽略所有的數(shù)據(jù)洼裤,只保留終止通知(onError或onCompleted)
- sample():定期發(fā)射最新的數(shù)據(jù)邻辉,等于是數(shù)據(jù)抽樣。多個(gè)重載方法腮鞍,可指定線運(yùn)行所在線程等值骇,默認(rèn) RxJava的 computation 線程
- first()/firstElement():只發(fā)射滿足條件的第一條數(shù)據(jù)
- last()}/lastElement():只發(fā)射最后一條數(shù)據(jù)
- skip()}/skipLast():跳過(guò)前面/最后 count 項(xiàng) ,多個(gè)重載方法移国,可設(shè)置延遲時(shí)間和運(yùn)行線程吱瘩,默認(rèn)非主線程
- take()}/takeLast():只取前面/最后 count 項(xiàng) ,多個(gè)重載方法迹缀,可設(shè)置延遲時(shí)間和運(yùn)行線程使碾,默認(rèn)非主線程
組合操作符
- merger():將兩個(gè)后多個(gè)Observable/Iterable發(fā)射的數(shù)據(jù)組合并成一個(gè)皱卓,Merge 操作符可能會(huì)讓合并的Observables發(fā)射的數(shù)據(jù)交錯(cuò)
- concat():將兩個(gè)后多個(gè)Observable/Iterable發(fā)射的數(shù)據(jù)組合并成一個(gè),Concat 操作符不會(huì)讓合并的Observables發(fā)射的數(shù)據(jù)交錯(cuò)部逮,它會(huì)按順序一個(gè)接著一個(gè)發(fā)射多個(gè)Observables的發(fā)射項(xiàng)
- zip():使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起娜汁,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射。類似操作符
zipIterable()
/zipArray()
- ① Zip操作符將多個(gè)Observable發(fā)射的數(shù)據(jù)按順序組合起來(lái)兄朋,每個(gè)數(shù)據(jù)只能組合一次掐禁,而且都是有序的;
- ② 最終組合的數(shù)據(jù)的數(shù)量由發(fā)射數(shù)據(jù)最少的Observable來(lái)決定
- join():無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng)颅和,只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi)傅事,就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射.
- ① 源Observable.join(所要組合的目標(biāo)Observable);
- ② 第一個(gè)參數(shù)峡扩,接收從源Observable發(fā)射來(lái)的數(shù)據(jù)蹭越,并返回一個(gè)Observable,這個(gè)Observable的生命周期決定了源Observable發(fā)射出來(lái)數(shù)據(jù)的有效期教届;
- ③ 第二個(gè)參數(shù)响鹃,接收從目標(biāo)Observable發(fā)射來(lái)的數(shù)據(jù),并返回一個(gè)Observable案训,這個(gè)Observable的生命周期決定了目標(biāo)Observable發(fā)射出來(lái)數(shù)據(jù)的有效期买置;
- ④ 第三個(gè)參數(shù),接收從源Observable和目標(biāo)Observable發(fā)射來(lái)的數(shù)據(jù)强霎,并返回最終組合完的數(shù)據(jù)忿项。
- combineLatest():當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù))城舞,然后發(fā)射這個(gè)函數(shù)的結(jié)果.
比如:Observable1發(fā)射了A并且Observable2發(fā)射了B和C轩触,combineLatest()
將會(huì)分組處理AB和AC。- 必須滿足的兩個(gè)條件:
- ① 所有的Observable都發(fā)射過(guò)數(shù)據(jù)家夺;
- ② 滿足條件1的時(shí)候任何一個(gè)Observable發(fā)射一個(gè)數(shù)據(jù)脱柱,就將所有Observable最新發(fā)射的數(shù)據(jù)按照提供的函數(shù)組裝起來(lái)發(fā)射出去。
- 注意:
- 在這兩個(gè)條件下,可能會(huì)忽略掉一些發(fā)射的數(shù)據(jù).
- 必須滿足的兩個(gè)條件:
- switchOnNext()/switchMap():將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù)秦踪。用來(lái)將一個(gè)發(fā)射多個(gè)小Observable的源Observable轉(zhuǎn)化為一個(gè)Observable褐捻,然后發(fā)射這多個(gè)小Observable所發(fā)射的數(shù)據(jù)掸茅。
- 需要注意的就是椅邓,如果一個(gè)小的Observable正在發(fā)射數(shù)據(jù)的時(shí)候,源Observable又發(fā)射出一個(gè)新的小Observable昧狮,則前一個(gè)Observable發(fā)射的數(shù)據(jù)會(huì)被拋棄胯府,直接發(fā)射新的小Observable所發(fā)射的數(shù)據(jù)茸苇。
- startWith():在發(fā)射原來(lái)的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng)(在數(shù)據(jù)序列的開(kāi)頭插入一條指定的項(xiàng))框舔。
- 這個(gè)指定的項(xiàng)可以是單個(gè)的發(fā)射項(xiàng);也可以是數(shù)組瓣铣、列表或者一個(gè)Observable。
- and()/then()/when():通過(guò)模式(And條件)和計(jì)劃(Then次序)組合兩個(gè)或多個(gè)Observable發(fā)射的數(shù)據(jù)集
- 注意:
- ① 需要導(dǎo)入包含 and/then/when 操作符的庫(kù)
compile 'io.reactivex:rxjava-joins:0.22.0
- ② 同時(shí) Observable 和 Observer 也要使用
rx.Observabl
e 和rx.Observer
包下的。
錯(cuò)誤處理操作符
- onErrorReturn():用返回的字符串替換錯(cuò)誤的項(xiàng)笨使,然后調(diào)用
onComplete()
方法 - onErrorResumeNext(Function):創(chuàng)建一個(gè)新的 Observable 發(fā)射,發(fā)射完新的 Observable 中的所有數(shù)據(jù)后調(diào)用
onComplete()
方法 - onErrorResumeNext(Observable):創(chuàng)建一個(gè)新的 Observable 僚害,使用新的 Observable 繼續(xù)發(fā)射數(shù)據(jù)硫椰,數(shù)據(jù)項(xiàng)發(fā)射完成不會(huì)自動(dòng)調(diào)用
onComplete()
方法 - onErrorReturnItem():用指定的字符串替換錯(cuò)誤的項(xiàng),然后調(diào)用
onComplete()
方法 - onExceptionResumeNext():
onExceptionResumeNext()
操作符區(qū)分異常類型- ① 如果拋出的是
Exception
,會(huì)先發(fā)射新的 Observable 的所有項(xiàng)后調(diào)用onComplete()
方法 - ② 如果拋出的是
Throwable
萨蚕,那么會(huì)直接調(diào)用onError()
方法
- ① 如果拋出的是
- retry():在發(fā)生錯(cuò)誤的時(shí)候會(huì)重新進(jìn)行訂閱,而且可以重復(fù)多次靶草,所以發(fā)射的數(shù)據(jù)可能會(huì)產(chǎn)生重復(fù)
- 有多個(gè)重載方法,可以指定重復(fù)次數(shù)岳遥、判斷是否需要重復(fù)等
- 如果重復(fù)指定次數(shù)還有錯(cuò)誤的話就會(huì)將錯(cuò)誤返回給觀察者,會(huì)調(diào)用
onError()
方法
- retryWhen():當(dāng)錯(cuò)誤發(fā)生時(shí)奕翔,
retryWhen()
會(huì)接收onError()
的throwable
作為參數(shù),并根據(jù)定義好的函數(shù)返回一個(gè)Observable浩蓉,如果這個(gè)Observable發(fā)射一個(gè)數(shù)據(jù)派继,就會(huì)重新訂閱這個(gè)Observable
輔助操作符
- delay():延遲一段時(shí)間發(fā)射結(jié)果數(shù)據(jù)。多個(gè)重載方法捻艳,可以指定運(yùn)行線程互艾、出錯(cuò)時(shí)是否延遲發(fā)送等
- doXxx()系列:Do操作符就是給Observable的生命周期的各個(gè)階段加上一系列的回調(diào)監(jiān)聽(tīng),當(dāng)Observable執(zhí)行到這個(gè)階段的時(shí)候讯泣,這些回調(diào)就會(huì)被觸發(fā)
- subscribeOn():SubscribeOn:指定Observable應(yīng)該在哪個(gè)調(diào)度程序上執(zhí)行纫普;ObserveOn:指定Subscriber的調(diào)度程序(工作線程)
- observeOn():ObserveOn:指定Subscriber的調(diào)度程序(工作線程);SubscribeOn:指定Observable應(yīng)該在哪個(gè)調(diào)度程序上執(zhí)行
- materialize():Meterialize 操作符將
OnNext
/OnError
/OnComplete
都轉(zhuǎn)化為一個(gè) Notification 對(duì)象并按照原來(lái)的順序發(fā)射出來(lái) - dematerialize():與Meterialize 操作符相反
- timeInterval():TimeInterval會(huì)攔截發(fā)射出來(lái)的數(shù)據(jù)好渠,取代為前后兩個(gè)發(fā)射兩個(gè)數(shù)據(jù)的間隔時(shí)間昨稼。對(duì)于第一個(gè)發(fā)射的數(shù)據(jù),其時(shí)間間隔為訂閱后到首次發(fā)射的間隔拳锚。多個(gè)重載假栓,可以指定時(shí)間單位和線程
- timestamp():TimeStamp會(huì)將每個(gè)數(shù)據(jù)項(xiàng)給重新包裝一下,加上了一個(gè)時(shí)間戳來(lái)標(biāo)明每次發(fā)射的時(shí)間霍掺。多個(gè)重載匾荆,可以指定時(shí)間單位和線程
- timeout():Timeout操作符給Observable加上超時(shí)時(shí)間,每發(fā)射一個(gè)數(shù)據(jù)后就重置計(jì)時(shí)器杆烁,當(dāng)超過(guò)預(yù)定的時(shí)間還沒(méi)有發(fā)射下一個(gè)數(shù)據(jù)牙丽,就拋出一個(gè)超時(shí)的異常。有多個(gè)重載兔魂,可以自定義更多的功能
- using():創(chuàng)建一個(gè)只在Observable的生命周期內(nèi)存在的一次性資源
- 參數(shù)說(shuō)明:
- ① 創(chuàng)建這個(gè)一次性資源的函數(shù)
- ② 創(chuàng)建Observable的函數(shù)
- ③ 釋放資源的函數(shù)
- 參數(shù)說(shuō)明:
條件和布爾操作符
- all():判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個(gè)條件
- amb():給定多個(gè)Observable烤芦,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
- contains():判斷Observable是否會(huì)發(fā)射一個(gè)指定的數(shù)據(jù)項(xiàng)
- sequenceEqual():判斷兩個(gè)Observable是否按相同的數(shù)據(jù)序列。有多個(gè)重載方法析校」孤蓿【判斷兩個(gè)Observable發(fā)射的數(shù)據(jù)序列是否相同(發(fā)射的數(shù)據(jù)相同铜涉,數(shù)據(jù)的序列相同,結(jié)束的狀態(tài)相同)遂唧,如果相同返回 true芙代,否則返回 false】
- isEmpty():操作符用來(lái)判斷源 Observable 是否發(fā)射過(guò)數(shù)據(jù),沒(méi)有發(fā)射過(guò)數(shù)據(jù)返回true
- defaultIfEmpty():操作符會(huì)判斷源 Observable 是否發(fā)射數(shù)據(jù)盖彭,如果源 Observable 發(fā)射了數(shù)據(jù)則正常發(fā)射這些數(shù)據(jù)链蕊,如果沒(méi)有則發(fā)射一個(gè)默認(rèn)的數(shù)據(jù)
- skipUntil():根據(jù)一個(gè)標(biāo)志 Observable 來(lái)判斷的,當(dāng)這個(gè)標(biāo)志 Observable 沒(méi)有發(fā)射數(shù)據(jù)的時(shí)候谬泌,所有源 Observable 發(fā)射的數(shù)據(jù)都會(huì)被跳過(guò)滔韵; 當(dāng)標(biāo)志Observable發(fā)射了一個(gè)數(shù)據(jù),則開(kāi)始正常地發(fā)射數(shù)據(jù)
- skipWhile():根據(jù)一個(gè)函數(shù)來(lái)判斷是否跳過(guò)數(shù)據(jù)掌实, 當(dāng)函數(shù)返回值為 true 的時(shí)候則一直跳過(guò)源 Observable 發(fā)射的數(shù)據(jù)陪蜻;當(dāng)函數(shù)返回 false 的時(shí)候則開(kāi)始正常發(fā)射數(shù)據(jù)
- 注意:skipWhile() 操作符根據(jù)函數(shù)來(lái)判斷是否跳過(guò),當(dāng)函數(shù)返回 true 時(shí)贱鼻,就每發(fā)射一次數(shù)據(jù)都會(huì)執(zhí)行函數(shù)宴卖,并且跳過(guò)當(dāng)前數(shù)據(jù);當(dāng)函數(shù)返回了 false 時(shí)邻悬,那么后面就會(huì)正常發(fā)射數(shù)據(jù)(包括當(dāng)前數(shù)據(jù)也會(huì)正常發(fā)射)症昏,不在執(zhí)行這個(gè)函數(shù)了
- takeUntil():根據(jù)一個(gè)標(biāo)志 Observable 來(lái)判斷的, 當(dāng)這個(gè)標(biāo)志 Observable 發(fā)射了數(shù)據(jù)的時(shí)候父丰,所有源 Observable 發(fā)射的數(shù)據(jù)都會(huì)被跳過(guò)肝谭;當(dāng)標(biāo)志 Observable 沒(méi)有發(fā)射一個(gè)數(shù)據(jù)的時(shí)候,則正常地發(fā)射數(shù)據(jù)
- takeWhile():根據(jù)一個(gè)函數(shù)來(lái)判斷是否跳過(guò)數(shù)據(jù)蛾扇, 當(dāng)函數(shù)返回值為 true 的時(shí)候則正常發(fā)射源 Observable 的數(shù)據(jù)攘烛;當(dāng)函數(shù)返回 false 的時(shí)候?qū)⒉辉诎l(fā)射任何數(shù)據(jù)
- 注意:takeWhile() 操作符根據(jù)函數(shù)來(lái)判斷是否跳過(guò),當(dāng)函數(shù)返回 true 時(shí)镀首,就每發(fā)射一次數(shù)據(jù)都會(huì)執(zhí)行函數(shù)坟漱,并且數(shù)據(jù)發(fā)送成功;當(dāng)函數(shù)返回了 false 時(shí)更哄,那么就會(huì)跳過(guò)所有的數(shù)據(jù)了(包括當(dāng)前的數(shù)據(jù)也會(huì)跳過(guò))芋齿,并且不在執(zhí)行這個(gè)函數(shù)了
算術(shù)和聚合操作符
- count():Count 操作符用來(lái)統(tǒng)計(jì)源 Observable 發(fā)射了多少個(gè)數(shù)據(jù),最后將數(shù)目給發(fā)射出來(lái)
- 如果源 Observable 發(fā)射錯(cuò)誤成翩,則會(huì)將錯(cuò)誤直接報(bào)出來(lái)觅捆;在源 Observable 沒(méi)有終止前,count 是不會(huì)發(fā)射統(tǒng)計(jì)數(shù)據(jù)的捕传。
- collect():Collect 用來(lái)將源 Observable 發(fā)射的數(shù)據(jù)給收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里面惠拭,需要使用兩個(gè)參數(shù):
- ① 第一個(gè)產(chǎn)生收集數(shù)據(jù)結(jié)構(gòu)的函數(shù);
- ② 第二個(gè)接收第一個(gè)函數(shù)產(chǎn)生的數(shù)據(jù)結(jié)構(gòu)和源Observable發(fā)射的數(shù)據(jù)作為參數(shù)的函數(shù)
- concat():將兩個(gè)后多個(gè)Observable/Iterable發(fā)射的數(shù)據(jù)組合并成一個(gè)
- Concat 操作符不會(huì)讓合并的Observables發(fā)射的數(shù)據(jù)交錯(cuò)庸论,它會(huì)按順序一個(gè)接著一個(gè)發(fā)射多個(gè)Observables的發(fā)射項(xiàng) 與
merge()
操作符對(duì)比查看
- Concat 操作符不會(huì)讓合并的Observables發(fā)射的數(shù)據(jù)交錯(cuò)庸论,它會(huì)按順序一個(gè)接著一個(gè)發(fā)射多個(gè)Observables的發(fā)射項(xiàng) 與
- reduce():對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)职辅,但是只發(fā)射最終的值,
scan()
操作符做比較
連接操作符
什么是可連接的 Observable(Connectable Observable):
可連接的 Observable 是一種特殊的 Observable 對(duì)象聂示,并不是 訂閱(Subscrib) 的時(shí)候就發(fā)射數(shù)據(jù)域携,而是只有對(duì)其調(diào)用
connect()
操作符的時(shí)候才開(kāi)始發(fā)射數(shù)據(jù),所以可以用來(lái)更靈活的控制數(shù)據(jù)發(fā)射的時(shí)機(jī)鱼喉。
- publish():將一個(gè)普通的 Observable 對(duì)象轉(zhuǎn)化為一個(gè)可連接的(Connectable Observable)秀鞭。需要注意的是如果發(fā)射數(shù)據(jù)已經(jīng)開(kāi)始了再進(jìn)行訂閱只能接收以后發(fā)射的數(shù)據(jù)
- connect():Connect操作符就是用來(lái)觸發(fā) Connectable Observable 發(fā)射數(shù)據(jù)的
- 調(diào)用
connect()
操作符后會(huì)返回一個(gè) Disposable 對(duì)象,通過(guò)這個(gè) Disposable 對(duì)象扛禽,我們可以調(diào)用其dispose()
方法來(lái)終止數(shù)據(jù)的發(fā)射锋边;另外,即使還沒(méi)有訂閱者訂閱的時(shí)候就調(diào)用connect()
操作符也是可以使其開(kāi)始發(fā)射數(shù)據(jù)的(只要調(diào)用了connect()
方法就會(huì)開(kāi)始發(fā)射數(shù)據(jù)编曼,不管是否有訂閱者訂閱的事件)豆巨。
- 調(diào)用
- refCount():是將一個(gè) Connectable Observable 對(duì)象再重新轉(zhuǎn)化為一個(gè)普通的 Observable 對(duì)象,這時(shí)候訂閱者進(jìn)行訂閱時(shí)就會(huì)觸發(fā)數(shù)據(jù)的發(fā)射
- replay():返回一個(gè) Connectable Observable 對(duì)象并且可以緩存其發(fā)射過(guò)的數(shù)據(jù)掐场,這樣即使有訂閱者在其發(fā)射數(shù)據(jù)之后進(jìn)行訂閱也能收到其之前發(fā)射過(guò)的數(shù)據(jù)
- <b>注意:
- ① 使用
replay()
操作符我們最好還是限定其緩存的大小往扔,否則緩存的數(shù)據(jù)太多了可會(huì)占用很大的一塊內(nèi)存,有多個(gè)重載方法可以指定緩存大小熊户、運(yùn)行線程的萍膛。 - ②
replay()
操作符直接返回一個(gè) Connectable Observable 對(duì)象,不用在調(diào)用publish()
操作符</b>
轉(zhuǎn)換操作符
- toXxx():將 Observable 轉(zhuǎn)換為其它的對(duì)象或數(shù)據(jù)結(jié)構(gòu)(
toList()
、toMap()
嚷堡、toMultimap()
蝗罗、...)
(二)Subject常用子類說(shuō)明
AsyncSubject 類
使用AsyncSubject無(wú)論輸入多少參數(shù),永遠(yuǎn)只輸出最后一個(gè)參數(shù)
① 一定要用Subcect.create()的方式創(chuàng)建并使用蝌戒,不要用just(T)绿饵、from(T)、create(T)創(chuàng)建瓶颠,否則會(huì)導(dǎo)致失效
② 如果因?yàn)榘l(fā)生了錯(cuò)誤而終止拟赊,AsyncSubject 將不會(huì)發(fā)射任何數(shù)據(jù),只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知
BehaviorSubject類
發(fā)送離訂閱最近的上一個(gè)值粹淋,沒(méi)有上一個(gè)值的時(shí)候會(huì)發(fā)送默認(rèn)值
① 一定要用Subcect.create()的方式創(chuàng)建并使用吸祟,不要用just(T)、from(T)桃移、create(T)創(chuàng)建屋匕,否則會(huì)導(dǎo)致失效
② 如果遇到錯(cuò)誤程序會(huì)直接中斷
PublishSubject
從哪里訂閱就從哪里開(kāi)始發(fā)送數(shù)據(jù),與ReplaySubject
類做比較
① 一定要用Subcect.create()的方式創(chuàng)建并使用借杰,不要用just(T)过吻、from(T)、create(T)創(chuàng)建,否則會(huì)導(dǎo)致失效
② 遇到錯(cuò)誤纤虽,如果重寫(xiě)些錯(cuò)誤回調(diào)乳绕,向前傳遞這個(gè)錯(cuò)誤通知,沒(méi)有寫(xiě)錯(cuò)誤回調(diào)的話逼纸,程序?qū)⒅苯訄?bào)錯(cuò)洋措,拋出異常,終止程序
ReplaySubject
無(wú)論何時(shí)訂閱杰刽,都會(huì)將所有歷史訂閱內(nèi)容全部發(fā)出菠发,與 PublishSubject
類做比較
① 一定要用Subcect.create()的方式創(chuàng)建并使用,不要用just(T)贺嫂、from(T)滓鸠、create(T)創(chuàng)建,否則會(huì)導(dǎo)致失效
② 遇到錯(cuò)誤第喳,如果重寫(xiě)些錯(cuò)誤回調(diào)糜俗,向前傳遞這個(gè)錯(cuò)誤通知,沒(méi)有寫(xiě)錯(cuò)誤回調(diào)的話墩弯,程序?qū)⒅苯訄?bào)錯(cuò)吩跋,拋出異常,終止程序
SerializedSubject
在并發(fā)情況下渔工,不推薦使用通常的Subject對(duì)象锌钮,而是推薦使用 SerializedSubject,并發(fā)時(shí)只允許一個(gè)線程調(diào)用onNext等方法
將一個(gè)普通的 Subject 變換為 SerializedSubject 只需要調(diào)用 toSerialized() 方法即可引矩。
(三)包含RxBus測(cè)試類
使用:
// 發(fā)送端:
RxBus.newInstance().post("aaa");
// 接受端:
Disposable subscribe = RxBus.newInstance().tObservable(String.class).subscribe(new Consumer () {
@Override
public void accept(String s) throws Exception {
// 處理結(jié)果
// ...
}
});
// 另外需要注意在onDestroy()方法中取消訂閱:
@Override
protected void onDestroy() {
super.onDestroy();
if (subscribe.isDisposed()) {
subscribe.dispose();
}
// 如果發(fā)送了粘性事件梁丘,需要清除所有的粘性事件
// RxBus.newInstance().clearStickyEvent();
}