1蝗蛙、什么是響應(yīng)式編程 著角?
部分內(nèi)容參考自:《RxJava Essentials 中文版》by yuxingxin
響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式长已。數(shù)據(jù)流就像一條河:它可以被觀測砰逻,被過濾鸣驱,被操作,或者為新的消費者與另外一條流合并為一條新的流蝠咆。
1.1 時間表
- 90年代后期
受微軟的一名計算機科學(xué)家Erik Meijer啟發(fā)的思想踊东,用來設(shè)計和開發(fā)微軟的Rx庫。
Rx
是微軟.NET
的一個響應(yīng)式擴展刚操。Rx
借助可觀測的序列提供一種簡單的方式來創(chuàng)建異步的闸翅,基于事件驅(qū)動的程序。開發(fā)者可以使用Observables模擬異步數(shù)據(jù)流菊霜,使用LINQ
語法查詢Observables坚冀,并且很容易管理調(diào)度器的并發(fā)。
Rx
讓眾所周知的概念變得易于實現(xiàn)和消費鉴逞,例如push
方法记某。在響應(yīng)式的世界里,我們不能假裝作用戶不關(guān)注或者是不抱怨它而一味的等待函數(shù)的返回結(jié)果构捡,網(wǎng)絡(luò)調(diào)用辙纬,或者數(shù)據(jù)庫查詢的返回結(jié)果。我們時刻都在等待某些東西叭喜,這就讓我們失去了并行處理其他事情的機會贺拣,提供更好的用戶體驗,讓我們的軟件免受順序鏈的影響捂蕴,而阻塞編程譬涡。 - 2012年
Netflix在2012年開始意識到他們的架構(gòu)要滿足他們龐大的用戶群體已經(jīng)變得步履維艱。因此他們決定重新設(shè)計架構(gòu)來減少REST
調(diào)用的次數(shù)啥辨。取代幾十次的REST
調(diào)用涡匀,而是讓客戶端自己處理需要的數(shù)據(jù),他們決定基于客戶端需求創(chuàng)建一個專門優(yōu)化過的REST
調(diào)用溉知。 - 2013年
2013年2月份陨瘩,Ben Christensen和Jafar Husain發(fā)在Netflix技術(shù)博客的一篇文章第一次向世界展示了RxJava
腕够。 - 2014年
2014年9月份,發(fā)布RxJava 1.0.0
正式版舌劳。 - 2016年
2016年9月份帚湘,發(fā)布RxJava 2.0.0
正式版。 - 2020年
2020年2月份甚淡,發(fā)布RxJava 3.0.0
正式版大诸。
1.2 定義
這里只貼出鏈接,不做介紹贯卦,感興趣的自行查看资柔。
1.3 更友好的介紹
參考自:《什么是響應(yīng)式編程?》by 享學(xué)IT
介紹了響應(yīng)式編程的三大特點:變化傳遞(propagation of change)撵割、基于數(shù)據(jù)流(data stream)贿堰、聲明式(declarative)。
具體形象的例子:“堪稱“響應(yīng)式典范”的強大的生產(chǎn)力工具——電子表格”
1.3.1 【滿199減40活動】購物計劃
下方【購物計劃表】中啡彬,【單價】【數(shù)量】是原始輸入羹与,【商品金額】跟隨【單價】和【數(shù)量】的變化而變化,【滿199減40】跟隨【商品金額】的變化而變化外遇,以此類推注簿,【訂單總金額】、【郵費】跳仿、【最終應(yīng)付款】也跟隨相應(yīng)的項的變化而變化诡渴。具體的公式以及變化的傳遞流向見【購物計劃表 公式】。
- 變化傳遞(propagation of change)
- 基于數(shù)據(jù)流(data stream)
- 聲明式(declarative)
2菲语、擴展的觀察者模式
2.1 觀察者模式
- Observable
可觀察對象妄辩,也有叫做Source
,內(nèi)部維護(hù)一組觀察者observers
山上,當(dāng)event
有更新時眼耀,observable
將event
推(push)給observer
。 - Observer
觀察者佩憾,也有叫做Consumer
哮伟、Subscriber
,觀察observable
妄帘,接收observable
推(push)過來的event
楞黄,做出相應(yīng)的反應(yīng)(不同的observer
的反應(yīng)可能不一樣)。 - Event
observer
所關(guān)心的事件event
抡驼。 - subscrbe
將observer
和observable
連接起來的操作鬼廓,叫做訂閱(subscribe
)。
2.2 擴展的觀察者
上述4個概念致盟,也就是RxJava
中碎税,最基本的幾個概念尤慰。Observer
通過subscribe
方法訂閱Observable
,從而雷蹂,在Event
有變化時伟端,Observable
可以分發(fā)給Observer
。
- Observable
- Observer
- Event
- subscribe
與傳統(tǒng)的觀察者模式不同的是萎河,RxJava
不光會通過onNext
方法分發(fā)普通事件(相當(dāng)于上節(jié)描述的Observer
中的accept
方法)荔泳,另外還會通過onComplete
和onError
方法分發(fā)兩個特殊事件蕉饼。
- onComplete
事件流已完成虐杯。表明事件流已成功發(fā)出所有的事件,后續(xù)不會再有新的事件發(fā)出昧港。(成功結(jié)束) - onError
事件流異常擎椰。表明由于發(fā)生異常,事件流將被打斷创肥,后續(xù)不會再有新的事件發(fā)出达舒。(異常結(jié)束;特殊情況下叹侄,可能會人為在事件流過程中刻意發(fā)出error
事件)
在一個正確實現(xiàn)的事件流中巩搏,都應(yīng)該有一個onComplete
或onError
作為事件流的最后一個事件,并且這兩者也是互斥趾代,發(fā)出了其中一個事件贯底,另一個事件就不應(yīng)該再被發(fā)出。
2.3 RxJava = Observer + 異步處理
本節(jié)參考自馬士兵教育視頻
3撒强、相關(guān)概念
3.1 函數(shù)式編程
- 函數(shù)式編程是與面向?qū)ο缶幊逃胁町惖囊粋€編程范式禽捆,函數(shù)式編程是一個很大的領(lǐng)域,本文不打算對此做深入分析飘哨、介紹胚想;
- 在函數(shù)式編程范式中,函數(shù)是頭等公民芽隆,可以獨立存在(不像面向?qū)ο笞欠瘮?shù)或稱為方法,必須屬于某個類)胚吁;并且牙躺,函數(shù)可以作為方法的入?yún)ⅲ部梢宰鳛榉椒ǖ姆祷刂担?/li>
- Java是純面向?qū)ο笳Z言囤采,本質(zhì)上是不支持函數(shù)式編程的述呐,但是,通過函數(shù)式接口(一個有且僅有一個抽象方法的接口)蕉毯,可以部分模擬函數(shù)式編程乓搬;
3.2 函數(shù)式接口
- 直接看例子
// 函數(shù)式接口實例思犁,最常見的Runnable接口 Runnable runnable = new Runnable() { @Override public void run() { System.out.println("Runnable is a Functional Interface"); } }; // 函數(shù)式接口FunctionalInterface,只有一個accept方法进肯,有入?yún)⒑头祷刂? interface FunctionalInterface { String accept(int i1, int i2); } // FunctionalInterface實例 FunctionalInterface functionalInterface = new FunctionalInterface() { @Override public String accept(int i1, int i2) { return String.valueOf(i1 + i2); } };
3.3 lambda表達(dá)式
一開始不習(xí)慣的情況下激蹲,可以先像上一節(jié)那樣,先按顯性new實例的方式寫出代碼江掩,然后光標(biāo)移動到Android Studio標(biāo)成灰色字的部分(new FunctionalInterface處)学辱,敲擊alt + enter,即可通過IDE直接進(jìn)行l(wèi)ambda改造环形。
- Lambda 表達(dá)式策泣,也可稱為閉包,它是推動 Java 8 發(fā)布的最重要新特性抬吟。
- Lambda 允許把函數(shù)作為一個方法的參數(shù)(函數(shù)作為參數(shù)傳遞進(jìn)方法中)萨咕。
- 使用 Lambda 表達(dá)式可以使代碼變的更加簡潔緊湊。(下一章火本,看示例代碼會深有感觸)
- 我們把上節(jié)的例子做一下lambda改造
// Runnable實例危队,lambda形式,() -> { statement; }; Runnable runnable = () -> { System.out.println("Runnable is a Functional Interface"); }; // 當(dāng)方法體只有一行時钙畔,可以進(jìn)一步簡寫茫陆,() -> statement; Runnable runnable = () -> System.out.println("Runnable is a Functional Interface"); // FunctionalInterface實例,lambda形式擎析,(param1, param2, ...) -> { return expression; }; FunctionalInterface functionalInterface = (i1, i2) -> { return String.valueOf(i1 + i2); }; // 當(dāng)方法體只有一行時簿盅,可以進(jìn)一步簡寫,(param1, param2, ...) -> expression; FunctionalInterface functionalInterface = (i1, i2) -> String.valueOf(i1 + i2);
4叔锐、RxJava2的使用
基于以下RxJava版本:
'io.reactivex.rxjava2:rxjava:2.2.11'
4.1 最簡單的示例(create創(chuàng)建挪鹏、subscribe訂閱)
-
示例代碼
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); } }); Consumer<String> consumer = new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); } }; observable.subscribe(consumer);
-
輸出
main start to emit main consumer accept: Hello main consumer accept: CodingDog1024
-
lambda化 (對比前面的代碼,可以看到代碼明顯簡潔緊湊很多)
Observable<String> observable = Observable.create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }); Consumer<String> consumer = s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); observable.subscribe(consumer);
-
鏈?zhǔn)秸{(diào)用 (可以看到愉烙,結(jié)合lambda和鏈?zhǔn)秸{(diào)用讨盒,代碼更加的緊湊,少了很多干擾性的代碼步责,讓我們可以更加聚焦于業(yè)務(wù)邏輯)
Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
后續(xù)的示例代碼都會以lambda和鏈?zhǔn)秸{(diào)用結(jié)合的方式給出返顺,除非特別需要說明具體類型的情況
4.2 map轉(zhuǎn)換
現(xiàn)在我們來將上一節(jié)例子里的
String
轉(zhuǎn)成大寫字母;相信很多人的第一反應(yīng)是在
subscribe
方法里蔓肯,打印時調(diào)用s
的toUpperCase
方法遂鹊;這個方式當(dāng)然可以實現(xiàn)轉(zhuǎn)換成大寫的需求;但是蔗包,從業(yè)務(wù)邏輯解耦秉扑、代碼復(fù)用的角度,我們希望不要改動到原有代碼(只擴展新邏輯、不修改原邏輯舟陆,開閉原則)误澳。
consumer
的邏輯保持最簡單(拿到String
,顯示就是秦躯,沒有任何其他復(fù)雜邏輯)-
此時忆谓,我們可以使用
map
操作符Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
-
輸出
main start to emit main map Hello -> HELLO main consumer accept: HELLO main map CodingDog1024 -> CODINGDOG1024 main consumer accept: CODINGDOG1024
-
Function<T, R>
map
方法傳入的是一個Function<T, R>
,泛型T
和R
都為String
踱承,為了更清楚的看下map
方法倡缠,我們將map
方法里的lambda恢復(fù)成匿名類實例的樣子(同時,再次感受lambda的簡潔茎活、緊湊)Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; } }) .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
-
輸出(與上面的輸出是一樣的)
main start to emit main map Hello -> HELLO main consumer accept: HELLO main map CodingDog1024 -> CODINGDOG1024 main consumer accept: CODINGDOG1024
4.3 異步處理
- 從上面的輸出可以看到昙沦,目前的邏輯都是跑在主線程的。現(xiàn)在妙色,我們假設(shè)emitter發(fā)出事件的操作是耗時操作桅滋,我們希望這個操作不要阻塞主線程
- 此時慧耍,我們可以使用調(diào)度器(
Scheduler
)身辨,新增一行代碼,即可切換線程 -
Thread.sleep(500)
只是為了模擬耗時芍碧、CountDownLatch
只是為了測試代碼的順利執(zhí)行煌珊,對這個流程沒有任何影響,忽略即可CountDownLatch countDown = new CountDownLatch(2); Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); Thread.sleep(500); emitter.onNext("Hello"); Thread.sleep(500); emitter.onNext("CodingDog1024"); }) .subscribeOn(Schedulers.newThread()) // 這一行是新增泌豆,其他的完全沒變 .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); countDown.countDown(); }); countDown.await();
- 輸出(可以看到定庵,已經(jīng)切換到新線程執(zhí)行)
RxNewThreadScheduler-1 start to emit RxNewThreadScheduler-1 map Hello -> HELLO RxNewThreadScheduler-1 consumer accept: HELLO RxNewThreadScheduler-1 map CodingDog1024 -> CODINGDOG1024 RxNewThreadScheduler-1 consumer accept: CODINGDOG1024
4.4 不同操作執(zhí)行在不同線程
- 示例
CountDownLatch countDown = new CountDownLatch(2); Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); Thread.sleep(500); emitter.onNext("Hello"); Thread.sleep(500); emitter.onNext("CodingDog1024"); }) .subscribeOn(Schedulers.newThread()) // emit操作執(zhí)行在newThread .observeOn(Schedulers.computation()) // 接下去的操作(即map操作)執(zhí)行在computation .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .observeOn(Schedulers.single()) // 接下去的操作(即consumer)執(zhí)行在single .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); countDown.countDown(); }); countDown.await();
- 輸出
emit
操作執(zhí)行在RxNewThreadScheduler-1
map
操作執(zhí)行在RxComputationThreadPool-1
consumer
執(zhí)行在RxSingleScheduler-1RxNewThreadScheduler-1 start to emit RxComputationThreadPool-1 map Hello -> HELLO RxSingleScheduler-1 consumer accept: HELLO RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024 RxSingleScheduler-1 consumer accept: CODINGDOG1024
4.5 subscribeOn 和 observeOn
- subscribeOn
-
subscribeOn
作用于發(fā)射事件處(如上一節(jié)中的create
方法),多次調(diào)用subscribeOn
方法踪危,將只有離create
最近的一處生效蔬浙。當(dāng)你需要提供接口給外部調(diào)用,如果想要保證發(fā)射事件代碼執(zhí)行在指定調(diào)度器贞远,則可以直接通過subscribeOn
方法設(shè)置調(diào)度器畴博,接口調(diào)用方就算通過subscribeOn
方法設(shè)置其他的調(diào)度器,最終結(jié)果也是在你指定的調(diào)度器里執(zhí)行蓝仲。 - 示例
CountDownLatch countDown = new CountDownLatch(2); Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); Thread.sleep(500); emitter.onNext("Hello"); Thread.sleep(500); emitter.onNext("CodingDog1024"); }) .subscribeOn(Schedulers.computation()) // 設(shè)置為computation調(diào)度器俱病,最終結(jié)果為執(zhí)行在computation調(diào)度器 .subscribeOn(Schedulers.io()) // 設(shè)置為io調(diào)度器 .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .subscribeOn(Schedulers.newThread()) // 設(shè)置newThread調(diào)度器 .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); countDown.countDown(); }); countDown.await();
- 輸出
RxComputationThreadPool-1 start to emit RxComputationThreadPool-1 map Hello -> HELLO RxComputationThreadPool-1 consumer accept: HELLO RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024 RxComputationThreadPool-1 consumer accept: CODINGDOG1024
-
- observeOn
-
observeOn
影響的是該操作符后續(xù)的事件所運行的線程,多次調(diào)用observeOn
方法袱结,每次調(diào)用互不影響亮隙,可以實現(xiàn)多次切換不同線程。 - 上一節(jié)示例代碼已經(jīng)做了使用到該特性垢夹,兩次調(diào)用
observeOn
溢吻,使map操作執(zhí)行在computation
調(diào)度器、subscribe
執(zhí)行在single
調(diào)度器果元。 - Android開發(fā)中最常用的場景為:
Observable
前面的操作都是在工作線程執(zhí)行(io
促王、computation
等調(diào)度器)掩完,一切處理邏輯執(zhí)行妥當(dāng)后,調(diào)用observeOn
方法將最終的subscribe
切換到UI線程執(zhí)行(AndroidSchedulers.mainThread()
調(diào)度器)硼砰,從而可以在subscribe
方法里更新UI且蓬。
-
4.6 Scheduler類型
4.6.1 RxJava內(nèi)置了5種調(diào)度器
本節(jié)內(nèi)容參考自 《RxJava Essentials 中文版》by yuxingxin
-
Schedulers.io()
這個調(diào)度器時用于I/O操作。它基于根據(jù)需要题翰,增長或縮減來自適應(yīng)的線程池恶阴。我們將使用它來修復(fù)我們之前看到的StrictMode違規(guī)做法。由于它專用于I/O操作豹障,所以并不是RxJava的默認(rèn)方法冯事;正確的使用它是由開發(fā)者決定的。
重點需要注意的是線程池是無限制的血公,大量的I/O調(diào)度操作將創(chuàng)建許多個線程并占用內(nèi)存昵仅。一如既往的是,我們需要在性能和簡捷兩者之間找到一個有效的平衡點累魔。 -
Schedulers.computation()
這個是計算工作默認(rèn)的調(diào)度器摔笤,它與I/O操作無關(guān)。它也是許多RxJava方法的默認(rèn)調(diào)度器:buffer()
,debounce()
,delay()
,interval()
,sample()
,skip()
垦写。 -
Schedulers.immediate()
這個調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作吕世。它是timeout()
,timeInterval()
,以及timestamp()
方法默認(rèn)的調(diào)度器。 -
Schedulers.newThread()
這個調(diào)度器正如它所看起來的那樣:它為指定任務(wù)啟動一個新的線程梯投。 -
Schedulers.trampoline()
當(dāng)我們想在當(dāng)前線程執(zhí)行一個任務(wù)時命辖,并不是立即,我們可以用trampoline()
將它入隊分蓖。這個調(diào)度器將會處理它的隊列并且按序運行隊列中每一個任務(wù)尔艇。它是repeat()
和retry()
方法默認(rèn)的調(diào)度器。
4.6.2 RxAndroid額外提供了一個對應(yīng)UI主線程的調(diào)度器
RxAndroid:
'io.reactivex.rxjava2:rxandroid:2.1.1'
-
AndroidSchedulers.mainThread()
在該調(diào)度器執(zhí)行的操作么鹤,會被封裝到Message
终娃,send
到UI Handler
執(zhí)行。
4.6.3 自定義調(diào)度器
-
Schedulers.from(Executor executor)
如果上述的5種內(nèi)置調(diào)度器都不能滿足需求,我們也可以傳入自己定義的Executor
。
5呆盖、操作符(Operators)
完整介紹見:ReactiveX Operators
Rx
提供了非常非常豐富的操作符纫谅,為方便查看操作符含義,ReactiveX官網(wǎng)提供了一種操作符示意圖,下面會舉幾個例子做一下介紹。
5.1 操作符示意圖
-
create
看圖的下半部分,橫向箭頭代表整個事件流搅荞、有顏色的圖形代表事件、豎線代表完成。
create
以我們上面寫過的示例為例咕痛,則是:
----------------Hello------------CodingDog1024---------------|----->
-
map
下圖顯示將原事件流里的每個數(shù)都乘以10的轉(zhuǎn)換痢甘。
map
以我們上面寫過的示例為例,則是:
----------------Hello------------CodingDog1024---------------|----->
map (s -> s.toUpperCase())
----------------HELLO -----------CODINGDOG1024---------------|----->
-
flatMap
與map
操作符返回的是事件(如我們例子中的s.toUpperCase()
)不同茉贡,flatMap
操作符返回的是Observable
塞栅。但是,consumer
收到的依舊是Observable
內(nèi)包含的數(shù)據(jù)腔丧,因此放椰,稱為扁平化(flat
)。
flatMap- 可以將我們上面寫過的例子做一下改造愉粤,使用
flatMap
操作符達(dá)到同樣的功能:Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .flatMap(s -> getUpperCase(s)) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); }); private Observable<String> getUpperCase(String s) { return Observable.create(emitter -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " getUpperCase " + s + " -> " + upperCase); emitter.onNext(upperCase); emitter.onComplete(); }); }
- 有人可能會有疑問砾医,
flatMap
比起map
明顯難理解得多,為什么要用它衣厘。一個比較常見的場景是如蚜,想象一下如果上述的getUpperCase
方法是其他模塊或sdk提供的接口,我們并不清楚其內(nèi)部實現(xiàn)影暴,但是需要依賴這個接口的功能错邦,此時,就是flatMap
的一個派上用場的時候坤检。
- 可以將我們上面寫過的例子做一下改造愉粤,使用
-
filter
小于等于10的數(shù)據(jù)將被過濾掉兴猩。
filter- 示例
Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .filter(s -> s.startsWith("C")) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); });
- 輸出
main start to emit main consumer accept: CodingDog1024
- 示例
以我們上面寫過的示例為例,則是:
----------------Hello------------CodingDog1024---------------|----->
filter(s -> s.startsWith("C"))
---------------------------------CodingDog1024---------------|----->
- merge
將兩個以上Observable
合并一起早歇,得到的事件將是所有事件流的一個合集。
merge- 示例
Observable hello = Observable.create(emitter -> { System.out.println(Thread.currentThread().getName() + " hello start to emit"); emitter.onNext("Hello"); }); Observable codingDog = Observable.create(emitter -> { System.out.println(Thread.currentThread().getName() + " codingDog start to emit"); emitter.onNext("CodingDog1024"); }); Observable .merge(hello, codingDog) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); });
- 輸出
main hello start to emit main consumer accept: Hello main codingDog start to emit main consumer accept: CodingDog1024
- 示例
----------------Hello----------------------------------------|----->
---------------------------------CodingDog1024---------------|----->
merge
----------------Hello------------CodingDog1024---------------|----->
5.2 Creating Observables
Operators that originate new Observables.
-
Create
— create an Observable from scratch by calling observer methods programmatically -
Defer
— do not create the Observable until the observer subscribes, and create a fresh Observable for each observer -
Empty
/Never
/Throw
— create Observables that have very precise and limited behavior -
From
— convert some other object or data structure into an Observable -
Interval
— create an Observable that emits a sequence of integers spaced by a particular time interval -
Just
— convert an object or a set of objects into an Observable that emits that or those objects -
Range
— create an Observable that emits a range of sequential integers -
Repeat
— create an Observable that emits a particular item or sequence of items repeatedly -
Start
— create an Observable that emits the return value of a function -
Timer
— create an Observable that emits a single item after a given delay
5.3 Transforming Observables
Operators that transform items that are emitted by an Observable.
-
Buffer
— periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time -
FlatMap
— transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable -
GroupBy
— divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key -
Map
— transform the items emitted by an Observable by applying a function to each item -
Scan
— apply a function to each item emitted by an Observable, sequentially, and emit each successive value -
Window
— periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
5.4 Filtering Observables
Operators that selectively emit items from a source Observable.
-
Debounce
— only emit an item from an Observable if a particular timespan has passed without it emitting another item -
Distinct
— suppress duplicate items emitted by an Observable -
ElementAt
— emit only item n emitted by an Observable -
Filter
— emit only those items from an Observable that pass a predicate test -
First
— emit only the first item, or the first item that meets a condition, from an Observable -
IgnoreElements
— do not emit any items from an Observable but mirror its termination notification -
Last
— emit only the last item emitted by an Observable -
Sample
— emit the most recent item emitted by an Observable within periodic time intervals -
Skip
— suppress the first n items emitted by an Observable -
SkipLast
— suppress the last n items emitted by an Observable -
Take
— emit only the first n items emitted by an Observable -
TakeLast
— emit only the last n items emitted by an Observable
5.5 Combining Observables
Operators that work with multiple source Observables to create a single Observable
-
And
/Then
/When
— combine sets of items emitted by two or more Observables by means ofPattern
andPlan
intermediaries -
CombineLatest
— when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function -
Join
— combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable -
Merge
— combine multiple Observables into one by merging their emissions -
StartWith
— emit a specified sequence of items before beginning to emit the items from the source Observable -
Switch
— convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables -
Zip
— combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
6讨勤、Subject = Observable + Observer
subject
是一個神奇的對象箭跳,它可以是一個Observable
同時也可以是一個Observer
:它作為連接這兩個世界的一座橋梁。一個Subject
可以訂閱一個Observable
潭千,就像一個Observer
谱姓,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù)刨晴,就像一個Observable
屉来。很明顯,作為一個Observable
狈癞,觀察者們或者其它Subject
都可以訂閱它茄靠。
RxJava
提供了4種不同的Subject
:
-
PublishSubject
PublishSubject
會向他的訂閱者發(fā)送訂閱后的數(shù)據(jù)流。 -
BehaviorSubject
BehaviorSubject
會首先向他的訂閱者發(fā)送截至訂閱前最新的一個數(shù)據(jù)對象(或初始值)蝶桶,然后正常發(fā)送訂閱后的數(shù)據(jù)流慨绳。 -
ReplaySubject
ReplaySubject
會緩存它所訂閱的所有數(shù)據(jù),向任意一個訂閱它的觀察者重發(fā)。 -
AsyncSubject
當(dāng)Observable
完成時AsyncSubject
只會發(fā)布最后一個數(shù)據(jù)給已經(jīng)訂閱的每一個觀察者脐雪。
6.1 PublishSubject
- 示例
PublishSubject<String> publishSubject = PublishSubject.create(); Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s); publishSubject.subscribe(consumer0); publishSubject.onNext("Hello"); publishSubject.onNext("CodingDog1024"); Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s); publishSubject.subscribe(consumer1); publishSubject.onNext("I"); publishSubject.onNext("am"); publishSubject.onNext("RxJava");
- 輸出
main consumer0 accept: Hello main consumer0 accept: CodingDog1024 main consumer0 accept: I main consumer1 accept: I main consumer0 accept: am main consumer1 accept: am main consumer0 accept: RxJava main consumer1 accept: RxJava
- 說明
consumer0
收到完整的"Hello"
厌小、"CodingDog1024"
、"I"
战秋、"am"
璧亚、"RxJava"
。
consumer1
只收到其訂閱之后來到的"I"
脂信、"am"
涨岁、"RxJava"
。
PublishSubject
的行為就類似我們常見的addXXXListener
注冊監(jiān)聽吉嚣,consumer
可以接收到其訂閱之后的所有event
梢薪。
6.1 BehaviorSubject
- 示例
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create(); Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s); behaviorSubject.subscribe(consumer0); behaviorSubject.onNext("Hello"); behaviorSubject.onNext("CodingDog1024"); Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s); behaviorSubject.subscribe(consumer1); behaviorSubject.onNext("I"); behaviorSubject.onNext("am"); behaviorSubject.onNext("RxJava");
- 輸出
main consumer0 accept: Hello main consumer0 accept: CodingDog1024 main consumer1 accept: CodingDog1024 main consumer0 accept: I main consumer1 accept: I main consumer0 accept: am main consumer1 accept: am main consumer0 accept: RxJava main consumer1 accept: RxJava
- 說明
consumer0
收到完整的"Hello"
、"CodingDog1024"
尝哆、"I"
秉撇、"am"
、"RxJava"
秋泄。
consumer1
收到"CodingDog1024"
琐馆、"I"
、"am"
恒序、"RxJava"
瘦麸。(比上節(jié)PublishSubject
的例子,多了"CodingDog1024"
)
訂閱BehaviorSubject
時歧胁,consumer
會先收到最新的一個event
滋饲,然后再接收到之后到來的所有event
。 - 使用場景
在很常見的【先獲取一次值喊巍,執(zhí)行邏輯屠缭,然后值變化時需要重新執(zhí)行邏輯】的場景下,使用BehaviorSubject
可以很自然的實現(xiàn)崭参。
7呵曹、How it works ?
基于以下RxJava版本:
'io.reactivex.rxjava2:rxjava:2.2.11'
7.1 Observable.create() & subscribe()
7.1.1 示例 & 實現(xiàn)代碼走讀
-
sample
// (1)實例化【observableOnSubscribe1】 ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { // (11)第(10)步中何暮,調(diào)用了subscribe方法奄喂,入?yún)閜arent,parent為第(5)步observer的一個代理 @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // (12)調(diào)用emitter onNext方法海洼。通過parent跨新,最終轉(zhuǎn)調(diào)到第(5)步observer的onNext方法 emitter.onNext("CodingDog1024"); } }; // (4)返回值為一個ObservableCreate實例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); // (5)實例化【observer1】 Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } // (13) 收到事件/數(shù)據(jù) @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; // (6)訂閱 observable1.subscribe(observer1);
-
輸出
onSubscribe onNext CodingDog1024
-
create
源碼// 代碼出處:Observable // (2)source即為【observableOnSubscribe1】 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } // 代碼出處:RxJavaPlugins public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { // 默認(rèn)onObservableAssembly為null,因此贰军,該方法可以視為直接return source Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } // 代碼出處:ObservableCreate public final class ObservableCreate<T> extends Observable<T> { // source即為【observableOnSubscribe1】 final ObservableOnSubscribe<T> source; // (3)source即為【observableOnSubscribe1】 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } // (9)observer即為【observer1】 @Override protected void subscribeActual(Observer<? super T> observer) { // CreateEmitter為observer的代理玻蝌,增加一些異常處理蟹肘,可以先直接理解為只是轉(zhuǎn)調(diào)observer的方法 CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { // (10)該subscribe方法即為第(11)步的【subscribe】方法,這里就是create和subscribe的連接處8┦鳌A备埂! source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... ... }
-
subscribe
源碼// 代碼出處:Observable // (7)observer即為【observer1】 public final void subscribe(Observer<? super T> observer) { ... ... observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); ... ... } // 代碼出處:Observable // (8)observer即為【observer1】 protected abstract void subscribeActual(Observer<? super T> observer);
7.1.2 簡單概括一下
-
sample代碼
// (1)實例化【observableOnSubscribe1】 ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { // (11)第(10)步中许饿,調(diào)用了subscribe方法阳欲,入?yún)閜arent,parent為第(5)步observer的一個代理 @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // (12)調(diào)用emitter onNext方法陋率。通過parent球化,最終轉(zhuǎn)調(diào)到第(5)步observer的onNext方法 emitter.onNext("CodingDog1024"); } }; // (4)返回值為一個ObservableCreate實例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); // (5)實例化【observer1】 Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } // (13) 收到事件/數(shù)據(jù) @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; // (6)訂閱 observable1.subscribe(observer1);
-
文字概括
-
Observable.create
方法入?yún)橐粋€ObservableOnSubscribe
實例observableOnSubscribe1
,返回值為一個ObservableCreate
實例observable1
瓦糟,observable1
持有observableOnSubscribe1
(source
) - 以入?yún)?code>Observer實例
observer1
調(diào)用subscribe
方法時筒愚,實際調(diào)用的就是ObservableCreate
的subscribe
方法,接著調(diào)到ObservableCreate
的subscribeActual(Observer observer)
方法 -
subscribeActual
方法里new
了一個CreateEmitter
實例parent
菩浙,parent
持有上述observer1
-
subscribeActual
方法里執(zhí)行source.subscribe(parent)
巢掺,這個source
即為第1步的入?yún)?code>source,因此劲蜻,此處就是執(zhí)行了observableOnSubscribe1
的subscribe
方法 -
observableOnSubscribe1
的subscribe
方法執(zhí)行了emitter.onNext("CodingDog1024");
- 也就是執(zhí)行了第3步中
CreateEmitter
實例parent
的onNext
方法 -
CreateEmitter
的核心邏輯是調(diào)用持有的Observer
實例observer
的對應(yīng)方法陆淀,即調(diào)用第2步入?yún)?code>Observer實例observer1
的onNext
方法,即打印log的方法@Override public void onNext(String s) { System.out.println("onNext " + s); }
-
-
提取最核心邏輯先嬉,簡化理解
前一段總共列了7步轧苫,出現(xiàn)了數(shù)量眾多的類,理解起來稍微有點復(fù)雜疫蔓。我們做一下簡化含懊,提取最核心邏輯,上述分析中鳄袍,ObservableCreate
和Observable
描述的是同一個東西绢要,我們統(tǒng)一視為Observable
;CreateEmitter
對象parent
其實就是Observer
對象observer
的一個代理拗小,最終調(diào)用的是observer
的方法,我們暫時忽略代理類邏輯樱哼,直接將兩者統(tǒng)一視為observer
哀九;這么合并之后,前一段的分析就變成了:-
Observable.create
方法入?yún)?code>observableOnSubscribe1(或者稱為source
)搅幅,返回值為observable
阅束,observable
持有source
-
observable.subscribe(observer)
,最終會調(diào)用到source
的subscribe
方法(入?yún)?code>observer) -
source
的subscribe
方法里茄唐,執(zhí)行了observer
的onNext
方法
-
7.2 Observable.map()
7.2.1 示例 & 實現(xiàn)代碼走讀
-
sample
在上一節(jié)例子基礎(chǔ)上息裸,修改3處蝇更,見下方修改點1 2 3。// (1)實例化【observableOnSubscribe1】 ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { // (11)第(10)步中呼盆,調(diào)用了subscribe方法年扩,入?yún)閜arent,parent為第(5)步observer的一個代理 @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // (12)調(diào)用emitter onNext方法访圃。通過parent厨幻,最終轉(zhuǎn)調(diào)到第(5)步observer的onNext方法 emitter.onNext("CodingDog1024"); } }; // (4)返回值為一個ObservableCreate實例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); // 修改點1:新增將String轉(zhuǎn)成大寫的mapper Function<String, String> mapper1 = new Function<String, String>() { @Override public String apply(String s) throws Exception { return s.toUpperCase(); } }; // 修改點2:observable1應(yīng)用mapper得到新的Observable實例observable2,具體類型為ObservableMap Observable<String> observable2 = observable1.map(mapper1); // (5)實例化【observer1】 Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } // (13) 收到事件/數(shù)據(jù) @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; // 修改點3:訂閱的是應(yīng)用了mapper后的observable2 // (6)訂閱 observable2.subscribe(observer1);
-
輸出
onSubscribe onNext CODINGDOG1024
-
map
源碼// 代碼出處:Observable public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); // observable1上調(diào)用的map方法腿时,因此况脆,this就是observable1,mapper就是mapper1 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } // 代碼出處:ObservableMap public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; // source就是observable1批糟,function就是mapper1 public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { // source就是observable1格了,function就是mapper1,t就是observer1 source.subscribe(new MapObserver<T, U>(t, function)); } ... ... } // 代碼出處:ObservableMap static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; // actual就是observer1徽鼎,mapper就是mapper1 MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { // super方法盛末,將actual賦給downstream,也就是說: // downstream就是observer1 super(actual); this.mapper = mapper; } @Override public void onNext(T t) { ... ... U v; ... ... // 調(diào)用mapper.apply纬傲,將上游的輸入 t 轉(zhuǎn)為 v v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); ... ... // 調(diào)用下游的onNext方法满败,入?yún)?v downstream.onNext(v); } ... ... }
-
分析
Observable create map subscribe
7.3 Observable.filter()
7.3.1 示例 & 實現(xiàn)代碼走讀
-
sample
ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); } }; // (4)返回值為一個ObservableCreate實例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); Predicate<String> predicate1 = new Predicate<String>() { @Override public boolean test(String s) throws Exception { return s.startsWith("C"); } }; // observable2具體類型為ObservableFilter,內(nèi)部只有predicate1 Observable<String> observable2 = observable1.filter(predicate1); Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; observable2.subscribe(observer1);
-
輸出
onSubscribe onNext CodingDog1024
-
filter
源碼// 代碼出處:Observable public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); // observable1上調(diào)用的filter方法叹括,因此算墨,this就是observable1,predicate就是predicate1 return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); } // 代碼出處:ObservableFilter public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> { final Predicate<? super T> predicate; // source就是observable1汁雷,predicate就是predicate1 public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) { super(source); this.predicate = predicate; } @Override public void subscribeActual(Observer<? super T> observer) { // source就是observable1净嘀,predicate就是predicate1,observer就是observer1 source.subscribe(new FilterObserver<T>(observer, predicate)); } ... ... } // 代碼出處:ObservableFilter static final class FilterObserver<T> extends BasicFuseableObserver<T, T> { final Predicate<? super T> filter; // actual就是observer1侠讯,filter就是predicate1 FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) { // super方法挖藏,將actual賦給downstream,也就是說: // downstream就是observer1 super(actual); this.filter = filter; } @Override public void onNext(T t) { ... ... boolean b; ... ... // 調(diào)用filter.test方法厢漩,入?yún)樯嫌蔚妮斎?t b = filter.test(t); ... ... // filter.test結(jié)果為true膜眠,才調(diào)用下游的onNext方法,入?yún)?t if (b) { downstream.onNext(t); } ... ... } ... ... }
-
分析
Observable create filter subscribe