知識(shí)框架(腦圖)
出現(xiàn)背景
- Android中UI主線程的限制和應(yīng)用流暢度的要求使得異步任務(wù)繁多,容易導(dǎo)致回調(diào)地獄(Callbak Hell)
- 之前的Handler和AsyncTask解決方案都不很理想
- EventBus缺乏對(duì)事件的處理虚汛,不夠靈活
解決思路
- 使用觀察者模式解耦
- 使用可觀測(cè)的序列來(lái)組成異步的局劲、基于事件的程序庫(kù)
- 即使邏輯復(fù)雜色乾,也要保持程序的簡(jiǎn)潔性
具體步驟
(1)Observable->Subscriber 通用版
// 可觀測(cè)者,或者稱為被觀察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava."); //觸發(fā)普通事件
subscriber.onCompleted(); //觸發(fā)該方法作為事件隊(duì)列完結(jié)的標(biāo)志
}
});
// 訂閱者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() { //事件隊(duì)列完結(jié)
}
@Override
public void onError(Throwable e) { //事件隊(duì)列異常
}
@Override
public void onNext(String s) { //普通事件
System.out.println(s);
}
};
// 訂閱事件,形成一個(gè)Subscription
observable.subscribe(subscriber);
(2)Observable->Subscriber 簡(jiǎn)單版
//v1: 使用Subscriber(訂閱者)
Observable.just("Hello RxJava").subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
//v2:使用Observer(觀察者)
// 區(qū)別于Subscriber,沒(méi)有unsubscribe坛芽、isUnsubscribed和onStart方法
Observable.just("Hello RxJava").subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
//v3:使用Action1,不關(guān)心是否結(jié)束和發(fā)生錯(cuò)誤
Observable.just("Hello RxJava").subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
(3)取消訂閱與檢查訂閱狀態(tài)
Subscription subscription = Observable.just("Hello RxJava").subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
// 取消訂閱
subscription.unsubscribe();
// 檢查是否取消訂閱
System.out.println(subscription.isUnsubscribed());
(4)事件變換
所謂變換翼抠,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理咙轩,轉(zhuǎn)換成不同的事件或事件序列。
map():一對(duì)一變換阴颖,將一個(gè)對(duì)象直接變換成另一個(gè)對(duì)象
flatMap():將傳入的對(duì)象作為泛型創(chuàng)建一個(gè)Observable對(duì)象活喊,激活該Observable對(duì)象并匯入同一個(gè)Observable對(duì)象
map栗子:
//"#Lshare" -> "<h1>Lshare</h1>"
Observable.just("#Lshare").map(new Func1<String, String>() {
@Override
public String call(String s) {
if (s.startsWith("#")) {
return "<h1>" + s.substring(1) + "</h1>";
}
return s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
flatMap栗子:
Student student = new Student("Lshare",
new Course[]{new Course("404", "Java"), new Course("200", "Android")});
Observable.just(student).flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
if (student.name.equals("Lshare")) { //這家伙的話要多學(xué)點(diǎn),哈哈~
Course[] newCourses = new Course[student.courses.length + 1];
newCourses[newCourses.length - 1] = new Course("100", "RxJava");
System.arraycopy(student.courses, 0, newCourses, 0, newCourses.length - 1);
return Observable.from(newCourses);
}
return Observable.from(student.courses);
}
}).subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
System.out.println(course.id + " - " + course.name + " ");
}
});
}
附帶實(shí)體類:
class Student {
public String name;
public Course[] courses ;
public Student(String name , Course[] courses) {
this . name = name ;
this. courses = courses;
}
}
class Course {
public String id;
public String name ;
public Course(String id , String name) {
this . id = id ;
this. name = name;
}
}
(5)線程控制
在不指定線程的情況下量愧,RxJava遵循的是線程不變的原則钾菊;當(dāng)需要線程切換時(shí),可以使用Scheduler(線程調(diào)度器)偎肃。
- Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行煞烫,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler累颂。
- Schedulers.newThread(): 總是啟用新線程红竭,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫(xiě)文件、讀寫(xiě)數(shù)據(jù)庫(kù)茵宪、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多瘦棋,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池稀火,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率赌朋。不要把計(jì)算工作放在 io() 中凰狞,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation(): CPU 密集計(jì)算所使用的 Scheduler沛慢,例如圖形的計(jì)算赡若,不會(huì)被 I/O 等操作限制 。這個(gè) Scheduler 使用的固定的線程池团甲,大小為 CPU 核數(shù)逾冬。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU躺苦。
- AndroidSchedulers.mainThread():它指定的操作將在 Android 主線程運(yùn)行身腻。
Observerable提供用來(lái)指定線程的方法:
- subscribeOn() : 指定事件產(chǎn)生的線程,訂閱然后激活了事件
- **observerOn() : **指定事件消費(fèi)的線程匹厘,Subscriber運(yùn)行的線程
Observable.just(1, 2, 3)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
(6)進(jìn)階:認(rèn)識(shí)Subject
Subject就是一個(gè)Observable也是一個(gè)Observer嘀趟,因?yàn)樗黣xtends了Observable,然后又implements了Observer愈诚。
有一個(gè)抽象方法:hasObservers()返回值表示是否至少有一個(gè)Observer訂閱了該Subject她按。
有四種Subject類型:
類型 | 訂閱時(shí)行為 | 圖解 | 發(fā)生錯(cuò)誤時(shí)行為 |
---|---|---|---|
**AsyncSubject ** | 只在原始Observable完成后,發(fā)射來(lái)自原始Observable的最后一個(gè)值炕柔。(如果原始Observable沒(méi)有發(fā)射任何值酌泰,AsyncObject也不發(fā)射任何值)。 | 只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知 | |
BehaviorSubject | 發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時(shí)沒(méi)有收到任何數(shù)據(jù)汗唱,它會(huì)發(fā)送一個(gè)默認(rèn)值)宫莱,然后繼續(xù)發(fā)射后續(xù)的來(lái)自原始Observable的數(shù)據(jù)。 | 只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知 | |
PublishSubject | 將自訂閱時(shí)間點(diǎn)之后的數(shù)據(jù)發(fā)射給觀察者哩罪,之前的數(shù)據(jù)不再發(fā)射授霸。 | 只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知 | |
ReplaySubject | 發(fā)射所有來(lái)自原始Observable的數(shù)據(jù),而不管是什么時(shí)候開(kāi)始訂閱际插。 | 只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知 |
Q&A
問(wèn)題1:RxJava如何與Retrofit配合使用碘耳?
// todo