RxJava2 響應(yīng)式編程介紹
響應(yīng)式編程
基本概念
同步/異步: 關(guān)注的是消息通信機(jī)制颂鸿,同步是指 發(fā)出一個(gè)調(diào)用净捅,在沒有得到結(jié)果之前,該調(diào)用就不返回拣技,但是一旦調(diào)用返回,就得到返回值了; 而異步是指 調(diào)用發(fā)出后藕届,調(diào)用直接返回本刽,但不會(huì)立刻得到調(diào)用的結(jié)果鲸湃。而是在調(diào)用發(fā)出后,被調(diào)用者通過狀態(tài)子寓、通知來通知調(diào)用者暗挑,或通過回調(diào)函數(shù)處理這個(gè)調(diào)用; 異步強(qiáng)調(diào)被動(dòng)通知别瞭。
阻塞/非阻塞:關(guān)注的是程序在等待調(diào)用結(jié)果(消息窿祥,返回值)時(shí)的狀態(tài);阻塞調(diào)用是指調(diào)用結(jié)果返回之前蝙寨,當(dāng)前線程會(huì)被掛起晒衩。調(diào)用線程只有在得到結(jié)果之后才會(huì)返回。 非阻塞調(diào)用指在不能立刻得到結(jié)果之前墙歪,該調(diào)用不會(huì)阻塞當(dāng)前線程听系。如java8 stream為阻塞式,F(xiàn)uture為非阻塞式的虹菲; 非阻塞強(qiáng)調(diào)狀態(tài)主動(dòng)輪詢靠胜。
并發(fā)(Concurrency)與并行(parallelism): 并發(fā)是一個(gè)更通用的概念,兩個(gè)并發(fā)線程被調(diào)度在一個(gè)單核處理上運(yùn)行毕源,這就是并發(fā)性浪漠,而非并行. 并行性更可能出現(xiàn)在多核,多CPU或分布式系統(tǒng)上霎褐。編寫代碼時(shí)一般不區(qū)分兩者址愿。
函數(shù)式編程Functional Programming):函數(shù)式編程將程序描述為表達(dá)式和變換,以數(shù)學(xué)方程的形式建立模型冻璃,并盡量避免可變的狀態(tài)响谓。每個(gè)邏輯分類(filter,map损合,reduce等)都由不同函數(shù)所表示,這些實(shí)現(xiàn)底層次的變換娘纷,而用戶定義的高階函數(shù)作為參數(shù)來實(shí)現(xiàn)真正的業(yè)務(wù)嫁审。
函數(shù)響應(yīng)式編程(Functional Reactive Programming)):響應(yīng)式編程是建立在觀者者模式上的一種編程范式,對(duì)異步數(shù)據(jù)流進(jìn)行編程赖晶,同時(shí)該事件流是按時(shí)間排序的序列(不同于java8中的stream)律适;雖然響應(yīng)式編程框架不一定要求是函數(shù)式的,但是RxJava等響應(yīng)式編程框架都是結(jié)合了函數(shù)式編程的嬉探。
其它名詞: https://github.com/reactivemanifesto/reactivemanifesto/blob/master/glossary.zh-cn.md
響應(yīng)式宣言
響應(yīng)式宣言,Reactive Manifesto: 來自不同領(lǐng)域的組織正在不約而同地發(fā)現(xiàn)一些看起來如出一轍的軟件構(gòu)建模式擦耀。它們的系統(tǒng)更加穩(wěn)健,更加有可回復(fù)性涩堤,更加靈活眷蜓,并且以更好的定位來滿足現(xiàn)代的需求。
響應(yīng)式宣言針對(duì)一個(gè)系統(tǒng)而言胎围,并不等同于 響應(yīng)式編程規(guī)范吁系,響應(yīng)式系統(tǒng)應(yīng)該滿足如下特點(diǎn):
- 反應(yīng)靈敏的[Responsive]:只要有可能,系統(tǒng)就會(huì)及時(shí)響應(yīng)白魂。
- 有回復(fù)性的[Resilient]:系統(tǒng)在面臨故障時(shí)也能保持及時(shí)響應(yīng)汽纤。
- 可伸縮的[Elastic]:系統(tǒng)在變化的工作負(fù)載下保持及時(shí)響應(yīng)。
- 消息驅(qū)動(dòng)的[Message Driven]:響應(yīng)式系統(tǒng)依賴異步消息傳遞來建立組件之間的界限福荸,這一界限確保了松耦合蕴坪,隔離,位置透明性等特性的實(shí)現(xiàn)敬锐,還提供了以消息的形式把故障委派出去的手段背传。
響應(yīng)式擴(kuò)展
響應(yīng)式擴(kuò)展(Reactive Extensions, 簡寫為ReactiveX,Rx),最初是LINQ的一個(gè)擴(kuò)展台夺,由微軟的架構(gòu)師Erik Meijer領(lǐng)導(dǎo)的團(tuán)隊(duì)開發(fā)径玖,在2012年11月開源[1];
ReactiveX = Observer Pattern + Iterator Pattern + Functional Programming颤介。Rx 讓開發(fā)者可以利用可觀察序列和LINQ風(fēng)格查詢操作符來編寫異步和基于事件的程序;
ReactiveX是一個(gè)編程模型梳星,目標(biāo)是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流滚朵,ReactiveX的思想是跨平臺(tái)的冤灾,學(xué)習(xí)其他語言的基本語法之后,可以做到 “l(fā)earn once, write everywhere”;
Rx近幾年越來越流行了辕近,現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言了韵吨,Rx的大部分語言庫由ReactiveX這個(gè)組織負(fù)責(zé)維護(hù),比較流行的有RxJava/RxJS/Rx.NET亏推,社區(qū)網(wǎng)站是 reactivex.io学赛。
RxJava 是 ReactiveX 規(guī)范的JVM平臺(tái)實(shí)現(xiàn);
響應(yīng)式流規(guī)范
Reactive Streams 規(guī)范提供一個(gè)非堵塞的異步流處理的背壓(backpressure)標(biāo)準(zhǔn);Reactive Streams的目標(biāo)是增加抽象層吞杭,而不是進(jìn)行底層的流處理盏浇,規(guī)范將這些問題留給了庫實(shí)現(xiàn)來解決。
對(duì)于JVM芽狗,目前已經(jīng)有多個(gè)庫實(shí)現(xiàn)該標(biāo)準(zhǔn)绢掰,RxJava2, akka-streams(Playing 有使用),Reactor(Spring Reactive有使用) 等;
統(tǒng)一標(biāo)準(zhǔn)的好處就是 各個(gè)實(shí)現(xiàn)產(chǎn)生的數(shù)據(jù)可以方便的轉(zhuǎn)換和消費(fèi)童擎;
示例
Path filePath = Paths.get("build.gradle");
// RxJava2 to Reactor
Flowable<String> flowable = Flowable
.fromCallable(() -> Files.readAllLines(filePath))
.flatMap(x -> Flowable.fromIterable(x));
Flux.from(flowable).count().subscribe(System.out::println);
// Reactor to RxJava2
try
{
Flux<String> flux = Flux.fromIterable(Files.readAllLines(filePath));
Flowable.fromPublisher(flux).count()
.subscribe(System.out::println);
}
catch (IOException e)
{
e.printStackTrace();
}
-
Reactive Streams JVM接口由以下四個(gè)interface 組成:
Publisher : 消息發(fā)布者
Subscriber : 消息訂閱者
Subscription : 一個(gè)訂閱
Processor : Publisher + Subscriber 的結(jié)合體
-
Reactive Streams 規(guī)范主要目標(biāo):
- 通過異步邊界(Asynchronous Boundary)來解耦系統(tǒng)組件滴劲。 解偶的先決條件,分離事件/數(shù)據(jù)流的發(fā)送方和接收方的資源使用;
- 為背壓( back pressure ) 處理定義一種模型顾复。流處理的理想范式是將數(shù)據(jù)從發(fā)布者推送到訂閱者班挖,這樣發(fā)布者就可以快速發(fā)布數(shù)據(jù),同時(shí)通過壓力處理來確保速度更快的發(fā)布者不會(huì)對(duì)速度較慢的訂閱者造成過載芯砸。背壓處理通過使用流控制來確保操作的穩(wěn)定性并能實(shí)現(xiàn)優(yōu)雅降級(jí)萧芙,從而提供彈性能力。
該規(guī)范將包含在 JDK 9 的java.util.concurrent.Flow 類中假丧,包含四個(gè)接口類双揪。
RxJava 基礎(chǔ)
RxJava現(xiàn)狀
- RxJava 項(xiàng)目地址 https://github.com/ReactiveX/RxJava
- RxJava 1.x 先于 Reactive Streams 規(guī)范出現(xiàn),部分接口支持Reactive Streams 規(guī)范;
- RxJava 2.0 于 2016.10.29 正式發(fā)布包帚,已經(jīng)按照Reactive-Streams 規(guī)范完全的重寫, 基于Java8+;
- RxJava 2.0已經(jīng)獨(dú)立于RxJava 1.x而存在渔期,即 RxJava2(io.reactivex.) 使用與RxJava1(rx.) 不同的包名。
- RxJava 目前在Android 開發(fā)上應(yīng)用較多渴邦;
為什么選擇 RxJava2
與其它編程模式/庫相比
- Rx擴(kuò)展了觀察者模式用于支持?jǐn)?shù)據(jù)和事件序列栽烂,通過一些的操作符號(hào)箭养,統(tǒng)一風(fēng)格,用戶無需關(guān)注底層的實(shí)現(xiàn):如線程、同步购城、線程安全、并發(fā)數(shù)據(jù)結(jié)構(gòu)和非阻塞IO兜辞;
單個(gè)數(shù)據(jù) | 多個(gè)數(shù)據(jù) | |
---|---|---|
同步 | T getData() |
Iterable<T> getData() |
異步 | Future<T> getData() |
Observable<T> getData() |
對(duì)于單條數(shù)據(jù)可以選擇Future 模式蜻懦,但是多條異步數(shù)據(jù)組合,F(xiàn)uture就相對(duì)不方便纤垂;
對(duì)于同步的多條數(shù)據(jù)矾策,Observable/Flowable 和 Java8 Stream 都要比Iterable更加方便。
對(duì)比 Java 8 Stream,都是屬于函數(shù)式編程(Monad)峭沦,Stream主要是對(duì)數(shù)據(jù)集的阻塞處理贾虽, 而Rx 是非阻塞的,RxJava 提供更加豐富的操作符吼鱼。
對(duì)比 Java8 Stream, Rx 中的數(shù)據(jù)流/事件流蓬豁,擴(kuò)展到了時(shí)間維度绰咽,在時(shí)間維度上處理發(fā)射的數(shù)據(jù),所以很多操作符都是與時(shí)間有關(guān)的地粪。
對(duì)比 Reactor-core, 都遵循響應(yīng)式流規(guī)范取募,Reactor(Flux)和RxJava2(Flowable)可以相互轉(zhuǎn)換, Reactor更多的依賴java8的函數(shù)式接口,RxJava2 所有函數(shù)式接口都提供異常拋出蟆技,在寫代碼時(shí)更加便利玩敏。
RxJava 函數(shù)式風(fēng)格,簡化代碼(Rx的操作符通通常可以將復(fù)雜的難題簡化為很少的幾行代碼), 異步錯(cuò)誤處理,輕松使用并發(fā);
RxJava 1 vs RxJava 2
- RxJava 2x 不再支持 null 值质礼,如果傳入一個(gè)null會(huì)拋出 NullPointerException
Observable.just(null);
Single.just(null);
Flowable.just(null);
Maybe.just(null);
Observable.fromCallable(() -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
Observable.just(1).map(v -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
RxJava2 所有的函數(shù)接口(Function/Action/Consumer)均設(shè)計(jì)為可拋出Exception旺聚,解決編譯異常需要轉(zhuǎn)換問題。
RxJava1 中Observable不能很好支持背壓眶蕉,在RxJava2 中將Oberservable實(shí)現(xiàn)成不支持背壓砰粹,而新增Flowable 來支持背壓。
詳細(xì)參考請(qǐng)參考[5].
RxJava2中的響應(yīng)式類
RxJava2 主要類關(guān)系圖
如下圖所示妻坝,為RxJava2中的主要類關(guān)系圖伸眶,可清晰知道各響應(yīng)式類的聯(lián)系和區(qū)別。后面無特別說明均以Flowable說明刽宪。Flowable & Observable
Observable: 不支持背壓厘贼;
Flowable : Observable新的實(shí)現(xiàn),支持背壓圣拄,同時(shí)實(shí)現(xiàn)Reactive Streams 的 Publisher 接口嘴秸。
-
什么時(shí)候用 Observable:
- 一般處理最大不超過1000條數(shù)據(jù),并且?guī)缀醪粫?huì)出現(xiàn)內(nèi)存溢出庇谆;
- 如果式GUI 鼠標(biāo)事件岳掐,頻率不超過1000 Hz,基本上不會(huì)背壓(可以結(jié)合 sampling/debouncing 操作);
- 如果處理的式同步流而你的Java平臺(tái)又不支持Java Stream(如果有異常處理饭耳,Observable 比Stream也更適合);
-
什么時(shí)候用 Flowable:
- 處理以某種方式產(chǎn)生超過10K的元素串述;
- 文件讀取與分析,例如 讀取指定行數(shù)的請(qǐng)求寞肖;
- 通過JDBC 讀取數(shù)據(jù)庫記錄纲酗, 也是一個(gè)阻塞的和基于拉取模式,并且由ResultSet.next() 控制新蟆;
- 網(wǎng)絡(luò)IO流;
- 有很多的阻塞和/或 基于拉取的數(shù)據(jù)源觅赊,但是又想得到一個(gè)響應(yīng)式非阻塞接口的。
Single & Completable & Maybe
- Single: 可以發(fā)射一個(gè)單獨(dú)onSuccess 或 onError消息琼稻。它現(xiàn)在按照Reactive-Streams規(guī)范被重新設(shè)計(jì),并遵循協(xié)議 onSubscribe (onSuccess | onError)? .SingleObserver改成了如下的接口;
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
- Completable: 可以發(fā)送一個(gè)單獨(dú)的成功或異常的信號(hào)吮螺,按照Reactive-Streams規(guī)范被重新設(shè)計(jì),并遵循協(xié)議onSubscribe (onComplete | onError)?
Completable.create(new CompletableOnSubscribe()
{
@Override
public void subscribe(CompletableEmitter e) throws Exception
{
Path filePath = Paths.get("build.gradle");
Files.readAllLines(filePath);
e.onComplete();
}
}).subscribe(() -> System.out.println("OK!"),
Throwable::printStackTrace);
- Maybe:從概念上來說,它是Single 和 Completable 的結(jié)合體。它可以發(fā)射0個(gè)或1個(gè)通知或錯(cuò)誤的信號(hào), 遵循協(xié)議 onSubscribe (onSuccess | onError | onComplete)?鸠补。
Maybe.just(1)
.map(v -> v + 1)
.filter(v -> v == 1)
.defaultIfEmpty(2)
.test()
.assertResult(21);
// java.lang.AssertionError: Values at position 0 differ; Expected: 21 (class: Integer), Actual: 2 (class: Integer) (latch = 0, values = 1, errors = 0, completions = 1)
//
// at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:133)
// ....
RxJava2 的主要操作
我們已經(jīng)知道 RxJava主要特性為為一個(gè)擴(kuò)展的觀察者模式萝风、流式操作和異步編程,支持ReactiveX 規(guī)范給出的一些操作紫岩, 同時(shí)RxJava2 符合響應(yīng)式流規(guī)范闹丐,接下來以Flowable為例,按照功能分類講解RxJava2中的重要操作[9];
創(chuàng)建一個(gè)Flowable
- fromArray & fromIterable & just,直接從數(shù)組或迭代器中產(chǎn)生被因;
List<String> list = Arrays.asList(
"blue", "red", "green", "yellow", "orange", "cyan", "purple"
);
Flowable.fromIterable(list).skip(2).subscribe(System.out::println);
Flowable.fromArray(list.toArray()).subscribe(System.out::println);
Flowable.just("blue").subscribe(System.out::println);
-
fromFuture & fromCallable:
fromFuture, 事件從非主線程中產(chǎn)生; fromCallable, 事件從主線程中產(chǎn)生衫仑, 在需要消費(fèi)時(shí)生產(chǎn)梨与;
ExecutorService executor = Executors.newFixedThreadPool(2);
System.out.println("MAIN: " + Thread.currentThread().getId());
Callable<String> callable = () -> {
System.out.println("callable [" + Thread.currentThread().getId() + "]: ");
Path filePath = Paths.get("build.gradle");
return Files.readAllLines(filePath).stream().flatMap(s -> Arrays.stream(s.split
(""))).count() + "";
};
Future<String> future = executor.submit(callable);
Consumer<String> onNext = v -> System.out
.println("consumer[" + Thread.currentThread().getId() + "]:" + v);
Flowable.fromCallable(callable).subscribe(onNext);
Flowable.fromFuture(future).subscribe(onNext);
System.out.println("END");
fromPublisher ,從標(biāo)準(zhǔn)(Reactive Streams)的發(fā)布者中產(chǎn)生文狱;
-
自定義創(chuàng)建(generate & create)
下面以斐波那契數(shù)列產(chǎn)生為例說明 generate & create的使用粥鞋, generate為RxJava2新增的創(chuàng)建方式。
class Fib
{
long a;
long b;
public Fib(long a, long b)
{
this.a = a;
this.b = b;
}
public long fib()
{
return a + b;
}
}
//斐波那契數(shù)列
Flowable.create(new FlowableOnSubscribe<Fib>()
{
@Override
public void subscribe(FlowableEmitter<Fib> e) throws Exception
{
Fib start = new Fib(1L, 1L);
while (!e.isCancelled()) {
e.onNext(start);
start = new Fib(start.b, start.fib());
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER).map(x -> x.fib()).take(10).subscribe(System.out::println);
Flowable.generate(() -> new Fib(1L, 1L), (x, y) -> {
Fib fib = new Fib(x.b, x.fib());
y.onNext(fib);
return fib;
}).ofType(Fib.class).map(x -> x.fib()).take(10).subscribe(System.out::println);
-
amb & concat & merge, 由多個(gè)Flowable產(chǎn)生結(jié)合;
amb: 給定兩個(gè)或多個(gè)Flowable瞄崇,只發(fā)射最先發(fā)射數(shù)據(jù)的Flowable呻粹,如下面示例中的f1被發(fā)射;
concat: 給定多個(gè)Flowable苏研, 按照Flowable數(shù)組順序,依次發(fā)射數(shù)據(jù)等浊,不會(huì)交錯(cuò),下面示例中f1,f2中數(shù)據(jù)依次發(fā)射;
merge: 給定多個(gè)Flowable摹蘑, 按照Flowable數(shù)組中數(shù)據(jù)發(fā)射的順序組合成新的Flowable筹燕,各Flowable數(shù)據(jù)可能會(huì)交錯(cuò)(等價(jià)于轉(zhuǎn)換操作中的flatMap);
-
switchOnNext:給定能發(fā)射多個(gè)Flowable的Flowable,順序發(fā)射各子Flowable,最新發(fā)射的子Flowable覆蓋當(dāng)前子Flowable中還未發(fā)射的元素(由switchMap實(shí)現(xiàn))衅鹿。
Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).map(index -> "f1-" + index);
Flowable<String> f2 = Flowable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS).map(index -> "f2-" + index);
Flowable.ambArray(f1, f2).map(x -> "amb: " + x).subscribe(System.out::println);
System.out.println("----------concat-----------");
Flowable.concat(f1, f2).map(x -> "concat: " + x).subscribe(System.out::println);
System.out.println("----------merge-----------");
Flowable.merge(f1, f2).map(x -> "merge: " + x).subscribe(System.out::println);
Flowable<String>[] flowables = new Flowable[]{f1, f2};
Flowable.switchOnNext(Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()]))
.map(x -> "switchOnNext-" + x).subscribe(System.out::println);
Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()])
.switchMap((io.reactivex.functions.Function) Functions.identity())
.map(x -> "switchMap-" + x).subscribe(System.out::println);
-
zip & combineLatest, 多Flowable中元素結(jié)合變換
-
zip :每個(gè)Flowable中的元素都按順序結(jié)合變換撒踪,直到元素最少Flowable的已經(jīng)發(fā)射完畢;
-
combineLatest: 每個(gè)Flowable中的發(fā)射的元素都與其他Flowable最近發(fā)射的元素結(jié)合變換大渤,知道所有的Flowable的元素發(fā)射完畢制妄;
-
轉(zhuǎn)換、過濾與聚合操作
在Java8中Stream也有包含這些功能的操作泵三,由于多了時(shí)間這個(gè)維度耕捞,在 RxJava 中操作相對(duì)更加豐富。 這里主要介紹一些重點(diǎn)操作切黔。
-
buffer & groupBy & window
buffer 和 window 都可以按時(shí)間或者元素?cái)?shù)量窗口砸脊,buffer是直接轉(zhuǎn)換成元素集,window是將元素集轉(zhuǎn)換成另一個(gè)Flowable纬霞, groupBy,按照key 來分組凌埂,需要元素發(fā)射完成才能消費(fèi),如果只是對(duì)數(shù)據(jù)處理使用Java8 groupBy更方便诗芜;
Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).delay((t) ->
Flowable.timer(t % 3 + new Random().nextLong() % 3, TimeUnit.SECONDS))
.map(index -> index % 3 + "-f1-" + index);
f1.buffer(5, TimeUnit.SECONDS).map(x -> "buffer-" + x).subscribe(System.out::println);
f1.window(5, TimeUnit.SECONDS).map(x -> x.toList())
.subscribe(x -> x.subscribe(System.out::println));
Disposable b = f1.groupBy((x) -> x.split("-", 2)[0])
.subscribe(x -> x.toList().subscribe(System.out::println));
Map<String, List<String>> map = f1.toList().blockingGet().stream()
.collect(Collectors.groupingBy((x) -> x.split
("-", 2)[0]));
System.out.println(map);
while (!b.isDisposed()) {
}
- debounce & throttleFirst & sample 按照時(shí)間區(qū)間采集數(shù)據(jù)
debounce 防抖動(dòng)瞳抓,元素發(fā)射后在設(shè)定的超時(shí)時(shí)間內(nèi)沒有其它元素發(fā)射埃疫,則將此元素用于后續(xù)處理, 在前端APP應(yīng)用較多孩哑。 如果是空間上的防抖動(dòng)可以利用distinctUntilChanged操作符栓霜。throttle 限流操作,對(duì)于 throttleFirst是 取發(fā)射后元素横蜒,經(jīng)過間隔時(shí)間后的第一個(gè)元素進(jìn)行發(fā)射胳蛮。sample 數(shù)據(jù)采樣, 對(duì)于源數(shù)據(jù),發(fā)射間隔時(shí)間內(nèi)的最后出現(xiàn)的元素丛晌。
-
take & skip & first & emlmentAt,精確獲取數(shù)據(jù)(集)
take, 類似java8 limit 操作仅炊,但是這里支持更多的操作(take/takeLast/takeUntil/takeWhen),同時(shí)支持在時(shí)間區(qū)間上獲取數(shù)據(jù)集澎蛛; skip, 類似java8 skip 操作,但是這里的可以擴(kuò)展到時(shí)間區(qū)間上 first/firstElement/last/lastElement, 由 Flowable -> Single/Maybe.
Flowable<String> f1 = Flowable
.fromArray("blue", "red", "green", "yellow11", "orange", "cyan", "purple"
);
f1.elementAt(4, "hello").subscribe(System.out::println);
//out: orange
f1.takeUntil(x -> x.length() > 5).map(x -> "takeUntil-" + x).toList()
.subscribe(System.out::println);
//out: [takeUntil-blue, takeUntil-red, takeUntil-green, takeUntil-yellow11]
f1.takeWhile(x -> x.length() <= 5).map(x -> "takeWhile-" + x).toList()
.subscribe(System.out::println);
//out: [takeWhile-blue, takeWhile-red, takeWhile-green]
f1.skipWhile(x -> x.length() <= 5).map(x -> "skipWhile-" + x).toList()
.subscribe(System.out::println);
//[skipWhile-yellow11, skipWhile-orange, skipWhile-cyan, skipWhile-purple]
Disposable d = f1.delay(v -> Flowable.timer(v.length(), TimeUnit.SECONDS))
.skipUntil(Flowable.timer(5, TimeUnit.SECONDS)).map(x -> "skipUntil-" + x)
.subscribe(System.out::println);
// skipUntil-green
// skipUntil-orange
// skipUntil-purple
// skipUntil-yellow11
while (!d.isDisposed()) {
}
異步與并發(fā)(Asynchronized & Concurrency)
RxJava 通過一些操作統(tǒng)一了 同步和異步抚垄,阻塞與非阻塞,并行與并發(fā)編程谋逻。
observeOn & subscribeOn & Scheduler
subscribeOn 和 observeOn 都是用來切換線程用的,都需要參數(shù) Scheduler.
-
Scheduler ,調(diào)度器, 是RxJava 對(duì)線程控制器 的 一個(gè)抽象,RxJava 已經(jīng)內(nèi)置了幾個(gè) Scheduler 呆馁,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:
trampoline, 直接在當(dāng)前線程運(yùn)行(繼續(xù)上一個(gè)操作中,最后處理完成的數(shù)據(jù)源所處線程毁兆,并不一定是主線程)浙滤,相當(dāng)于不指定線程;
computation, 這個(gè) Scheduler 使用的固定的線程池(FixedSchedulerPool),大小為 CPU 核數(shù), 適用于CPU 密集型計(jì)算气堕。
io,I/O 操作(讀寫文件瓷叫、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler送巡。行為模式和 newThread() 差不多摹菠,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池,可以重用空閑的線程骗爆,因此多數(shù)情況下 io() 比 newThread() 更有效率;
newThread, 總是啟用新線程次氨,并在新線程中執(zhí)行操作;
single摘投, 使用定長為1 的線程池(newScheduledThreadPool(1))煮寡,重復(fù)利用這個(gè)線程;
-
Schedulers.from, 將java.util.concurrent.Executor 轉(zhuǎn)換成一個(gè)調(diào)度器實(shí)例犀呼。
java.util.function.Consumer<Object> pc = x -> System.out .println("Thread[" + Thread.currentThread().getName() + " ," + Thread .currentThread().getId() + "] :" + x); Executor executor = Executors.newFixedThreadPool(2); Schedulers.from(executor).scheduleDirect(() -> pc.accept("executor one")); Schedulers.from(executor).scheduleDirect(() -> pc.accept("executor two")); Schedulers.trampoline().scheduleDirect(() -> pc.accept("trampoline"), 1, TimeUnit.SECONDS); Schedulers.single().scheduleDirect(() -> pc.accept("single one DONE")); Schedulers.single().scheduleDirect(() -> pc.accept("single two DONE")); Schedulers.computation() .scheduleDirect(() -> pc.accept("computation one DONE"), 1, TimeUnit.SECONDS); Schedulers.computation() .scheduleDirect(() -> pc.accept("computation two DONE"), 1, TimeUnit.SECONDS); Schedulers.io().scheduleDirect(() -> pc.accept("io one DONE")); Schedulers.io().scheduleDirect(() -> pc.accept("io two DONE"), 1, TimeUnit.SECONDS); Schedulers.io().scheduleDirect(() -> pc.accept("io tree DONE"), 1, TimeUnit.SECONDS); Schedulers.newThread().scheduleDirect(() -> pc.accept("newThread tree DONE"));
subscribeOn 將Flowable 的數(shù)據(jù)發(fā)射 切換到 Scheduler 所定義的線程幸撕, 只有第一個(gè) subscribeOn 操作有效 ;
-
observeOn 指定 observeOn 后續(xù)操作所在線程外臂,可以聯(lián)合多個(gè) observeOn 將切換多次 線程 坐儿;
示例 Schedulers.newThread() 定義的線程發(fā)送數(shù)據(jù);
Schedulers.computation() 定義的線程 執(zhí)行doOnNext;
Schedulers.single() 執(zhí)行 subscribe
Consumer<Object> threadConsumer = x -> System.out .println("Thread[" + Thread.currentThread().getName() + " ," + Thread .currentThread().getId() + "] :" + x); Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> { Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath(); try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) { Iterator<Path> iter = dirStream.iterator(); while (iter.hasNext() && !e.isCancelled()) { e.onNext(iter.next()); } e.onComplete(); } }, BackpressureStrategy.BUFFER); f1.subscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()).take(5).doOnNext(consumer).observeOn(Schedulers .single()).subscribe(consumer);
多線程并發(fā)示例
上小節(jié)給出示例 發(fā)射元素都會(huì)經(jīng)過同樣的線程切換貌矿,元素間不會(huì)產(chǎn)生并行執(zhí)行的效果炭菌。 如果需要達(dá)到 類似 Java8 parallel 執(zhí)行效果」渎可以采用FlatMap 變換 自定義并發(fā)操作黑低,在返回的Flowable進(jìn)行線程操作,如下示例所示:
f1 中元素會(huì)在Schedulers.newThread()中發(fā)射酌毡;
讀取文本內(nèi)容的操作(Files::readAllLines, Collection::size) 會(huì)在 Schedulers.io() 所指定的線程池執(zhí)行克握;
sorted 操作會(huì)在 Schedulers.computation() 所指定的線程池中執(zhí)行;
subscribe() 同樣會(huì)在 Schedulers.computation() 所指定的線程池中執(zhí)行枷踏;
f1.filter(Files::isRegularFile).doOnNext(consumer).subscribeOn(Schedulers.newThread())
.flatMap(y -> Flowable.just(y).subscribeOn(Schedulers.io())
.map(Files::readAllLines)).map(Collection::size)
.observeOn(Schedulers.computation()).doOnNext(consumer)
.sorted(Comparator.naturalOrder())
.observeOn(Schedulers.trampoline()).subscribe(consumer);
阻塞與非阻塞示例
-
從阻塞到非阻塞 我們可以通過 subscribeOn() 來達(dá)到玛荞;
//f1 為 主線程發(fā)射數(shù)據(jù)的Flowable //會(huì)阻塞主線程知道消費(fèi)完成 f1.subscribe(System.out::println); // d會(huì)理解返回. Disposable d = f1.subscribeOn(Schedulers.newThread()) .subscribe(System.out::println); while (!d.isDisposed()) { }
- 從非阻塞到阻塞,可以通過blocking* 相關(guān)操作來實(shí)現(xiàn)
Flowable<Path> f2 = f1.subscribeOn(Schedulers.newThread());
//f2 為非阻塞flowable
// 可以通過 blockingSubscribe 變?yōu)樵谥骶€程上消費(fèi)
f2.blockingSubscribe(System.out::println);
// 也可以通過下面操作返回結(jié)果呕寝。
List<Path> list = f2.toList().blockingGet();
Iterable<Path> iterator = f2.blockingIterable();
錯(cuò)誤處理 (error handling)
RxJava
無需顯示的catch 編譯異常,RxJava2 已經(jīng)支持所有函數(shù)接口拋出Exception婴梧;
-
更方便在異步多線程環(huán)境下進(jìn)行錯(cuò)誤處理下梢;
如下示例 會(huì)打印第一個(gè)異常;
Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
throw new IOException(index + "");
}).map(index -> {
throw new IllegalArgumentException(index + "");
});
Disposable d = f1.subscribe(System.out::println, Throwable::printStackTrace);
while (!d.isDisposed()) {
}
- 異橙洌可以被轉(zhuǎn)換,源數(shù)據(jù)發(fā)射終止
- 當(dāng)出現(xiàn)異常時(shí)孽江,可以通過 onErrorReturn* 轉(zhuǎn)換成一個(gè)正常值返回;
- 當(dāng)出現(xiàn)異常時(shí)番电,通過 onErrorResumeNext 自定義一個(gè)Publisher返回岗屏,意味著可以轉(zhuǎn)換一個(gè)異常類型;
Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
throw new IOException(index + "");
}).map(index -> {
throw new IllegalArgumentException(index + "");
});
f1.onErrorReturnItem(-1L).take(5)
.subscribe(System.out::println, Throwable::printStackTrace);
// 打印 -1
Disposable d = f1.onErrorResumeNext(e -> {
if (e instanceof IOException) {
return Flowable.error(new UncheckedIOException((IOException) e));
}
return Flowable.error(e);
}).subscribe(System.out::println, Throwable::printStackTrace);
// 打印 UncheckedIOException 異常
while (!d.isDisposed()) {
}
-
Flowable,map拋出異常,但數(shù)據(jù)繼續(xù)發(fā)射
暫沒有找到直接方法可以達(dá)到漱办,但可以采取如下兩種方法達(dá)到
Function<Long, Long> exceptionMap = x -> {
if (new Random().nextInt(5) > 2) {
throw new IOException(x + "");
}
return x;
};
// 使用flatMap + onErrorReturnItem
Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
f1.flatMap(index -> Flowable.just(index).map(exceptionMap).onErrorReturnItem(-1L))
.take(5).subscribe(System.out::println);
//直接封裝lift 操作
public class ErrorResumeOperator<D, U> implements FlowableOperator<D, U>
{
private final Function<U, D> function;
private final D defaultValue;
public ErrorResumeOperator(Function<U, D> function, D defaultValue)
{
this.function = function;
this.defaultValue = defaultValue;
}
@Override
public Subscriber<? super U> apply(Subscriber<? super D> observer) throws Exception
{
Subscriber<U> subscriber = new Subscriber<U>()
{
@Override
public void onSubscribe(Subscription s)
{
observer.onSubscribe(s);
}
@Override
public void onNext(U onNext)
{
try {
observer.onNext(function.apply(onNext));
}
catch (Exception e) {
observer.onNext(defaultValue);
}
}
@Override
public void onError(Throwable t)
{
observer.onError(t);
}
@Override
public void onComplete()
{
observer.onComplete();
}
};
return subscriber;
}
}
Disposable d = f1.lift(new ErrorResumeOperator<>(exceptionMap, -1L)).take(5)
.subscribe(System.out::println);
while (!d.isDisposed()) {
}
-
出錯(cuò)重試(retry)
RxJava 提供了retry以及相關(guān)的多個(gè)操作这刷,提供出錯(cuò)后重新發(fā)射數(shù)據(jù)功能;
Function<Long, Long> exceptionMap = x -> {
if (new Random().nextInt(5) > 3) {
throw new IOException(x + "");
}
if (new Random().nextInt(6) < 1) {
throw new SQLException(x + "");
}
return x;
};
Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
// 僅為 IOException 異常時(shí)最多重試3次,其它異常立即打印異常
Disposable d = f1.map(exceptionMap).retry(3, e -> e instanceof IOException)
.subscribe(System.out::println, Throwable::printStackTrace);
while (!d.isDisposed()) {
}
冷熱數(shù)據(jù)流
ConnectableFlowable & publish & connect
- ConnectableFlowable 可連接的Flowable娩井, 不管是否消費(fèi)暇屋,只有調(diào)用了connect, 數(shù)據(jù)就一直在發(fā)射洞辣,不受消費(fèi)影響 ('冷' 的Flowable 變成'熱'的)
- publish 將 普通 Flowable咐刨,變成 ConnectableFlowable ;
;
ConnectableFlowable<String> f1 = Flowable.generate(() -> new BufferedReader(new InputStreamReader(System.in))
, (reader, e) -> {
while (true) {
String line = reader.readLine();
if (line == null || line.equalsIgnoreCase("exit")) {
break;
}
e.onNext(line);
}
e.onComplete();
}).ofType(String.class).subscribeOn(Schedulers.io()).publish();
TimeUnit.SECONDS.sleep(5);
f1.connect(System.out::println);
TimeUnit.SECONDS.sleep(5);
f1.observeOn(Schedulers.newThread()).map(x -> "s0- " + x).subscribe(System.out::println);
TimeUnit.SECONDS.sleep(5);
f1.map(x -> "s1- " + x).subscribe(System.out::println);
TimeUnit.SECONDS.sleep(50);
replay
replay 將Flowable變成 ConnectableFlowable, 在connect之后扬霜,確保每次消費(fèi)都使用相同數(shù)據(jù)定鸟。
java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
.println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);
ConnectableFlowable<Long> f1 = Flowable.intervalRange(1, 100, 0, 1, TimeUnit.SECONDS)
.onBackpressureBuffer().replay();
m.apply("").accept("start");
TimeUnit.SECONDS.sleep(5);
f1.connect();
TimeUnit.SECONDS.sleep(5);
f1.subscribe(m.apply("o1"));
TimeUnit.SECONDS.sleep(5);
f1.subscribe(m.apply("o2"));
TimeUnit.SECONDS.sleep(20);
cache
緩存功能,將Flowable進(jìn)行緩存
java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
.println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);
Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> {
Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath();
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) {
Iterator<Path> iter = dirStream.iterator();
while (iter.hasNext() && !e.isCancelled()) {
Path path = iter.next();
m.apply("-----create").accept(path);
e.onNext(path);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER).cache();
f1.count().subscribe(m.apply("count"));
f1.filter(Files::isDirectory).subscribe(m.apply("filter"));
背壓(Backpressure)問題
問題描述: 在rxjava中會(huì)經(jīng)常遇到一種情況就是被觀察者發(fā)送消息太快以至于它的操作符或者訂閱者不能及時(shí)處理相關(guān)的消息著瓶。那么隨之而來的就是如何處理這些未處理的消息联予。
如下示例: f1 比 f2 元素發(fā)射速度快一倍。而zip是按照發(fā)射順序結(jié)合,所以出現(xiàn)f1的產(chǎn)生速度快于其消費(fèi)速度躯泰,因此會(huì)有背壓問題產(chǎn)生(當(dāng)發(fā)射到一定數(shù)量時(shí)會(huì)有異常拋出)谭羔。
Consumer<Object> consumer = v -> System.out.println("[" + System.currentTimeMillis() / 100 + "] " + v);
Flowable<Long> f1 = Flowable.interval(100, TimeUnit.MILLISECONDS);
Flowable<Long> f2 = Flowable.interval(200, TimeUnit.MILLISECONDS);
Flowable<Long> f3 = Flowable.zip(f1, f2, (x, y) -> x * 10000 + y);
f3.subscribe(consumer);
對(duì)于出現(xiàn)的背壓問題: - Flowable默認(rèn)隊(duì)列大小為128,并且規(guī)范要求麦向,所有的操作符強(qiáng)制支持背壓瘟裸。 - 通過操作節(jié)流(Throttling)相關(guān)操作(sample 、throttleLast诵竭、throttleFirst话告、throttleWithTimeout、debounce等)來改變Flowable的發(fā)射數(shù)率卵慰;
通過設(shè)置緩沖區(qū)和窗口(buffer,window)操作,來緩存過剩的數(shù)據(jù)沙郭,然后發(fā)送特定數(shù)據(jù)。
設(shè)置背壓策略(onBackpressurebuffer & onBackpressureDrop & onBackpressureLatest)
RxJava 測(cè)試
RxJava2 支持test() 操作符裳朋,將Flowable轉(zhuǎn)變?yōu)?TestSubscriber,從而支持多種斷言操作病线。
List<String> list = Arrays.asList(
"orange", "blue", "red", "green", "yellow", "cyan", "purple");
Flowable.fromIterable(list).subscribeOn(Schedulers.newThread()).sorted().test().assertValues(list.stream().sorted().toArray(String[]::new));
Flowable.fromIterable(list).count().test().assertValue(Integer.valueOf(list.size()).longValue());
List<String> out1 = Flowable.fromIterable(list).sorted().test().values();
Reference
響應(yīng)式宣言.https://github.com/reactivemanifesto/reactivemanifesto/blob/master/README.zh-cn.md
RxJava 2.0 Released with Support for Reactive Streams Specification. https://www.infoq.com/news/2016/11/rxjava-2-with-reactive-streams
https://www.lightbend.com/blog/7-ways-washing-dishes-and-message-driven-reactive-systems
Use reactive streams API to combine akka-streams with rxJava. http://www.smartjava.org/content/use-reactive-streams-api-combine-akka-streams-rxjava
What's different in 2.0. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0
Learning Reactive Programming with Java8. https://github.com/zouzhberk/rxjava-study/raw/master/docs/LearningReactiveProgramming.pdf