參閱
給 Android 開(kāi)發(fā)者的 RxJava 詳解
什么是函數(shù)式編程
RxJava 2.0 全新來(lái)襲
基于RxJava 1.x,結(jié)合RxJava 2.0整理學(xué)習(xí)筆記含滴。
概念
1.實(shí)現(xiàn)了異步操作的庫(kù)溶弟;
2.通過(guò)擴(kuò)展觀察者模式來(lái)實(shí)現(xiàn)異步沙廉;
Observable發(fā)送消息扶叉,而Subscriber則用于消費(fèi)消息。
與觀察者不同的是逼裆,Observable一般只有等到有Subscriber通過(guò)subscribe方法訂閱它郁稍,才會(huì)開(kāi)始發(fā)送消息。
基礎(chǔ)類(lèi)/方法
- ** Observer(觀察者)**胜宇,接口耀怜。
它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?br> 定義了4個(gè)行為/方法: onSubscribe(), onNext(), onError(), onComplete(),
/**
* 創(chuàng)建一個(gè)觀察者
*/
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
-
Subscriber(訂閱者),接口桐愉。等價(jià)于觀察者财破。與觀察者不同之處在于:onSubscribe方法的參數(shù)不同,而且兩者位于不同的jar包下从诲。
Subscriber位于reactive-streams.jar文件下左痢,包名:org.reactivestreams.Subscriber
Observable位于rxjava.jar文件下,包名:io.reactivex.Observable
/**
* 創(chuàng)建一個(gè)訂閱者
*/
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
-
Observable (被觀察者)系洛,抽象類(lèi)
它決定什么時(shí)候觸發(fā)事件以及觸發(fā)怎樣的事件
/**
* 創(chuàng)建一個(gè)Observable對(duì)象俊性,并定義事件處理規(guī)則。當(dāng)它被訂閱的時(shí)候描扯,事件會(huì)按順序依次觸發(fā)定页。
*/
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("Hi");
emitter.onNext("Aloha");
emitter.onComplete();
}
});
create()
方法是 RxJava 最基本的創(chuàng)造事件序列的方法≌莱希基于這個(gè)方法典徊, RxJava 還提供了一些方法用來(lái)快捷創(chuàng)建事件隊(duì)列。
例如:
-
just(T ...)
-
fromArray(T... items)
Observable observable1 = Observable.just("Hello", "Hi", "Aloha"); Observable observable2 = Observable.fromArray("Hello", "Hi", "Aloha");
Flowable(被觀察者)恩够,抽象類(lèi)卒落。等價(jià)于Observable。RxJava 2.x引入蜂桶。
RxJava1.x中儡毕,Observeable用于訂閱Observer和Subscriber。
RxJava2.x中扑媚, Observeable用于訂閱Observer 腰湾,是不支持背壓的贾费,而 Flowable用于訂閱Subscriber ,是支持背壓(Backpressure)的檐盟。
背壓是指在異步場(chǎng)景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下押桃,一種告訴上游的被觀察者降低發(fā)送速度的策略 葵萎,在1.0中,關(guān)于背壓最大的遺憾唱凯,就是集中在Observable這個(gè)類(lèi)中羡忘,導(dǎo)致有的Observable支持背壓,有的不支持磕昼。為了解決這種缺憾卷雕,新版本把支持背壓和不支持背壓的Observable區(qū)分開(kāi)來(lái)。
- subscribe(訂閱)票从,方法漫雕。
observable.subscribe(observer);
//RxJava 2.x中如下方法編譯報(bào)錯(cuò),沒(méi)有提供與Subscriber對(duì)象關(guān)聯(lián)的方法
//observable1.subscribe(subscriber);
可以從上圖的訂閱方法中發(fā)現(xiàn)Consumer類(lèi)峰鄙。
- Consumer(消費(fèi)者)浸间, 一個(gè)接口。用于接受單個(gè)值吟榴。
Consumer onNextConsumer = new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
}
};
Consumer onErrorConsumer = new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
}
};
observable.subscribe(onNextConsumer, onErrorConsumer);
顯然魁蒜,subscribe方法支持不完整定義的回調(diào),可以根據(jù)需求單獨(dú)處理只需要的回調(diào)吩翻,而無(wú)需每次都處理Observer中的4個(gè)回調(diào)兜看。Consumer可以定義Observer的每一個(gè)部分,Observable.subscribe()函數(shù)能夠處理一個(gè)狭瞎,兩個(gè)细移、三個(gè)或者4個(gè)參數(shù),分別表示onNext()脚作,onError()葫哗,onComplete()和onSubscribe函數(shù)。響應(yīng)順序是onSubscribe->onNext->onComplete或者onError球涛。
范例
- 將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來(lái):
String[] names = {"Jason", "Bob", "Coco"};
Observable.fromArray(names).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("name:" + s);
}
});
- 由 id 取得圖片并顯示
Observable.create(new ObservableOnSubscribe<Drawable>() {
@Override
public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
//根據(jù)id獲取Drawable對(duì)象劣针,回調(diào)到觀察者中。
Drawable drawable = getResources().getDrawable(R.drawable.ic_action_name);
e.onNext(drawable);
e.onComplete();
}
}).subscribe(new Consumer<Drawable>() {
@Override
public void accept(@NonNull Drawable drawable) throws Exception {
ImageView imageView = (ImageView) findViewById(R.id.image);
imageView.setImageDrawable(drawable);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
}
});
在 RxJava 的默認(rèn)規(guī)則中亿扁,事件的發(fā)出和消費(fèi)都是在同一個(gè)線程的捺典。也就是說(shuō),如果只用上面的方法从祝,實(shí)現(xiàn)出來(lái)的只是一個(gè)同步的觀察者模式襟己。觀察者模式本身的目的就是『后臺(tái)處理引谜,前臺(tái)回調(diào)』的異步機(jī)制,因此異步對(duì)于 RxJava 是至關(guān)重要的擎浴。而要實(shí)現(xiàn)異步员咽,則需要用到 RxJava 的另一個(gè)概念: Scheduler 。
線程控制
- Scheduler(調(diào)度器)贮预,抽象類(lèi)
- Scheduler的子類(lèi)有ComputationScheduler贝室、ExecutorScheduler、ImmediateThinScheduler仿吞、NewThreadScheduler滑频、SingleScheduler、TrampolineScheduler唤冈。
-
Schedulers 一個(gè)可以返回標(biāo)準(zhǔn)Scheduler實(shí)例的靜態(tài)工廠峡迷。
- Schedulers.newThread(): 為每個(gè)工作單元?jiǎng)?chuàng)建一個(gè)新的線程。
- Schedulers.io(): I/O 操作(讀寫(xiě)文件你虹、讀寫(xiě)數(shù)據(jù)庫(kù)绘搞、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。
與newThread()的區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池傅物,可以重用空閑的線程看杭,因此多數(shù)情況下 io() 比 newThread() 更有效率。
不要把計(jì)算工作放在 io() 中挟伙,可以避免創(chuàng)建不必要的線程楼雹,計(jì)算工作可以使用computation()方法。 - Schedulers.computation(): 用于計(jì)算型工作例如事件循環(huán)和回調(diào)處理尖阔。
這個(gè)計(jì)算指的是 CPU 密集型計(jì)算贮缅,即不會(huì)被 I/O 等操作限制性能的操作,例如圖形的計(jì)算介却。
這個(gè) Scheduler 使用的固定的線程池谴供,大小為 CPU 核數(shù)。
不要把 I/O 操作放在 computation() 中齿坷,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU桂肌。 - Schedulers.single(): 單線程,共享的Scheduler永淌。
- Schedulers.trampoline():在當(dāng)前線程上工作崎场,但不立即執(zhí)行的Scheduler。
在當(dāng)前線程中的工作放入隊(duì)列中排隊(duì)遂蛀,并依次操作谭跨。 - 另外, Android 還有一個(gè)專(zhuān)用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行螃宙。
使用 subscribeOn() 和 observeOn() 兩個(gè)方法來(lái)對(duì)線程進(jìn)行控制了蛮瞄。前者
用于指定被觀察者執(zhí)行的線程,或者叫事件產(chǎn)生的線程谆扎。后者用于指定觀察者執(zhí)行的線程挂捅,或者叫事件消費(fèi)的線程。
Observable.create(new ObservableOnSubscribe<Drawable>() {
@Override
public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
//根據(jù)id獲取Drawable對(duì)象堂湖,回調(diào)到觀察者中籍凝。
Drawable drawable = getResources().getDrawable(R.drawable.ic_action_name);
e.onNext(drawable);
e.onComplete();
}
})
.subscribeOn(Schedulers.io())//用于指定被觀察者執(zhí)行的線程
.observeOn(AndroidSchedulers.mainThread())//用于執(zhí)行觀察者執(zhí)行的線程
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(@NonNull Drawable drawable) throws Exception {
ImageView imageView = (ImageView) findViewById(R.id.image);
imageView.setImageDrawable(drawable);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
}
});
變換
所謂變換,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理苗缩,轉(zhuǎn)換成不同的事件或事件序列。
篇幅較長(zhǎng)声诸,請(qǐng)參閱給 Android 開(kāi)發(fā)者的 RxJava 詳解酱讶。
簡(jiǎn)單的說(shuō)就是在發(fā)送者Observable和消息消費(fèi)者Subscriber之間對(duì)消息進(jìn)行各種你所需要的加工處理。
RxJava(一)基礎(chǔ)知識(shí)
RxJava(二) Operator
關(guān)鍵方法
- map()
- flatMap()
其他觀察者模式
當(dāng)然彼乌,除了上面這兩種觀察者泻肯,還有一類(lèi)觀察者
Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver
更多請(qǐng)參閱RxJava 2.0 全新來(lái)襲