RxJava 簡(jiǎn)介
RxJava 是 ReactiveX 在 Java 上的開源的實(shí)現(xiàn)埋哟。Observable(被觀察者) 和 Subscriber(訂閱者)是兩個(gè)主要的類。在 RxJava 上嗜暴,一個(gè) Observable 是一個(gè)發(fā)出數(shù)據(jù)流或者事件的類械姻,Subscriber 是一個(gè)對(duì)這些發(fā)出的 items (數(shù)據(jù)流或者事件)進(jìn)行處理(采取行動(dòng))的類斑响。一個(gè) Observable 的標(biāo)準(zhǔn)流發(fā)出一個(gè)或多個(gè) item枉侧,然后成功完成或者出錯(cuò)。一個(gè) Observable 可以有多個(gè) Subscribers肛宋,并且通過 Observable 發(fā)出的每一個(gè) item州藕,該 item 將會(huì)被發(fā)送到 Subscriber.onNext() 方法來進(jìn)行處理。一旦 Observable 不再發(fā)出 items酝陈,它將會(huì)調(diào)用 Subscriber.onCompleted() 方法床玻,或如果有一個(gè)出錯(cuò)的話 Observable 會(huì)調(diào)用 Subscriber.onError() 方法。
對(duì)于任何 Observable 你可以定義在兩個(gè)不同的線程后添,Observable 會(huì)操作在它上面笨枯。使用 Observable.observeOn() 定義"觀察者執(zhí)行觀察的"線程,用來監(jiān)聽和檢查從 Observable 最新發(fā)出的 items (Subscriber 的 onNext遇西,onCompleted 和 onError 方法會(huì)執(zhí)行在 observeOn 所指定的線程上)馅精,并使用 Observable.subscribeOn() 來定義"訂閱的線程",將其運(yùn)行我們 Observable 的代碼(長(zhǎng)時(shí)間運(yùn)行的操作)粱檀。
observeOn 與作用域
observeOn是對(duì)下游生效的洲敢,一個(gè)簡(jiǎn)單的例子:
Flowable.just(1).observeOn(Schedulers.io())
.subscribe(i -> {
System.out.println(Thread.currentThread().getName());
});
輸出:
RxCachedThreadScheduler-1
但是當(dāng)有多個(gè)操作符,且存在多次observeOn時(shí)茄蚯,每個(gè)方法都是執(zhí)行在什么線程呢压彭?
Flowable.just(1).observeOn(Schedulers.io())
.map(i -> {
System.out.println(Thread.currentThread().getName());
return i;
})
.observeOn(Schedulers.computation())
.subscribe(i -> {
System.out.println(Thread.currentThread().getName());
});
輸出:
RxCachedThreadScheduler-1
RxComputationThreadPool-1
這里就涉及到一些 RxJava 實(shí)現(xiàn)的細(xì)節(jié),多數(shù)操作符是基于上游調(diào)用onNext / onComplete / onError 的進(jìn)一步封裝渗常,在不涉及包含Scheduler的操作符的情況下壮不,在上游調(diào)用了observeOn后,后續(xù)操作符的方法都是執(zhí)行在上游observeOn所調(diào)度的線程皱碘。因此每個(gè)操作符所執(zhí)行的線程都是由上游最近的一個(gè)observeOn的Scheduler決定询一。
因此筆者稱之為最近生效原則,但是請(qǐng)注意癌椿,observeOn是影響下游的健蕊,因此操作符所執(zhí)行的線程受的是最近上游的observeOn影響。
示例
因此在實(shí)際使用中靈活的使用observeOn踢俄,使得代碼的效率最大化缩功。這里筆者再舉個(gè)例子:
Flowable.just(new File("input.txt"))
.map(f -> new BufferedReader(new InputStreamReader(new FileInputStream(f))))
.observeOn(Schedulers.io())
.flatMap(r -> Flowable.<String, BufferedReader>generate(() -> r, (br, e) -> {
String s = br.readLine();
if (s != null) {
e.onNext(s);
} else {
System.out.println(Thread.currentThread().getName());
e.onComplete();
}
}, BufferedReader::close))
.observeOn(Schedulers.computation())
.map(Integer::parseInt)
.reduce(0, (total, item) -> {
System.out.println(item);
return total + item;
})
.subscribe(s -> {
System.out.println("total: " + s);
System.out.println(Thread.currentThread().getName());
});
輸出:
RxCachedThreadScheduler-1
1
2
3
4
5
total: 15
RxComputationThreadPool-1
如上代碼所示,我們從 input.txt 讀出每行的字符串都办,然后轉(zhuǎn)成一個(gè) int, 最后求和嫡锌。這里我們靈活地使用了兩次observeOn虑稼,在讀文件時(shí),調(diào)度至IoScheduler势木,隨后做計(jì)算工作時(shí)調(diào)度至ComputationScheduler动雹,從控制臺(tái)的輸出可以見線程的的確確是我們所期望的。當(dāng)然這里求和只是一個(gè)示例跟压,讀者們可以舉一反三。
事實(shí)上上面的代碼還不是最優(yōu)的:
Flowable.just(new File("input.txt"))
.map(f -> new BufferedReader(new InputStreamReader(new FileInputStream(f))))
.observeOn(Schedulers.io())
.flatMap(r -> Flowable.<String, BufferedReader>generate(() -> r, (br, e) -> {
String s = br.readLine();
if (s != null) {
e.onNext(s);
} else {
System.out.println(Thread.currentThread().getName());
e.onComplete();
}
}, BufferedReader::close))
.parallel()
.runOn(Schedulers.computation())
.map(Integer::parseInt)
.reduce((i, j) -> {
System.out.println(Thread.currentThread().getName());
return i + j;
})
.subscribe(s -> {
System.out.println("total: " + s);
System.out.println(Thread.currentThread().getName());
});
輸出:
RxCachedThreadScheduler-1
RxComputationThreadPool-1
RxComputationThreadPool-2
RxComputationThreadPool-4
RxComputationThreadPool-4
total: 15
RxComputationThreadPool-4
如上代碼所示我們可以充分利用多核的性能歼培,通過parallel來并行運(yùn)算震蒋,當(dāng)然這里用在求和就有點(diǎn)殺雞用牛刀的意思了,這里只是一個(gè)舉例躲庄。更多 parallel 相關(guān)的內(nèi)容查剖,留待后續(xù)分享。
subscribeOn與作用域
事實(shí)上subscribeOn同樣遵循最近生效原則噪窘,但是與observeOn恰恰相反笋庄。操作符會(huì)被最近的下游的subscribeOn調(diào)度,因?yàn)閟ubscribeOn影響的是上游倔监。
但是和observeOn又有一些微妙的差別在于直砂,我們通常調(diào)用subscribeOn更加關(guān)注最上游的數(shù)據(jù)源的線程。因此通常不會(huì)在中間過程中調(diào)用多次浩习,任意的調(diào)用一次subscribeOn均會(huì)影響上游所有操作符的subscribe所在的線程静暂,且不受observeOn的影響。這是由于這兩者機(jī)制的不同谱秽,subscribeOn是將整個(gè)上游的subscribe方法都調(diào)度到目標(biāo)線程了洽蛀。