簡介
RxJava 在 GitHub的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的倾贰、基于事件的程序的庫)狐树。同時也是基于觀察者模式的一個庫。
要使用他首先要添加依賴
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
類介紹
- Observable 被觀察者
- Observer 觀察者
- subscribe() 他們鏈接的橋梁
- Subscriber茶袒,他是Observer的實現(xiàn)類准颓,并將方法進(jìn)行拓展哈蝇。
這個關(guān)系用代碼來表示如圖所示:
//觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.e("onCompleted: ", "完成");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("onNext: ", s);
}
};`
//被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e("call: ", Thread.currentThread().getName());
subscriber.onNext("我");
subscriber.onNext("愛");
subscriber.onNext("你");
subscriber.onCompleted();
}
});
然后橋梁將讓他們之間產(chǎn)生鏈接:
observable.subscribe(observer);
這樣一個簡單的Rxjava模型就寫好了,可以看一下輸出結(jié)果:
com.example.cosima.rxjavalearn E/onNext:: 我
com.example.cosima.rxjavalearn E/onNext:: 愛
com.example.cosima.rxjavalearn E/onNext:: 你
com.example.cosima.rxjavalearn E/onCompleted:: 完成
我們也可以將上面的Observer用Subscriber來代替攘已,如下:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onStart() {
Log.e("onStart: ", "開始");
Log.e("onStart: ", Thread.currentThread().getName());
}
@Override
public void onCompleted() {
Log.e("onCompleted: ", "完成");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("onNext: ", s);
}
};
Subscriber中比Observer多了一個Onstart方法炮赦,它會在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用样勃,可以用于做一些準(zhǔn)備工作吠勘,例如數(shù)據(jù)的清零或重置。這是一個可選方法峡眶,默認(rèn)情況下它的實現(xiàn)為空剧防。
Observable的創(chuàng)建
上面介紹了一種創(chuàng)建方式是使用create()方法創(chuàng)建的,該方法是最基本的創(chuàng)造事件隊列的方法辫樱,基于這個方法外峭拘,RxJava還提供了一些方法來快捷穿件事件隊列
- just(T..); 將需要的參數(shù)依次傳入。
- from(T[]); 將需要的參數(shù)添加到數(shù)組中傳入狮暑。
將觀察者與被觀察者聯(lián)系起來除了subscribe(Observer) 和 subscribe(Subscriber) 鸡挠,subscribe() 還支持不完整定義的回調(diào)。如下所示:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
observable.subscribe(onNextAction);
observable.subscribe(onNextAction, onErrorAction);
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
上述代碼提到了Action0和Action1搬男,他們都是RxJava的接口拣展,同樣都只有一個方法Call(),不同的是,Action0的Call()方法中是沒有參數(shù)也沒有返回值的缔逛,對應(yīng)了OnCompleted()這個方法;Action1的Call( T param)方法中含有參數(shù)备埃,分別對應(yīng)了OnNext(T param)和OnError(Throwable error)兩個方法。具體使用如下:
String[] loves = {"I", "will", "always", "love", "you"};
Observable.from(loves).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("call: ", s);
}
});
最開始的時候就講了RxJava是可以進(jìn)行異步操作的褐奴,但是上面講的都是進(jìn)行的同步操作按脚,下面就講RxJava的另一個概念Scheduler
線程控制Scheduler
在不指定線程的情況下,RxJava遵循線程不變的原則歉糜,在哪個線程調(diào)用subscribe(),事件就在哪個線程發(fā)生乘寒,也在這個線程消費,當(dāng)我們需要切換線程的時候就需要用到Scheduler匪补。
- Schedulers.immediate(): 直接在當(dāng)前線程運行伞辛,相當(dāng)于不指定線程烂翰。這是默認(rèn)的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程蚤氏,并在新線程執(zhí)行操作甘耿。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫竿滨、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler佳恬。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池于游,可以重用空閑的線程毁葱,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中贰剥,可以避免創(chuàng)建不必要的線程倾剿。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算蚌成,即不會被 I/O 等操作限制性能的操作前痘,例如圖形的計算。這個 Scheduler 使用的固定的線程池担忧,大小為 CPU 核數(shù)芹缔。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU瓶盛。
- 另外最欠, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行蓬网。
然后我們可以使用subscribeOn() 和 observeOn() 兩個方法來對線程進(jìn)行控制窒所。
subscribeOn(): 指定 subscribe() 所發(fā)生的線程。事件產(chǎn)生的線程帆锋。
-
observeOn(): 指定 Subscriber 所運行在的線程。事件消費的線程禽额。
然后我們通過代碼理解一下這個SchedulerObservable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("我"); subscriber.onNext("愛"); subscriber.onNext("你"); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { Log.e("call: ", Thread.currentThread().getName()); mProgressBar.setVisibility(View.VISIBLE); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
subscribeOn(Schedulers.io()) 指定創(chuàng)建的時間在IO線程中發(fā)出锯厢,observeOn(AndroidScheculers.mainThread()) 指定事件消費在主線程中消費,這種方式非常常見脯倒,適用于子線程取數(shù)據(jù)实辑,然后主線程顯示。同時藻丢,可以看到我在上述代碼中新添加了一個方法doOnSubscribe(),該方法與Subscriber的OnStart()方法相似剪撬,在事件產(chǎn)生前在OnStart()之后就會執(zhí)行,但是在該方法之前的subscribeOn()不會影響他悠反,只有在他之后且離他最近的一個subscribeOn() 才會影響残黑。
- observeOn()可以執(zhí)行多次馍佑,他所影響的是指定之后的操作所在的線程。
- subscribeOn()也可以執(zhí)行多次梨水,從事件開端就造成影響拭荤,有多個的時候只有第一個 subscribeOn() 起作用。
接下來可以說是RxJava最牛逼的地方了疫诽。
變換
1. map();
2. flatMap();
讓我們首先來看map()
Observable.from(number)
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Integer integer = Integer.valueOf(s);
Log.e("call--1--: ", integer + "");
return integer;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("call--2--: ", integer + "");
}
});
在這里又新出現(xiàn)了一個類Func1舅世,他與Action1非常相似,也是RxJava的一個接口奇徒,但是與Action1不同的是Func1是有返回值的雏亚。可以看到的是在map()中將String類型的參數(shù)轉(zhuǎn)換為Integer類型后返回 摩钙。
接下來讓我們看flatMap(), 這邊舉個例子评凝,我們要打印學(xué)生的課程信息,學(xué)生的課程不一定是只有一個的腺律,可能有很多個奕短,所以要這個時候就要用到flatMap();
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
從上面的代碼可以看出,flatMap和Map的相同點就是把一個對象轉(zhuǎn)化為另一個對象返回匀钧,但是不同的是flatMap()返回的是個Observable對象翎碑,并且這個對象并不是直接發(fā)送到了回調(diào)方法中,而是把這個對象激活之斯,之后將他發(fā)送到回調(diào)方法中日杈。
變換的原理
這些變換雖然功能各有不同,但實質(zhì)上都是針對事件序列的處理和再發(fā)送佑刷。而在 RxJava 的內(nèi)部莉擒,它們是基于同一個基礎(chǔ)的變換方法: lift(Operator)。
注意:這不是 lift() 的源碼瘫絮,而是將源碼中與性能涨冀、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼麦萤。
如果需要看源碼鹿鳖,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
在這里我就說一下我所理解的lift()的原理吧:
說簡單點就是壮莹,你自己定義的Observable1含有l(wèi)ift()時,會生成一個Observable翅帜,我們叫他Observable2,Observable2中會有一個OnSubscribe(),然后調(diào)用lift()時命满,新生成的 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber( call() 方法將新 Subscriber 和原始 Subscriber 進(jìn)行關(guān)聯(lián))然后通過新生成的Subscriber與Observable1進(jìn)行訂閱有點像一種代理機(jī)制涝滴,通過事件攔截和處理實現(xiàn)事件序列的變換。
compose: 對 Observable 整體的變換
這個地方就像是對Observable進(jìn)行封裝,實現(xiàn)Observable.Transformer接口并重寫Call()就可以了歼疮,在這里就不多說了杂抽。
RxJava的使用場景和使用方式#
與Retrofit結(jié)合使用
直接上代碼吧,
1. 首先初始化Retrofit
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://japi.juhe.cn/joke/content/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())//基于RxJava1.0
.build();
2. 定義返回Observable的請求
@GET("text.from?page=1&pagesize=20&key=f1bdc177567c6fbe53d918041004c0b1")
Observable<JokeBean> getJokeContent();
3. 開始請求并返回數(shù)據(jù)
JokeService service = retrofit.create(JokeService.class);
service.getJokeContent()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<JokeBean>() {
@Override
public void call(JokeBean jokeBean) {
String content = jokeBean.getResult().getData().get(0).getContent();
mTextView.setText(content);
}
});
如果需要耗時操作就doOnNext()方法中進(jìn)行操作腋妙,如果需要連續(xù)訪問的話就使用flatMap(),如下所示
@GET("/token")
public Observable<String> getToken();
@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);
...
getToken()
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> onNext(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
參考:給Android開發(fā)者的RxJava詳解
不對之處歡迎指正默怨!推薦一個Android實習(xí)&&經(jīng)驗交流群:541144061