最近再系統(tǒng)的整理了一下RxJava,感覺挺好用的虾宇,因為它搓彻,GitHub 上衍生了一堆比如 RxAndroid、RxBus嘱朽、RxPermission 等之類的開源庫旭贬。下面寫寫整理出來的文檔。
另外使用場景文章Android RxJava之葵花寶典(下)(看我就夠了)---使用場景
一搪泳、RxJava的介紹
1稀轨、RxJava是什么
在講RxJava之前,先了解一下相關(guān)術(shù)語
響應(yīng)式編程:一種面向數(shù)據(jù)流和變化傳播的編程范式
不懂岸军?那舉個簡單的例子靶端,界面上的按鈕,點擊的時候會觸發(fā)按鈕的寫好的點擊事件凛膏。 我們不知道按鈕什么時候會被點到杨名,但是點到了就會通知app去觸發(fā)你寫好的事件,這個 “通知” 的過程就是RxJava的工作猖毫。
觀察者模式:
觀察者模式的基本需求:觀察者和被觀察者之間是完全分離的台谍,當被觀察者的狀態(tài)發(fā)生變化之后,
通過Register(注冊)
或者 Subscribe(訂閱)
的方式吁断,通知觀察者趁蕊。
RxJava 是一種函數(shù)式、響應(yīng)式的異步操作庫仔役,它讓你的代碼更加簡潔掷伙。
二、RxJava的重點基礎(chǔ)
1又兵、Observable
Observable [?b'z??v?bl]
可觀察量,可觀察量,可觀測的任柜。在觀察者模式中稱為“被觀察者”
或“可觀察對象”
卒废。
//OnSubcriber是實現(xiàn)了一個Acton1接口的接口。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//每個Observable有一個final OnSubscribe<T> onSubscribe 成員宙地,
//在該成員方法中調(diào)用call()方法摔认,這個call方法的參數(shù)
//就是 Observable.subscribe() 方法傳入的 Subsriber實例。
}
});
注意:在Rxjava中ActionX系列宅粥,其實就是無返回值的的接口
2参袱、Observer
接收源,是觀察者模式中的“觀察者”
秽梅,可接收Observable抹蚀、Subject
發(fā)射的數(shù)據(jù)。
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
//告知Observable沒有更多的數(shù)據(jù)了企垦,即沒有新的onNext()發(fā)出時环壤,就執(zhí)行onCompleted()。
}
@Override
public void onError(Throwable e) {
//在事件處理過程中竹观,出現(xiàn)了異掣渑酰或者錯誤潜索,就會被觸發(fā)臭增,同時整個隊列將被終止,不再有事件發(fā)出竹习。
}
@Override
public void onNext(String s) {
//實現(xiàn)方法跟Subscriber一模一樣
}
};
在一個隊列中誊抛,onCompleted()和onError() 都是最后觸發(fā)的,而且兩者中只有一個會被觸發(fā)整陌。
3拗窃、Subscriber
“訂閱者”,也是接收源泌辫,那它跟Observer
有什么區(qū)別呢随夸?Subscriber
實現(xiàn)了Observer
接口,比Observer
多了一個最重要的方法unsubscribe( )
震放,用來取消訂閱
宾毒,當你不再想接收數(shù)據(jù)了,可以調(diào)用unsubscribe( )方法停止接收殿遂,Observer 在 subscribe() 過程中,最終也會被轉(zhuǎn)換成 Subscriber 對象诈铛,一般情況下,建議使用Subscriber作為接收源墨礁。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//實現(xiàn)方法跟Observer一模一樣
}
@Override
public void onStart() {
// 它會在 Subscribe 剛開始幢竹,而事件還未發(fā)送之前被調(diào)用,
// 可以用于做一些準備工作恩静,例如數(shù)據(jù)的清零或重置
super.onStart();
}
};
--------------------------------
//這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法.
//用于取消訂閱焕毫。
subscriber.unsubscribe();
//這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法.
//用于判斷當前是否訂閱。
subscriber.isUnsubscribed();
4、Subscription
訂閱咬荷。Observable調(diào)用subscribe( )方法返回的對象冠句,同樣有unsubscribe( )方法,可以用來取消訂閱事件幸乒;
//被觀察者
Observable<Integer> observableInteger = Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//觀察者
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
//進行訂閱
Subscription subscription = observableInteger.subscribe(subscriber);
Log.d(TAG, "subscription: " + subscription.isUnsubscribed() + ",Observable:" + subscriber
.isUnsubscribed());
Observable.from()
List<Integer> integers = new ArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
integers.add(4);
integers.add(5);
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.from(integers).subscribe(subscriber);
Observable.just()
將傳入的參數(shù)依次發(fā)射出去懦底。
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just(1,2,3,4,5).subscribe(subscriber);
just()中可以傳入1-10個參數(shù),并且將傳入?yún)?shù)的順序來發(fā)射出去罕扎。
5聚唐、Subject
是一個比較特殊的對象,既可充當發(fā)射源腔召,也可充當接收源杆查。
源碼如下
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
//這個Subject即實現(xiàn)了Observer,又繼承了Observable
...
}
所以 Subject = Observable + Observer
RxJava針對不同的場景提供四種不同的Subject
- PublishSubject
訂閱之后的數(shù)據(jù)全部被發(fā)射臀蛛。 - BehaviorSubject
訂閱之前的一個和訂閱之后的全部數(shù)據(jù)被發(fā)射 - ReplaySubject
不論訂閱所處任何位置亲桦,都將發(fā)射全部數(shù)據(jù) - AsyncSubject
不論訂閱所處任何位置,只會發(fā)射最后一個數(shù)據(jù)
PublishSubject
PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者浊仆。
PublishSubject<String> publishSubject = PublishSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: PublishSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: PublishSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
publishSubject.onNext("one");
publishSubject.onNext("two");
publishSubject.subscribe(subscriber);
publishSubject.onNext("three");
---------------------------------------
//打印結(jié)果
onNext: three
BehaviorSubject
當觀察者訂閱BehaviorSubject時客峭,它開始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時還沒有收到任何數(shù)據(jù),它會發(fā)射一個默認值)抡柿,然后繼續(xù)發(fā)射其它任何來自原始Observable的數(shù)據(jù)舔琅。
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: BehaviorSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: BehaviorSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.onNext("three");
--------------------------------------------
onNext: default
onNext: one
onNext: two
onNext: three
如果把 behaviorSubject.subscribe(subscriber);放在倒數(shù)第二行
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("three");
---------------------------------
//打印結(jié)果
onNext: two
onNext: three
看到結(jié)果,我們不難看出洲劣,其實上面所說的發(fā)射最近所發(fā)射的數(shù)據(jù)备蚓,其實就是以
behaviorSubject.subscribe(subscriber);
為界,這句代碼之前的一個和之后的所以發(fā)射囱稽。
當然郊尝,如果原始的Observable因為發(fā)生了一個錯誤而終止,BehaviorSubject將不會發(fā)射任何數(shù)據(jù)战惊,只是簡單的向前傳遞這個錯誤通知流昏。
ReplaySubject
ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時訂閱的样傍。
ReplaySubject<String> replaySubject = ReplaySubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ReplaySubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ReplaySubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
replaySubject.subscribe(subscriber);
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
--------------------------
//打印
onNext: one
onNext: two
onNext: three
AsyncSubject
當Observable完成時AsyncSubject只會發(fā)布最后一個數(shù)據(jù)給已經(jīng)訂閱的每一個觀察者横缔。
AsyncSubject<String> asyncSubject = AsyncSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: AsyncSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: AsyncSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
};
Subscription subscription = asyncSubject.subscribe(subscriber);
asyncSubject.onNext("one!");
asyncSubject.onNext("two!");
asyncSubject.onNext("three!");
asyncSubject.onCompleted();
-----------------------------------
//打印結(jié)果如下
onNext: three!
onCompleted: AsyncSubject Completed!
當然如果原始Observable沒有發(fā)射任何值,AsyncObject也不發(fā)射任何值
AsyncSubject會把最后一個值發(fā)射給后續(xù)的觀察者衫哥。
請注意:如果在AsyncSubject異常時茎刚,那么不會向觀察者發(fā)射任何值,只會傳遞一個錯誤的通知撤逢。
6膛锭、Action0
RxJava中的一個接口粮坞,它只有一個無參call()方法,且無返回值初狰,同樣還有Action1莫杈,Action2...Action9等,Action1封裝了含有 1 個參的call()方法奢入,即call(T t)筝闹,Action2封裝了含有 2 個參數(shù)的call方法,即call(T1 t1腥光,T2 t2)关顷,以此類推。
7武福、Func0
與Action0非常相似议双,也有call()方法,但是它是有返回值的捉片,同樣也有Func0平痰、Func1...Func9;
三、RxJava操作符
Scheduler(調(diào)度器)
subscribeOn()指定:Observable將全部的處理過程(包括發(fā)射數(shù)據(jù)和通知)放在特定的調(diào)度器上執(zhí)行伍纫。
ObserveOn()指定:一個Observable在一個特定的調(diào)度器上調(diào)用觀察者的onNext, onError和onCompleted方法宗雇,
Subscriber subcriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
上面這段代碼中,由于指定了1翻斟,2逾礁,3说铃,4發(fā)射代碼為Schedulers.io()访惜,那么發(fā)射數(shù)據(jù)就將在io線程中執(zhí)行。而onNext, onError和onCompleted則將在主線中執(zhí)行腻扇。
Operators(操作符)
RxJava提供了幾個mapping函數(shù):map(),flatMap(),concatMap(),flatMapIterable()以及switchMap().所有這些函數(shù)都作用于一個可觀測序列债热,然后變換它發(fā)射的值,最后用一種新的形式返回它們幼苛。
map
map 是用于變換的一個操作符,這在RxJava中占據(jù)了一定的地位,就是因為它的變換操作轿塔。
Subscriber subcriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
})
.subscribe(subcriber);
在上面的代碼中溯捆,我通過map將字符串轉(zhuǎn)化成了整形的1,2括荡,3高镐,4,返回一個Observable的對象畸冲。
請注意:這個操作符默認不在任何特定的調(diào)度器上執(zhí)行嫉髓。
flatmap
Subscriber subcriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.parseInt(s)+1);
}
})
.subscribe(subcriber);
-------------------------------------------
//打印
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onCompleted: Completed!
從上面我們可以看出观腊,map與flatmap很相似,都是用的Func1算行,而且模式都是<I,O>模式梧油,即是:I轉(zhuǎn)換成O并返回。但是最大的不同點在于:我們flatmap的輸出類型是Observable的類型州邢。
在這里請注意一個問題:在執(zhí)行flatmap中返回之后(O輸出返回的Observable)儡陨,并不是立馬把返回的Observable通過Subscribe進行訂閱,而是將返回的若干Observables都交給同一個Observable量淌,然后再進行subscribe迄委。
所以,在上面我們先將字符串"1","2", "3", "4" 分別轉(zhuǎn)換成一個整形的Observable類型类少,即是:Observable(2),Observable(3),Observable(4),Observable(5)
叙身。然后將這些個Observables統(tǒng)一轉(zhuǎn)換成一個Observable,再進行subscribe硫狞。
那么信轿,這個flatmap到底有何用呢?可以用在什么地方呢残吩?
假設(shè)這樣一種情景:一個學校的老師我們定義為一個集合A财忽,每個老師包括了個人信息和所教課程,一個老師不可能只教授一門課程泣侮,所以我們將老師所教授課程定義為集合B即彪。如果讓你打印每個老師所教課程,該怎么做活尊?
Teacher[] teachers = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(teachers)
.flatMap(new Func1<Teacher, Observable<Course>>() {
@Override
public Observable<Course> call(Teacher teacher) {
return Observable.from(teacher.getCourses());
}
})
.subscribe(subscriber);
最后再補充一點:FlatMap對這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作隶校,因此它們可能是交錯的。這意味著flatMap()函數(shù)在最后的Observable中不能夠保證源Observables確切的發(fā)射順序蛹锰。
ConcatMap
RxJava的concatMap()函數(shù)解決了flatMap()的交叉問題深胳,提供了一種能夠把發(fā)射的值連續(xù)在一起的鋪平函數(shù),而不是合并它們铜犬。
repeat
讓你發(fā)射的數(shù)據(jù)重復(fù)發(fā)射
Subscriber subcriber = new Subscriber<Integer>() {
...
}
};
Observable.just("1", "2","3")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.parseInt(s)+1);
}
})
.repeat(3)
.subscribe(subcriber);
--------------------------------------------
//打印輸出
onNext: 2
onNext: 3
onNext: 4
onNext: 2
onNext: 3
onNext: 4
onNext: 2
onNext: 3
onNext: 4
onCompleted: Completed!
range
從起始點開始發(fā)射數(shù)據(jù)
Subscriber subcriber = new Subscriber<Integer>() {
...
};
Observable.range(10,3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
結(jié)果為:10舞终,11,12癣猾。range(10,3),其中10 是起始敛劝,3是數(shù)量。
interval
在需要輪詢的時候是最好的選擇
Observable.interval(3,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
...
});
interval()函數(shù)的兩個參數(shù):一個指定兩次發(fā)射的時間間隔纷宇,另一個是用到的時間單位夸盟。
take
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(4)
.subscribe(new Subscriber<Integer>() {
...
});
------------------------------
//打印輸出
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
TakeLast
如果我們想要最后N個元素,我們只需使用takeLast()函數(shù):
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takelast(2)
.subscribe(new Subscriber<Integer>() {
...
});
--------------------------------------
//打印輸出
Next: 7
Next: 8
Sequence complete.
五呐粘、RxJava的使用配置
1.build.gradle 添加依賴包
dependencies {
compile fileTree(include: ['*.jar'], dir: 'libs')
androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
exclude group: 'com.android.support', module: 'support-annotations'
})
compile 'com.android.support:appcompat-v7:25.1.1'
testCompile 'junit:junit:4.12'
/* 響應(yīng)式*/
compile 'io.reactivex:rxjava:1.1.6'
// RxJava
compile 'io.reactivex:rxandroid:1.2.1'
/*retrofit*/
compile 'com.squareup.retrofit2:retrofit:2.2.0'//這個
// 如果要是用Gson轉(zhuǎn)換的話满俗,需要添加這個依賴
compile 'com.squareup.retrofit2:converter-gson:2.2.0'//這個
compile 'com.squareup.retrofit2:adapter-rxjava:2.2.0'//這個
compile 'com.squareup.okhttp3:okhttp:3.7.0'
}
2.build.gradle 支持JDK1.8(是為了用到Lambda表達式語言转捕,這樣可以更便捷的編程)
android {
...
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
如果出現(xiàn)以下提示,則在在build.gradle文件的defaultConfig中添加以下代碼
//如果報錯
Error:Jack is required to support java 8 language features. Either enable Jack or remove sourceCompatibility JavaVersion.VERSION_1_8.
android {
defaultConfig {
//添加以下代碼
jackOptions {
enabled true
}
}
}
3.build.gradle 添加表達式語言的插件唆垃。
apply plugin: 'me.tatarka.retrolambda'
如果這里報錯Error:(2, 0) Plugin with id 'me.tatarka.retrolambda' not found.還需要在項目的build.gradle中的dependencies節(jié)點中添加
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.2.2'
classpath 'me.tatarka:gradle-retrolambda:2.5.0'//添加這里
// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
}