引言:
學習了一下RxJava吆寨,理解其是一個以升級版的觀察者模式為核心的異步處理庫推姻。旨在以更加簡介、可讀性更強的代碼去實現(xiàn)數(shù)據(jù)異步處理和線程前通信课兄。
下面牍氛,是本人對RxJava基礎的學習筆記和總結(jié),算是入門級別烟阐。
Rx介紹
ReactiveX 簡稱 Rx搬俊,全稱 Reactive Extensions,最初是LINQ的一個擴展曲饱,由微軟的架構(gòu)師Erik Meijer領導的團隊開發(fā)悠抹,在2012年11月開源,Rx是一個編程模型扩淀,目標是提供一致的編程接口楔敌,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流,Rx庫支持.NET驻谆、JavaScript和C++卵凑,Java等幾乎所有的編程語言。Rx擴展了觀察者模式用于支持數(shù)據(jù)和事件序列胜臊,添加了一些操作符勺卢,它讓你可以聲明式的組合這些序列,而無需關注底層的實現(xiàn):如線程象对、同步黑忱、線程安全、并發(fā)數(shù)據(jù)結(jié)構(gòu)和非阻塞IO勒魔。
- Reactive: 響應式
- LINQ: Language Integrated Query的簡稱甫煞,它是集成在.NET編程語言中的一種特性。已成為編程語言的一個組成部分冠绢,在編寫程序時可以得到很好的編譯時語法檢查抚吠,豐富的元數(shù)據(jù),智能感知弟胀、 靜態(tài)類型等強類型語言的好處楷力。
- 迭代器模式:核心思想是:通過定義遍歷或查看對象中所有元素的方法的接口喊式,并根據(jù)不同的類進行不同的方法實現(xiàn)相,已達到對類數(shù)據(jù)遍歷的抽象以及對類內(nèi)部如何獲取數(shù)據(jù)的過程進行掩蓋的目的萧朝。當于Java中的Iterator(迭代器)有它的繼承接口如ListIterator和它的實現(xiàn)類等岔留,我們在遍歷Set捌肴、Map時是己,用到他們的Iterator,這樣销钝,他們具體怎么拿出數(shù)據(jù)的過程厕吉,我們不用知道酱固。
- 觀察者模式:有時被稱作發(fā)布/訂閱模式,觀察者模式定義了一種一對多的依賴關系头朱,讓多個觀察者對象同時監(jiān)聽某一個主題對象运悲。這個主題對象在狀態(tài)發(fā)生變化時,會通知所有觀察者對象项钮,使它們能夠自動更新自己班眯。【下面RxJava的使用過程就是觀察者模式的體現(xiàn)】
- Rx = Observables【用于表示異步數(shù)據(jù)流】 + LINQ【用它的操作符查詢異步數(shù)據(jù)流】 + Schedules【參數(shù)化異步數(shù)據(jù)流的并發(fā)處理】
- Rx用到的設計模式精華:觀察者模式烁巫、迭代器模式
- RxJava中最重要的是:Observable【被觀察者署隘,事件源】+ Subscriber【觀察者,訂閱者】
RxJava圖解
可先通過圖解總覽大概:
RxJava之觀察者模式的基本運作過程亚隙,如下:
- <u>說明一點:Subscriber實現(xiàn)了Observer和Subscription</u>
- <u>通過
subscribe()
方法磁餐,Observable 與 觀察者綁定。</u> - <u>Subscriber與Observer的周期方法大概一致阿弃,Subscriber多了個用于清理數(shù)據(jù)的onStart()方法诊霹。</u>
- <u>
unsubscribe()
方法在Observer對象調(diào)用完onCompleted()
或onError()
方法后,被調(diào)用渣淳,進行訂閱關系的解綁脾还。</u>
RxJava觀察者模式順序圖,如下:
RxJava觀察者模式順序圖
注意:
Subscribe<T>
是實現(xiàn)Observable<T>
和Subscription
的一個抽象類入愧,在調(diào)用subscribe(params)
方法時鄙漏,如果這個params
類型為Observer<T>
,則最終它會轉(zhuǎn)成Subscriber<T>
棺蛛,同時怔蚌,此方法會返回一個Subscription
對象,用于調(diào)用unsubscribe()
方法解綁鞠值。
單線程中RxJava基本用法和例子
1. RxJava的幾種基本寫法(觀察者模式)
方式一:
原始的觀察者模式寫法媚创,如下:
///被觀察者
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
}
);
///觀察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
};
///訂閱(讓兩者產(chǎn)生關聯(lián),并啟動)
myObservable.subscribe(mySubscriber);
方式二:
相對方式一渗钉,化簡定義方法體的部分彤恶,使用Action來實現(xiàn)不完整回調(diào)钞钙,結(jié)果如下:
//被觀察者
//等價于: call(String) -> onNext(String)過程只調(diào)用一次 ->onCompleted()/onError()
Observable<String> myObservable = Observable.just("Hello world");
///觀察者
///調(diào)用subscribe()時自動生成Subscriber并調(diào)用onNext()
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
};
///觀察者
///調(diào)用subscribe()時自動生成Subscriber并調(diào)用onError()
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
///觀察者
///調(diào)用subscribe()時自動生成Subscriber并調(diào)用onCompleted()
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
//////訂閱(讓兩者產(chǎn)生關聯(lián),并啟動)
myObservable.subscribe(onNextAction);
// myObservable.subscribe(onErrorAction);
// myObservable.subscribe(onCompletedAction);
方式三:
相對方式二,進行鏈式調(diào)用声离,如下:
///省略Obervable對象的創(chuàng)建
Observable.just("this is your sign:")
///省略Action1對象的創(chuàng)建芒炼,直接匿名內(nèi)部類方式添加訂閱
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
注意:
- just:如果只是調(diào)用: onNext() 【一到多次】 --> onCompleted()這個過程,那么术徊,可以使用just()快速創(chuàng)建Observable
2. 基本應用
1. 打印字符串數(shù)組
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
Observable.from(params) : params是數(shù)組類型的參數(shù)本刽,在執(zhí)行時,會調(diào)用Subscriber的onNext方法多次赠涮,每次處理一個item子寓,之后,調(diào)用onCompleted()或者onError().
2. 通過id獲取圖片并顯示
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
多線程中RxJava的使用
在 RxJava 的默認規(guī)則中笋除,事件的發(fā)出和消費都是在同一個線程的斜友。也就是說,如果只用上面的方法垃它,實現(xiàn)出來的只是一個同步的觀察者模式鲜屏。觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制国拇,因此異步對于 RxJava 是至關重要的洛史。
1. 基本寫法
Observable.just(1,2,3,4)
///指定 subscribe() 發(fā)生在 IO 線程
.subscribeOn(Schedulers.io())
// 指定 Subscriber 的回調(diào)發(fā)生在主線程
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
Log.e("TestActivity", "當前線程:"+ Thread.currentThread());
String res = "字符串:"+integer;
return res;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Toast.makeText(TestActivity.this,"完成",Toast.LENGTH_SHORT).show();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("TestActivity", "當前線程:"+ Thread.currentThread());
Toast.makeText(TestActivity.this,s,Toast.LENGTH_SHORT).show();
}
});
知識點:
- 加了map這個RxJava的映射方法,用于將事件處理的復雜過程【如:輸入?yún)?shù)是Integer類型酱吝,輸出結(jié)果是String類型】給被觀察者來做也殖,盡可能地減少觀察者的工作。
知識點: - just((1,2,3,4):
前者等價于如下代碼:
```
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e("TestActivity", "call當前線程:"+ Thread.currentThread());
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
}
})
```
- Scheduler:
* 背景:在不指定線程的情況下务热, RxJava 遵循的是線程不變的原則毕源,即:在哪個線程調(diào)用 `subscribe()`,就在哪個線程生產(chǎn)事件陕习;在哪個線程生產(chǎn)事件,就在哪個線程消費事件该镣。
* 概念:調(diào)度器(線程控制器)
* 作用:切換線程傳遞事件冻璃,達到異步的目的
* RxJava內(nèi)置的Scheduler:(文章下面會詳細總結(jié))
* `Schedulers.immediate()`:默認模式。直接使用當前線程運行损合。
* `Schedulers.newThread()`:總是啟動新線程省艳,并在新線程中運行。
* `Sched.io()`:I/O 操作(讀寫文件嫁审、讀寫數(shù)據(jù)庫跋炕、網(wǎng)絡信息交互等)所使用的 Scheduler。
* `Schedulers.computation()`: 計算所使用的 Scheduler律适。
* `AndroidSchedulers.mainThread()`:它指定的操作將在 Android 主線程運行辐烂。
- Obervable.subscribeOn(Scheduler):讓call方法以及之前的操作遏插,發(fā)生在指定的線程中運行
- Obervable.observeOn(Scheduler):讓call之后的回調(diào)操作例如map、onNext等操作纠修,發(fā)生在指定的線程中運行胳嘲。
RxJava常用操作--數(shù)據(jù)轉(zhuǎn)換處理
在事件傳遞過程中扣草,如果觀察者有需要了牛,還可以通過數(shù)據(jù)轉(zhuǎn)換處理,將傳入的數(shù)據(jù)進行加工或調(diào)用辰妙,得到更多不同類型的信息鹰祸。
RxJava提供給我們:map,flatMap來支持數(shù)據(jù)的‘一對一’和’一對多‘的轉(zhuǎn)換密浑。
- map
作用:實現(xiàn)數(shù)據(jù)的一對一轉(zhuǎn)化過程
以下例子可以說明:
///省略Obervable對象的創(chuàng)建
Observable.just("this is your sign:")
///將傳入的參數(shù)由String變成String[]
.map(new Func1<String, String[]>() {
@Override
public String[] call(String s) {
String[] strings = s.split(" ");
return strings;
}
})
///將傳入的參數(shù)由String[]變成Integer
.map(new Func1<String[], Integer>() {
@Override
public Integer call(String[] strings) {
int len = strings.length;
return len;
}
})
///將傳入的參數(shù)由Integer變成String
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer+"";
}
})
///省略Action1對象的創(chuàng)建福荸,直接匿名內(nèi)部類方式添加訂閱
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
- flatMap
作用:實現(xiàn)數(shù)據(jù)的一對多轉(zhuǎn)換過程
先看如下具體例子:
private void testFlatMap() throws CloneNotSupportedException {
List<Student> studentList = new ArrayList<>();
///測試:構(gòu)建兩個Student對象
Student xiaoming = new Student();
Student honghong = new Student();
///測試:構(gòu)建Course對象集
Course chinese = new Course("語文");
Course english = new Course("英語");
Course math = new Course("數(shù)學");
///進行賦值操作,這樣一來:
/// xiaoming:id為“2222”肴掷,并有兩門課程:語文和英語
/// honghong:id為“007” 敬锐,并有兩門課程:英語和數(shù)學
xiaoming.id= "2222";
honghong.id= "007";
xiaoming.courseList = new ArrayList<>();
xiaoming.courseList.add(chinese.clone());
xiaoming.courseList.add(english.clone());
honghong.courseList = new ArrayList<>();
honghong.courseList.add(english.clone());
honghong.courseList.add(math.clone());
studentList.add(xiaoming);
studentList.add(honghong);
///下面的過程,就是提却粽啊:列表中的列表
Observable.from(studentList)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
Log.e("學生信息", student.id);
return Observable.from(student.courseList);
}
})
.map(new Func1<Course, String>() {
@Override
public String call(Course course) {
return course.name;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("course信息",s);
}
});
}
最終得到結(jié)果為:
知識點:
- flatMap:
- 作用:實現(xiàn)傳遞數(shù)據(jù)的一對多變換(比如:我想要對一個列表中每一個item都進行一個數(shù)據(jù)類型轉(zhuǎn)換并輸出的操作)
- 原理:
- 1)使用傳入的事件對象創(chuàng)建一個 Observable 對象
- 2)并不發(fā)送這個 Observable, 而是將它激活台夺,于是它開始發(fā)送事件
- 3)每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable 痴脾,而這個 Observable 負責將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法
- 結(jié)果:把事件拆成了兩級颤介,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat赞赖。
- Funx 和 Actionx:
- 'x'的意義:從0開始滚朵,表示有x個參數(shù)的Fun()和Action()方法。
關于Single
- Single類似于Observable前域,可綁定若干Observer并向他們發(fā)送響應信息辕近,區(qū)別在于:
- Single只會發(fā)射一個值,或者一個錯誤通知匿垄,而不是發(fā)射一系列的值移宅。
- 訂閱Observable需要onNext()、onComplete()椿疗、onError()三個回調(diào)方法【在Observer中的】漏峰,而訂閱Single只需要兩個方法onSuccess()、onError()
- Single會將任務處理最終給到以下兩個方式中的一個届榄,之后浅乔,終止訂閱關系。
- onSuccess - 允許情況下铝条,Single發(fā)射單個的值到這個方法靖苇。
- onError - 如果無法發(fā)射需要的值席噩,Single發(fā)射一個Throwable對象到這個方法。
- 實例:
Single.create(new Single.OnSubscribe<String>() { @Override public void call(SingleSubscriber<? super String> singleSubscriber) { singleSubscriber.onSuccess("一次性初始化"); } }).map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.length(); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { ///onSuccess()要做的操作 } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { // onError() 要做的操作 } });
RxJava各方法匯總
1. 用于創(chuàng)建Observable的操作符:
- filter() —輸出和輸入相同的元素顾复,并且會過濾掉那些不滿足檢查條件的。
- take() —輸出最多指定數(shù)量的結(jié)果鲁捏。
- Delay() —讓發(fā)射數(shù)據(jù)的時機延后一段時間
- Create — 通過調(diào)用觀察者的方法從頭創(chuàng)建一個Observable
- Defer — 在觀察者訂閱之前不創(chuàng)建這個Observable芯砸,為每一個觀察者創(chuàng)建一個新的Observable
- Empty/Never/Throw — 創(chuàng)建行為受限的特殊Observable
- From — 將其它的對象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable
- Interval — 創(chuàng)建一個定時發(fā)射整數(shù)序列的Observable
- Just — 將對象或者對象集合轉(zhuǎn)換為一個會發(fā)射這些對象的Observable
- Range — 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable
- Repeat — 創(chuàng)建重復發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable
- Start — 創(chuàng)建發(fā)射一個函數(shù)的返回值的Observable
- Timer — 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable
2. 用于對Observable發(fā)射的數(shù)據(jù)進行變換的操作符:
- Buffer — 緩存,可以簡單的理解為緩存给梅,它定期從Observable收集數(shù)據(jù)到一個集合假丧,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個
- FlatMap — 扁平映射动羽,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合包帚,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,可以認為是一個將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過程运吓。
- GroupBy — 分組渴邦,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組拘哨,每一個Observable發(fā)射一組不同的數(shù)據(jù)
- Map — 映射谋梭,通過對序列的每一項都應用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù),實質(zhì)是對序列中的每一項執(zhí)行一個函數(shù)倦青,函數(shù)的參數(shù)就是這個數(shù)據(jù)項
- Scan — 掃描瓮床,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù),然后按順序依次發(fā)射這些值
- Window — 窗口产镐,定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口隘庄,然后發(fā)射這些窗口,而不是每次發(fā)射一項癣亚。類似于Buffer丑掺,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable述雾,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集
3. 線程切換和控制相關操作符:
- subscribeOn(Scheduler) — 指定事件的call方法以及以前的操作到一個線程中
- observeOn(Scheduler) — 指定事件的call方法之后的操作(如:map(),onNext(),onCompleted(),onError())到一個線程中【注意:不包括Subscriber.onStart()方法吼鱼,該方法在默認它所在的線程中執(zhí)行】
- 參數(shù)Scheduler有:
- Schedulers.immediate():默認模式。直接使用當前線程運行绰咽。
- Schedulers.newThread():總是啟動新線程菇肃,并在新線程中運行。
- Schedulers.io():I/O 操作(讀寫文件取募、讀寫數(shù)據(jù)庫琐谤、網(wǎng)絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多玩敏,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池斗忌,可以重用空閑的線程质礼,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中织阳,可以避免創(chuàng)建不必要的線程
- Schedulers.computation(): 計算所使用的 Scheduler眶蕉。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作唧躲,例如圖形的計算造挽。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)弄痹。不要把 I/O 操作放在 computation() 中饭入,否則 I/O 操作的等待時間會浪費 CPU。
- AndroidSchedulers.mainThread():它指定的操作將在 Android 主線程運行肛真。
4. Single相關方法匯總:
-
compose()
-- 創(chuàng)建一個自定義的操作符 -
concat()/concatWith()
-- 連接多個Single和Observable發(fā)射的數(shù)據(jù) -
create()
-- 調(diào)用觀察者的create方法創(chuàng)建一個Single -
error()
-- 返回一個立即給訂閱者發(fā)射錯誤通知的Single map()
flatMap()
-
flatMapObservable():Observable
-- 返回一個Observable谐丢,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果 -
from():Single
-- 將Future轉(zhuǎn)換成Single -
just(V)
-- 返回一個發(fā)射一個指定值V的Single -
merge()
-- 將一個Single(它發(fā)射的數(shù)據(jù)是另一個Single,假設為B)轉(zhuǎn)換成另一個Single(它發(fā)射來自另一個Single(B)的數(shù)據(jù)) -
merge()/mergeWith():Observable
-- 合并發(fā)射來自多個Single的數(shù)據(jù) -
observeOn()
-- 指示Single在指定的調(diào)度程序上調(diào)用訂閱者的方法 -
subscribeOn()
-- 指示Single在指定的調(diào)度程序上執(zhí)行操作 -
onErrorReturn()
-- 將一個發(fā)射錯誤通知的Single轉(zhuǎn)換成一個發(fā)射指定數(shù)據(jù)項的Single -
timeout()
-- 它給原有的Single添加超時控制蚓让,如果超時了就發(fā)射一個錯誤通知 -
toSingle()
-- 將一個發(fā)射單個值的Observable轉(zhuǎn)換為一個Single -
zip()/zipWith():Single
-- 將多個Single轉(zhuǎn)換為一個乾忱,后者發(fā)射的數(shù)據(jù)是對前者應用一個函數(shù)后的結(jié)果